PySpark实战:DataFrame基本操作

来自CloudWiki
跳转至: 导航搜索

介绍

除了使用 Spark SQL 对 DataFrame 对象进行操作外,DataFrame 和 RDD 一样,自身也支持各类数据操作(例如 select、where、agg、describe、join 等)。

基本操作

代码


# Demo script: basic DataFrame operations (select/where with a UDF, agg, describe).
import findspark
# findspark.init() locates the Spark installation and makes pyspark importable,
# so it has to run before the pyspark import below.
findspark.init()
##############################################
from pyspark.sql import SparkSession

# Single-threaded local session; getOrCreate() reuses an existing session if any.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("RDD Demo")
    .getOrCreate()
)
sc = spark.sparkContext
#############################################
# Register a user-defined function that returns the length of a string.
# The return type is declared explicitly: without it Spark treats the UDF
# result as a string, and the numeric comparison below would only work through
# an implicit string-to-int cast. NULL input yields NULL instead of raising.
strlen = spark.udf.register(
    "strLen",
    lambda x: len(x) if x is not None else None,
    "int",
)

# Build a DataFrame from an RDD of (name, age) tuples using a DDL schema string.
a = [('Jack', 32), ('Smith', 33), ('李四', 36)]
rdd = sc.parallelize(a)
df = spark.createDataFrame(rdd, "name: string, age: int")

# select + where: keep only the names longer than 2 characters.
df2 = df.select("name").where(strlen("name") > 2)
df2.show()
# +-----+
# | name|
# +-----+
# | Jack|
# |Smith|
# +-----+

# agg: maximum of the age column.
df2 = df.agg({"age": "max"})
df2.show()
# +--------+
# |max(age)|
# +--------+
# |      36|
# +--------+

# describe: summary statistics (count/mean/stddev/min/max) of the age column.
df.describe(['age']).show()
# +-------+------------------+
# |summary|               age|
# +-------+------------------+
# |  count|                 3|
# |   mean|33.666666666666664|
# | stddev|2.0816659994661326|
# |    min|                32|
# |    max|                36|
# +-------+------------------+

# agg: sum of the age column.
df.agg({"age": "sum"}).show()
# +--------+
# |sum(age)|
# +--------+
# |     101|
# +--------+
##############################################

输出

+-----+
| name|
+-----+
| Jack|
|Smith|
+-----+

+--------+
|max(age)|
+--------+
|      36|
+--------+

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|                 3|
|   mean|33.666666666666664|
| stddev|2.0816659994661326|
|    min|                32|
|    max|                36|
+-------+------------------+

+--------+
|sum(age)|
+--------+
|     101|
+--------+

读取parquet文件

代码

# Demo script: write a DataFrame to Parquet, read it back, and filter it.
import findspark
# findspark.init() locates the Spark installation and makes pyspark importable,
# so it has to run before the pyspark import below.
findspark.init()
##############################################
from pyspark.sql import SparkSession

# Single-threaded local session; getOrCreate() reuses an existing session if any.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("RDD Demo")
    .getOrCreate()
)
sc = spark.sparkContext
#############################################
# NOTE(review): the original note here referenced /src/ch04/users.parquet, but
# the script writes to and reads from ./myuser.parquet - confirm which is meant.

# Build a DataFrame from an RDD of (name, age) tuples using a DDL schema string.
a = [('Jack', 32), ('Smith', 33), ('李四', 36)]
rdd = sc.parallelize(a)
df = spark.createDataFrame(rdd, "name: string, age: int")

# Write the data in Parquet format.
# "myuser.parquet" is actually a directory of part files, not a single file.
# mode("overwrite") makes the script re-runnable: the default save mode raises
# an error when the target path already exists (i.e. on every run after the first).
df.write.mode("overwrite").parquet("myuser.parquet")

# Read the Parquet data back into a new DataFrame.
peopleDf = spark.read.parquet("myuser.parquet")
peopleDf.show()
# +-----+---+
# | name|age|
# +-----+---+
# | Jack| 32|
# |Smith| 33|
# | 李四| 36|
# +-----+---+

# Filter the rows: keep people older than 32.
print("age>32:")
peopleDf.filter(peopleDf.age > 32).show()
# +-----+---+
# | name|age|
# +-----+---+
# |Smith| 33|
# | 李四| 36|
# +-----+---+
##############################################

输出

+-----+---+
| name|age|
+-----+---+
| Jack| 32|
|Smith| 33|
| 李四| 36|
+-----+---+

age>32:
+-----+---+
| name|age|
+-----+---+
|Smith| 33|
| 李四| 36|
+-----+---+

DataFrame关联查询

代码


# Demo script: join two DataFrames, then group and aggregate the joined rows.
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession

# Single-threaded local session; getOrCreate() reuses an existing session if any.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("RDD Demo")
    .getOrCreate()
)
sc = spark.sparkContext
#############################################

# First DataFrame, peopleDf: one row per employee.
PEOPLE_SCHEMA = "deptId:string,name:string,gender:string,age:int,salary:int"
people_rows = [
    ('01', '张三', '男', 32, 5000),
    ('01', '李四', '男', 33, 6000),
    ('01', '王五', '女', 38, 5500),
    ('02', 'Jack', '男', 42, 7000),
    ('02', 'Smith', '女', 27, 6500),
    ('02', 'Lily', '女', 45, 9500),
]
people_rdd = sc.parallelize(people_rows)
peopleDf = spark.createDataFrame(people_rdd, PEOPLE_SCHEMA)
# Uncomment to inspect: peopleDf.show()

# Second DataFrame, deptDf: department id -> department name.
DEPT_SCHEMA = "id:string,name:string"
dept_rows = [
    ('01', '销售部'),
    ('02', '研发部'),
]
dept_rdd = sc.parallelize(dept_rows)
deptDf = spark.createDataFrame(dept_rdd, DEPT_SCHEMA)
# Uncomment to inspect: deptDf.show()

# Join peopleDf with deptDf on the department id.
# The third argument of join() (the join type) defaults to inner; other options:
# inner, cross, outer, full, full_outer, left, left_outer,
# right, right_outer, left_semi, and left_anti.
joined = peopleDf.join(deptDf, on=peopleDf.deptId == deptDf.id, how="inner")

# groupBy: group by department name (deptDf.name, since both inputs have a
# "name" column) and by gender.
grouped = joined.groupBy(deptDf.name, "gender")

# agg: average of salary and maximum of age within each group.
summary = grouped.agg({"salary": "avg", "age": "max"})
summary.show()
# +------+------+-----------+--------+
# |  name|gender|avg(salary)|max(age)|
# +------+------+-----------+--------+
# |研发部|    男|     7000.0|      42|
# |销售部|    男|     5500.0|      33|
# |销售部|    女|     5500.0|      38|
# |研发部|    女|     8000.0|      45|
# +------+------+-----------+--------+
##############################################


输出

+------+------+-----------+--------+
|  name|gender|avg(salary)|max(age)|
+------+------+-----------+--------+
|研发部|    男|     7000.0|      42|
|销售部|    男|     5500.0|      33|
|销售部|    女|     5500.0|      38|
|研发部|    女|     8000.0|      45|
+------+------+-----------+--------+