Spark RDD Programming Examples
 WordCount
WordCount is the example we all know best, and writing it with Spark is considerably simpler than with MapReduce.
The code is as follows:
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    val sc: SparkContext = new SparkContext(config)
    sc.setLogLevel("WARN")

    // Read the files under "in", split each line into words,
    // map every word to (word, 1), and sum the counts per word.
    val result: Array[(String, Int)] = sc.textFile("in")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .collect()

    result.foreach(println)
  }
}

Here the data is read from a local file stored in the in folder under the IDEA project root. The input data and the run results are shown below:
Input data:

hadoop hello
spark hello
ambari yes
scala java
hive hbase
hadoop hello
spark hello
ambari yes
scala java
hive hbase
hive hbase
hadoop hello
spark hello
ambari yes

Output:

(scala,2)
(hive,3)
(ambari,3)
(hello,6)
(java,2)
(spark,3)
(yes,3)
(hadoop,3)
(hbase,3)

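The collected pairs come back in no particular order. If a ranked word count is wanted, a sortBy step can be added before collecting. This is a minimal sketch of my own, not part of the original example, and it assumes the same sc and "in" directory as above:

// Hypothetical variant: the same WordCount pipeline, ordered by count (descending)
val ranked: Array[(String, Int)] = sc.textFile("in")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)   // sort by the count, largest first
  .collect()

ranked.foreach(println)   // (hello,6) would now come first for the data above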
 TopN
TopN sorts a relatively large dataset and then outputs the first N records. Broadly speaking there are three ways to write TopN; the version used here assumes that no two records share the same key (a variant that also tolerates duplicate keys is sketched after the output below).
import org.apache.spark.{SparkConf, SparkContext}

object TopN {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("TopN")
    val sc: SparkContext = new SparkContext(config)
    sc.setLogLevel("WARN")

    // Parse "score name" lines, sort by score in descending order and keep the first 5.
    // The score is converted to Int so the sort is numeric rather than lexicographic.
    val result: Array[(Int, String)] = sc.textFile("in/top/")
      .map(_.split(" "))
      .map(line => (line(0).toInt, line(1)))
      .sortByKey(false)
      .take(5)

    result.foreach(println)
  }
}

Input data and output results:
Input data:

56 test1
73 test2
84 test3
74 test4
83 test5
93 test6
88 test7
81 test8
92 test9
34 test10

Output:

(93,test6)
(92,test9)
(88,test7)
(84,test3)
(83,test5)

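For reference, here is a hedged sketch of a TopN variant that does not rely on distinct keys: instead of a full sortByKey it uses RDD.top with an ordering on the score, which only keeps the N largest records. The input path and N = 5 are the same assumptions as above, and sc is the SparkContext created in the example:

// Sketch: TopN via RDD.top, which avoids a full sort and tolerates duplicate scores.
val top5: Array[(Int, String)] = sc.textFile("in/top/")
  .map(_.split(" "))
  .map(fields => (fields(0).toInt, fields(1)))
  .top(5)(Ordering.by[(Int, String), Int](_._1))   // keep the 5 records with the largest scores

top5.foreach(println)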
 Avg
Problem: given the key-value pairs ("spark",2), ("hadoop",6), ("hadoop",4), ("spark",6), where the key is a book title and the value is the number of copies sold on a given day, compute the average value for each key, i.e. the average daily sales of each book.
The code is as follows:
import org.apache.spark.{SparkConf, SparkContext}

object Avg {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Avg")
    val sc: SparkContext = new SparkContext(config)
    sc.setLogLevel("WARN")

    val data = Array(("spark", 2), ("hadoop", 6), ("hadoop", 4), ("spark", 6))

    // Turn each value into (value, 1), add up the sums and counts per key,
    // then divide to get the average (integer division, which is exact for this data).
    val res = sc.parallelize(data)
      .mapValues(v => (v, 1))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .map(t => (t._1, t._2._1 / t._2._2))

    // foreach(println) runs on the executors; with a local[*] master the output
    // still appears in the console.
    res.foreach(println)
  }
}

Here the data comes from a plain array, loaded with the parallelize() method. The output is as follows:
(spark,4)
(hadoop,5)

Process finished with exit code 0

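Because the division above is integer division, averages that are not whole numbers would be truncated. A minimal sketch of the same sum/count aggregation with a Double average (my addition, not part of the original exercise; it assumes the same sc and data as in the code above):

// Sketch: identical aggregation, but the average is computed as a Double.
val avg = sc.parallelize(data)
  .mapValues(v => (v, 1))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
  .mapValues { case (sum, count) => sum.toDouble / count }

avg.collect().foreach(println)   // (spark,4.0), (hadoop,5.0)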
 SecondarySort
Secondary sort is another classic example: it sorts by key and, for records with equal keys, also sorts by value. The code is as follows:
import org.apache.spark.{SparkConf, SparkContext}

// Composite key that orders first by the first field, then by the second field.
class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable {
  def compare(other: SecondarySortKey): Int =
    if (this.first - other.first != 0)
      this.first - other.first
    else
      this.second - other.second
}

object SecondarySort {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SecondarySort")
    val sc: SparkContext = new SparkContext(config)
    sc.setLogLevel("WARN")

    // Wrap each (first, second) pair in a SecondarySortKey, sort descending on that key,
    // then drop the key wrapper and collect the original pairs.
    val result: Array[(Int, Int)] = sc.textFile("in/second")
      .map(_.split(" "))
      .map(line => (line(0).toInt, line(1).toInt))
      .map(line => (new SecondarySortKey(line._1, line._2), line))
      .sortByKey(false)
      .map(_._2)
      .collect()

    result.foreach(println)
  }
}

Input data and output results:
Input data:

32 35
25 46
52 65
29 85
85 42
25 45
95 96
75 96
75 62

Output:

(95,96)
(85,42)
(75,96)
(75,62)
(52,65)
(32,35)
(29,85)
(25,46)
(25,45)
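As a side note, for plain (Int, Int) pairs the same descending key-then-value ordering can be obtained without a custom key class, because Scala already provides an implicit Ordering for tuples. This is a minimal sketch of my own (assuming the same sc and "in/second" input as above); the custom SecondarySortKey approach remains useful when the ordering is more involved than the default tuple comparison:

// Sketch: secondary sort via sortBy and the built-in tuple ordering
// (compares the first element, then the second), descending.
val sorted: Array[(Int, Int)] = sc.textFile("in/second")
  .map(_.split(" "))
  .map(fields => (fields(0).toInt, fields(1).toInt))
  .sortBy(pair => pair, ascending = false)
  .collect()

sorted.foreach(println)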