PySpark in Action: Observing Data (Implementation)
Introduction
Having briefly covered the basics of the Matplotlib plotting library, this section shows how to use Spark to observe (explore) a dataset.
Code, non-graphical version
import findspark
findspark.init()

##############################################
from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext
from pyspark.sql.functions import from_unixtime, to_timestamp

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("PySpark ETL") \
    .getOrCreate()
sc = spark.sparkContext
#############################################
df = spark.read.csv('./ml-small/ratings.csv', header=True, inferSchema=True)
df.show(10)
df.printSchema()
df.summary().show()
#df.select("rating").summary().show()

# Print the number of rows and columns
print(df.count(), len(df.columns))

# Drop rows containing null or NaN values in any column
dfNotNull = df.na.drop()
print(dfNotNull.count(), len(dfNotNull.columns))

# Create a temporary view named movie
df.createOrReplaceTempView("movie")

# Spark SQL
df2 = spark.sql("select count(*) as counter, rating from \
movie group by rating order by rating asc")
df2.show()
##############################################
- df.select("rating").summary().show() computes summary statistics for the rating column only.
- spark.sql("select count(*) as counter, rating from movie group by rating order by rating asc") groups the records by rating and counts the number of rating records in each group (a DataFrame-API equivalent is sketched below).
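For reference, the same aggregation can also be written with the DataFrame API instead of SQL. This is a minimal sketch, not part of the original script; it assumes the df loaded above:

from pyspark.sql import functions as F

# Equivalent of the SQL above: group by rating, count the rows
# in each group, and sort by rating in ascending order
df3 = df.groupBy("rating") \
    .agg(F.count("*").alias("counter")) \
    .orderBy("rating")
df3.show()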
Output
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
+------+-------+------+---------+
only showing top 10 rows

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)

+-------+------------------+----------------+------------------+--------------------+
|summary|            userId|         movieId|            rating|           timestamp|
+-------+------------------+----------------+------------------+--------------------+
|  count|            100836|          100836|            100836|              100836|
|   mean|326.12756356856676|19435.2957177992| 3.501556983616962|1.2059460873684695E9|
| stddev| 182.6184914635004|35530.9871987003|1.0425292390606342|2.1626103599513078E8|
|    min|                 1|               1|               0.5|           828124615|
|    25%|               177|            1199|               3.0|          1018535155|
|    50%|               325|            2991|               3.5|          1186086516|
|    75%|               477|            8092|               4.0|          1435993828|
|    max|               610|          193609|               5.0|          1537799250|
+-------+------------------+----------------+------------------+--------------------+

100836 4
100836 4

+-------+------+
|counter|rating|
+-------+------+
|   1370|   0.5|
|   2811|   1.0|
|   1791|   1.5|
|   7551|   2.0|
|   5550|   2.5|
|  20047|   3.0|
|  13136|   3.5|
|  26818|   4.0|
|   8551|   4.5|
|  13211|   5.0|
+-------+------+
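Note that the script imports from_unixtime and to_timestamp but never uses them; presumably they are intended for converting the raw timestamp column. A minimal sketch of how that conversion could look (my own illustration, not part of the original script):

from pyspark.sql.functions import from_unixtime, to_timestamp

# from_unixtime renders the integer seconds as a 'yyyy-MM-dd HH:mm:ss' string;
# to_timestamp then parses that string into a proper timestamp column
df_ts = df.withColumn("ts", to_timestamp(from_unixtime(df["timestamp"])))
df_ts.select("userId", "movieId", "rating", "ts").show(5)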
Code with visualization (unverified)
This version adds visualization on top of the code above.
import findspark
findspark.init()

##############################################
from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext
from pyspark.sql.functions import from_unixtime, to_timestamp

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("PySpark ETL") \
    .getOrCreate()
sc = spark.sparkContext
#############################################
df = spark.read.csv('./ml-small/ratings.csv', header=True, inferSchema=True)
df.show(10)
df.printSchema()
df.summary().show()
#df.select("rating").summary().show()

# Print the number of rows and columns
print(df.count(), len(df.columns))

# Drop rows containing null or NaN values in any column
dfNotNull = df.na.drop()
print(dfNotNull.count(), len(dfNotNull.columns))

# Create a temporary view named movie
df.createOrReplaceTempView("movie")

# Spark SQL
df2 = spark.sql("select count(*) as counter, rating from \
movie group by rating order by rating asc")
df2.show()
##############################################
from matplotlib import pyplot as plt

# Pandas >= 0.19.2 must be installed
pdf = df2.toPandas()

x = pdf["rating"]
y = pdf["counter"]

plt.xlabel("rating")
plt.ylabel("counter")
plt.title("movie rating")
# width < 0.5 so that adjacent bars (ratings are spaced 0.5 apart) do not overlap
plt.bar(x, y, width=0.4)

# Draw the value label above each bar
for x1, y1 in zip(x, y):
    plt.text(x1, y1 + 0.05, '%.0f' % y1, ha='center', va='bottom', fontsize=11)

plt.show()
##############################################
sc.stop()
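plt.show() requires a desktop environment. If the script runs on a headless server, one option (an assumption on my part, not covered by the original tutorial) is to select a non-interactive backend and write the chart to a file instead:

import matplotlib
matplotlib.use("Agg")  # non-interactive backend; must be set before importing pyplot
from matplotlib import pyplot as plt

# pdf is the Pandas frame produced by df2.toPandas() in the script above
pdf = df2.toPandas()
plt.bar(pdf["rating"], pdf["counter"], width=0.4)
plt.xlabel("rating")
plt.ylabel("counter")
plt.savefig("movie_rating.png", dpi=150, bbox_inches="tight")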