Spark全栈:通过报表探索数据

来自CloudWiki
Cloud17讨论 | 贡献2021年8月14日 (六) 09:46的版本 (创建页面,内容为“==前言== 接下来就是我们的第三个敏捷开发冲刺周期了,我们要把图表页面进一步开发为完整的报表。在这一步骤中,我们会…”)
(差异) ←上一版本 | 最后版本 (差异) | 下一版本→ (差异)
跳转至: 导航搜索

前言

接下来就是我们的第三个敏捷开发冲刺周期了,我们要把图表页面进一步开发为完整的报表。在这一步骤中,我们会给图表增添交互性,把静态页面变成动态页面,通过链接的网络、表和图表的相关实体使得数据可供探索。这些都是数据价值金字塔中报表阶段的特性

Bd21081401.png

本章的示例代码在Agile_Data_Code_2/tree/master/ch06中

git clone https://github.con/rjurney/Agile_Data_Code_2.git

提取航空公司为实体

为了构建报表,我们需要组合数据集的各种视图。构建这些视图对应的工作是枚举实体。我们在上一章中创建了实体“飞机”,它会是我们继续创建其他实体及建立实体间的关联关系以构建报表的基础。如上一章所述,在我们开始创建数据的视图之前,我们需要构建一个网页来展示图表与表格。那么下面我们就来创建一个新的实体“航空公司”,并且为每家航空公司提供一个页面。

我们从收集一家特定航空公司所有飞机的机尾编号入手。每个商业航班都是由某一家航空公司运营的,各个航空公司又拥有各自的机队,以及基地机场设施和人员,这些都是航空公司业务的核心资产。我们已经为每架飞机创建了页面,因此我们可以利用这一数据来创建每个航空公司的全部机尾编号列表。

使用PySpark把航空公司定义为飞机的分组

我们从准备每个航空公司的飞机机尾编号列表入手,代码如ch06/ extract_airlines.py文件所展示的。这会成为航空公司页面的基础:

airplanes = spark.read.json('data/airplanes.json')

#
# Who makes the airplanes in the US commercial fleet, as a %
#

# How many airplanes are made by each manufacturer?
airplanes.registerTempTable("airplanes")
manufacturer_counts = spark.sql("""SELECT
  Manufacturer,
  COUNT(*) AS Total
FROM
  airplanes
GROUP BY
  Manufacturer
ORDER BY
  Total DESC"""
)
manufacturer_counts.show(10) # show top 10

# How many airplanes total?
total_airplanes = spark.sql(
  """SELECT
  COUNT(*) AS OverallTotal
  FROM airplanes"""
)
print("Total airplanes: {}".format(total_airplanes.collect()[0].OverallTotal))

mfr_with_totals = manufacturer_counts.crossJoin(total_airplanes)
mfr_with_totals = mfr_with_totals.rdd.map(
  lambda x: {
    'Manufacturer': x.Manufacturer,
    'Total': x.Total,
    'Percentage': round(
      (
        float(x.Total)/float(x.OverallTotal)
      ) * 100,
      2
    )
  }
)
mfr_with_totals.toDF().show()

#
# Same with sub-queries
#
relative_manufacturer_counts = spark.sql("""SELECT
  Manufacturer,
  COUNT(*) AS Total,
  ROUND(
    100 * (
      COUNT(*)/(SELECT COUNT(*) FROM airplanes)
    ),
    2
  ) AS PercentageTotal
FROM
  airplanes
GROUP BY
  Manufacturer
ORDER BY
  Total DESC, Manufacturer
LIMIT 10"""
)
relative_manufacturer_counts.show(30) # show top 30

#
# Now get these things on the web
#
relative_manufacturer_counts = relative_manufacturer_counts.rdd.map(lambda row: row.asDict())
grouped_manufacturer_counts = relative_manufacturer_counts.groupBy(lambda x: 1)

# Save to Mongo in the airplanes_per_carrier relation
import pymongo_spark
pymongo_spark.activate()
grouped_manufacturer_counts.saveToMongoDB(
  'mongodb://localhost:27017/agile_data_science.airplane_manufacturer_totals'
)