Spark全栈:通过报表探索数据
来自CloudWiki
前言
接下来就是我们的第三个敏捷开发冲刺周期了,我们要把图表页面进一步开发为完整的报表。在这一步骤中,我们会给图表增添交互性,把静态页面变成动态页面,通过链接的网络、表和图表的相关实体使得数据可供探索。这些都是数据价值金字塔中报表阶段的特性
本章的示例代码在Agile_Data_Code_2/tree/master/ch06中
git clone https://github.con/rjurney/Agile_Data_Code_2.git
提取航空公司为实体
为了构建报表,我们需要组合数据集的各种视图。构建这些视图对应的工作是枚举实体。我们在上一章中创建了实体“飞机”,它会是我们继续创建其他实体及建立实体间的关联关系以构建报表的基础。如上一章所述,在我们开始创建数据的视图之前,我们需要构建一个网页来展示图表与表格。那么下面我们就来创建一个新的实体“航空公司”,并且为每家航空公司提供一个页面。
我们从收集一家特定航空公司所有飞机的机尾编号入手。每个商业航班都是由某一家航空公司运营的,各个航空公司又拥有各自的机队,以及基地机场设施和人员,这些都是航空公司业务的核心资产。我们已经为每架飞机创建了页面,因此我们可以利用这一数据来创建每个航空公司的全部机尾编号列表。
使用PySpark把航空公司定义为飞机的分组
我们从准备每个航空公司的飞机机尾编号列表入手,代码如ch06/ extract_airlines.py文件所展示的。这会成为航空公司页面的基础:
airplanes = spark.read.json('data/airplanes.json') # # Who makes the airplanes in the US commercial fleet, as a % # # How many airplanes are made by each manufacturer? airplanes.registerTempTable("airplanes") manufacturer_counts = spark.sql("""SELECT Manufacturer, COUNT(*) AS Total FROM airplanes GROUP BY Manufacturer ORDER BY Total DESC""" ) manufacturer_counts.show(10) # show top 10 # How many airplanes total? total_airplanes = spark.sql( """SELECT COUNT(*) AS OverallTotal FROM airplanes""" ) print("Total airplanes: {}".format(total_airplanes.collect()[0].OverallTotal)) mfr_with_totals = manufacturer_counts.crossJoin(total_airplanes) mfr_with_totals = mfr_with_totals.rdd.map( lambda x: { 'Manufacturer': x.Manufacturer, 'Total': x.Total, 'Percentage': round( ( float(x.Total)/float(x.OverallTotal) ) * 100, 2 ) } ) mfr_with_totals.toDF().show() # # Same with sub-queries # relative_manufacturer_counts = spark.sql("""SELECT Manufacturer, COUNT(*) AS Total, ROUND( 100 * ( COUNT(*)/(SELECT COUNT(*) FROM airplanes) ), 2 ) AS PercentageTotal FROM airplanes GROUP BY Manufacturer ORDER BY Total DESC, Manufacturer LIMIT 10""" ) relative_manufacturer_counts.show(30) # show top 30 # # Now get these things on the web # relative_manufacturer_counts = relative_manufacturer_counts.rdd.map(lambda row: row.asDict()) grouped_manufacturer_counts = relative_manufacturer_counts.groupBy(lambda x: 1) # Save to Mongo in the airplanes_per_carrier relation import pymongo_spark pymongo_spark.activate() grouped_manufacturer_counts.saveToMongoDB( 'mongodb://localhost:27017/agile_data_science.airplane_manufacturer_totals' )