
This article documents how to query tables in Spark just as you would in Hive.

Start Spark

bin/spark-shell --master local[2]

Check which tables exist

scala> spark.sql("show tables").show
19/02/12 14:37:49 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
19/02/12 14:37:49 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
19/02/12 14:37:50 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+

Nothing was returned.

So we need to copy the hive-site.xml file from Hive's conf directory into Spark's conf directory.

cp $HIVE_HOME/conf/hive-site.xml $SPARK_HOME/conf

Restart Spark

bin/spark-shell --master local[2]

Run the command again.

scala> spark.sql("show tables").show

19/02/12 15:53:46 WARN HiveMetaStore: Retrying creating default database after error: Error creating transactional connection factory
javax.jdo.JDOFatalInternalException: Error creating transactional connection factory

Caused by: org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException:
The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH.
Please check your CLASSPATH specification, and the name of the driver.

This means the MySQL driver (used by the Hive metastore) has not been loaded.

Start Spark a third time, this time specifying the location of the MySQL driver with the --jars option.

bin/spark-shell --master local[2] \
--jars ~/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar

List the tables

scala> spark.sql("show tables").show

...

+--------+--------------+-----------+
|database| tableName|isTemporary|
+--------+--------------+-----------+
| default| emp| false|
| default|hive_wordcount| false|
+--------+--------------+-----------+

We can see two tables: emp and hive_wordcount.
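The same information is also available programmatically through the Catalog API, which can be handy inside applications. A minimal sketch (run in the same spark-shell session; output depends on what your metastore contains):

scala> spark.catalog.listDatabases().show()        // should list at least the default database
scala> spark.catalog.listTables("default").show()  // same tables as "show tables"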

Verify SQL queries

Create a dept table in the Hive client:

hive>
CREATE TABLE dept(
    deptno INT,
    dname STRING,
    loc STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

Create the dept data file

cat /usr/local/spark/data/dept

10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON

The fields are separated by tabs.

Load the data into the dept table:

hive> LOAD DATA LOCAL INPATH '/usr/local/spark/data/dept' OVERWRITE INTO TABLE dept;

The OVERWRITE keyword means that if the table already contains old data, it is cleared before the new data is loaded.
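As a hypothetical illustration, loading the same file twice behaves differently with and without OVERWRITE (shown here via spark.sql from the spark shell; the equivalent statements can also be typed at the hive> prompt):

scala> spark.sql("LOAD DATA LOCAL INPATH '/usr/local/spark/data/dept' INTO TABLE dept")
// appends: dept would now contain a second copy of the four rows
scala> spark.sql("LOAD DATA LOCAL INPATH '/usr/local/spark/data/dept' OVERWRITE INTO TABLE dept")
// replaces: the table is cleared first, so it again contains exactly four rows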

Verify the query

Go back to the spark shell.

scala> spark.sql("SELECT * FROM dept").show
+------+----------+--------+
|deptno| dname| loc|
+------+----------+--------+
| 10|ACCOUNTING|NEW YORK|
| 20| RESEARCH| DALLAS|
| 30| SALES| CHICAGO|
| 40|OPERATIONS| BOSTON|
+------+----------+--------+

Omitting spark.sql

Notice that every time we run a SQL statement we have to write spark.sql(sql query).show, which is not as convenient as Hive.

This can be simplified by starting spark-sql instead.

Start the shell in spark-sql mode:

bin/spark-sql --master local[2] \
--jars ~/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar

Once it has started, write SQL queries directly:

spark-sql> SELECT * FROM emp e JOIN dept d ON e.deptno = d.deptno;

It returns the following:

...
19/02/12 16:39:50 INFO DAGScheduler: Job 1 finished: processCmd at CliDriver.java:376, took 0.059725 s
7369 SMITH CLERK 7902 NULL 800.0 NULL 20 20 RESEARCH DALLAS
7499 ALLEN SALESMAN 7698 NULL 1600.0 300.0 30 30 SALES CHICAGO
7521 WARD SALESMAN 7698 NULL 1250.0 500.0 30 30 SALES CHICAGO
7566 JONES MANAGER 7839 NULL 2975.0 NULL 20 20 RESEARCH DALLAS
7654 MARTIN SALESMAN 7698 NULL 1250.0 1400.0 30 30 SALES CHICAGO
7698 BLAKE MANAGER 7839 NULL 2850.0 NULL 30 30 SALES CHICAGO
7782 CLARK MANAGER 7839 NULL 2450.0 NULL 10 10 ACCOUNTING NEW YORK
7788 SCOTT ANALYST 7566 NULL 3000.0 NULL 20 20 RESEARCH DALLAS
7839 KING PRESIDENT NULL NULL 5000.0 NULL 10 10 ACCOUNTING NEW YORK
7844 TURNER SALESMAN 7698 NULL 1500.0 0.0 30 30 SALES CHICAGO
7876 ADAMS CLERK 7788 NULL 1100.0 NULL 20 20 RESEARCH DALLAS
7900 JAMES CLERK 7698 NULL 950.0 NULL 30 30 SALES CHICAGO
7902 FORD ANALYST 7566 NULL 3000.0 NULL 20 20 RESEARCH DALLAS
7934 MILLER CLERK 7782 NULL 1300.0 NULL 10 10 ACCOUNTING NEW YORK
Time taken: 3.793 seconds, Fetched 14 row(s)
...

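For comparison, the same join can also be expressed with the DataFrame API from a regular spark-shell session. A minimal sketch (assumes both tables are visible through the Hive metastore, as set up above):

scala> val emp = spark.table("emp")
scala> val dept = spark.table("dept")
scala> emp.join(dept, "deptno").show()   // inner join on the shared deptno column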
View the execution plan

Create a table:

spark-sql> CREATE TABLE t (key STRING, value STRING);

Run the following SQL statement:

spark-sql> EXPLAIN EXTENDED SELECT a.key * (2 + 3) FROM t a JOIN t b ON a.key = b.key AND a.key > 3;

19/02/12 16:45:52 INFO SparkSqlParser: Parsing command: EXPLAIN EXTENDED SELECT a.key * (2 + 3) FROM t a JOIN t b ON a.key = b.key AND a.key > 3
19/02/12 16:45:52 INFO HiveMetaStore: 0: get_table : db=default tbl=t
19/02/12 16:45:52 INFO audit: ugi=simon ip=unknown-ip-addr cmd=get_table : db=default tbl=t
19/02/12 16:45:52 INFO CatalystSqlParser: Parsing command: string
19/02/12 16:45:52 INFO CatalystSqlParser: Parsing command: string
19/02/12 16:45:52 INFO HiveMetaStore: 0: get_table : db=default tbl=t
19/02/12 16:45:52 INFO audit: ugi=simon ip=unknown-ip-addr cmd=get_table : db=default tbl=t
19/02/12 16:45:52 INFO CatalystSqlParser: Parsing command: string
19/02/12 16:45:52 INFO CatalystSqlParser: Parsing command: string
== Parsed Logical Plan ==
'Project [unresolvedalias(('a.key * (2 + 3)), None)]
+- 'Join Inner, (('a.key = 'b.key) && ('a.key > 3))
:- 'SubqueryAlias a
: +- 'UnresolvedRelation `t`
+- 'SubqueryAlias b
+- 'UnresolvedRelation `t`

== Analyzed Logical Plan ==
(CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE)): double
Project [(cast(key#34 as double) * cast((2 + 3) as double)) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#38]
+- Join Inner, ((key#34 = key#36) && (cast(key#34 as int) > 3))
:- SubqueryAlias a
: +- SubqueryAlias t
: +- HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#34, value#35]
+- SubqueryAlias b
+- SubqueryAlias t
+- HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#36, value#37]

== Optimized Logical Plan ==
Project [(cast(key#34 as double) * 5.0) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#38]
+- Join Inner, (key#34 = key#36)
:- Project [key#34]
: +- Filter (isnotnull(key#34) && (cast(key#34 as int) > 3))
: +- HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#34, value#35]
+- Project [key#36]
+- Filter (isnotnull(key#36) && (cast(key#36 as int) > 3))
+- HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#36, value#37]

== Physical Plan ==
*Project [(cast(key#34 as double) * 5.0) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#38]
+- *SortMergeJoin [key#34], [key#36], Inner
:- *Sort [key#34 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(key#34, 200)
: +- *Filter (isnotnull(key#34) && (cast(key#34 as int) > 3))
: +- HiveTableScan [key#34], HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#34, value#35]
+- *Sort [key#36 ASC NULLS FIRST], false, 0
+- ReusedExchange [key#36], Exchange hashpartitioning(key#34, 200)
Time taken: 0.117 seconds, Fetched 1 row(s)
19/02/12 16:45:52 INFO CliDriver: Time taken: 0.117 seconds, Fetched 1 row(s)

A detailed analysis of execution plans will be covered in a later post.
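Note that essentially the same plans can be printed from a regular spark-shell session via the Dataset API; explain(true) shows the parsed, analyzed, optimized and physical plans:

scala> spark.sql("SELECT a.key * (2 + 3) FROM t a JOIN t b ON a.key = b.key AND a.key > 3").explain(true)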

In this article, we will use SparkSession to print the contents of the emp table in Hive.

For the most basic Spark SQL program, refer to the post Creating a Spark SQL Program.

Create the SparkSessionHiveApp program

pom.xml

<properties>
    <scala.version>2.11.12</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <spark.version>2.2.3</spark.version>
</properties>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.compat.version}</artifactId>
    <version>${spark.version}</version>
</dependency>

<!-- hive support -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_${scala.compat.version}</artifactId>
    <version>${spark.version}</version>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.46</version>
</dependency>

SparkSessionHiveApp

package gy.finolo.spark

import org.apache.spark.sql.SparkSession

object SparkSessionHiveApp {

  def main(args: Array[String]): Unit = {

    // Create the SparkSession
    val sparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("Spark Session Hive App")
      .getOrCreate()

    // Business logic: print the emp table
    sparkSession.table("emp").show()

    // Release resources
    sparkSession.stop()
  }

}
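One caveat: depending on how the Hive configuration is picked up, reading Hive tables from a standalone application may also require Hive support to be enabled explicitly on the builder. A hedged variant of the session creation above (enableHiveSupport() is a standard SparkSession.Builder method; whether it is needed depends on your setup):

val sparkSession = SparkSession.builder()
  .master("local[2]")
  .appName("Spark Session Hive App")
  .enableHiveSupport()   // use the Hive metastore catalog instead of the in-memory one
  .getOrCreate()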

Create the emp table

Create the raw emp data file:

vi /usr/local/spark/data/emp.csv

Write the following content into emp.csv. Note that some fields are empty.

7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,,20
7900,JAMES,CLERK,7698,1981/12/3,950,,30
7902,FORD,ANALYST,7566,1981/12/3,3000,,20
7934,MILLER,CLERK,7782,1982/1/23,1300,,10

Create the emp table in Hive

For basic Hive usage, refer to the post Setting Up a Hive Environment.

The statement must end with a semicolon.

hive>
CREATE TABLE emp(
    empno INT,
    ename STRING,
    job STRING,
    mgr INT,
    hiredate DATE,
    sal DOUBLE,
    comm DOUBLE,
    deptno INT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

Import the data into the emp table:

hive> LOAD DATA LOCAL INPATH '/usr/local/spark/data/emp.csv' INTO TABLE emp;

There is an issue with the hiredate column here; we will deal with it later.
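For reference, one possible workaround (an untested sketch, not necessarily the fix intended here): declare hiredate as STRING in the DDL so the raw yyyy/M/d values are preserved, then convert at query time:

scala> spark.sql("SELECT empno, ename, to_date(from_unixtime(unix_timestamp(hiredate, 'yyyy/MM/dd'))) AS hiredate FROM emp").show()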

Run the program in IDEA

The contents of the emp table are printed to the console.

19/02/12 00:00:23 INFO DAGScheduler: Job 0 finished: show at SparkSessionHiveApp.scala:21, took 0.340480 s
+-----+------+---------+----+--------+------+------+------+
|empno| ename| job| mgr|hiredate| sal| comm|deptno|
+-----+------+---------+----+--------+------+------+------+
| 7369| SMITH| CLERK|7902| null| 800.0| null| 20|
| 7499| ALLEN| SALESMAN|7698| null|1600.0| 300.0| 30|
| 7521| WARD| SALESMAN|7698| null|1250.0| 500.0| 30|
| 7566| JONES| MANAGER|7839| null|2975.0| null| 20|
| 7654|MARTIN| SALESMAN|7698| null|1250.0|1400.0| 30|
| 7698| BLAKE| MANAGER|7839| null|2850.0| null| 30|
| 7782| CLARK| MANAGER|7839| null|2450.0| null| 10|
| 7788| SCOTT| ANALYST|7566| null|3000.0| null| 20|
| 7839| KING|PRESIDENT|null| null|5000.0| null| 10|
| 7844|TURNER| SALESMAN|7698| null|1500.0| 0.0| 30|
| 7876| ADAMS| CLERK|7788| null|1100.0| null| 20|
| 7900| JAMES| CLERK|7698| null| 950.0| null| 30|
| 7902| FORD| ANALYST|7566| null|3000.0| null| 20|
| 7934|MILLER| CLERK|7782| null|1300.0| null| 10|
+-----+------+---------+----+--------+------+------+------+

Package and submit the job

Now that it runs successfully in the test environment, we package the project and submit the job with spark-submit.

bin/spark-submit \
--master local[2] \
--class gy.finolo.spark.SparkSessionHiveApp \
--jars /Users/simon/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar \
/Users/simon/Development/workspace/scala/spark-sql-demo/target/spark-sql-demo-1.0-SNAPSHOT.jar

The location of the JDBC driver has to be specified with the --jars option.

What if there are many jars to load? I will cover that in a future post.

Possible problems

JDBC driver not added

Caused by: org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException: 
The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH.
Please check your CLASSPATH specification, and the name of the driver.

Solution:

If you are developing and running in IDEA, add the driver jar as a dependency.

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.46</version>
</dependency>

DatastoreDriverNotFound

org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException: 
The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH.
Please check your CLASSPATH specification, and the name of the driver.

This is similar to the error above: when submitting the job, the location of mysql-connector-java.jar must be specified with --jars.

bin/spark-submit \
--master local[2] \
--class gy.finolo.spark.SparkSessionHiveApp \
--jars /Users/simon/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar \
/Users/simon/Development/workspace/scala/spark-sql-demo/target/spark-sql-demo-1.0-SNAPSHOT.jar

Table or view not found

Exception in thread "main" org.apache.spark.sql.AnalysisException:
Table or view not found: emp;

Solution:

Copy hive-site.xml from the Hive conf directory into the project's resources directory.

cp /usr/local/hive/hive-1.1.0-cdh5.7.0/conf/hive-site.xml spark-sql-demo/src/resources

This post describes how to create a Spark SQL demo program.

Create a Maven project

Two dependencies are required in pom.xml.

<properties>
    <scala.version>2.11.12</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <spark.version>2.2.3</spark.version>
</properties>

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.compat.version}</artifactId>
    <version>${spark.version}</version>
</dependency>

Create SparkSessionApp

In Spark 1.x, the context was obtained by creating a SQLContext.

Since Spark 2.x, SparkSession is used instead.
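For comparison, a rough sketch of the Spark 1.x style (SQLContext is deprecated in 2.x; shown only to contrast with the SparkSession code below, with an illustrative app name):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setMaster("local[2]").setAppName("Spark SQL 1.x Style")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)   // in 2.x this is replaced by SparkSession
val people = sqlContext.read.json("/usr/local/spark/spark-2.2.3-bin-hadoop2.6/examples/src/main/resources/people.json")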

args(0) is the path of the input file.

package gy.finolo.spark

import org.apache.spark.sql.SparkSession

object SparkSessionApp {

  def main(args: Array[String]): Unit = {

    val path = args(0)

    val sparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("Spark Session App")
      .getOrCreate()
    val people = sparkSession.read.format("json").load(path)
    people.printSchema()
    people.show()
    sparkSession.stop()
  }

}

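As a small aside, the format("json").load(path) pair can also be written with the json shortcut on the reader, which does the same thing:

val people = sparkSession.read.json(path)   // equivalent to read.format("json").load(path)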
When running in IDEA, set Program Arguments to /usr/local/spark/spark-2.2.3-bin-hadoop2.6/examples/src/main/resources/people.json.

cat /usr/local/spark/spark-2.2.3-bin-hadoop2.6/examples/src/main/resources/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

Submit the job

After local testing is complete, package the project and submit the job with spark-submit.

bin/spark-submit \
--master local[2] \
--class gy.finolo.spark.SparkSessionApp \
/Users/simon/Development/workspace/scala/spark-sql-demo/target/spark-sql-demo-1.0-SNAPSHOT.jar \
/usr/local/spark/spark-2.2.3-bin-hadoop2.6/examples/src/main/resources/people.json

The output appears in the console.

The output of the printSchema() call:

root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

The output of people.show():

+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+