Basic DataFrames API Operations
Starting Spark
The MySQL JDBC driver needs to be loaded when starting the shell. The jar path below is a placeholder; point `--jars` at your local copy of the connector:

```
# placeholder path; substitute your local MySQL connector jar
bin/spark-shell --master local[2] \
  --jars /path/to/mysql-connector-java.jar
```
Loading a JSON file

The path below assumes the `people.json` sample data that ships with the Spark distribution:

```
scala> val peopleDF = spark.read.json("examples/src/main/resources/people.json")
```
API Operations
- Print the schema of the DataFrame

```
scala> peopleDF.printSchema
```
- Show the first 20 rows of the dataset

```
scala> peopleDF.show
```
- Select all values of a column

Equivalent to the following SQL:

```
SELECT name FROM people;
```

```
scala> peopleDF.select("name").show
```
- Filter queries

The SQL WHERE clause; equivalent to the following SQL:

```
SELECT * FROM people WHERE age > 19;
```

```
scala> peopleDF.filter("age > 19").show
```
- More complex projections

Equivalent to the following SQL:

```
SELECT name, age + 10 AS new_age FROM people;
```

```
scala> peopleDF.select(peopleDF("name"), (peopleDF("age") + 10).as("new_age")).show
```
- Aggregation

Count the number of people per age. Equivalent to the following SQL:

```
SELECT age, count(1) FROM people GROUP BY age;
```

```
scala> peopleDF.groupBy("age").count().show
```
- Using Spark SQL built-in functions

`SUBSTRING` here is a Spark SQL built-in function (not a MySQL one):

```
scala> peopleDF.filter("SUBSTRING(name, 0, 1) = 'A'").show
```

We can list which functions Spark SQL provides. The signature is `show(numRows, truncate)`; passing `false` disables column truncation:

```
scala> spark.sql("show functions").show(30, false)
```
- Sorting

```
scala> peopleDF.sort(peopleDF("name").desc).show
```
- JOIN
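A join needs a second DataFrame; as a minimal sketch, self-join `peopleDF` on `name`, reading the assumed sample file a second time:

```scala
// A second DataFrame to join against (here: the same assumed sample file again)
val peopleDF2 = spark.read.json("examples/src/main/resources/people.json")

// Inner join on the name column; === compares the two Column expressions
peopleDF.join(peopleDF2, peopleDF("name") === peopleDF2("name")).show()
```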
- Other actions

```
take(n), head(3), first
```
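These actions return rows to the driver instead of printing a table; a quick sketch of the differences:

```scala
// take(n) returns the first n rows as an Array[Row]
val firstThree = peopleDF.take(3)

// head(n) is equivalent to take(n); head() with no argument returns a single Row
val alsoThree = peopleDF.head(3)

// first() is an alias for head()
val firstRow = peopleDF.first()
```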
Converting RDDs to DataFrames
Inferring the Schema Using Reflection
Via reflection:

```
// For implicit conversions from RDDs to DataFrames
import spark.implicits._
```
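A sketch of the full reflection-based pattern, following the Spark SQL guide and assuming a `people.txt` file of `name,age` lines (the path is an assumption):

```scala
import spark.implicits._

// The case class defines the schema: field names become column names
case class Person(name: String, age: Long)

// Read "name,age" lines, map each to a Person, and convert the RDD to a DataFrame
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")  // assumed sample path
  .map(_.split(","))
  .map(attrs => Person(attrs(0), attrs(1).trim.toLong))
  .toDF()

// Register as a temp view so it can be queried with SQL
peopleDF.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19").show()
```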
Programmatically Specifying the Schema
```
// Create an RDD
```
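When case classes cannot be defined ahead of time, the schema is built by hand with `StructType` and applied to an RDD of `Row`s; a sketch in the style of the Spark SQL guide (file path is an assumption):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Create an RDD of raw "name,age" lines
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// Build the schema programmatically: one nullable StringType field per column
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", StringType, nullable = true)))

// Convert each line of the RDD to a Row matching the schema
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attrs => Row(attrs(0), attrs(1).trim))

// Apply the schema to the RDD to obtain a DataFrame
val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF.show()
```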
Converting DataFrames to Datasets
```
case class Person(name: String, age: Long)
```
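A DataFrame is converted to a strongly typed Dataset with `as[T]`, which maps rows onto the case class by column name; a minimal sketch reusing the `peopleDF` loaded earlier:

```scala
import spark.implicits._  // provides the Encoder for Person

case class Person(name: String, age: Long)

// Each Row becomes a Person; column names must match the field names
val peopleDS = peopleDF.as[Person]
peopleDS.show()
```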