“Spark全栈:通过报表探索数据”的版本间的差异

来自CloudWiki
跳转至: 导航搜索
(创建页面,内容为“==前言== 接下来就是我们的第三个敏捷开发冲刺周期了,我们要把图表页面进一步开发为完整的报表。在这一步骤中,我们会…”)
 
第17行: 第17行:
  
 
  <nowiki>
 
  <nowiki>
airplanes = spark.read.json('data/airplanes.json')
+
# Load the on-time parquet file
 +
on_time_dataframe = spark.read.parquet('data/on_time_performance.parquet')
  
#
+
# The first step is easily expressed as SQL: get all unique tail numbers for each airline
# Who makes the airplanes in the US commercial fleet, as a %
+
on_time_dataframe.registerTempTable("on_time_performance")
#
+
carrier_airplane = spark.sql(
 +
  "SELECT DISTINCT Carrier, TailNum FROM on_time_performance"
 +
  )
  
# How many airplanes are made by each manufacturer?
+
# Now we need to store a sorted group for each Carrier, along with a fleet count
airplanes.registerTempTable("airplanes")
+
airplanes_per_carrier = carrier_airplane.rdd\
manufacturer_counts = spark.sql("""SELECT
+
   .map(lambda nameTuple: (nameTuple[0], [nameTuple[1]]))\
  Manufacturer,
+
  .reduceByKey(lambda a, b: a + b)\
  COUNT(*) AS Total
+
  .map(lambda tuple:
FROM
+
      {
  airplanes
+
        'Carrier': tuple[0],  
GROUP BY
+
        'TailNumbers': sorted(
  Manufacturer
+
          filter(
ORDER BY
+
            lambda x: x is not None and x != '', tuple[1] # empty string tail numbers were getting through
   Total DESC"""
+
            )
)
+
          ),
manufacturer_counts.show(10) # show top 10
+
        'FleetCount': len(tuple[1])
 
+
       }
# How many airplanes total?
 
total_airplanes = spark.sql(
 
  """SELECT
 
  COUNT(*) AS OverallTotal
 
  FROM airplanes"""
 
)
 
print("Total airplanes: {}".format(total_airplanes.collect()[0].OverallTotal))
 
 
 
mfr_with_totals = manufacturer_counts.crossJoin(total_airplanes)
 
mfr_with_totals = mfr_with_totals.rdd.map(
 
  lambda x: {
 
    'Manufacturer': x.Manufacturer,
 
    'Total': x.Total,
 
    'Percentage': round(
 
      (
 
        float(x.Total)/float(x.OverallTotal)
 
      ) * 100,
 
       2
 
 
     )
 
     )
  }
+
airplanes_per_carrier.count() # 14
)
 
mfr_with_totals.toDF().show()
 
 
 
#
 
# Same with sub-queries
 
#
 
relative_manufacturer_counts = spark.sql("""SELECT
 
  Manufacturer,
 
  COUNT(*) AS Total,
 
  ROUND(
 
    100 * (
 
      COUNT(*)/(SELECT COUNT(*) FROM airplanes)
 
    ),
 
    2
 
  ) AS PercentageTotal
 
FROM
 
  airplanes
 
GROUP BY
 
  Manufacturer
 
ORDER BY
 
  Total DESC, Manufacturer
 
LIMIT 10"""
 
)
 
relative_manufacturer_counts.show(30) # show top 30
 
 
 
#
 
# Now get these things on the web
 
#
 
relative_manufacturer_counts = relative_manufacturer_counts.rdd.map(lambda row: row.asDict())
 
grouped_manufacturer_counts = relative_manufacturer_counts.groupBy(lambda x: 1)
 
  
 
# Save to Mongo in the airplanes_per_carrier relation
 
# Save to Mongo in the airplanes_per_carrier relation
 
import pymongo_spark
 
import pymongo_spark
 
pymongo_spark.activate()
 
pymongo_spark.activate()
grouped_manufacturer_counts.saveToMongoDB(
+
airplanes_per_carrier.saveToMongoDB(
   'mongodb://localhost:27017/agile_data_science.airplane_manufacturer_totals'
+
   'mongodb://localhost:27017/agile_data_science.airplanes_per_carrier'
 
)
 
)
 +
 
</nowiki>
 
</nowiki>

2021年8月14日 (六) 09:48的版本

前言

接下来就是我们的第三个敏捷开发冲刺周期了,我们要把图表页面进一步开发为完整的报表。在这一步骤中,我们会给图表增添交互性,把静态页面变成动态页面,通过链接的网络、表和图表的相关实体使得数据可供探索。这些都是数据价值金字塔中报表阶段的特性

Bd21081401.png

本章的示例代码在Agile_Data_Code_2/tree/master/ch06中

git clone https://github.con/rjurney/Agile_Data_Code_2.git

提取航空公司为实体

为了构建报表,我们需要组合数据集的各种视图。构建这些视图对应的工作是枚举实体。我们在上一章中创建了实体“飞机”,它会是我们继续创建其他实体及建立实体间的关联关系以构建报表的基础。如上一章所述,在我们开始创建数据的视图之前,我们需要构建一个网页来展示图表与表格。那么下面我们就来创建一个新的实体“航空公司”,并且为每家航空公司提供一个页面。

我们从收集一家特定航空公司所有飞机的机尾编号入手。每个商业航班都是由某一家航空公司运营的,各个航空公司又拥有各自的机队,以及基地机场设施和人员,这些都是航空公司业务的核心资产。我们已经为每架飞机创建了页面,因此我们可以利用这一数据来创建每个航空公司的全部机尾编号列表。

使用PySpark把航空公司定义为飞机的分组

我们从准备每个航空公司的飞机机尾编号列表入手,代码如ch06/ extract_airlines.py文件所展示的。这会成为航空公司页面的基础:

# Load the on-time parquet file
on_time_dataframe = spark.read.parquet('data/on_time_performance.parquet')

# The first step is easily expressed as SQL: get all unique tail numbers for each airline
on_time_dataframe.registerTempTable("on_time_performance")
carrier_airplane = spark.sql(
  "SELECT DISTINCT Carrier, TailNum FROM on_time_performance"
  )

# Now we need to store a sorted group for each Carrier, along with a fleet count
airplanes_per_carrier = carrier_airplane.rdd\
  .map(lambda nameTuple: (nameTuple[0], [nameTuple[1]]))\
  .reduceByKey(lambda a, b: a + b)\
  .map(lambda tuple:
      {
        'Carrier': tuple[0], 
        'TailNumbers': sorted(
          filter(
            lambda x: x is not None and x != '', tuple[1] # empty string tail numbers were getting through
            )
          ),
        'FleetCount': len(tuple[1])
      }
    )
airplanes_per_carrier.count() # 14

# Save to Mongo in the airplanes_per_carrier relation
import pymongo_spark
pymongo_spark.activate()
airplanes_per_carrier.saveToMongoDB(
  'mongodb://localhost:27017/agile_data_science.airplanes_per_carrier'
)