[Big Data with Spark]: An Introduction to Action Operators with Examples
2020-05-12 18:24:50

Actions

reduce(func)

Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.

The function takes two arguments and computes a single result from them; that result is then combined with the next element as an argument, and the computation continues in this way.

For example, computing the sum of a dataset:

//create the dataset
scala> var data = sc.parallelize(1 to 3,1)

scala> data.collect
res6: Array[Int] = Array(1, 2, 3)

//compute the sum with reduce
scala> data.reduce((x,y)=>x+y)
res5: Int = 6
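
Because func must be commutative and associative, reduce also works when the data is spread across several partitions, and with functions other than addition. A minimal sketch, assuming the same spark-shell session:

//partial results are computed per partition, then combined on the driver
val nums = sc.parallelize(1 to 3, 2)
nums.reduce((x, y) => math.max(x, y))   // 3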

collect()

Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

Returns all elements of the dataset to the driver as an array. It is typically used after a filter or some other operation that returns a sufficiently small amount of data.

For example, displaying the contents of the dataset we just defined.

//create the dataset
scala> var data = sc.parallelize(1 to 3,1)

scala> data.collect
res6: Array[Int] = Array(1, 2, 3)
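
As noted above, collect is typically applied after a filter so that only a small subset is pulled back to the driver. A minimal sketch:

//keep only multiples of 10, then bring the small result to the driver
val big = sc.parallelize(1 to 100)
big.filter(_ % 10 == 0).collect()   // Array(10, 20, ..., 100)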

count()

Return the number of elements in the dataset.

Counts the number of elements in the dataset.

//create the dataset
scala> var data = sc.parallelize(1 to 3,1)

//count the elements
scala> data.count
res7: Long = 3

scala> var data = sc.parallelize(List(("A",1),("B",1)))
scala> data.count
res8: Long = 2

first()

Return the first element of the dataset (similar to take(1)).

Returns the first element of the dataset, similar to take(1).

//create the dataset
scala> var data = sc.parallelize(List(("A",1),("B",1)))

//get the first element
scala> data.first
res9: (String, Int) = (A,1)

take(n)

Return an array with the first n elements of the dataset.

Returns the first n elements of the dataset as an array.

//create the dataset
scala> var data = sc.parallelize(List(("A",1),("B",1)))

scala> data.take(1)
res10: Array[(String, Int)] = Array((A,1))

//if n is greater than the total number of elements, all the data is returned
scala> data.take(8)
res12: Array[(String, Int)] = Array((A,1), (B,1))

//if n is less than or equal to 0, an empty array is returned
scala> data.take(-1)
res13: Array[(String, Int)] = Array()

scala> data.take(0)
res14: Array[(String, Int)] = Array()

takeSample(withReplacement, num, [seed])

Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.

This method differs from the sample transformation in a few ways (a comparison sketch follows the examples below):

  • it returns an exact number of sampled elements (specified by the second parameter)
  • it returns an Array directly rather than an RDD
  • the returned results are shuffled internally
//create the dataset
scala> var data = sc.parallelize(List(1,3,5,7))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21

//randomly sample 2 elements
scala> data.takeSample(true,2,1)
res0: Array[Int] = Array(7, 1)

//randomly sample 4 elements; note that the sampled data may contain duplicates
scala> data.takeSample(true,4,1)
res1: Array[Int] = Array(7, 7, 3, 7)

//the first parameter controls whether sampling is done with replacement
scala> data.takeSample(false,4,1)
res2: Array[Int] = Array(3, 5, 7, 1)

scala> data.takeSample(false,5,1)
res3: Array[Int] = Array(3, 5, 7, 1)
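
For comparison, a minimal sketch of the sample transformation: it takes a fraction rather than an exact count, returns an RDD instead of an Array, and stays lazy until an action is called.

//sample is a transformation: fraction-based, lazy, returns an RDD
val data = sc.parallelize(List(1, 3, 5, 7))
val sampled = data.sample(withReplacement = false, fraction = 0.5, seed = 1)
sampled.collect()   // roughly half of the elements; the exact count is not guaranteed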

takeOrdered(n, [ordering])

Return the first n elements of the RDD using either their natural order or a custom comparator.

Sorts the elements using either the natural ordering or a custom ordering, and returns the first n of them.

//create the dataset
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:21

//return the first 3 elements in sorted order
scala> data.takeOrdered(3)
res4: Array[String] = Array(a, b, c)
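
The optional second argument lets you supply a custom Ordering. A minimal sketch that returns the largest elements by passing a reversed ordering (equivalent to the top action):

//pass an explicit Ordering to override the natural order
val data = sc.parallelize(List("b", "a", "e", "f", "c"))
data.takeOrdered(3)(Ordering[String].reverse)   // Array(f, e, c)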

saveAsTextFile(path)

Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.

Writes the dataset as a text file (or set of text files) to a given directory in the local filesystem, HDFS, or any other Hadoop-supported file system.

//create the dataset
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:21

//save to the test_data_save directory
scala> data.saveAsTextFile("test_data_save")

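//without the import below, referencing GzipCodec fails: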
scala> data.saveAsTextFile("test_data_save2",classOf[GzipCodec])
<console>:24: error: not found: type GzipCodec
              data.saveAsTextFile("test_data_save2",classOf[GzipCodec])
                                                            ^
//import the required class
scala> import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.io.compress.GzipCodec

//save as a compressed file
scala> data.saveAsTextFile("test_data_save2",classOf[GzipCodec])

Inspecting the output files:

[xingoo@localhost bin]$ ll
drwxrwxr-x. 2 xingoo xingoo 4096 Oct 10 23:07 test_data_save
drwxrwxr-x. 2 xingoo xingoo 4096 Oct 10 23:07 test_data_save2
[xingoo@localhost bin]$ cd test_data_save2
[xingoo@localhost test_data_save2]$ ll
total 4
-rw-r--r--. 1 xingoo xingoo 30 Oct 10 23:07 part-00000.gz
-rw-r--r--. 1 xingoo xingoo  0 Oct 10 23:07 _SUCCESS
[xingoo@localhost test_data_save2]$ cd ..
[xingoo@localhost bin]$ cd test_data_save
[xingoo@localhost test_data_save]$ ll
total 4
-rw-r--r--. 1 xingoo xingoo 10 Oct 10 23:07 part-00000
-rw-r--r--. 1 xingoo xingoo  0 Oct 10 23:07 _SUCCESS
[xingoo@localhost test_data_save]$ cat part-00000 
b
a
e
f
c
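
The saved directories can be read back with sc.textFile; a minimal sketch (Spark decompresses the gzip output transparently):

//read the plain and the gzip-compressed output back as RDD[String]
val plain = sc.textFile("test_data_save")
val gzipped = sc.textFile("test_data_save2")
plain.collect()     // Array(b, a, e, f, c)
gzipped.collect()   // same content, read from part-00000.gz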

saveAsSequenceFile(path)

Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).

Saves the dataset as a SequenceFile.

scala> var data = sc.parallelize(List(("A",1),("A",2),("B",1)),3)
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[21] at parallelize at <console>:22

scala> data.saveAsSequenceFile("kv_test")

[xingoo@localhost bin]$ cd kv_test/
[xingoo@localhost kv_test]$ ll
total 12
-rw-r--r--. 1 xingoo xingoo 99 Oct 10 23:25 part-00000
-rw-r--r--. 1 xingoo xingoo 99 Oct 10 23:25 part-00001
-rw-r--r--. 1 xingoo xingoo 99 Oct 10 23:25 part-00002
-rw-r--r--. 1 xingoo xingoo  0 Oct 10 23:25 _SUCCESS
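
A minimal sketch of reading the SequenceFile back, specifying the key and value types that were written:

//Spark converts the Writable key/value pairs back to String and Int
val reloaded = sc.sequenceFile[String, Int]("kv_test")
reloaded.collect()   // Array((A,1), (A,2), (B,1)); ordering across partitions may vary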

saveAsObjectFile(path)

Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().

Saves the elements to a file using Java serialization.

scala> var data = sc.parallelize(List("a","b","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[16] at parallelize at <console>:22

scala> data.saveAsObjectFile("str_test")

scala> var data2 = sc.objectFile[Array[String]]("str_test")
data2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[20] at objectFile at <console>:22

scala> data2.collect
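
Note that the type parameter of objectFile is normally the element type of the RDD that was saved, not an array of it; a minimal sketch of the usual round trip:

//read the serialized objects back as individual String elements
val reloaded = sc.objectFile[String]("str_test")
reloaded.collect()   // Array(a, b, c); ordering across partitions may vary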

countByKey()

Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.

For a (K, V) dataset, counts the number of elements that share each key K, i.e. how many times each key appears.

//create the dataset
scala> var data = sc.parallelize(List(("A",1),("A",2),("B",1)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:22

//count the occurrences of each key
scala> data.countByKey
res9: scala.collection.Map[String,Long] = Map(B -> 1, A -> 2)
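
countByKey brings the resulting Map back to the driver, so it is only suitable when the number of distinct keys is small. To sum the values per key instead of counting occurrences, reduceByKey keeps the result distributed; a minimal sketch:

//countByKey counts occurrences per key; reduceByKey aggregates the values
val data = sc.parallelize(List(("A", 1), ("A", 2), ("B", 1)))
data.countByKey()                   // Map(A -> 2, B -> 1)
data.reduceByKey(_ + _).collect()   // Array((A,3), (B,1))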

foreach(func)

Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.

Runs the function on each element of the dataset; this is usually done for side effects such as updating an Accumulator or interacting with external storage systems.

// create the dataset
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:22

// iterate over the elements
scala> data.foreach(x=>println(x+" hello"))
b hello
a hello
e hello
f hello
c hello
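
A minimal sketch of the Accumulator use case mentioned above (assuming the Spark 2.x longAccumulator API): because the closure runs on the executors, side-effecting counters should go through an accumulator rather than a driver-side variable.

//count elements via an accumulator instead of mutating a local variable
val data = sc.parallelize(List("b", "a", "e", "f", "c"))
val counter = sc.longAccumulator("elementCounter")
data.foreach(_ => counter.add(1))
println(counter.value)   // 5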
