绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
Flink深入浅出:JDBC Source从理论到实战
2020-07-08 00:18:57

Flink 1.10之后针对Table API&SQL提供了很多外部连接器,可以使用DDL快速创建外部表,从而可以在Flink中基于SQL直接读取外部数据源。在大数据环境中,有一个很经典的场景是数据交换——即以一定的时间周期把业务库中的数据同步到hive或者hdfs中,下面就先介绍官方的使用方法,再通过源码分析其中的技术细节。

1 表定义

jdbc表的定义跟普通的表定义,区别就在于with中的参数:

CREATE TABLE MyUserTable (  ... ) WITH ( 
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
'connector.table' = 'jdbc_table_name',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'name',
'connector.password' = 'password',
-- 以上都是必填参数
'connector.read.partition.column' = 'column_name',
'connector.read.partition.num' = '50',
'connector.read.partition.lower-bound' = '500',
'connector.read.partition.upper-bound' = '1000',
'connector.read.fetch-size' = '100'
)

password前面的参数都是必填项,后面的connector.read相关的参数都是与读取有关的。如果没有配置这些参数,那么flink不会采用任何分区,无论你的并行度设置的多大,其实只有一个slot在工作,并且执行的是全量数据的查询,这个细节稍后再说


2 表使用

如果想实现数据的抽取同步,一种比较简单的方法就是:

insert overwrite ods.xxx partition (ods_date='2020-01-03') 
select * from jdbc_table_name  
where time>'2020-05-08' and time < '2020-05-09'

这样就实现了数据交换的过程,想法很简单,现实很残酷。如果仔细观察flink ui中各个slot的执行情况就不难发现,无论并行度设置多大,总是其中一个slot执行非常缓慢....

3 原理分析

Flink通过SPI寻找到JDBC对应的TableSource,内部会执行下面的方法创建inputformat,其中164行的query就是真正提交给jdbc执行的代码。

仔细看代码165行,conditionFields传入的值总是为空,因此这坨sql根本不会出现where条件(只有当分区字段存在时,flink才会基于分区字段生成where条件;而我们自己sql中的where条件直接被忽略了)。回头再来说说第二部分我们的查询sql:

select * from jdbc_table_name 
where time>'2020-05-08' and time < '2020-05-09'

会终形成两个步骤:

1 基于jdbc执行select * from jdbc_table_name

2 在flink中基于time过滤数据

假设现在的并行度为3,也就是3个slot在工作,但是其实只有一个slot执行了查询任务。这样就相当于某个slot需要全表扫描数据,然后在内存中过滤数据,返回我们想要的那部分。如果源表数据量非常大,那这个sql几乎是跑不通的。

一种优化方案是使用flink提供的partition.column字段指定分区字段,然后指定上下界,flink内部会基于上下界和分区个数,形成一个个的分区范围并行查询。

比如还是上面的sql,假如我配置为:

'connector.read.partition.column' = 'id', 
'connector.read.partition.num' = '3', 
'connector.read.partition.lower-bound' = '1000', 
'connector.read.partition.upper-bound' = '10000',

那么终形成的查询sql为:

select * from jdbc_table_name where id between 1000 and 3999; 
select * from jdbc_table_name where id between 4000 and 6999; 
select * from jdbc_table_name where id between 7000 and 9999;

假设flink source的并行度为3,那么”很有可能“每个slot都分配了一个查询任务,三个slot并行查询,这样效率就提升很多了。

注意,是很有可能!flink中source的分区任务分配并不是均衡的,而是抢占式的,所以我们需要尽量把sql的分区数设置的大一些,才能保证每个slot执行的任务是尽量均衡的。


如果想要利用上面的机制加速数据抽取的速度,可以采用投机取巧的方法:查询目标时间范围内的大id和小id,设置为分区字段的上下界。不过这样也不一定能解决问题,如果上下界范围过大,效率一样提不上来。


4 优化实战

由于flink官方仅仅提供了一种基于long类型字段的数字分区方法,因此不能实现基于时间范围的查询,这个时间范围也无法直接下推到jdbc层。所以可以自定义一种split方法,上下界改成时间范围的上下界,中间基于分区数自动计算时间范围。比如时间范围为2020-05-08 00:00:00~2020-05-09 00:00:00,假设分区数为24,那么一个小时就是一个分区范围,这样优化后速度就快很多了。

如果再考虑的全面点,每天的数据变化是不一样的,往往呈正态分布的趋势。比如白天时间段数据两会很大,夜间数据量往往比较小。如果希望分区的数据量更平均一点,可以使用加权分区方法,把各个时间段的数据量考虑进来,这样就能尽量保证每个分区的数据均衡了。

分享好友

分享这个小栈给你的朋友们,一起进步吧。

Flink专区
创建时间:2020-06-19 13:29:19
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • ?
    专家
戳我,来吐槽~