“Spark全栈:航班记录处理与发布”的版本间的差异

来自CloudWiki
跳转至: 导航搜索
 
第5行: 第5行:
  
 
MongoDB的Spark支持简化了这一步骤。我们只需要引入并激活pymongo_spark包,把DataFrame转为RDD,然后调用saveToMongoDB方法。这些代码可以在ch04/pyspark_to_mongo.py中找到:
 
MongoDB的Spark支持简化了这一步骤。我们只需要引入并激活pymongo_spark包,把DataFrame转为RDD,然后调用saveToMongoDB方法。这些代码可以在ch04/pyspark_to_mongo.py中找到:
 +
 +
<nowiki>
 +
import pymongo
 +
import pymongo_spark
 +
# Important: activate pymongo_spark.
 +
pymongo_spark.activate()
 +
 +
on_time_dataframe = spark.read.parquet('data/on_time_performance.parquet')
 +
 +
# Note we have to convert the row to a dict to avoid https://jira.mongodb.org/browse/HADOOP-276
 +
as_dict = on_time_dataframe.rdd.map(lambda row: row.asDict())
 +
as_dict.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
 +
</nowiki>

2022年7月19日 (二) 22:27的最新版本

航班记录处理与发布

在收集了航班数据之后,让我们对数据进行处理(见图4-3)。为了获取初始状态,使用我们的整套软件栈逐步深入探索真实数据,首先让我们把航班准点记录直接发布到MongoDB和Elasticsearch中,这样我们就可以从网页上直接使用Mongo、Elasticsearch以及Flask等工具直接访问数据了。.

Ads2 0403.png

MongoDB的Spark支持简化了这一步骤。我们只需要引入并激活pymongo_spark包,把DataFrame转为RDD,然后调用saveToMongoDB方法。这些代码可以在ch04/pyspark_to_mongo.py中找到:

import pymongo
import pymongo_spark
# Important: activate pymongo_spark.
pymongo_spark.activate()

on_time_dataframe = spark.read.parquet('data/on_time_performance.parquet')

# Note we have to convert the row to a dict to avoid https://jira.mongodb.org/browse/HADOOP-276
as_dict = on_time_dataframe.rdd.map(lambda row: row.asDict())
as_dict.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')