Spark RDD Programming Examples
WordCount
WordCount is the example everyone knows best, and writing it with Spark is considerably simpler than writing it with MapReduce.
The code is as follows:
```scala
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    val sc: SparkContext = new SparkContext(config)
    sc.setLogLevel("WARN")
    // read the input, split lines into words, count each word, and collect to the driver
    val result: Array[(String, Int)] = sc.textFile("in")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .collect()
    result.foreach(x => println(x))
  }
}
```
Here the data is loaded from the local file system: it sits in a folder named in under the root of the IDEA project. The input data and the run results are as follows:
```
hadoop hello spark hello ambari yes
scala java hive hbase
hadoop hello spark hello ambari yes
scala java hive hbase
hive hbase
hadoop hello spark hello ambari yes
```

```
(scala,2)
(hive,3)
(ambari,3)
(hello,6)
(java,2)
(spark,3)
(yes,3)
(hadoop,3)
(hbase,3)
```
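The collect() result comes back in no particular order. If you also want the words listed by count, a small variation is to sort before collecting. A minimal sketch, reusing the sc from the example above (the sortBy step is an addition for illustration, not part of the original example):

```scala
// Same WordCount pipeline, with the counts sorted in descending order before collecting.
// Assumes the SparkContext sc created in the example above.
val sorted: Array[(String, Int)] = sc.textFile("in")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)  // largest counts first
  .collect()
sorted.foreach(println)
```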
TopN
TopN sorts a relatively large dataset and then outputs the first N records. There are generally three ways to write a TopN job; the one used here assumes the keys are distinct.
```scala
import org.apache.spark.{SparkConf, SparkContext}

object TopN {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("TopN")
    val sc: SparkContext = new SparkContext(config)
    sc.setLogLevel("WARN")
    // parse each line as (score, name); the score is converted to Int so that
    // sortByKey orders numerically rather than lexicographically
    val result: Array[(Int, String)] = sc.textFile("in/top/")
      .map(_.split(" "))
      .map(line => (line(0).toInt, line(1)))
      .sortByKey(false)   // descending by score
      .take(5)
    result.foreach(println)
  }
}
```
Input data and output:
```
56 test1
73 test2
84 test3
74 test4
83 test5
93 test6
88 test7
81 test8
92 test9
34 test10
```

```
(93,test6)
(92,test9)
(88,test7)
(84,test3)
(83,test5)
```
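As noted above, this is the variant that assumes distinct keys and relies on a full sortByKey. Another common way to write TopN is to let top() pick the N largest elements directly, using the implicit tuple ordering (first by score, then by name), which avoids sorting the whole dataset. A minimal sketch, reusing the sc and input path from the example above:

```scala
// Alternative TopN sketch: top(5) returns the 5 largest (score, name) pairs
// under the implicit (Int, String) ordering, without a full sort.
// Assumes the SparkContext sc from the example above.
val top5: Array[(Int, String)] = sc.textFile("in/top/")
  .map(_.split(" "))
  .map(line => (line(0).toInt, line(1)))
  .top(5)
top5.foreach(println)
```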
Avg
Problem: given the key-value pairs ("spark",2), ("hadoop",6), ("hadoop",4), ("spark",6), where the key is a book title and the value is the number of copies sold on a given day, compute the average value for each key, i.e. the average daily sales of each book.
The code is as follows:
```scala
import org.apache.spark.{SparkConf, SparkContext}

object Avg {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Avg")
    val sc: SparkContext = new SparkContext(config)
    sc.setLogLevel("WARN")
    val data = Array(("spark", 2), ("hadoop", 6), ("hadoop", 4), ("spark", 6))
    val res = sc.parallelize(data)
      .mapValues(a => (a, 1))                             // each value becomes (sum, count)
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))  // add up sums and counts per key
      .map(t => (t._1, t._2._1 / t._2._2))                // average = sum / count
    res.foreach(println)
  }
}
```
Here the data comes from a simple in-memory array passed to the parallelize() method. The output is as follows:
```
(spark,4)
(hadoop,5)
```
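Because the values are Ints, the division in the last map is integer division; it is exact for this data (8 / 2 and 10 / 2) but would truncate otherwise. A minimal sketch of a variant that produces a Double average, reusing the sc and data values from the example above:

```scala
// Variant: compute the average as a Double to avoid integer truncation.
// Assumes the sc and data values from the Avg example above.
val avgDouble = sc.parallelize(data)
  .mapValues(v => (v.toDouble, 1))                      // (running sum, count)
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))    // combine sums and counts per key
  .mapValues { case (sum, count) => sum / count }       // average as a Double
avgDouble.foreach(println)                              // prints (spark,4.0) and (hadoop,5.0)
```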
SecondarySort
Secondary sort is another classic example: records are sorted by key and, for equal keys, by value. The code is as follows:
```scala
import org.apache.spark.{SparkConf, SparkContext}

// composite key: compare by first, then break ties by second
class SecondarySortKey(val first: Int, val second: Int)
  extends Ordered[SecondarySortKey] with Serializable {
  def compare(other: SecondarySortKey): Int =
    if (this.first - other.first != 0) this.first - other.first
    else this.second - other.second
}

object SecondarySort {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SecondarySort")
    val sc: SparkContext = new SparkContext(config)
    sc.setLogLevel("WARN")
    val result: Array[(Int, Int)] = sc.textFile("in/second")
      .map(_.split(" "))
      .map(line => (line(0).toInt, line(1).toInt))
      .map(line => (new SecondarySortKey(line._1, line._2), line))  // wrap in the composite key
      .sortByKey(false)                                             // descending on both fields
      .map(x => x._2)
      .collect()
    result.foreach(println)
  }
}
```
Input data and output:
```
32 35
25 46
52 65
29 85
85 42
25 45
95 96
75 96
75 62
```

```
(95,96)
(85,42)
(75,96)
(75,62)
(52,65)
(32,35)
(29,85)
(25,46)
(25,45)
```
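The custom Ordered key is the classic way to implement a secondary sort. For a simple pair of Ints, the same descending-by-key-then-value result can also be sketched with sortBy and Scala's built-in tuple ordering, which compares the first element and breaks ties with the second. A minimal sketch, reusing the sc and input path from the example above (a simplification, not the original approach):

```scala
// Alternative secondary-sort sketch: rely on the implicit (Int, Int) ordering.
// sortBy with ascending = false sorts the key descending, ties broken by the value descending.
// Assumes the SparkContext sc from the example above.
val sorted: Array[(Int, Int)] = sc.textFile("in/second")
  .map(_.split(" "))
  .map(line => (line(0).toInt, line(1).toInt))
  .sortBy(x => x, ascending = false)
  .collect()
sorted.foreach(println)
```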