1 表定义
jdbc表的定义跟普通的表定义,区别就在于with中的参数:
CREATE TABLE MyUserTable ( ... ) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
'connector.table' = 'jdbc_table_name',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'name',
'connector.password' = 'password',
-- 以上都是必填参数
'connector.read.partition.column' = 'column_name',
'connector.read.partition.num' = '50',
'connector.read.partition.lower-bound' = '500',
'connector.read.partition.upper-bound' = '1000',
'connector.read.fetch-size' = '100'
)
password前面的参数都是必填项,后面的connector.read相关的参数都是与读取有关的。如果没有配置这些参数,那么flink不会采用任何分区,无论你的并行度设置的多大,其实只有一个slot在工作,并且执行的是全量数据的查询,这个细节稍后再说。
2 表使用
如果想实现数据的抽取同步,一种比较简单的方法就是:
insert overwrite ods.xxx partition (ods_date='2020-01-03')
select * from jdbc_table_name
where time>'2020-05-08' and time < '2020-05-09'
这样就实现了数据交换的过程,想法很简单,现实很残酷。如果仔细观察flink ui中各个slot的执行情况就不难发现,无论并行度设置多大,总是其中一个slot执行非常缓慢....
3 原理分析
Flink通过SPI寻找到JDBC对应的TableSource,内部会执行下面的方法创建inputformat,其中164行的query就是真正提交给jdbc执行的代码。
仔细看代码165行,conditionFields传入的值总是为空,因此这坨sql根本不会出现where条件(只有当分区字段存在时,flink才会基于分区字段生成where条件;而我们自己sql中的where条件直接被忽略了)。回头再来说说第二部分我们的查询sql:
select * from jdbc_table_name
where time>'2020-05-08' and time < '2020-05-09'
会终形成两个步骤:
1 基于jdbc执行select * from jdbc_table_name
2 在flink中基于time过滤数据
假设现在的并行度为3,也就是3个slot在工作,但是其实只有一个slot执行了查询任务。这样就相当于某个slot需要全表扫描数据,然后在内存中过滤数据,返回我们想要的那部分。如果源表数据量非常大,那这个sql几乎是跑不通的。
一种优化方案是使用flink提供的partition.column字段指定分区字段,然后指定上下界,flink内部会基于上下界和分区个数,形成一个个的分区范围并行查询。
比如还是上面的sql,假如我配置为:
'connector.read.partition.column' = 'id',
'connector.read.partition.num' = '3',
'connector.read.partition.lower-bound' = '1000',
'connector.read.partition.upper-bound' = '10000',
那么终形成的查询sql为:
select * from jdbc_table_name where id between 1000 and 3999;
select * from jdbc_table_name where id between 4000 and 6999;
select * from jdbc_table_name where id between 7000 and 9999;
假设flink source的并行度为3,那么”很有可能“每个slot都分配了一个查询任务,三个slot并行查询,这样效率就提升很多了。
注意,是很有可能!flink中source的分区任务分配并不是均衡的,而是抢占式的,所以我们需要尽量把sql的分区数设置的大一些,才能保证每个slot执行的任务是尽量均衡的。
如果想要利用上面的机制加速数据抽取的速度,可以采用投机取巧的方法:查询目标时间范围内的大id和小id,设置为分区字段的上下界。不过这样也不一定能解决问题,如果上下界范围过大,效率一样提不上来。
4 优化实战
由于flink官方仅仅提供了一种基于long类型字段的数字分区方法,因此不能实现基于时间范围的查询,这个时间范围也无法直接下推到jdbc层。所以可以自定义一种split方法,上下界改成时间范围的上下界,中间基于分区数自动计算时间范围。比如时间范围为2020-05-08 00:00:00~2020-05-09 00:00:00,假设分区数为24,那么一个小时就是一个分区范围,这样优化后速度就快很多了。
如果再考虑的全面点,每天的数据变化是不一样的,往往呈正态分布的趋势。比如白天时间段数据两会很大,夜间数据量往往比较小。如果希望分区的数据量更平均一点,可以使用加权分区方法,把各个时间段的数据量考虑进来,这样就能尽量保证每个分区的数据均衡了。