Spark SQL概述
Spark SQL是用于结构化数据处理的Spark模块。与Spark RDD API不同,Spark SQL提供的接口可以为Spark提供更多的数据及其计算时的结构信息。Spark使用这些额外的信息实现一些优化。用户可以使用SQL或者Dataset API对SparkSQL进行操作。由于它和用户使用的API或者开发语言无关,这意味着开发人员可以在多个API中轻松地切换
SQL
SparkSQL可以用来执行SQL查询,从一个Hive实例中查询数据。可以从Hive Tables - Spark 3.1.1 Documentation (apache.org)中了解更多的配置。
当使用另外一种编程语言中运行SQL时,结果将作为Dataset/DataFrame
返回。也能使用命令行或者JDBC对SQL进行操作
Datasets和DataFrames
dataset是分布式的数据集。Dataset在Spark1.6之后加入,结合了RDD的优点(强类型,lambda函数的能力),和Spark SQL的优化执行引擎的优点。数据集能从JVM对象中创建,并使用transformation
对其进行操作。python暂不支持Dataset API。但是由于python的动态特性,Dataset API的很多优点实际在python中可以体现。
DataFrame是按列为名称构造的Dataset。它在概念上等价于关系型数据库或者R/Python中的视图,不过在引擎层面有着更加丰富的优化。DataFrame可以从各种各样的数据源中构建,例如:结构数据文件,Hive表,第三方数据库,或者是一个存在的RDD。DataDrameAPI可以供Scala、Java、Python、R使用。在Java或者Scala中,DataFrame是由Rows的Dataset来表示
起步
SparkSession
Spark中所有功能的入口都是SparkSession
类。我们使用SparkSession.builder()
创建一个基础的SparkSession
scala> import org.apache.spark.sql.SparkSession
scala> val spark = SparkSession.
| builder().
| appName("Spark SQL basic example").
| config("spark.some.config.option", "some-value").
| getOrCreate
SparkSession是Spark2.0开始为Hive特性提供的内部支持,包括使用HiveQL进行查询,访问Hive UDFs以及读取Hive table的数据。
创建DataFrame
当Spark Session创建完成之后,我们就能使用RDD、Hive table或者是其他Spark数据源来创建一个DataFrame。
scala> val df = spark.read.json("examples/src/main/resources/people.json")
scala> df.show
// people.json
{"name":"Michael"}
{"name":"Andy", "age":30},
{"name":"Justin", "age":19}
Untyped Dataset操作(又名DataFrame操作)
如上文所述,DataFrame是Java和Scala API中Rows类型的Dataset。和Java/Scala中的强类型Dataset不同,这些操作以一种无类型transformation的形式引用。
这里举出了一些使用Dataset的结构化数据的例子:
import spark.implicits._
// 以树形的格式打印视图(元数据)
df.printScheme()
// 选择名称为"name"的一列
df.select("name").show()
// 选择"name",和"age",并将age的数值加一
df.select($"name", $"age" + 1).show()
// 选择age > 21的成员
df.filter($"age" > 21).show()
// 以年龄为分组
df.groupBy("age").count().show()
“完整的操作见API Documentation
除了简单的列引用和表达式,Dataset还提供了丰富的类库,包括:字符串操作,日期算术,常见的数学运算等等。参考DataFrame Function Reference
”
SQL编程
Spark的sql
函数能够让SQL以编程形式运行并将一个DataFrame作为结果返回
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show
全局临时视图
Spark SQL的临时视图作用域是会话域,如果创建视图的会话终止,那么视图也会消失。如果希望能够在所有会话中-共享一个临时视图,并让其保持活动状态直到Spark应用程序终止,我们可以使用global temporary view
。全局临时视图域系统保留的数据库global_temp
绑定。我们需要使用正确的名称创建它,如SELECT * FROM global_temp.view1
// 注册一个全局临时视图
df.createGlobalTempView("people")
// 全局临时视图与global_temp关联
spark.sql("SELECT * FROM global_temp.people").show()
// 全局临时视图的作用域
spark.newSession().sql("SELECT * FROM global_temp.people")
创建Datasets
Datasets类似于RDDs,不过没有使用Java serialization或者Kryo,而是使用专门的编码器来序列化对象,以便在网络进行处理或传输。
编码器和标准序列化都能将一个对象转化成一个字符数组,但是编码器是动态生成的,而且不需要进行阶码就可以支持Spark的很多操作,例如filtering
,sorting
,hashing
case class Person(name: String, age: Long)
// 为case class创建编码器
val caseClassDs = Seq(Person("Any", 32)).toDS()
// 大多数常见类型的编码器是由`spark.implicits._`库自动导入
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect()
// 通过传递一个类,可以将DataFrames转换成Dataset。映射根据名称进行
val path = "src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
RDD互操作
Spark SQL支持两种不同的方法,将现有RDD转换为数据集。个方法是通过反射来推断RDD的schema。基于反射的方式在你了解Spark应用程序的所有schema时,可以让程序变得更加简洁。
第二种方式是允许用户通过一个动态接口创建一个schema并将其应用到一个现有的RDD中。这种方式更加啰嗦,但是它允许你在程序不知道列名或者类型的情况下创建datasets
使用反射进行推断
如前文所述,Spark SQL的Scala接口支持将包含case class
的RDD转换成DataFrame。这个case class定义了表的视图。case class参数的名称被映射为列名称。case class也是能够嵌套的,它可以包含复杂类型例如Seqs
和Arrays
。这个RDD能够也能隐式转换成DataFrame,然后注册成一个表。
import spark.implicits._
// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(), attributes(1).trim.toInt))
.toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")
// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager()).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive * and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
# people.txt
# Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo
Michael, 29
Andy, 30
Justin, 19
编程方式创建schema
当无法提前定义case类的时候(比如编码为字符串或者对文本数据集进行解析,并为不同的用户投影不同的字段),需要用来以下三步以编程的方式创建DataFrame
从原始RDD创建一个Rows类型的RDD 创建一个代表schema的StructType,与步骤一的RDD进行匹配 通过SparkSession中的 createDataFrame
方法将schema应用成RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.*._
// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(), attributes(1).trim))
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes()).show()
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
标量函数
与聚合函数多行得到一个值返回不同,标量函数是每行返回一个值。Spark SQL支持多种Built-in Scalar Functions,也支持User Defined Scalar Functions
聚合函数
Aggregate function
返回多行的一个值,Built-in Aggregation Functions提供了多种聚合函数——count()
,avg()
等等。用户也能自己创建聚合函数,更多细节参考User Defined Aggregate Functions。