In this article, we will use a SparkSession to print the contents of the emp table in Hive.

For the most basic Spark SQL program, you can refer to the post Creating a Spark SQL Program.

Create the SparkSessionHiveApp program

pom.xml

<properties>
    <scala.version>2.11.12</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <spark.version>2.2.3</spark.version>
</properties>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.compat.version}</artifactId>
    <version>${spark.version}</version>
</dependency>

<!-- hive support -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_${scala.compat.version}</artifactId>
    <version>${spark.version}</version>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.46</version>
</dependency>

SparkSessionHiveApp

package gy.finolo.spark

import org.apache.spark.sql.SparkSession

object SparkSessionHiveApp {

  def main(args: Array[String]): Unit = {

    // Create the SparkSession
    // Note: if the Hive metastore is not picked up at runtime, adding
    // .enableHiveSupport() to the builder may be required (see the
    // "Possible problems" section below).
    val sparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("Spark Session Hive App")
      .getOrCreate()

    // Business logic: print the contents of the Hive emp table
    sparkSession.table("emp").show()

    // Release resources
    sparkSession.stop()
  }

}
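Besides table("emp"), the same data can be queried with a SQL statement. The line below is a minimal hedged variation of the business-logic line above; the chosen columns and filter are only for illustration.

// Hedged alternative: query the same Hive emp table with SQL instead of table("emp")
sparkSession.sql("SELECT ename, job, sal FROM emp WHERE deptno = 20").show()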

Create the emp table

Create the raw emp file

vi /usr/local/spark/data/emp.csv

Write the following content into the emp.csv file. Note that some fields are empty.

7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,,20
7900,JAMES,CLERK,7698,1981/12/3,950,,30
7902,FORD,ANALYST,7566,1981/12/3,3000,,20
7934,MILLER,CLERK,7782,1982/1/23,1300,,10

Create the emp table in Hive

For basic Hive usage, you can refer to the post Setting Up a Hive Environment.

The statement must end with a semicolon.

hive>
CREATE TABLE emp(
    empno INT,
    ename STRING,
    job STRING,
    mgr INT,
    hiredate DATE,
    sal DOUBLE,
    comm DOUBLE,
    deptno INT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

Load the data into the emp table

hive> LOAD DATA LOCAL INPATH '/usr/local/spark/data/emp.csv' INTO TABLE emp;

There is a problem with hiredate here: Hive's DATE type expects yyyy-MM-dd, but the CSV uses slash-separated dates such as 1980/12/17, so the values load as NULL. We will solve this properly later; a hedged workaround sketch follows.
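As a hedged workaround sketch (not this post's eventual fix), the CSV could be read directly with Spark and the slash-separated dates parsed there; the column names and the "yyyy/M/d" pattern are assumptions based on emp.csv above.

import org.apache.spark.sql.functions.{col, to_date}

// Read emp.csv with Spark itself and parse hiredate with an explicit pattern.
val emp = sparkSession.read
  .csv("/usr/local/spark/data/emp.csv")
  .toDF("empno", "ename", "job", "mgr", "hiredate", "sal", "comm", "deptno")
  .withColumn("hiredate", to_date(col("hiredate"), "yyyy/M/d"))
emp.show()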

Run the program in IDEA

You can see that the contents of the emp table were successfully printed to the console (hiredate shows null because of the date-format issue mentioned above).

19/02/12 00:00:23 INFO DAGScheduler: Job 0 finished: show at SparkSessionHiveApp.scala:21, took 0.340480 s
+-----+------+---------+----+--------+------+------+------+
|empno| ename| job| mgr|hiredate| sal| comm|deptno|
+-----+------+---------+----+--------+------+------+------+
| 7369| SMITH| CLERK|7902| null| 800.0| null| 20|
| 7499| ALLEN| SALESMAN|7698| null|1600.0| 300.0| 30|
| 7521| WARD| SALESMAN|7698| null|1250.0| 500.0| 30|
| 7566| JONES| MANAGER|7839| null|2975.0| null| 20|
| 7654|MARTIN| SALESMAN|7698| null|1250.0|1400.0| 30|
| 7698| BLAKE| MANAGER|7839| null|2850.0| null| 30|
| 7782| CLARK| MANAGER|7839| null|2450.0| null| 10|
| 7788| SCOTT| ANALYST|7566| null|3000.0| null| 20|
| 7839| KING|PRESIDENT|null| null|5000.0| null| 10|
| 7844|TURNER| SALESMAN|7698| null|1500.0| 0.0| 30|
| 7876| ADAMS| CLERK|7788| null|1100.0| null| 20|
| 7900| JAMES| CLERK|7698| null| 950.0| null| 30|
| 7902| FORD| ANALYST|7566| null|3000.0| null| 20|
| 7934|MILLER| CLERK|7782| null|1300.0| null| 10|
+-----+------+---------+----+--------+------+------+------+

Package and submit the job

Since it already runs successfully in the test environment, let's now build the jar and submit the job with spark-submit.

bin/spark-submit \
--master local[2] \
--class gy.finolo.spark.SparkSessionHiveApp \
--jars /Users/simon/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar \
/Users/simon/Development/workspace/scala/spark-sql-demo/target/spark-sql-demo-1.0-SNAPSHOT.jar

The location of the JDBC driver needs to be specified via the --jars option.

What if there are many jars to load? I will cover that in a future post.

Possible problems

JDBC driver not added

Caused by: org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException: 
The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH.
Please check your CLASSPATH specification, and the name of the driver.

Solution:

If you are developing and running in IDEA, you need to add the driver jar as a dependency.

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.46</version>
</dependency>

DatastoreDriverNotFound

org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException: 
The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH.
Please check your CLASSPATH specification, and the name of the driver.

This is similar to the error above: when submitting the job, the location of mysql-connector-java.jar must be specified with --jars.

bin/spark-submit \
--master local[2] \
--class gy.finolo.spark.SparkSessionHiveApp \
--jars /Users/simon/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar \
/Users/simon/Development/workspace/scala/spark-sql-demo/target/spark-sql-demo-1.0-SNAPSHOT.jar

Table or view not found

Exception in thread "main" org.apache.spark.sql.AnalysisException:
Table or view not found: emp;

Solution:

Copy hive-site.xml from Hive's conf directory to the project's resources directory.

cp /usr/local/hive/hive-1.1.0-cdh5.7.0/conf/hive-site.xml spark-sql-demo/src/resources
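A quick hedged way to verify that the copy worked, assuming hive-site.xml ends up on the application's runtime classpath (for a standard Maven layout that would be src/main/resources):

// If hive-site.xml is on the classpath, this prints its URL instead of null.
println(getClass.getClassLoader.getResource("hive-site.xml"))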

This post explains how to create a Spark SQL demo program.

Create a Maven project

Two dependencies are needed in the pom.xml file.

<properties>
    <scala.version>2.11.12</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <spark.version>2.2.3</spark.version>
</properties>

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.compat.version}</artifactId>
    <version>${spark.version}</version>
</dependency>

Create SparkSessionApp

In Spark 1.x, the context was obtained by creating a SQLContext.

From Spark 2.x onwards, SparkSession is used instead.
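A minimal hedged comparison of the two entry points (the app name "demo" and the master below are placeholders, not taken from the original program):

// Spark 1.x style, kept only as comments for comparison (SQLContext is deprecated in 2.x):
// val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))
// val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Spark 2.x style: SparkSession is the single entry point for SQL, DataFrames and Hive.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local[2]").appName("demo").getOrCreate()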

args(0) is the path of the input file.

package gy.finolo.spark

import org.apache.spark.sql.SparkSession

object SparkSessionApp {

  def main(args: Array[String]): Unit = {

    // Path of the input JSON file, passed as the first program argument
    val path = args(0)

    val sparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("Spark Session App")
      .getOrCreate()

    // Load the JSON file as a DataFrame, then print its schema and contents
    val people = sparkSession.read.format("json").load(path)
    people.printSchema()
    people.show()

    sparkSession.stop()
  }

}

When running in IDEA, you need to set Program Arguments to /usr/local/spark/spark-2.2.3-bin-hadoop2.6/examples/src/main/resources/people.json

cat /usr/local/spark/spark-2.2.3-bin-hadoop2.6/examples/src/main/resources/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

Submit the job

After local testing is done, you can package the jar and submit the job via spark-submit.

bin/spark-submit \
--master local[2] \
--class gy.finolo.spark.SparkSessionApp \
/Users/simon/Development/workspace/scala/spark-sql-demo/target/spark-sql-demo-1.0-SNAPSHOT.jar \
/usr/local/spark/spark-2.2.3-bin-hadoop2.6/examples/src/main/resources/people.json

You can see the output in the console.

The content printed by the printSchema() statement:

root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

The content printed by people.show():

+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
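Building on the same DataFrame, a hedged follow-up sketch: register it as a temporary view and query it with SQL (the view name and the filter are only for illustration).

// Register the DataFrame as a temporary view and query it with Spark SQL.
people.createOrReplaceTempView("people")
sparkSession.sql("SELECT name FROM people WHERE age >= 19").show()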

Spark deployment modes

Spark has many deployment modes. The simplest is single-machine local mode, and there is also a single-machine pseudo-distributed mode; more complex setups run on a cluster, and Spark currently runs well on YARN and Mesos. Spark also ships with its own Standalone mode, which is sufficient for most situations; if your organization already has a YARN or Mesos environment, deploying on those is convenient as well. (A sketch of keeping the master out of the application code follows the list below.)

  • local (local mode): commonly used for local development and testing; it is further divided into single-threaded local and multi-threaded local-cluster;
  • standalone (cluster mode): a typical Master/slave architecture, in which the Master is clearly a single point of failure; Spark supports ZooKeeper-based HA for it;
  • on yarn (cluster mode): runs on top of the YARN resource manager, with YARN handling resource management and Spark handling task scheduling and computation;
  • on mesos (cluster mode): runs on top of the Mesos resource manager, with Mesos handling resource management and Spark handling task scheduling and computation;
  • on cloud (cluster mode): for example AWS EC2, which makes it easy to access Amazon S3; Spark supports multiple distributed storage systems such as HDFS and S3.
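Because the mode is usually chosen at submit time rather than hard-coded, here is a hedged sketch of a deployment-agnostic entry point; the -Dspark.master fallback for IDE runs is an assumption for illustration.

import org.apache.spark.sql.SparkSession

object DeployModeDemo {

  def main(args: Array[String]): Unit = {
    // No .master() here: choose it with spark-shell/spark-submit --master
    // (local[2], spark://host:7077, yarn, ...); for IDE runs, set the JVM
    // option -Dspark.master=local[2] instead.
    val spark = SparkSession.builder()
      .appName("Deploy Mode Demo")
      .getOrCreate()

    println(s"Running with master = ${spark.sparkContext.master}")

    spark.stop()
  }

}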

Running in local mode

spark-shell --master local[2]

The 2 means two worker threads.

If local[*] is used, which is also the default master option, the number of cores on the machine is detected and used automatically.

Running in standalone mode

The architecture of Spark Standalone mode is very similar to Hadoop HDFS/YARN: 1 master + n workers.

Configure the conf/spark-env.sh file

cd $SPARK_HOME
cd conf
cp spark-env.sh.template spark-env.sh
vi spark-env.sh

Add the following content:

SPARK_MASTER_HOST=localhost
SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=2g

Start Spark

cd $SPARK_HOME
sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /usr/local/spark/spark-2.2.3-bin-2.6.0-cdh5.7.0/logs/spark-simon-org.apache.spark.deploy.master.Master-1-localhost.out
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark/spark-2.2.3-bin-2.6.0-cdh5.7.0/logs/spark-simon-org.apache.spark.deploy.worker.Worker-1-localhost.out

Check the master log

19/02/10 13:11:09 INFO Master: I have been elected leader! New state: ALIVE
19/02/10 13:11:12 INFO Master: Registering worker 192.168.1.6:51683 with 1 cores, 2.0 GB RAM

Check the worker log

19/02/10 13:11:12 INFO Worker: Successfully registered with master spark://localhost:7077

Run the jps command, and you can see the Master and Worker processes.

3424 Master
3459 Worker

Generate the input file for the word count

Create the file /usr/local/spark/data/words

vi /usr/local/spark/data/words

Add the following content:

hello,hello,world
hello,world,
welcome

Start spark-shell

Start it in standalone mode.

bin/spark-shell --master spark://localhost:7077
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/02/10 13:12:41 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.1.6 instead (on interface en0)
19/02/10 13:12:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/02/10 13:12:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.1.6:4040
Spark context available as 'sc' (master = spark://localhost:7077, app id = app-20190210131243-0000).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.3
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Enter the word count program

scala> var file = spark.sparkContext.textFile("file:///usr/local/spark/data/words")
file: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/data MapPartitionsRDD[6] at textFile at <console>:23

scala> val wordCounts = file.flatMap(line => line.split(",")).map(word => (word, 1)).reduceByKey(_ + _)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:25

scala> wordCounts.collect
res1: Array[(String, Int)] = Array((hello,3), (welcome,1), (world,2))

scala>

You can see that the words in the /usr/local/spark/data/words file have been counted successfully.
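As a hedged follow-up in the same spark-shell session, the counts could be sorted and written back to disk; the output path below is an assumption for illustration.

// Sort by descending count and save the result as text files.
wordCounts
  .sortBy(_._2, ascending = false)
  .saveAsTextFile("file:///usr/local/spark/data/wordcount-output")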
