function | Name | Description | Definition | Examples 1 | Examples 2 | |
aggregate | Aggregate | zeroValue is the initial value for both seqOp and combOp, i.e. the first value fed into the streaming computation; after each partition has been aggregated, zeroValue is also the initial value of the reduce (combOp) step. Why two functions? The first (seqOp) converts the partition's input elements into the target type U (note that U may differ from T); combOp then merges the per-partition results. | def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U | val z = sc.parallelize(List(1,2,3,4,5,6), 2) z.aggregate(0)(math.max(_, _), _ + _) res40: Int = 9
val z = sc.parallelize(List("a","b","c","d","e","f"),2) z.aggregate("")(_ + _, _+_) res115: String = abcdef
z.aggregate("x")(_ + _, _+_) res116: String = xxdefxabc
val z = sc.parallelize(List("12","23","345","4567"),2) z.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y) res141: String = 42
z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y) res142: String = 11
val z = sc.parallelize(List("12","23","345",""),2) z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y) res143: String = 10 | val z = sc.parallelize(List("12","23","","345"),2) z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y) res144: String = 11 | |
cartesian | Cartesian product | Cartesian product. When called on datasets of types T and U, it returns a dataset of (T, U) pairs in which every element of the first RDD is paired with every element of the second. (Warning: use this function with care; the memory consumption can quickly cause problems!) | def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] | val x = sc.parallelize(List(1,2,3,4,5)) val y = sc.parallelize(List(6,7,8,9,10)) x.cartesian(y).collect res0: Array[(Int, Int)] = Array((1,6), (1,7), (1,8), (1,9), (1,10), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7), (3,8), (3,9), (3,10), (4,6), (5,6), (4,7), (5,7), (4,8), (5,8), (4,9), (4,10), (5,9), (5,10)) | | |
checkpoint | Set a checkpoint | Checkpoints the RDD the next time it is computed; the checkpoint is saved as binary files in the checkpoint directory. Note: if a local directory is used, it must exist on every node; an HDFS path can of course be used instead. | def checkpoint() | sc.setCheckpointDir("my_directory_name") val a = sc.parallelize(1 to 4) a.checkpoint a.count 14/02/25 18:13:53 INFO SparkContext: Starting job: count at <console>:15 ... 14/02/25 18:13:53 INFO MemoryStore: Block broadcast_5 stored as values to memory (estimated size 115.7 KB, free 296.3 MB) 14/02/25 18:13:53 INFO RDDCheckpointData: Done checkpointing RDD 11 to file:/home/cloudera/Documents/spark-0.9.0-incubating-bin-cdh4/bin/my_directory_name/65407913-fdc6-4ec1-82c9-48a1656b95d6/rdd-11, new parent is RDD 12 res23: Long = 4 | | |
coalesce, repartition | Coalesce; repartition | Changes the number of partitions; repartition(n) is equivalent to coalesce(n, shuffle = true). | def coalesce ( numPartitions : Int , shuffle : Boolean = false ): RDD [T] def repartition ( numPartitions : Int ): RDD [T] | val y = sc.parallelize(1 to 10, 10) val z = y.coalesce(2, false) z.partitions.length res9: Int = 2 | | |
cogroup [pair], groupWith [pair] | Cogroup | A very powerful set of grouping functions that can group up to three key-value RDDs by key. When called on datasets of type (K, V) and (K, W), it returns a dataset of (K, Iterable[V], Iterable[W]) tuples. In other frameworks this operation is called CoGroup. | def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))] def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] | val a = sc.parallelize(List(1, 2, 1, 3), 1) val b = a.map((_, "b")) val c = a.map((_, "c")) b.cogroup(c).collect res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array( (2,(ArrayBuffer(b),ArrayBuffer(c))), (3,(ArrayBuffer(b),ArrayBuffer(c))), (1,(ArrayBuffer(b, b),ArrayBuffer(c, c))) )
val d = a.map((_, "d")) b.cogroup(c, d).collect res9: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array( (2,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))), (3,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))), (1,(ArrayBuffer(b, b),ArrayBuffer(c, c),ArrayBuffer(d, d))) )
val x = sc.parallelize(List((1, "apple"), (2, "banana"), (3, "orange"), (4, "kiwi")), 2) val y = sc.parallelize(List((5, "computer"), (1, "laptop"), (1, "desktop"), (4, "iPad")), 2) x.cogroup(y).collect res23: Array[(Int, (Iterable[String], Iterable[String]))] = Array( (4,(ArrayBuffer(kiwi),ArrayBuffer(iPad))), (2,(ArrayBuffer(banana),ArrayBuffer())), (3,(ArrayBuffer(orange),ArrayBuffer())), (1,(ArrayBuffer(apple),ArrayBuffer(laptop, desktop))), (5,(ArrayBuffer(),ArrayBuffer(computer)))) | | |
collect, toArray | Collect to an array | Converts the RDD into a Scala array and returns it. If you provide a standard map function (i.e. f: T => U), it is applied before the elements are placed in the array. Returns all elements of the dataset as an array to the driver program. This is usually done after a filter or other operation that returns a sufficiently small subset of the data; collecting a whole large RDD can easily cause the driver program to run out of memory (OOM). | def collect(): Array[T] def collect[U:ClassTag](f:PartialFunction[T, U]): RDD[U] def toArray(): Array[T] | val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2) c.collect res29: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat) | | |
collectAsMap [pair] | Collect a pair RDD as a Map | Like collect, but works on a key-value RDD and converts it into a Scala Map that preserves the key-value mapping. | def collectAsMap(): Map[K, V] | val a = sc.parallelize(List(1, 2, 1, 3), 1) val b = a.zip(a) b.collectAsMap res1: scala.collection.Map[Int,Int] = Map(2 -> 2, 1 -> 1, 3 -> 3) | | |
combineByKey [pair] | Combine by key | Combines the values of a pair RDD key by key, applying the combiner functions one after another: create a combiner, merge a value into a combiner, merge two combiners. | def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializerClass: String = null): RDD[(K, C)] | val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3) val c = b.zip(a) val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y) d.collect res16: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf))) | | |
compute | | Computes the RDD from its dependencies; called internally by Spark. | def compute(split: Partition, context: TaskContext): Iterator[T] | | | |
context, sparkContext | SparkContext | Returns the SparkContext that was used to create the RDD. | def context: SparkContext def sparkContext: SparkContext | val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2) c.context res8: org.apache.spark.SparkContext = org.apache.spark.SparkContext@58c1c2f1 | | |
count | Number of elements | Returns the number of elements in the RDD's dataset. | def count(): Long | val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2) c.count res2: Long = 4 | | |
countApprox | Approximate count | Approximate version of count: returns a potentially incomplete result within the given timeout. | def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] | see the hedged example sketch after the table | | |
countByKey [pair] | Count elements by key | Similar to count, but counts the elements per key of a key-value RDD. | def countByKey(): Map[K, Long] | val c = sc.parallelize(List((3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")), 2) c.countByKey res3: scala.collection.Map[Int,Long] = Map(3 -> 3, 5 -> 1) | | |
countByKeyApprox [pair] | Approximate count by key | Approximate version of countByKey; returns a potentially incomplete result within the given timeout. | def countByKeyApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[K, BoundedDouble]] | see the hedged example sketch after the table | | |
countByValue | Count by value | Counts how many times each unique value occurs. (Note: this operation ultimately aggregates the result on a single reducer.) | def countByValue(): Map[T, Long] | val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1)) b.countByValue res27: scala.collection.Map[Int,Long] = Map(5 -> 1, 8 -> 1, 3 -> 1, 6 -> 1, 1 -> 6, 2 -> 3, 4 -> 2, 7 -> 1) | | |
countByValueApprox | Approximate count by value | Approximate version of countByValue; returns a potentially incomplete result within the given timeout. | def countByValueApprox(timeout: Long, confidence: Double = 0.95): PartialResult[Map[T, BoundedDouble]] | | | |
countApproxDistinct | Approximate distinct count | Computes the approximate number of distinct elements; relativeSD controls the accuracy of the estimate. | def countApproxDistinct(relativeSD: Double = 0.05): Long | val a = sc.parallelize(1 to 10000, 20) val b = a++a++a++a++a b.countApproxDistinct(0.1) res14: Long = 8224
b.countApproxDistinct(0.05) res15: Long = 9750
b.countApproxDistinct(0.01) res16: Long = 9947
b.countApproxDistinct(0.001) res0: Long = 10000 | | |
countApproxDistinctByKey [pair] | Approximate distinct count by key | For each key, computes the approximate number of distinct values; relativeSD controls the accuracy of the estimate. | def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] | val a = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2) val b = sc.parallelize(a.takeSample(true, 10000, 0), 20) val c = sc.parallelize(1 to b.count().toInt, 20) val d = b.zip(c) d.countApproxDistinctByKey(0.1).collect res15: Array[(String, Long)] = Array((Rat,2567), (Cat,3357), (Dog,2414), (Gnu,2494))
d.countApproxDistinctByKey(0.01).collect res16: Array[(String, Long)] = Array((Rat,2555), (Cat,2455), (Dog,2425), (Gnu,2513))
d.countApproxDistinctByKey(0.001).collect res0: Array[(String, Long)] = Array((Rat,2562), (Cat,2464), (Dog,2451), (Gnu,2521)) | | |
dependencies | Dependencies | Returns the RDD's dependencies, i.e. the chain of parent RDDs it was built from. | final def dependencies: Seq[Dependency[_]] | val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1)) b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at <console>:12 b.dependencies.length Int = 0
b.map(a => a).dependencies.length res40: Int = 1
b.cartesian(a).dependencies.length res41: Int = 2
b.cartesian(a).dependencies res42: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.rdd.CartesianRDD$$anon$1@576ddaaa, org.apache.spark.rdd.CartesianRDD$$anon$2@6d2efbbd) | | |
distinct | Distinct | Returns a new RDD with duplicates removed, i.e. each value occurs only once. | def distinct(): RDD[T] def distinct(numPartitions: Int): RDD[T] | val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2) c.distinct.collect res6: Array[String] = Array(Dog, Gnu, Cat, Rat)
val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10)) a.distinct(2).partitions.length res16: Int = 2
a.distinct(3).partitions.length res17: Int = 3 | | |
first | First element | Returns the first element of the RDD. | def first(): T | val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2) c.first res1: String = Gnu | | |
filter | Filter | Keeps only the records for which the predicate function returns true, producing a new RDD. | def filter(f: T => Boolean): RDD[T] | val a = sc.parallelize(1 to 10, 3) val b = a.filter(_ % 2 == 0) b.collect res3: Array[Int] = Array(2, 4, 6, 8, 10) | val b = sc.parallelize(1 to 8) b.filter(_ < 4).collect res15: Array[Int] = Array(1, 2, 3)
val a = sc.parallelize(List("cat", "horse", 4.0, 3.5, 2, "dog")) a.filter(_ < 4).collect <console>:15: error: value < is not a member of Any | val a = sc.parallelize(List("cat", "horse", 4.0, 3.5, 2, "dog")) a.collect({case a: Int => "is integer" | case b: String => "is string" }).collect res17: Array[String] = Array(is string, is string, is integer, is string)
val myfunc: PartialFunction[Any, Any] = { case a: Int => "is integer" | case b: String => "is string" } myfunc.isDefinedAt("") res21: Boolean = true
myfunc.isDefinedAt(1) res22: Boolean = true
myfunc.isDefinedAt(1.5) res23: Boolean = false |
filterWith | Filter with partition info | The first function transforms the partition index into a value of type A; the second function filters using both the element and that value. A new RDD is produced from the elements that pass the filter. | def filterWith[A: ClassTag](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] | val a = sc.parallelize(1 to 9, 3) val b = a.filterWith(i => i)((x,i) => x % 2 == 0 || i % 2 == 0) b.collect res37: Array[Int] = Array(1, 2, 3, 4, 6, 7, 8, 9)
val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 5) a.filterWith(x=> x)((a, b) => b == 0).collect res30: Array[Int] = Array(1, 2)
a.filterWith(x=> x)((a, b) => a % (b+1) == 0).collect res33: Array[Int] = Array(1, 2, 4, 6, 8, 10)
a.filterWith(x=> x.toString)((a, b) => b == "2").collect res34: Array[Int] = Array(5, 6) | | |
flatMap | Flat map | Similar to map, but each input element can be mapped to zero or more output elements (the function therefore returns a sequence rather than a single element). | def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] | val a = sc.parallelize(1 to 10, 5) a.flatMap(1 to _).collect res47: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect res85: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)
// The program below generates a random number of copies (up to 10) of the items in the list. val x = sc.parallelize(1 to 10, 3) x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) | | |
flatMapValues [pair] | Flat map over values | Very similar to mapValues, but the sequence structure produced during the mapping is flattened. | def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] | val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.flatMapValues("x" + _ + "x").collect res6: Array[(Int, Char)] = Array((3,x), (3,d), (3,o), (3,g), (3,x), (5,x), (5,t), (5,i), (5,g), (5,e), (5,r), (5,x), (4,x), (4,l), (4,i), (4,o), (4,n), (4,x), (3,x), (3,c), (3,a), (3,t), (3,x), (7,x), (7,p), (7,a), (7,n), (7,t), (7,h), (7,e), (7,r), (7,x), (5,x), (5,e), (5,a), (5,g), (5,l), (5,e), (5,x)) | | |
flatMapWith | | Similar to flatMap, but the first function transforms the partition index into a value that is passed to the flat-map function; preservesPartitioning controls whether the original partitioning is kept. | def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U] | val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3) a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect res58: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2, 8, 2, 9) | | |
fold | Fold | Aggregates the values of each partition and then the per-partition results; zeroValue is used as the initial value in every aggregation. | def fold(zeroValue: T)(op: (T, T) => T): T | val a = sc.parallelize(List(1,2,3), 3) a.fold(0)(_ + _) res59: Int = 6 | | |
foldByKey [pair] | Fold by key | Like fold, but performed per key on a key-value RDD. | def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] | val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2) val b = a.map(x => (x.length, x)) b.foldByKey("")(_ + _).collect res84: Array[(Int, String)] = Array((3,dogcatowlgnuant))
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.foldByKey("")(_ + _).collect res85: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle)) | | |
foreach | For each | Executes a function with no return value on every element. | def foreach(f: T => Unit) | val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3) c.foreach(x => println(x + "s are yummy")) lions are yummy gnus are yummy crocodiles are yummy ants are yummy whales are yummy dolphins are yummy spiders are yummy | | |
foreachPartition | For each partition | Executes a function with no return value once per partition; the partition's elements are supplied through an iterator argument. | def foreachPartition(f: Iterator[T] => Unit) | val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3) b.foreachPartition(x => println(x.reduce(_ + _))) 6 15 24 | | |
foreachWith | For each with partition info | Like foreach, but the function also receives a value derived from the partition index. | def foreachWith[A: ClassTag](constructA: Int => A)(f: (T, A) => Unit) | val a = sc.parallelize(1 to 9, 3) a.foreachWith(i => i)((x,i) => if (x % 2 == 1 && i % 2 == 0) println(x) ) 1 3 7 9 | | |
generator, setGenerator | Generator label | A label appended to the RDD's name, used to make dependency printouts easier to read. | @transient var generator def setGenerator(_generator: String) | | | |
getCheckpointFile | Get checkpoint file | Returns the path of the checkpoint file, or None if the RDD has not been checkpointed yet. | def getCheckpointFile: Option[String] | sc.setCheckpointDir("/home/cloudera/Documents") val a = sc.parallelize(1 to 500, 5) val b = a++a++a++a++a b.getCheckpointFile res49: Option[String] = None
b.checkpoint b.getCheckpointFile res54: Option[String] = None
b.collect b.getCheckpointFile res57: Option[String] = Some(file:/home/cloudera/Documents/cb978ffb-a346-4820-b3ba-d56580787b20/rdd-40) | | |
preferredLocations | Preferred locations | Returns the preferred host locations of a partition, derived from the available locality information. | final def preferredLocations(split: Partition): Seq[String] | see the hedged example sketch after the table | | |
getStorageLevel | Storage level | Returns the RDD's current storage level; a storage level can only be assigned to an RDD that has not been given one yet. | def getStorageLevel | val a = sc.parallelize(1 to 100000, 2) a.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY) a.getStorageLevel.description String = Disk Serialized 1x Replicated
a.cache java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level | | |
glom | Gather partitions as arrays | Assembles, for each partition, an array containing all of that partition's elements and returns them as an RDD of arrays. | def glom(): RDD[Array[T]] | val a = sc.parallelize(1 to 100, 3) a.glom.collect res8: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33), Array(34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66), Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)) | | |
groupBy | Group by | Groups the elements of the RDD by the key produced by the supplied function. | def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])] def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])] def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])] | val a = sc.parallelize(1 to 9, 3) a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect res42: Array[(String, Seq[Int])] = Array((even,ArrayBuffer(2, 4, 6, 8)), (odd,ArrayBuffer(1, 3, 5, 7, 9)))
val a = sc.parallelize(1 to 9, 3) def myfunc(a: Int) : Int = { a % 2 } a.groupBy(myfunc).collect res3: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2, 4, 6, 8)), (1,ArrayBuffer(1, 3, 5, 7, 9)))
val a = sc.parallelize(1 to 9, 3) def myfunc(a: Int) : Int = { a % 2 } a.groupBy(x => myfunc(x), 3).collect a.groupBy(myfunc(_), 1).collect res7: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2, 4, 6, 8)), (1,ArrayBuffer(1, 3, 5, 7, 9)))
import org.apache.spark.Partitioner class MyPartitioner extends Partitioner { def numPartitions: Int = 2 def getPartition(key: Any): Int = { key match { case null => 0 case key: Int => key % numPartitions case _ => key.hashCode % numPartitions } } override def equals(other: Any): Boolean = { other match { case h: MyPartitioner => true case _ => false } } } val a = sc.parallelize(1 to 9, 3) val p = new MyPartitioner() val b = a.groupBy((x:Int) => { x }, p) val c = b.mapWith(i => i)((a, b) => (b, a)) c.collect res42: Array[(Int, (Int, Seq[Int]))] = Array((0,(4,ArrayBuffer(4))), (0,(2,ArrayBuffer(2))), (0,(6,ArrayBuffer(6))), (0,(8,ArrayBuffer(8))), (1,(9,ArrayBuffer(9))), (1,(3,ArrayBuffer(3))), (1,(1,ArrayBuffer(1))), (1,(7,ArrayBuffer(7))), (1,(5,ArrayBuffer(5)))) | | |
groupByKey [pair] | Group by key | Called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable[V]) pairs. Note: by default 8 parallel tasks are used for the grouping; you can pass an optional numPartitions argument to set a different number of tasks depending on the data size. | def groupByKey(): RDD[(K, Iterable[V])] def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] | val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2) val b = a.keyBy(_.length) b.groupByKey.collect res11: Array[(Int, Seq[String])] = Array((4,ArrayBuffer(lion)), (6,ArrayBuffer(spider)), (3,ArrayBuffer(dog, cat)), (5,ArrayBuffer(tiger, eagle))) | | |
histogram [Double] | Histogram | Computes a histogram of a Double RDD, either with the given number of evenly spaced buckets or with explicitly provided bucket boundaries. | def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] def histogram(buckets: Array[Double], evenBuckets: Boolean = false): Array[Long] | val a = sc.parallelize(List(1.1, 1.2, 1.3, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 9.0), 3) a.histogram(5) res11: (Array[Double], Array[Long]) = (Array(1.1, 2.68, 4.26, 5.84, 7.42, 9.0),Array(5, 0, 0, 1, 4))
val a = sc.parallelize(List(9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5), 3) a.histogram(6) res18: (Array[Double], Array[Long]) = (Array(1.0, 2.5, 4.0, 5.5, 7.0, 8.5, 10.0),Array(6, 0, 1, 1, 3, 4)) | val a = sc.parallelize(List(1.1, 1.2, 1.3, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 9.0), 3) a.histogram(Array(0.0, 3.0, 8.0)) res14: Array[Long] = Array(5, 3)
val a = sc.parallelize(List(9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5), 3) a.histogram(Array(0.0, 5.0, 10.0)) res1: Array[Long] = Array(6, 9)
a.histogram(Array(0.0, 5.0, 10.0, 15.0)) res1: Array[Long] = Array(6, 8, 1) | |
id | id | The RDD id assigned by its SparkContext. | val id: Int | val y = sc.parallelize(1 to 10, 10) y.id res16: Int = 19 | | |
intersection | Intersection | Returns the elements that appear in both RDDs. | def intersection(other: RDD[T], numPartitions: Int): RDD[T] def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] def intersection(other: RDD[T]): RDD[T] | val x = sc.parallelize(1 to 20) val y = sc.parallelize(10 to 30) val z = x.intersection(y)
z.collect res74: Array[Int] = Array(16, 12, 20, 13, 17, 14, 18, 10, 19, 15, 11) | | |
isCheckpointed | Is checkpointed | Returns whether the RDD has been checkpointed. | def isCheckpointed: Boolean | sc.setCheckpointDir("/home/cloudera/Documents") c.isCheckpointed res6: Boolean = false
c.checkpoint c.isCheckpointed res8: Boolean = false
c.collect c.isCheckpointed res9: Boolean = true | | |
iterator | Partition iterator | Returns an object implementing the Iterator trait for traversing the elements of an RDD partition; called internally by the system. | final def iterator(split: Partition, context: TaskContext): Iterator[T] | | | |
join [pair] | inner join | Performs an inner join of two key-value RDDs; note that the keys must be comparable. | def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] | val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.keyBy(_.length) val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) val d = c.keyBy(_.length) b.join(d).collect
res0: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee))) | | |
keyBy | | Builds a key-value RDD by applying the function to each element and using the result as that element's key. | def keyBy[K](f: T => K): RDD[(K, T)] | val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.keyBy(_.length) b.collect res26: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant)) | | |
keys [pair] | | Returns the key of every tuple as a new RDD. | def keys: RDD[K] | val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.keys.collect res2: Array[Int] = Array(3, 5, 4, 3, 7, 5) | | |
leftOuterJoin [pair] | | Performs a left outer join; the keys must be comparable. | def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] | val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.keyBy(_.length) val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) val d = c.keyBy(_.length) b.leftOuterJoin(d).collect
res1: Array[(Int, (String, Option[String]))] = Array((6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (3,(dog,Some(dog))), (3,(dog,Some(cat))), (3,(dog,Some(gnu))), (3,(dog,Some(bee))), (3,(rat,Some(dog))), (3,(rat,Some(cat))), (3,(rat,Some(gnu))), (3,(rat,Some(bee))), (8,(elephant,None))) | | |
lookup [pair] | | Scans the key-value RDD for the given key and returns its values as a Seq. | def lookup(key: K): Seq[V] | val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.lookup(5) res0: Seq[String] = WrappedArray(tiger, eagle) | | |
map | Map | Applies the given transformation function to every element of the RDD to produce a new RDD. Every element of the original RDD corresponds to exactly one element of the new RDD. | def map[U: ClassTag](f: T => U): RDD[U] | val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.map(_.length) val c = a.zip(b) c.collect res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) | scala> val a = sc.parallelize(1 to 9, 3) scala> val b = a.map(x => x*2) scala> a.collect res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9) scala> b.collect res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18) | |
mapPartitions | | The supplied function is invoked only once per partition; it receives the partition's elements as an iterator. | def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] | val a = sc.parallelize(1 to 9, 3) def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = { var res = List[(T, T)]() var pre = iter.next while (iter.hasNext) { val cur = iter.next; res .::= (pre, cur) pre = cur; } res.iterator } a.mapPartitions(myfunc).collect res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) | val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3) def myfunc(iter: Iterator[Int]) : Iterator[Int] = { var res = List[Int]() while (iter.hasNext) { val cur = iter.next; res = res ::: List.fill(scala.util.Random.nextInt(10))(cur) } res.iterator } x.mapPartitions(myfunc).collect // some of the numbers are not output at all, because the random number generated for them is zero. res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)
//using flatmap val x = sc.parallelize(1 to 10, 3) x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) | |
mapPartitionsWithContext | | Similar to mapPartitions, but also gives access to the TaskContext, i.e. information about the running task. | def mapPartitionsWithContext[U: ClassTag](f: (TaskContext, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] | val a = sc.parallelize(1 to 9, 3) import org.apache.spark.TaskContext def myfunc(tc: TaskContext, iter: Iterator[Int]) : Iterator[Int] = { tc.addOnCompleteCallback(() => println( "Partition: " + tc.partitionId + ", AttemptID: " + tc.attemptId )) iter.toList.filter(_ % 2 == 0).iterator } a.mapPartitionsWithContext(myfunc).collect
14/04/01 23:05:48 INFO SparkContext: Starting job: collect at <console>:20 ... 14/04/01 23:05:48 INFO Executor: Running task ID 0 Partition: 0, AttemptID: 0, Interrupted: false ... 14/04/01 23:05:48 INFO Executor: Running task ID 1 14/04/01 23:05:48 INFO TaskSetManager: Finished TID 0 in 470 ms on localhost (progress: 0/3) ... 14/04/01 23:05:48 INFO Executor: Running task ID 2 14/04/01 23:05:48 INFO TaskSetManager: Finished TID 1 in 23 ms on localhost (progress: 1/3) 14/04/01 23:05:48 INFO DAGScheduler: Completed ResultTask(0, 1)
res0: Array[Int] = Array(2, 6, 4, 8) | | |
mapPartitionsWithIndex | | Similar to mapPartitions, but the function also receives the partition index. | def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] | val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3) def myfunc(index: Int, iter: Iterator[Int]) : Iterator[String] = { iter.toList.map(x => index + "," + x).iterator } x.mapPartitionsWithIndex(myfunc).collect() res10: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9, 2,10) | | |
mapPartitionsWithSplit | [deprecated] | Deprecated; use mapPartitionsWithIndex instead. | def mapPartitionsWithSplit[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] | | | |
mapValues [pair] | | Applies the map function to the values only; the keys are kept unchanged. | def mapValues[U](f: V => U): RDD[(K, U)] | val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.mapValues("x" + _ + "x").collect res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex)) | | |
mapWith (deprecated) | Extended map | An extended version of map: the first function turns the partition index into a value of type A, which is then passed together with each element to the map function. | def mapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U] | // generates 9 random numbers less than 1000. val x = sc.parallelize(1 to 9, 3) x.mapWith(a => new scala.util.Random)((x, r) => r.nextInt(1000)).collect res0: Array[Int] = Array(940, 51, 779, 742, 757, 982, 35, 800, 15)
val a = sc.parallelize(1 to 9, 3) val b = a.mapWith("Index:" + _)((a, b) => ("Value:" + a, b)) b.collect res0: Array[(String, String)] = Array((Value:1,Index:0), (Value:2,Index:0), (Value:3,Index:0), (Value:4,Index:1), (Value:5,Index:1), (Value:6,Index:1), (Value:7,Index:2), (Value:8,Index:2), (Value:9,Index:2)) | | |
max | Maximum | Returns the largest element. | def max()(implicit ord: Ordering[T]): T | val y = sc.parallelize(10 to 30) y.max res75: Int = 30 | | |
mean [Double], meanApprox [Double] | Mean | Computes the mean of the values; meanApprox returns an approximate result within a timeout. | def mean(): Double def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] | val a = sc.parallelize(List(9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5), 3) a.mean res0: Double = 5.3 | | |
min | Minimum | Returns the smallest element. | def min()(implicit ord: Ordering[T]): T | val y = sc.parallelize(10 to 30) y.min res75: Int = 10 | | |
name, setName | RDD name | Assigns a name to the RDD. | @transient var name: String def setName(_name: String) | val y = sc.parallelize(1 to 10, 10) y.name res13: String = null y.setName("Fancy RDD Name") y.name res15: String = Fancy RDD Name | | |
partitionBy [Pair] | | Repartitions the key-value RDD using the given partitioner. | def partitionBy(partitioner: Partitioner): RDD[(K, V)] | see the hedged example sketch after the table | | |
partitioner | | The partitioner (if any) of the RDD; it specifies the partitioning function used by groupBy, subtract, reduceByKey (from PairedRDDFunctions) and similar operations. | @transient val partitioner: Option[Partitioner] | see the hedged example sketch after the table | | |
partitions | | The array of partitions of the RDD. | final def partitions: Array[Partition] | val b = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2) b.partitions res48: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@18aa, org.apache.spark.rdd.ParallelCollectionPartition@18ab) | | |
persist, cache | Persist | The parameterless persist() and cache() store the RDD with StorageLevel.MEMORY_ONLY; the storage level cannot be changed once it has been set. | def cache(): RDD[T] def persist(): RDD[T] def persist(newLevel: StorageLevel): RDD[T] | val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2) c.getStorageLevel res0: org.apache.spark.storage.StorageLevel = StorageLevel(false, false, false, false, 1) c.cache c.getStorageLevel res2: org.apache.spark.storage.StorageLevel = StorageLevel(false, true, false, true, 1) | | |
pipe | | Pipes the data of each partition through stdin to a shell command and returns the command's output as an RDD of strings. | def pipe(command: String): RDD[String] def pipe(command: String, env: Map[String, String]): RDD[String] def pipe(command: Seq[String], env: Map[String, String] = Map(), printPipeContext: (String => Unit) => Unit = null, printRDDElement: (T, String => Unit) => Unit = null): RDD[String] | val a = sc.parallelize(1 to 9, 3) a.pipe("head -n 1").collect res2: Array[String] = Array(1, 4, 7) | | |
randomSplit | | Randomly splits the RDD into multiple RDDs according to the given array of weights. | def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]] | val y = sc.parallelize(1 to 10) val splits = y.randomSplit(Array(0.6, 0.4), seed = 11L) val training = splits(0) val test = splits(1) training.collect res85: Array[Int] = Array(1, 4, 5, 6, 8, 10) test.collect res86: Array[Int] = Array(2, 3, 7, 9)
val y = sc.parallelize(1 to 10) val splits = y.randomSplit(Array(0.1, 0.3, 0.6))
val rdd1 = splits(0) val rdd2 = splits(1) val rdd3 = splits(2)
rdd1.collect res87: Array[Int] = Array(4, 10) rdd2.collect res88: Array[Int] = Array(1, 3, 5, 8) rdd3.collect res91: Array[Int] = Array(2, 6, 7, 9) | | |
reduce | | Aggregates all elements of the dataset using the function func, which takes two arguments and returns one value. The function must be associative so that it can be computed correctly in parallel. | def reduce(f: (T, T) => T): T | val a = sc.parallelize(1 to 100, 3) a.reduce(_ + _) res41: Int = 5050 | | |
reduceByKey [Pair], reduceByKeyLocally[Pair], reduceByKeyToDriver[Pair] | Reduce by key | Like reduce, but performed per key on a key-value RDD; the Locally/ToDriver variants return the result to the driver as a Map. | def reduceByKey(func: (V, V) => V): RDD[(K, V)] def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] def reduceByKeyLocally(func: (V, V) => V): Map[K, V] def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] | val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2) val b = a.map(x => (x.length, x)) b.reduceByKey(_ + _).collect res86: Array[(Int, String)] = Array((3,dogcatowlgnuant))
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.reduceByKey(_ + _).collect res87: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle)) | | |
rightOuterJoin [Pair] | | right outer join | def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))] | val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.keyBy(_.length) val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) val d = c.keyBy(_.length) b.rightOuterJoin(d).collect
res2: Array[(Int, (Option[String], String))] = Array((6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (3,(Some(dog),dog)), (3,(Some(dog),cat)), (3,(Some(dog),gnu)), (3,(Some(dog),bee)), (3,(Some(rat),dog)), (3,(Some(rat),cat)), (3,(Some(rat),gnu)), (3,(Some(rat),bee)), (4,(None,wolf)), (4,(None,bear))) | | |
sample | Sampling | Produces a sampled RDD of roughly the given fraction, with or without replacement. | def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] | val a = sc.parallelize(1 to 10000, 3) a.sample(false, 0.1, 0).count res24: Long = 960
a.sample(true, 0.3, 0).count res25: Long = 2888
a.sample(true, 0.3, 13).count res26: Long = 2985 | | |
saveAsHadoopFile [Pair], saveAsHadoopDataset [Pair], saveAsNewAPIHadoopFile [Pair] | Save to Hadoop | Saves a key-value RDD using an arbitrary Hadoop OutputFormat (old or new API) or a Hadoop JobConf. | def saveAsHadoopDataset(conf: JobConf) def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], codec: Class[_ <: CompressionCodec]) def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = new JobConf(self.context.hadoopConfiguration), codec: Option[Class[_ <: CompressionCodec]] = None) def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) def saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: NewOutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration) | see the hedged example sketch after the table | | |
saveAsObjectFile | Save as a binary object file | Saves the RDD as a file of serialized (binary) objects. | def saveAsObjectFile(path: String) | val x = sc.parallelize(1 to 100, 3) x.saveAsObjectFile("objFile") val y = sc.objectFile[Array[Int]]("objFile") y.collect res52: Array[Int] = Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33) | | |
saveAsSequenceFile [SeqFile] | Save as a Hadoop SequenceFile | Saves the key-value RDD as a Hadoop SequenceFile. | def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None) | val v = sc.parallelize(Array(("owl",3), ("gnu",4), ("dog",1), ("cat",2), ("ant",5)), 2) v.saveAsSequenceFile("hd_seq_file") 14/04/19 05:45:43 INFO FileOutputCommitter: Saved output of task 'attempt_201404190545_0000_m_000001_191' to file:/home/cloudera/hd_seq_file
[cloudera@localhost ~]$ ll ~/hd_seq_file total 8 -rwxr-xr-x 1 cloudera cloudera 117 Apr 19 05:45 part-00000 -rwxr-xr-x 1 cloudera cloudera 133 Apr 19 05:45 part-00001 -rwxr-xr-x 1 cloudera cloudera 0 Apr 19 05:45 _SUCCESS | | |
saveAsTextFile | Save as a text file | Writes the RDD as a text file, one element per line. | def saveAsTextFile(path: String) def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) | val a = sc.parallelize(1 to 10000, 3) a.saveAsTextFile("mydata_a") 14/04/03 21:11:36 INFO FileOutputCommitter: Saved output of task 'attempt_201404032111_0000_m_000002_71' to file:/home/cloudera/Documents/spark-0.9.0-incubating-bin-cdh4/bin/mydata_a
[cloudera@localhost ~]$ head -n 5 ~/Documents/spark-0.9.0-incubating-bin-cdh4/bin/mydata_a/part-00000 1 2 3 4 5
// Produces 3 output files since we have created the a RDD with 3 partitions [cloudera@localhost ~]$ ll ~/Documents/spark-0.9.0-incubating-bin-cdh4/bin/mydata_a/ -rwxr-xr-x 1 cloudera cloudera 15558 Apr 3 21:11 part-00000 -rwxr-xr-x 1 cloudera cloudera 16665 Apr 3 21:11 part-00001 -rwxr-xr-x 1 cloudera cloudera 16671 Apr 3 21:11 part-00002 | import org.apache.hadoop.io.compress.GzipCodec a.saveAsTextFile("mydata_b", classOf[GzipCodec])
[cloudera@localhost ~]$ ll ~/Documents/spark-0.9.0-incubating-bin-cdh4/bin/mydata_b/ total 24 -rwxr-xr-x 1 cloudera cloudera 7276 Apr 3 21:29 part-00000.gz -rwxr-xr-x 1 cloudera cloudera 6517 Apr 3 21:29 part-00001.gz -rwxr-xr-x 1 cloudera cloudera 6525 Apr 3 21:29 part-00002.gz
val x = sc.textFile("mydata_b") x.count res2: Long = 10000 | val x = sc.parallelize(List(1,2,3,4,5,6,6,7,9,8,10,21), 3) x.saveAsTextFile("hdfs://localhost:8020/user/cloudera/test"); val sp = sc.textFile("hdfs://localhost:8020/user/cloudera/sp_data") sp.flatMap(_.split(" ")).saveAsTextFile("hdfs://localhost:8020/user/cloudera/sp_x") |
stats [Double] | Statistics | Similar to mean, but returns more information at once (count, mean, stdev). | def stats(): StatCounter | val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2) x.stats res16: org.apache.spark.util.StatCounter = (count: 9, mean: 11.266667, stdev: 8.126859) | | |
sortBy | Sort | Sorts the RDD by the value returned by the given function. | def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.size)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] | val y = sc.parallelize(Array(5, 7, 1, 3, 2, 1)) y.sortBy(c => c, true).collect res101: Array[Int] = Array(1, 1, 2, 3, 5, 7)
y.sortBy(c => c, false).collect res102: Array[Int] = Array(7, 5, 3, 2, 1, 1)
val z = sc.parallelize(Array(("H", 10), ("A", 26), ("Z", 1), ("L", 5))) z.sortBy(c => c._1, true).collect res109: Array[(String, Int)] = Array((A,26), (H,10), (L,5), (Z,1))
z.sortBy(c => c._2, true).collect res108: Array[(String, Int)] = Array((Z,1), (L,5), (H,10), (A,26)) | | |
sortByKey [Ordered] | Sort by key | Sorts the key-value RDD by key; the keys must have an Ordering. | def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] | val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2) val b = sc.parallelize(1 to a.count.toInt, 2) val c = a.zip(b) c.sortByKey(true).collect res74: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3)) c.sortByKey(false).collect res75: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))
val a = sc.parallelize(1 to 100, 5) val b = a.cartesian(a) val c = sc.parallelize(b.takeSample(true, 5, 13), 2) val d = c.sortByKey(false) d.collect res56: Array[(Int, Int)] = Array((96,9), (84,76), (59,59), (53,65), (52,4)) | | |
stdev [Double], sampleStdev [Double] | Standard deviation | Estimates the standard deviation; stdev uses the population formula (divide by N), sampleStdev the sample formula (divide by N-1). | def stdev(): Double def sampleStdev(): Double | val d = sc.parallelize(List(0.0, 0.0, 0.0), 3) d.stdev res10: Double = 0.0 d.sampleStdev res11: Double = 0.0
val d = sc.parallelize(List(0.0, 1.0), 3) d.stdev d.sampleStdev res18: Double = 0.5 res19: Double = 0.7071067811865476
val d = sc.parallelize(List(0.0, 0.0, 1.0), 3) d.stdev res14: Double = 0.4714045207910317 d.sampleStdev res15: Double = 0.5773502691896257 | | |
subtract | Subtract | A - B: returns the elements of this RDD that do not appear in the other RDD. | def subtract(other: RDD[T]): RDD[T] def subtract(other: RDD[T], numPartitions: Int): RDD[T] def subtract(other: RDD[T], p: Partitioner): RDD[T] | val a = sc.parallelize(1 to 9, 3) val b = sc.parallelize(1 to 3, 3) val c = a.subtract(b) c.collect res3: Array[Int] = Array(6, 9, 4, 7, 5, 8) | | |
subtractByKey [Pair] | Subtract by key | Removes all pairs whose key appears in the other RDD. | def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] | val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2) val b = a.keyBy(_.length) val c = sc.parallelize(List("ant", "falcon", "squid"), 2) val d = c.keyBy(_.length) b.subtractByKey(d).collect res15: Array[(Int, String)] = Array((4,lion)) | | |
sum [Double], sumApprox[Double] | Sum | Adds up all values; sumApprox returns an approximate result within a timeout. | def sum(): Double def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] | val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2) x.sum res17: Double = 101.39999999999999 | | |
take | | Returns an array with the first n elements of the dataset. Note that this is not executed in parallel on multiple nodes; instead the driver machine collects all the elements itself, which increases memory pressure on the driver (gateway), so use it with care. | def take(num: Int): Array[T] | see the hedged example sketch after the table | | |
takeOrdered | | Returns the first n elements in sorted order. | def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] | val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2) b.takeOrdered(2) res19: Array[String] = Array(ape, cat) | | |
takeSample | | Unlike sample, this returns an Array with exactly n elements, in random order. | def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] | val x = sc.parallelize(1 to 1000, 3) x.takeSample(true, 100, 1) res3: Array[Int] = Array(339, 718, 810, 105, 71, 268, 333, 360, 341, 300, 68, 848, 431, 449, 773, 172, 802, 339, 431, 285, 937, 301, 167, 69, 330, 864, 40, 645, 65, 349, 613, 468, 982, 314, 160, 675, 232, 794, 577, 571, 805, 317, 136, 860, 522, 45, 628, 178, 321, 482, 657, 114, 332, 728, 901, 290, 175, 876, 227, 130, 863, 773, 559, 301, 694, 460, 839, 952, 664, 851, 260, 729, 823, 880, 792, 964, 614, 821, 683, 364, 80, 875, 813, 951, 663, 344, 546, 918, 436, 451, 397, 670, 756, 512, 391, 70, 213, 896, 123, 858) | | |
toDebugString | | Returns a string describing the RDD and its dependency chain, useful for debugging. | def toDebugString: String | val a = sc.parallelize(1 to 9, 3) val b = sc.parallelize(1 to 3, 3) val c = a.subtract(b) c.toDebugString res6: String = MappedRDD[15] at subtract at <console>:16 (3 partitions) SubtractedRDD[14] at subtract at <console>:16 (3 partitions) MappedRDD[12] at subtract at <console>:16 (3 partitions) ParallelCollectionRDD[10] at parallelize at <console>:12 (3 partitions) MappedRDD[13] at subtract at <console>:16 (3 partitions) ParallelCollectionRDD[11] at parallelize at <console>:12 (3 partitions) | | |
toJavaRDD | | Converts the RDD into a JavaRDD object. | def toJavaRDD() : JavaRDD[T] | val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2) c.toJavaRDD res3: org.apache.spark.api.java.JavaRDD[String] = ParallelCollectionRDD[6] at parallelize at <console>:12 | | |
top | | Returns the top n elements (in descending order) as an Array. | def top(num: Int)(implicit ord: Ordering[T]): Array[T] | val c = sc.parallelize(Array(6, 9, 4, 7, 5, 8), 2) c.top(2) res28: Array[Int] = Array(9, 8) | | |
toString | | Returns a human-readable textual description of the RDD. | override def toString: String | val a = sc.parallelize(1 to 9, 3) val b = sc.parallelize(1 to 3, 3) val c = a.subtract(b) c.toString res7: String = MappedRDD[15] at subtract at <console>:16 | | |
union, ++ | A union B | Returns a new dataset that is the union of the source dataset and the argument. | def ++(other: RDD[T]): RDD[T] def union(other: RDD[T]): RDD[T] | val a = sc.parallelize(1 to 3, 1) val b = sc.parallelize(5 to 7, 1) (a ++ b).collect res0: Array[Int] = Array(1, 2, 3, 5, 6, 7) | | |
unpersist | | Removes the persisted data from memory (and disk); the RDD object itself is kept. | def unpersist(blocking: Boolean = true): RDD[T] | val y = sc.parallelize(1 to 10, 10) val z = (y++y) z.collect z.unpersist(true) 14/04/19 03:04:57 INFO UnionRDD: Removing RDD 22 from persistence list 14/04/19 03:04:57 INFO BlockManager: Removing RDD 22 | | |
values [Pair] | Values RDD | Returns the value of every tuple as a new RDD. | def values: RDD[V] | val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.values.collect res3: Array[String] = Array(dog, tiger, lion, cat, panther, eagle) | | |
variance [Double], sampleVariance [Double] | Variance | Computes the variance; variance divides by N, sampleVariance by N-1. | def variance(): Double def sampleVariance(): Double | val a = sc.parallelize(List(9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5), 3) a.variance res70: Double = 10.605333333333332
val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2) x.variance res14: Double = 66.04584444444443
x.sampleVariance res13: Double = 74.30157499999999 | | |
zip | Zip | Joins two RDDs element-wise into an RDD of pairs; both RDDs must have the same number of partitions and the same number of elements in each partition. | def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] | val a = sc.parallelize(1 to 100, 3) val b = sc.parallelize(101 to 200, 3) a.zip(b).collect res1: Array[(Int, Int)] = Array((1,101), (2,102), (3,103), (4,104), (5,105), (6,106), (7,107), (8,108), (9,109), (10,110), (11,111), (12,112), (13,113), (14,114), (15,115), (16,116), (17,117), (18,118), (19,119), (20,120), (21,121), (22,122), (23,123), (24,124), (25,125), (26,126), (27,127), (28,128), (29,129), (30,130), (31,131), (32,132), (33,133), (34,134), (35,135), (36,136), (37,137), (38,138), (39,139), (40,140), (41,141), (42,142), (43,143), (44,144), (45,145), (46,146), (47,147), (48,148), (49,149), (50,150), (51,151), (52,152), (53,153), (54,154), (55,155), (56,156), (57,157), (58,158), (59,159), (60,160), (61,161), (62,162), (63,163), (64,164), (65,165), (66,166), (67,167), (68,168), (69,169), (70,170), (71,171), (72,172), (73,173), (74,174), (75,175), (76,176), (77,177), (78,...
val a = sc.parallelize(1 to 100, 3) val b = sc.parallelize(101 to 200, 3) val c = sc.parallelize(201 to 300, 3) a.zip(b).zip(c).map((x) => (x._1._1, x._1._2, x._2 )).collect res12: Array[(Int, Int, Int)] = Array((1,101,201), (2,102,202), (3,103,203), (4,104,204), (5,105,205), (6,106,206), (7,107,207), (8,108,208), (9,109,209), (10,110,210), (11,111,211), (12,112,212), (13,113,213), (14,114,214), (15,115,215), (16,116,216), (17,117,217), (18,118,218), (19,119,219), (20,120,220), (21,121,221), (22,122,222), (23,123,223), (24,124,224), (25,125,225), (26,126,226), (27,127,227), (28,128,228), (29,129,229), (30,130,230), (31,131,231), (32,132,232), (33,133,233), (34,134,234), (35,135,235), (36,136,236), (37,137,237), (38,138,238), (39,139,239), (40,140,240), (41,141,241), (42,142,242), (43,143,243), (44,144,244), (45,145,245), (46,146,246), (47,147,247), (48,148,248), (49,149,249), (50,150,250), (51,151,251), (52,152,252), (53,153,253), (54,154,254), (55,155,255)... | | |
zipPartitions | | Similar to zip, but gives more control: the supplied function works directly on the partition iterators of the zipped RDDs. | def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] | val a = sc.parallelize(0 to 9, 3) val b = sc.parallelize(10 to 19, 3) val c = sc.parallelize(100 to 109, 3) def myfunc(aiter: Iterator[Int], biter: Iterator[Int], citer: Iterator[Int]): Iterator[String] = { var res = List[String]() while (aiter.hasNext && biter.hasNext && citer.hasNext) { val x = aiter.next + " " + biter.next + " " + citer.next res ::= x } res.iterator } a.zipPartitions(b, c)(myfunc).collect res50: Array[String] = Array(2 12 102, 1 11 101, 0 10 100, 5 15 105, 4 14 104, 3 13 103, 9 19 109, 8 18 108, 7 17 107, 6 16 106) | | |
zipWithIndex | | Zips the elements with their index. Triggers a Spark job, because the indices must be computed across partitions. | def zipWithIndex(): RDD[(T, Long)] | val z = sc.parallelize(Array("A", "B", "C", "D")) val r = z.zipWithIndex r.collect res110: Array[(String, Long)] = Array((A,0), (B,1), (C,2), (D,3))
val z = sc.parallelize(100 to 120, 5) val r = z.zipWithIndex r.collect res11: Array[(Int, Long)] = Array((100,0), (101,1), (102,2), (103,3), (104,4), (105,5), (106,6), (107,7), (108,8), (109,9), (110,10), (111,11), (112,12), (113,13), (114,14), (115,15), (116,16), (117,17), (118,18), (119,19), (120,20)) | | |
zipWithUniqueId | | Does not trigger a Spark job; assigns each element a unique id that is not related to the element order. | def zipWithUniqueId(): RDD[(T, Long)] | val z = sc.parallelize(100 to 120, 5) val r = z.zipWithUniqueId r.collect
res12: Array[(Int, Long)] = Array((100,0), (101,5), (102,10), (103,15), (104,1), (105,6), (106,11), (107,16), (108,2), (109,7), (110,12), (111,17), (112,3), (113,8), (114,13), (115,18), (116,4), (117,9), (118,14), (119,19), (120,24)) | | |
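Additional example sketches referenced in the table above. All of them assume a spark-shell session with an active SparkContext named sc; the commented results are illustrative only.

countApprox: a minimal sketch of the approximate count. The returned PartialResult exposes initialValue and getFinalValue(), and the exact bounds vary from run to run.

val a = sc.parallelize(1 to 10000, 20)
val r = a.countApprox(1000, 0.95)   // PartialResult[BoundedDouble]
r.getFinalValue()                   // blocks until the job finishes; e.g. bounds around 10000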
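countByKeyApprox: an analogous sketch for the per-key variant, under the same assumptions; the bounds shown in the comment are illustrative.

val c = sc.parallelize(List((3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")), 2)
val r = c.countByKeyApprox(1000)    // PartialResult[Map[Int, BoundedDouble]]
r.getFinalValue()                   // e.g. Map(3 -> [3.000, 3.000], 5 -> [1.000, 1.000])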
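preferredLocations: a small sketch. For an RDD created with parallelize there is normally no locality information, so an empty Seq is the typical (not guaranteed) result.

val a = sc.parallelize(1 to 100, 4)
a.preferredLocations(a.partitions(0))   // typically Seq() for a parallelized collection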
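partitionBy: a minimal sketch assuming org.apache.spark.HashPartitioner; it redistributes a key-value RDD according to the supplied partitioner.

import org.apache.spark.HashPartitioner
val a = sc.parallelize(List((1, "one"), (2, "two"), (3, "three"), (4, "four")), 4)
val b = a.partitionBy(new HashPartitioner(2))   // shuffle into 2 hash-partitioned partitions
b.partitions.length   // 2
b.partitioner         // e.g. Some(org.apache.spark.HashPartitioner@2)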
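partitioner: a small sketch showing that a plain parallelized pair RDD has no partitioner, while shuffle-based operations such as groupByKey install one (typically a HashPartitioner).

val a = sc.parallelize(List((1, "a"), (2, "b"), (3, "c")), 3)
a.partitioner                 // None
a.groupByKey(2).partitioner   // e.g. Some(org.apache.spark.HashPartitioner@2)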
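saveAsHadoopFile: a hedged sketch using the old-API org.apache.hadoop.mapred.TextOutputFormat; the output path "hadoop_text_out" is an arbitrary example and the exact files produced depend on the number of partitions and the Hadoop configuration.

import org.apache.hadoop.mapred.TextOutputFormat
val v = sc.parallelize(Array(("owl", 3), ("gnu", 4), ("dog", 1), ("cat", 2), ("ant", 5)), 2)
// write the key/value pairs as text, one "key<TAB>value" line per record
v.saveAsHadoopFile("hadoop_text_out", classOf[String], classOf[Int], classOf[TextOutputFormat[String, Int]])
// expected layout: part-00000, part-00001, _SUCCESS (one part file per partition)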
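take: a small sketch; take(n) returns the first n elements to the driver, scanning partitions from the front, and simply returns everything when the RDD has fewer than n elements.

val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
b.take(2)    // Array(dog, cat)
b.take(30)   // all 5 elements, since fewer than 30 are available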