
This blog post describes how to create a Spark SQL demo program.

Create a Maven project

Two dependencies are needed in the pom.xml file.

<properties>
    <scala.version>2.11.12</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <spark.version>2.2.3</spark.version>
</properties>

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.compat.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
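
The snippet above only declares dependencies. If the Scala sources are compiled by Maven as well, a Scala compiler plugin is also needed; a minimal sketch (the plugin choice and version are assumptions, not taken from the original project):

<build>
    <plugins>
        <plugin>
            <!-- compiles the Scala sources during the Maven build; version is illustrative -->
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>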

Create SparkSessionApp

In Spark 1.x, you obtained a context by creating a SQLContext.

Since Spark 2.x, SparkSession is used instead.
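
For contrast, a minimal sketch of the Spark 1.x style (the SQLContext constructor is deprecated since 2.0; the path here is illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setMaster("local[2]").setAppName("Spark SQLContext App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// same JSON read as the demo below, but through the 1.x entry point
val people = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
people.show()

sc.stop()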

args(0) is the path of the input file.

package gy.finolo.spark

import org.apache.spark.sql.SparkSession

object SparkSessionApp {

  def main(args: Array[String]): Unit = {

    // args(0): path of the input JSON file
    val path = args(0)

    val sparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("Spark Session App")
      .getOrCreate()

    // load the JSON file as a DataFrame, then print its schema and contents
    val people = sparkSession.read.format("json").load(path)
    people.printSchema()
    people.show()

    sparkSession.stop()
  }

}

When running it in IDEA, the Program Arguments field needs to be set to /usr/local/spark/spark-2.2.3-bin-hadoop2.6/examples/src/main/resources/people.json.

cat /usr/local/spark/spark-2.2.3-bin-hadoop2.6/examples/src/main/resources/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

Submit the job

Once local testing is done, package the application and submit it to run with spark-submit.
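
Packaging can be done with the standard Maven lifecycle; a minimal sketch (the project directory is inferred from the jar path in the spark-submit command below):

cd /Users/simon/Development/workspace/scala/spark-sql-demo
mvn clean package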

bin/spark-submit \
--master local[2] \
--class gy.finolo.spark.SparkSessionApp \
/Users/simon/Development/workspace/scala/spark-sql-demo/target/spark-sql-demo-1.0-SNAPSHOT.jar \
/usr/local/spark/spark-2.2.3-bin-hadoop2.6/examples/src/main/resources/people.json

The output can be seen in the console.

The output printed by the printSchema() statement:

root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

The output of people.show():

+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+

Spark run modes

Spark has several run modes. The simplest is single-machine local mode, and there is also a single-machine pseudo-distributed mode; the more complex ones run on a cluster. Spark currently runs well on YARN and Mesos, and it also ships with its own Standalone mode, which is sufficient for most cases; if your organization already has a YARN or Mesos environment, deploying there is convenient too. The modes are listed below, and the corresponding --master values are sketched after the list.

  • local (local mode): commonly used for local development and testing; it further breaks down into single-threaded local and multi-threaded local-cluster;
  • standalone (cluster mode): a typical master/slave setup, so the Master is a single point of failure; Spark supports ZooKeeper for HA;
  • on YARN (cluster mode): runs on top of the YARN resource manager; YARN handles resource management while Spark handles task scheduling and computation;
  • on Mesos (cluster mode): runs on top of the Mesos resource manager; Mesos handles resource management while Spark handles task scheduling and computation;
  • on cloud (cluster mode): for example AWS EC2; this mode makes it easy to access Amazon S3; Spark supports distributed storage systems such as HDFS and S3.
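
For reference, these modes are selected through the --master option; an illustrative summary (host names and port numbers are placeholders):

spark-shell --master local[*]                  # local mode, one thread per core
spark-shell --master spark://localhost:7077    # standalone cluster (7077 is the default master port)
spark-shell --master yarn                      # YARN (HADOOP_CONF_DIR or YARN_CONF_DIR must point at the Hadoop configuration)
spark-shell --master mesos://localhost:5050    # Mesos (5050 is the default Mesos master port)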

Running in local mode

spark-shell --master local[2]

The 2 means two worker threads.

With local[*], which is also the default master setting, the number of cores on the machine is picked up automatically.
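
The common local master strings, for reference (illustrative commands, not from the original post):

spark-shell --master local      # a single worker thread
spark-shell --master local[4]   # four worker threads
spark-shell --master local[*]   # one worker thread per logical core (the default when no master is given)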

Running in standalone mode

The architecture of Spark Standalone mode is very similar to Hadoop HDFS/YARN: 1 master + n workers.

Configure the conf/spark-env.sh file

cd $SPARK_HOME
cd conf
cp spark-env.sh.template spark-env.sh
vi spark-env.sh

Add the following content:

SPARK_MASTER_HOST=localhost
SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=2g

Start Spark

cd $SPARK_HOME
sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /usr/local/spark/spark-2.2.3-bin-2.6.0-cdh5.7.0/logs/spark-simon-org.apache.spark.deploy.master.Master-1-localhost.out
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark/spark-2.2.3-bin-2.6.0-cdh5.7.0/logs/spark-simon-org.apache.spark.deploy.worker.Worker-1-localhost.out

Check the master log:

19/02/10 13:11:09 INFO Master: I have been elected leader! New state: ALIVE
19/02/10 13:11:12 INFO Master: Registering worker 192.168.1.6:51683 with 1 cores, 2.0 GB RAM

Check the worker log:

19/02/10 13:11:12 INFO Worker: Successfully registered with master spark://localhost:7077

Running the jps command shows the Master and Worker processes:

3424 Master
3459 Worker

Create the word count input file

Create the file /usr/local/spark/data/words:

vi /usr/local/spark/data/words

Add the following content:

hello,hello,world
hello,world,
welcome

Start spark-shell

Start it in standalone mode:

bin/spark-shell --master spark://localhost:7077
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/02/10 13:12:41 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.1.6 instead (on interface en0)
19/02/10 13:12:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/02/10 13:12:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.1.6:4040
Spark context available as 'sc' (master = spark://localhost:7077, app id = app-20190210131243-0000).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.2.3
/_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Enter the word count program:

scala> var file = spark.sparkContext.textFile("file:///usr/local/spark/data/words")
file: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/data MapPartitionsRDD[6] at textFile at <console>:23

scala> val wordCounts = file.flatMap(line => line.split(",")).map(word => (word, 1)).reduceByKey(_ + _)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:25

scala> wordCounts.collect
res1: Array[(String, Int)] = Array((hello,3), (welcome,1), (world,2))

scala>

You can see that the words in the /usr/local/spark/data/words file were counted successfully.
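
Since this post is about Spark SQL, the same word count can also be expressed with the DataFrame/Dataset API; a minimal sketch to run in the same spark-shell session (the nonEmpty filter drops the empty token produced by the trailing comma in the sample file):

import spark.implicits._

val words = spark.read.textFile("file:///usr/local/spark/data/words")  // Dataset[String], one line per element
  .flatMap(_.split(","))                                               // split each line into words
  .filter(_.nonEmpty)                                                  // drop empty tokens
  .toDF("word")

words.groupBy("word").count().show()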

Build Spark for CDH Hadoop

Spark builds depend on a specific Hadoop version. When you use a CDH build of Hadoop, a matching prebuilt package cannot be downloaded from the Spark website, so you need to compile Spark yourself.

Download the source code

Go to the Spark website http://spark.apache.org/downloads.html and download the Spark source code, not a prebuilt binary package.

For the Spark release I chose 2.2.3.

For the package type, choose Source Code.

Download and extract:

cd /usr/local/spark
wget https://archive.apache.org/dist/spark/spark-2.2.3/spark-2.2.3.tgz

tar -zxvf spark-2.2.3.tgz

Build the distribution

Looking at the source of dev/make-distribution.sh shows that the generated package is named spark-$VERSION-bin-$NAME.tgz, so the --name argument is set to 2.6.0-cdh5.7.0 (producing spark-2.2.3-bin-2.6.0-cdh5.7.0.tgz). -P activates profiles defined in pom.xml, and -D sets a Maven property, here hadoop.version, which pins the Hadoop dependency version.

cd spark-2.2.3
./dev/make-distribution.sh --name 2.6.0-cdh5.7.0 --tgz -Phadoop-2.6 -Phive -Phive-thriftserver -Pmesos -Pyarn -Dhadoop.version=2.6.0-cdh5.7.0

During the build, the following error appears:

[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 46.000 s (Wall Clock)
[INFO] Finished at: 2019-02-10T11:28:39+08:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project spark-launcher_2.11: Could not resolve dependencies for project org.apache.spark:spark-launcher_2.11:jar:2.2.3: Could not find artifact org.apache.hadoop:hadoop-client:jar:2.6.0-cdh5.7.0 in alimaven (http://maven.aliyun.com/nexus/content/groups/public/) -> [Help 1]

This means the CDH jars cannot be found in the currently configured Maven repositories, so we have to add the CDH repository URL to the repositories section of pom.xml.

Edit pom.xml and add the Cloudera repository after the Maven Central repository inside the repositories element.

<repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>

Then run the previous command again; after about 12 minutes the spark-2.2.3-bin-2.6.0-cdh5.7.0.tgz package is built successfully.

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 12:08 min (Wall Clock)
[INFO] Finished at: 2019-02-10T11:59:57+08:00
[INFO] ------------------------------------------------------------------------

Extract the spark-2.2.3-bin-2.6.0-cdh5.7.0.tgz package into the parent spark directory:

cd ..
tar -zxvf spark-2.2.3/spark-2.2.3-bin-2.6.0-cdh5.7.0.tgz -C .

Set environment variables

vi ~/.bash_profile

Add the following content:

export SPARK_HOME=/usr/local/spark/spark-2.2.3-bin-2.6.0-cdh5.7.0
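
Optionally, Spark's bin directory can also be put on the PATH so that spark-shell and spark-submit can be run from any directory (an extra step, assuming a bash-style shell; not part of the original setup):

export PATH=$SPARK_HOME/bin:$PATH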

Apply the change:

source ~/.bash_profile

Start Spark

cd $SPARK_HOME
bin/spark-shell --master local[*]

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/02/10 12:13:43 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.1.6 instead (on interface en0)
19/02/10 12:13:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/02/10 12:13:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.1.6:4040
Spark context available as 'sc' (master = local[*], app id = local-1549772024897).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.2.3
/_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Spark started successfully.
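
As a quick sanity check in the new shell (an illustrative snippet, not part of the original session):

spark.version                    // should print 2.2.3
sc.parallelize(1 to 100).sum()   // runs a tiny job; should return 5050.0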