Spark流转换操作(Streaming Transformation Operations)

1. 目的

本章将学习常用的流转换操作:map、flatmap、filter、reduceByKey、countByValue、updateStateByKey。

2. Spark常用的流转换操作

1)map(func)

对DStream中的各个元素进行func函数操作,然后返回一个新的DStream。

val conf = new SparkConf().setMaster("local[2]") .setAppName("MapOpTest")
val ssc = new StreamingContext(conf , Seconds(1))
val words = ssc.socketTextStream("localhost", 9999)
val ans = words.map { word => ("hello" ,word ) }    // map hello with each line
ans.print()
ssc.start()    // Start the computation
ssc.awaitTermination()    // Wait for termination
}

2)flatMap(func)

与map方法类似,只不过各个输入项可以被输出为零个或多个输出项。

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))    // for each line it split the words by space
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()

3)filter(func)

过滤出所有函数func返回值为true的DStream元素并返回一个新的DStream。

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val output = words.filter { word => word.startsWith("s") }    // filter the words starts with letter“s”
output.print()

4)reduceByKey(func, [numTasks])

利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream。

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()

5)countByValue()

对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数。

val line = ssc.socketTextStream("localhost", 9999)
val words = line.flatMap(_.split(" "))
words.countByValue().print()

6)updateStateByKey(func)

根据于key的前置状态和key的新值,对key进行更新,返回一个新状态的DStream。

updateStateByKey操作允许不断用新信息更新它的同时保持任意状态。需要通过两步来使用这个操作:

1.定义状态:状态可以是任何的数据类型。

2.定义状态更新函数:怎样利用更新前的状态和从输入流里面获取的新值更新状态。

def updateFunc(values: Seq[Int], state: Option[Int]): Option[Int] = {
val currentCount = values.sum
val previousCount = state.getOrElse(0)
Some(currentCount + previousCount)
}
val ssc = new StreamingContext(conf , Seconds(10))
val line = ssc.socketTextStream("localhost", 9999)
ssc.checkpoint("/home/asus/checkpoints/")    // Here ./checkpoints/ are the directory where all checkpoints are stored.
val words = line.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val globalCountStream = pairs.updateStateByKey(updateFunc)
globalCountStream.print()
ssc.start()   // Start the computation
ssc.awaitTermination()
Spark流转换操作(Streaming Transformation Operations)
滚动到顶部