PySpark in Practice: Selection, Filtering, and Aggregation


Introduction

The purpose of inspecting the data first is to decide how particular columns should be preprocessed, for example casting a column to another type, or dropping and adding columns. Along the way, this involves selecting, filtering, and aggregating the data.

Code

Converting types, adding and dropping columns

import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext
from pyspark.sql.functions import from_unixtime, to_timestamp
spark = SparkSession.builder \
        .master("local[*]") \
        .appName("PySpark ETL") \
        .getOrCreate()
#############################################
# Relative path; the file has a header row
df = spark.read.csv('./ml-small/ratings.csv',header=True)
# Cast the rating column to double
df = df.withColumn("rating", df.rating.cast("double"))
# Add a new date column derived from the Unix timestamp
df = df.withColumn("date",from_unixtime(df.timestamp.cast("bigint"), 'yyyy-MM-dd'))
df = df.withColumn("date",df.date.cast("date"))
# Drop the timestamp column
df = df.drop("timestamp")
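
To check that the casts behaved as expected, the schema and a sample of rows can be printed before moving on (a quick sanity check on the df built above):

df.printSchema()  # rating should now be double and date should be date
df.show(5)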

Joining, selecting, and custom columns

df2 = spark.read.csv('./ml-small/movies.csv',header=True)
# 'movieId' appears in both DataFrames and is ambiguous, so qualify it as df.movieId
df3 = df.join(df2, df.movieId == df2.movieId,"inner") \
        .select("userId",df.movieId,"title","date","rating")
from pyspark.sql.functions import udf
# Define an ordinary Python function
def isLike(v):
    if v > 4:
        return True
    else:
        return False
# Create the UDF (udf = user-defined function)
from pyspark.sql.types import BooleanType
udf_isLike = udf(isLike, BooleanType())  # wrap isLike as a UDF that returns BooleanType()
df3 = df3.withColumn("isLike", udf_isLike(df3["rating"]))  # add a new column isLike
df3.show()
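
For a simple threshold like this, the Python UDF is not strictly required; the same Boolean column can be produced with a built-in comparison expression, which avoids the Python round-trip (a small alternative sketch using the same df3):

from pyspark.sql.functions import col
df3 = df3.withColumn("isLike", col("rating") > 4)  # built-in comparison, no UDF needed
df3.show()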

Aggregation, selection, and filtering

from pyspark.sql.functions import pandas_udf, PandasUDFType

# Define a pandas UDF for grouped aggregation (used with GroupedData)
@pandas_udf("string", PandasUDFType.GROUPED_AGG) 
def fmerge(v):
    return ','.join(v)

df5 = spark.read.csv('./ml-small/tags.csv',header=True)
df5 = df5.drop("timestamp")
# Aggregate with groupBy
df7 = df5.groupBy(["userId","movieId"]).agg(fmerge(df5["tag"]))  # merge the tag values of each group with fmerge
df7 = df7.withColumnRenamed("fmerge(tag)","tags")
# Select the desired columns
df6 = df3.join(df7,(df3.movieId == df7.movieId) & (df3.userId == df7.userId))\
        .select(df3.userId,df3.movieId,"title","date","tags","rating","isLike") \
        .orderBy(["date"], ascending=[0])
# Filter rows
df6 = df6.filter(df6.date > '2015-10-25')
df6.show(20)
#df6.show(20,False)
##############################################
spark.stop()
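
The grouped-aggregation pandas UDF above requires pyarrow to be installed. A comparable tags column can also be built with the built-in collect_list and concat_ws functions (a sketch using the same df5, assigned here to a new df8; element order within a group is not guaranteed):

from pyspark.sql.functions import collect_list, concat_ws
df8 = df5.groupBy("userId", "movieId") \
         .agg(concat_ws(",", collect_list("tag")).alias("tags"))
df8.show(5)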

Complete code


import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext
from pyspark.sql.functions import from_unixtime, to_timestamp
spark = SparkSession.builder \
        .master("local[*]") \
        .appName("PySpark ETL") \
        .getOrCreate()
#############################################
# Relative path; the file has a header row
df = spark.read.csv('./ml-small/ratings.csv',header=True)
# Cast the rating column to double
df = df.withColumn("rating", df.rating.cast("double"))
# Add a new date column derived from the Unix timestamp
df = df.withColumn("date",from_unixtime(df.timestamp.cast("bigint"), 'yyyy-MM-dd'))
df = df.withColumn("date",df.date.cast("date"))
# Drop the timestamp column
df = df.drop("timestamp")
############################################
df2 = spark.read.csv('./ml-small/movies.csv',header=True)
# 'movieId' appears in both DataFrames and is ambiguous, so qualify it as df.movieId
df3 = df.join(df2, df.movieId == df2.movieId,"inner") \
        .select("userId",df.movieId,"title","date","rating")
from pyspark.sql.functions import udf
# Define an ordinary Python function
def isLike(v):
    if v > 4:
        return True
    else:
        return False
# Create the UDF (udf = user-defined function)
from pyspark.sql.types import BooleanType
udf_isLike = udf(isLike, BooleanType())  # wrap isLike as a UDF that returns BooleanType()
df3 = df3.withColumn("isLike", udf_isLike(df3["rating"]))  # add a new column isLike
df3.show(20)
############################################
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Define a pandas UDF for grouped aggregation (used with GroupedData)
@pandas_udf("string", PandasUDFType.GROUPED_AGG) 
def fmerge(v):
    return ','.join(v)

df5 = spark.read.csv('./ml-small/tags.csv',header=True)
df5 = df5.drop("timestamp")
# Aggregate with groupBy
df7 = df5.groupBy(["userId","movieId"]).agg(fmerge(df5["tag"]))  # merge the tag values of each group with fmerge
df7 = df7.withColumnRenamed("fmerge(tag)","tags")
# Select the desired columns
df6 = df3.join(df7,(df3.movieId == df7.movieId) & (df3.userId == df7.userId))\
        .select(df3.userId,df3.movieId,"title","date","tags","rating","isLike") \
        .orderBy(["date"], ascending=[0])
# Filter rows
df6 = df6.filter(df6.date > '2015-10-25')
df6.show(20)
#df6.show(20,False)
##############################################
spark.stop()

Output

+------+-------+------+----------+
|userId|movieId|rating|      date|
+------+-------+------+----------+
|     1|      1|   4.0|2000-07-31|
|     1|      3|   4.0|2000-07-31|
|     1|      6|   4.0|2000-07-31|
|     1|     47|   5.0|2000-07-31|
|     1|     50|   5.0|2000-07-31|
|     1|     70|   3.0|2000-07-31|
|     1|    101|   5.0|2000-07-31|
|     1|    110|   4.0|2000-07-31|
|     1|    151|   5.0|2000-07-31|
|     1|    157|   5.0|2000-07-31|
|     1|    163|   5.0|2000-07-31|
|     1|    216|   5.0|2000-07-31|
|     1|    223|   3.0|2000-07-31|
|     1|    231|   5.0|2000-07-31|
|     1|    235|   4.0|2000-07-31|
|     1|    260|   5.0|2000-07-31|
|     1|    296|   3.0|2000-07-31|
|     1|    316|   3.0|2000-07-31|
|     1|    333|   5.0|2000-07-31|
|     1|    349|   4.0|2000-07-31|
+------+-------+------+----------+
only showing top 20 rows

+------+-------+--------------------+----------+------+------+
|userId|movieId|               title|      date|rating|isLike|
+------+-------+--------------------+----------+------+------+
|     1|      1|    Toy Story (1995)|2000-07-31|   4.0| false|
|     1|      3|Grumpier Old Men ...|2000-07-31|   4.0| false|
|     1|      6|         Heat (1995)|2000-07-31|   4.0| false|
|     1|     47|Seven (a.k.a. Se7...|2000-07-31|   5.0|  true|
|     1|     50|Usual Suspects, T...|2000-07-31|   5.0|  true|
|     1|     70|From Dusk Till Da...|2000-07-31|   3.0| false|
|     1|    101|Bottle Rocket (1996)|2000-07-31|   5.0|  true|
|     1|    110|   Braveheart (1995)|2000-07-31|   4.0| false|
|     1|    151|      Rob Roy (1995)|2000-07-31|   5.0|  true|
|     1|    157|Canadian Bacon (1...|2000-07-31|   5.0|  true|
|     1|    163|    Desperado (1995)|2000-07-31|   5.0|  true|
|     1|    216|Billy Madison (1995)|2000-07-31|   5.0|  true|
|     1|    223|       Clerks (1994)|2000-07-31|   3.0| false|
|     1|    231|Dumb & Dumber (Du...|2000-07-31|   5.0|  true|
|     1|    235|      Ed Wood (1994)|2000-07-31|   4.0| false|
|     1|    260|Star Wars: Episod...|2000-07-31|   5.0|  true|
|     1|    296| Pulp Fiction (1994)|2000-07-31|   3.0| false|
|     1|    316|     Stargate (1994)|2000-07-31|   3.0| false|
|     1|    333|    Tommy Boy (1995)|2000-07-31|   5.0|  true|
|     1|    349|Clear and Present...|2000-07-31|   4.0| false|
+------+-------+--------------------+----------+------+------+
only showing top 20 rows