本文主要是翻译了Spark的官方rdd-programming-guide的文档,链接是:
RDD Programming GuideRDD Operations
Basic
为了描述RDD基础,我们看看下面的这个简单的程序:
lines = sc.textFile("data.txt“)
lineLengths = lines.map(lambdas s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
行使用外部文件定义了一个RDD。该数据并未被加载到内存中,仅仅只得到了一个指向文件的指针。第二行定义了LineLengths
,作为map
这个transformation操作的结果。同样的,这个操作并没有立即执行,因为transformation
是惰性的,lazy的。后,我们执行了reduce
,这是一个action
操作,此时Spark将计算任务分配到不同机器上,并且每个机器执行属于自己部分的map
和本地的reduction
,后将结果返回driver
。 如果我们仍然想使用lineLengths
,可以加上一行代码
lineLength.persist()
在reduce
之前,lineLengths
被计算完成时,它就会被保存在内存中。
Passing Functions to Spark
在cluster
上运行driver programe
时,Spark API很依赖传递函数。建议使用下面三种方式:
- lambda表达式,这是python用来简化函数的一种方式
- 局部定义的方程
- 在模块中定义的方程
if __name__ == "__main__":
def myFunc(s):
words = s.split(" ")
return len(words)
sc = SparkCOntext(...)
sc.textFile("file.txt").map(myFunc)
class MyClass(object):
def doStuff(self, rdd):
field = self.field
return rdd.map(lambda s: field + s)
了解闭包
在Spark中,一个比较难的知识点是通过cluster
执行代码,如何理解变量和方法的作用范围和生命周期。在RDD操作中,修改超出其范围的变量经常会引起混乱。下面我们看一个例子: 对于简单的RDD进行加和:
counter = 0
rdd = s.parallelize(data)
# 错误代码
def increment_counter(x):
global counter
counter += x
rdd.foreach(increament_counter)
print("Counter value: ", counter)
Local vs cluster modes
上面的代码并不能按照预期执行,为了执行jobs
,Spark将任务分解为tasks
,每个都被一个executor
执行。在执行之前,Spark计算task
的闭包。闭包就是那些变量和方法只对执行自己计算的executor
可见。闭包是序列化并且被发送到每个executor
. 发送给executor
的闭包中的变量已经被复制,因此再被foreach
函数引用时,已经不是driver node
中的counter了,而是每个executor
中闭包里面的counter了。在driver node
的内存中仍然有counter,但是对于executor
不可见。因此,这个函数并不能达到预期效果,执行print
之后仍然是0。 在local模式中,在某些情况下,counter可能会正常更新。 总的来书,闭包机制就像循环或者局部定义的方法,不应该去更新全局状态。对于闭包之外的对象引用,Spark并不一定成功执行。有些或许会在local mode
执行成功,但是在分布式执行时,可能就会出问题。
print elements of an RDD
使用rdd.foreach(println)
或者rdd.map(println)
打印一个RDD是常见方法。在单独机器上,我们可以得到期望的输出,并且打印出所有RDD的元素。但是,在cluster mode
,输出是正在向executor's stdout
写入的那个executor
,而不是在driver
中的,因此得不到预期输出。为了得到driver
中的全部元素,我们可以是使用collect()
函数,先将RDD移动到driver node
,然后在打印,rdd.collect().foreach(println)
。但是这可能导致driver
内存爆了,因为collect()
会将全部的RDD都移动到同一个机器上;如果只需要查看部分RDD,更好的办法是使用take()
函数:rdd.take(100).foreach(println)
Working with Key-Value Pairs
大部分的RDD操作可以处理任意类型的数据,少数特操作只能处理key-value pairs
的RDD,常见的是分布式的shuffle
操作,比如通过key
对元素与进行分组或者聚合的操作。 在Python中,这种使用Python内置的tuples元组,就可以很容易创建一个键值对。举个例子,下面的代码是在一个key-value pairs
上面使用 reduceByKey
操作,去计算文件中每行出现的次数。
lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
Transformations
下面是常用的triansformations
操作;
| Transformation | Meaning |
| map(func) | 把源数据每个元素传进函数func
中进行处理,终得到一个新的分布式数据 |
| filter(func) | 把源数据的每个元素传进func
中,若是返回为true
则保留,否则不要,终返回新的数据集 |
Actions
下面的表示常用的action
操作
| Actions | Meaning |
| reduce(func) | 使用函数func
对数据集的元素进行聚合,这个func
需要两个参数,并且返回一个结果。为了在并行化是正确计算,这个func
必须满足交换律和结合律|
Shuffle operations
在Spark中,一些操作会触发shuffle
。shuffle
操作是Spark中用来重新分布数据的一个机制,用来跨分区对数据进行不同的分组。由于shuffle
操作通常涉及跨机器和跨executor
,因此shuffle
操作十分复杂且昂贵。
Background
为了理解在shuffle
时发生了什么,我们用reduceByKey
操作举例。reduceByKey
会生成一个新的RDD,所有key
相同的values
会结合通过reduce
函数结合起来,同时生成新的key-value pair
。但是问题在于,在reduce
之前,那些key
相同的key-value pair
并不一定在同一个分区partition
上,甚至不再同一个机器上,但是为了得到计算结果,它们必须被放在同一位置上。 在Spark中,数据通常不会分布在各个分区中,而是位于某些特定操作的必要位置。在计算中,一个task
会操作一个partiton
分区,因此,为了给一个reduceByKey
这个reduce task
组织所有数据去执行,Spark需要执行一个all-to-all
的操作。必须从所有的partitons
中为所有keys
找到所有values
,然后在跨分区把所有values
汇总计算。整个这个过程就被称作shuffle
。 尽管在每个分区中,所有重新被shuffle
的元素集合是确定的,分区本身也是有序的,但是元素集合中元素的是无序的。如果我们希望在shuffle
之后元素是有序的,我们可以使用下面的方法:
-
mapPartitions
,对每个partititon
进行排序 -
repartitionAndSortWithinPartitions
,在重新分区时,能够高效地进行排序 -
sortBy
,创建一个全局有序的RDD
能够导致shuffle
的操作包括:
- 区操作,比如
repartition
和coalesce
-
ByKey
操作,比如gruopByKey
和reduceByKey
-
join
操作,比如cogroup
和join
等
Performance Impact
shuffle
操作代价很大,设计了磁盘I/O,数据序列化,网络I/O。为了给shuffle
组织所有数据,Spark会开启很多task
,其中包括很多map task
来组织数据,很多reduce task
来聚合数据。这个属于来自于MapReduce
的概念,和Spark中的map
和reduce
操作没有关系。 在内部,单个map task
的结果会一直保存在内存中,直到放不下为止。然后,数据被在目标分区被排好序,然后被写入一个单独的文件中。reduce
阶段,task
读取相应的排好序的数据块。 shuffle
操作为耗费大量的堆内存,当内存不足时,数据会被写入磁盘中,从而产生额外的磁盘I/O开销和垃圾回收。 可以通过一些配置参数来调整shullfe
过程。
RDD Persistence
Spark一个重要的功能就是持久化在内存中的数据,这个操作叫做persisting
或者叫caching
。这里的持久化可以理解为保留,保存,缓存。当我们持久化RDD之后,在后续操作中使用到该RDD时,速度就会快10几倍。 对于要持久化的RDD,我们可以使用persist()
和cache
方法。当投被计算后,会被保存在node
的内存中。 此外,每个被持久化的RDD可以进行不同级别的存储,比如持久化到磁盘上,内存中,在node
之间复制等。在persist()
方法中,这个级别由storageLevel
对象进行控制。在cache()
方法中,有默认的存储级别。全部的存储级别如下所示:
| Storeage Level | Meaning |
| --- | --- |
| MEMORY_ONLY | 将RDD作为反序列化的Java对象,存储在JVM中。如果内存不足,某些分区不会被缓存,而是在需要时重新计算,这是默认的存储级别 |
| MEMORY_AND_DISK | 同上,但是内存不足时,把内存不足的分区存储在磁盘上。 |
| DISK_ONLY | 只在磁盘上存储RDD分区 |
注意:在python中,被存储的对象使用Pickle
库进行序列化。
shuffle
时,会自动调用persist
来持久化一些中间数据,避免某个节点故障,还得重新计算所有输入。强烈建议,如果我们需要重用某个RDD,一定调用persist
进持久化。 一般建议使用MEMORY_ONLY
,这种方式比较消耗CPU,但是很快。如果数据量特别大,才去用另外的方法。 Spark会自动丢弃长时间未被使用的缓存,如果想要手动删除,可以使用RDD.unpersist()
方法。
Shared Variables
一般来说,当一个函数与被传进一个Spark操作中(比如map
和reduce
),在函数中处理的变量是driver
中拷贝的副本。每个机器中的副本都不会再传回driver
中了。在不同task
之间支持这中读-写共享变量会让效率变低。但是Spark还是提供了两个共享变量的类型:broadcast variables
和accumulators
。