1. 前序
Greenplum 是一款的 mpp 数据库产品,官方推荐了几种将外部数据写入 Greenplum 方式,包含:通用的 Jdbc,gpload 以及 Pivotal Greenplum-Spark Connector 等。
- Jdbc:Jdbc 方式,写大数据量会很慢。
- gpload:适合写大数据量数据,能并行写入。但其缺点是需要安装客户端,包括 gpfdist 等依赖,安装起来很麻烦。需要了解可以参考 gpload。
- Greenplum-Spark Connector:基于 Spark 并行处理,并行写入 Greenplum,并提供了并行读取的接口。也是接下来该文重点介绍的部分。
2. Greenplum-Spark Connector 读数据架构
一个 Spark application,是由 Driver 和 Executor 节点构成。当 Spark Application 使用 Greenplum-Spark Connector 加载 Greenplum 数据时,其 Driver 端会通过 JDBC 的方式请求 Greenplum 的 master 节点获取相关的元数据信息。Connector 将会根据这些元数据信息去决定 Spark 的 Executor 去怎样去并行的读取该表的数据。
Greenplum 数据库存储数据是按 segment 组织的,Greenplum-Spark Connector 在加载 Greenplum 数据时,需要指定 Greenplum 表的一个字段作为 Spark 的 partition 字段,Connector 会使用这个字段的值来计算,该 Greenplum 表的某个 segment 该被哪一个或多个 Spark partition 读取。
其读取过程如下:
- Spark Driver 通过 Jdbc 的方式连接 Greenplum master,并读取指定表的相关元数据信息。然后根据指定的分区字段以及分区个数去决定 segment 怎么分配。
- Spark Executor 端会通过 Jdbc 的方式连接 Greenplum master,创建 Greenplum 外部表。
- 然后 Spark Executor 通过 Http 方式连接 Greenplum 的数据节点,获取指定的 segment 的数据。该获取数据的操作在 Spark Executor 并行执行。
3. Greenplum-Spark Connector 写数据流程
1.GSC 在 Spark Executor 端通过 Jetty 启动一个 Http 服务,将该服务封装为支持 Greenplum 的 gpfdist 协议。
2.GSC 在 Spark Executor 端通过 Jdbc 方式连接 Greenplum master,创建 Greenplum 外部表,该外部表文件地址指向该 Executor 所启动的 gpfdist 协议地址。SQL示例如下:
CREATE READABLE EXTERNAL TABLE"public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42" (LIKE "public"."rank_a1")LOCATION ('gpfdist://10.0.8.145:44772/spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42')FORMAT 'CSV'(DELIMITER AS '|' NULL AS '')ENCODING 'UTF-8'
复制代码
3.GSC 在 Spark Executor 端通过 Jdbc 方式连接 Greenplum master,然后执行 insert 语句至真实的表中,数据来源于这张外部表。SQL 示例如下:
INSERT INTO "public"."rank_a1"SELECT *FROM "public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42"
复制代码
至于这张外部表的数据,是否落地当前 Executor 服务器,不清楚。猜测不会落地,而是直接通过 Http 直接传递给了 Greenplum 对应的 Segment。
4.GSC监听 onApplicationEnd 事件,在 Spark application 结束后,删除创建的外部表。
4. Greenplum-Spark Connector 使用
1.下载 GSC Jar 包。 下载地址:network.pivotal.io/products/pi… 可直接下载新版本的 GSC,即1.6.2,支持 Greenplum5.0 之后的版本。如:
greenplum-spark_2.11-1.6.2.jar
复制代码
2.Maven 中引入:
<dependency> <groupId>io.pivotal.greenplum.spark</groupId> <artifactId>greenplum-spark_2.11</artifactId> <version>1.6.2</version> </dependency>
复制代码
3. Spark 提交引入:
- spark-shell 或 spark-submit 时候,通过**-jars**加入 greenplum-spark_2.11-1.6.2.jar。
- 将 greenplum-spark_2.11-1.6.2.jar 与 Spark application 包打成 uber jar 提交。
5. Greenplum-Spark Connector 参数
6. 从 Greenplum 读取数据
1.DataFrameReader.load() 方式:
val gscReadOptionMap = Map( "url" -> "jdbc:postgresql://gpdb-master:5432/testdb", "user" -> "bill", "password" -> "changeme", "dbschema" -> "myschema", "dbtable" -> "table1", "partitionColumn" -> "id")val gpdf = spark.read.format("greenplum") .options(gscReadOptionMap) .load()
复制代码
2.spark.read.greenplum() 方式:
val url = "jdbc:postgresql://gpmaster.domain:15432/tutorial"val tblname = "avgdelay"val jprops = new Properties()jprops.put("user", "user2")jprops.put("password", "changeme")jprops.put("partitionColumn", "airlineid")val gpdf = spark.read.greenplum(url, tblname, jprops)
复制代码
然而,这种方式必然需要引入一个隐式转换,官网也没介绍。
7. 写数据至 Greenplum
1.写数据示例:
val gscWriteOptionMap = Map( "url" -> "jdbc:postgresql://gpdb-master:5432/testdb", "user" -> "bill", "password" -> "changeme", "dbschema" -> "myschema", "dbtable" -> "table2",)dfToWrite.write.format("greenplum") .options(gscWriteOptionMap) .save()
复制代码
在通过 GSC 写到 Greenplum 表时,如果表已经存在或表中已经存在数据,可通过 DataFrameWriter.mode(SaveMode savemode) 方式指定其输出模式。相关模式行为如下:
2.GSC 自动建表
2.1 创建的 Greenplum 表将不会有 distribution 列,如下为 GSC 生成的建表语句:
CREATE TABLE "public"."rank_a1" ("id" INTEGER NOT NULL, "rank" TEXT, "year" INTEGER NOT NULL, "gender" INTEGER NOT NULL, "count" INTEGER NOT NULL);
复制代码
2.2 创建的 Greenplum 表的字段名将会使用 Spark DataFrame 中的字段名。
2.3 在 GSC 自动建表时,将会为字段名加上双引号,这将使 Greenplum 区分大小写。
2.4 当 Spark DataFrame 的字段不为 nullable 时,GSC 自动建表的字段将是 NOT NULL。
2.5 将会对应的 Spark DataFrame 字段类型映射为 Greenplum 的字段类型。参考,字段类型映射表。
3.提前手动建表
3.1 将 Spark DataFrame 的字段名的数据写至 Greenplum 表的对应的字段中。值得注意的是,GSC在做映射的时候,是严格区分大小写的。
3.2 写至 Greenplum 的字段的数据类型,与对应的 Spark DataFrame 一致,具体参见字段类型映射。
3.3 如果 Spark 数据中某列包含空数据,需确保对应的 Greenplum 表的列没有被指定为 NOT NULL。
3.4 Greenplum 表中建表时其字段顺序可以与 Spark DataFrame 中不一致。但 Greenplum 表中不能出现不存在在 Spark DataFrame 中的字段。如下例子:
// Greenplum 中的字段CREATE TABLE public.rank_a1 ( id int4 NOT NULL, "rank" text NULL, "year" int4 NOT NULL, gender int4 NOT NULL, count int4 NOT NULL)DISTRIBUTED BY (id);// Spark DataFrame中的字段var df = Seq((2, "a|b", 2, 2, 2),(3, "a|b", 3, 3, 3)).toDF("id", "rank", "year", "gender")// 在写数据至public.rank_a1表时,将会报错如下Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.Old column names (5): _1, _2, _3, _4, _5New column names (4): id, rank, year, gender at scala.Predef$.require(Predef.scala:224) at org.apache.spark.sql.Dataset.toDF(Dataset.scala:435) at org.apache.spark.sql.DatasetHolder.toDF(DatasetHolder.scala:44) at com.lt.spark.greenplum.GreenplumWrite$.main(GreenplumWrite.scala:14) at com.lt.spark.greenplum.GreenplumWrite.main(GreenplumWrite.scala)
复制代码
3.5 确保指定的用户对于该表有读写的权限,自动建表,需要有建表的权限。
8. Troubleshooting
1. 端口相关问题
2. Greenplum 连接数问题
当连接 Greenplum 的连接数接近 Greenplum 数据库配置的大连接数(max_connections)时。Spark application 将会抛出 connection limit exceeded 错误。
排查过程:
2.1 查询 Greenplum 数据的大连接数:
postgres=# show max_connections; max_connections----------------- 250(1 row)
复制代码
2.2 查询当前连接Greenplum数据库的连接数:
postgres=# SELECT count(*) FROM pg_stat_activity;
复制代码
2.3 查询指定的用户连接 Greenplum 数据的连接数:
postgres=# SELECT count(*) FROM pg_stat_activity WHERE datname='tutorial';postgres=# SELECT count(*) FROM pg_stat_activity WHERE usename='user1';
复制代码
2.4 查询 Greenplum 数据库空闲和活动的连接数:
postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query='<IDLE>';postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query!='<IDLE>';
复制代码
2.5 查询连接 Greenplum 数据库名,用户名,客户端地址,客户端ip,当前查询语句:
postgres=# SELECT datname, usename, client_addr, client_port, current_query FROM pg_stat_activity;
复制代码
如果确认是 Spark application 使用连接数过多,则配置 JDBC Connection Pooling 相关参数,减少连接数。
3. Greenplum Database Data Length Errors
在使用 Greenplum 4.x 或 5.x 的时候,可能会报出“data line too long”错误。这是因为在 Greenplum 数据库中参数项“gp_max_csv_line_length”默认值是1M。需要登陆 Greenplum master 修改这个参数值。示例如下,通过 gpconfig 修改该参数的值为5M:
gpadmin@gpmaster$ gpconfig -c gp_max_csv_line_length -v 5242880gpadmin@gpmaster$ gpstop -u
复制代码
9. 类型映射表
1. Greenplum to Spark
2. Spark to Greenplum
10. 参考
- Greenplum-Spark Connector 官方文档:greenplum-spark.docs.pivotal.io/1-6/overvie…
- Greenplum 建表语句文档:gpdb.docs.pivotal.io/510/ref\_gu…
- Greenplum 参数配置官方文档:gpdb.docs.pivotal.io/5250/ref\_g…
作者:Greenplum中文社区
链接:https://juejin.cn/post/6857320287286624270