PySpark in Practice: Understanding Data Cell Formats
From CloudWiki
Introduction
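A large project may have many data sources, spread across different heterogeneous databases and stored in different file formats.
Processing this mix of structured, semi-structured and unstructured data in a uniform way is one of the main challenges of ETL.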
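Spark and ETL
When using Spark for ETL on big data, the first step is to identify the data sources and decide which data to extract from them. Then, by inspecting the data, determine the format of each data cell: its data type, field length, value range, and whether it may be null. Some fields have many missing values, in which case you also need to decide how to fill them in.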
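Several of these per-column facts can be collected mechanically before any types are chosen. The following is a minimal plain-Python sketch, assuming the data is already available as lists of strings; profile_columns is a hypothetical helper, not a Spark API (in PySpark, DataFrame.describe() and DataFrame.summary() report similar statistics). The sample rows come from the ratings.csv output shown later in this article, plus one hypothetical row with a missing movieId.

```python
from typing import Dict, List, Optional


def profile_columns(header: List[str], rows: List[List[str]]) -> Dict[str, dict]:
    """Collect simple facts per column: empty count, max text length, numeric range."""
    profile: Dict[str, dict] = {}
    for i, name in enumerate(header):
        # Treat a missing trailing cell the same as an empty one
        values = [row[i] if i < len(row) else "" for row in rows]
        non_empty = [v for v in values if v.strip() != ""]
        numbers: Optional[List[float]] = []
        for v in non_empty:
            try:
                numbers.append(float(v))
            except ValueError:
                numbers = None  # at least one value is not a number
                break
        profile[name] = {
            "empty": len(values) - len(non_empty),
            "max_len": max((len(v) for v in values), default=0),
            "numeric": bool(non_empty) and numbers is not None,
            "min": min(numbers) if numbers else None,
            "max": max(numbers) if numbers else None,
        }
    return profile


header = ["userId", "movieId", "rating", "timestamp"]
rows = [
    ["1", "1", "4.0", "964982703"],
    ["1", "3", "4.0", "964981247"],
    ["1", "47", "5.0", "964983815"],
    ["1", "", "3.0", "964982400"],  # hypothetical row with a missing movieId
]
profile = profile_columns(header, rows)
for name, facts in profile.items():
    print(name, facts)
```

The empty count answers "can it be null", max_len gives the field length, and min/max give the value range for numeric columns.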
Lab Steps
Prepare the Data
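MovieLens movie rating dataset: https://grouplens.org/datasets/movielens/
The small version is sufficient here: ml-latest-small.zip. The scripts below read the files from ./ml-small/, so the extracted folder is assumed to be placed (or renamed) there, relative to the script.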
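- ratings.csv: movie rating records. userId is the user ID, movieId is the movie ID, rating is the score the user gave the movie, and timestamp is the time of the rating in seconds since the Unix epoch (UTC).
- tags.csv: tags that users applied to movies (userId, movieId, tag, timestamp).
- movies.csv: movie information (movieId, title, and genres separated by |).
- links.csv: identifiers linking each movieId to IMDb (imdbId) and TMDb (tmdbId).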
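As an example of a value-range check: according to the MovieLens README, ratings use a 5-star scale in half-star increments (0.5 to 5.0). A minimal sketch of a validator for that rule, where is_valid_rating is a hypothetical helper rather than part of any library:

```python
from decimal import Decimal, InvalidOperation


def is_valid_rating(text: str) -> bool:
    """Return True if text is a rating between 0.5 and 5.0 in steps of 0.5."""
    try:
        value = Decimal(text.strip())
    except InvalidOperation:
        return False  # not a number at all
    if not value.is_finite():
        return False  # reject NaN and Infinity
    # Decimal avoids binary floating-point surprises when checking the 0.5 step
    return Decimal("0.5") <= value <= Decimal("5.0") and (value * 2) % 1 == 0


for sample in ["4.0", "0.5", "5.5", "4.25", "abc"]:
    print(sample, is_valid_rating(sample))
```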
Extract the Data
# findspark adds a local Spark installation (SPARK_HOME) to sys.path;
# it is not needed if pyspark was installed with pip
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("PySpark ETL") \
    .getOrCreate()
#############################################
# Relative path; the file contains a header row
df = spark.read.csv('./ml-small/ratings.csv', header=True)
# Print the default column types
df.printSchema()
# Print the first 20 rows
df.show()
# Print the total number of rows
print(df.count())
##############################################
# Stop the session (this also stops its SparkContext)
spark.stop()
- In df = spark.read.csv('./ml-small/ratings.csv', header=True), header=True means the first line is a header row: its values become the column names instead of data.
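For comparison, with header=False Spark treats the first line as an ordinary data row and names the columns _c0, _c1, and so on. The plain-Python sketch below only mimics that behaviour on an in-memory string; split_header is a hypothetical function, not part of PySpark.

```python
import csv
import io
from typing import List, Tuple


def split_header(text: str, header: bool) -> Tuple[List[str], List[List[str]]]:
    """Return (column names, data rows), mimicking the effect of the header option."""
    rows = list(csv.reader(io.StringIO(text)))
    if not rows:
        return [], []
    if header:
        # First line supplies the column names and is not counted as data
        return rows[0], rows[1:]
    # Without a header every line is data; columns get positional names
    return [f"_c{i}" for i in range(len(rows[0]))], rows


sample = "userId,movieId,rating,timestamp\n1,1,4.0,964982703\n1,3,4.0,964981247\n"
print(split_header(sample, header=True))
print(split_header(sample, header=False))
```

Without the header option the title line "userId,movieId,..." would end up in the data and also be counted by count().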
Run the Program
Save the script as extract01.py and run python3 extract01.py.
Column types:
root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)
First 20 rows:
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows
Total number of rows:
100836
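Observation: every column is read as string, because the CSV reader does not infer types by default. The values therefore have to be converted manually, for example from strings to numeric types.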
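In PySpark the usual tool for this is Column.cast, for example df.withColumn('rating', df['rating'].cast('double')). The plain-Python sketch below shows the same idea for a single record, assuming the target types are the ones Spark infers later in this article (integer for the IDs and timestamp, double for rating); CONVERTERS and convert_record are hypothetical names. It also turns the timestamp, which counts seconds since the Unix epoch in UTC, into a datetime.

```python
from datetime import datetime, timezone
from typing import Callable, Dict, Optional

# Hypothetical target type for each ratings.csv column
CONVERTERS: Dict[str, Callable[[str], object]] = {
    "userId": int,
    "movieId": int,
    "rating": float,
    "timestamp": int,
}


def convert_record(record: Dict[str, str]) -> Dict[str, Optional[object]]:
    """Convert one all-string record to typed values; empty strings become None."""
    typed: Dict[str, Optional[object]] = {}
    for name, text in record.items():
        convert = CONVERTERS.get(name, str)  # unknown columns stay strings
        typed[name] = convert(text) if text.strip() != "" else None
    return typed


raw = {"userId": "1", "movieId": "1", "rating": "4.0", "timestamp": "964982703"}
typed = convert_record(raw)
# Seconds since the Unix epoch -> timezone-aware UTC datetime
rated_at = datetime.fromtimestamp(typed["timestamp"], tz=timezone.utc)
print(typed)
print(rated_at.isoformat())
```

Mapping empty strings to None mirrors how a missing CSV cell becomes null in a nullable column.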
Can Spark infer the data types automatically from the data itself?
Yes, it can.
Automatic Type Inference
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("PySpark ETL") \
    .getOrCreate()
#############################################
# inferSchema=True: infer column types from the data
df = spark.read.csv('./ml-small/ratings.csv', header=True, inferSchema=True)
df.printSchema()
df.show(5)
print(df.count())
#############################################
df2 = spark.read.csv('./ml-small/tags.csv', header=True, inferSchema=True)
df2.printSchema()
df2.show(5)
print(df2.count())
#############################################
df3 = spark.read.csv('./ml-small/movies.csv', header=True, inferSchema=True)
df3.printSchema()
df3.show(5)
print(df3.count())
#############################################
df4 = spark.read.csv('./ml-small/links.csv', header=True, inferSchema=True)
df4.printSchema()
df4.show(5)
print(df4.count())
##############################################
spark.stop()
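Note: the call df = spark.read.csv('./ml-small/ratings.csv', header=True, inferSchema=True) sets inferSchema=True, which enables automatic type inference. To infer the types, Spark has to make one extra pass over the data, which can be costly for large files.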
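Output (by default show() truncates strings longer than 20 characters, which is why values such as Adventure|Animati... appear; pass truncate=False to see full values):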
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows

100836

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: integer (nullable = true)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows

3683

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows

9742

root
 |-- movieId: integer (nullable = true)
 |-- imdbId: integer (nullable = true)
 |-- tmdbId: integer (nullable = true)

+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      1|114709|   862|
|      2|113497|  8844|
|      3|113228| 15602|
|      4|114885| 31357|
|      5|113041| 11862|
+-------+------+------+
only showing top 5 rows

9742
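The integer and double types above come from Spark examining the values of each column. The plain-Python sketch below is a simplified illustration of that idea, not Spark's implementation: each value is tried as a 32-bit integer, then a 64-bit integer (long), then a double, and otherwise kept as a string, and the column takes the widest type any of its values needs. Spark's real CSV inference handles more types (for example timestamp and boolean). The sketch also shows why timestamp is inferred as integer rather than as a timestamp: values such as 964982703 are plain numbers that fit in 32 bits. The names ORDER, infer_value and infer_column are hypothetical.

```python
from typing import List

# Widening order used by this sketch (narrowest first)
ORDER = ["integer", "long", "double", "string"]


def infer_value(text: str) -> str:
    """Return the narrowest type name in ORDER that can hold one CSV value."""
    try:
        number = int(text)
    except ValueError:
        number = None
    if number is not None:
        if -(2**31) <= number < 2**31:
            return "integer"  # fits a 32-bit signed int
        if -(2**63) <= number < 2**63:
            return "long"  # fits a 64-bit signed int
    try:
        float(text)
        return "double"
    except ValueError:
        return "string"


def infer_column(values: List[str]) -> str:
    """Widen over all non-empty values; an all-empty column falls back to string."""
    seen = [ORDER.index(infer_value(v)) for v in values if v != ""]
    return ORDER[max(seen)] if seen else "string"


columns = {
    "userId": ["1", "1", "1"],
    "rating": ["4.0", "5.0", "3.0"],
    "timestamp": ["964982703", "964981247"],
    "tag": ["funny", "Highly quotable", "MMA"],
}
for name, values in columns.items():
    print(name, infer_column(values))
```

A single non-numeric value is enough to turn a whole column into string, which is why inferred schemas are worth checking before relying on them.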