
Basic DataFrames API Operations

Starting Spark

The MySQL JDBC driver needs to be loaded.

bin/spark-shell --master local[2] \
--jars ~/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar

Loading a JSON file

scala>
val jsonPath = "/usr/local/spark/spark-2.2.3-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json"
val peopleDF = spark.read.format("json").load(jsonPath)

API Operations

  • Print the DataFrame's schema
scala> peopleDF.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
  • Show the first 20 records of the dataset
scala> peopleDF.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
  • Select all values of a column

Equivalent to the following SQL statement:

SELECT name FROM people;
scala> peopleDF.select("name").show
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
  • Filter queries

A WHERE-clause style query, equivalent to the following SQL statement:

SELECT * FROM people WHERE age > 19;
peopleDF.filter("age > 19").show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
  • More complex queries

Equivalent to the following SQL statement:

SELECT name, age + 10 as new_age FROM people;
scala> peopleDF.select(peopleDF("name"), (peopleDF("age") + 10).as("new_age")).show
+-------+-------+
| name|new_age|
+-------+-------+
|Michael| null|
| Andy| 40|
| Justin| 29|
+-------+-------+
  • Aggregation

Count the number of people of each age. Equivalent to the following SQL statement:

SELECT age, count(1) FROM people GROUP BY age;
scala> peopleDF.groupBy("age").count().show
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
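Other aggregate functions from org.apache.spark.sql.functions can be combined with groupBy via agg as well. A minimal sketch against the same peopleDF (the choice of avg and max here is purely illustrative):

import org.apache.spark.sql.functions.{avg, count, max}

// Same result as groupBy("age").count(), written with agg
peopleDF.groupBy("age").agg(count("*").as("cnt")).show()

// Aggregate over the whole DataFrame instead of per group
peopleDF.agg(avg("age").as("avg_age"), max("age").as("max_age")).show()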
  • Using Spark SQL built-in functions
scala> peopleDF.filter("SUBSTRING(name, 0, 1) = 'A'").show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

We can list the functions that Spark SQL provides.

show(numRows, truncate)

scala> spark.sql("show functions").show(30, false)
+---------------------+
|function |
+---------------------+
|! |
|% |
|& |
|* |
|+ |
|- |
|/ |
|< |
|<= |
|<=> |
|= |
...
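To check the usage of a single built-in function, Spark SQL also supports DESCRIBE FUNCTION; a small sketch (substring is just an arbitrary example):

scala> spark.sql("DESCRIBE FUNCTION EXTENDED substring").show(false)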
  • Sorting
scala> peopleDF.sort(peopleDF("name").desc).show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 19| Justin|
| 30| Andy|
+----+-------+
  • JOIN
scala> 
val df2 = spark.read.format("json").load(jsonPath)
peopleDF.join(df2, peopleDF("name") === df2("name")).show
+----+-------+----+-------+
| age| name| age| name|
+----+-------+----+-------+
|null|Michael|null|Michael|
| 30| Andy| 30| Andy|
| 19| Justin| 19| Justin|
+----+-------+----+-------+
  • Other actions
take, head(3), first
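These actions return rows to the driver rather than printing a table; a minimal sketch against the same peopleDF:

scala> peopleDF.take(2)     // first 2 rows as an Array[Row]
scala> peopleDF.head(3)     // first 3 rows; head() with no argument returns a single Row
scala> peopleDF.first()     // the first Row, equivalent to head()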

Converting RDDs to DataFrames

Inferring the Schema Using Reflection

Via reflection:

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Case class used to infer the schema via reflection
case class Person(name: String, age: Long)

// Create an RDD of Person objects from a text file, convert it to a DataFrame
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()

// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

Programmatically Specifying the Schema

// Imports needed to build rows and the schema explicitly
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
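The resulting DataFrame can then be queried just like the one built via reflection; a minimal follow-up sketch:

// Register a temporary view and query it with SQL
peopleDF.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people").show()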

Converting DataFrames to Datasets

case class Person(name: String, age: Long)
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
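The untyped DataFrame operations remain available on the Dataset, and toDF() converts it back explicitly when a plain DataFrame is needed; a minimal sketch (peopleBackDF is just an illustrative name):

// Untyped operations still work on a typed Dataset
peopleDS.select("name").show()

// Convert back to an untyped DataFrame
val peopleBackDF = peopleDS.toDF()
peopleBackDF.printSchema()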

We already have spark-shell and spark-sql, so why use the Thrift server at all?

  1. Every spark-shell / spark-sql session is a separate Spark application.

  2. No matter how many clients (beeline / code) connect to the Thrift server, only one Spark application is created.

Starting the Thrift server

The MySQL JDBC driver needs to be added via the --jars parameter.

cd $SPARK_HOME/

sbin/start-thriftserver.sh --master local[2] \
--jars ~/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar

Check the processes

jps -m

21522 SparkSubmit --master local[2] --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
--name Thrift JDBC/ODBC Server
--jars /Users/simon/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar spark-internal
3459 Worker --webui-port 8081 spark://localhost:7077
18420 SparkSubmit --master local[2] --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
--jars /Users/simon/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar spark-internal

There are two SparkSubmit processes. PID 21522 is the one we just started; its class is HiveThriftServer2.

The process started via spark-sql has the class SparkSQLCLIDriver.

Thrift server port

The default port is 10000 and can be changed to another port.

Add a hiveconf parameter at startup, e.g. to set the port to 14000:

--hiveconf hive.server2.thrift.port=14000

Accessing the Thrift server with beeline

The JDBC URI needs to be set: jdbc:hive2://localhost:10000

bin/beeline -u jdbc:hive2://localhost:10000 -n simon
Connecting to jdbc:hive2://localhost:10000
log4j:WARN No appenders could be found for logger (org.apache.hive.jdbc.Utils).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Connected to: Spark SQL (version 2.2.3)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 1.2.1.spark2 by Apache Hive
0: jdbc:hive2://localhost:10000>

View the contents of the emp table.

0: jdbc:hive2://localhost:10000> SELECT * FROM emp;
+--------+---------+------------+-------+-----------+---------+---------+---------+--+
| empno | ename | job | mgr | hiredate | sal | comm | deptno |
+--------+---------+------------+-------+-----------+---------+---------+---------+--+
| 7369 | SMITH | CLERK | 7902 | NULL | 800.0 | NULL | 20 |
| 7499 | ALLEN | SALESMAN | 7698 | NULL | 1600.0 | 300.0 | 30 |
| 7521 | WARD | SALESMAN | 7698 | NULL | 1250.0 | 500.0 | 30 |
| 7566 | JONES | MANAGER | 7839 | NULL | 2975.0 | NULL | 20 |
| 7654 | MARTIN | SALESMAN | 7698 | NULL | 1250.0 | 1400.0 | 30 |
| 7698 | BLAKE | MANAGER | 7839 | NULL | 2850.0 | NULL | 30 |
| 7782 | CLARK | MANAGER | 7839 | NULL | 2450.0 | NULL | 10 |
| 7788 | SCOTT | ANALYST | 7566 | NULL | 3000.0 | NULL | 20 |
| 7839 | KING | PRESIDENT | NULL | NULL | 5000.0 | NULL | 10 |
| 7844 | TURNER | SALESMAN | 7698 | NULL | 1500.0 | 0.0 | 30 |
| 7876 | ADAMS | CLERK | 7788 | NULL | 1100.0 | NULL | 20 |
| 7900 | JAMES | CLERK | 7698 | NULL | 950.0 | NULL | 30 |
| 7902 | FORD | ANALYST | 7566 | NULL | 3000.0 | NULL | 20 |
| 7934 | MILLER | CLERK | 7782 | NULL | 1300.0 | NULL | 10 |
+--------+---------+------------+-------+-----------+---------+---------+---------+--+
14 rows selected (0.411 seconds)

Accessing the Thrift server programmatically

Add the dependency

<!-- Thrift server support -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.1.0</version>
</dependency>

SparkSQLThriftServerApp

package gy.finolo.spark

import java.sql.DriverManager


object SparkSQLThriftServerApp {

  def main(args: Array[String]): Unit = {

    // 1. Load the Hive JDBC driver
    Class.forName("org.apache.hive.jdbc.HiveDriver")

    // 2. Get a connection and run the query
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "root", "")
    val ps = conn.prepareStatement("SELECT empno, ename, sal FROM emp")
    val rs = ps.executeQuery()

    while (rs.next()) {
      println("empno: " + rs.getInt("empno") +
        ", ename: " + rs.getString("ename") +
        ", salary: " + rs.getDouble("sal"))
    }

    rs.close()
    ps.close()
    conn.close()
  }

}

Output

empno: 7369, ename: SMITH, salary: 800.0
empno: 7499, ename: ALLEN, salary: 1600.0
empno: 7521, ename: WARD, salary: 1250.0
empno: 7566, ename: JONES, salary: 2975.0
empno: 7654, ename: MARTIN, salary: 1250.0
empno: 7698, ename: BLAKE, salary: 2850.0
empno: 7782, ename: CLARK, salary: 2450.0
empno: 7788, ename: SCOTT, salary: 3000.0
empno: 7839, ename: KING, salary: 5000.0
empno: 7844, ename: TURNER, salary: 1500.0
empno: 7876, ename: ADAMS, salary: 1100.0
empno: 7900, ename: JAMES, salary: 950.0
empno: 7902, ename: FORD, salary: 3000.0
empno: 7934, ename: MILLER, salary: 1300.0

Notes

  1. Before accessing the Thrift server from a program, make sure the Thrift server is already running; otherwise the program fails with an error like the following:
Exception in thread "main" java.sql.SQLException: Could not open client transport with JDBC Uri: 
jdbc:hive2://localhost:10000: java.net.ConnectException: Connection refused (Connection refused)
  2. The Hive version I installed is hive-1.1.0-cdh5.7.0; if the hive-jdbc version declared in pom.xml does not match it, you may hit an exception like the following:
Exception in thread "main" java.sql.SQLException: Could not open client transport with JDBC Uri: 
jdbc:hive2://localhost:10000: Could not establish connection to jdbc:hive2://localhost:10000:
Required field 'client_protocol' is unset! Struct:TOpenSessionReq(client_protocol:null,
configuration:{use:database=default})

The following records how to query table contents in Spark in the same way as in Hive.

Starting Spark

bin/spark-shell --master local[2]

List the existing tables

scala> spark.sql("show tables").show
19/02/12 14:37:49 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
19/02/12 14:37:49 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
19/02/12 14:37:50 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+

Nothing shows up.

So we need to copy Hive's hive-site.xml into Spark's conf directory.

cp $HIVE_HOME/conf/hive-site.xml $SPARK_HOME/conf

Restart Spark

bin/spark-shell --master local[2]

Run the command again.

scala> spark.sql("show tables").show

19/02/12 15:53:46 WARN HiveMetaStore: Retrying creating default database after error: Error creating transactional connection factory
javax.jdo.JDOFatalInternalException: Error creating transactional connection factory

Caused by: org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException:
The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH.
Please check your CLASSPATH specification, and the name of the driver.

This means the MySQL driver was not loaded.

Start Spark a third time, specifying the location of the MySQL driver with the --jars parameter.

bin/spark-shell --master local[2] \
--jars ~/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar

List the tables

scala> spark.sql("show tables").show

...

+--------+--------------+-----------+
|database| tableName|isTemporary|
+--------+--------------+-----------+
| default| emp| false|
| default|hive_wordcount| false|
+--------+--------------+-----------+

There are two tables: emp and hive_wordcount.

Verifying SQL queries

Create a dept table in the Hive client:

hive>
CREATE TABLE dept(
deptno INT,
dname STRING,
loc STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

Create the dept data file

cat /usr/local/spark/data/dept

10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON

The fields are separated by tabs.

Load the data into the dept table

hive> LOAD DATA LOCAL INPATH '/usr/local/spark/data/dept' OVERWRITE INTO TABLE dept;

The OVERWRITE keyword means that any existing data in the table is cleared before the new data is loaded.

Verify the query

Back in the spark shell:

scala> spark.sql("SELECT * FROM dept").show
+------+----------+--------+
|deptno| dname| loc|
+------+----------+--------+
| 10|ACCOUNTING|NEW YORK|
| 20| RESEARCH| DALLAS|
| 30| SALES| CHICAGO|
| 40|OPERATIONS| BOSTON|
+------+----------+--------+

Omitting spark.sql

Every SQL statement so far has had to be written as spark.sql(query).show, which is less convenient than Hive.

This can be simplified by starting spark-sql instead.

Start the shell in spark-sql mode:

bin/spark-sql --master local[2] \
--jars ~/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar

Once started, SQL statements can be written directly:

spark-sql> SELECT * FROM emp e JOIN dept d ON e.deptno = d.deptno;

This returns the following:

...
19/02/12 16:39:50 INFO DAGScheduler: Job 1 finished: processCmd at CliDriver.java:376, took 0.059725 s
7369 SMITH CLERK 7902 NULL 800.0 NULL 20 20 RESEARCH DALLAS
7499 ALLEN SALESMAN 7698 NULL 1600.0 300.0 30 30 SALES CHICAGO
7521 WARD SALESMAN 7698 NULL 1250.0 500.0 30 30 SALES CHICAGO
7566 JONES MANAGER 7839 NULL 2975.0 NULL 20 20 RESEARCH DALLAS
7654 MARTIN SALESMAN 7698 NULL 1250.0 1400.0 30 30 SALES CHICAGO
7698 BLAKE MANAGER 7839 NULL 2850.0 NULL 30 30 SALES CHICAGO
7782 CLARK MANAGER 7839 NULL 2450.0 NULL 10 10 ACCOUNTING NEW YORK
7788 SCOTT ANALYST 7566 NULL 3000.0 NULL 20 20 RESEARCH DALLAS
7839 KING PRESIDENT NULL NULL 5000.0 NULL 10 10 ACCOUNTING NEW YORK
7844 TURNER SALESMAN 7698 NULL 1500.0 0.0 30 30 SALES CHICAGO
7876 ADAMS CLERK 7788 NULL 1100.0 NULL 20 20 RESEARCH DALLAS
7900 JAMES CLERK 7698 NULL 950.0 NULL 30 30 SALES CHICAGO
7902 FORD ANALYST 7566 NULL 3000.0 NULL 20 20 RESEARCH DALLAS
7934 MILLER CLERK 7782 NULL 1300.0 NULL 10 10 ACCOUNTING NEW YORK
Time taken: 3.793 seconds, Fetched 14 row(s)
...

Viewing the execution plan

Create a table

spark-sql> CREATE TABLE t (key STRING, value STRING);

Run the following SQL statement:

spark-sql> EXPLAIN EXTENDED SELECT a.key * (2 + 3) FROM t a JOIN t b ON a.key = b.key AND a.key > 3;

19/02/12 16:45:52 INFO SparkSqlParser: Parsing command: EXPLAIN EXTENDED SELECT a.key * (2 + 3) FROM t a JOIN t b ON a.key = b.key AND a.key > 3
19/02/12 16:45:52 INFO HiveMetaStore: 0: get_table : db=default tbl=t
19/02/12 16:45:52 INFO audit: ugi=simon ip=unknown-ip-addr cmd=get_table : db=default tbl=t
19/02/12 16:45:52 INFO CatalystSqlParser: Parsing command: string
19/02/12 16:45:52 INFO CatalystSqlParser: Parsing command: string
19/02/12 16:45:52 INFO HiveMetaStore: 0: get_table : db=default tbl=t
19/02/12 16:45:52 INFO audit: ugi=simon ip=unknown-ip-addr cmd=get_table : db=default tbl=t
19/02/12 16:45:52 INFO CatalystSqlParser: Parsing command: string
19/02/12 16:45:52 INFO CatalystSqlParser: Parsing command: string
== Parsed Logical Plan ==
'Project [unresolvedalias(('a.key * (2 + 3)), None)]
+- 'Join Inner, (('a.key = 'b.key) && ('a.key > 3))
:- 'SubqueryAlias a
: +- 'UnresolvedRelation `t`
+- 'SubqueryAlias b
+- 'UnresolvedRelation `t`

== Analyzed Logical Plan ==
(CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE)): double
Project [(cast(key#34 as double) * cast((2 + 3) as double)) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#38]
+- Join Inner, ((key#34 = key#36) && (cast(key#34 as int) > 3))
:- SubqueryAlias a
: +- SubqueryAlias t
: +- HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#34, value#35]
+- SubqueryAlias b
+- SubqueryAlias t
+- HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#36, value#37]

== Optimized Logical Plan ==
Project [(cast(key#34 as double) * 5.0) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#38]
+- Join Inner, (key#34 = key#36)
:- Project [key#34]
: +- Filter (isnotnull(key#34) && (cast(key#34 as int) > 3))
: +- HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#34, value#35]
+- Project [key#36]
+- Filter (isnotnull(key#36) && (cast(key#36 as int) > 3))
+- HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#36, value#37]

== Physical Plan ==
*Project [(cast(key#34 as double) * 5.0) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#38]
+- *SortMergeJoin [key#34], [key#36], Inner
:- *Sort [key#34 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(key#34, 200)
: +- *Filter (isnotnull(key#34) && (cast(key#34 as int) > 3))
: +- HiveTableScan [key#34], HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#34, value#35]
+- *Sort [key#36 ASC NULLS FIRST], false, 0
+- ReusedExchange [key#36], Exchange hashpartitioning(key#34, 200)
Time taken: 0.117 seconds, Fetched 1 row(s)
19/02/12 16:45:52 INFO CliDriver: Time taken: 0.117 seconds, Fetched 1 row(s)

A detailed analysis of the execution plan will be covered in a later post.
