PySpark in Practice: Storing Data

From CloudWiki

Introduction

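Once the data has been inspected and processed according to business requirements (type conversion, column selection, filtering, aggregation and so on), the results need to be stored.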

Code

 
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_unixtime
spark = SparkSession.builder \
        .master("local[*]") \
        .appName("PySpark ETL") \
        .getOrCreate()
#############################################
df = spark.read.csv('./ml-small/ratings.csv',header=True).cache()
df = df.withColumn("rating", df.rating.cast("double"))
df = df.withColumn("date",from_unixtime(df.timestamp.cast("bigint"), 'yyyy-MM-dd'))
df = df.withColumn("date",df.date.cast("date"))
df = df.drop("timestamp")
df2 = spark.read.csv('./ml-small/movies.csv',header=True).cache()
df3 = df.join(df2, df.movieId == df2.movieId,"inner") \
        .select("userId",df.movieId,"title","date","rating").cache()
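from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

# Mark a rating as "liked" when it is above 4; null ratings map to False
# instead of raising a TypeError inside the Python worker
def isLike(v):
    return v is not None and v > 4

udf_isLike = udf(isLike, BooleanType())
df3 = df3.withColumn("isLike", udf_isLike(df3["rating"]))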
df3.show(20)
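# Store as plain text: each Row is written as its string representation.
# coalesce(1) merges the data into one partition, so one part file is written.
df3.rdd.coalesce(1).saveAsTextFile("movie-out")
# Store as CSV with a header row
df3.coalesce(1).write.format("csv") \
        .option("header", "true").save("movie-out-csv")
# Store as Parquet
df3.write.format('parquet').save("movie-out-parquet")
# Store as JSON
df3.coalesce(1).write.format("json") \
        .save("movie-out-json")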
##############################################
spark.stop()
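Spark's JSON writer produces JSON Lines: each part file holds one JSON object per line rather than a single JSON array, so calling json.load() on the whole file fails. The following is a minimal standard-library sketch for reading such output back outside Spark; the function names read_json_lines and read_json_output_dir are hypothetical helpers, and the sketch assumes uncompressed part files named part-*.json, as in the listing further down. Dates come back as plain strings, because JSON has no date type.

```python
import json
from pathlib import Path


def read_json_lines(path):
    """Parse a JSON Lines file (one JSON object per line) into a list of dicts."""
    records = []
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:  # skip blank lines
                records.append(json.loads(line))
    return records


def read_json_output_dir(out_dir):
    """Read every part-*.json file in a Spark JSON output directory, in name order.

    The _SUCCESS marker and any hidden .crc files do not match the pattern.
    """
    records = []
    for part in sorted(Path(out_dir).glob("part-*.json")):
        records.extend(read_json_lines(part))
    return records
```

For the job above, read_json_output_dir("movie-out-json") would return one dict per row of df3.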

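  • Note: saveAsTextFile raises an error if the output directory already exists, and so does DataFrameWriter.save under its default save mode (errorifexists). Also, in real projects the data volume can be very large, so the output should be stored on Hadoop HDFS instead.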
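RDD.saveAsTextFile has no save-mode option, so rerunning the job against the same local path fails until the old directory is removed. DataFrameWriter, by contrast, accepts .mode("overwrite"). The following is a minimal sketch for local runs only, using a hypothetical helper remove_local_output. For an HDFS path, the directory would have to be removed with hdfs dfs -rm -r or the Hadoop FileSystem API instead, and the namenode address in the comments is a placeholder assumption.

```python
import shutil
from pathlib import Path


def remove_local_output(path):
    """Delete a local Spark output path if it exists.

    Returns True if something was deleted, False if the path was absent.
    Only works for local paths, not hdfs:// URIs.
    """
    p = Path(path)
    if p.is_dir():
        shutil.rmtree(p)  # output "files" are really directories of part files
        return True
    if p.exists():  # a stray regular file with the same name
        p.unlink()
        return True
    return False


# Usage with the job above (local paths only):
#   remove_local_output("movie-out")
#   df3.rdd.coalesce(1).saveAsTextFile("movie-out")
#
# DataFrameWriter can overwrite by itself; the HDFS URI below is a placeholder:
#   df3.write.mode("overwrite").parquet("movie-out-parquet")
#   df3.write.mode("overwrite").parquet("hdfs://namenode:8020/user/root/movie-out-parquet")
```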

Output

ls m*

[root@localhost wmsoft]# ls m*
matplot01.py

ml-small:
links.csv  movies.csv  ratings.csv  README.txt  tags.csv

movie-out:
part-00000  _SUCCESS

movie-out-csv:
part-00000-6a5a4ab5-9046-475b-8c11-abf200742716-c000.csv  _SUCCESS

movie-out-json:
part-00000-ef441383-e81a-4d82-a24c-d370c97cc4a1-c000.json  _SUCCESS

movie-out-parquet:
part-00000-7c719014-2633-4629-afad-c52a84175c17-c000.snappy.parquet  _SUCCESS

myuser.parquet:
part-00000-068c6bf0-8b3f-4a83-82f5-42d6ee9858d0-c000.snappy.parquet  _SUCCESS
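The listing shows that each save call produced a directory, not a single file. Each directory holds one part-* data file per written partition (a single one in every case here) plus an empty _SUCCESS marker that is written when the job commits. On the local filesystem, Hadoop also writes hidden .crc checksum files, which ls without -a does not show. The following minimal sketch uses a hypothetical helper, describe_output_dir, to check such a directory before a downstream step consumes it. It assumes the default committer behaviour of writing _SUCCESS.

```python
from pathlib import Path


def describe_output_dir(out_dir):
    """Summarize a Spark/Hadoop output directory.

    Returns (succeeded, part_files): succeeded is True when the _SUCCESS
    marker is present; part_files lists the data file names in sorted order.
    Hidden checksum files such as .part-...crc are excluded.
    """
    d = Path(out_dir)
    succeeded = (d / "_SUCCESS").exists()
    part_files = sorted(p.name for p in d.iterdir() if p.name.startswith("part-"))
    return succeeded, part_files
```

For example, describe_output_dir("movie-out-csv") would report True together with the single CSV part file listed above.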