Spark全栈:航班记录处理与发布

来自CloudWiki
跳转至: 导航搜索

航班记录处理与发布

在收集了航班数据之后,让我们对数据进行处理(见图4-3)。为了获取初始状态,使用我们的整套软件栈逐步深入探索真实数据,首先让我们把航班准点记录直接发布到MongoDB和Elasticsearch中,这样我们就可以从网页上直接使用Mongo、Elasticsearch以及Flask等工具直接访问数据了。.

Ads2 0403.png

MongoDB的Spark支持简化了这一步骤。我们只需要引入并激活pymongo_spark包,把DataFrame转为RDD,然后调用saveToMongoDB方法。这些代码可以在ch04/pyspark_to_mongo.py中找到:

import pymongo
import pymongo_spark
# Important: activate pymongo_spark.
pymongo_spark.activate()

on_time_dataframe = spark.read.parquet('data/on_time_performance.parquet')

# Note we have to convert the row to a dict to avoid https://jira.mongodb.org/browse/HADOOP-276
as_dict = on_time_dataframe.rdd.map(lambda row: row.asDict())
as_dict.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')