PySpark in Practice: Data Cleaning and Transformation
From CloudWiki
Introduction

Real-world data almost always contains missing values, or is formatted in a way that does not meet the requirements of a particular machine learning algorithm. The data therefore needs to be cleaned and transformed before modeling.
Lab Steps

Handling missing values
#import findspark
#findspark.init()
##############################################
from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext
from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexer

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("PySpark ML") \
    .getOrCreate()
sc = spark.sparkContext
#############################################
df_train = spark.read.csv('./data/titanic-train.csv', header=True, inferSchema=True) \
    .cache()
#df_train.printSchema()
#891 12
#print(df_train.count(), len(df_train.columns))

# Replace missing Age values with the mean, 29.699
df_train = df_train.fillna({'Age': round(29.699, 0)})
# Replace missing Embarked values with 'S', the most common port of embarkation
df_train = df_train.fillna({'Embarked': 'S'})
#df_train = df_train.fillna({'Cabin': 'unknown'})
# Drop columns
df_train = df_train.drop("Cabin")
df_train = df_train.drop("Ticket")
Output:
D:\Tech\PySpark实战\PySpark源代码\ch06\test>spark-submit 03dataTrans.py
+-----------+--------+------+--------------------+------+----+-----+-----+-------+--------+---------+----+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Fare|Embarked|iEmbarked|iSex|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+--------+---------+----+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|   7.25|       S|      0.0| 0.0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|71.2833|       C|      1.0| 1.0|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|  7.925|       S|      0.0| 1.0|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|   53.1|       S|      0.0| 1.0|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|   8.05|       S|      0.0| 0.0|
|          6|       0|     3|    Moran, Mr. James|  male|30.0|    0|    0| 8.4583|       Q|      2.0| 0.0|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|51.8625|       S|      0.0| 0.0|
|          8|       0|     3|Palsson, Master. ...|  male| 2.0|    3|    1| 21.075|       S|      0.0| 0.0|
|          9|       1|     3|Johnson, Mrs. Osc...|female|27.0|    0|    2|11.1333|       S|      0.0| 1.0|
|         10|       1|     2|Nasser, Mrs. Nich...|female|14.0|    1|    0|30.0708|       C|      1.0| 1.0|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1|   16.7|       S|      0.0| 1.0|
|         12|       1|     1|Bonnell, Miss. El...|female|58.0|    0|    0|  26.55|       S|      0.0| 1.0|
|         13|       0|     3|Saundercock, Mr. ...|  male|20.0|    0|    0|   8.05|       S|      0.0| 0.0|
|         14|       0|     3|Andersson, Mr. An...|  male|39.0|    1|    5| 31.275|       S|      0.0| 0.0|
|         15|       0|     3|Vestrom, Miss. Hu...|female|14.0|    0|    0| 7.8542|       S|      0.0| 1.0|
|         16|       1|     2|Hewlett, Mrs. (Ma...|female|55.0|    0|    0|   16.0|       S|      0.0| 1.0|
|         17|       0|     3|Rice, Master. Eugene|  male| 2.0|    4|    1| 29.125|       Q|      2.0| 0.0|
|         18|       1|     2|Williams, Mr. Cha...|  male|30.0|    0|    0|   13.0|       S|      0.0| 0.0|
|         19|       0|     3|Vander Planke, Mr...|female|31.0|    1|    0|   18.0|       S|      0.0| 1.0|
|         20|       1|     3|Masselmani, Mrs. ...|female|30.0|    0|    0|  7.225|       C|      1.0| 1.0|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+--------+---------+----+
only showing top 20 rows
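The hard-coded 29.699 above is the mean of the non-missing Age values in the training set. The imputation logic itself can be sketched in plain Python (the age list below is a made-up sample, not the real Titanic data):

```python
# Mean imputation: fill missing ages with the rounded mean of the known ages,
# mirroring df_train.fillna({'Age': round(mean_age, 0)}) in the PySpark code.
ages = [22.0, 38.0, None, 35.0, None, 54.0]   # None marks a missing value
known = [a for a in ages if a is not None]
mean_age = round(sum(known) / len(known), 0)  # 37.0 for this sample
filled = [a if a is not None else mean_age for a in ages]
print(filled)  # [22.0, 38.0, 37.0, 35.0, 37.0, 54.0]
```

In PySpark itself the mean could be computed with an aggregation instead of being hard-coded, which keeps train and test imputation consistent.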
Numeric encoding of categorical features

Many algorithms require numeric input, so string-typed label columns usually need to be converted into numeric indices. PySpark's StringIndexer transformer encodes a column of string labels as a column of numeric indices.
labelIndexer = StringIndexer(inputCol="Embarked", outputCol="iEmbarked")
model = labelIndexer.fit(df_train)
df_train = model.transform(df_train)

labelIndexer = StringIndexer(inputCol="Sex", outputCol="iSex")
model = labelIndexer.fit(df_train)
df_train = model.transform(df_train)
df_train.show()
Output:
+------+----+----+-----+-----+-------+---------+--------+
|Pclass|iSex| Age|SibSp|Parch|   Fare|iEmbarked|Survived|
+------+----+----+-----+-----+-------+---------+--------+
|     3| 0.0|22.0|    1|    0|   7.25|      0.0|       0|
|     1| 1.0|38.0|    1|    0|71.2833|      1.0|       1|
|     3| 1.0|26.0|    0|    0|  7.925|      0.0|       1|
|     1| 1.0|35.0|    1|    0|   53.1|      0.0|       1|
|     3| 0.0|35.0|    0|    0|   8.05|      0.0|       0|
|     3| 0.0|30.0|    0|    0| 8.4583|      2.0|       0|
|     1| 0.0|54.0|    0|    0|51.8625|      0.0|       0|
|     3| 0.0| 2.0|    3|    1| 21.075|      0.0|       0|
|     3| 1.0|27.0|    0|    2|11.1333|      0.0|       1|
|     2| 1.0|14.0|    1|    0|30.0708|      1.0|       1|
|     3| 1.0| 4.0|    1|    1|   16.7|      0.0|       1|
|     1| 1.0|58.0|    0|    0|  26.55|      0.0|       1|
|     3| 0.0|20.0|    0|    0|   8.05|      0.0|       0|
|     3| 0.0|39.0|    1|    5| 31.275|      0.0|       0|
|     3| 1.0|14.0|    0|    0| 7.8542|      0.0|       0|
|     2| 1.0|55.0|    0|    0|   16.0|      0.0|       1|
|     3| 0.0| 2.0|    4|    1| 29.125|      2.0|       0|
|     2| 0.0|30.0|    0|    0|   13.0|      0.0|       1|
|     3| 1.0|31.0|    1|    0|   18.0|      0.0|       0|
|     3| 1.0|30.0|    0|    0|  7.225|      1.0|       1|
+------+----+----+-----+-----+-------+---------+--------+
only showing top 20 rows
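By default (stringOrderType="frequencyDesc"), StringIndexer assigns indices by descending label frequency: the most common label gets 0.0, the next 1.0, and so on. That is why 'S' → 0.0, 'C' → 1.0, and 'Q' → 2.0 in the output above. A plain-Python sketch of that ordering, on a made-up sample with no frequency ties:

```python
from collections import Counter

embarked = ['S', 'C', 'S', 'S', 'S', 'Q', 'S', 'C', 'S', 'S']
# Order labels by descending frequency, then map each label to its position,
# mimicking StringIndexer's default frequencyDesc ordering.
order = [label for label, _ in Counter(embarked).most_common()]
index = {label: float(i) for i, label in enumerate(order)}
encoded = [index[v] for v in embarked]
print(index)  # {'S': 0.0, 'C': 1.0, 'Q': 2.0}
```

Note that a fitted StringIndexer model only knows the labels it saw during fit(), which is one reason the tutorial fills missing Embarked values before indexing.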
Feature selection
# Feature selection
features = ['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked', 'Survived']
train_features = df_train[features]
train_features.show()
# train_labels = df_train['Survived']
# train_labels.show()
Assembling multiple columns into a vector

VectorAssembler combines a given list of columns (which must all be numeric) into a single vector column.
# Assemble multiple columns into a single vector column
df_assembler = VectorAssembler(
    inputCols=['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked'],
    outputCol="features")
df = df_assembler.transform(train_features)
#df["features"].show() -> TypeError: 'Column' object is not callable
df["features",].show()
df["Survived",].show()
Output:
+--------------------+
|            features|
+--------------------+
|[3.0,0.0,22.0,1.0...|
|[1.0,1.0,38.0,1.0...|
|[3.0,1.0,26.0,0.0...|
|[1.0,1.0,35.0,1.0...|
|(7,[0,2,5],[3.0,3...|
|[3.0,0.0,30.0,0.0...|
|(7,[0,2,5],[1.0,5...|
|[3.0,0.0,2.0,3.0,...|
|[3.0,1.0,27.0,0.0...|
|[2.0,1.0,14.0,1.0...|
|[3.0,1.0,4.0,1.0,...|
|[1.0,1.0,58.0,0.0...|
|(7,[0,2,5],[3.0,2...|
|[3.0,0.0,39.0,1.0...|
|[3.0,1.0,14.0,0.0...|
|[2.0,1.0,55.0,0.0...|
|[3.0,0.0,2.0,4.0,...|
|(7,[0,2,5],[2.0,3...|
|[3.0,1.0,31.0,1.0...|
|[3.0,1.0,30.0,0.0...|
+--------------------+
only showing top 20 rows

+--------+
|Survived|
+--------+
|       0|
|       1|
|       1|
|       1|
|       0|
|       0|
|       0|
|       0|
|       1|
|       1|
|       1|
|       1|
|       0|
|       0|
|       0|
|       1|
|       0|
|       1|
|       0|
|       1|
+--------+
only showing top 20 rows
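Rows such as (7,[0,2,5],...) in the output above are Spark's sparse vector notation: (size, indices of the non-zero entries, their values). Spark prints whichever of the dense or sparse forms is more compact for each row. The dense-to-sparse conversion can be sketched in plain Python:

```python
def to_sparse(dense):
    """Sparse form of a vector: (size, non-zero indices, non-zero values)."""
    idx = [i for i, v in enumerate(dense) if v != 0.0]
    return (len(dense), idx, [dense[i] for i in idx])

# Row 5 above: Pclass=3, iSex=0, Age=35, SibSp=0, Parch=0, Fare=8.05, iEmbarked=0
row = [3.0, 0.0, 35.0, 0.0, 0.0, 8.05, 0.0]
print(to_sparse(row))  # (7, [0, 2, 5], [3.0, 35.0, 8.05])
```

Only indices 0, 2, and 5 are non-zero, matching the (7,[0,2,5],... row printed by Spark.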
Complete code
#import findspark
#findspark.init()
##############################################
from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext
from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexer

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("PySpark ML") \
    .getOrCreate()
sc = spark.sparkContext
#############################################
df_train = spark.read.csv('./data/titanic-train.csv', header=True, inferSchema=True) \
    .cache()
#df_train.printSchema()
#891 12
#print(df_train.count(), len(df_train.columns))

# Replace missing Age values with the mean, 29.699
df_train = df_train.fillna({'Age': round(29.699, 0)})
# Replace missing Embarked values with 'S', the most common port of embarkation
df_train = df_train.fillna({'Embarked': 'S'})
#df_train = df_train.fillna({'Cabin': 'unknown'})
# Drop columns
df_train = df_train.drop("Cabin")
df_train = df_train.drop("Ticket")

labelIndexer = StringIndexer(inputCol="Embarked", outputCol="iEmbarked")
model = labelIndexer.fit(df_train)
df_train = model.transform(df_train)

labelIndexer = StringIndexer(inputCol="Sex", outputCol="iSex")
model = labelIndexer.fit(df_train)
df_train = model.transform(df_train)
df_train.show()

# Feature selection
features = ['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked', 'Survived']
train_features = df_train[features]
train_features.show()
# train_labels = df_train['Survived']
# train_labels.show()

# Assemble multiple columns into a single vector column
df_assembler = VectorAssembler(
    inputCols=['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked'],
    outputCol="features")
df = df_assembler.transform(train_features)
#df["features"].show() -> TypeError: 'Column' object is not callable
df["features",].show()
df["Survived",].show()
#############################################
'''
df_test = spark.read.csv('./data/titanic-test.csv', header=True, inferSchema=True) \
    .cache()
# Replace missing Age values with the mean, 29.699
df_test = df_test.fillna({'Age': round(29.699, 0)})
# Replace missing Embarked values with 'S', the most common port of embarkation
df_test = df_test.fillna({'Embarked': 'S'})
df_test = df_test.drop("Cabin")
df_test = df_test.drop("Ticket")

labelIndexer = StringIndexer(inputCol="Embarked", outputCol="iEmbarked")
model = labelIndexer.fit(df_test)
df_test = model.transform(df_test)

labelIndexer = StringIndexer(inputCol="Sex", outputCol="iSex")
model = labelIndexer.fit(df_test)
df_test = model.transform(df_test)
df_test.show()

features = ['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked']
test_features = df_test[features]
test_features.show()

# Assemble multiple columns into a single vector column
df_assembler = VectorAssembler(
    inputCols=['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked'],
    outputCol="features")
df2 = df_assembler.transform(test_features)
df2["features",].show()
#############################################
sc.stop()
'''