1 Hive相关的操作
Spark SQL支持读写Hive,不过Hive本身包含了大量的依赖,这些依赖spark默认是没有的。如果Hive的依赖在Classpath中,那么Spark可以自动加载(注意Spark的worker节点也需要提供这些依赖)。默认配置Hive只需要把相关的hive-site.xml core-site.xml hdfs-site.xml 放到conf目录下即可。
当使用hive时,需要在 SparkSession 中开启hive,从而获得hive相关的serdes以及函数。如果没有现成的Hive环境们也可以使用,spark会自动在当前目录创建metastore_db,目录的位置可以通过参数 spark.sql.warehouse.dir 指定, 默认是启动Spark应用程序的目录。 注意在spark2.0之前使用的参数hive.metastore.warehouse.dir属性,已经废弃。 另外不要忘记赋予spark程序读写对应目录的权限。
// 创建spark session 并 指定hive地址
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import spark.sql
// 执行hive操作
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// 查询hive表
sql("SELECT * FROM src").show()
// 执行聚合操作
sql("SELECT COUNT(*) FROM src").show()
// sql转换DataFrame
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// 在sparksession中创建虚拟表
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// 使用hive命令创建表
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
sql("SELECT * FROM hive_records").show()
// 创建hive外部表
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
sql("SELECT * FROM hive_bigints").show()
// 动态配置hive属性
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
df.write
.partitionBy("key")
.format("hive")
.saveAsTable("hive_part_tbl")
2 Hive存储相关的配置
当创建hive表时,需要定义表如何读写文件系统,比如 input format 和 output format,还需要配置如何序列化反序列化每一行数据,比如 serde。下面的命令可以用于指定存储的serde, input format, output format, 比如 CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet') 。默认会把文件当做普通文本读写。注意hive storage handler不能再DDL中配置,目前可以在hive端定义,然后使用spark sql来读取。
相关的配置属性如下:
2.1 fileFormat
存储的格式,包含serde, input format, output format。目前支持6中文件格式,sequencefile, rcfile, orc, parquet, textfile, avro。
2.2 inputFormat, outputFormat
这两个选项用于配置hive表的InputFormat和OutputFormat类,如 org.apache.hadoop.hive.ql.io.orc.OrcInputFormat。 这两个选项必须成对出现,如果使用了fileFormat就不能再配置这两个属性了。
2.3 serde
配置序列化反序列化类,如果配置fileFormat,也不能配置该属性。目前 sequencefile textfile rcfile 不包含serde,因此可以使用这个选项进行配置。
2.4 fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim
这些选项用于textfile文件格式时,配置相关的分隔符。
3 与不同版本的Hive metastore交互
Spark SQL与Hive metastore交互是很常见的使用场景,这样spark就可以直接操作hive中的元数据了。从spark 1.4开始,spark sql可以与不同的hive版本交互。默认spark使用的是hive 1.2.1进行编译,包含对应的serde, udf, udaf等。
3.1 spark.sql.hive.metastore.version
hive版本,默认是1.2.1。支持从0.12.0到2.3.3。
3.2 spark.sql.hive.metastore.jars
HiveMetastoreClient相关的jar包地址,默认是buildin。可以配置成三种属性: - builtin, 使用hive 1.2.1, 在spark编译是,使用-Phive开启。当关闭时,需要指定spark.sql.hive.metastore.version为1.2.1 - maven, 从maven仓库下载编译,这个选项不推荐在生产环境使用。 - jvm中的classpath,这个路径需要包含hive以及对应的依赖,以及hadoop对应版本的依赖。这些资源只需要在driver端提供,如果使用yarn cluster模式,需要保证相关的资源都打包到应用jar中。
3.3 spark.sql.hive.metastore.sharedPrefixes
在Spark SQL和hive中共享的jar包前缀,默认com.mysql.jdbc, org.postgresql, com.microsoft.sqlserver, oracle.jdbc。典型的例子就是与metastore沟通的JDBC驱动相关的jar。其他需要共享的类,如 log4j中的自定义appender。
3.4 spark.sql.hive.metastore.barrierPrefixes
配置Spark SQL使用时需要重新加载的类,如hive中使用共享包名定义的udf。
更多内容关注公众号: