绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
Spark RDD概述
2020-05-09 16:33:38

本文主要是翻译了Spark的官方rdd-programming-guide的文档,链接是:

RDD Programming Guidespark.apache.org图标

RDD Operations

Basic

为了描述RDD基础,我们看看下面的这个简单的程序:

lines = sc.textFile("data.txt“)
lineLengths = lines.map(lambdas s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

行使用外部文件定义了一个RDD。该数据并未被加载到内存中,仅仅只得到了一个指向文件的指针。第二行定义了LineLengths,作为map这个transformation操作的结果。同样的,这个操作并没有立即执行,因为transformation是惰性的,lazy的。后,我们执行了reduce,这是一个action操作,此时Spark将计算任务分配到不同机器上,并且每个机器执行属于自己部分的map和本地的reduction,后将结果返回driver。 如果我们仍然想使用lineLengths,可以加上一行代码

lineLength.persist()

reduce之前,lineLengths被计算完成时,它就会被保存在内存中。

Passing Functions to Spark

cluster上运行driver programe时,Spark API很依赖传递函数。建议使用下面三种方式:

  1. lambda表达式,这是python用来简化函数的一种方式
  2. 局部定义的方程
  3. 在模块中定义的方程
if __name__ == "__main__":
    def myFunc(s):
        words = s.split(" ")
        return len(words)
    sc = SparkCOntext(...)
    sc.textFile("file.txt").map(myFunc)
class MyClass(object):
    def doStuff(self, rdd):
        field = self.field
        return rdd.map(lambda s: field + s)

了解闭包

在Spark中,一个比较难的知识点是通过cluster执行代码,如何理解变量和方法的作用范围和生命周期。在RDD操作中,修改超出其范围的变量经常会引起混乱。下面我们看一个例子: 对于简单的RDD进行加和:

counter = 0
rdd = s.parallelize(data)

# 错误代码
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increament_counter)

print("Counter value: ", counter)

Local vs cluster modes

上面的代码并不能按照预期执行,为了执行jobs,Spark将任务分解为tasks,每个都被一个executor执行。在执行之前,Spark计算task的闭包。闭包就是那些变量和方法只对执行自己计算的executor可见。闭包是序列化并且被发送到每个executor. 发送给executor的闭包中的变量已经被复制,因此再被foreach函数引用时,已经不是driver node中的counter了,而是每个executor中闭包里面的counter了。在driver node的内存中仍然有counter,但是对于executor不可见。因此,这个函数并不能达到预期效果,执行print之后仍然是0。 在local模式中,在某些情况下,counter可能会正常更新。 总的来书,闭包机制就像循环或者局部定义的方法,不应该去更新全局状态。对于闭包之外的对象引用,Spark并不一定成功执行。有些或许会在local mode执行成功,但是在分布式执行时,可能就会出问题。

print elements of an RDD

使用rdd.foreach(println)或者rdd.map(println)打印一个RDD是常见方法。在单独机器上,我们可以得到期望的输出,并且打印出所有RDD的元素。但是,在cluster mode,输出是正在向executor's stdout写入的那个executor,而不是在driver中的,因此得不到预期输出。为了得到driver中的全部元素,我们可以是使用collect()函数,先将RDD移动到driver node,然后在打印,rdd.collect().foreach(println)。但是这可能导致driver内存爆了,因为collect()会将全部的RDD都移动到同一个机器上;如果只需要查看部分RDD,更好的办法是使用take()函数:rdd.take(100).foreach(println)

Working with Key-Value Pairs

大部分的RDD操作可以处理任意类型的数据,少数特操作只能处理key-value pairs的RDD,常见的是分布式的shuffle操作,比如通过key对元素与进行分组或者聚合的操作。 在Python中,这种使用Python内置的tuples元组,就可以很容易创建一个键值对。举个例子,下面的代码是在一个key-value pairs上面使用 reduceByKey操作,去计算文件中每行出现的次数。

lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

Transformations

下面是常用的triansformations操作;

| Transformation | Meaning |

| map(func) | 把源数据每个元素传进函数func中进行处理,终得到一个新的分布式数据 |

| filter(func) | 把源数据的每个元素传进func中,若是返回为true则保留,否则不要,终返回新的数据集 |

Actions

下面的表示常用的action操作

| Actions | Meaning |

| reduce(func) | 使用函数func对数据集的元素进行聚合,这个func需要两个参数,并且返回一个结果。为了在并行化是正确计算,这个func必须满足交换律和结合律|

Shuffle operations

在Spark中,一些操作会触发shuffleshuffle操作是Spark中用来重新分布数据的一个机制,用来跨分区对数据进行不同的分组。由于shuffle操作通常涉及跨机器和跨executor,因此shuffle操作十分复杂且昂贵。

Background

为了理解在shuffle时发生了什么,我们用reduceByKey操作举例。reduceByKey会生成一个新的RDD,所有key相同的values会结合通过reduce函数结合起来,同时生成新的key-value pair。但是问题在于,在reduce之前,那些key相同的key-value pair并不一定在同一个分区partition上,甚至不再同一个机器上,但是为了得到计算结果,它们必须被放在同一位置上。 在Spark中,数据通常不会分布在各个分区中,而是位于某些特定操作的必要位置。在计算中,一个task会操作一个partiton分区,因此,为了给一个reduceByKey这个reduce task组织所有数据去执行,Spark需要执行一个all-to-all的操作。必须从所有的partitons中为所有keys找到所有values,然后在跨分区把所有values汇总计算。整个这个过程就被称作shuffle。 尽管在每个分区中,所有重新被shuffle的元素集合是确定的,分区本身也是有序的,但是元素集合中元素的是无序的。如果我们希望在shuffle之后元素是有序的,我们可以使用下面的方法:

  • mapPartitions,对每个partititon进行排序
  • repartitionAndSortWithinPartitions,在重新分区时,能够高效地进行排序
  • sortBy,创建一个全局有序的RDD

能够导致shuffle的操作包括:

  • 区操作,比如repartitioncoalesce
  • ByKey操作,比如gruopByKeyreduceByKey
  • join操作,比如cogroupjoin

Performance Impact

shuffle操作代价很大,设计了磁盘I/O,数据序列化,网络I/O。为了给shuffle组织所有数据,Spark会开启很多task,其中包括很多map task来组织数据,很多reduce task来聚合数据。这个属于来自于MapReduce的概念,和Spark中的mapreduce操作没有关系。 在内部,单个map task的结果会一直保存在内存中,直到放不下为止。然后,数据被在目标分区被排好序,然后被写入一个单独的文件中。reduce阶段,task读取相应的排好序的数据块。 shuffle操作为耗费大量的堆内存,当内存不足时,数据会被写入磁盘中,从而产生额外的磁盘I/O开销和垃圾回收。 可以通过一些配置参数来调整shullfe过程。

RDD Persistence

Spark一个重要的功能就是持久化在内存中的数据,这个操作叫做persisting或者叫caching。这里的持久化可以理解为保留,保存,缓存。当我们持久化RDD之后,在后续操作中使用到该RDD时,速度就会快10几倍。 对于要持久化的RDD,我们可以使用persist()cache方法。当投被计算后,会被保存在node的内存中。 此外,每个被持久化的RDD可以进行不同级别的存储,比如持久化到磁盘上,内存中,在node之间复制等。在persist()方法中,这个级别由storageLevel对象进行控制。在cache()方法中,有默认的存储级别。全部的存储级别如下所示:

| Storeage Level | Meaning |

| --- | --- |

| MEMORY_ONLY | 将RDD作为反序列化的Java对象,存储在JVM中。如果内存不足,某些分区不会被缓存,而是在需要时重新计算,这是默认的存储级别 |

| MEMORY_AND_DISK | 同上,但是内存不足时,把内存不足的分区存储在磁盘上。 |

| DISK_ONLY | 只在磁盘上存储RDD分区 |

注意:在python中,被存储的对象使用Pickle库进行序列化。

shuffle时,会自动调用persist来持久化一些中间数据,避免某个节点故障,还得重新计算所有输入。强烈建议,如果我们需要重用某个RDD,一定调用persist进持久化。 一般建议使用MEMORY_ONLY,这种方式比较消耗CPU,但是很快。如果数据量特别大,才去用另外的方法。 Spark会自动丢弃长时间未被使用的缓存,如果想要手动删除,可以使用RDD.unpersist()方法。

Shared Variables

一般来说,当一个函数与被传进一个Spark操作中(比如mapreduce),在函数中处理的变量是driver中拷贝的副本。每个机器中的副本都不会再传回driver中了。在不同task之间支持这中读-写共享变量会让效率变低。但是Spark还是提供了两个共享变量的类型:broadcast variablesaccumulators

分享好友

分享这个小栈给你的朋友们,一起进步吧。

Apache Spark技术专区
创建时间:2020-05-08 17:16:40
Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎 。现在形成一个高速发展应用广泛的生态系统。
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • 栈栈
    专家
戳我,来吐槽~