Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.

This post walks through running one of the example programs from the official Spark website.

First, run the spark-submit command with no arguments to see how it is used.

cd $SPARK_HOME

bin/spark-submit

Usage: spark-submit [options] <app jar | python file> [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]
Usage: spark-submit run-example [options] example-class [example args]

Options:
  --master MASTER_URL          spark://host:port, mesos://host:port, yarn, or local.
  --deploy-mode DEPLOY_MODE    Whether to launch the driver program locally ("client") or
                               on one of the worker machines inside the cluster ("cluster")
                               (Default: client).
  --class CLASS_NAME           Your application's main class (for Java / Scala apps).
  ...

To run the NetworkWordCount example, we need to supply the --master and --class options. Note that --master local[2] runs Spark locally with two threads; a streaming receiver permanently occupies one of them, so a streaming program needs at least two (never use local or local[1]).

bin/spark-submit --master local[2] --class org.apache.spark.examples.streaming.NetworkWordCount examples/jars/spark-examples_2.11-2.2.3.jar

Usage: NetworkWordCount <hostname> <port>

The terminal prints another usage hint: we must supply the hostname and port of a network server.

If nothing is listening on that port yet, the command above fails with the following error:

19/01/24 10:49:42 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting
to localhost:9999 - java.net.ConnectException: Connection refused (Connection refused)

So open another shell and start Netcat listening on port 9999 (-l listens, -k keeps the listener alive after a client disconnects):

nc -lk 9999

Then run the NetworkWordCount example again, this time passing the hostname and port:

bin/spark-submit --master local[2] --class org.apache.spark.examples.streaming.NetworkWordCount examples/jars/spark-examples_2.11-2.2.3.jar localhost 9999

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/01/24 11:04:46 INFO StreamingExamples: Setting log level to [WARN] for streaming example. To override add a custom log4j.properties to the classpath.
19/01/24 11:04:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/01/24 11:04:47 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.1.112 instead (on interface en0)
19/01/24 11:04:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/01/24 11:04:47 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
-------------------------------------------
Time: 1548299089000 ms
-------------------------------------------

Type a few words into the Netcat terminal and see what gets counted. (The empty batch header printed above is normal: a batch fires every second even when no data has arrived.)

hello hello spark spark spark

Almost immediately, the counts appear in the Spark terminal:

-------------------------------------------
Time: 1548298250000 ms
-------------------------------------------
(hello,2)
(spark,3)

Now let's run the same word count a different way, typing the code into spark-shell:

bin/spark-shell --master local[2]

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/01/24 12:45:00 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.1.112 instead (on interface en0)
19/01/24 12:45:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/01/24 12:45:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.1.112:4040
Spark context available as 'sc' (master = local[2], app id = local-1548305101841).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.3
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Following the example, enter the code below. The full source is at https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala

scala>

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Wrap the shell's existing SparkContext (sc) in a StreamingContext with 1-second batches
val ssc = new StreamingContext(sc, Seconds(1))

// DStream of text lines read from the Netcat server
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)

// Split each line into words, then count each word within the batch
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()

ssc.start()             // start receiving and processing data
ssc.awaitTermination()  // block until the computation is stopped

In the Netcat terminal, type:

hello hello spark spark spark

and we see the same counts:

-------------------------------------------
Time: 1548305187000 ms
-------------------------------------------
(hello,2)
(spark,3)
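
Note that wordCounts.print() shows counts only for the words that arrived within each 1-second batch; the totals reset every batch. To keep running totals across batches, Spark Streaming provides updateStateByKey. A minimal sketch (not part of the original example; the checkpoint path is an arbitrary local choice), to be entered before ssc.start():

// Stateful operations require a checkpoint directory (hypothetical path)
ssc.checkpoint("/tmp/spark-checkpoint")

// Merge each batch's new counts into the running total for the key
val updateFunc = (newValues: Seq[Int], state: Option[Int]) =>
  Some(newValues.sum + state.getOrElse(0))

val runningCounts = words.map(x => (x, 1)).updateStateByKey[Int](updateFunc)
runningCounts.print()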

In IntelliJ IDEA, when adding dependencies to a Maven project, we can have the IDE auto-suggest dependency coordinates. Enable it under:

Preferences > Build, Execution, Deployment > Build Tools > Maven > Repositories > Indexed Maven Repositories

Select the URL and click the Update button. Once the index has been rebuilt, dependency coordinates are suggested as you type.

Create a Maven project in IDEA named spark-streaming-demo.

Edit the pom.xml file.

Because we use CDH 5 components, we need to add the CDH 5 Maven repository.

Under the project tag, add:

<repositories>
  <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
</repositories>

Under the properties tag, add the component versions. Spark 2.2.3 is built against Scala 2.11, hence scala.compat.version:

<properties>
  <scala.compat.version>2.11</scala.compat.version>
  <spark.version>2.2.3</spark.version>
  <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
  <hbase.version>1.2.0-cdh5.7.0</hbase.version>
</properties>

Inside the dependencies tag, add the following dependencies:

<!-- Hadoop dependency -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
</dependency>

<!-- HBase dependencies -->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>${hbase.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-server</artifactId>
  <version>${hbase.version}</version>
</dependency>

<!-- Spark Streaming dependency -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_${scala.compat.version}</artifactId>
  <version>${spark.version}</version>
</dependency>