PySpark in Action: Selection, Filtering, and Aggregation
Introduction
The point of examining the data is to decide how particular columns should be preprocessed to meet the requirements, for example converting a column's data type, or dropping and adding columns.
This process involves selecting, filtering, and aggregating the data.
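To see what actually needs converting, it helps to look at the raw schema first. The snippet below is a minimal sketch, assuming the same ./ml-small/ratings.csv file used in the next section: with header=True but no inferSchema, every column is read as a string, which is why the casts that follow are needed.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("inspect").getOrCreate()
df = spark.read.csv('./ml-small/ratings.csv', header=True)
df.printSchema()   # every column comes back as string until it is cast explicitly
df.show(5)         # peek at a few rows before deciding which columns to convert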
Code
Converting types and adding/dropping columns
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext
from pyspark.sql.functions import from_unixtime, to_timestamp

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("PySpark ETL") \
    .getOrCreate()
#############################################
# Relative path; the file has a header row
df = spark.read.csv('./ml-small/ratings.csv', header=True)
# Cast the rating column to double
df = df.withColumn("rating", df.rating.cast("double"))
# Add a date column derived from the Unix timestamp
df = df.withColumn("date", from_unixtime(df.timestamp.cast("bigint"), 'yyyy-MM-dd'))
df = df.withColumn("date", df.date.cast("date"))
# Drop the timestamp column
df = df.drop("timestamp")
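The date derivation above takes two steps: format the epoch seconds as a string, then cast the string to a date. As an alternative sketch (not part of the original script, and assuming the same ratings.csv layout), the built-in to_date function collapses this into a single step:

from pyspark.sql.functions import col, from_unixtime, to_date

# One-step variant of the date derivation: from_unixtime() yields a timestamp
# string and to_date() turns it into a date column directly.
df = (spark.read.csv('./ml-small/ratings.csv', header=True)
          .withColumn("rating", col("rating").cast("double"))
          .withColumn("date", to_date(from_unixtime(col("timestamp").cast("bigint"))))
          .drop("timestamp"))
df.printSchema()   # rating: double, date: date; userId and movieId remain strings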
Joins, selection, and custom columns
df2 = spark.read.csv('./ml-small/movies.csv', header=True)
# After the join, 'movieId' alone is ambiguous, so reference it as df.movieId
df3 = df.join(df2, df.movieId == df2.movieId, "inner") \
        .select("userId", df.movieId, "title", "date", "rating")

from pyspark.sql.functions import udf

# Define an ordinary Python function
def isLike(v):
    if v > 4:
        return True
    else:
        return False

# Wrap it as a UDF (user defined function) returning BooleanType()
from pyspark.sql.types import BooleanType
udf_isLike = udf(isLike, BooleanType())
# Add a new column, isLike
df3 = df3.withColumn("isLike", udf_isLike(df3["rating"]))
df3.show()
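For a predicate this simple, a Python UDF is not strictly required: the same isLike flag can be expressed as a native column comparison, which avoids per-row Python serialization and lets Catalyst optimize the expression. A minimal sketch, assuming df3 as built above:

from pyspark.sql.functions import col

# Same "liked" flag without a Python UDF: the comparison runs on the JVM
# and returns a boolean column directly.
df3_native = df3.withColumn("isLike", col("rating") > 4)
df3_native.show(5)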
Aggregation, selection, and filtering
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Define a pandas UDF for use with GroupedData
@pandas_udf("string", PandasUDFType.GROUPED_AGG)
def fmerge(v):
    return ','.join(v)

df5 = spark.read.csv('./ml-small/tags.csv', header=True)
df5 = df5.drop("timestamp")
# groupBy aggregation: call fmerge to merge the tag values of each group
df7 = df5.groupBy(["userId", "movieId"]).agg(fmerge(df5["tag"]))
df7 = df7.withColumnRenamed("fmerge(tag)", "tags")
# select
df6 = df3.join(df7, (df3.movieId == df7.movieId) & (df3.userId == df7.userId)) \
         .select(df3.userId, df3.movieId, "title", "date", "tags", "rating", "isLike") \
         .orderBy(["date"], ascending=[0])
# filter
df6 = df6.filter(df.date > '2015-10-25')
df6.show(20)
# df6.show(20, False)
##############################################
spark.stop()
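The grouped-aggregation pandas UDF above needs PyArrow available on the workers. As an alternative sketch (not part of the original script), the same comma-separated tag string can be built with the built-in collect_list and concat_ws functions, keeping the aggregation entirely on the JVM side:

from pyspark.sql.functions import collect_list, concat_ws

# Built-in alternative to the fmerge pandas UDF: gather all tags of each
# (userId, movieId) pair into an array, then join them with commas.
df7_builtin = (df5.groupBy("userId", "movieId")
                  .agg(concat_ws(",", collect_list("tag")).alias("tags")))
df7_builtin.show(5)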
Full code
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext
from pyspark.sql.functions import from_unixtime, to_timestamp

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("PySpark ETL") \
    .getOrCreate()
#############################################
# Relative path; the file has a header row
df = spark.read.csv('./ml-small/ratings.csv', header=True)
# Cast the rating column to double
df = df.withColumn("rating", df.rating.cast("double"))
# Add a date column derived from the Unix timestamp
df = df.withColumn("date", from_unixtime(df.timestamp.cast("bigint"), 'yyyy-MM-dd'))
df = df.withColumn("date", df.date.cast("date"))
# Drop the timestamp column
df = df.drop("timestamp")
############################################
df2 = spark.read.csv('./ml-small/movies.csv', header=True)
# After the join, 'movieId' alone is ambiguous, so reference it as df.movieId
df3 = df.join(df2, df.movieId == df2.movieId, "inner") \
        .select("userId", df.movieId, "title", "date", "rating")

from pyspark.sql.functions import udf

# Define an ordinary Python function
def isLike(v):
    if v > 4:
        return True
    else:
        return False

# Wrap it as a UDF (user defined function) returning BooleanType()
from pyspark.sql.types import BooleanType
udf_isLike = udf(isLike, BooleanType())
# Add a new column, isLike
df3 = df3.withColumn("isLike", udf_isLike(df3["rating"]))
df3.show(20)
############################################
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Define a pandas UDF for use with GroupedData
@pandas_udf("string", PandasUDFType.GROUPED_AGG)
def fmerge(v):
    return ','.join(v)

df5 = spark.read.csv('./ml-small/tags.csv', header=True)
df5 = df5.drop("timestamp")
# groupBy aggregation: call fmerge to merge the tag values of each group
df7 = df5.groupBy(["userId", "movieId"]).agg(fmerge(df5["tag"]))
df7 = df7.withColumnRenamed("fmerge(tag)", "tags")
# select
df6 = df3.join(df7, (df3.movieId == df7.movieId) & (df3.userId == df7.userId)) \
         .select(df3.userId, df3.movieId, "title", "date", "tags", "rating", "isLike") \
         .orderBy(["date"], ascending=[0])
# filter
df6 = df6.filter(df.date > '2015-10-25')
df6.show(20)
# df6.show(20, False)
##############################################
spark.stop()
Output
+------+-------+------+----------+
|userId|movieId|rating|      date|
+------+-------+------+----------+
|     1|      1|   4.0|2000-07-31|
|     1|      3|   4.0|2000-07-31|
|     1|      6|   4.0|2000-07-31|
|     1|     47|   5.0|2000-07-31|
|     1|     50|   5.0|2000-07-31|
|     1|     70|   3.0|2000-07-31|
|     1|    101|   5.0|2000-07-31|
|     1|    110|   4.0|2000-07-31|
|     1|    151|   5.0|2000-07-31|
|     1|    157|   5.0|2000-07-31|
|     1|    163|   5.0|2000-07-31|
|     1|    216|   5.0|2000-07-31|
|     1|    223|   3.0|2000-07-31|
|     1|    231|   5.0|2000-07-31|
|     1|    235|   4.0|2000-07-31|
|     1|    260|   5.0|2000-07-31|
|     1|    296|   3.0|2000-07-31|
|     1|    316|   3.0|2000-07-31|
|     1|    333|   5.0|2000-07-31|
|     1|    349|   4.0|2000-07-31|
+------+-------+------+----------+
only showing top 20 rows

[Stage 4:> (0 + 1) / 1]Traceback (most recent call last):
  File "/root/wmtools/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/daemon.py", line 170, in manager
  File "/root/wmtools/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/daemon.py", line 73, in worker
  File "/root/wmtools/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 402, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/root/wmtools/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 724, in read_int
    raise EOFError
EOFError

+------+-------+--------------------+----------+------+------+
|userId|movieId|               title|      date|rating|isLike|
+------+-------+--------------------+----------+------+------+
|     1|      1|    Toy Story (1995)|2000-07-31|   4.0| false|
|     1|      3|Grumpier Old Men ...|2000-07-31|   4.0| false|
|     1|      6|         Heat (1995)|2000-07-31|   4.0| false|
|     1|     47|Seven (a.k.a. Se7...|2000-07-31|   5.0|  true|
|     1|     50|Usual Suspects, T...|2000-07-31|   5.0|  true|
|     1|     70|From Dusk Till Da...|2000-07-31|   3.0| false|
|     1|    101|Bottle Rocket (1996)|2000-07-31|   5.0|  true|
|     1|    110|   Braveheart (1995)|2000-07-31|   4.0| false|
|     1|    151|      Rob Roy (1995)|2000-07-31|   5.0|  true|
|     1|    157|Canadian Bacon (1...|2000-07-31|   5.0|  true|
|     1|    163|    Desperado (1995)|2000-07-31|   5.0|  true|
|     1|    216|Billy Madison (1995)|2000-07-31|   5.0|  true|
|     1|    223|       Clerks (1994)|2000-07-31|   3.0| false|
|     1|    231|Dumb & Dumber (Du...|2000-07-31|   5.0|  true|
|     1|    235|      Ed Wood (1994)|2000-07-31|   4.0| false|
|     1|    260|Star Wars: Episod...|2000-07-31|   5.0|  true|
|     1|    296| Pulp Fiction (1994)|2000-07-31|   3.0| false|
|     1|    316|     Stargate (1994)|2000-07-31|   3.0| false|
|     1|    333|    Tommy Boy (1995)|2000-07-31|   5.0|  true|
|     1|    349|Clear and Present...|2000-07-31|   4.0| false|
+------+-------+--------------------+----------+------+------+
only showing top 20 rows