一、前言
使用语言:scala
术语:Resilient Distributed Datasets(RDD)
二、弹性分布式数据集(RDD)定义
Spark围绕着弹性分布式数据集(RDD)的概念,是一个可以并行操作的容错容器集合,有两种方法来创建RDD:
- 并行化驱动程序中的现有集合
- 外部存储系统中的数据集,例如共享文件系统,HDFS,HBase或提供Hadoop InputFormat的任何数据源。
1) Parallelized Collections(并行数据集)
使用SparkContext的parallelize方法创建一个并行数据集
1 | val data = Array(1, 2, 3, 4, 5) |
2) 外部数据集(共享文件系统,HDFS,HBase…)
下面使用SparkContext的textFile方法创建一个文件数据集
1 | val distFile = sc.textFile("data.txt") |
注意:
- Spark的所有基于文件的输入方法(包括textFile)都支持在目录,压缩文件和通配符上运行。 例如,可以使用textFile(“/ my / directory”),textFile(“/ my / directory / 。txt”)和textFile(“/ my / directory / .gz”)。
- textFile方法还接受可选的第二个参数,用于控制文件的分区数。 默认情况下,Spark为文件的每个块创建一个分区(HDFS默认为128MB),但你也可以通过传递一个更大的值来请求更多的分区。 请注意,您不能有比块更少的分区。
三、弹性分布式数据集(RDD)操作
####(一)定义
RDD支持两种类型的操作:
- 转换(Transformations):从现有数据集创建新数据集。例如:map是一个通过一个函数传递每个数据集元素的变换,并返回一个表示结果的新RDD
- 动作(Actions):在数据集上运行计算后向驱动程序返回值。例如:reduce是使用一些函数聚合RDD的所有元素并将最终结果返回到驱动程序的动作
Spark中的所有转换都是懒惰的,因为它们不会马上计算它们的结果。相反,他们只记住应用于一些基础数据集(例如文件)的转换。仅当操作需要将结果返回到驱动程序时才会计算转换。这种设计使Spark能够更有效地运行。例如,我们可以认识到通过map创建的数据集将在reduce中使用,并且只返回reduce到驱动程序的结果,而不是较大的映射数据集。
默认情况下,每次对其执行操作时,都可以重新计算每个已转换的RDD。但是,您还可以使用persist(或cache)方法在内存中持久化RDD,在这种情况下,Spark会保留集群上的元素,以便在下次查询时更快地访问。还支持在磁盘上持久存储RDD,或者在多个节点上复制RDD。
####(二)基础
为了说明RDD基础知识,下面示列程序解释说明:
1 | val lines = sc.textFile("data.txt") |
- 第一行定义来自外部文件的基本RDD。 此数据集未加载到内存中或以其他方式处理:行仅仅是指向文件的指针。
- 第二行定义lineLength作为地图变换的结果。 同样,由于懒惰,lineLength不会立即计算
- 第三行我们运行reduce,这是一个动作。 在这一点上,Spark将计算分解为在单独的机器上运行的任务,每个机器运行它的一部分映射和本地缩减,只返回它的驱动程序的答案。
- 第四行持久化数据到内存
####(三)RDD类型说明
下面表格列除了Spark
支持的一下常用Transformations
:
转换(Transformation) | 说明 | 备注 |
---|---|---|
map(func) | 返回通过函数func传递源的每个元素形成的新的分布式数据集 | |
filter(func) | 返回通过选择func返回true的源的那些元素形成的新数据集 | |
flatMap(func) | 与map类似,但每个输入项可以映射到0个或更多的输出项(因此func应该返回一个Seq,而不是一个单独的项) | |
mapPartitions(func) | 类似于map,但是在RDD的每个分区(块)上单独运行,因此当在类型T的RDD上运行时,func必须是Iterator |
|
mapPartitionsWithIndex(func) | 与mapPartition类似,但是也为func提供了表示分区索引的整数值,因此当在类型为T的RDD上运行时,func必须是类型(Int,Iterator |
|
sample(withReplacement, fraction, seed) | 使用给定的随机数发生器种子,采样具有或不具有替换的数据的分数 | |
union(otherDataset) | 返回一个新的数据集,其中包含源数据集和参数中元素的并集 | |
intersection(otherDataset) | 返回一个包含源数据集和参数中的元素交集的新RDD | |
distinct([numTasks])) | 返回包含源数据集的不同元素的新数据集 | 去重元素 |
groupByKey([numTasks]) | 当在(K,V)对的数据集上调用时,返回(K,Iterable |
|
reduceByKey(func, [numTasks]) | 当在(K,V)对的数据集上调用时,返回(K,V)对的数据集,其中使用给定的reduce函数func聚集每个键的值,其必须是类型(V,V)=> V.像groupByKey一样,reduce任务的数量可以通过可选的第二个参数来配置 | |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 当在(K,V)对的数据集上调用时,返回(K,U)对的数据集,其中使用给定的组合函数和中性“零”值来聚合每个键的值。 允许与输入值类型不同的聚合值类型,同时避免不必要的分配。 像groupByKey中一样,reduce任务的数量可以通过可选的第二个参数来配置 | |
sortByKey([ascending], [numTasks]) | 当对(K,V)对的数据集(其中K实现Ordered)调用时,返回按升序或降序按键排序的(K,V)对的数据集,如布尔值升序参数中所指定。 | |
join(otherDataset, [numTasks]) | 当对类型(K,V)和(K,W)的数据集调用时,返回具有每个键的所有元素对的(K,(V,W))对的数据集。 外部联接通过leftOuterJoin,rightOuterJoin和fullOuterJoin支持 | |
cogroup(otherDataset, [numTasks]) | 当对类型(K,V)和(K,W)的数据集调用时,返回(K,(Iterable |
|
cartesian(otherDataset) | 当调用类型T和U的数据集时,返回(T,U)对(所有元素对)的数据集 | |
pipe(command, [envVars]) | 通过shell命令管道RDD的每个分区,例如。 一个Perl或bash脚本。 RDD元素被写入进程的stdin,并且行输出到其stdout作为字符串的RDD返回 | |
coalesce(numPartitions) | 将RDD中的分区数减少到numPartitions。 用于在过滤掉大型数据集后更有效地运行操作 | |
repartition(numPartitions) | 随机重新刷新RDD中的数据以创建更多或更少的分区,并在它们之间进行平衡。 这总是在网络上刷新所有数据 | |
repartitionAndSortWithinPartitions(partitioner) | 根据给定的分区器重新分区RDD,并在每个生成的分区中按其键排序记录。 这比调用重新分区然后在每个分区内排序更有效,因为它可以将排序推入shuffle机器 |
下面表格列除了Spark
支持的一下常用Actions
:
动作(Action) | 说明 | |
---|---|---|
reduce(func) | 使用函数func(它接受两个参数并返回一个)聚合数据集的元素。 函数应该是可交换和关联的,以便它可以被并行正确计算 | |
collect() | 在驱动程序中将数据集的所有元素作为数组返回。 这在返回足够小的数据子集的过滤器或其他操作之后通常是有用的 | |
count() | 返回数据集中的元素数 | |
first() | 返回数据集的第一个元素,类似于take(1) | |
take(n) | 返回数组中前n个元素的数组 | |
takeSample(withReplacement, num, [seed]) | 返回具有数据集的num个元素的随机样本的数组,具有或不具有替换,可选地预指定随机数生成器种子 | |
takeOrdered(n, [ordering]) | 使用它们的自然顺序或自定义比较器返回RDD的前n个元素将数据集的元素作为文本文件(或一组文本文件)写入本地文件系统,HDFS或任何其他Hadoop支持的文件系统中的给定目录中。 Spark将对每个元素调用toString将其转换为文件中的一行文本 | |
saveAsSequenceFile(path) (Java and Scala) | 将数据集的元素作为Hadoop SequenceFile写入本地文件系统,HDFS或任何其他Hadoop支持的文件系统中的给定路径中。 这可以在实现Hadoop的Writable接口的键值对的RDD上使用。 在Scala中,它也可以隐式转换为Writable的类型(Spark包括基本类型的转换,如Int,Double,String等) | |
saveAsObjectFile(path) (Java and Scala) | 使用Java序列化以简单的格式编写数据集的元素,然后可以使用SparkContext.objectFile()加载 | |
countByKey() | 仅适用于(K,V)类型的RDD。 返回具有每个键的计数的(K,Int)对的哈希 | |
foreach(func) | 对数据集的每个元素运行函数func。 这通常会有其他影响,例如更新累加器或与外部存储系统交互。注意:修改除foreach()之外的累加器以外的变量可能会导致未定义的行为 |
最后
参考:http://spark.apache.org/docs/latest/programming-guide.html