PySpark in Action: Basic DataFrame Operations
Introduction
Besides manipulating DataFrame objects through Spark SQL, a DataFrame, like an RDD, also supports a range of data operations through its own API.
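For instance, the same query can be written either as SQL against a temporary view or as chained DataFrame method calls. A minimal sketch of the two routes (the view name people and the sample data are illustrative, not part of the examples below):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SQL vs API") \
    .getOrCreate()

df = spark.createDataFrame([('Jack', 32), ('Smith', 33)], "name: string, age: int")

# Spark SQL route: register a temporary view and query it with SQL
df.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people WHERE age > 32").show()

# DataFrame API route: the same query expressed as method calls
df.select("name").where(df.age > 32).show()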
Basic Operations
Code
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("RDD Demo") \
    .getOrCreate()
sc = spark.sparkContext
#############################################
# Register a user-defined function
strlen = spark.udf.register("strLen", lambda x: len(x))

# Create a DataFrame
a = [('Jack', 32), ('Smith', 33), ('李四', 36)]
rdd = sc.parallelize(a)
df = spark.createDataFrame(rdd, "name: string, age: int")

# select on df: keep rows whose name is longer than 2 characters
df2 = df.select("name").where(strlen("name") > 2)
df2.show()
# +-----+
# | name|
# +-----+
# | Jack|
# |Smith|
# +-----+

# agg on df: maximum of the age column
df2 = df.agg({"age": "max"})
df2.show()
# +--------+
# |max(age)|
# +--------+
# |      36|
# +--------+

# Summary statistics for the age column
df.describe(['age']).show()
# +-------+------------------+
# |summary|               age|
# +-------+------------------+
# |  count|                 3|
# |   mean|33.666666666666664|
# | stddev|2.0816659994661326|
# |    min|                32|
# |    max|                36|
# +-------+------------------+

# agg on df: sum of the age column
df.agg({"age": "sum"}).show()
# +--------+
# |sum(age)|
# +--------+
# |     101|
# +--------+
##############################################
Output
+-----+
| name|
+-----+
| Jack|
|Smith|
+-----+

+--------+
|max(age)|
+--------+
|      36|
+--------+

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|                 3|
|   mean|33.666666666666664|
| stddev|2.0816659994661326|
|    min|                32|
|    max|                36|
+-------+------------------+

+--------+
|sum(age)|
+--------+
|     101|
+--------+
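The dict form of agg() used above allows only one aggregate per column and fixes the output column names. The helpers in pyspark.sql.functions are a common alternative; a sketch reusing the df defined in the code above (not part of the original example):

from pyspark.sql import functions as F

# The built-in length() covers what the strLen UDF does,
# without the cost of calling back into Python for every row
df.select("name").where(F.length("name") > 2).show()

# Several aggregates at once, with alias() naming the result columns
df.agg(F.max("age").alias("max_age"),
       F.avg("age").alias("avg_age")).show()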
Reading Parquet Files
Code
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("RDD Demo") \
    .getOrCreate()
sc = spark.sparkContext
#############################################
# Path: /src/ch04/users.parquet
# Create the DataFrame
a = [('Jack', 32), ('Smith', 33), ('李四', 36)]
rdd = sc.parallelize(a)
df = spark.createDataFrame(rdd, "name: string, age: int")

# Write the data out in Parquet format
# myuser.parquet is actually a directory; by default the write
# fails if it already exists (see the save-mode sketch below)
df.write.parquet("myuser.parquet")

# Read the data back from the file into a DataFrame
peopleDf = spark.read.parquet("myuser.parquet")
peopleDf.show()
# +-----+---+
# | name|age|
# +-----+---+
# | Jack| 32|
# |Smith| 33|
# | 李四| 36|
# +-----+---+

# Filter the data
print("age>32:")
peopleDf.filter(peopleDf.age > 32).show()
# +-----+---+
# | name|age|
# +-----+---+
# |Smith| 33|
# | 李四| 36|
# +-----+---+
##############################################
Output
+-----+---+
| name|age|
+-----+---+
| Jack| 32|
|Smith| 33|
| 李四| 36|
+-----+---+

age>32:
+-----+---+
| name|age|
+-----+---+
|Smith| 33|
| 李四| 36|
+-----+---+
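Because write.parquet() raises an error by default when the target directory already exists, running the script a second time fails. A sketch of save modes and partitioned output, reusing df from the code above (myuser_by_age.parquet is an illustrative path, not from the original):

# "overwrite" replaces the directory; "append" adds new part files
df.write.mode("overwrite").parquet("myuser.parquet")

# partitionBy() creates one subdirectory per distinct value of the
# column, so later reads that filter on age can skip whole directories
df.write.mode("overwrite").partitionBy("age").parquet("myuser_by_age.parquet")

spark.read.parquet("myuser_by_age.parquet").show()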
DataFrame Join Queries
Code
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("RDD Demo") \
    .getOrCreate()
sc = spark.sparkContext
#############################################
# Create the first DataFrame, peopleDf
a = [
    ('01', '张三', '男', 32, 5000),
    ('01', '李四', '男', 33, 6000),
    ('01', '王五', '女', 38, 5500),
    ('02', 'Jack', '男', 42, 7000),
    ('02', 'Smith', '女', 27, 6500),
    ('02', 'Lily', '女', 45, 9500)
]
rdd = sc.parallelize(a)
peopleDf = spark.createDataFrame(rdd, \
    "deptId:string,name:string,gender:string,age:int,salary:int")
#peopleDf.show()

# Create the second DataFrame, deptDf
b = [
    ('01', '销售部'),
    ('02', '研发部')
]
rdd2 = sc.parallelize(b)
deptDf = spark.createDataFrame(rdd2, "id:string,name:string")
#deptDf.show()

# Join peopleDf and deptDf
# The third argument of join() defaults to inner; the options are:
#   inner, cross, outer, full, full_outer, left, left_outer,
#   right, right_outer, left_semi, and left_anti
peopleDf.join(deptDf, peopleDf.deptId == deptDf.id, 'inner') \
    .groupBy(deptDf.name, "gender") \
    .agg({"salary": "avg", "age": "max"}) \
    .show()
# groupBy: group the joined rows by department name and gender
# agg: average salary and maximum age per group
# +------+------+-----------+--------+
# |  name|gender|avg(salary)|max(age)|
# +------+------+-----------+--------+
# |研发部|    男|     7000.0|      42|
# |销售部|    男|     5500.0|      33|
# |销售部|    女|     5500.0|      38|
# |研发部|    女|     8000.0|      45|
# +------+------+-----------+--------+
##############################################
Output
+------+------+-----------+--------+
|  name|gender|avg(salary)|max(age)|
+------+------+-----------+--------+
|研发部|    男|     7000.0|      42|
|销售部|    男|     5500.0|      33|
|销售部|    女|     5500.0|      38|
|研发部|    女|     8000.0|      45|
+------+------+-----------+--------+
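Among the join types listed in the code comment, left_semi and left_anti differ from the rest: they return columns from the left DataFrame only, acting as existence filters. A short sketch against the same peopleDf and deptDf (not part of the original example):

# left_semi: people whose deptId has a match in deptDf (left columns only)
peopleDf.join(deptDf, peopleDf.deptId == deptDf.id, "left_semi").show()

# left_anti: people with no matching department; empty here,
# because every deptId in the sample data matches a department
peopleDf.join(deptDf, peopleDf.deptId == deptDf.id, "left_anti").show()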