PySpark in Practice: Storing Data
Introduction
After the data has been examined and processed according to business requirements (data type conversion, column selection, filtering, aggregation, and so on), the results need to be persisted to storage.
Code
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_unixtime, to_timestamp

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("PySpark ETL") \
    .getOrCreate()
#############################################
# Read the ratings data and convert the column types
df = spark.read.csv('./ml-small/ratings.csv', header=True).cache()
df = df.withColumn("rating", df.rating.cast("double"))
df = df.withColumn("date", from_unixtime(df.timestamp.cast("bigint"), 'yyyy-MM-dd'))
df = df.withColumn("date", df.date.cast("date"))
df = df.drop("timestamp")

# Read the movies data and join it with the ratings
df2 = spark.read.csv('./ml-small/movies.csv', header=True).cache()
df3 = df.join(df2, df.movieId == df2.movieId, "inner") \
    .select("userId", df.movieId, "title", "date", "rating").cache()

# Flag ratings greater than 4 with a boolean column computed by a UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

def isLike(v):
    if v > 4:
        return True
    else:
        return False

udf_isLike = udf(isLike, BooleanType())
df3 = df3.withColumn("isLike", udf_isLike(df3["rating"]))
df3.show(20)

# Store the data as plain text (RDD API)
df3.rdd.coalesce(1).saveAsTextFile("movie-out")

# Store the data in CSV format
df3.coalesce(1).write.format("csv") \
    .option("header", "true").save("movie-out-csv")

# Parquet format
df3.write.format('parquet').save("movie-out-parquet")

# JSON format
df3.coalesce(1).write.format("json") \
    .save("movie-out-json")
##############################################
spark.stop()
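The saved results can be verified by reading them back with the DataFrame reader. The following is a minimal sketch, not part of the original example; it assumes the output directories created above ("movie-out-csv", "movie-out-parquet", "movie-out-json") exist in the current working directory and that a SparkSession is available.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("Read back").getOrCreate()

# Parquet stores the schema with the data, so the column types come back unchanged
parquet_back = spark.read.parquet("movie-out-parquet")
parquet_back.printSchema()
parquet_back.show(5)

# CSV needs header/schema hints; JSON infers the schema from the records
csv_back = spark.read.csv("movie-out-csv", header=True, inferSchema=True)
json_back = spark.read.json("movie-out-json")

spark.stop()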
- Note: saveAsTextFile raises an error if the target directory already exists. Also, in real projects the data volume can be very large, in which case the output should be stored on the Hadoop HDFS file system, as sketched below.
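For the DataFrame writer, the directory-exists problem can be handled with a save mode such as "overwrite", and the same writer can target an HDFS path directly. The sketch below is an illustration rather than part of the original example; it assumes df3 from the code above, and the NameNode address hdfs://localhost:9000 is a placeholder that should be replaced with your own cluster's URI.

# Overwrite the output directory if it already exists
df3.coalesce(1).write.mode("overwrite") \
    .option("header", "true") \
    .csv("hdfs://localhost:9000/user/root/movie-out-csv")

# Parquet output written straight to HDFS, overwriting any previous run
df3.write.mode("overwrite") \
    .parquet("hdfs://localhost:9000/user/root/movie-out-parquet")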
Output
ls m*
[root@localhost wmsoft]# ls m*
matplot01.py

ml-small:
links.csv  movies.csv  ratings.csv  README.txt  tags.csv

movie-out:
part-00000  _SUCCESS

movie-out-csv:
part-00000-6a5a4ab5-9046-475b-8c11-abf200742716-c000.csv  _SUCCESS

movie-out-json:
part-00000-ef441383-e81a-4d82-a24c-d370c97cc4a1-c000.json  _SUCCESS

movie-out-parquet:
part-00000-7c719014-2633-4629-afad-c52a84175c17-c000.snappy.parquet  _SUCCESS

myuser.parquet:
part-00000-068c6bf0-8b3f-4a83-82f5-42d6ee9858d0-c000.snappy.parquet  _SUCCESS