0%

方便快速从不同的数据源(json, parquet, rdbms),经过混合处理(json join parquet)
再将处理结果以特定的格式(json, parquet)写回到指定的系统中(HDFS, S3)

spark.read.format(format)

format
built-in json, parquet jdbc, csv(2.x)
packages: 外部的非SPark内置 spark-packages.org

spark.write

操作parquet

ParquetApp

main

spark = SparkSession

df = spark.read.format(“parquet”).load(users.parquet”)
df.printSchema
df.select*

new_df.write.format(“json”).save(new_path)

默认处理parquet数据

spark-sql

create temporary view parquetTable
USING…
Options(
path: fsafsdfsafdsf
)

select * from parquettable

spark.read.format().option(path).load()

处理Hive

Df写入到hive
write.writeAsTable()

设置分区的数量,默认200

处理MySQL(jdbc)

当我们把Spark Streaming程序开发好以后,放到测试环境测试时,如何通过Maven打包呢?

在这篇文章中,我将介绍两种Maven的build方式。

推荐使用第二种方式。

所有依赖打成一个jar包

通过这种方式,把所有的依赖都打成一个大的jar包。

  • 优点

通过spark-submit执行程序时,简单方便,不需要指定jars参数。

  • 缺点

依赖多的时候,有时候包会有几百M,在网络传输过程时效率不高。

pom.xml文件的build节点需要添加如下两个Maven插件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- bind to the packaging phase -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>

使用org.scala-tools:maven-scala-plugin插件有个好处是,如果项目里面有src/main/javasrc/main/scala两种语言混编时,可以成功编译源码。

1
2
[INFO] /Users/simon/Development/workspace/scala/spark-streaming-demo/src/main/java:-1: info: compiling
[INFO] /Users/simon/Development/workspace/scala/spark-streaming-demo/src/main/scala:-1: info: compiling

执行Spark任务命令:

1
2
3
bin/spark-submit --master local[2] \
--class gy.finolo.spark.project.StreamingApp \
../data/app/spark-streaming-demo-1.0-SNAPSHOT.jar localhost:9092 streamingtopic

打成一个小jar和lib依赖文件夹

  • 优点

Spark Streaming源码被修改后,打成小包就可以上传服务器,传输流量小,速度快。

  • 缺点

提交任务时,需要指定jars参数

pom.xml文件的build节点需要添加如下两个Maven插件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<excludeScope>provided</excludeScope> <!-- provided scope的依赖不需要放入到lib中 -->
</configuration>
</execution>
</executions>
</plugin>
</plugins>

执行Spark任务命令,需要指定jars参数,注意:这里不是lib的地址。

1
2
3
4
bin/spark-submit --master local[2] \
--class gy.finolo.spark.project.StreamingApp \
--jars $(echo ../data/app/lib/*.jar | tr ' ' ',') \
../data/lib/spark-streaming-demo-1.0-SNAPSHOT.jar localhost:9092 streamingtopic

$(echo ../data/app/lib/*.jar | tr ' ' ',')代码的意思是把jar包名以逗号分隔符拼接成一个字符串。

在Mac OS X下使用crontab创建定时任务时报错。

1
2
crontab: no crontab for root - using an empty one
crontab: "/usr/bin/vi" exited with status 1

原因大概是因为没有安装vi,目前我们使用的vi都是vim

所以,可以按如下方式使用。

1
EDITOR=vim crontab -e

成功退出后,控制台上会打印如下内容:

1
crontab: installing new crontab