
Ctrl key combinations

Shortcut    Function
ctrl+a      move the cursor to the beginning of the line
ctrl+b      move the cursor left by one character
ctrl+c      kill the current process
ctrl+d      exit the current shell
ctrl+e      move the cursor to the end of the line
ctrl+h      delete the character before the cursor (same as Backspace)
ctrl+k      delete from the cursor to the end of the line
ctrl+l      clear the screen (equivalent to clear)
ctrl+r      search previously typed commands; a prompt appears and bash history is searched by the keyword you type
ctrl+u      delete everything from the cursor back to the beginning of the line
ctrl+w      delete the word before the cursor
ctrl+t      swap the character before the cursor with the character under it
ctrl+y      paste (yank) the most recently deleted text
ctrl+d      delete the character under the cursor; note the difference from Backspace and ctrl+h, which delete the character before the cursor
ctrl+f      move the cursor right by one character
ctrl+z      suspend the current process; use fg to bring it back to the foreground. For example: run top -d1, press ctrl+z to suspend it, then fg to resume it

Esc key combinations

Shortcut    Function
esc+d       delete the word after the cursor
esc+f       move forward one word
esc+b       move back one word
esc+t       swap the two words around the cursor

Basic DataFrames API Operations

Starting Spark

The MySQL JDBC driver needs to be loaded.

bin/spark-shell --master local[2] \
--jars ~/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar

Loading a JSON file

scala>
val jsonPath = "/usr/local/spark/spark-2.2.3-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json"
val peopleDF = spark.read.format("json").load(jsonPath)

API operations

  • Print the schema of the DataFrame
scala> peopleDF.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
  • Show the first 20 records of the dataset
scala> peopleDF.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
  • Select all values of a column

Equivalent to the following SQL:

SELECT name FROM people;
scala> peopleDF.select("name").show
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
  • Filter queries

This is the counterpart of a SQL WHERE clause; equivalent to the following SQL:

SELECT * FROM people WHERE age > 19;
peopleDF.filter("age > 19").show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
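
The same filter can also be written with a Column expression instead of a SQL string; a small equivalent sketch:

scala> peopleDF.filter(peopleDF("age") > 19).show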
  • More complex queries

Equivalent to the following SQL:

SELECT name, age + 10 as new_age FROM people;
scala> peopleDF.select(peopleDF("name"), (peopleDF("age") + 10).as("new_age")).show
+-------+-------+
| name|new_age|
+-------+-------+
|Michael| null|
| Andy| 40|
| Justin| 29|
+-------+-------+
  • Aggregation

Count the number of people of each age. Equivalent to the following SQL:

SELECT age, count(1) FROM people GROUP BY age;
scala> peopleDF.groupBy("age").count().show
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
  • Applying built-in SQL functions
scala> peopleDF.filter("SUBSTRING(name, 0, 1) = 'A'").show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

We can list which built-in functions Spark SQL provides:

show(numRows, truncate)

scala> spark.sql("show functions").show(30, false)
+---------------------+
|function |
+---------------------+
|! |
|% |
|& |
|* |
|+ |
|- |
|/ |
|< |
|<= |
|<=> |
|= |
...
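
To see what a particular built-in function does, Spark SQL's DESCRIBE FUNCTION command can be used; a quick sketch:

scala> spark.sql("DESCRIBE FUNCTION substring").show(false)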
  • Sorting
scala> peopleDF.sort(peopleDF("name").desc).show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 19| Justin|
| 30| Andy|
+----+-------+
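
sort also accepts several sort expressions at once, for example ascending by age and then descending by name; a small sketch:

scala> peopleDF.sort(peopleDF("age").asc, peopleDF("name").desc).show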
  • JOIN
scala> 
val df2 = spark.read.format("json").load(jsonPath)
peopleDF.join(df2, peopleDF("name") === df2("name")).show
+----+-------+----+-------+
| age| name| age| name|
+----+-------+----+-------+
|null|Michael|null|Michael|
| 30| Andy| 30| Andy|
| 19| Justin| 19| Justin|
+----+-------+----+-------+
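
Note that joining on peopleDF("name") === df2("name") keeps both name columns in the result, as shown above. If only one copy of the join key is wanted, the variant of join that takes the column names can be used instead; a minimal sketch:

scala> peopleDF.join(df2, Seq("name")).show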
  • Others
take, head(3), first
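
These return rows to the driver instead of printing a table; a quick sketch:

scala> peopleDF.take(2)     // first 2 rows as an Array[Row]
scala> peopleDF.head(3)     // first 3 rows as an Array[Row]
scala> peopleDF.first()     // the first row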

Converting RDDs to DataFrames

Inferring the Schema Using Reflection

Via reflection:

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Case class whose fields define the schema inferred via reflection
case class Person(name: String, age: Long)

// Create an RDD of Person objects from a text file, convert it to a DataFrame
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()

// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
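
The resulting DataFrame can be displayed like any other one; assuming the people.txt sample that ships with Spark, only Justin falls into that age range:

teenagersDF.show()
// +------+---+
// |  name|age|
// +------+---+
// |Justin| 19|
// +------+---+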

Programmatically Specifying the Schema

// Imports needed for Row and the schema types
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
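
As in the reflection example, a DataFrame built this way can be registered as a temporary view and queried with SQL; a minimal sketch:

peopleDF.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people").show()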

Converting DataFrames to Datasets

case class Person(name: String, age: Long)
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()

We already have spark-shell and spark-sql, so why use the thriftserver at all?

  1. Each spark-shell / spark-sql session is a separate Spark Application.

  2. No matter how many clients (beeline / code) connect, the thriftserver produces only one Spark Application.

Starting the thriftserver

The MySQL JDBC driver needs to be added via the --jars parameter.

cd $SPARK_HOME/

sbin/start-thriftserver.sh --master local[2] \
--jars ~/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar

Checking the processes

jps -m

21522 SparkSubmit --master local[2] --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
--name Thrift JDBC/ODBC Server
--jars /Users/simon/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar spark-internal
3459 Worker --webui-port 8081 spark://localhost:7077
18420 SparkSubmit --master local[2] --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
--jars /Users/simon/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar spark-internal

We can see two SparkSubmit processes. Process 21522 is the thriftserver we just started; its class is HiveThriftServer2.

The other SparkSubmit process (18420), started via spark-sql, has class SparkSQLCLIDriver.

Thriftserver port

The default port is 10000; it can be changed to another port.

Add a --hiveconf parameter at startup, for example to set the port to 14000:

--hiveconf hive.server2.thrift.port=14000

Accessing the thriftserver with beeline

The JDBC URI needs to be set to jdbc:hive2://localhost:10000.

bin/beeline -u jdbc:hive2://localhost:10000 -n simon
Connecting to jdbc:hive2://localhost:10000
log4j:WARN No appenders could be found for logger (org.apache.hive.jdbc.Utils).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Connected to: Spark SQL (version 2.2.3)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 1.2.1.spark2 by Apache Hive
0: jdbc:hive2://localhost:10000>

Check the contents of the emp table.

0: jdbc:hive2://localhost:10000> SELECT * FROM emp;
+--------+---------+------------+-------+-----------+---------+---------+---------+--+
| empno | ename | job | mgr | hiredate | sal | comm | deptno |
+--------+---------+------------+-------+-----------+---------+---------+---------+--+
| 7369 | SMITH | CLERK | 7902 | NULL | 800.0 | NULL | 20 |
| 7499 | ALLEN | SALESMAN | 7698 | NULL | 1600.0 | 300.0 | 30 |
| 7521 | WARD | SALESMAN | 7698 | NULL | 1250.0 | 500.0 | 30 |
| 7566 | JONES | MANAGER | 7839 | NULL | 2975.0 | NULL | 20 |
| 7654 | MARTIN | SALESMAN | 7698 | NULL | 1250.0 | 1400.0 | 30 |
| 7698 | BLAKE | MANAGER | 7839 | NULL | 2850.0 | NULL | 30 |
| 7782 | CLARK | MANAGER | 7839 | NULL | 2450.0 | NULL | 10 |
| 7788 | SCOTT | ANALYST | 7566 | NULL | 3000.0 | NULL | 20 |
| 7839 | KING | PRESIDENT | NULL | NULL | 5000.0 | NULL | 10 |
| 7844 | TURNER | SALESMAN | 7698 | NULL | 1500.0 | 0.0 | 30 |
| 7876 | ADAMS | CLERK | 7788 | NULL | 1100.0 | NULL | 20 |
| 7900 | JAMES | CLERK | 7698 | NULL | 950.0 | NULL | 30 |
| 7902 | FORD | ANALYST | 7566 | NULL | 3000.0 | NULL | 20 |
| 7934 | MILLER | CLERK | 7782 | NULL | 1300.0 | NULL | 10 |
+--------+---------+------------+-------+-----------+---------+---------+---------+--+
14 rows selected (0.411 seconds)

Accessing the thriftserver from a program

Add the dependency:

<!-- Thriftserver support -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.1.0</version>
</dependency>

SparkSQLThriftServerApp

package gy.finolo.spark

import java.sql.DriverManager


object SparkSQLThriftServerApp {

  def main(args: Array[String]): Unit = {

    // 1. Load the Hive JDBC driver
    Class.forName("org.apache.hive.jdbc.HiveDriver")

    // 2. Open a connection and run the query
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "root", "")
    val ps = conn.prepareStatement("SELECT empno, ename, sal FROM emp")
    val rs = ps.executeQuery()

    // 3. Iterate over the result set
    while (rs.next()) {
      println("empno: " + rs.getInt("empno") +
        ", ename: " + rs.getString("ename") +
        ", salary: " + rs.getDouble("sal"))
    }

    // 4. Release resources
    rs.close()
    ps.close()
    conn.close()
  }

}

Result of running the program:

empno: 7369, ename: SMITH, salary: 800.0
empno: 7499, ename: ALLEN, salary: 1600.0
empno: 7521, ename: WARD, salary: 1250.0
empno: 7566, ename: JONES, salary: 2975.0
empno: 7654, ename: MARTIN, salary: 1250.0
empno: 7698, ename: BLAKE, salary: 2850.0
empno: 7782, ename: CLARK, salary: 2450.0
empno: 7788, ename: SCOTT, salary: 3000.0
empno: 7839, ename: KING, salary: 5000.0
empno: 7844, ename: TURNER, salary: 1500.0
empno: 7876, ename: ADAMS, salary: 1100.0
empno: 7900, ename: JAMES, salary: 950.0
empno: 7902, ename: FORD, salary: 3000.0
empno: 7934, ename: MILLER, salary: 1300.0

Notes

  1. Before accessing the thriftserver from a program, make sure the thriftserver has already been started; otherwise, running the program will fail with the following error:
Exception in thread "main" java.sql.SQLException: Could not open client transport with JDBC Uri: 
jdbc:hive2://localhost:10000: java.net.ConnectException: Connection refused (Connection refused)
  2. The Hive version I installed is hive-1.1.0-cdh5.7.0; when the jar version declared in pom.xml does not match it, you may run into the following exception:
Exception in thread "main" java.sql.SQLException: Could not open client transport with JDBC Uri: 
jdbc:hive2://localhost:10000: Could not establish connection to jdbc:hive2://localhost:10000:
Required field 'client_protocol' is unset! Struct:TOpenSessionReq(client_protocol:null,
configuration:{use:database=default})