
Basic Operations with the Spark DataFrame and Dataset APIs

Basic DataFrame API Operations

Starting Spark

The MySQL JDBC driver needs to be loaded when starting the shell:

bin/spark-shell --master local[2] \
--jars ~/.m2/repository/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar
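
With the driver on the classpath, the shell can also read a table over JDBC. A minimal sketch; the URL, table name, and credentials below are hypothetical:

// Read a MySQL table over JDBC (hypothetical connection details)
val jdbcDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/test")  // hypothetical database
  .option("dbtable", "people")                        // hypothetical table
  .option("user", "root")                             // hypothetical credentials
  .option("password", "root")
  .load()
jdbcDF.show()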

Loading a JSON file

scala> val jsonPath = "/usr/local/spark/spark-2.2.3-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json"
scala> val peopleDF = spark.read.format("json").load(jsonPath)
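
As a side note, the format("json").load(...) pair has an equivalent shorthand:

scala> val peopleDF = spark.read.json(jsonPath)  // same as the call above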

API Operations

  • Print the schema of the DataFrame
scala> peopleDF.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
  • Show the first 20 records of the dataset
scala> peopleDF.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
  • Select all values of a column

Similar to the following SQL:

SELECT name FROM people;

scala> peopleDF.select("name").show
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
  • FILTER queries

This is the SQL WHERE-clause style of query, similar to:

SELECT * FROM people WHERE age > 19;

scala> peopleDF.filter("age > 19").show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
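
The same condition can also be written as a column expression rather than a SQL string:

scala> peopleDF.filter(peopleDF("age") > 19).show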
  • More complex queries

Similar to the following SQL:

SELECT name, age + 10 as new_age FROM people;

scala> peopleDF.select(peopleDF("name"), (peopleDF("age") + 10).as("new_age")).show
+-------+-------+
| name|new_age|
+-------+-------+
|Michael| null|
| Andy| 40|
| Justin| 29|
+-------+-------+
  • Aggregation

Count the number of people of each age. Similar to the following SQL:

SELECT age, count(1) FROM people GROUP BY age;

scala> peopleDF.groupBy("age").count().show
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
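
groupBy can also compute several aggregates in one pass with agg; a brief sketch using Spark's built-in aggregate functions:

import org.apache.spark.sql.functions.{count, first}

// Count rows per age and keep one example name per group
peopleDF.groupBy("age").agg(count("name").as("cnt"), first("name").as("a_name")).show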
  • Applying Spark SQL's built-in functions
scala> peopleDF.filter("SUBSTRING(name, 0, 1) = 'A'").show
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

We can list the built-in functions that Spark SQL provides:

show(numRows, truncate) controls how many rows are printed and whether long values are truncated.

scala> spark.sql("show functions").show(30, false)
+---------------------+
|function |
+---------------------+
|! |
|% |
|& |
|* |
|+ |
|- |
|/ |
|< |
|<= |
|<=> |
|= |
...
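
To inspect a single function, DESCRIBE FUNCTION prints its usage, for example:

scala> spark.sql("describe function substring").show(false)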
  • Sorting
scala> peopleDF.sort(peopleDF("name").desc).show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 19| Justin|
| 30| Andy|
+----+-------+
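
sort also accepts multiple sort expressions, e.g. age ascending with name descending as a tie-breaker:

scala> peopleDF.sort(peopleDF("age").asc, peopleDF("name").desc).show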
  • JOIN
scala> val df2 = spark.read.format("json").load(jsonPath)
scala> peopleDF.join(df2, peopleDF("name") === df2("name")).show
+----+-------+----+-------+
| age| name| age| name|
+----+-------+----+-------+
|null|Michael|null|Michael|
| 30| Andy| 30| Andy|
| 19| Justin| 19| Justin|
+----+-------+----+-------+
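
Note that joining on a column expression keeps both copies of the join key, as seen above. Passing the key as a Seq of column names deduplicates it:

scala> peopleDF.join(df2, Seq("name")).show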
  • Other actions

take, head(3), and first return rows to the driver rather than printing them; see the sketch below.
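
A brief sketch of these actions:

scala> peopleDF.take(2)   // Array[Row] with the first 2 rows
scala> peopleDF.head(3)   // Array[Row] with the first 3 rows
scala> peopleDF.first     // the first Row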

Converting RDDs to DataFrames

Inferring the Schema Using Reflection

The schema is inferred via reflection from a case class:

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// The case class defines the schema; its fields become column names
case class Person(name: String, age: Long)

// Create an RDD of Person objects from a text file, convert it to a DataFrame
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()

// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
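
The rows of the result can then be accessed by index or by field name, mirroring the corresponding example in the Spark documentation:

// Columns of a Row can be accessed by field index or by name
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()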

Programmatically Specifying the Schema

When the columns are only known at runtime, the schema can instead be built explicitly with StructType and applied to an RDD of Rows:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
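
The resulting DataFrame behaves like any other, so it can be registered as a view and queried with SQL:

// Register the DataFrame as a view and query it
peopleDF.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people").show()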

Converting DataFrames to Datasets

// Encoders for case classes are created automatically via implicits
import spark.implicits._

case class Person(name: String, age: Long)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
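
Datasets can also be built directly from Scala collections with toDS; a minimal sketch:

// Encoders for case classes are provided by the implicits imported above
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()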