加载中...
Spark+RDD
发表于:2021-10-13 | 分类: Spark学习

Spark RDD编程案例

WordCount

WordCount编程是我们最熟悉不过的了,使用Spark进行编写程序会比MapReduce编程简便许多。
代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
def main(args: Array[String]): Unit = {
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//设置内核个数以及程序名称
val sc: SparkContext = new SparkContext(config)
//创建Spark上下文
sc.setLogLevel("WARN")
val result: Array[(String,Int)] = sc.textFile("in").
//逐行读入数据,路径是项目根目录下in文件夹
flatMap(_.split(" ")).
//拆分单词
map((_,1)).
//将每个单词变成键值对
reduceByKey(_+_).
//对每个键值对进行处理,对于相同的键进行相加
collect()
result.foreach(x => println(x))
}
}

这里选择从本地文件导入数据,存放在Idea项目根目录下in文件夹,数据内容以及运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//输入数据
hadoop hello
spark hello
ambari yes
scala java
hive hbase
hadoop hello
spark hello
ambari yes
scala java
hive hbase
hive hbase
hadoop hello
spark hello
ambari yes

//输出结果
(scala,2)
(hive,3)
(ambari,3)
(hello,6)
(java,2)
(spark,3)
(yes,3)
(hadoop,3)
(hbase,3)

TopN

TopN功能是实现对一个较大数据量的数据进行排序,之后输出前N条数据,一般来说有3种类型的TopN写法,这里使用的是键不相同的写法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import org.apache.spark.{SparkConf, SparkContext}

object TopN {
def main(args: Array[String]): Unit = {
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("TopN")
//设置内核个数以及程序名称
val sc: SparkContext = new SparkContext(config)
//创建Spark上下文
sc.setLogLevel("WARN")
val result: Array[(String,String )] = sc.textFile("in/top/")
//逐行读入数据,路径是项目根目录下in/top文件夹
.map(_.split(" "))
//按空格分开
.map(line => (line(0), line(1)))
//映射键值对
.sortByKey(false)
//按键进行排序,默认升序,false降序
.take(5)
//取结果前N个
result.foreach(println)
}
}

输入数据以及输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//输入数据
56 test1
73 test2
84 test3
74 test4
83 test5
93 test6
88 test7
81 test8
92 test9
34 test10

//输出
(93,test6)
(92,test9)
(88,test7)
(84,test3)
(83,test5)

Avg

题目:给定一组键值对(“spark”,2),(“hadoop”,6),(“hadoop”,4),(“spark”,6),键值对的key表示图书名称,value表示某天图书销量,请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。
代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import org.apache.spark.{SparkConf, SparkContext}

object Avg {
def main(args: Array[String]): Unit = {
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Avg")
//设置内核个数以及程序名称
val sc: SparkContext = new SparkContext(config)
//创建Spark上下文
sc.setLogLevel("WARN")
val data = Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6))
val res = sc.parallelize(data)
//读入数据
.mapValues(a => (a,1))
//将值映射成为(值,次数)的形式
.reduceByKey((a,b) => (a._1+b._1,a._2+b._2))
//相同键进行Reduce,累加次数以及销量
.map(t => (t._1,t._2._1/t._2._2))
//计算平均值
res.foreach(println)
}
}

这里使用简易的数组导入数据,使用parallelize()方法,输出如下:

1
2
3
4
5
//output
(spark,4)
(hadoop,5)

进程已结束,退出代码 0

SecondarySort

二次排序也是一个经典案例,实现按键排序的同时按值排序,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import org.apache.spark.{SparkConf, SparkContext}

//定义一个比较类,重写比较方法,记得加上序列化
class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable {
def compare(other: SecondarySortKey): Int =
if (this.first - other.first != 0)
this.first - other.first
else
this.second - other.second
}

object SecondarySort {
def main(args: Array[String]): Unit = {
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SecondarySort")
//设置内核个数以及程序名称
val sc: SparkContext = new SparkContext(config)
//创建Spark上下文
sc.setLogLevel("WARN")
val result: Array[(Int,Int)] = sc.textFile("in/second")
//逐行读入数据,路径是项目根目录下in/second文件夹
.map(_.split(" "))
//按空格分开
.map(line => (line(0).toInt, line(1).toInt))
//映射键值对
.map(line => (new SecondarySortKey(line._1,line._2), line))
//映射成为(可比较类,原数据)的形式
.sortByKey(false)
//按键降序
.map(x => x._2).collect()
//排序完只保留原数据
result.foreach(println)

}

}

输入数据以及输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//input
32 35
25 46
52 65
29 85
85 42
25 45
95 96
75 96
75 62

//output
(95,96)
(85,42)
(75,96)
(75,62)
(52,65)
(32,35)
(29,85)
(25,46)
(25,45)

上一篇:
Azkaban
下一篇:
Ambari
本文目录
本文目录