索鸟网

  1. 首页
  2. Spark入门(四):RDD基本操作

Spark入门(四):RDD基本操作


1.RDD转换

RDD的所有转换操作都不会进行真正的计算

1.1单个RDD转换操作

# 创建测试RDD
val rdd = sc.parallelize(Array("hello world","java","scala easy"))

# 1.map():遍历RDD中的每个元素,将返回值构成新的RDD,返回值类型可和原RDD不一致
val mapRdd = rdd.map(x => "map:"+x)
mapRdd.foreach(println)
# 输出
# map:hello world
# map:java
# map:scala easy

# 2.flatMap(): 遍历RDD中的每个元素,将返回的迭代器的所有内容构成新的 RDD
val flatMapRdd = rdd.flatMap(x => x.split(" "))
flatMapRdd.foreach(println)
# 输出
# hello
# world
# java
# scala
# easy

# 3.filter():遍历RDD中的每个元素,将匹配的元素构成新的RDD
val filterRdd = rdd.filter(x => x.contains("java"))
filterRdd.foreach(x => x.contains("java"))
filterRdd.foreach(println)
# 输出
# java

# 4.distinct():去重
val distinctRdd = flatMapRdd.distinct()
distinctRdd.foreach(println)
# scala
# hello
# easy
# java
# world

# 5.sample(withReplacement, fraction, [seed]):对 RDD 采样,以及是否替换

1.2 两个RDD转换操作

# 创建两个测试RDD
val rdd1 = sc.parallelize(Array("java","scala","spring"))
val rdd2 = sc.parallelize(Array("c++","java","spark"))

# 1.union():合并两个RDD
val unionRdd = rdd1.union(rdd2)
unionRdd.foreach(println)
# java
# scala
# spring
# c++
# java
# spark

# 2.intersection():求两个RDD元素的共同元素
val intersectionRdd = rdd1.intersection(rdd2)
intersectionRdd.foreach(println)
# java

# 3.subtract():移除RDD中的指定元素
val subtractRdd = rdd1.subtract(rdd2)
subtractRdd.foreach(println)

# 4.cartesian():求两个RDD元素的笛卡尔积
val cartesianRdd = rdd1.cartesian(rdd2)
cartesianRdd.foreach(println)
# (java,c++)
# (java,java)
# (java,spark)
# (scala,c++)
# (scala,java)
# (scala,spark)
# (spring,c++)
# (spring,java)
# (spring,spark)
2.行动操作

行动操作会真正触发RDD的计算操作

2.1 reduce()

它接收一个函数作为参数,这个
函数要操作两个 RDD 的元素类型的数据并返回一个同样类型的新元素

val rdd = sc.parallelize(Array(1,2,3,4))
# 计算所有元素的总和
println(rdd.reduce((x,y) => x+y))
# 10

2.2 fold()

fold() 和 reduce() 类似,接收一个与 reduce() 接收的函数签名相同的函数,再加上一个
“初始值”来作为每个分区第一次调用时的结果。(例如 +
对应的 0, * 对应的 1,或拼接操作对应的空列表)。

# 计算所有元素的综合
println(rdd.fold(0)((x,y) => x+y))
# 10

2.3 collect()

将整个RDD的内容返回

rdd.collect().foreach(print)
#1234

2.4 take(n)

返回RDD中的n个元素

rdd.take(2).foreach(print)
#12

2.5 top(n)

返回RDD中前n个元素,top()会使用数据的默认排序

rdd.top(3)
#123

2.6 count()

返回RDD中所有元素的个数

print(rdd.count())
4

2.7 countByValue()

返回个元素在RDD中出现的个数

rdd.countByValue().foreach(println+)
(1,1)
(3,1)
(2,1)
(4,1)

2.8 takeSample(withReplacement, num, [seed])

从 RDD 中返回任意num个元素

rdd.takeSample(false,3)

2.9 foreach

对 RDD 中的每个元素使用给
定的函数

2.10 aggregate(zeroValue)(seqOp, combOp)

和 reduce() 相似,可以返回不同类型的函数

val result = rdd.aggregate((0, 0))((x, y) =>(x._1 + y, x._2 + 1),(part1, part2) =>(part1._1 + part2._1,part1._2 + part2._2))
print(result)
(10,4)

参数说明

((0, 0))
# 第一步:指定初始值
((x, y) =>(x._1 + y, x._2 + 1),
# 2:分片计算
# x为初始值(0,0),y为RDD元素(1,2,3,4)
# 假设RDD分布在两个分片上(1,2)为一个分片,(3,4)为一个分片
# 则计算结果如下:
# 分片1:
# 0+1,0+1
# 1+2,1+1
# 分片1结果:(3,2)
# 分片2:
# 0+3,0+1
# 3+4,1+1
# 分片2结果:(7,2)
(part1, part2) =>(part1._1 + part2._1,part1._2 + part2._2))
# 第三步:合并分片数据
# 3+7,2+2
# 输出结果(10,4)
相关标签: 大数据

本文原创发布于慕课网 ,转载请注明出处,谢谢合作!

来源地址:http://www.imooc.com/article/19882 版权归作者所有!

相关教程

  • Spark入门(三):RDD概述

    1.RDD概述 Spark 对数据的核心抽象---弹性分布式数据集(Resilient Distributed Dataset,简 称 RDD) 1.1 不可变 RDD是一个不可变的分布式对象集合,一旦创建便不能修改。 1.2 分区 每个RDD都可被分为多个分区,分布在不同的节点机器之上 1.3 转换(transformation) 转化操作会由一个 RDD 生成一个新
  • Spark入门(五):键值对RDD

    1.创建PairRDD 普通RDD转Pair RDD val rdd = sc.parallelize(Array("java","scala")) rdd.foreach(println) //java //scala val pairRdd = rdd.map(w => (w,"编程语言")) pairRdd.foreach(print) //(java,编
  • Spark入门(七):Spark运行时架构

    在一个 Spark 集群中,有一个节点负责中央协调,调度各个分布式工作节点。这个中央协调节点被称为驱动器(Driver)节点,与之对应的工作节点被称为执行器(executor)节点。驱动器节点可以和大量的执行器节 点进行通信,它们也都作为独立的 Java 进程运行。驱动器节点和所有的执行器节点一起被称为一个 Spark 应用(application)。 一、Spark
  • Spark入门阶段一之扫盲笔记

    介绍 spark是分布式并行数据处理框架 与mapreduce的区别: mapreduce通常将中间结果放在hdfs上,spark是基于内存并行大数据框架,中间结果放在内存,对于迭代数据spark效率更高,mapreduce总是消耗大量时间排序,而有些场景不需要排序,spark可以避免不必要的排序所带来的开销,spark是一张有向无环图,spark支
  • Spark官方文档翻译:Quick Start

    Quick Start This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark’s interactive shell (in Python or Scala), then show how to write a
  • Spark:超越Hadoop MapReduce

    引言:和 Hadoop 一样,Spark 提供了一个 Map/Reduce API(分布式计算)和分布式存储。二者主要的不同点是,Spark 在集群的内存中保存数据,而 Hadoop 在集群的磁盘中存储数据。 本文选自《SparkGraphX实战》。  大数据对一些数据科学团队来说是 主要的挑战,因为在要求的可扩展性方面单机没有能力和容量来运行大规模数据处 理。此外,即
  • Spark Programming Guide(二)

    Basics To illustrate RDD basics, consider the simple program below: 仔细观察下面的程序,介绍了RDD的基本功能: val lines = sc.textFile("data.txt") val lineLengths = lines.map(s => s.length) val totalLeng
  • Spark官方文档翻译:Spark Programming Guide

    Overview At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. The main abstract