Running a Spark Streaming Example

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.

This post walks through running one of the example programs from the official Spark site.

First, run the spark-submit command by itself to see its usage.

cd $SPARK_HOME

bin/spark-submit

Usage: spark-submit [options] <app jar | python file> [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]
Usage: spark-submit run-example [options] example-class [example args]

Options:
  --master MASTER_URL        spark://host:port, mesos://host:port, yarn, or local.
  --deploy-mode DEPLOY_MODE  Whether to launch the driver program locally ("client") or
                             on one of the worker machines inside the cluster ("cluster")
                             (Default: client).
  --class CLASS_NAME         Your application's main class (for Java / Scala apps).
  ...

To run the NetworkWordCount example, we need to specify the --master and --class options.

bin/spark-submit --master local[2] --class org.apache.spark.examples.streaming.NetworkWordCount examples/jars/spark-examples_2.11-2.2.3.jar

Usage: NetworkWordCount <hostname> <port>

The terminal again prints a usage hint: we must provide the hostname and port of a network server.
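
That hint comes from the example's own argument check, which in the example source (linked later in this post) looks roughly like this sketch:

if (args.length < 2) {
  System.err.println("Usage: NetworkWordCount <hostname> <port>")
  System.exit(1)
}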

If we supply localhost 9999 while nothing is serving on that port yet, the command fails with the following error, because the receiver cannot open a connection.

19/01/24 10:49:42 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting
to localhost:9999 - java.net.ConnectException: Connection refused (Connection refused)

So open another shell terminal and start Netcat serving on port 9999; -l puts it in listen mode, and -k keeps it listening after a client disconnects.

nc -lk 9999

Now run the NetworkWordCount example again, this time passing the hostname and port.

bin/spark-submit --master local[2] --class org.apache.spark.examples.streaming.NetworkWordCount examples/jars/spark-examples_2.11-2.2.3.jar localhost 9999

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/01/24 11:04:46 INFO StreamingExamples: Setting log level to [WARN] for streaming example. To override add a custom log4j.properties to the classpath.
19/01/24 11:04:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/01/24 11:04:47 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.1.112 instead (on interface en0)
19/01/24 11:04:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/01/24 11:04:47 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
-------------------------------------------
Time: 1548299089000 ms
-------------------------------------------

A batch header like the one above is printed every second (the example's batch interval) even when no data has arrived. Now type a few words in the Netcat terminal and watch the counts.

hello hello spark spark spark

Almost immediately, the counts appear in the Spark terminal.


-------------------------------------------
Time: 1548298250000 ms
-------------------------------------------
(hello,2)
(spark,3)

Let's try a second approach: enter the code in spark-shell and run the word count interactively.

bin/spark-shell --master local[2]

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/01/24 12:45:00 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.1.112 instead (on interface en0)
19/01/24 12:45:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/01/24 12:45:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.1.112:4040
Spark context available as 'sc' (master = local[2], app id = local-1548305101841).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.3
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Following the example, we enter the code below. The source is available at https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala

scala>

// spark-shell already provides a SparkContext as sc
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Streaming context with a one-second batch interval
val ssc = new StreamingContext(sc, Seconds(1))
// Receive lines of text from the Netcat server on localhost:9999
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
// Split each line into words, then count each word within the batch
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
// Start the computation and block until it terminates
ssc.start()
ssc.awaitTermination()
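
Note one difference from the standalone example: in spark-shell we build the StreamingContext from the existing sc, whereas the linked program creates its own context from a SparkConf. A minimal sketch of that standalone version, based on the linked source (its internal log-level helper omitted):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }
    // No shell-provided SparkContext here: build everything from a SparkConf
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}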

Type into the Netcat terminal again:

hello hello spark spark spark

We see the same counts as before.


-------------------------------------------
Time: 1548305187000 ms
-------------------------------------------
(hello,2)
(spark,3)
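
Note that these counts are per batch: each one-second batch is reduced independently, so the numbers reset every interval. To count over a longer span, the DStream API provides windowed operations. A sketch that replaces the reduceByKey line above (the 30-second window and 10-second slide are illustrative values; both must be multiples of the batch interval):

// Count words over the last 30 seconds, recomputed every 10 seconds
val windowedCounts = words.map(x => (x, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
windowedCounts.print()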