Spark全栈:航班记录处理与发布
来自CloudWiki
航班记录处理与发布
在收集了航班数据之后,让我们对数据进行处理(见图4-3)。为了获取初始状态,使用我们的整套软件栈逐步深入探索真实数据,首先让我们把航班准点记录直接发布到MongoDB和Elasticsearch中,这样我们就可以从网页上直接使用Mongo、Elasticsearch以及Flask等工具直接访问数据了。.
MongoDB的Spark支持简化了这一步骤。我们只需要引入并激活pymongo_spark包,把DataFrame转为RDD,然后调用saveToMongoDB方法。这些代码可以在ch04/pyspark_to_mongo.py中找到:
import pymongo import pymongo_spark # Important: activate pymongo_spark. pymongo_spark.activate() on_time_dataframe = spark.read.parquet('data/on_time_performance.parquet') # Note we have to convert the row to a dict to avoid https://jira.mongodb.org/browse/HADOOP-276 as_dict = on_time_dataframe.rdd.map(lambda row: row.asDict()) as_dict.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')