This post records how to query tables in Spark the same way you would in Hive.
## Start Spark

```bash
bin/spark-shell --master local[2]
```
Check which tables exist:

```
scala> spark.sql("show tables").show
19/02/12 14:37:49 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
19/02/12 14:37:49 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
19/02/12 14:37:50 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
```
Nothing shows up, because Spark does not yet know about the Hive metastore. We need to copy the hive-site.xml file from the Hive installation into Spark's conf directory:
```bash
cp $HIVE_HOME/conf/hive-site.xml $SPARK_HOME/conf
```
## Restart Spark

```bash
bin/spark-shell --master local[2]
```
Run the command again:
```
scala> spark.sql("show tables").show
19/02/12 15:53:46 WARN HiveMetaStore: Retrying creating default database after error: Error creating transactional connection factory
javax.jdo.JDOFatalInternalException: Error creating transactional connection factory
Caused by: org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException: The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH. Please check your CLASSPATH specification, and the name of the driver.
```
This means the MySQL JDBC driver has not been loaded. Start Spark a third time, this time using the `--jars` option to point at the MySQL driver:
```bash
bin/spark-shell --master local[2] \
  --jars ~/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar
```
List the tables:

```
scala> spark.sql("show tables").show
...
+--------+--------------+-----------+
|database|     tableName|isTemporary|
+--------+--------------+-----------+
| default|           emp|      false|
| default|hive_wordcount|      false|
+--------+--------------+-----------+
```
We can see two tables: `emp` and `hive_wordcount`.
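As a side note, the same listing is available programmatically through the Catalog API; a minimal sketch (run inside the same spark-shell session):

```scala
// List the tables of the default database via the Catalog API
// (same information as spark.sql("show tables"), returned as a Dataset)
spark.catalog.listTables("default").show()

// Optional sanity check: the shell should be backed by the Hive catalog.
// "in-memory" here is only the fallback default if the key is unset.
println(spark.sparkContext.getConf.get("spark.sql.catalogImplementation", "in-memory"))
```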
## Verify SQL queries

Create a new `dept` table in the Hive client:

```
hive> CREATE TABLE dept(
    deptno INT,
    dname STRING,
    loc STRING)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
```
Create the `dept` data file:

```
cat /usr/local/spark/data/dept
10  ACCOUNTING  NEW YORK
20  RESEARCH    DALLAS
30  SALES       CHICAGO
40  OPERATIONS  BOSTON
```
The fields are separated by tab characters, matching the `FIELDS TERMINATED BY '\t'` clause above.
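Before loading it, the raw file can be sanity-checked from spark-shell by reading it as tab-separated CSV; a minimal sketch, assuming the same local path:

```scala
// Read the raw dept file as tab-separated text to verify its contents
// (running in local mode, so file:// points at the local filesystem)
spark.read
  .option("sep", "\t")
  .csv("file:///usr/local/spark/data/dept")
  .show()
```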
Load the data into the `dept` table:

```
hive> LOAD DATA LOCAL INPATH '/usr/local/spark/data/dept' OVERWRITE INTO TABLE dept;
```
The `OVERWRITE` keyword means that if the table already contains old data, it is cleared before the new data is loaded.
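Incidentally, the same DDL and load can also be issued from spark-shell through `spark.sql`, so switching to the Hive client is not strictly required; a sketch using the same table name and file path:

```scala
// Create the dept table and load the data file entirely from spark-shell;
// with Hive support enabled, Spark SQL accepts this Hive DDL/DML directly
spark.sql("""
  CREATE TABLE IF NOT EXISTS dept(
    deptno INT,
    dname  STRING,
    loc    STRING)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
""")

spark.sql("LOAD DATA LOCAL INPATH '/usr/local/spark/data/dept' OVERWRITE INTO TABLE dept")
```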
## Verify the query

Back in the spark shell:
```
scala> spark.sql("SELECT * FROM dept").show
+------+----------+--------+
|deptno|     dname|     loc|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPERATIONS|  BOSTON|
+------+----------+--------+
```
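Note that `spark.sql` returns an ordinary DataFrame, so the result can be transformed further rather than only printed; a minimal sketch:

```scala
import spark.implicits._   // already in scope inside spark-shell

// The query result is a DataFrame and supports the usual transformations
val dept = spark.sql("SELECT * FROM dept")

dept.printSchema()                    // deptno, dname, loc
dept.filter($"deptno" >= 30).show()   // keeps SALES and OPERATIONS only
println(dept.count())                 // 4
```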
## Omitting spark.sql

We notice that every time we run a SQL statement we have to write `spark.sql(sql query).show`, which is less convenient than Hive. This can be simplified by starting the shell with spark-sql instead:
```bash
bin/spark-sql --master local[2] \
  --jars ~/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar
```
Once it has started, SQL statements can be written directly:
```
spark-sql> SELECT * FROM emp e JOIN dept d ON e.deptno = d.deptno;
```
This returns the following:
```
...
19/02/12 16:39:50 INFO DAGScheduler: Job 1 finished: processCmd at CliDriver.java:376, took 0.059725 s
7369 SMITH CLERK 7902 NULL 800.0 NULL 20 20 RESEARCH DALLAS
7499 ALLEN SALESMAN 7698 NULL 1600.0 300.0 30 30 SALES CHICAGO
7521 WARD SALESMAN 7698 NULL 1250.0 500.0 30 30 SALES CHICAGO
7566 JONES MANAGER 7839 NULL 2975.0 NULL 20 20 RESEARCH DALLAS
7654 MARTIN SALESMAN 7698 NULL 1250.0 1400.0 30 30 SALES CHICAGO
7698 BLAKE MANAGER 7839 NULL 2850.0 NULL 30 30 SALES CHICAGO
7782 CLARK MANAGER 7839 NULL 2450.0 NULL 10 10 ACCOUNTING NEW YORK
7788 SCOTT ANALYST 7566 NULL 3000.0 NULL 20 20 RESEARCH DALLAS
7839 KING PRESIDENT NULL NULL 5000.0 NULL 10 10 ACCOUNTING NEW YORK
7844 TURNER SALESMAN 7698 NULL 1500.0 0.0 30 30 SALES CHICAGO
7876 ADAMS CLERK 7788 NULL 1100.0 NULL 20 20 RESEARCH DALLAS
7900 JAMES CLERK 7698 NULL 950.0 NULL 30 30 SALES CHICAGO
7902 FORD ANALYST 7566 NULL 3000.0 NULL 20 20 RESEARCH DALLAS
7934 MILLER CLERK 7782 NULL 1300.0 NULL 10 10 ACCOUNTING NEW YORK
Time taken: 3.793 seconds, Fetched 14 row(s)
...
```
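For comparison, the same join can also be written with the DataFrame API back in spark-shell; a sketch that only assumes both tables share the `deptno` column used in the join condition above:

```scala
// DataFrame-API equivalent of: SELECT * FROM emp e JOIN dept d ON e.deptno = d.deptno
val emp  = spark.table("emp")
val dept = spark.table("dept")

// Joining on a column-name sequence performs an inner join and
// keeps a single deptno column in the result
emp.join(dept, Seq("deptno")).show()
```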
## View the execution plan

Create a table:

```
spark-sql> CREATE TABLE t (key STRING, value STRING);
```
Run the following SQL statement (the plan lines below were truncated in the captured output; the truncation is marked with `...`):

```
spark-sql> EXPLAIN EXTENDED SELECT a.key * (2 + 3) FROM t a JOIN t b ON a.key = b.key AND a.key > 3;
19/02/12 16:45:52 INFO SparkSqlParser: Parsing command: EXPLAIN EXTENDED SELECT a.key * (2 + 3) FROM t a JOIN t b ON a.key = b.key AND a.key > 3
19/02/12 16:45:52 INFO HiveMetaStore: 0: get_table : db=default tbl=t
19/02/12 16:45:52 INFO audit: ugi=simon ip=unknown-ip-addr cmd=get_table : db=default tbl=t
19/02/12 16:45:52 INFO CatalystSqlParser: Parsing command: string
19/02/12 16:45:52 INFO CatalystSqlParser: Parsing command: string
19/02/12 16:45:52 INFO HiveMetaStore: 0: get_table : db=default tbl=t
19/02/12 16:45:52 INFO audit: ugi=simon ip=unknown-ip-addr cmd=get_table : db=default tbl=t
19/02/12 16:45:52 INFO CatalystSqlParser: Parsing command: string
19/02/12 16:45:52 INFO CatalystSqlParser: Parsing command: string
== Parsed Logical Plan ==
'Project [unresolvedalias(('a.key * (2 + 3)), None)]
+- 'Join Inner, (('a.key = 'b.key) && ('a.key > 3))
   :- 'SubqueryAlias a
   :  +- 'UnresolvedRelation `t`
   +- 'SubqueryAlias b
      +- 'UnresolvedRelation `t`

== Analyzed Logical Plan ==
(CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE)): double
Project [(cast(key ...
+- Join Inner, ((key ...
   :- SubqueryAlias a
   :  +- SubqueryAlias t
   :     +- HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key ...
   +- SubqueryAlias b
      +- SubqueryAlias t
         +- HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key ...

== Optimized Logical Plan ==
Project [(cast(key ...
+- Join Inner, (key ...
   :- Project [key ...
   :  +- Filter (isnotnull(key ...
   :     +- HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key ...
   +- Project [key ...
      +- Filter (isnotnull(key ...
         +- HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key ...

== Physical Plan ==
*Project [(cast(key ...
+- *SortMergeJoin [key ...
   :- *Sort [key ...
   :  +- Exchange hashpartitioning(key ...
   :     +- *Filter (isnotnull(key ...
   :        +- HiveTableScan [key ...
   +- *Sort [key ...
      +- ReusedExchange [key ...
Time taken: 0.117 seconds, Fetched 1 row(s)
19/02/12 16:45:52 INFO CliDriver: Time taken: 0.117 seconds, Fetched 1 row(s)
```
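The same plans can also be inspected from spark-shell without the spark-sql CLI; a minimal sketch:

```scala
// explain(true) prints the parsed, analyzed, optimized and physical plans,
// the spark-shell counterpart of EXPLAIN EXTENDED in the spark-sql CLI
spark.sql("SELECT a.key * (2 + 3) FROM t a JOIN t b ON a.key = b.key AND a.key > 3")
  .explain(true)
```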
A detailed analysis of execution plans will be covered in a later post.