内容
1. 目的
本章将学习常用的流转换操作:map、flatmap、filter、reduceByKey、countByValue、updateStateByKey。
2. Spark常用的流转换操作
1)map(func)
对DStream中的各个元素进行func函数操作,然后返回一个新的DStream。
val conf = new SparkConf().setMaster("local[2]") .setAppName("MapOpTest") val ssc = new StreamingContext(conf , Seconds(1)) val words = ssc.socketTextStream("localhost", 9999) val ans = words.map { word => ("hello" ,word ) } // map hello with each line ans.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for termination }
2)flatMap(func)
与map方法类似,只不过各个输入项可以被输出为零个或多个输出项。
val lines = ssc.socketTextStream("localhost", 9999) val words = lines.flatMap(_.split(" ")) // for each line it split the words by space val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts.print()
3)filter(func)
过滤出所有函数func返回值为true的DStream元素并返回一个新的DStream。
val lines = ssc.socketTextStream("localhost", 9999) val words = lines.flatMap(_.split(" ")) val output = words.filter { word => word.startsWith("s") } // filter the words starts with letter“s” output.print()
4)reduceByKey(func, [numTasks])
利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream。
val lines = ssc.socketTextStream("localhost", 9999) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts.print()
5)countByValue()
对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数。
val line = ssc.socketTextStream("localhost", 9999) val words = line.flatMap(_.split(" ")) words.countByValue().print()
6)updateStateByKey(func)
根据于key的前置状态和key的新值,对key进行更新,返回一个新状态的DStream。
updateStateByKey操作允许不断用新信息更新它的同时保持任意状态。需要通过两步来使用这个操作:
1.定义状态:状态可以是任何的数据类型。
2.定义状态更新函数:怎样利用更新前的状态和从输入流里面获取的新值更新状态。
def updateFunc(values: Seq[Int], state: Option[Int]): Option[Int] = { val currentCount = values.sum val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } val ssc = new StreamingContext(conf , Seconds(10)) val line = ssc.socketTextStream("localhost", 9999) ssc.checkpoint("/home/asus/checkpoints/") // Here ./checkpoints/ are the directory where all checkpoints are stored. val words = line.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val globalCountStream = pairs.updateStateByKey(updateFunc) globalCountStream.print() ssc.start() // Start the computation ssc.awaitTermination()
Spark流转换操作(Streaming Transformation Operations)