Spark全栈:工具集概览

来自CloudWiki
跳转至: 导航搜索

敏捷开发工具栈的要求

对于数据科学技术栈来说,为了实现敏捷性,有那些必需的要求?

一个要求是栈的每一层都要水平可伸缩。往集群中再加一台机器比升级昂贵的专有硬件要好得多。如果要重写预测模型的实现才能重新部署,这就不敏捷了。这就是为什么我们要使用Spark MLlib而不是那些专门为单机设计的工具。

另一个要求是在栈的各层之间上下传递数据必须要能一行代码解决。在今天的配置密集型环境中,这是一个比较高的要求,但是我们可以通过精心挑选工具来满足。

基础工具集

Python3

由于Spark 2.1.0无法与Python 3.6兼容,所以我们在本书中使用了Python 3.5。本书原版出版后不久Spark 2.1.1就发布了,后续版本已经兼容于Python 3.6。

Anaconda与Miniconda

在本书中,我们使用Anaconda Python 3.5,因为Anaconda已成为数据科学领域领先的Python发行版。Anaconda是Continuum Analytics出品的Python发行版,包含超过400个流行的数据科学库。

尽管我推荐在你们自己的电脑上使用功能齐全的Anaconda,在Vagrant和EC2映像中我使用的是Anaconda的小弟弟Miniconda。这是因为Anaconda太大了,下载会花很长时间(20~30分钟)。而下载Miniconda只要几分钟。Miniconda和Anaconda很像,只是少装了一些包。

Jupyter笔记本

在第7章和第9章中我们会利用IPython/Jupyter笔记本交互式地使用Python,进行数据可视化、训练预测模型并改进。

使用JSON行和Parquet序列化事件

在我们的工具栈中,我们使用的一种序列化系统为JSON行(http://jsonlines.org/) (见图2-10)。你可能听到这种格式被称为换行符分隔的JSON格式,即NDJSON (http://ndjson.org/),不过确切来说JSON行格式不支持空行,而NDJSON支持。JSON使我们可以用一种通用格式通过各种语言和工具访问数据。

Python中的JSON

json模块在Python 2.7和3.x的标准库中(http://bit.ly/1upkGOV)不需要额外安装。要读取和写入JSON行文件,我们只需要短短几行代码。参见ch02/test_json.py

我们后面还会用到这些帮助函数,你可以在utils.py(https://github. com/rjurney/Agile_Data_Code_2/blob/master/lib/util.py)中找到它们以及其他一些全书都会用到的工具函数。就是这样!在Python中使用JSON行几乎不费吹灰之力。


收集数据

除了用于实时作业,Kafka(见图2-11)也成为了进行数据混洗的一种不错的选择。

使用Apache Kafka分发流数据

我们要使用Kafka流数据和Spark Streaming来进行“准实时”的预测。

Kafka也可以用来收集数据,聚合存入HDFS或Amazon S3这样的批量存储中。

通过kafka-python实现来在Python中使用Kafka

kafka-python(https://github.com/dpkp/kafka-python)为使用Python操作Kafka提供了一种简易方式。

我们会在第8章中使用kafka-python包来从Flask网络应用中发出预测事件,然后在PySpark Streaming中完成处理。我们将会使用PySpark Streaming来处理大规模的Kafka流数据中的消息。

数据处理

使用Spark进行数据处理

Spark是领先的分布式通用数据处理平台。Spark把数据处理切分为小任务交给一群廉价的PC,每个任务在单台机器的磁盘和内存上执行。

Spark的任务是协调这些机器为一个整体的计算平台。Spark是一个分布式的平台,这对于支持任意规模的数据至关重要,而且Spark在这一方面做得非常好。它既可以在一台机器上以“本地模式”运行得很好,也可以在数千台机器组成的集群上运行得不错。

Spark也是优秀的黏合剂,它提供了包括Kafka和MongoDB等很多系统的连接器,可以把各种系统整合到一起。

Spark基于HDFS或S3运行,提供了Spark SQL、Spark MLlib和Spark Streaming等组件。

Python2022071001.png

Spark的本地模式可以让我们在开发中在本地的小规模数据集上运行Spark。在整本书中我们都使用Spark本地模式。这样你就可以通过本地开发进行学习,而等数据量变大的时候再迁移到Spark集群上。

在安装好了Spark及其依赖,并配置好环境之后,我们可以简单试用一下Spark。你可以使用pyspark命令在任何位置启动PySpark,不过如果是要运行本书的示例代码,你应该从Agile_Data_Code_2项目根目录启动PySpark。如果你是Spark新手,不妨阅读Spark编程指南(https://spark.apache.org/docs/1.6.1/programming-guide.html),然后跟着做一遍。

在使用PySpark时,你最好在浏览器单独的标签页中开着API文档,以便临时参考。PySpark有两套API:RDD(http://bit.ly/2p5IM9z)和DataFDataFrame。(你可能需要分别在浏览器不同的标签页中查询RDD、DataFrame和MLlib的相关文档。)你可能还需要查询Spark ML文档。


Spark的实时计算和批量计算

使用Kafka很简单,但是我们稍后会看到:这样一个简单的框架可以用简单的方式创建复杂的数据流。Kafka提供的全局队列的抽象更是非常强大。我们只会用Kafka来部署Spark Streaming实现的预测,但是Kafka其实还有很多其他的功能。

尽管Kafka很强大,但是我们会在本书中花大部分时间来做批量计算。我们的原则是:“只要能用批量模式,就用批量模式。”运维一个Spark集群比运维一堆Kafka实时工作者要容易得多。尽管你可以回放Kafka的历史消息来实现和批量操作等价的效果,但批量计算才是专为组成数据科学的应用研究过程而优化的。

如果你决定从批量计算转到实时流计算,PySpark还是能满足你的需求的!在PySpark批量计算模式中用来处理消息的代码,可以原封不动地放到PySpark Streaming中来处理来自Kafka的消息。


使用PySpark Streaming处理流数据

使用PySpark Streaming处理Kafka流数据要比使用普通的Spark稍微复杂一些。

使用scikit-learn与Spark MLlib进行机器学习

我们要使用scikit-learn(简称sklearn)和Spark MLlib构建预测模型。我们会使用sklearn做回归分析,使用Spark MLlib做分类。

为什么有了Spark MLlib还要使用scikit-learn

在这样一本书中引入scikit learn的主要原因是在实践中它真的非常好用。Spark MLlib是专为大规模数据设计的,而大数据却经常会在提取特征时整合归约为很小的数据集。这意味着sklearn有的时候比Spark MLlib更好。如果你要在数据流的中间使用简单的机器学习算法,那么务必使用MLlib。但是如果要实时预测而且数据能放进内存中,请考虑使用sklearn。我们会在第7章中同时用到这两个工具,而在第8章和第9章中只使用Spark MLlib。

使用Apache Airflow(孵化项目)进行调度

Apache Airflow(孵化项目)(https://airflow.incubator.apache.org/)是有向无环图(DAG)的一种调度器。我们将在PySpark中创建一些数据管道,而DAG对于描述这种数据管道非常方便。Airflow让我们把长长的数据管道分为多个逻辑上相连的脚本。我们将使用Airflow部署构成预测应用的数据管道(也就是“数据流”)。Airflow让我们可以调度应用来定期执行,比如每天、每小时等。

Airflow正在成为领先的开源数据管道调度器,因为它使用Python代码进行控制,而不是使用配置文件。这是配置调度器的一种更“干净”的方式。

Airflow是一个批量计算的工具。值得注意的是,只要能把应用部署为批处理模式,可能就应该把它部署为批处理模式。如果能安排应用程序的代码每天或者每小时甚至是每10分钟运行一次,那么部署、运维和维护的工作就简单得多了。

Airflow的代码可以在GitHub(https://github.com/apache/incubator-airflow)上找到。Airbnb创建了Airflow项目,并且提供了一个带有截图、视频,以及其他文档的精美的Airflow介绍页面(http://nerds.airbnb.com/airflow/)。注意,我们是以开发为目的配置Airflow的。要在生产环境中使用的话,你需要先验证Airflow是不是能和真正的Spark集群共同工作。


我们可以得到既能用于开发环境的PySpark控制台,又能用于生产环境中的Airflow的脚本。

用Python创建Airflow DAG

发布数据

使用MongoDB发布数据

Spark不能直接和网络应用服务器通信。为了把数据提供给网络应用,我们需要把它发布到某个数据库中。有很多不错的选择,我们选择的是MongoDB,因为它简单易用,面向文档,而且和Spark集成得很好(见图2-16)。有了MongoDB和PySpark,我们可以在PySpark中定义任意的结构,然后使用MongoDB把它保存为与结构相对应的关系。在我们创建新的关系时,不用费心管理表结构,因为我们在PySpark中就把数据处理成了用来发布的形式。这就是敏捷!

从PySpark向MongoDB推送数据

从PySpark向MongoDB推送数据是很简单的。

在使用mongo-hadoop(https://github.com/mongodb/mongo-hadoop)项目配置PySpark成功连上MongoDB之后,我们可以照常使用PySpark。如ch02/pyspark_mongodb.py所示,我们可以使用pymongo spark模块把文档存储到刚才用的MongoDB中。


搜索数据

使用Elasticsearch搜索数据

Elasticsearch(https://www.elastic.co/cn/)是“用来搜索的Hadoop”,因为它提供了健壮而易用的搜索解决方案,降低了个人实现数据搜索的门槛,不论数据量是大还是小。Elasticsearch有一个简单的RESTful的JSON接口,因此我们可以在命令行中使用任何语言使用它。

我们要创建一个分片为1且数据镜像为1的索引,这对开发来说够用了。如果是生产环境,你可能要让索引有多个分片,同时保留多份数据镜像,以满足数据冗余和性能的要求。

Elasticsearch与PySpark

要把数据从PySpark写入Elasticsearch中(或者从Elasticsearch读取数据到PySpark中),我们要使用Elasticsearch for Hadoop(https://www.elastic.co/products/hadoop)。

让PySpark数据可以被搜索。我们用ch02/pyspark_elasticsearch.py(https://github.com/rjurney/Agile_Data_Code_2/blob/master/ch02/pyspark_elasticsearch.py)把数据从PySpark保存到Elasticsearch中:

Elasticsearch也是一个很好的键值对存储系统,也就是文档存储系统!它可以轻易替代我们软件栈中的MongoDB,而且这么做减少了软件栈的组件,能简化并提升软件栈的伸缩性。

通过pyelasticsearch使用Python操作Elasticsearch

pyelasticsearch(http://pyelasticsearch.readthedocs.org/en/latest/)是从Python中访问Elasticsearch数据的一种好办法。

反思我们的工作流程

和直接从MySQL或者MongoDB中查询比起来,我们的工作流程看起来更难。但是要注意到,我们的软件栈是为耗时的有深度的数据处理和不时的数据发布而优化的。而且,这样我们就不会在实时查询中越来越复杂,最终导致在无法扩容的时候撞上天花板。

数据上云

下一步是把我们发布的数据转为可交互的应用。见图2-18,我们将使用轻量级网站开发框架来实现。

轻量级网络应用

Flask是Python的一个快速(http://bottlepy.org/docs/dev/)、简单、轻量级的WSGI微网站框架。

展示数据

设计和展示对于工作成果的价值很重要。事实上,对敏捷数据科学的一种理解就是迭代的数据设计。

数据设计的组成部分。考虑到这一点,我们最好能一开始就有一套稳固而整洁的设计,并基于这套设计开展工作。

启动Bootstrap

让我们尝试使用Bootstrap的样式把刚才的例子做成表格。

设计和数据处理没有什么不同。相反,它们是同一个协作行为——数据设计的组成部分。考虑到这一点,我们最好能一开始就有一套稳固而整洁的设计,并基于这套设计开展工作。

Bootstrap让我们可以大胆使用表格。

使用D3.js实现数据可视化

D3.js(https://d3js.org/)支持数据驱动的文档。