Spark Full Stack: Collecting and Serializing Flight Data


The code examples for this chapter can be found in Agile_Data_Code_2/ch04: https://github.com/rjurney/Agile_Data_Code_2

Using the Full Stack

Getting the whole software stack installed and configured takes real effort, but that effort is not wasted: with the stack in place, when user load climbs and the system needs to scale, we won't be scrambling to rebuild our environment. From here on, we can devote all of our energy to iterating on and improving the product itself.

Now let's take the on-time arrival and departure records for every flight that departed in the United States in 2015 and see how to put the whole stack to use.

An atomic record is a base record: the smallest granularity at which we analyze events. We can aggregate, count, segment, and bucket groups of atomic records, but a single record cannot be split into several. Atomic records therefore represent the base facts, and working with them is the key to understanding what our data really means and to building applications on top of it. The heart of big data is the ability to use NoSQL tools to analyze these most granular records and reach insights that were previously unattainable.
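For example, in the flight data we are about to download, one atomic record describes a single flight. A sketch of such a record as a Python dict, with values taken from the first row of the output shown at the end of this section:

# One atomic record: a single flight. We can aggregate it with others,
# but it cannot itself be divided into smaller events.
flight = {
    "FlightDate": "2015-01-01",
    "Carrier": "AA",       # American Airlines
    "FlightNum": "1519",
    "Origin": "DFW",       # Dallas/Fort Worth
    "Dest": "MEM",         # Memphis
    "DepDelay": -3.0,      # departed 3 minutes early
    "ArrDelay": -6.0,      # arrived 6 minutes early
    "Distance": 432.0,     # miles flown
}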

Downloading the Data

Figure 4-2 shows the process of serializing events. To get started, let's download the core data we'll be using throughout the rest of the book; see download.sh:

#!/bin/bash
#
# Script to download data for book
#
mkdir ../data

# Get on-time records for all flights in 2015 - 273MB
wget -P ../data/ http://s3.amazonaws.com/agile_data_science/On_Time_On_Time_Performance_2015.csv.gz

# Get openflights data
wget -P /tmp/ https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat
mv /tmp/airports.dat ../data/airports.csv

wget -P /tmp/ https://raw.githubusercontent.com/jpatokal/openflights/master/data/airlines.dat
mv /tmp/airlines.dat ../data/airlines.csv

wget -P /tmp/ https://raw.githubusercontent.com/jpatokal/openflights/master/data/routes.dat
mv /tmp/routes.dat ../data/routes.csv

wget -P /tmp/ https://raw.githubusercontent.com/jpatokal/openflights/master/data/countries.dat
mv /tmp/countries.dat ../data/countries.csv

# Get FAA data
wget -P ../data/ http://av-info.faa.gov/data/ACRef/tab/aircraft.txt
wget -P ../data/ http://av-info.faa.gov/data/ACRef/tab/ata.txt
wget -P ../data/ http://av-info.faa.gov/data/ACRef/tab/compt.txt
wget -P ../data/ http://av-info.faa.gov/data/ACRef/tab/engine.txt
wget -P ../data/ http://av-info.faa.gov/data/ACRef/tab/prop.txt

# Get FAA Registration data (aircraft database)
# wget -P /tmp/ http://registry.faa.gov/database/AR042016.zip
# unzip -d ../data/ /tmp/AR042016.zip
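After the script runs, it never hurts to confirm the files actually landed. A minimal sanity-check sketch in Python, assuming the ../data/ paths used by the script:

# Sanity-check sketch: confirm each expected file exists and is non-empty.
import os

for name in ['airports.csv', 'airlines.csv', 'routes.csv', 'countries.csv',
             'aircraft.txt', 'ata.txt', 'compt.txt', 'engine.txt', 'prop.txt']:
    path = os.path.join('../data', name)
    if os.path.exists(path):
        print('%s: %d bytes' % (path, os.path.getsize(path)))
    else:
        print('%s: MISSING' % path)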

Trimming the Data

To start, we trim the unneeded columns from the on-time flight records and convert the result to Parquet. This improves load performance, which matters because we will be loading this data again and again. In practice, though, you would probably keep any data that might prove useful later. Note that the spark-csv inferSchema option is unreliable, so we manually cast the numeric fields before saving the processed data.

Figure 4-2. The process of serializing events

To make sense of the query below, consult the Bureau of Transportation Statistics' field descriptions for the on-time performance records (this dataset was introduced in Chapter 3): https://www.transtats.bts.gov/Fields.asp?Table_ID=236.
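You can also inspect the raw schema directly. A quick exploration sketch using Spark's built-in CSV reader (the script below uses the spark-csv package instead; the SparkSession here is illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("Explore").getOrCreate()

# Without inferSchema, every column loads as a string -- which is exactly
# why the query below casts the numeric fields explicitly.
raw = spark.read.csv('data/On_Time_On_Time_Performance_2015.csv.gz', header=True)
print(len(raw.columns))  # the raw BTS extract has on the order of 100 columns
raw.select('FlightDate', 'Carrier', 'DepDelay', 'ArrDelay').show(3)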

Run the following code to trim away the columns we won't be using:

convert_data.py:


import findspark
findspark.init()

from pyspark.sql import SparkSession

# Create a local SparkSession and grab its SparkContext
spark = SparkSession.builder \
        .master("local[1]") \
        .appName("Convert Data") \
        .getOrCreate()
sc = spark.sparkContext

# Load the CSV with header parsing; numeric fields are cast manually below
on_time_dataframe = spark.read.format('com.databricks.spark.csv')\
  .options(
    header='true',
    treatEmptyValuesAsNulls='true',
  )\
  .load('data/On_Time_On_Time_Performance_2015.csv.gz')
on_time_dataframe.registerTempTable("on_time_performance")

trimmed_cast_performance = spark.sql("""
SELECT
  Year, Quarter, Month, DayofMonth, DayOfWeek, FlightDate,
  Carrier, TailNum, FlightNum,
  Origin, OriginCityName, OriginState,
  Dest, DestCityName, DestState,
  DepTime, cast(DepDelay as float), cast(DepDelayMinutes as int),
  cast(TaxiOut as float), cast(TaxiIn as float),
  WheelsOff, WheelsOn,
  ArrTime, cast(ArrDelay as float), cast(ArrDelayMinutes as float),
  cast(Cancelled as int), cast(Diverted as int),
  cast(ActualElapsedTime as float), cast(AirTime as float),
  cast(Flights as int), cast(Distance as float),
  cast(CarrierDelay as float), cast(WeatherDelay as float), cast(NASDelay as float),
  cast(SecurityDelay as float), cast(LateAircraftDelay as float),
  CRSDepTime, CRSArrTime
FROM
  on_time_performance
""")

# Replace on_time_performance table with our new, trimmed table and show its contents
trimmed_cast_performance.registerTempTable("on_time_performance")
trimmed_cast_performance.show()

# Verify we can sum numeric columns
spark.sql("""SELECT
  SUM(WeatherDelay), SUM(CarrierDelay), SUM(NASDelay),
  SUM(SecurityDelay), SUM(LateAircraftDelay)
FROM on_time_performance
""").show()

# Save records as gzipped json lines
trimmed_cast_performance.toJSON()\
  .saveAsTextFile(
    'data/on_time_performance.jsonl.gz',
    'org.apache.hadoop.io.compress.GzipCodec'
  )

# View records on filesystem
# gunzip -c data/on_time_performance.jsonl.gz/part-00000.gz | head

# Save records using Parquet
trimmed_cast_performance.write.mode("overwrite").parquet("data/on_time_performance.parquet")

# Load JSON records back
on_time_dataframe = spark.read.json('data/on_time_performance.jsonl.gz')
on_time_dataframe.show()

# Load the parquet file back
on_time_dataframe = spark.read.parquet('data/on_time_performance.parquet')
on_time_dataframe.show()
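
To check what the Parquet conversion actually buys us, here is a rough timing sketch (not a rigorous benchmark) that can be appended to the end of convert_data.py; absolute numbers will vary by machine:

import time

# Compare a full scan of the gzipped JSON lines against the Parquet copy
start = time.time()
spark.read.json('data/on_time_performance.jsonl.gz').count()
print('JSON scan took %0.1f seconds' % (time.time() - start))

start = time.time()
spark.read.parquet('data/on_time_performance.parquet').count()
print('Parquet scan took %0.1f seconds' % (time.time() - start))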


Output

python3 convert_data.py:

+-----------------+-----------------+-------------+------------------+----------------------+
|sum(WeatherDelay)|sum(CarrierDelay)|sum(NASDelay)|sum(SecurityDelay)|sum(LateAircraftDelay)|
+-----------------+-----------------+-------------+------------------+----------------------+
|        3100233.0|      2.0172956E7|  1.4335762E7|           80985.0|           2.4961931E7|
+-----------------+-----------------+-------------+------------------+----------------------+

+-----------------+-------+--------+---------------+-------+----------+----------+---------+-------+------------+---------+----------+--------+---------------+-------+----+--------------------+---------+--------+--------+----------+---------+-------+-----------------+-----+--------+------+--------------------+-----------+-------+-------------+-------+------+-------+------------+---------+--------+----+
|ActualElapsedTime|AirTime|ArrDelay|ArrDelayMinutes|ArrTime|CRSArrTime|CRSDepTime|Cancelled|Carrier|CarrierDelay|DayOfWeek|DayofMonth|DepDelay|DepDelayMinutes|DepTime|Dest|        DestCityName|DestState|Distance|Diverted|FlightDate|FlightNum|Flights|LateAircraftDelay|Month|NASDelay|Origin|      OriginCityName|OriginState|Quarter|SecurityDelay|TailNum|TaxiIn|TaxiOut|WeatherDelay|WheelsOff|WheelsOn|Year|
+-----------------+-------+--------+---------------+-------+----------+----------+---------+-------+------------+---------+----------+--------+---------------+-------+----+--------------------+---------+--------+--------+----------+---------+-------+-----------------+-----+--------+------+--------------------+-----------+-------+-------------+-------+------+-------+------------+---------+--------+----+
|             82.0|   59.0|    -6.0|            0.0|   1504|      1510|      1345|        0|     AA|        null|        4|         1|    -3.0|              0|   1342| MEM|         Memphis, TN|       TN|   432.0|       0|2015-01-01|     1519|      1|             null|    1|    null|   DFW|Dallas/Fort Worth...|         TX|      1|         null| N001AA|   7.0|   16.0|        null|     1358|    1457|2015|
|             95.0|   77.0|    -9.0|            0.0|   1721|      1730|      1550|        0|     AA|        null|        4|         1|    -4.0|              0|   1546| DFW|Dallas/Fort Worth...|       TX|   432.0|       0|2015-01-01|     1519|      1|             null|    1|    null|   MEM|         Memphis, TN|         TN|      1|         null| N001AA|   9.0|    9.0|        null|     1555|    1712|2015|
|            176.0|  129.0|    26.0|           26.0|   2141|      2115|      1845|        0|     AA|         0.0|        4|