绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
Spark基础:读写Hive
2020-07-01 17:10:17

1 Hive相关的操作

Spark SQL支持读写Hive,不过Hive本身包含了大量的依赖,这些依赖spark默认是没有的。如果Hive的依赖在Classpath中,那么Spark可以自动加载(注意Spark的worker节点也需要提供这些依赖)。默认配置Hive只需要把相关的hive-site.xml core-site.xml hdfs-site.xml 放到conf目录下即可。

当使用hive时,需要在 SparkSession 中开启hive,从而获得hive相关的serdes以及函数。如果没有现成的Hive环境们也可以使用,spark会自动在当前目录创建metastore_db,目录的位置可以通过参数 spark.sql.warehouse.dir 指定, 默认是启动Spark应用程序的目录。 注意在spark2.0之前使用的参数hive.metastore.warehouse.dir属性,已经废弃。 另外不要忘记赋予spark程序读写对应目录的权限。

// 创建spark session 并 指定hive地址
val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

// 执行hive操作
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// 查询hive表
sql("SELECT * FROM src").show()

// 执行聚合操作
sql("SELECT COUNT(*) FROM src").show()

// sql转换DataFrame
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()

// 在sparksession中创建虚拟表
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()

// 使用hive命令创建表
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
sql("SELECT * FROM hive_records").show()

// 创建hive外部表
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
sql("SELECT * FROM hive_bigints").show()

// 动态配置hive属性
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
df.write
  .partitionBy("key")
  .format("hive")
  .saveAsTable("hive_part_tbl")

2 Hive存储相关的配置

当创建hive表时,需要定义表如何读写文件系统,比如 input format 和 output format,还需要配置如何序列化反序列化每一行数据,比如 serde。下面的命令可以用于指定存储的serde, input format, output format, 比如 CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet') 。默认会把文件当做普通文本读写。注意hive storage handler不能再DDL中配置,目前可以在hive端定义,然后使用spark sql来读取。

相关的配置属性如下:

2.1 fileFormat

存储的格式,包含serde, input format, output format。目前支持6中文件格式,sequencefile, rcfile, orc, parquet, textfile, avro。

2.2 inputFormat, outputFormat

这两个选项用于配置hive表的InputFormat和OutputFormat类,如 org.apache.hadoop.hive.ql.io.orc.OrcInputFormat。 这两个选项必须成对出现,如果使用了fileFormat就不能再配置这两个属性了。

2.3 serde

配置序列化反序列化类,如果配置fileFormat,也不能配置该属性。目前 sequencefile textfile rcfile 不包含serde,因此可以使用这个选项进行配置。

2.4 fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim

这些选项用于textfile文件格式时,配置相关的分隔符。

3 与不同版本的Hive metastore交互

Spark SQL与Hive metastore交互是很常见的使用场景,这样spark就可以直接操作hive中的元数据了。从spark 1.4开始,spark sql可以与不同的hive版本交互。默认spark使用的是hive 1.2.1进行编译,包含对应的serde, udf, udaf等。

3.1 spark.sql.hive.metastore.version

hive版本,默认是1.2.1。支持从0.12.0到2.3.3。

3.2 spark.sql.hive.metastore.jars

HiveMetastoreClient相关的jar包地址,默认是buildin。可以配置成三种属性: - builtin, 使用hive 1.2.1, 在spark编译是,使用-Phive开启。当关闭时,需要指定spark.sql.hive.metastore.version为1.2.1 - maven, 从maven仓库下载编译,这个选项不推荐在生产环境使用。 - jvm中的classpath,这个路径需要包含hive以及对应的依赖,以及hadoop对应版本的依赖。这些资源只需要在driver端提供,如果使用yarn cluster模式,需要保证相关的资源都打包到应用jar中。

3.3 spark.sql.hive.metastore.sharedPrefixes

在Spark SQL和hive中共享的jar包前缀,默认com.mysql.jdbc, org.postgresql, com.microsoft.sqlserver, oracle.jdbc。典型的例子就是与metastore沟通的JDBC驱动相关的jar。其他需要共享的类,如 log4j中的自定义appender。

3.4 spark.sql.hive.metastore.barrierPrefixes

配置Spark SQL使用时需要重新加载的类,如hive中使用共享包名定义的udf。

更多内容关注公众号:

weixin.qq.com/r/-yjG3iT (二维码自动识别)

分享好友

分享这个小栈给你的朋友们,一起进步吧。

Hive专区
创建时间:2020-07-01 14:09:32
Hive是一个基于Hadoop的数据仓库平台。通过hive,我们可以方便地进行ETL的工作。hive定义了一个类似于SQL的查询语言:HQL,能 够将用户编写的QL转化为相应的Mapreduce程序基于Hadoop执行。 Hive是Facebook 2008年8月刚开源的一个数据仓库框架,其系统目标与 Pig 有相似之处,但它有一些Pig目前还不支持的机制,比如:更丰富的类型系统、更类似SQL的查询语言、Table/Partition元数据的持久化等。
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • markriver
    专家
戳我,来吐槽~