
| RDD是什么? | resilient distributed dataset 弹性分布式数据集 |
| RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD。从编程的角度来看,RDD可以简单看成是一个数组。和普通数组的区别是,RDD是只读的,而且其中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时可以被并行处理。这个数据集的全部或部分可以缓存在内存中,在多次计算间重用。因此,Spark应用程序所做的无非是把需要处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果。 | |
| 如何创建RDD? | |
| RDD可以从普通数组创建出来,也可以从文件系统或者HDFS中的文件创建出来。 | |
| 举例 | 从普通数组创建RDD,里面包含了1到9这9个数字,它们分别在3个分区中。 |
| scala> val a = sc.parallelize(1 to 9, 3) a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:12 | |
| 举例 | 读取文件README.md来创建RDD,文件中的每一行就是RDD中的一个元素 |
| scala> val b = sc.textFile("README.md") b: org.apache.spark.rdd.RDD[String] = MappedRDD[3] at textFile at <console>:12 | |
| 为什么会产生RDD? | (1)传统的MapReduce虽然具有自动容错、负载均衡和可扩展性的优点,但是其最大缺点是采用非循环式的数据流模型,使得在迭代计算时要进行大量的磁盘IO操作。RDD正是为解决这一缺点而提出的抽象方法 |
| (2)RDD的具体描述:RDD(弹性分布式数据集)是Spark提供的最重要的抽象概念,它是一种有容错机制的特殊集合,可以分布在集群的节点上,以函数式编程操作集合的方式进行各种并行操作。可以将RDD理解为一个具有容错机制的特殊集合,它提供了一种只读的、只能由已存在的RDD变换而来的共享内存,然后将所有数据都加载到内存中,方便进行多次重用。a.它是分布式的,可以分布在多台机器上进行计算。b.它是弹性的,计算过程中内存不够时它会和磁盘进行数据交换。c.这些限制可以极大地降低自动容错开销。d.实质是一种更为通用的迭代并行计算框架,用户可以显式地控制计算的中间结果,然后将其自由运用于之后的计算。 | |
| (3)RDD的容错机制:实现分布式数据集容错的方法有两种:数据检查点和记录更新。RDD采用记录更新的方式:记录所有更新点的成本很高,所以RDD只支持粗粒度变换,即只记录在单个块上执行的单个操作,然后将创建某个RDD的变换序列(血统)存储下来;变换序列指,每个RDD都包含了它是如何由其他RDD变换过来的以及如何重建某一块数据的信息。因此RDD的容错机制又称“血统(lineage)”容错。要实现这种“血统”容错机制,最大的难题就是如何表达父RDD和子RDD之间的依赖关系。实际上依赖关系可以分两种,窄依赖和宽依赖。窄依赖:子RDD中的每个数据块只依赖于父RDD中对应的有限个固定的数据块;宽依赖:子RDD中的一个数据块可以依赖于父RDD中的所有数据块。例如:map变换,子RDD中的数据块只依赖于父RDD中对应的一个数据块;groupByKey变换,子RDD中的数据块会依赖于所有父RDD中的数据块,因为一个key可能存在于父RDD的任何一个数据块中。将依赖关系分类带来两个特性:第一,窄依赖可以在某个计算节点上直接通过计算父RDD的某块数据得到子RDD对应的某块数据;宽依赖则要等到父RDD的所有数据都计算完成,并且父RDD的计算结果经过hash分区传到对应节点上之后,才能计算子RDD。第二,数据丢失时,对于窄依赖只需要重新计算丢失的那一块数据即可恢复;对于宽依赖则要将祖先RDD中的所有数据块全部重新计算才能恢复。所以在“血统”链很长、特别是有宽依赖的时候,需要在适当的时机设置数据检查点。也正是这两个特性,要求对不同的依赖关系采取不同的任务调度机制和容错恢复机制。(文末的示例演示了如何用 toDebugString 和 dependencies 观察血统与依赖。) | |
| (4)RDD内部的设计:每个RDD都需要包含以下四个部分:a.源数据分割后的数据块,即源代码中的splits变量;b.关于“血统”的信息,即源码中的dependencies变量;c.一个计算函数(该RDD如何通过父RDD计算得到),即源码中的iterator(split)和compute函数;d.一些关于如何分块和数据存放位置的元信息,如源码中的partitioner和preferredLocations。例如:a.一个从分布式文件系统中的文件得到的RDD,其数据块是通过切分各个文件得到的,它没有父RDD,它的计算函数只是读取文件的每一行并作为一个元素返回给RDD;b.对于一个通过map函数得到的RDD,它会具有和父RDD相同的数据块,它的计算函数是对父RDD中的每个元素所执行的那个函数 | |
| RDD在Spark中的地位及作用 | (1)为什么会有Spark?因为传统的并行计算模型无法有效地解决迭代计算(iterative)和交互式计算(interactive)问题;而Spark的使命便是解决这两个问题,这也是它存在的价值和理由。 |
| (2)Spark如何解决迭代计算?其主要实现思想就是RDD,把所有计算的数据保存在分布式的内存中。迭代计算通常是对同一个数据集做反复的迭代计算,数据常驻内存将大大减少磁盘IO开销。这也是Spark设计的核心:内存计算。 | |
| (3)Spark如何实现交互式计算?因为Spark是用scala语言实现的,Spark和scala能够紧密地集成,所以Spark可以完美地运用scala的解释器,使得用户可以像操作本地集合对象一样轻松地操作分布式数据集。 | |
| (4)Spark和RDD的关系?可以理解为:RDD是一种具有容错性、基于内存的集群计算抽象方法,Spark则是这个抽象方法的实现。 | |
| 如何操作RDD? | (1)如何获取RDD:a.从共享的文件系统获取(如:HDFS);b.通过已存在的RDD转换;c.将已存在的scala集合(只要是Seq对象)并行化,通过调用SparkContext的parallelize方法实现;d.改变现有RDD的持久性;RDD是惰性的、短暂的。(RDD的固化:cache缓存至内存;save保存到分布式文件系统) |
| (2)操作RDD的两个动作:a.Actions:对数据集计算后返回一个数值value给驱动程序;例如:Reduce将数据集的所有元素用某个函数聚合后,将最终结果返回给驱动程序。b.Transformation:根据数据集创建一个新的数据集,计算后返回一个新RDD;例如:Map将数据集的每个元素经过某个函数计算后,返回一个新的分布式数据集。(见本表之后的示例。) | |
| 下一页是关于RDD的方法的说明和样例 | |
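在进入下面的方法列表之前,先用一个最小示例把上面提到的概念串起来(仅为示意,假设在 spark-shell 中运行,sc 为现成的 SparkContext;其中的变量名均为演示用的假设):transformation(如 map、filter)是惰性的,只记录血统;action(如 count、reduce、take)才会触发真正的计算;cache 可以把需要反复使用的 RDD 固化到内存。

```scala
// 从本地集合创建 RDD,4 个分区;此时不会触发任何计算
val nums    = sc.parallelize(1 to 100, 4)
val squares = nums.map(x => x * x)        // transformation:逐元素变换,惰性
val evens   = squares.filter(_ % 2 == 0)  // transformation:过滤,仍然惰性

// action 触发整条血统的计算,并把结果返回给 Driver
val cnt    = evens.count()                // 元素个数
val first5 = evens.take(5)                // 取前 5 个元素(收集到 Driver)

// 需要多次复用同一个 RDD 时,先缓存到内存,避免之后的 action 重算整条血统
evens.cache()
val total = evens.reduce(_ + _)           // 之后的 action 可直接读取缓存
```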
| The RDD API By Example | RDD is short for Resilient Distributed Dataset. RDDs are the workhorse of the Spark system. As a user, one can consider a RDD as a handle for a collection of individual data partitions, which are the result of some computation. RDD是弹性分布式数据集(Resilient Distributed Dataset)的缩写;RDD是Spark系统的主力;作为用户,可以把RDD看作由若干数据分区组成的集合的句柄,这些分区是某些计算的结果。 However, an RDD is actually more than that. On cluster installations, separate data partitions can be on separate nodes. Using the RDD as a handle one can access all partitions and perform computations and transformations using the contained data. Whenever a part of a RDD or an entire RDD is lost, the system is able to reconstruct the data of lost partitions by using lineage information. Lineage refers to the sequence of transformations used to produce the current RDD. As a result, Spark is able to recover automatically from most failures. 事实上RDD不止如此。在集群部署中,不同的数据分区可以位于不同的节点上;通过RDD这个句柄可以访问所有分区,并对其中的数据执行计算和变换;当RDD的一部分或整个RDD丢失时,系统能够利用血统(lineage)信息重建丢失分区的数据;lineage指的是产生当前RDD的那一串变换序列;因此Spark能够从大多数失败中自动恢复。 |
All RDDs available in Spark derive either directly or indirectly from the class RDD. This class comes with a large set of methods that perform operations on the data within the associated partitions. The class RDD is abstract. Whenever one uses a RDD, one is actually using a concretized implementation of RDD. These implementations have to overwrite some core functions to make the RDD behave as expected. Spark中所有的RDD都直接或间接派生自RDD类;该类提供了大量在相应分区数据上执行操作的方法;RDD类是抽象的,实际使用的都是它的具体实现类,这些实现类需要重写一些核心方法才能让RDD按预期工作。 One reason why Spark has lately become a very popular system for processing big data is that it does not impose restrictions regarding what data can be stored within RDD partitions. The RDD API already contains many useful operations. But, because the creators of Spark had to keep the core API of RDDs common enough to handle arbitrary data-types, many convenience functions are missing. Spark最近成为热门大数据处理系统的一个原因是:它不限制RDD分区中可以存放什么样的数据;RDD API已经包含了许多有用的操作;但由于Spark的作者必须让RDD的核心API保持足够通用以处理任意数据类型,许多便利的方法并没有放进核心API; | |
| The basic RDD API considers each data item as a single value. However, users often want to work with key-value pairs. Therefore Spark extended the interface of RDD to provide additional functions (PairRDDFunctions), which explicitly work on key-value pairs. Currently, there are four extensions to the RDD API available in spark. They are as follows: 一般情况下RDD把单个value作为数据项,但用户常常需要处理key-value键值对;因此Spark扩展了RDD的接口,提供了额外的方法(PairRDDFunctions)专门处理键值对;目前有如下四种扩展: DoubleRDDFunctions This extension contains many useful methods for aggregating numeric values. They become available if the data items of an RDD are implicitly convertible to the Scala data-type double. 包含许多对数值进行聚合的有用方法;当RDD的数据项可以隐式转换为Scala的double类型时,这些方法即可使用; PairRDDFunctions Methods defined in this interface extension become available when the data items have a two component tuple structure. Spark will interpret the first tuple item (i.e. tuplename._1) as the key and the second item (i.e. tuplename._2) as the associated value. 当数据项是二元组结构时这些方法可用;Spark会把元组的第一项(即tuplename._1)当作key,第二项(即tuplename._2)当作对应的value; OrderedRDDFunctions Methods defined in this interface extension become available if the data items are two-component tuples where the key is implicitly sortable. 当数据项是二元组且key可以隐式排序时这些方法可用; SequenceFileRDDFunctions This extension contains several methods that allow users to create Hadoop sequence files from RDDs. The data items must be two-component key-value tuples as required by the PairRDDFunctions. However, there are additional requirements considering the convertibility of the tuple components to Writable types. 这个扩展中的方法可以让用户把RDD保存为Hadoop sequence文件;数据项必须是PairRDDFunctions所要求的key-value二元组;此外还要求元组的成员可以转换为Writable类型 | |
| Since Spark will make methods with extended functionality automatically available to users when the data items fulfill the above described requirements, we decided to list all possible available functions in strictly alphabetical order. We will append either of the following to the function-name to indicate it belongs to an extension that requires the data items to conform to a certain format or type. 以下标记用于标识RDD方法所属的扩展及其要求的数据结构(见下面的示例): [Double] - DoubleRDDFunctions [Ordered] - OrderedRDDFunctions [Pair] - PairRDDFunctions [SeqFile] - SequenceFileRDDFunctions | |
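下面用一个简短的 Scala 示例演示这些扩展是如何生效的(仅为示意,假设在 spark-shell 中运行):当 RDD 的元素是二元组时,PairRDDFunctions 中的方法(如 reduceByKey、keys)通过隐式转换自动可用;当元素可以视为 Double 时,DoubleRDDFunctions 中的方法(如 mean、stats)自动可用。

```scala
// 元素为 (K, V) 二元组 => 隐式获得 PairRDDFunctions 的方法
val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)))
pairs.reduceByKey(_ + _).collect()   // 形如 Array((a,4), (b,2)),顺序可能不同
pairs.keys.collect()                 // Array(a, b, a)

// 元素为数值(可隐式转换为 Double)=> 隐式获得 DoubleRDDFunctions 的方法
val nums = sc.parallelize(List(1.0, 2.0, 3.0, 4.0))
nums.mean()                          // 2.5
nums.stats()                         // count、mean、stdev 等统计信息
```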
| function | 名称 | 说明 | 定义 | Examples 1 | Examples 2 | |
| aggregate | 聚合 | zeroValue同时是seqOp和combOp的初始值,是流式操作的第一个值;也就是每个分区聚合完成后,zeroValue也是合并(combOp)步骤的初始值;为什么有两个函数?第一个(seqOp)把分区内类型为T的输入聚合为目标类型U(注意U不一定等于T),然后用combOp合并各分区的U类型结果; | def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U | val z = sc.parallelize(List(1,2,3,4,5,6), 2) z.aggregate(0)(math.max(_, _), _ + _) res40: Int = 9 val z = sc.parallelize(List("a","b","c","d","e","f"),2) z.aggregate("")(_ + _, _+_) res115: String = abcdef z.aggregate("x")(_ + _, _+_) res116: String = xxdefxabc val z = sc.parallelize(List("12","23","345","4567"),2) z.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y) res141: String = 42 z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y) res142: String = 11 val z = sc.parallelize(List("12","23","345",""),2) z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y) res143: String = 10 | val z = sc.parallelize(List("12","23","","345"),2) z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y) res144: String = 11 | |
| cartesian | 笛卡尔 | 笛卡尔积。但在数据集T和U上调用时,返回一个(T,U)对的数据集,所有元素交互进行笛卡尔积。(例如:第一个RDD的每个元素与第二个RDD的每个元素相交;)(Warning: 小心使用这个函数;内存的消耗很快造成崩溃!) | def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] | val x = sc.parallelize(List(1,2,3,4,5)) val y = sc.parallelize(List(6,7,8,9,10)) x.cartesian(y).collect res0: Array[(Int, Int)] = Array((1,6), (1,7), (1,8), (1,9), (1,10), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7), (3,8), (3,9), (3,10), (4,6), (5,6), (4,7), (5,7), (4,8), (5,8), (4,9), (4,10), (5,9), (5,10)) | ||
| checkpoint | 设置检查点 | 当RDD下一步要计算时进行检查点操作;检查点以二进制文件的形式保存的目录里; 注意:如果是本地目录,必须每个节点都有这个目录; 当然也可以用HDFS代替; | def checkpoint() | sc.setCheckpointDir("my_directory_name") val a = sc.parallelize(1 to 4) a.checkpoint a.count 14/02/25 18:13:53 INFO SparkContext: Starting job: count at <console>:15 ... 14/02/25 18:13:53 INFO MemoryStore: Block broadcast_5 stored as values to memory (estimated size 115.7 KB, free 296.3 MB) 14/02/25 18:13:53 INFO RDDCheckpointData: Done checkpointing RDD 11 to file:/home/cloudera/Documents/spark-0.9.0-incubating-bin-cdh4/bin/my_directory_name/65407913-fdc6-4ec1-82c9-48a1656b95d6/rdd-11, new parent is RDD 12 res23: Long = 4 | ||
| coalesce, repartition | 合并分区;重新分区 | 重新设置分区个数;repartition(n)相当于coalesce(n, shuffle=true) | def coalesce ( numPartitions : Int , shuffle : Boolean = false ): RDD [T] def repartition ( numPartitions : Int ): RDD [T] | val y = sc.parallelize(1 to 10, 10) val z = y.coalesce(2, false) z.partitions.length res9: Int = 2 | ||
| cogroup [pair], groupWith [pair] | 分组 | 非常强大的一组分组功能;可以用key对3组key-value分组在类型为(K,V)和(K,W)类型的数据集上调用,返回一个数据集,组成元素为(K, Seq[V], Seq[W]) Tuples。这个操作在其它框架,称为CoGroup | def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))] def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], IterableW1], Iterable[W2]))] | val a = sc.parallelize(List(1, 2, 1, 3), 1) val b = a.map((_, "b")) val c = a.map((_, "c")) b.cogroup(c).collect res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array( (2,(ArrayBuffer(b),ArrayBuffer(c))), (3,(ArrayBuffer(b),ArrayBuffer(c))), (1,(ArrayBuffer(b, b),ArrayBuffer(c, c))) ) val d = a.map((_, "d")) b.cogroup(c, d).collect res9: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array( (2,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))), (3,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))), (1,(ArrayBuffer(b, b),ArrayBuffer(c, c),ArrayBuffer(d, d))) ) val x = sc.parallelize(List((1, "apple"), (2, "banana"), (3, "orange"), (4, "kiwi")), 2) val y = sc.parallelize(List((5, "computer"), (1, "laptop"), (1, "desktop"), (4, "iPad")), 2) x.cogroup(y).collect res23: Array[(Int, (Iterable[String], Iterable[String]))] = Array( (4,(ArrayBuffer(kiwi),ArrayBuffer(iPad))), (2,(ArrayBuffer(banana),ArrayBuffer())), (3,(ArrayBuffer(orange),ArrayBuffer())), (1,(ArrayBuffer(apple),ArrayBuffer(laptop, desktop))), (5,(ArrayBuffer(),ArrayBuffer(computer)))) | ||
| collect, toArray | 输出集合 | 将rdd转换为scala数组形式输出,如果你提供一个标准的map-function (例如: f = T -> U),它将在输出到数组之前做转换; 在Driver的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,很可能会让Driver程序OOM | def collect(): Array[T] def collect[U:ClassTag](f:PartialFunction[T, U]): RDD[U] def toArray(): Array[T] | val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2) c.collect res29: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat) | ||
| collectAsMap [pair] | 输出Pair RDD为map结构 | 如同collect;但作用在key-value RDD上,转换为scala map结构对应的key-value | def collectAsMap(): Map[K, V] | val a = sc.parallelize(List(1, 2, 1, 3), 1) val b = a.zip(a) b.collectAsMap res1: scala.collection.Map[Int,Int] = Map(2 -> 2, 1 -> 1, 3 -> 3) | ||
| combineByKey [pair] | 根据key合并 | 根据二元组一个接一个得应用合并器:创建,合并值,合并集合 | def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializerClass: String = null): RDD[(K, C)] | val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3) val c = b.zip(a) val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y) d.collect res16: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf))) | ||
| compute | 计算 | 根据依赖关系计算某个分区的数据;由系统内部调用,用户一般不直接使用。 | def compute(split: Partition, context: TaskContext): Iterator[T] | |||
| context, sparkContext | SparkContext | 返回创建该RDD时所用的SparkContext | def context: SparkContext def sparkContext: SparkContext | val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2) c.context res8: org.apache.spark.SparkContext = org.apache.spark.SparkContext@58c1c2f1 | ||
| count | 元素数 | 返回RDD内部对应的数据集的元素个数 | def count(): Long | val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2) c.count res2: Long = 4 | ||
| countApprox | 近似计数 | count的近似版本:在timeout时间内返回(可能不完全精确的)元素个数 | def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] | |||
| countByKey [pair] | 根据key数元素个数 | 类似count,作用在key-value元素项上 | def countByKey(): Map[K, Long] | val c = sc.parallelize(List((3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")), 2) c.countByKey res3: scala.collection.Map[Int,Long] = Map(3 -> 3, 5 -> 1) | ||
| countByKeyApprox [pair] | ||||||
| countByValue | 根据value数个数 | 数唯一的value,出现次数;(注意:这个操作最终会在一个reducer上计算聚合) | def countByValue(): Map[T, Long] | val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1)) b.countByValue res27: scala.collection.Map[Int,Long] = Map(5 -> 1, 8 -> 1, 3 -> 1, 6 -> 1, 1 -> 6, 2 -> 3, 4 -> 2, 7 -> 1) | ||
| countByValueApprox | def countByValueApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[T, BoundedDouble]] | |||||
| countApproxDistinct | 近似排重数 | def countApproxDistinct(relativeSD: Double = 0.05): Long | val a = sc.parallelize(1 to 10000, 20) val b = a++a++a++a++a b.countApproxDistinct(0.1) res14: Long = 8224 b.countApproxDistinct(0.05) res15: Long = 9750 b.countApproxDistinct(0.01) res16: Long = 9947 b.countApproxDistinct(0.001) res0: Long = 10000 | |||
| countApproxDistinctByKey [pair] | key分组求近似排重数 | def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] | val a = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2) val b = sc.parallelize(a.takeSample(true, 10000, 0), 20) val c = sc.parallelize(1 to b.count().toInt, 20) val d = b.zip(c) d.countApproxDistinctByKey(0.1).collect res15: Array[(String, Long)] = Array((Rat,2567), (Cat,3357), (Dog,2414), (Gnu,2494)) d.countApproxDistinctByKey(0.01).collect res16: Array[(String, Long)] = Array((Rat,2555), (Cat,2455), (Dog,2425), (Gnu,2513)) d.countApproxDistinctByKey(0.001).collect res0: Array[(String, Long)] = Array((Rat,2562), (Cat,2464), (Dog,2451), (Gnu,2521)) | |||
| dependencies | 依赖 | 依赖的Rdd链条 | final def dependencies: Seq[Dependency[_]] | val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1)) b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at <console>:12 b.dependencies.length Int = 0 b.map(a => a).dependencies.length res40: Int = 1 b.cartesian(a).dependencies.length res41: Int = 2 b.cartesian(a).dependencies res42: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.rdd.CartesianRDD$$anon$1@576ddaaa, org.apache.spark.rdd.CartesianRDD$$anon$2@6d2efbbd) | ||
| distinct | 排重 | 返回一个排重的即一个值只出现一次的新的RDD | def distinct(): RDD[T] def distinct(numPartitions: Int): RDD[T] | val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2) c.distinct.collect res6: Array[String] = Array(Dog, Gnu, Cat, Rat) val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10)) a.distinct(2).partitions.length res16: Int = 2 a.distinct(3).partitions.length res17: Int = 3 | ||
| first | 第一条 | 返回RDD的第一个数据元素 | def first(): T | val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2) c.first res1: String = Gnu | ||
| filter | 过滤 | 根据函数返回值过滤记录生成新的RDD | def filter(f: T => Boolean): RDD[T] | val a = sc.parallelize(1 to 10, 3) val b = a.filter(_ % 2 == 0) b.collect res3: Array[Int] = Array(2, 4, 6, 8, 10) | val b = sc.parallelize(1 to 8) b.filter(_ < 4).collect res15: Array[Int] = Array(1, 2, 3) val a = sc.parallelize(List("cat", "horse", 4.0, 3.5, 2, "dog")) a.filter(_ < 4).collect <console>:15: error: value < is not a member of Any | val a = sc.parallelize(List("cat", "horse", 4.0, 3.5, 2, "dog")) a.collect({case a: Int => "is integer" | case b: String => "is string" }).collect res17: Array[String] = Array(is string, is string, is integer, is string) val myfunc: PartialFunction[Any, Any] = { case a: Int => "is integer" | case b: String => "is string" } myfunc.isDefinedAt("") res21: Boolean = true myfunc.isDefinedAt(1) res22: Boolean = true myfunc.isDefinedAt(1.5) res23: Boolean = false |
| filterWith | 条件过滤 | 第一个函数根据分区编号构造一个值A,第二个函数根据元素和A决定是否保留该元素;用该函数过滤生成新的RDD | def filterWith[A: ClassTag](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] | val a = sc.parallelize(1 to 9, 3) val b = a.filterWith(i => i)((x,i) => x % 2 == 0 || i % 2 == 0) b.collect res37: Array[Int] = Array(1, 2, 3, 4, 6, 7, 8, 9) val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 5) a.filterWith(x=> x)((a, b) => b == 0).collect res30: Array[Int] = Array(1, 2) a.filterWith(x=> x)((a, b) => a % (b+1) == 0).collect res33: Array[Int] = Array(1, 2, 4, 6, 8, 10) a.filterWith(x=> x.toString)((a, b) => b == "2").collect res34: Array[Int] = Array(5, 6) | ||
| flatMap | 扁平遍历 | 类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素) | def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] | val a = sc.parallelize(1 to 10, 5) a.flatMap(1 to _).collect res47: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect res85: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3) // The program below generates a random number of copies (up to 10) of the items in the list. val x = sc.parallelize(1 to 10, 3) x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) | ||
| flatMapValues [pair] | 扁平遍历values | 非常像mapValues,但在mapping过程中把继承的序列结构折叠去掉扁平展开 | def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] | val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.flatMapValues("x" + _ + "x").collect res6: Array[(Int, Char)] = Array((3,x), (3,d), (3,o), (3,g), (3,x), (5,x), (5,t), (5,i), (5,g), (5,e), (5,r), (5,x), (4,x), (4,l), (4,i), (4,o), (4,n), (4,x), (3,x), (3,c), (3,a), (3,t), (3,x), (7,x), (7,p), (7,a), (7,n), (7,t), (7,h), (7,e), (7,r), (7,x), (5,x), (5,e), (5,a), (5,g), (5,l), (5,e), (5,x)) | ||
| flatMapWith | 类似flatMap,但接受将分区编号转换;是否保留原来的分区 | def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U] | val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3) a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect res58: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2, 8, 2, 9) | |||
| fold | 聚合折叠 | 聚合每个分区,多个分区的结果;初始值都是zeroValue | def fold(zeroValue: T)(op: (T, T) => T): T | val a = sc.parallelize(List(1,2,3), 3) a.fold(0)(_ + _) res59: Int = 6 | ||
| foldByKey [pair] | 根据key折叠 | def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] | val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2) val b = a.map(x => (x.length, x)) b.foldByKey("")(_ + _).collect res84: Array[(Int, String)] = Array((3,dogcatowlgnuant)) val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.foldByKey("")(_ + _).collect res85: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle)) | ||
| foreach | 遍历 | 对每个元素执行一个无返回值的操作(传入的函数以单个元素为参数) | def foreach(f: T => Unit) | val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3) c.foreach(x => println(x + "s are yummy")) lions are yummy gnus are yummy crocodiles are yummy ants are yummy whales are yummy dolphins are yummy spiders are yummy | ||
| foreachPartition | 分区遍历 | 对每个分区执行无参数操作;通过iterator参数传递分区的元素 | def foreachPartition(f: Iterator[T] => Unit) | val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3) b.foreachPartition(x => println(x.reduce(_ + _))) 6 15 24 | ||
| foreachWith | 带分区编号的遍历 | 类似foreach,但第一个函数根据分区编号构造一个值A,第二个函数可同时使用元素和A | def foreachWith[A: ClassTag](constructA: Int => A)(f: (T, A) => Unit) | val a = sc.parallelize(1 to 9, 3) a.foreachWith(i => i)((x,i) => if (x % 2 == 1 && i % 2 == 0) println(x) ) 1 3 7 9 | ||
| generator, setGenerator | 生成者标签 | 为方便打印依赖信息,追加到RDD名称之后的标识 | @transient var generator def setGenerator(_generator: String) | |||
| getCheckpointFile | 返回检查点文件 | 获取checkpoint文件,或者还没生成checkpoint返回null | def getCheckpointFile: Option[String] | sc.setCheckpointDir("/home/cloudera/Documents") val a = sc.parallelize(1 to 500, 5) val b = a++a++a++a++a b.getCheckpointFile res49: Option[String] = None b.checkpoint b.getCheckpointFile res54: Option[String] = None b.collect b.getCheckpointFile res57: Option[String] = Some(file:/home/cloudera/Documents/cb978ffb-a346-4820-b3ba-d56580787b20/rdd-40) | ||
| preferredLocations | 位置偏好 | 返回某个分区的数据偏好所在的位置(例如数据块所在的HDFS节点),供任务调度时实现数据本地性 | final def preferredLocations(split: Partition): Seq[String] | |||
| getStorageLevel | 存储级别 | 返回RDD当前设置的存储级别;存储级别一旦设定就不能再修改,只能给尚未设定级别的RDD设置 | def getStorageLevel | val a = sc.parallelize(1 to 100000, 2) a.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY) a.getStorageLevel.description String = Disk Serialized 1x Replicated a.cache java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level | ||
| glom | 获取所有 | 分配一个大数组包含所有分区的所有的元素。 | def glom(): RDD[Array[T]] | val a = sc.parallelize(1 to 100, 3) a.glom.collect res8: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33), Array(34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66), Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)) | ||
| groupBy | 分组 | def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])] def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])] def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])] | val a = sc.parallelize(1 to 9, 3) a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect res42: Array[(String, Seq[Int])] = Array((even,ArrayBuffer(2, 4, 6, 8)), (odd,ArrayBuffer(1, 3, 5, 7, 9))) val a = sc.parallelize(1 to 9, 3) def myfunc(a: Int) : Int = { a % 2 } a.groupBy(myfunc).collect res3: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2, 4, 6, 8)), (1,ArrayBuffer(1, 3, 5, 7, 9))) val a = sc.parallelize(1 to 9, 3) def myfunc(a: Int) : Int = { a % 2 } a.groupBy(x => myfunc(x), 3).collect a.groupBy(myfunc(_), 1).collect res7: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2, 4, 6, 8)), (1,ArrayBuffer(1, 3, 5, 7, 9))) import org.apache.spark.Partitioner class MyPartitioner extends Partitioner { def numPartitions: Int = 2 def getPartition(key: Any): Int = { key match { case null => 0 case key: Int => key % numPartitions case _ => key.hashCode % numPartitions } } override def equals(other: Any): Boolean = { other match { case h: MyPartitioner => true case _ => false } } } val a = sc.parallelize(1 to 9, 3) val p = new MyPartitioner() val b = a.groupBy((x:Int) => { x }, p) val c = b.mapWith(i => i)((a, b) => (b, a)) c.collect res42: Array[(Int, (Int, Seq[Int]))] = Array((0,(4,ArrayBuffer(4))), (0,(2,ArrayBuffer(2))), (0,(6,ArrayBuffer(6))), (0,(8,ArrayBuffer(8))), (1,(9,ArrayBuffer(9))), (1,(3,ArrayBuffer(3))), (1,(1,ArrayBuffer(1))), (1,(7,ArrayBuffer(7))), (1,(5,ArrayBuffer(5)))) | |||
| groupByKey [pair] | 在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task | def groupByKey(): RDD[(K, Iterable[V])] def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] | val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2) val b = a.keyBy(_.length) b.groupByKey.collect res11: Array[(Int, Seq[String])] = Array((4,ArrayBuffer(lion)), (6,ArrayBuffer(spider)), (3,ArrayBuffer(dog, cat)), (5,ArrayBuffer(tiger, eagle))) | |||
| histogram [Double] | def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] def histogram(buckets: Array[Double], evenBuckets: Boolean = false): Array[Long] | val a = sc.parallelize(List(1.1, 1.2, 1.3, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 9.0), 3) a.histogram(5) res11: (Array[Double], Array[Long]) = (Array(1.1, 2.68, 4.26, 5.84, 7.42, 9.0),Array(5, 0, 0, 1, 4)) val a = sc.parallelize(List(9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5), 3) a.histogram(6) res18: (Array[Double], Array[Long]) = (Array(1.0, 2.5, 4.0, 5.5, 7.0, 8.5, 10.0),Array(6, 0, 1, 1, 3, 4)) | val a = sc.parallelize(List(1.1, 1.2, 1.3, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 9.0), 3) a.histogram(Array(0.0, 3.0, 8.0)) res14: Array[Long] = Array(5, 3) val a = sc.parallelize(List(9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5), 3) a.histogram(Array(0.0, 5.0, 10.0)) res1: Array[Long] = Array(6, 9) a.histogram(Array(0.0, 5.0, 10.0, 15.0)) res1: Array[Long] = Array(6, 8, 1) | |||
| id | id | 由其所属SparkContext分配的RDD编号 | val id: Int | val y = sc.parallelize(1 to 10, 10) y.id res16: Int = 19 | ||
| intersection | 交集 | 返回相同的元素 | def intersection(other: RDD[T], numPartitions: Int): RDD[T] def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] def intersection(other: RDD[T]): RDD[T] | val x = sc.parallelize(1 to 20) val y = sc.parallelize(10 to 30) val z = x.intersection(y) z.collect res74: Array[Int] = Array(16, 12, 20, 13, 17, 14, 18, 10, 19, 15, 11) | ||
| isCheckpointed | 是否已设置检查点过 | 返回是否被checkpoint | def isCheckpointed: Boolean | sc.setCheckpointDir("/home/cloudera/Documents") c.isCheckpointed res6: Boolean = false c.checkpoint c.isCheckpointed res8: Boolean = false c.collect c.isCheckpointed res9: Boolean = true | ||
| iterator | 返回一个实现iterator 特质的RDD分区遍历对象 | 系统内部调用 | final def iterator(split: Partition, context: TaskContext): Iterator[T] | |||
| join [pair] | inner join | 使用两个key-value RDD进行内链接;注意key必须是可比较的 | def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] | val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.keyBy(_.length) val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) val d = c.keyBy(_.length) b.join(d).collect res0: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee))) | ||
| keyBy | 构建一个key-value结构的RDD,把函数对每个元素生成的值作为该元素的key | def keyBy[K](f: T => K): RDD[(K, T)] | val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.keyBy(_.length) b.collect res26: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant)) | |||
| keys [pair] | 返回每个元组的key作为新的rdd | def keys: RDD[K] | val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.keys.collect res2: Array[Int] = Array(3, 5, 4, 3, 7, 5) | |||
| leftOuterJoin [pair] | left outer join ;key 必须可比较 | def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] | val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.keyBy(_.length) val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) val d = c.keyBy(_.length) b.leftOuterJoin(d).collect res1: Array[(Int, (String, Option[String]))] = Array((6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (3,(dog,Some(dog))), (3,(dog,Some(cat))), (3,(dog,Some(gnu))), (3,(dog,Some(bee))), (3,(rat,Some(dog))), (3,(rat,Some(cat))), (3,(rat,Some(gnu))), (3,(rat,Some(bee))), (8,(elephant,None))) | |||
| lookup [pair] | 搜索key-value项;返回一个seq形式的key对应value的值序列 | def lookup(key: K): Seq[V] | val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.lookup(5) res0: Seq[String] = WrappedArray(tiger, eagle) | |||
| map | 遍历 | map是对RDD中的每个元素都执行一个指定的变换函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。 | def map[U: ClassTag](f: T => U): RDD[U] | val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.map(_.length) val c = a.zip(b) c.collect res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) | scala> val a = sc.parallelize(1 to 9, 3) scala> val b = a.map(x => x*2) scala> a.collect res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9) scala> b.collect res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18) | |
| mapPartitions | 每个分区只执行一次回调函数; | def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] | val a = sc.parallelize(1 to 9, 3) def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = { var res = List[(T, T)]() var pre = iter.next while (iter.hasNext) { val cur = iter.next; res .::= (pre, cur) pre = cur; } res.iterator } a.mapPartitions(myfunc).collect res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) | val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3) def myfunc(iter: Iterator[Int]) : Iterator[Int] = { var res = List[Int]() while (iter.hasNext) { val cur = iter.next; res = res ::: List.fill(scala.util.Random.nextInt(10))(cur) } res.iterator } x.mapPartitions(myfunc).collect // some of the number are not outputted at all. This is because the random number generated for it is zero. res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) //using flatmap val x = sc.parallelize(1 to 10, 3) x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) | ||
| mapPartitionsWithContext | 类似mapPartions,但允许查看运行状态 | def mapPartitionsWithContext[U: ClassTag](f: (TaskContext, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] | val a = sc.parallelize(1 to 9, 3) import org.apache.spark.TaskContext def myfunc(tc: TaskContext, iter: Iterator[Int]) : Iterator[Int] = { tc.addOnCompleteCallback(() => println( "Partition: " + tc.partitionId + ", AttemptID: " + tc.attemptId )) iter.toList.filter(_ % 2 == 0).iterator } a.mapPartitionsWithContext(myfunc).collect 14/04/01 23:05:48 INFO SparkContext: Starting job: collect at <console>:20 ... 14/04/01 23:05:48 INFO Executor: Running task ID 0 Partition: 0, AttemptID: 0, Interrupted: false ... 14/04/01 23:05:48 INFO Executor: Running task ID 1 14/04/01 23:05:48 INFO TaskSetManager: Finished TID 0 in 470 ms on localhost (progress: 0/3) ... 14/04/01 23:05:48 INFO Executor: Running task ID 2 14/04/01 23:05:48 INFO TaskSetManager: Finished TID 1 in 23 ms on localhost (progress: 1/3) 14/04/01 23:05:48 INFO DAGScheduler: Completed ResultTask(0, 1) ? res0: Array[Int] = Array(2, 6, 4, 8) | |||
| mapPartitionsWithIndex | 类似mapPartitions,允许传入分区编号 | def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] | val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3) def myfunc(index: Int, iter: Iterator[Int]) : Iterator[String] = { iter.toList.map(x => index + "," + x).iterator } x.mapPartitionsWithIndex(myfunc).collect() res10: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9, 2,10) | |||
| mapPartitionsWithSplit | [废弃] | deprecated | def mapPartitionsWithSplit[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] | |||
| mapValues [pair] | 遍历valus; | def mapValues[U](f: V => U): RDD[(K, U)] | val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.mapValues("x" + _ + "x").collect res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex)) | |||
| mapWith (deprecated) | map扩展 | def mapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U] | // generates 9 random numbers less than 1000. val x = sc.parallelize(1 to 9, 3) x.mapWith(a => new scala.util.Random)((x, r) => r.nextInt(1000)).collect res0: Array[Int] = Array(940, 51, 779, 742, 757, 982, 35, 800, 15) val a = sc.parallelize(1 to 9, 3) val b = a.mapWith("Index:" + _)((a, b) => ("Value:" + a, b)) b.collect res0: Array[(String, String)] = Array((Value:1,Index:0), (Value:2,Index:0), (Value:3,Index:0), (Value:4,Index:1), (Value:5,Index:1), (Value:6,Index:1), (Value:7,Index:2), (Value:8,Index:2), (Value:9,Index) | |||
| max | 最大值 | 最大值 | def max()(implicit ord: Ordering[T]): T | val y = sc.parallelize(10 to 30) y.max res75: Int = 30 | ||
| mean [Double], meanApprox [Double] | 平均数 | 平均数 | def mean(): Double def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] | val a = sc.parallelize(List(9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5), 3) a.mean res0: Double = 5.3 | ||
| min | 最小值 | 最小值 | def min()(implicit ord: Ordering[T]): T | val y = sc.parallelize(10 to 30) y.min res75: Int = 10 | ||
| name, setName | RDD别名 | 给RDD命名 | @transient var name: String def setName(_name: String) | val y = sc.parallelize(1 to 10, 10) y.name res13: String = null y.setName("Fancy RDD Name") y.name res15: String = Fancy RDD Name | ||
| partitionBy [Pair] | 重新分区 | def partitionBy(partitioner: Partitioner): RDD[(K, V)] | ||||
| partitioner | 为groupBy, subtract, reduceByKey (from PairedRDDFunctions)指定分区函数 | @transient val partitioner: Option[Partitioner] | ||||
| partitions | RDD分区数组 | final def partitions: Array[Partition] | val b = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2) b.partitions res48: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@18aa, org.apache.spark.rdd.ParallelCollectionPartition@18ab) | |||
| persist, cache | 保存 | 无参数的persist() cache(),使用StorageLevel.MEMORY_ONLY保存 storage level 不能重复设定 | def cache(): RDD[T] def persist(): RDD[T] def persist(newLevel: StorageLevel): RDD[T] | val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2) c.getStorageLevel res0: org.apache.spark.storage.StorageLevel = StorageLevel(false, false, false, false, 1) c.cache c.getStorageLevel res2: org.apache.spark.storage.StorageLevel = StorageLevel(false, true, false, true, 1) | ||
| pipe | 每个分区数据通过stdin执行一个shell命令 | def pipe(command: String): RDD[String] def pipe(command: String, env: Map[String, String]): RDD[String] def pipe(command: Seq[String], env: Map[String, String] = Map(), printPipeContext: (String => Unit) => Unit = null, printRDDElement: (T, String => Unit) => Unit = null): RDD[String] | val a = sc.parallelize(1 to 9, 3) a.pipe("head -n 1").collect res2: Array[String] = Array(1, 4, 7) | |||
| randomSplit | 根据Array百分比随机拆分RDD为多个RDDs | def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]] | val y = sc.parallelize(1 to 10) val splits = y.randomSplit(Array(0.6, 0.4), seed = 11L) val training = splits(0) val test = splits(1) training.collect res:85 Array[Int] = Array(1, 4, 5, 6, 8, 10) test.collect res86: Array[Int] = Array(2, 3, 7, 9) val y = sc.parallelize(1 to 10) val splits = y.randomSplit(Array(0.1, 0.3, 0.6)) val rdd1 = splits(0) val rdd2 = splits(1) val rdd3 = splits(2) rdd1.collect res87: Array[Int] = Array(4, 10) rdd2.collect res88: Array[Int] = Array(1, 3, 5, 8) rdd3.collect res91: Array[Int] = Array(2, 6, 7, 9) | |||
| reduce | 通过函数func聚集数据集中的所有元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行 | def reduce(f: (T, T) => T): T | val a = sc.parallelize(1 to 100, 3) a.reduce(_ + _) res41: Int = 5050 | |||
| reduceByKey [Pair], reduceByKeyLocally[Pair], reduceByKeyToDriver[Pair] | 按key聚合 | 类似reduce,但对每个key分别用func聚合其对应的所有value;reduceByKeyLocally/reduceByKeyToDriver把结果作为Map返回给Driver | def reduceByKey(func: (V, V) => V): RDD[(K, V)] def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] def reduceByKeyLocally(func: (V, V) => V): Map[K, V] def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] | val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2) val b = a.map(x => (x.length, x)) b.reduceByKey(_ + _).collect res86: Array[(Int, String)] = Array((3,dogcatowlgnuant)) val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.reduceByKey(_ + _).collect res87: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle)) | ||
| rightOuterJoin [Pair] | right outer join | def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))] | val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.keyBy(_.length) val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) val d = c.keyBy(_.length) b.rightOuterJoin(d).collect res2: Array[(Int, (Option[String], String))] = Array((6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (3,(Some(dog),dog)), (3,(Some(dog),cat)), (3,(Some(dog),gnu)), (3,(Some(dog),bee)), (3,(Some(rat),dog)), (3,(Some(rat),cat)), (3,(Some(rat),gnu)), (3,(Some(rat),bee)), (4,(None,wolf)), (4,(None,bear))) | |||
| sample | 抽样 | 生成抽样的RDD(withReplacement指定是否有放回,fraction为抽样比例) | def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] | val a = sc.parallelize(1 to 10000, 3) a.sample(false, 0.1, 0).count res24: Long = 960 a.sample(true, 0.3, 0).count res25: Long = 2888 a.sample(true, 0.3, 13).count res26: Long = 2985 | ||
| saveAsHadoopFile [Pair], saveAsHadoopDataset [Pair], saveAsNewAPIHadoopFile [Pair] | 保存到hadoop | def saveAsHadoopDataset(conf: JobConf) def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], codec: Class[_ <: CompressionCodec]) def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = new JobConf(self.context.hadoopConfiguration), codec: Option[Class[_ <: CompressionCodec]] = None) def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) def saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: NewOutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration) | ||||
| saveAsObjectFile | 保存到二进制对象文件 | def saveAsObjectFile(path: String) | val x = sc.parallelize(1 to 100, 3) x.saveAsObjectFile("objFile") val y = sc.objectFile[Array[Int]]("objFile") y.collect res52: Array[Int] = Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33) | |||
| saveAsSequenceFile [SeqFile] | 保存为hadoop sequence file | def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None) | val v = sc.parallelize(Array(("owl",3), ("gnu",4), ("dog",1), ("cat",2), ("ant",5)), 2) v.saveAsSequenceFile("hd_seq_file") 14/04/19 05:45:43 INFO FileOutputCommitter: Saved output of task 'attempt_201404190545_0000_m_000001_191' to file:/home/cloudera/hd_seq_file [cloudera@localhost ~]$ ll ~/hd_seq_file total 8 -rwxr-xr-x 1 cloudera cloudera 117 Apr 19 05:45 part-00000 -rwxr-xr-x 1 cloudera cloudera 133 Apr 19 05:45 part-00001 -rwxr-xr-x 1 cloudera cloudera 0 Apr 19 05:45 _SUCCESS | |||
| saveAsTextFile | 保存到文本 | 一条一行 | def saveAsTextFile(path: String) def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) | val a = sc.parallelize(1 to 10000, 3) a.saveAsTextFile("mydata_a") 14/04/03 21:11:36 INFO FileOutputCommitter: Saved output of task 'attempt_201404032111_0000_m_000002_71' to file:/home/cloudera/Documents/spark-0.9.0-incubating-bin-cdh4/bin/mydata_a [cloudera@localhost ~]$ head -n 5 ~/Documents/spark-0.9.0-incubating-bin-cdh4/bin/mydata_a/part-00000 1 2 3 4 5 // Produces 3 output files since we have created the a RDD with 3 partitions [cloudera@localhost ~]$ ll ~/Documents/spark-0.9.0-incubating-bin-cdh4/bin/mydata_a/ -rwxr-xr-x 1 cloudera cloudera 15558 Apr 3 21:11 part-00000 -rwxr-xr-x 1 cloudera cloudera 16665 Apr 3 21:11 part-00001 -rwxr-xr-x 1 cloudera cloudera 16671 Apr 3 21:11 part-00002 | import org.apache.hadoop.io.compress.GzipCodec a.saveAsTextFile("mydata_b", classOf[GzipCodec]) [cloudera@localhost ~]$ ll ~/Documents/spark-0.9.0-incubating-bin-cdh4/bin/mydata_b/ total 24 -rwxr-xr-x 1 cloudera cloudera 7276 Apr 3 21:29 part-00000.gz -rwxr-xr-x 1 cloudera cloudera 6517 Apr 3 21:29 part-00001.gz -rwxr-xr-x 1 cloudera cloudera 6525 Apr 3 21:29 part-00002.gz val x = sc.textFile("mydata_b") x.count res2: Long = 10000 | val x = sc.parallelize(List(1,2,3,4,5,6,6,7,9,8,10,21), 3)x.saveAsTextFile("hdfs://localhost:8020/user/cloudera/test"); val sp = sc.textFile("hdfs://localhost:8020/user/cloudera/sp_data")sp.flatMap(_.split(" ")).saveAsTextFile("hdfs://localhost:8020/user/cloudera/sp_x") |
| stats [Double] | 统计 | 类似mean,但包含更多信息 | def stats(): StatCounter | val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2) x.stats res16: org.apache.spark.util.StatCounter = (count: 9, mean: 11.266667, stdev: 8.126859) | ||
| sortBy | 排序 | 根据函数返回值进行排序 | def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.size)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] | val y = sc.parallelize(Array(5, 7, 1, 3, 2, 1)) y.sortBy(c => c, true).collect res101: Array[Int] = Array(1, 1, 2, 3, 5, 7) y.sortBy(c => c, false).collect res102: Array[Int] = Array(7, 5, 3, 2, 1, 1) val z = sc.parallelize(Array(("H", 10), ("A", 26), ("Z", 1), ("L", 5))) z.sortBy(c => c._1, true).collect res109: Array[(String, Int)] = Array((A,26), (H,10), (L,5), (Z,1)) z.sortBy(c => c._2, true).collect res108: Array[(String, Int)] = Array((Z,1), (L,5), (H,10), (A,26)) | ||
| sortByKey [Ordered] | def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] | val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2) val b = sc.parallelize(1 to a.count.toInt, 2) val c = a.zip(b) c.sortByKey(true).collect res74: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3)) c.sortByKey(false).collect res75: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5)) val a = sc.parallelize(1 to 100, 5) val b = a.cartesian(a) val c = sc.parallelize(b.takeSample(true, 5, 13), 2) val d = c.sortByKey(false) res56: Array[(Int, Int)] = Array((96,9), (84,76), (59,59), (53,65), (52,4)) | ||||
| stdev [Double], sampleStdev [Double] | 标准偏差 | 估算标准偏差 | def stdev(): Double def sampleStdev(): Double | val d = sc.parallelize(List(0.0, 0.0, 0.0), 3) d.stdev res10: Double = 0.0 d.sampleStdev res11: Double = 0.0 val d = sc.parallelize(List(0.0, 1.0), 3) d.stdev d.sampleStdev res18: Double = 0.5 res19: Double = 0.7071067811865476 val d = sc.parallelize(List(0.0, 0.0, 1.0), 3) d.stdev res14: Double = 0.4714045207910317 d.sampleStdev res15: Double = 0.5773502691896257 | ||
| subtract | 去除 | A - B | def subtract(other: RDD[T]): RDD[T] def subtract(other: RDD[T], numPartitions: Int): RDD[T] def subtract(other: RDD[T], p: Partitioner): RDD[T] | val a = sc.parallelize(1 to 9, 3) val b = sc.parallelize(1 to 3, 3) val c = a.subtract(b) c.collect res3: Array[Int] = Array(6, 9, 4, 7, 5, 8) | ||
| subtractByKey [Pair] | 根据Key去除 | def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] | val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2) val b = a.keyBy(_.length) val c = sc.parallelize(List("ant", "falcon", "squid"), 2) val d = c.keyBy(_.length) b.subtractByKey(d).collect res15: Array[(Int, String)] = Array((4,lion)) | |||
| sum [Double], sumApprox[Double] | 求和 | 相加 | def sum(): Double def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] | val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2) x.sum res17: Double = 101.39999999999999 | ||
| take | 返回一个数组,由数据集的前n个元素组成。注意,这个操作目前并非在多个节点上并行执行,而是由Driver程序所在机器单机收集这些元素(Driver的内存压力会增大,需要谨慎使用) | def take(num: Int): Array[T] | ||||
| takeOrdered | 取排序后的前n个 | def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] | val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2) b.takeOrdered(2) res19: Array[String] = Array(ape, cat) | |||
| takeSample | 相对sample,返回一个Array;确切返回n个;顺序随机 | def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] | val x = sc.parallelize(1 to 1000, 3) x.takeSample(true, 100, 1) res3: Array[Int] = Array(339, 718, 810, 105, 71, 268, 333, 360, 341, 300, 68, 848, 431, 449, 773, 172, 802, 339, 431, 285, 937, 301, 167, 69, 330, 864, 40, 645, 65, 349, 613, 468, 982, 314, 160, 675, 232, 794, 577, 571, 805, 317, 136, 860, 522, 45, 628, 178, 321, 482, 657, 114, 332, 728, 901, 290, 175, 876, 227, 130, 863, 773, 559, 301, 694, 460, 839, 952, 664, 851, 260, 729, 823, 880, 792, 964, 614, 821, 683, 364, 80, 875, 813, 951, 663, 344, 546, 918, 436, 451, 397, 670, 756, 512, 391, 70, 213, 896, 123, 858) | |||
| toDebugString | 依赖信息 | def toDebugString: String | val a = sc.parallelize(1 to 9, 3) val b = sc.parallelize(1 to 3, 3) val c = a.subtract(b) c.toDebugString res6: String = MappedRDD[15] at subtract at <console>:16 (3 partitions) SubtractedRDD[14] at subtract at <console>:16 (3 partitions) MappedRDD[12] at subtract at <console>:16 (3 partitions) ParallelCollectionRDD[10] at parallelize at <console>:12 (3 partitions) MappedRDD[13] at subtract at <console>:16 (3 partitions) ParallelCollectionRDD[11] at parallelize at <console>:12 (3 partitions) | |||
| toJavaRDD | 转换为JavaRDD对象 | def toJavaRDD() : JavaRDD[T] | val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2) c.toJavaRDD res3: org.apache.spark.api.java.JavaRDD[String] = ParallelCollectionRDD[6] at parallelize at <console>:12 | |||
| top | 排序取前n个到Array | def top(num: Int)(implicit ord: Ordering[T]): Array[T] | val c = sc.parallelize(Array(6, 9, 4, 7, 5, 8), 2) c.top(2) res28: Array[Int] = Array(9, 8) | |||
| toString | 返回RDD的人类可读(human-readable)文本描述 | override def toString: String | val a = sc.parallelize(1 to 9, 3) val b = sc.parallelize(1 to 3, 3) val c = a.subtract(b) c.toString res7: String = MappedRDD[15] at subtract at <console>:16 | |||
| union, ++ | A union B | 返回一个新的数据集,由原数据集和参数联合而成 | def ++(other: RDD[T]): RDD[T] def union(other: RDD[T]): RDD[T] | val a = sc.parallelize(1 to 3, 1) val b = sc.parallelize(5 to 7, 1) (a ++ b).collect res0: Array[Int] = Array(1, 2, 3, 5, 6, 7) | ||
| unpersist | 从保存的内存中擦除;RDD对象保留 | def unpersist(blocking: Boolean = true): RDD[T] | val y = sc.parallelize(1 to 10, 10) val z = (y++y) z.collect z.unpersist(true) 14/04/19 03:04:57 INFO UnionRDD: Removing RDD 22 from persistence list 14/04/19 03:04:57 INFO BlockManager: Removing RDD 22 | |||
| values [Pair] | 值RDD | 返回所有的值作为新的RDD元素 | def values: RDD[V] | val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.values.collect res3: Array[String] = Array(dog, tiger, lion, cat, panther, eagle) | ||
| variance [Double], sampleVariance [Double] | 方差 | def variance(): Double def sampleVariance(): Double | val a = sc.parallelize(List(9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5), 3) a.variance res70: Double = 10.605333333333332 val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2) x.variance res14: Double = 66.04584444444443 x.sampleVariance res13: Double = 74.30157499999999 | |||
| zip | 拉链 | 拉链函数 | def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] | val a = sc.parallelize(1 to 100, 3) val b = sc.parallelize(101 to 200, 3) a.zip(b).collect res1: Array[(Int, Int)] = Array((1,101), (2,102), (3,103), (4,104), (5,105), (6,106), (7,107), (8,108), (9,109), (10,110), (11,111), (12,112), (13,113), (14,114), (15,115), (16,116), (17,117), (18,118), (19,119), (20,120), (21,121), (22,122), (23,123), (24,124), (25,125), (26,126), (27,127), (28,128), (29,129), (30,130), (31,131), (32,132), (33,133), (34,134), (35,135), (36,136), (37,137), (38,138), (39,139), (40,140), (41,141), (42,142), (43,143), (44,144), (45,145), (46,146), (47,147), (48,148), (49,149), (50,150), (51,151), (52,152), (53,153), (54,154), (55,155), (56,156), (57,157), (58,158), (59,159), (60,160), (61,161), (62,162), (63,163), (64,164), (65,165), (66,166), (67,167), (68,168), (69,169), (70,170), (71,171), (72,172), (73,173), (74,174), (75,175), (76,176), (77,177), (78,... val a = sc.parallelize(1 to 100, 3) val b = sc.parallelize(101 to 200, 3) val c = sc.parallelize(201 to 300, 3) a.zip(b).zip(c).map((x) => (x._1._1, x._1._2, x._2 )).collect res12: Array[(Int, Int, Int)] = Array((1,101,201), (2,102,202), (3,103,203), (4,104,204), (5,105,205), (6,106,206), (7,107,207), (8,108,208), (9,109,209), (10,110,210), (11,111,211), (12,112,212), (13,113,213), (14,114,214), (15,115,215), (16,116,216), (17,117,217), (18,118,218), (19,119,219), (20,120,220), (21,121,221), (22,122,222), (23,123,223), (24,124,224), (25,125,225), (26,126,226), (27,127,227), (28,128,228), (29,129,229), (30,130,230), (31,131,231), (32,132,232), (33,133,233), (34,134,234), (35,135,235), (36,136,236), (37,137,237), (38,138,238), (39,139,239), (40,140,240), (41,141,241), (42,142,242), (43,143,243), (44,144,244), (45,145,245), (46,146,246), (47,147,247), (48,148,248), (49,149,249), (50,150,250), (51,151,251), (52,152,252), (53,153,253), (54,154,254), (55,155,255)... | ||
| zipPartitions | 类似zip,但提供更多控制选项 | def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] | val a = sc.parallelize(0 to 9, 3) val b = sc.parallelize(10 to 19, 3) val c = sc.parallelize(100 to 109, 3) def myfunc(aiter: Iterator[Int], biter: Iterator[Int], citer: Iterator[Int]): Iterator[String] = { var res = List[String]() while (aiter.hasNext && biter.hasNext && citer.hasNext) { val x = aiter.next + " " + biter.next + " " + citer.next res ::= x } res.iterator } a.zipPartitions(b, c)(myfunc).collect res50: Array[String] = Array(2 12 102, 1 11 101, 0 10 100, 5 15 105, 4 14 104, 3 13 103, 9 19 109, 8 18 108, 7 17 107, 6 16 106) | |||
| zipWithIndex | 把每个元素和它在RDD中的下标(Long,从0开始)组成二元组;会触发一个Spark job,因为序号需要跨partition计算 | def zipWithIndex(): RDD[(T, Long)] | val z = sc.parallelize(Array("A", "B", "C", "D")) val r = z.zipWithIndex r.collect res110: Array[(String, Long)] = Array((A,0), (B,1), (C,2), (D,3)) val z = sc.parallelize(100 to 120, 5) val r = z.zipWithIndex r.collect res11: Array[(Int, Long)] = Array((100,0), (101,1), (102,2), (103,3), (104,4), (105,5), (106,6), (107,7), (108,8), (109,9), (110,10), (111,11), (112,12), (113,13), (114,14), (115,15), (116,16), (117,17), (118,18), (119,19), (120,20)) | |||
| zipWithUniqueId | 给每个元素分配一个唯一的Long编号;不会触发Spark job,但编号与元素顺序无关(编号规则与分区有关) | def zipWithUniqueId(): RDD[(T, Long)] | val z = sc.parallelize(100 to 120, 5) val r = z.zipWithUniqueId r.collect res12: Array[(Int, Long)] = Array((100,0), (101,5), (102,10), (103,15), (104,1), (105,6), (106,11), (107,16), (108,2), (109,7), (110,12), (111,17), (112,3), (113,8), (114,13), (115,18), (116,4), (117,9), (118,14), (119,19), (120,24)) |
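作为上表的一个补充示例(仅为示意,假设在 spark-shell 中运行),可以用 dependencies 和 toDebugString 观察前文提到的血统(lineage)与窄/宽依赖:map 产生窄依赖(OneToOneDependency),groupByKey 产生宽依赖(ShuffleDependency)。

```scala
val a      = sc.parallelize(1 to 9, 3)
val narrow = a.map(x => (x % 2, x))   // map:窄依赖,子分区只依赖对应的父分区
val wide   = narrow.groupByKey()      // groupByKey:宽依赖,需要 shuffle

narrow.dependencies                   // 包含 org.apache.spark.OneToOneDependency
wide.dependencies                     // 包含 org.apache.spark.ShuffleDependency

// toDebugString 打印整条血统链;数据丢失时 Spark 依据它重算对应分区
println(wide.toDebugString)
```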