Spark Streaming

image

image

Spark Streaming

接收数据,把我们的数据拆分成一个个batche(批次),
之后按照批次交给Spark引擎处理数据
之后处理后的数据,以流的形式输出

SparkStreaming 核心抽象:Dstream

Spark Core的核心抽象:RDD,他的5大特性,对应源码中的5大方法

入口:StreamingContext

一、StreamingContext创建Dstream

1
2
3
4
5
6
7
8
9
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

StreamingContext(spark引擎的相关属性,batch的间隔)
二、读取文件流,创建Dstream

1
2
// 创建一个DStream,它将连接到主机名:端口,比如localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

三、处理相关的数据

处理后生成新的DStream。

1
2
3
4
5
6
7
// 将数据拆分为单词
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// 打印前十个元素
wordCounts.print()
将DStream字进一步映射(一对一变换)到(word,1)对的DStream,
然后将其缩小以获得每批数据中的字的频率。 
最后,wordCounts.print()将打印每秒生成的一些计数。
请注意,执行这些行时,Spark Streaming仅设置启动时将执行的计算,
并且尚未启动实际处理。
要在设置完所有转换后开始处理,我们最终调用 

四、启动计算引擎,开始处理计算

1
2
ssc.start()             // 启动计算引擎
ssc.awaitTermination() // 等待计算终止

五、启动流式数据引擎(kafka/nc……)
发送数据

1
2
3
4
5
6
7
8
$ nc -lk 9999
hello world
hello world
hello world
hello world
hello world
hello world
hello world

六、启动Streaming,处理数据

1
2
3
4
5
6
7
8
9
10
# TERMINAL 2: RUNNING NetworkWordCount

$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...
打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2018-2020 丁振莹
  • 访问人数: | 浏览次数:

你的每一分支持,是我努力下去的最大的力量 ٩(๑❛ᴗ❛๑)۶

支付宝
微信