PySpark实战:认识Pipeline

来自CloudWiki
跳转至: 导航搜索

介绍

在机器学习过程中,也需要运行一系列算法来处理数据并从中学习。

例如,一个简单的文本数据处理工作流程可能包含几个阶段:

  • 将每个文本拆分成单词
  • 将单词转换为数值特征向量
  • 使用特征向量和标签构建预测模型
  • 用预测模型对新数据进行预测

PySpark的MLlib库将这样的工作流程表示为Pipeline,

其中包含要按特定顺序运行的一系列Pipeline Stages(Transformers 和Estimators)

Python21073102.png

一个Pipeline被指定为一个阶段序列,这些阶段按顺序运行

在transformer阶段,调用transform()方法

在estimator阶段,调用fit()方法生成预测结果。

在上图中,分词器和特征频数HashingTF是transformer ,而机器学习阶段是estimator

代码实例


#import findspark
#findspark.init()
##############################################
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local[*]") \
        .appName("PySpark ML") \
        .getOrCreate()
sc = spark.sparkContext
#############################################
#模拟训练数据,有PySpark的文本为1,其他为0
training = spark.createDataFrame([
    (0, "Hello PySpark", 1.0),
    (1, "Using Flink", 0.0),
    (2, "PySpark 3.0", 1.0),
    (3, "Test MySQL", 0.0)
], ["id", "text", "label"])
# pipeline 三个阶段: tokenizer -> hashingTF -> logR.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
logR = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, logR])
#训练数据上进行pipeline fit操作,产生一个model
model = pipeline.fit(training)
#############################################
#测试集
test = spark.createDataFrame([
    (4, "PySpark Pipeline"),
    (5, "pipeline"),
    (6, "PySpark python"),
    (7, "julia c#")
], ["id", "text"])

#model执行transform
prediction = model.transform(test)
#预测
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    tid, text, prob, prediction = row
    print("(%d, %s) --> prediction=%f,prob=%s" \
    % (tid, text,  prediction,str(prob)))
#############################################
sc.stop()

输出:

(4, PySpark Pipeline) --> prediction=1.000000,prob=[0.029796174862816768,0.9702038251371833]
(5, pipeline) --> prediction=0.000000,prob=[0.56611226449896,0.43388773550104]
(6, PySpark python) --> prediction=1.000000,prob=[0.029796174862816768,0.9702038251371833]
(7, julia c#) --> prediction=0.000000,prob=[0.56611226449896,0.43388773550104]