Spark DStream相关操作
与 RDD 类似,DStream 也提供了自己的一系列操作方法,这些操作可以分成 3 类:普通的转换操作、窗口转换操作和输出操作。
普通的转换操作
普通的转换操作如表 1 所示
Suo | 描述 |
---|---|
map(func) | 源 DStream 的每个元素通过函数 func 返回一个新的 DStream。 |
flatMap(func) | 类似于 map 操作,不同的是,每个输入元素可以被映射出 0 或者更多的输出元素 |
filter(func) | 在源 DStream 上选择 func 函数的返回值仅为 true 的元素,最终返回一个新的 DStream。 |
repartition(numPartitions) | 通过输入的参数 numPartitions 的值来改变 DStream 的分区大小 |
union(otherStream) | 返回一个包含源 DStream 与其他 DStream 的元素合并后的新 DStream |
count() | 对源 DStream 内部所含有的 RDD 的元素数量进行计数,返回一个内部的 RDD 只包含一个元素的 DStream |
reduce(func) | 使用函数 func(有两个参数并返回一个结果)将源 DStream 中每个 RDD 的元素进行聚合操作,返回一个内部所包含的 RDD 只有一个元素的新 DStream |
countByValue() | 计算 DStream 中每个 RDD 内的元素出现的频次并返回新的 DStream(<K,Long>),其中,K 是 RDD 中元素的类型,Long 是元素出现的频次 |
reduceByKey(func,[numTasks]) | 当一个类型为 <K,V> 键值对的 DStream 被调用的时候,返回类型为键值对的新 DStream,其中每个键的值 V 都是使用聚合函数 func 汇总的。可以通过配置 numTasks 设置不同的并行任务数 |
join(otherStream,[numTasks]) | 当被调用类型分别为 <K,V> 和 <K,W> 键值对的两个 DStream 时,返回一个类型为 <K,<V,W>> 键值对的新 DStream |
cogroup(otherStream,[numTasks]) | 当被调用的两个 DStream 分别含有 <K,V> 和 <K,W>键值对时,返回一个 <K,Seq[V],seq[W]> 类型的新的 DStream |
transform(func) | 通过对源 DStream 的每个 RDD 应用 RDD-to-KDD 函数,返回一个新的 DStream,这可以用来在 DStream 中做任意 RDD 操作 |
updateStateByKey(func) | 返回一个新状态的 DStream,其中每个键的新状态是基于前一个状态和其新值通过函数 func 计算得出的。这个方法可以被用来维持每个键的任何状态数据 |
在表 1 列出的操作中,transform(func) 方法和 updateStateByKey(fhnc) 方法值得再深入地探讨一下。
1. transform(func) 方法
transform 方法及类似的 transformWith(func) 方法允许在 DStream 上应用任意 RDD-to-RDD 函数,它们可以被应用于未在 DStream API 中暴露的任何 RDD 操作中。
例如,每批次的数据流与另一数据集的连接功能不能直接暴露在 DStream API 中,但可以轻松地使用 transform(func) 方法来做到这一点,这使得 DStream 的功能非常强大。
例如,可以通过连接预先计算的垃圾邮件信息的输入数据流,来做实时数据清理的筛选。事实上,也可以在 transform(func) 方法中使用机器学习和图形计算的算法。
2. updateStateByKey(func) 方法
updateStateByKey(func) 方法可以保持任意状态,同时允许不断有新的信息进行更新。要使用此功能,必须进行以下两个步骤。
1)定义状态:状态可以是任意的数据类型。
2)定义状态更新函数:用一个函数指定如何使用先前的状态和从输入流中获取的新值更新状态。
用一个例子来说明,假设要进行文本数据流中单词计数。在这里,正在运行的计数是状态而且它是一个整数。更新功能定义如下。
def updateFunction(newValues : seq[Int], runningCount:option[Int]);
Option[Int] = {
val newCount = ... //给前序 runningCount 添加新值,获取新 count
Some(newCount)
}
此函数应用于含有键值对的 DStream 中(例如,在前面的单词计数示例中,在 DStream 含有 <word,1> 键值对)。它会针对里面的每个元素(如 WordCount 中的 Word)调用更新函数,其中,newValues 是最新的值,runningCount 是之前的值。
val runningCounts = pairs.updateStateByKey[Int](updateFunction._)
窗口转换操作
Spark Streaming 还提供了窗口的计算,它允许通过滑动窗口对数据进行转换,窗口转换操作如表 2 所示
转换 | 描述 |
---|---|
window(windowLength,slideInterval) | 返回一个基于源 DStream 的窗口批次计算得到新的 DStream |
countByWindow(windowLength,slideInterval) | 返回基于滑动窗口的 DStream 中的元素的数量 |
reduceByWindow(func,windowLength,slideInterval) | 基于滑动窗口对源 DStream 中的元素进行聚合操作,得到一个新的 DStream |
reduceByKeyAndWindow(func,windowLength,slideInterval,[numTasks]) | 基于滑动窗口对 <K,V> 键值对类型的 DStream 中的值按 K 使用聚合函数 func 进行聚合操作,得到一个新的 DStream |
reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[numTasks]) |
一个更高效的实现版本,先对滑动窗口中新的时间间隔内的数据进行增量聚合,再移去最早的同等时间间隔内的数据统计量。 例如,计算 t+4 秒这个时刻过去 5 秒窗口的 WordCount 时,可以将 t+3 时刻过去 5 秒的统计量加上 [t+3,t+4] 的统计量,再减去 [t-2,t-1] 的统计量,这种方法可以复用中间 3 秒的统计量,提高统计的效率 |
countByValueAndWindow(windowLength,slideInterval,[numTasks]) | 基于滑动窗口计算源 DStream 中每个 RDD 内每个元素出现的频次,并返回 DStream[<K,Long>],其中,K 是 RDD 中元素的类型,Long 是元素频次。Reduce 任务的数量可以通过一个可选参数进行配置 |
在 Spark Streaming 中,数据处理是按批进行的,而数据采集是逐条进行的,因此在 Spark Streaming 中会先设置好批处理间隔,当超过批处理间隔的时候就会把采集到的数据汇总起来成为一批数据交给系统去处理。
对于窗口操作而言,在其窗口内部会有 N 个批处理数据,批处理数据的大小由窗口间隔决定,而窗口间隔指的就是窗口的持续时间。
在窗口操作中,只有窗口的长度满足了才会触发批数据的处理。除了窗口的长度,窗口操作还有另一个重要的参数,即滑动间隔,它指的是经过多长时间窗口滑动一次形成新的窗口。滑动间隔默认情况下和批次间隔相同,而窗口间隔一般设置得要比它们两个大。在这里必须注意的一点是,滑动间隔和窗口间隔的大小一定得设置为批处理间隔的整数倍。
如图 1 所示,批处理间隔是 1 个时间单位,窗口间隔是 3 个时间单位,滑动间隔是 2 个时间单位。对于初始的窗口(time 1~time 3),只有窗口间隔满足了才会触发数据的处理。
这里需要注意,有可能初始的窗口没有被流入的数据撑满,但是随着时间的推进/窗口最终会被撑满。每过 2 个时间单位,窗口滑动一次,这时会有新的数据流入窗口,窗口则移去最早的 2 个时间单位的数据,而与最新的 2 个时间单位的数据进行汇总形成新的窗口(time 3~ time 5)。
发表评论