A Smooth Transition from Hive to Spark SQL

This post records how to query tables in Spark the same way you would in Hive.

Start Spark

bin/spark-shell --master local[2]

Check which tables are available

scala> spark.sql("show tables").show
19/02/12 14:37:49 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
19/02/12 14:37:49 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
19/02/12 14:37:50 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+

Nothing comes back.

Spark cannot see the Hive metastore yet, so we need to copy the hive-site.xml file from Hive's conf directory into Spark's conf directory.

cp $HIVE_HOME/conf/hive-site.xml $SPARK_HOME/conf
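
As an aside, a standalone application (rather than spark-shell) needs to enable this metastore integration explicitly. A minimal sketch, assuming hive-site.xml and the MySQL driver are on the application classpath (the app name is just a placeholder):

import org.apache.spark.sql.SparkSession

// Sketch of a Hive-enabled SparkSession in a standalone app;
// spark-shell builds an equivalent session for you automatically.
val spark = SparkSession.builder()
  .appName("HiveOnSparkDemo")
  .master("local[2]")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("show tables").show()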

Restart Spark

bin/spark-shell --master local[2]

Run the command again.

scala> spark.sql("show tables").show

19/02/12 15:53:46 WARN HiveMetaStore: Retrying creating default database after error: Error creating transactional connection factory
javax.jdo.JDOFatalInternalException: Error creating transactional connection factory

Caused by: org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException:
The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH.
Please check your CLASSPATH specification, and the name of the driver.

This means the MySQL JDBC driver used by the metastore connection was not loaded.
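
A quick way to confirm this from inside spark-shell is to try loading the driver class by name (the class name is the one reported in the error above); it throws ClassNotFoundException when the driver is missing from the classpath:

scala> Class.forName("com.mysql.jdbc.Driver")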

Start Spark a third time, this time pointing at the MySQL driver with the --jars option.

bin/spark-shell --master local[2] \
--jars ~/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar

List the tables

scala> spark.sql("show tables").show

...

+--------+--------------+-----------+
|database| tableName|isTemporary|
+--------+--------------+-----------+
| default| emp| false|
| default|hive_wordcount| false|
+--------+--------------+-----------+

Now two tables show up: emp and hive_wordcount.
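
The same listing is available through the Catalog API, if you prefer not to embed SQL; a minimal sketch:

scala> spark.catalog.listTables().show()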

Verifying SQL queries

Create a dept table in the Hive client

hive>
CREATE TABLE dept(
deptno INT,
dname STRING,
loc STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

Create the dept data file

cat /usr/local/spark/data/dept

10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON

The fields are separated by tab characters, matching the table's field delimiter.

Load the data into the dept table

hive> LOAD DATA LOCAL INPATH '/usr/local/spark/data/dept' OVERWRITE INTO TABLE dept;

The OVERWRITE keyword means that any existing data in the table is cleared before the new data is loaded; without it, LOAD DATA appends to the table.
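
Since Spark SQL accepts the same LOAD DATA syntax once Hive support is enabled, the load could presumably also be issued from spark-shell; a minimal sketch:

scala> spark.sql("LOAD DATA LOCAL INPATH '/usr/local/spark/data/dept' OVERWRITE INTO TABLE dept")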

Verify the query

Back in the spark shell:

scala> spark.sql("SELECT * FROM dept").show
+------+----------+--------+
|deptno| dname| loc|
+------+----------+--------+
| 10|ACCOUNTING|NEW YORK|
| 20| RESEARCH| DALLAS|
| 30| SALES| CHICAGO|
| 40|OPERATIONS| BOSTON|
+------+----------+--------+
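
The same query can also be written with the DataFrame API instead of embedded SQL; a minimal sketch:

scala> spark.table("dept").show()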

Dropping the spark.sql wrapper

Notice that every statement so far had to be written as spark.sql("<query>").show, which is less convenient than working in Hive.
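
One small shortcut inside spark-shell is to import the session's sql method so the spark. prefix can be dropped; a minimal sketch:

scala> import spark.sql
scala> sql("SELECT * FROM dept").show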

For a fully Hive-like experience, though, we can launch spark-sql instead.

Start the shell in spark-sql mode:

bin/spark-sql --master local[2] \
--jars ~/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar

Once it is up, SQL statements can be typed directly:

spark-sql> SELECT * FROM emp e JOIN dept d ON e.deptno = d.deptno;

It returns the following:

...
19/02/12 16:39:50 INFO DAGScheduler: Job 1 finished: processCmd at CliDriver.java:376, took 0.059725 s
7369 SMITH CLERK 7902 NULL 800.0 NULL 20 20 RESEARCH DALLAS
7499 ALLEN SALESMAN 7698 NULL 1600.0 300.0 30 30 SALES CHICAGO
7521 WARD SALESMAN 7698 NULL 1250.0 500.0 30 30 SALES CHICAGO
7566 JONES MANAGER 7839 NULL 2975.0 NULL 20 20 RESEARCH DALLAS
7654 MARTIN SALESMAN 7698 NULL 1250.0 1400.0 30 30 SALES CHICAGO
7698 BLAKE MANAGER 7839 NULL 2850.0 NULL 30 30 SALES CHICAGO
7782 CLARK MANAGER 7839 NULL 2450.0 NULL 10 10 ACCOUNTING NEW YORK
7788 SCOTT ANALYST 7566 NULL 3000.0 NULL 20 20 RESEARCH DALLAS
7839 KING PRESIDENT NULL NULL 5000.0 NULL 10 10 ACCOUNTING NEW YORK
7844 TURNER SALESMAN 7698 NULL 1500.0 0.0 30 30 SALES CHICAGO
7876 ADAMS CLERK 7788 NULL 1100.0 NULL 20 20 RESEARCH DALLAS
7900 JAMES CLERK 7698 NULL 950.0 NULL 30 30 SALES CHICAGO
7902 FORD ANALYST 7566 NULL 3000.0 NULL 20 20 RESEARCH DALLAS
7934 MILLER CLERK 7782 NULL 1300.0 NULL 10 10 ACCOUNTING NEW YORK
Time taken: 3.793 seconds, Fetched 14 row(s)
...
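
Back in spark-shell, the same join could be expressed with the DataFrame API; a minimal sketch, assuming both tables have a deptno column (as the output above suggests):

scala> spark.table("emp").join(spark.table("dept"), "deptno").show()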

Viewing the execution plan

Create a table

spark-sql> CREATE TABLE t (key STRING, value STRING);

Run the following SQL statement

spark-sql> EXPLAIN EXTENDED SELECT a.key * (2 + 3) FROM t a JOIN t b ON a.key = b.key AND a.key > 3;

19/02/12 16:45:52 INFO SparkSqlParser: Parsing command: EXPLAIN EXTENDED SELECT a.key * (2 + 3) FROM t a JOIN t b ON a.key = b.key AND a.key > 3
19/02/12 16:45:52 INFO HiveMetaStore: 0: get_table : db=default tbl=t
19/02/12 16:45:52 INFO audit: ugi=simon ip=unknown-ip-addr cmd=get_table : db=default tbl=t
19/02/12 16:45:52 INFO CatalystSqlParser: Parsing command: string
19/02/12 16:45:52 INFO CatalystSqlParser: Parsing command: string
19/02/12 16:45:52 INFO HiveMetaStore: 0: get_table : db=default tbl=t
19/02/12 16:45:52 INFO audit: ugi=simon ip=unknown-ip-addr cmd=get_table : db=default tbl=t
19/02/12 16:45:52 INFO CatalystSqlParser: Parsing command: string
19/02/12 16:45:52 INFO CatalystSqlParser: Parsing command: string
== Parsed Logical Plan ==
'Project [unresolvedalias(('a.key * (2 + 3)), None)]
+- 'Join Inner, (('a.key = 'b.key) && ('a.key > 3))
:- 'SubqueryAlias a
: +- 'UnresolvedRelation `t`
+- 'SubqueryAlias b
+- 'UnresolvedRelation `t`

== Analyzed Logical Plan ==
(CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE)): double
Project [(cast(key#34 as double) * cast((2 + 3) as double)) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#38]
+- Join Inner, ((key#34 = key#36) && (cast(key#34 as int) > 3))
:- SubqueryAlias a
: +- SubqueryAlias t
: +- HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#34, value#35]
+- SubqueryAlias b
+- SubqueryAlias t
+- HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#36, value#37]

== Optimized Logical Plan ==
Project [(cast(key#34 as double) * 5.0) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#38]
+- Join Inner, (key#34 = key#36)
:- Project [key#34]
: +- Filter (isnotnull(key#34) && (cast(key#34 as int) > 3))
: +- HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#34, value#35]
+- Project [key#36]
+- Filter (isnotnull(key#36) && (cast(key#36 as int) > 3))
+- HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#36, value#37]

== Physical Plan ==
*Project [(cast(key#34 as double) * 5.0) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#38]
+- *SortMergeJoin [key#34], [key#36], Inner
:- *Sort [key#34 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(key#34, 200)
: +- *Filter (isnotnull(key#34) && (cast(key#34 as int) > 3))
: +- HiveTableScan [key#34], HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#34, value#35]
+- *Sort [key#36 ASC NULLS FIRST], false, 0
+- ReusedExchange [key#36], Exchange hashpartitioning(key#34, 200)
Time taken: 0.117 seconds, Fetched 1 row(s)
19/02/12 16:45:52 INFO CliDriver: Time taken: 0.117 seconds, Fetched 1 row(s)

A detailed analysis of execution plans will follow in a later post.
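
For now, note that the same plans can also be printed from spark-shell via the DataFrame explain method; a minimal sketch:

scala> spark.sql("SELECT a.key * (2 + 3) FROM t a JOIN t b ON a.key = b.key AND a.key > 3").explain(true)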