“Spark全栈:通过报表探索数据”的版本间的差异
来自CloudWiki
(创建页面,内容为“==前言== 接下来就是我们的第三个敏捷开发冲刺周期了,我们要把图表页面进一步开发为完整的报表。在这一步骤中,我们会…”) |
|||
第17行: | 第17行: | ||
<nowiki> | <nowiki> | ||
− | + | # Load the on-time parquet file | |
+ | on_time_dataframe = spark.read.parquet('data/on_time_performance.parquet') | ||
− | # | + | # The first step is easily expressed as SQL: get all unique tail numbers for each airline |
− | + | on_time_dataframe.registerTempTable("on_time_performance") | |
− | + | carrier_airplane = spark.sql( | |
+ | "SELECT DISTINCT Carrier, TailNum FROM on_time_performance" | ||
+ | ) | ||
− | # | + | # Now we need to store a sorted group for each Carrier, along with a fleet count |
− | + | airplanes_per_carrier = carrier_airplane.rdd\ | |
− | + | .map(lambda nameTuple: (nameTuple[0], [nameTuple[1]]))\ | |
− | + | .reduceByKey(lambda a, b: a + b)\ | |
− | + | .map(lambda tuple: | |
− | + | { | |
− | + | 'Carrier': tuple[0], | |
− | + | 'TailNumbers': sorted( | |
− | + | filter( | |
− | + | lambda x: x is not None and x != '', tuple[1] # empty string tail numbers were getting through | |
− | + | ) | |
− | + | ), | |
− | + | 'FleetCount': len(tuple[1]) | |
− | + | } | |
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
) | ) | ||
− | + | airplanes_per_carrier.count() # 14 | |
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
# Save to Mongo in the airplanes_per_carrier relation | # Save to Mongo in the airplanes_per_carrier relation | ||
import pymongo_spark | import pymongo_spark | ||
pymongo_spark.activate() | pymongo_spark.activate() | ||
− | + | airplanes_per_carrier.saveToMongoDB( | |
− | 'mongodb://localhost:27017/agile_data_science. | + | 'mongodb://localhost:27017/agile_data_science.airplanes_per_carrier' |
) | ) | ||
+ | |||
</nowiki> | </nowiki> |
2021年8月14日 (六) 09:48的版本
前言
接下来就是我们的第三个敏捷开发冲刺周期了,我们要把图表页面进一步开发为完整的报表。在这一步骤中,我们会给图表增添交互性,把静态页面变成动态页面,通过链接的网络、表和图表的相关实体使得数据可供探索。这些都是数据价值金字塔中报表阶段的特性
本章的示例代码在Agile_Data_Code_2/tree/master/ch06中
git clone https://github.con/rjurney/Agile_Data_Code_2.git
提取航空公司为实体
为了构建报表,我们需要组合数据集的各种视图。构建这些视图对应的工作是枚举实体。我们在上一章中创建了实体“飞机”,它会是我们继续创建其他实体及建立实体间的关联关系以构建报表的基础。如上一章所述,在我们开始创建数据的视图之前,我们需要构建一个网页来展示图表与表格。那么下面我们就来创建一个新的实体“航空公司”,并且为每家航空公司提供一个页面。
我们从收集一家特定航空公司所有飞机的机尾编号入手。每个商业航班都是由某一家航空公司运营的,各个航空公司又拥有各自的机队,以及基地机场设施和人员,这些都是航空公司业务的核心资产。我们已经为每架飞机创建了页面,因此我们可以利用这一数据来创建每个航空公司的全部机尾编号列表。
使用PySpark把航空公司定义为飞机的分组
我们从准备每个航空公司的飞机机尾编号列表入手,代码如ch06/ extract_airlines.py文件所展示的。这会成为航空公司页面的基础:
# Load the on-time parquet file on_time_dataframe = spark.read.parquet('data/on_time_performance.parquet') # The first step is easily expressed as SQL: get all unique tail numbers for each airline on_time_dataframe.registerTempTable("on_time_performance") carrier_airplane = spark.sql( "SELECT DISTINCT Carrier, TailNum FROM on_time_performance" ) # Now we need to store a sorted group for each Carrier, along with a fleet count airplanes_per_carrier = carrier_airplane.rdd\ .map(lambda nameTuple: (nameTuple[0], [nameTuple[1]]))\ .reduceByKey(lambda a, b: a + b)\ .map(lambda tuple: { 'Carrier': tuple[0], 'TailNumbers': sorted( filter( lambda x: x is not None and x != '', tuple[1] # empty string tail numbers were getting through ) ), 'FleetCount': len(tuple[1]) } ) airplanes_per_carrier.count() # 14 # Save to Mongo in the airplanes_per_carrier relation import pymongo_spark pymongo_spark.activate() airplanes_per_carrier.saveToMongoDB( 'mongodb://localhost:27017/agile_data_science.airplanes_per_carrier' )