详解flink中Look up维表的使用

发布网友 发布时间:2024-10-24 14:52

我来回答

1个回答

热心网友 时间:2024-10-29 06:16

背景

在流式计算领域,维表是一种常用概念,主要用于SQL的JOIN操作,以实现对流数据的补充。比如,我们的数据源stream是订单日志,日志中仅记录了订单商品的ID,缺乏其他信息。但在数据分析时,我们需要商品名称、价格等详细信息,这时可以通过查询维表对数据进行补充。

维表通常存储在外部存储中,如MySQL、HBase、Redis等。本文以MySQL为例,介绍Flink中维表的使用。

LookupableTableSource

Flink提供LookupableTableSource接口,用于实现维表功能。通过特定的key列查询外部存储,获取相关信息,以补充stream数据。

LookupableTableSource有三个方法

在Flink中,实现LookupableTableSource接口的主要有四个类:JdbcTableSource、HBaseTableSource、CsvTableSource和HiveTableSource。本文以JDBC为例,讲解如何进行维表查询。

实例讲解

以下是一个示例,首先定义stream source,使用Flink 1.11提供的datagen生成数据。

我们模拟生成用户数据,范围在1-100之间。

datagen具体的使用方法请参考:

聊聊Flink 1.11中的随机数据生成器-DataGen connector

然后创建一个MySQL维表信息:

该MySQL表中样例数据如下:

最后执行SQL查询,流表关联维表:

结果示例如下:

对于维表中存在的数据,已关联出来,对于维表中不存在的数据,显示为null。

完整代码请参考:github.com/zhangjun0x01...

源码解析JdbcTableSource

以JDBC为例,看看Flink底层是如何实现的。

JdbcTableSource#isAsyncEnabled方法返回false,即不支持异步查询,因此进入JdbcTableSource#getLookupFunction方法。

最终构造一个JdbcLookupFunction对象。

JdbcLookupFunction

接下来看看JdbcLookupFunction类,它是TableFunction的子类,具体使用可参考以下文章:

Flink实战教程-自定义函数之TableFunction

TableFunction的核心是eval方法,在该方法中,主要工作是使用多个keys拼接成SQL查询数据,首先查询缓存,缓存有数据则直接返回,缓存无数据则查询数据库,并将查询结果返回并放入缓存。下次查询时,直接查询缓存。

为什么要加缓存?默认情况下不开启缓存,每次查询都会向维表发送请求,如果数据量较大,会给存储维表的系统造成压力。因此,Flink提供了LRU缓存,查询维表时,先查询缓存,缓存无数据则查询外部系统。如果某个数据查询频率较高,一直被命中,则无法获取新数据。因此,缓存需要设置超时时间,超过这个时间则强制删除该数据,查询外部系统获取新数据。

如何开启缓存?请参考JdbcLookupFunction#open方法:

即cacheMaxSize和cacheExpireMs需要同时设置,构造缓存对象cache来缓存数据。这两个参数对应的DDL属性为lookup.cache.max-rows和lookup.cache.ttl。

对于具体的缓存大小和超时时间的设置,用户需要根据自身情况自行定义,在数据准确性和系统吞吐量之间进行权衡。

声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com