为了优化Spark应用提升运行速度,一般会把数据缓存到内存 或者 开启一些试验特性进行优化。本篇就来说说Spark SQL中的一些优化参数。
1 缓存
Spark SQL支持把数据缓存到内存,可以使用 spark.catalog.cacheTable("t") 或 df.cache()。这样Spark SQL会把需要的列进行压缩后缓存,避免使用和GC的压力。可以使用 spark.catalog.uncacheTable("t") 移除缓存。Spark也支持在SQL中控制缓存,如 cache table t 缓存表t,uncache table t 解除缓存。可以通过在 setConf 中配置下面的选项,优化缓存:
spark.sql.inMemoryColumnarStorage.compressed
spark.sql.inMemoryColumnarStorage.batchSize
这里要先介绍几种表结构存储的模式:
行式存储,即每一行数据为一个单元,多行组成一个文件进行存储。这种模式适合基于某个主键查询全部数据。典型的就是数据库,数据库为了加快主键或者某些特殊列的查询,一般会通过索引加速查询。
列式存储,即相同的列在一起存储。这种模式好处也不少,点就是基于每种列根据其类型选择合适的压缩模式可以极大的提高压缩效率;第二点是查询的时候可以选择某些特定的列查询,避免全部的列都查询出来(Spark本身是基于列式存储做的)。
混合存储,即部分行的数据划分成层,基于这层里面按照列再存储。
2 其他配置
下面的选项可以用于优化查询执行,这些配置未来也有可能被移除。
spark.sql.files.maxPartitionBytes
spark.sql.files.openCostInBytes
spark.sql.broadcastTimeout
spark.sql.autoBroadcastJoinThreshold
spark.sql.shuffle.partitions
3 广播join BHJ
广播 hash join,简称BHJ。广播特性是指在join的时候把其中的一个表的数据广播到另一个表所在的节点。当系统根据 spark.sql.autoBroadcastJoinThreshold 参数判断满足条件时,就会自动使用BHJ。如果两边都可以进行广播,spark会自动选择statistics统计值低的进行广播。注意spark不会确保每次选择广播表都是正确的,因为有的场景比如 full outer join 是不支持BHJ的。手动指定广播:
import org.apache.spark.sql.functions.broadcast
broadcast(spark.table("src"))
.join(spark.table("records"), "key")
.show()
参考资料:
- [Spark SQL表缓存](https://www.cnblogs.com/sh425/p/7596428.html)
- [Hive统计](https://blog.csdn.net/mhtian2015/article/details/78776122)