Actions
reduce(func)
Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
This method takes two elements, computes a single result from them, and then feeds that result together with the next element back into the function, until one value remains.
For example, summing the elements of a dataset.
// create the dataset
scala> var data = sc.parallelize(1 to 3,1)
scala> data.collect
res6: Array[Int] = Array(1, 2, 3)
// sum via reduce
scala> data.reduce((x,y)=>x+y)
res5: Int = 6
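Because the partial results from different partitions are combined in no guaranteed order, a func that is not commutative and associative may produce different results across runs. A minimal sketch of the pitfall (hypothetical session; the exact value depends on partitioning):
// subtraction is neither commutative nor associative,
// so the result of this reduce is partitioning-dependent
scala> sc.parallelize(1 to 3, 2).reduce(_ - _)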
collect()
Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
Returns all elements of the dataset to the driver; typically used after filter or another operation that leaves only a small amount of data.
For example, display the contents of the dataset just defined.
// create the dataset
scala> var data = sc.parallelize(1 to 3,1)
scala> data.collect
res6: Array[Int] = Array(1, 2, 3)
count()
Return the number of elements in the dataset.
Counts the number of elements in the dataset.
// create the dataset
scala> var data = sc.parallelize(1 to 3,1)
// count the elements
scala> data.count
res7: Long = 3
scala> var data = sc.parallelize(List(("A",1),("B",1)))
scala> data.count
res8: Long = 2
first()
Return the first element of the dataset (similar to take(1)).
Returns the first element of the dataset, similar to take(1).
// create the dataset
scala> var data = sc.parallelize(List(("A",1),("B",1)))
// fetch the first element
scala> data.first
res9: (String, Int) = (A,1)
take(n)
Return an array with the first n elements of the dataset.
Returns the first n elements of the dataset as an array.
// create the dataset
scala> var data = sc.parallelize(List(("A",1),("B",1)))
scala> data.take(1)
res10: Array[(String, Int)] = Array((A,1))
// if n is greater than the total number of elements, all of the data is returned
scala> data.take(8)
res12: Array[(String, Int)] = Array((A,1), (B,1))
// if n is less than or equal to 0, an empty array is returned
scala> data.take(-1)
res13: Array[(String, Int)] = Array()
scala> data.take(0)
res14: Array[(String, Int)] = Array()
takeSample(withReplacement, num, [seed])
Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
This method differs from sample in a few ways:
- it returns an exact number of samples (given by the second argument)
- it returns an Array directly rather than an RDD
- the returned elements are randomly shuffled
// create the dataset
scala> var data = sc.parallelize(List(1,3,5,7))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21
// sample 2 elements at random
scala> data.takeSample(true,2,1)
res0: Array[Int] = Array(7, 1)
// sample 4 elements; note that with replacement the sampled elements may repeat
scala> data.takeSample(true,4,1)
res1: Array[Int] = Array(7, 7, 3, 7)
// the first argument controls whether sampling is done with replacement
scala> data.takeSample(false,4,1)
res2: Array[Int] = Array(3, 5, 7, 1)
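// without replacement, the sample can never contain more elements than the dataset itself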
scala> data.takeSample(false,5,1)
res3: Array[Int] = Array(3, 5, 7, 1)
takeOrdered(n, [ordering])
Return the first n elements of the RDD using either their natural order or a custom comparator.
Sorts by the natural ordering or by a custom ordering and returns the first n elements (a custom-ordering sketch follows the example below).
// create the dataset
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:21
// return the data in sorted order
scala> data.takeOrdered(3)
res4: Array[String] = Array(a, b, c)
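The example above uses the natural String ordering; a custom Ordering can be supplied as the (implicit) second argument. A minimal sketch, reusing the same data:
// pass a reversed ordering to fetch the 3 largest elements instead
scala> data.takeOrdered(3)(Ordering[String].reverse)
// expected: Array(f, e, c)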
saveAsTextFile(path)
Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
Saves the dataset as a text file (or a set of text files) in the given directory on the local filesystem, HDFS, or any other Hadoop-supported file system.
// create the dataset
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:21
// save to the test_data_save directory
scala> data.saveAsTextFile("test_data_save")
scala> data.saveAsTextFile("test_data_save2",classOf[GzipCodec])
<console>:24: error: not found: type GzipCodec
data.saveAsTextFile("test_data_save2",classOf[GzipCodec])
^
// import the required class
scala> import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.io.compress.GzipCodec
// save as a compressed file
scala> data.saveAsTextFile("test_data_save2",classOf[GzipCodec])
Inspect the output files:
[xingoo@localhost bin]$ ll
drwxrwxr-x. 2 xingoo xingoo 4096 Oct 10 23:07 test_data_save
drwxrwxr-x. 2 xingoo xingoo 4096 Oct 10 23:07 test_data_save2
[xingoo@localhost bin]$ cd test_data_save2
[xingoo@localhost test_data_save2]$ ll
total 4
-rw-r--r--. 1 xingoo xingoo 30 Oct 10 23:07 part-00000.gz
-rw-r--r--. 1 xingoo xingoo 0 Oct 10 23:07 _SUCCESS
[xingoo@localhost test_data_save2]$ cd ..
[xingoo@localhost bin]$ cd test_data_save
[xingoo@localhost test_data_save]$ ll
total 4
-rw-r--r--. 1 xingoo xingoo 10 Oct 10 23:07 part-00000
-rw-r--r--. 1 xingoo xingoo 0 Oct 10 23:07 _SUCCESS
[xingoo@localhost test_data_save]$ cat part-00000
b
a
e
f
c
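The saved directory can be loaded back with sc.textFile, which reads all the part files (and decompresses gzip output transparently). A minimal sketch, assuming the directories created above:
// read the plain-text output back as an RDD[String]
scala> var loaded = sc.textFile("test_data_save")
scala> loaded.collect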
saveAsSequenceFile(path)
Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
Save as a SequenceFile.
scala> var data = sc.parallelize(List(("A",1),("A",2),("B",1)),3)
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[21] at parallelize at <console>:22
scala> data.saveAsSequenceFile("kv_test")
[xingoo@localhost bin]$ cd kv_test/
[xingoo@localhost kv_test]$ ll
total 12
-rw-r--r--. 1 xingoo xingoo 99 Oct 10 23:25 part-00000
-rw-r--r--. 1 xingoo xingoo 99 Oct 10 23:25 part-00001
-rw-r--r--. 1 xingoo xingoo 99 Oct 10 23:25 part-00002
-rw-r--r--. 1 xingoo xingoo 0 Oct 10 23:25 _SUCCESS
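The file can be read back with sc.sequenceFile, supplying the key and value types. A minimal sketch, assuming the kv_test directory created above:
// read the SequenceFile back as an RDD of (String, Int) pairs
scala> var kv = sc.sequenceFile[String, Int]("kv_test")
scala> kv.collect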
saveAsObjectFile(path)
Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
Saves the dataset using Java serialization.
scala> var data = sc.parallelize(List("a","b","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[16] at parallelize at <console>:22
scala> data.saveAsObjectFile("str_test")
scala> var data2 = sc.objectFile[Array[String]]("str_test")
data2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[20] at objectFile at <console>:22
scala> data2.collect
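Note that saveAsObjectFile internally serializes elements in small batches as arrays; to read the data back as a flat RDD of the original element type, the type parameter passed to objectFile is normally the element type itself. A minimal sketch:
// reading back with the element type yields a flat RDD[String]
scala> var data3 = sc.objectFile[String]("str_test")
scala> data3.collect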
countByKey()
Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
For a (K, V) dataset, counts the number of elements sharing each key K.
// create the dataset
scala> var data = sc.parallelize(List(("A",1),("A",2),("B",1)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:22
// count per key
scala> data.countByKey
res9: scala.collection.Map[String,Long] = Map(B -> 1, A -> 2)
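countByKey brings the whole result map back to the driver, so it is only suitable when the number of distinct keys is small. For a large key space, a distributed alternative is a sketch like the following, which keeps the counts in an RDD:
// count per key without collecting a map to the driver
scala> data.mapValues(_ => 1L).reduceByKey(_ + _).collect
// expected: Array((A,2), (B,1)) (order may vary)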
foreach(func)
Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
Runs the function on each element; typically used for side effects such as updating an accumulator or interacting with external storage systems.
// create the dataset
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:22
// iterate over each element
scala> data.foreach(x=>println(x+" hello"))
b hello
a hello
e hello
f hello
c hello
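As the note above suggests, accumulators are the safe way to aggregate side effects from foreach. A minimal sketch, assuming Spark 2.x's longAccumulator API:
// count elements via an accumulator updated inside foreach
scala> val acc = sc.longAccumulator("elements")
scala> data.foreach(x => acc.add(1))
scala> acc.value
// expected: 5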