PySpark实战:认识Pipeline
来自CloudWiki
介绍
在机器学习过程中,也需要运行一系列算法来处理数据并从中学习。
例如,一个简单的文本数据处理工作流程可能包含几个阶段:
- 将每个文本拆分成单词
- 将单词转换为数值特征向量
- 使用特征向量和标签构建预测模型
- 用预测模型对新数据进行预测
PySpark的MLlib库将这样的工作流程表示为Pipeline,
其中包含要按特定顺序运行的一系列Pipeline Stages(Transformers 和Estimators)
一个Pipeline被指定为一个阶段序列,这些阶段按顺序运行
在transformer阶段,调用transform()方法
在estimator阶段,调用fit()方法生成预测结果。
在上图中,分词器和特征频数HashingTF是transformer ,而机器学习阶段是estimator
代码实例
#import findspark #findspark.init() ############################################## from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.ml.feature import HashingTF, Tokenizer from pyspark.sql import SparkSession spark = SparkSession.builder \ .master("local[*]") \ .appName("PySpark ML") \ .getOrCreate() sc = spark.sparkContext ############################################# #模拟训练数据,有PySpark的文本为1,其他为0 training = spark.createDataFrame([ (0, "Hello PySpark", 1.0), (1, "Using Flink", 0.0), (2, "PySpark 3.0", 1.0), (3, "Test MySQL", 0.0) ], ["id", "text", "label"]) # pipeline 三个阶段: tokenizer -> hashingTF -> logR. tokenizer = Tokenizer(inputCol="text", outputCol="words") hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") logR = LogisticRegression(maxIter=10, regParam=0.001) pipeline = Pipeline(stages=[tokenizer, hashingTF, logR]) #训练数据上进行pipeline fit操作,产生一个model model = pipeline.fit(training) ############################################# #测试集 test = spark.createDataFrame([ (4, "PySpark Pipeline"), (5, "pipeline"), (6, "PySpark python"), (7, "julia c#") ], ["id", "text"]) #model执行transform prediction = model.transform(test) #预测 selected = prediction.select("id", "text", "probability", "prediction") for row in selected.collect(): tid, text, prob, prediction = row print("(%d, %s) --> prediction=%f,prob=%s" \ % (tid, text, prediction,str(prob))) ############################################# sc.stop()
输出:
(4, PySpark Pipeline) --> prediction=1.000000,prob=[0.029796174862816768,0.9702038251371833] (5, pipeline) --> prediction=0.000000,prob=[0.56611226449896,0.43388773550104] (6, PySpark python) --> prediction=1.000000,prob=[0.029796174862816768,0.9702038251371833] (7, julia c#) --> prediction=0.000000,prob=[0.56611226449896,0.43388773550104]