Spark RDD学习

RDD(resilient distributed dataset)是spark核心概念之一,spark中数据的处理过程其实是rdd之间的互相转换,在此对rdd的一些东西进行整理。

概念

RDD: resilient distributed dataset, 弹性分布式数据集

  1. A collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. (集合、分布式,并行计算)
  2. Created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. (数据可以来自scala集合或外部存储,可以进行transform和action)
  3. Persist an RDD in memory, allowing it to be reused efficiently across parallel operations. (持久化中间RDD,加速计算)
  4. RDDs automatically recover from node failures. (失败后基于lineage重建)

创建RDD

来自集合

1
2
3
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
val distData2 = sc.parallelize(data, 10) //指定partition数量

来自外部数据

Text File

1
2
val localFile = sc.textFile("file://path/data.txt")
val hdfsFile = sc.textFile("hdfs://path/data.txt")

Sequence File

1
sc.sequenceFile[K, V]

Hadoop InputFormat

1
2
sc.hadoopRDD //old api: org.apache.hadoop.mapred
sc.newAPIHadoopRDD //new api: org.apache.hadoop.mapreduce

RDD操作

RDD包括两种类型的操作: Transformation 和 Action。Transformation从已有的RDD生成新的RDD,lazy执行;Action触发RDD上的计算,返回结果到driver。

Transformation

Transformation Example
map(func) rdd.map(x => x*x)
filter(func) rdd.filter(x => x/2==0)
flatMap(func) rdd.flatMap(x => )
union(otherRDD) rdd1.union(rdd2)
distinct([numTasks]) rdd.distinct()
groupByKey([numTasks]) rdd.map((_, 1)).groupByKey()
reduceByKey(func, [numTasks]) rdd.map((, 1)).reduceByKey(+_)
sortByKey([ascending], [numTasks]) rdd.sortByKey()
join(otherDataset, [numTasks]) rdd1.join(rdd2)
cogroup(otherDataset, [numTasks]) rdd1.cogroup(rdd2)
cartesian(otherDataset) rdd1.cartesian(rdd2)
coalesce(numPartitions) rdd.coalesce(50)
repartition(numPartitions) rdd.repartition(100)

Action

Action Example
reduce(func) rdd.reduce(+)
collect() rdd.collect()
count() rdd.count()
first() rdd.first()
take(n) rdd.take(10)
saveAsTextFile(path) rdd.saveAsTextFile(“/output”)
saveAsSequenceFile(path) rdd.saveAsSequenceFile(“/output”)
saveAsObjectFile(path) rdd.saveAsObjectFile(“/output”)
countByKey() rdd.countByKey()
foreach(func) rdd.foreach(func)

示例 WordCount

  • step-1: 数据读入, sc.textFile 创建RDD
1
val rdd1 = sc.textFile("README.md")
  • step-2: map操作, flatMap, map Transformation
1
val rdd2 = rdd1.flatMap(x => x.split(" ")).map(x => (x, 1))
  • step-3: reduce操作,reduceByKey Transformation(shuffle)
1
val rdd3 = rdd2.reduceByKey(v => v + v)
  • step-4: 结果输出, collect Action
1
rdd3.collect()

RDD持久化

使用cache()/persist()对RDD进行持久化,默认内存.

storage level

Storage Level Meaning
MEMORY_ONLY Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they’re needed. This is the default level.
MEMORY_AND_DISK Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don’t fit on disk, and read them from there when they’re needed.
MEMORY_ONLY_SER Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER Similar to MEMORY_ONLY_SER, but spill partitions that don’t fit in memory to disk instead of recomputing them on the fly each time they’re needed.
DISK_ONLY Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental) Store RDD in serialized format in Tachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory, making it attractive in environments with large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon, the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory in Tachyon is discardable. Thus, Tachyon does not attempt to reconstruct a block that it evicts from memory. If you plan to use Tachyon as the off heap store, Spark is compatible with Tachyon out-of-the-box. Please refer to this page for the suggested version pairings.

经验

  • 最好选用MEMORY_ONLY
  • 若内存不够,尝试MEMORY_ONLY_SER(Kryo serialization效率高于Java serialization)
  • 除非RDD的计算很复杂或迭代次数很多, 再将其spill到磁盘;否则,其效率与从源端重算差不多。
  • 若想提高容错, 提升其replicated storage level
  • 使用Tachyon OFF_HEAP:executor挂掉缓存数据不丢失、减少executor的gc开销、不同executor间共享缓存

共享变量

Broadcast Variables

  • 使用sc.broadcase()将变量分发到所有工作节点
  • 工作节点使用value获取变量值
  • 一旦被分发,不能更改
1
2
3
4
5
scala> val broadcastVar = sc.broadcast(Array(1,2,3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res1: Array[Int] = Array(1, 2, 3)

Accumulator

  • executor使用 “+=” 操作符增加计数
  • driver使用value读取数值
1
2
3
4
5
6
7
scala> val acc = sc.accumulator(0, "test accumulator")
acc: org.apache.spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1,2,3,4)).foreach(x=> acc+=x)
scala> acc.value
res3: Int = 10

RDD依赖关系

窄依赖(narrow dependencies)

  • 子RDD的每个分区依赖于常数个父RDD分区(即与数据规模无关)
  • 输入输出一对一的算子,且结果RDD的分区结构不变,如map、flatMap
  • 输入输出一对一,但结果RDD的分区结构发生变化,如union、coalesce
  • 从输入中选择部分元素的算子,如filter、distinct、subtract、sample

宽依赖(wide dependencies)

  • 子RDD的每个分区依赖于所有父RDD分区
  • 对单个RDD基于Key进行重组和Reduce,如groupByKey、reduceByKey
  • 对两个RDD基于Key进行Join和重组,如join

RDD容错

支持容错通常采用两种方式:数据复制或日志记录。对于以数据为中心的系统而言,这两种方式都非常昂贵,因为它需要跨集群网络拷贝大量数据,毕竟带宽的数据远远低于内存。

RDD天生是支持容错的。首先,它自身是一个不变的(immutable)数据集,其次,它能够记住构建它的操作图(Graph of Operation),因此当执行任务的Worker失败时,完全可以通过操作图获得之前执行的操作,进行重新计算。由于无需采用replication方式支持容错,很好地降低了跨网络的数据传输成本。

不过,在某些场景下,Spark也需要利用记录日志的方式来支持容错。例如,在Spark Streaming中,针对数据进行update操作,或者调用Streaming提供的window操作时,就需要恢复执行过程的中间状态。此时,需要通过Spark提供的checkpoint机制,以支持操作能够从checkpoint得到恢复。

针对RDD的wide dependency,最有效的容错方式同样还是采用checkpoint机制。不过,似乎Spark的最新版本仍然没有引入auto checkpointing机制。

参考