StructuredStreaming-flatMapGroupsWithState

How to create it

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout)(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]
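
To make the shape of func concrete, here is a minimal Spark-free sketch of how such a function is driven: it is called once per key per micro-batch, and whatever it stores in the state is handed back on the next batch for the same key. SimState and runBatch are hypothetical stand-ins written for this illustration only; they are not Spark classes and they ignore timeouts, encoders, and fault tolerance.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for Spark's GroupState[S] (illustration only).
final class SimState[S](private var value: Option[S]) {
  def getOption: Option[S] = value
  def update(newValue: S): Unit = { value = Some(newValue) }
}

object FuncContractSketch {
  // Same shape as func: (K, Iterator[V], state) => Iterator[U]
  def countPerKey(key: Int, values: Iterator[Long], state: SimState[Int]): Iterator[(Int, Int)] = {
    val total = state.getOption.getOrElse(0) + values.size
    state.update(total)
    Iterator(key -> total)
  }

  // Hypothetical driver: groups one micro-batch by key and calls the function once per key,
  // passing in the state that earlier batches saved for that key.
  def runBatch(batch: Seq[(Int, Long)], store: mutable.Map[Int, Int]): Seq[(Int, Int)] =
    batch.groupBy(_._1).toSeq.sortBy(_._1).flatMap { case (key, rows) =>
      val state = new SimState[Int](store.get(key))
      val out = countPerKey(key, rows.iterator.map(_._2), state).toList
      state.getOption.foreach(v => store.update(key, v))
      out
    }

  def main(args: Array[String]): Unit = {
    val store = mutable.Map.empty[Int, Int]
    // First micro-batch: key 1 has two rows, key 2 has one row.
    println(runBatch(Seq(1 -> 10L, 2 -> 20L, 1 -> 30L), store))
    // Second micro-batch: key 1 continues from its saved count of 2 and reaches 3.
    println(runBatch(Seq(1 -> 40L), store))
  }
}
```

The point of the sketch is only the calling convention: the function never sees other keys, and the count survives between batches because it lives in the state rather than in a local variable.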

KeyValueGroupedDataset: streaming aggregation

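A KeyValueGroupedDataset represents the grouped dataset returned by Dataset.groupByKey. The example below builds the streaming Dataset of signals that is later grouped by device.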

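import org.apache.spark.sql.functions.{rand, rint}
import spark.implicits._ // assumes a SparkSession named spark, as in spark-shell

// deviceId is an Int, matching the key type used by countValuesPerKey
case class Signal(timestamp: java.sql.Timestamp, value: Long, deviceId: Int)

val signals = spark.
  readStream.
  format("rate").
  option("rowsPerSecond", 1).
  load.
  withColumn("value", $"value" % 10).
  withColumn("deviceId", rint(rand() * 10) cast "int"). // <-- device IDs 0 through 10, assigned at random
  as[Signal]

// Group by device; groupByKey returns a KeyValueGroupedDataset[Int, Signal]
val signalsByDevice = signals.groupByKey(_.deviceId)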

### Code example

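import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
// countValuesPerKey is defined below; in spark-shell, define it before running this
val signalCounter: Dataset[EventsCounted] = signalsByDevice.flatMapGroupsWithState(
  outputMode = OutputMode.Update(),
  timeoutConf = GroupStateTimeout.NoTimeout)(func = countValuesPerKey)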
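
The call above only declares the transformation. As with any streaming Dataset, nothing is processed until a sink is attached and the query is started. The following is a minimal end-to-end sketch of that step, assuming a local Spark installation with Structured Streaming on the classpath. The names Reading, KeyCount, CountLogic and StartQuerySketch are hypothetical and chosen so that the block does not depend on other definitions in this post; the console sink and the 10-second trigger are illustrative choices, not something the original code prescribes.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}

// Hypothetical types for this sketch; the rate source emits (timestamp, value) rows.
case class Reading(timestamp: java.sql.Timestamp, value: Long)
case class KeyCount(key: Long, count: Long)

object CountLogic {
  // Pure state transition: previous total (if any) plus the size of the current batch.
  def nextTotal(previous: Option[Long], batchSize: Int): Long =
    previous.getOrElse(0L) + batchSize
}

object StartQuerySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("start-query-sketch")
      .getOrCreate()
    import spark.implicits._

    val readings = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .load()
      .as[Reading]

    val counts = readings
      .groupByKey(_.value % 10)
      .flatMapGroupsWithState[Long, KeyCount](OutputMode.Update(), GroupStateTimeout.NoTimeout) {
        (key: Long, rows: Iterator[Reading], state: GroupState[Long]) =>
          val total = CountLogic.nextTotal(state.getOption, rows.size)
          state.update(total)
          Iterator(KeyCount(key, total))
      }

    // The sink's output mode is set to Update, matching the mode passed to flatMapGroupsWithState.
    val query = counts.writeStream
      .format("console")
      .option("truncate", false)
      .outputMode(OutputMode.Update())
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    query.awaitTermination() // blocks until the query is stopped
  }
}
```

Keeping the state transition in a small pure function such as CountLogic.nextTotal is a design choice of this sketch: it lets the counting rule be checked without starting a stream.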
import org.apache.spark.sql.streaming.GroupState

// Assumed definitions (not shown in this post): the state is a per-device count kept in a Map,
// and each call emits one EventsCounted record; the field names are illustrative
type State = Map[Int, Int]
case class EventsCounted(deviceId: Int, count: Int)

def countValuesPerKey(
    deviceId: Int,
    signalsPerDevice: Iterator[Signal],
    state: GroupState[State]): Iterator[EventsCounted] = {
  // materialize the iterator once; it cannot be traversed a second time
  val values = signalsPerDevice.toList
  println(s"Device: $deviceId")
  println("startTime: " + System.currentTimeMillis())
  //Thread.sleep(10000)

  println(s"Signals (${values.size}):")
  //values.zipWithIndex.foreach { case (v, idx) => println(s"$idx. $v") }
  //println(s"State: $state")

  // update the state with the count of elements for the key
  val initialState: State = Map(deviceId -> 0)
  val oldState: State = state.getOption.getOrElse(initialState)
  // the state holds the running count for this key only
  val newValue = oldState(deviceId) + values.size
  val newState = Map(deviceId -> newValue)
  state.update(newState)

  // do not return signalsPerDevice: toList above has already consumed it,
  // and returning it leads to a very subtle error where the output iterator is empty
  // (iterators are one-pass data structures)
  println("endTime: " + System.currentTimeMillis())
  Iterator(EventsCounted(deviceId, newValue))
}
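
The warning in the comments about not returning the input iterator can be reproduced with plain Scala, without Spark. The sketch below (OnePassIteratorSketch and materialize are hypothetical names) shows that once toList has run, the original iterator has nothing left, which is why the function builds a fresh Iterator for its result.

```scala
object OnePassIteratorSketch {
  // Returns the materialized values and whether the original iterator still has elements.
  def materialize[A](it: Iterator[A]): (List[A], Boolean) = {
    val values = it.toList // walks the iterator to its end
    (values, it.hasNext)   // hasNext is now false
  }

  def main(args: Array[String]): Unit = {
    val (values, hasMore) = materialize(Iterator(1, 2, 3))
    println(values.size) // 3
    println(hasMore)     // false: returning the original iterator would emit nothing

    // Build the result from the materialized list instead.
    val result: Iterator[Int] = values.iterator
    println(result.toList == values) // true
  }
}
```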
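  • Copyright notice: unless otherwise stated, all articles on this blog are copyrighted by the author. Please credit the source when reposting.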
  • Copyrights © 2018-2020 丁振莹