PySpark in Action: Creating DataFrames
From CloudWiki
Introduction
In Spark, besides the RDD data container, there is a distributed data container that is easier to work with: the DataFrame.
It resembles the two-dimensional table of a traditional relational database, so data can be queried with SQL-like syntax.
A DataFrame can be built from many data sources, such as structured data files, Hive tables, external databases, or existing RDDs.
Creating a First DataFrame
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("RDD Demo") \
    .getOrCreate()
#############################################
a = [('Jack', 32), ('Smith', 33)]
df = spark.createDataFrame(a)  # build a DataFrame from the variable a
# [Row(_1='Jack', _2=32), Row(_1='Smith', _2=33)]
print(df.collect())  # a DataFrame is a collection of Row objects
df.show()
# +-----+---+
# |   _1| _2|
# +-----+---+
# | Jack| 32|
# |Smith| 33|
# +-----+---+
df2 = spark.createDataFrame(a, ['name', 'age'])  # this time, supply column names
# [Row(name='Jack', age=32), Row(name='Smith', age=33)]
print(df2.collect())
df2.show()
# +-----+---+
# | name|age|
# +-----+---+
# | Jack| 32|
# |Smith| 33|
# +-----+---+
##############################################
Output
[Row(_1='Jack', _2=32), Row(_1='Smith', _2=33)]
+-----+---+
|   _1| _2|
+-----+---+
| Jack| 32|
|Smith| 33|
+-----+---+
[Row(name='Jack', age=32), Row(name='Smith', age=33)]
+-----+---+
| name|age|
+-----+---+
| Jack| 32|
|Smith| 33|
+-----+---+
Creating a DataFrame with Row
Code
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("RDD Demo") \
    .getOrCreate()
sc = spark.sparkContext
#############################################
a = [('Jack', 32), ('Smith', 33)]
rdd = sc.parallelize(a)  # create an RDD
from pyspark.sql import Row
Person = Row('name', 'age')  # a Row template that carries the column names
person = rdd.map(lambda r: Person(*r))
# map is a transformation: it turns each tuple in the RDD into a Person Row
# and returns a new RDD
print(person.collect())
df = spark.createDataFrame(person)  # build a DataFrame from the RDD person
# [Row(name='Jack', age=32), Row(name='Smith', age=33)]
print(df.collect())
df.show()
# +-----+---+
# | name|age|
# +-----+---+
# | Jack| 32|
# |Smith| 33|
# +-----+---+
##############################################
Output
[Row(name='Jack', age=32), Row(name='Smith', age=33)]
[Row(name='Jack', age=32), Row(name='Smith', age=33)]
+-----+---+
| name|age|
+-----+---+
| Jack| 32|
|Smith| 33|
+-----+---+
Creating a DataFrame with Typed Fields
The DataFrames created so far have no explicitly declared field types.
The code below creates a DataFrame whose fields are typed.
Code
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("RDD Demo") \
    .getOrCreate()
sc = spark.sparkContext
#############################################
a = [('Jack', 32), ('Smith', 33)]
rdd = sc.parallelize(a)
from pyspark.sql.types import *
# Define a schema that specifies the type of every field:
# StringType() after "name" marks it as a string; True means the field is nullable
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)])
df = spark.createDataFrame(rdd, schema)
# [Row(name='Jack', age=32), Row(name='Smith', age=33)]
print(df.collect())
df.show()
# +-----+---+
# | name|age|
# +-----+---+
# | Jack| 32|
# |Smith| 33|
# +-----+---+
df.printSchema()  # print the structure of df
# root
#  |-- name: string (nullable = true)
#  |-- age: integer (nullable = true)
##############################################
Output
[Row(name='Jack', age=32), Row(name='Smith', age=33)]
+-----+---+
| name|age|
+-----+---+
| Jack| 32|
|Smith| 33|
+-----+---+
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
A Simpler Way to Create a Typed DataFrame
StructType lets you create a typed DataFrame, but it is somewhat verbose.
Here is a simpler way to create the same typed DataFrame.
Code
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("RDD Demo") \
    .getOrCreate()
sc = spark.sparkContext
#############################################
a = [('Jack', 32), ('Smith', 33)]
rdd = sc.parallelize(a)
df = spark.createDataFrame(rdd, "name: string, age: int")
# the field names and types are defined with a single string
# [Row(name='Jack', age=32), Row(name='Smith', age=33)]
print(df.collect())
df.show()
# +-----+---+
# | name|age|
# +-----+---+
# | Jack| 32|
# |Smith| 33|
# +-----+---+
df.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- age: integer (nullable = true)
rdd2 = rdd.map(lambda row: row[0])  # take the first element of each tuple to form a new RDD
df2 = spark.createDataFrame(rdd2, "string")  # build df2 from rdd2, with a single string column
# [Row(value='Jack'), Row(value='Smith')]
print(df2.collect())
df2.show()
# +-----+
# |value|
# +-----+
# | Jack|
# |Smith|
# +-----+
df2.printSchema()
# root
#  |-- value: string (nullable = true)
##############################################
Output
[Row(name='Jack', age=32), Row(name='Smith', age=33)]
+-----+---+
| name|age|
+-----+---+
| Jack| 32|
|Smith| 33|
+-----+---+
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
[Row(value='Jack'), Row(value='Smith')]
+-----+
|value|
+-----+
| Jack|
|Smith|
+-----+
root
 |-- value: string (nullable = true)