PySpark实战:认识资料单元格式

来自CloudWiki
跳转至: 导航搜索

介绍

对于一个大型项目来说,数据源可能非常多,来自不同的异构数据库,文件格式也不一定相同

对于这些结构化的、半结构化和非结构化的数据进行统一处理,是ETL面临的一个难题。

Spark与ETL

在Spark中,对大数据进行ETL处理,首先就是要确定数据源,并明确从数据源获得哪些数据,并通过观察来确定每个数据资料单元格的格式,即确定单元格的数据类型、字段长度、取值范围和能否为空等。对于一些数据,字段缺失非常严重,还要考虑如何补全。

实训步骤

准备数据

电影评价数据集:https://grouplens.org/datasets/movielens/

这里下载精简版的即可:ml-latest-small.zip

  • ratings.csv :电影评分记录,userId 用户ID ,movieId :电影ID ,rating 是用户给电影的打分。

Python21072815.png

抽取数据

import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext
from pyspark.sql.functions import to_timestamp
spark = SparkSession.builder \
        .master("local[*]") \
        .appName("PySpark ETL") \
        .getOrCreate();
sc = spark.sparkContext
#############################################
sqlContext=SQLContext(sc)
#相对路径,文件包含标题行
df = spark.read.csv('./ml-small/ratings.csv',header=True)
#打印默认的字段类型信息
df.printSchema()
#打印前20条数据
df.show()
#打印总行数
print(df.count())
##############################################
sc.stop()
  • df = spark.read.csv('./ml-small/ratings.csv',header=True)中header=True表示第一行是标题行

运行程序

python3 extract01.py:

字段类型:

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)

前20条数据:

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows

总行数:

 100836

观察:各个字段类型都为string ,因此需要进行人工格式转换,将字符串转换为数值类型

那么在Spark中,能否直接利用数据内容 来自动推断数据类型呢 ?

可以的

自动类型推断


import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext
from pyspark.sql.functions import to_timestamp
spark = SparkSession.builder \
        .master("local[*]") \
        .appName("PySpark ETL") \
        .getOrCreate();
sc = spark.sparkContext
#############################################
#inferSchema=True自动推断数据类型
df = spark.read.csv('./ml-small/ratings.csv',header=True,inferSchema=True)
df.printSchema()
df.show(5)
print(df.count())
#############################################
df2 = spark.read.csv('./ml-small/tags.csv',header=True,inferSchema=True)
df2.printSchema()
df2.show(5)
print(df2.count())
#############################################
df3 = spark.read.csv('./ml-small/movies.csv',header=True,inferSchema=True)
df3.printSchema()
df3.show(5)
print(df3.count())
#############################################
df4 = spark.read.csv('./ml-small/links.csv',header=True,inferSchema=True)
df4.printSchema()
df4.show(5)
print(df4.count())
##############################################
sc.stop()

注:代码第14行df = spark.read.csv('./ml-small/ratings.csv',header=True,inferSchema=True)设置了inferSchema=True参数,启动了数据类型的自动推断功能。


输出:

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows

100836
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: integer (nullable = true)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows

3683
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows

9742
root
 |-- movieId: integer (nullable = true)
 |-- imdbId: integer (nullable = true)
 |-- tmdbId: integer (nullable = true)

+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      1|114709|   862|
|      2|113497|  8844|
|      3|113228| 15602|
|      4|114885| 31357|
|      5|113041| 11862|
+-------+------+------+
only showing top 5 rows

9742