The Full Spark Stack: Collecting and Serializing Flight Data
The code examples for this chapter can be found in Agile_Data_Code_2/ch04: https://github.com/rjurney/Agile_Data_Code_2
Using the Full Stack
Installing and configuring the full software stack takes some effort, but that effort is not wasted. With the stack in place, when we see the load users put on the system climbing and it is time to scale, we won't have to scramble to reconfigure our environment. From here on, we can put all of our energy into iterative development and into steadily improving the product itself.
Now let's take the on-time departure and arrival records for every flight that departed the United States in 2015 and see how we put the whole stack to work.
An atomic record is a base record: the finest granularity at which we analyze events. We can aggregate, count, segment, and bucket groups of atomic records, but a single record is never split into more than one. Atomic records therefore represent ground truth, and working with them is the key to understanding what our data actually means and to building applications on top of it. The heart of big data is the ability to use NoSQL tools to analyze these most granular pieces of data and reach a depth of understanding that was previously out of reach.
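To make this concrete, here is a minimal sketch (not from the book's code) of what one atomic record in our dataset looks like when expressed as a Python dict. The field names follow the BTS on-time performance schema used later in this chapter; the values are purely illustrative:

# A hypothetical atomic record: one flight, the smallest unit we will analyze.
# Field names follow the BTS on-time performance schema; values are illustrative.
flight = {
    "FlightDate": "2015-01-01",
    "Carrier": "AA",
    "FlightNum": "1519",
    "Origin": "DFW",
    "Dest": "MEM",
    "DepDelayMinutes": 0,
    "ArrDelayMinutes": 0,
}

We can count, sum, and group millions of records like this one, but the record itself is never subdivided further.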
Downloading the Data
Figure 4-2 shows the process of serializing events. With that in mind, let's download the core data we will use throughout the rest of the book; see download.sh:
#
# Script to download data for book
#

mkdir ./data

# Get on-time records for all flights in 2015 - 273MB
# wget -P ../data/ http://s3.amazonaws.com/agile_data_science/On_Time_On_Time_Performance_2015.csv.gz

# Get openflights data
wget -P /tmp/ https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat
mv /tmp/airports.dat ../data/airports.csv
wget -P /tmp/ https://raw.githubusercontent.com/jpatokal/openflights/master/data/airlines.dat
mv /tmp/airlines.dat ../data/airlines.csv
wget -P /tmp/ https://raw.githubusercontent.com/jpatokal/openflights/master/data/routes.dat
mv /tmp/routes.dat ../data/routes.csv
wget -P /tmp/ https://raw.githubusercontent.com/jpatokal/openflights/master/data/countries.dat
mv /tmp/countries.dat ../data/countries.csv

# Get FAA data
wget -P ../data/ http://av-info.faa.gov/data/ACRef/tab/aircraft.txt
wget -P ../data/ http://av-info.faa.gov/data/ACRef/tab/ata.txt
wget -P ../data/ http://av-info.faa.gov/data/ACRef/tab/compt.txt
wget -P ../data/ http://av-info.faa.gov/data/ACRef/tab/engine.txt
wget -P ../data/ http://av-info.faa.gov/data/ACRef/tab/prop.txt

# Get Aircraft database
# wget -P /tmp/ http://registry.faa.gov/database/AR042016.zip
# unzip -d ../data/ /tmp/AR042016.zip

# Get FAA Registration data
# wget -P /tmp/ http://registry.faa.gov/database/AR042016.zip
# unzip -d ../data/ /tmp/AR042016.zip
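As a quick sanity check (a sketch of our own, not part of download.sh), you can confirm that the openflights files are readable by loading one of them with PySpark. The path data/airports.csv assumes the downloaded files ended up in a data/ directory relative to where you start Python:

# Sketch: verify that the downloaded openflights data is readable.
# Assumes the files landed in ./data/ relative to the current directory.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Verify Download").getOrCreate()

airports = spark.read.csv("data/airports.csv")  # the openflights files have no header row
print(airports.count())  # number of airport records downloaded
airports.show(5)         # peek at the first few rows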
Trimming the Data
To start, we trim the unneeded columns from the flight on-time records and convert the result to Parquet. This speeds up loading the data, which matters because we will be loading it again and again from here on. In practice, though, you would probably keep any field that might prove useful later. Note that the spark-csv inferSchema option is buggy here, so we manually cast the numeric fields to the proper types before saving the processed data.
To make sense of the query below, refer to the Bureau of Transportation Statistics' description of the on-time performance records (this dataset was introduced in Chapter 3): https://www.transtats.bts.gov/Fields.asp?Table_ID=236.
Run the following code to trim away the columns we won't be using:
convert_data.py:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .master("local[1]") \
  .appName("Convert Flight Data") \
  .getOrCreate()
sc = spark.sparkContext

# Load the CSV with header parsing; the numeric fields are cast by hand below
on_time_dataframe = spark.read.format('com.databricks.spark.csv')\
  .options(
    header='true',
    treatEmptyValuesAsNulls='true',
  )\
  .load('data/On_Time_On_Time_Performance_2015.csv.gz')
on_time_dataframe.registerTempTable("on_time_performance")

trimmed_cast_performance = spark.sql("""
SELECT
  Year, Quarter, Month, DayofMonth, DayOfWeek, FlightDate,
  Carrier, TailNum, FlightNum,
  Origin, OriginCityName, OriginState,
  Dest, DestCityName, DestState,
  DepTime, cast(DepDelay as float), cast(DepDelayMinutes as int),
  cast(TaxiOut as float), cast(TaxiIn as float),
  WheelsOff, WheelsOn,
  ArrTime, cast(ArrDelay as float), cast(ArrDelayMinutes as float),
  cast(Cancelled as int), cast(Diverted as int),
  cast(ActualElapsedTime as float), cast(AirTime as float),
  cast(Flights as int), cast(Distance as float),
  cast(CarrierDelay as float), cast(WeatherDelay as float),
  cast(NASDelay as float), cast(SecurityDelay as float),
  cast(LateAircraftDelay as float),
  CRSDepTime, CRSArrTime
FROM on_time_performance
""")

# Replace the on_time_performance table with our new, trimmed table and show its contents
trimmed_cast_performance.registerTempTable("on_time_performance")
trimmed_cast_performance.show()

# Verify we can sum the numeric columns
spark.sql("""SELECT
  SUM(WeatherDelay), SUM(CarrierDelay), SUM(NASDelay),
  SUM(SecurityDelay), SUM(LateAircraftDelay)
FROM on_time_performance
""").show()

# Save the records as gzipped JSON Lines
trimmed_cast_performance.toJSON()\
  .saveAsTextFile(
    'data/on_time_performance.jsonl.gz',
    'org.apache.hadoop.io.compress.GzipCodec'
  )

# View the records on the filesystem:
# gunzip -c data/on_time_performance.jsonl.gz/part-00000.gz | head

# Save the records as Parquet
trimmed_cast_performance.write.mode("overwrite").parquet("data/on_time_performance.parquet")

# Load the JSON records back
on_time_dataframe = spark.read.json('data/on_time_performance.jsonl.gz')
on_time_dataframe.show()

# Load the Parquet file back
on_time_dataframe = spark.read.parquet('data/on_time_performance.parquet')
on_time_dataframe.show()
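To see what the Parquet conversion buys us, a rough sketch like the following (not part of convert_data.py) compares how long it takes to load and count the original gzipped CSV versus the trimmed Parquet output. It reuses the spark session created above; the exact timings will of course vary by machine:

# Sketch: rough load-time comparison between the gzipped CSV and the Parquet output.
# Assumes convert_data.py has already been run so both files exist.
import time

start = time.time()
csv_df = spark.read.format('com.databricks.spark.csv') \
    .options(header='true') \
    .load('data/On_Time_On_Time_Performance_2015.csv.gz')
print("CSV rows:", csv_df.count(), "in", round(time.time() - start, 1), "seconds")

start = time.time()
parquet_df = spark.read.parquet('data/on_time_performance.parquet')
print("Parquet rows:", parquet_df.count(), "in", round(time.time() - start, 1), "seconds")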
Output

Running python3 convert_data.py produces output like the following (excerpted):
+-----------------+-----------------+-------------+------------------+----------------------+
|sum(WeatherDelay)|sum(CarrierDelay)|sum(NASDelay)|sum(SecurityDelay)|sum(LateAircraftDelay)|
+-----------------+-----------------+-------------+------------------+----------------------+
|        3100233.0|      2.0172956E7|  1.4335762E7|           80985.0|           2.4961931E7|
+-----------------+-----------------+-------------+------------------+----------------------+

+-----------------+-------+--------+---------------+-------+----------+----------+---------+-------+------------+---------+----------+--------+---------------+-------+----+--------------------+---------+--------+--------+----------+---------+-------+-----------------+-----+--------+------+--------------------+-----------+-------+-------------+-------+------+-------+------------+---------+--------+----+
|ActualElapsedTime|AirTime|ArrDelay|ArrDelayMinutes|ArrTime|CRSArrTime|CRSDepTime|Cancelled|Carrier|CarrierDelay|DayOfWeek|DayofMonth|DepDelay|DepDelayMinutes|DepTime|Dest| DestCityName|DestState|Distance|Diverted|FlightDate|FlightNum|Flights|LateAircraftDelay|Month|NASDelay|Origin| OriginCityName|OriginState|Quarter|SecurityDelay|TailNum|TaxiIn|TaxiOut|WeatherDelay|WheelsOff|WheelsOn|Year|
+-----------------+-------+--------+---------------+-------+----------+----------+---------+-------+------------+---------+----------+--------+---------------+-------+----+--------------------+---------+--------+--------+----------+---------+-------+-----------------+-----+--------+------+--------------------+-----------+-------+-------------+-------+------+-------+------------+---------+--------+----+
| 82.0| 59.0| -6.0| 0.0| 1504| 1510| 1345| 0| AA| null| 4| 1| -3.0| 0| 1342| MEM| Memphis, TN| TN| 432.0| 0|2015-01-01| 1519| 1| null| 1| null| DFW|Dallas/Fort Worth...| TX| 1| null| N001AA| 7.0| 16.0| null| 1358| 1457|2015|
| 95.0| 77.0| -9.0| 0.0| 1721| 1730| 1550| 0| AA| null| 4| 1| -4.0| 0| 1546| DFW|Dallas/Fort Worth...| TX| 432.0| 0|2015-01-01| 1519| 1| null| 1| null| MEM| Memphis, TN| TN| 1| null| N001AA| 9.0| 9.0| null| 1555| 1712|2015|
| 176.0| 129.0| 26.0| 26.0| 2141| 2115| 1845| 0| AA| 0.0| 4|