PySpark in Action: Data Cleaning and Transformation


Introduction

Data collected in the real world often contains missing values, or arrives in a format that does not meet the requirements of a particular machine-learning algorithm. It therefore needs to be cleaned and transformed before modeling.

Lab Steps

Handling Missing Values

#import findspark
#findspark.init()
##############################################
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
spark = SparkSession.builder \
        .master("local[*]") \
        .appName("PySpark ML") \
        .getOrCreate()
sc = spark.sparkContext
#############################################
df_train = spark.read.csv('./data/titanic-train.csv',header=True,inferSchema=True) \
  .cache()
#df_train.printSchema()
# 891 rows, 12 columns
#print(df_train.count(),len(df_train.columns))
# Replace missing Age values with the column mean, 29.699 (rounded to 30.0)
df_train = df_train.fillna({'Age': round(29.699,0)})
# Replace missing Embarked values with the most frequent port, 'S'
df_train = df_train.fillna({'Embarked': 'S'})
#df_train = df_train.fillna({'Cabin': 'unknown'})
# Drop columns that add little value here: Cabin (mostly missing) and Ticket
df_train = df_train.drop("Cabin")
df_train = df_train.drop("Ticket")

Output (this listing comes from running the complete script, so it already includes the iEmbarked and iSex columns added by the indexing step below):

D:\Tech\PySpark实战\PySpark源代码\ch06\test>spark-submit 03dataTrans.py
+-----------+--------+------+--------------------+------+----+-----+-----+-------+--------+---------+----+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Fare|Embarked|iEmbarked|iSex|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+--------+---------+----+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|   7.25|       S|      0.0| 0.0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|71.2833|       C|      1.0| 1.0|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|  7.925|       S|      0.0| 1.0|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|   53.1|       S|      0.0| 1.0|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|   8.05|       S|      0.0| 0.0|
|          6|       0|     3|    Moran, Mr. James|  male|30.0|    0|    0| 8.4583|       Q|      2.0| 0.0|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|51.8625|       S|      0.0| 0.0|
|          8|       0|     3|Palsson, Master. ...|  male| 2.0|    3|    1| 21.075|       S|      0.0| 0.0|
|          9|       1|     3|Johnson, Mrs. Osc...|female|27.0|    0|    2|11.1333|       S|      0.0| 1.0|
|         10|       1|     2|Nasser, Mrs. Nich...|female|14.0|    1|    0|30.0708|       C|      1.0| 1.0|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1|   16.7|       S|      0.0| 1.0|
|         12|       1|     1|Bonnell, Miss. El...|female|58.0|    0|    0|  26.55|       S|      0.0| 1.0|
|         13|       0|     3|Saundercock, Mr. ...|  male|20.0|    0|    0|   8.05|       S|      0.0| 0.0|
|         14|       0|     3|Andersson, Mr. An...|  male|39.0|    1|    5| 31.275|       S|      0.0| 0.0|
|         15|       0|     3|Vestrom, Miss. Hu...|female|14.0|    0|    0| 7.8542|       S|      0.0| 1.0|
|         16|       1|     2|Hewlett, Mrs. (Ma...|female|55.0|    0|    0|   16.0|       S|      0.0| 1.0|
|         17|       0|     3|Rice, Master. Eugene|  male| 2.0|    4|    1| 29.125|       Q|      2.0| 0.0|
|         18|       1|     2|Williams, Mr. Cha...|  male|30.0|    0|    0|   13.0|       S|      0.0| 0.0|
|         19|       0|     3|Vander Planke, Mr...|female|31.0|    1|    0|   18.0|       S|      0.0| 1.0|
|         20|       1|     3|Masselmani, Mrs. ...|female|30.0|    0|    0|  7.225|       C|      1.0| 1.0|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+--------+---------+----+
only showing top 20 rows
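The mean age (29.699) and the most frequent port ('S') are hard-coded above; both can instead be computed from the data. A minimal sketch, assuming df_train has just been loaded from titanic-train.csv and not yet modified:

from pyspark.sql.functions import mean

# Compute the mean age over the non-null rows
mean_age = df_train.select(mean('Age')).first()[0]   # ~29.699 on this dataset

# Find the most frequent embarkation port
top_port = (df_train.groupBy('Embarked').count()
            .orderBy('count', ascending=False)
            .first()['Embarked'])                    # 'S' on this dataset

df_train = df_train.fillna({'Age': round(mean_age, 0), 'Embarked': top_port})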



Numeric Encoding of Label Features

To simplify algorithm implementation, label data stored as strings often needs to be converted into numeric indices.

PySpark's StringIndexer transformer encodes a column of string labels as numeric indices. By default, labels are ordered by descending frequency, so the most frequent label receives index 0.0.

labelIndexer = StringIndexer(inputCol="Embarked", outputCol="iEmbarked")
model = labelIndexer.fit(df_train)
df_train = model.transform(df_train)

labelIndexer = StringIndexer(inputCol="Sex", outputCol="iSex")
model = labelIndexer.fit(df_train)
df_train = model.transform(df_train)

df_train.show()

Output (this listing actually comes from train_features.show() in the feature-selection step below; it shows only the selected columns):

+------+----+----+-----+-----+-------+---------+--------+
|Pclass|iSex| Age|SibSp|Parch|   Fare|iEmbarked|Survived|
+------+----+----+-----+-----+-------+---------+--------+
|     3| 0.0|22.0|    1|    0|   7.25|      0.0|       0|
|     1| 1.0|38.0|    1|    0|71.2833|      1.0|       1|
|     3| 1.0|26.0|    0|    0|  7.925|      0.0|       1|
|     1| 1.0|35.0|    1|    0|   53.1|      0.0|       1|
|     3| 0.0|35.0|    0|    0|   8.05|      0.0|       0|
|     3| 0.0|30.0|    0|    0| 8.4583|      2.0|       0|
|     1| 0.0|54.0|    0|    0|51.8625|      0.0|       0|
|     3| 0.0| 2.0|    3|    1| 21.075|      0.0|       0|
|     3| 1.0|27.0|    0|    2|11.1333|      0.0|       1|
|     2| 1.0|14.0|    1|    0|30.0708|      1.0|       1|
|     3| 1.0| 4.0|    1|    1|   16.7|      0.0|       1|
|     1| 1.0|58.0|    0|    0|  26.55|      0.0|       1|
|     3| 0.0|20.0|    0|    0|   8.05|      0.0|       0|
|     3| 0.0|39.0|    1|    5| 31.275|      0.0|       0|
|     3| 1.0|14.0|    0|    0| 7.8542|      0.0|       0|
|     2| 1.0|55.0|    0|    0|   16.0|      0.0|       1|
|     3| 0.0| 2.0|    4|    1| 29.125|      2.0|       0|
|     2| 0.0|30.0|    0|    0|   13.0|      0.0|       1|
|     3| 1.0|31.0|    1|    0|   18.0|      0.0|       0|
|     3| 1.0|30.0|    0|    0|  7.225|      1.0|       1|
+------+----+----+-----+-----+-------+---------+--------+
only showing top 20 rows
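Each fitted StringIndexerModel records its label-to-index mapping: position in the model's labels list is the numeric index assigned. A quick way to inspect the mapping for the Sex indexer fitted above (a sketch; the exact output depends on the data):

# labels[0] maps to 0.0, labels[1] to 1.0, and so on
print(model.labels)   # e.g. ['male', 'female'] on this dataset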


Feature Selection

# Feature selection: keep the modeling columns plus the label
features = ['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked','Survived']
train_features = df_train[features]
train_features.show()
# train_labels = df_train['Survived']   # a single string index returns a Column
# train_labels.show()                   # would fail: Column has no show() method
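If you do want to look at the label column on its own, select it as a one-column DataFrame; a short sketch:

# select() returns a one-column DataFrame, which can be shown,
# unlike the Column returned by df_train['Survived']
df_train.select('Survived').show(5)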


Assembling Multiple Columns into a Vector

VectorAssembler combines a given list of columns (which must all be numeric) into a single vector column.

# Assemble multiple numeric columns into a single vector column
df_assembler = VectorAssembler(inputCols=['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked'], outputCol="features")
df = df_assembler.transform(train_features)

# df["features"].show() raises TypeError: 'Column' object is not callable,
# because a plain string index returns a Column. A tuple index (note the
# trailing comma) is equivalent to df.select(...), which returns a DataFrame.
df["features",].show()
df["Survived",].show()

Output:

+--------------------+
|            features|
+--------------------+
|[3.0,0.0,22.0,1.0...|
|[1.0,1.0,38.0,1.0...|
|[3.0,1.0,26.0,0.0...|
|[1.0,1.0,35.0,1.0...|
|(7,[0,2,5],[3.0,3...|
|[3.0,0.0,30.0,0.0...|
|(7,[0,2,5],[1.0,5...|
|[3.0,0.0,2.0,3.0,...|
|[3.0,1.0,27.0,0.0...|
|[2.0,1.0,14.0,1.0...|
|[3.0,1.0,4.0,1.0,...|
|[1.0,1.0,58.0,0.0...|
|(7,[0,2,5],[3.0,2...|
|[3.0,0.0,39.0,1.0...|
|[3.0,1.0,14.0,0.0...|
|[2.0,1.0,55.0,0.0...|
|[3.0,0.0,2.0,4.0,...|
|(7,[0,2,5],[2.0,3...|
|[3.0,1.0,31.0,1.0...|
|[3.0,1.0,30.0,0.0...|
+--------------------+
only showing top 20 rows

+--------+
|Survived|
+--------+
|       0|
|       1|
|       1|
|       1|
|       0|
|       0|
|       0|
|       0|
|       1|
|       1|
|       1|
|       1|
|       0|
|       0|
|       0|
|       1|
|       0|
|       1|
|       0|
|       1|
+--------+
only showing top 20 rows
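Some rows in the features column print in MLlib's sparse format: (7,[0,2,5],[3.0,30.0,8.4583]) denotes a length-7 vector whose only non-zero entries sit at positions 0, 2, and 5. VectorAssembler chooses whichever representation is more compact for each row. A small sketch of the two forms (values here are illustrative):

from pyspark.ml.linalg import Vectors

# Dense: all 7 entries stored explicitly
dense = Vectors.dense([3.0, 0.0, 30.0, 0.0, 0.0, 8.4583, 0.0])

# Sparse: vector size, then the non-zero indices and their values
sparse = Vectors.sparse(7, [0, 2, 5], [3.0, 30.0, 8.4583])

print(sparse.toArray())   # [ 3.  0. 30.  0.  0.  8.4583  0.] -- same contents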

Complete Code


#import findspark
#findspark.init()
##############################################
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
spark = SparkSession.builder \
        .master("local[*]") \
        .appName("PySpark ML") \
        .getOrCreate()
sc = spark.sparkContext
#############################################
df_train = spark.read.csv('./data/titanic-train.csv',header=True,inferSchema=True) \
  .cache()
#df_train.printSchema()
# 891 rows, 12 columns
#print(df_train.count(),len(df_train.columns))
# Replace missing Age values with the column mean, 29.699 (rounded to 30.0)
df_train = df_train.fillna({'Age': round(29.699,0)})
# Replace missing Embarked values with the most frequent port, 'S'
df_train = df_train.fillna({'Embarked': 'S'})
#df_train = df_train.fillna({'Cabin': 'unknown'})
# Drop columns that add little value here: Cabin (mostly missing) and Ticket
df_train = df_train.drop("Cabin")
df_train = df_train.drop("Ticket")

labelIndexer = StringIndexer(inputCol="Embarked", outputCol="iEmbarked")
model = labelIndexer.fit(df_train)
df_train = model.transform(df_train)

labelIndexer = StringIndexer(inputCol="Sex", outputCol="iSex")
model = labelIndexer.fit(df_train)
df_train = model.transform(df_train)

df_train.show()

# Feature selection: keep the modeling columns plus the label
features = ['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked','Survived']
train_features = df_train[features]
train_features.show()
# train_labels = df_train['Survived']   # a single string index returns a Column
# train_labels.show()                   # would fail: Column has no show() method

# Assemble multiple numeric columns into a single vector column
df_assembler = VectorAssembler(inputCols=['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked'], outputCol="features")
df = df_assembler.transform(train_features)

# df["features"].show() raises TypeError: 'Column' object is not callable,
# because a plain string index returns a Column. A tuple index (note the
# trailing comma) is equivalent to df.select(...), which returns a DataFrame.
df["features",].show()
df["Survived",].show()
#############################################
'''
df_test = spark.read.csv('./data/titanic-test.csv',header=True,inferSchema=True) \
  .cache()
# Replace missing Age values with the training-set mean, 29.699 (rounded)
df_test = df_test.fillna({'Age': round(29.699,0)})
# Replace missing Embarked values with the most frequent port, 'S'
df_test = df_test.fillna({'Embarked': 'S'})
df_test = df_test.drop("Cabin")
df_test = df_test.drop("Ticket")

labelIndexer = StringIndexer(inputCol="Embarked", outputCol="iEmbarked")
model = labelIndexer.fit(df_test)
df_test = model.transform(df_test)

labelIndexer = StringIndexer(inputCol="Sex", outputCol="iSex")
model = labelIndexer.fit(df_test)
df_test = model.transform(df_test)
df_test.show()

features = ['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked']
test_features = df_test[features]
test_features.show()

# Assemble multiple columns into a single vector
df_assembler = VectorAssembler(inputCols=['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked'], outputCol="features")
df2 = df_assembler.transform(test_features)

df2["features",].show()
#############################################
'''
# Stop Spark once all work is done (kept outside the commented-out block)
sc.stop()
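A caution on the commented-out test-set block: fitting a fresh StringIndexer on the test data can assign different indices than the training data produced, if label frequencies differ between the two files. A safer pattern (a sketch, not part of the original script, assuming df_train and df_test are the cleaned DataFrames before any indexing) is to fit the stages once on the training set, for example with a Pipeline, and reuse the fitted model:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Fit every stage on the training data only ...
pipeline = Pipeline(stages=[
    StringIndexer(inputCol="Embarked", outputCol="iEmbarked"),
    StringIndexer(inputCol="Sex", outputCol="iSex"),
    VectorAssembler(
        inputCols=['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked'],
        outputCol="features"),
])
model = pipeline.fit(df_train)

# ... then apply the same fitted model to both sets, so train and
# test share a single label-to-index mapping.
train_vec = model.transform(df_train)
#test_vec = model.transform(df_test)   # assuming df_test is cleaned as above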