PySpark in Action: Creating DataFrames


Introduction

In Spark, besides the RDD data container, there is another distributed data container that is easier to work with: the DataFrame.

It resembles the two-dimensional table of a traditional relational database, which means data can be accessed with SQL-like syntax.

A DataFrame can be built from many data sources, such as structured data files, Hive tables, external databases, or existing RDDs.
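
As a taste of the file-based route (the walkthroughs below all start from Python lists or RDDs), here is a minimal sketch, assuming a SparkSession named spark as built in the code below and hypothetical local files people.json and people.csv:

df = spark.read.json("people.json")  # one JSON object per line
df = spark.read.csv("people.csv", header=True, inferSchema=True)  # infer column types from the data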

Creating the First DataFrame


import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local[1]") \
        .appName("RDD Demo") \
        .getOrCreate()
#############################################
a = [('Jack', 32),('Smith', 33)]
df = spark.createDataFrame(a)  # create a DataFrame object from the variable a
#[Row(_1='Jack', _2=32), Row(_1='Smith', _2=33)]
print(df.collect())  # a DataFrame collects to a list of Row objects
df.show()
# +-----+---+
# |   _1| _2|
# +-----+---+
# | Jack| 32|
# |Smith| 33|
# +-----+---+
df2 = spark.createDataFrame(a, ['name', 'age'])
#[Row(name='Jack', age=32), Row(name='Smith', age=33)]
print(df2.collect())
df2.show()
# +-----+---+
# | name|age|
# +-----+---+
# | Jack| 32|
# |Smith| 33|
# +-----+---+
##############################################


Output

[Row(_1='Jack', _2=32), Row(_1='Smith', _2=33)]
+-----+---+
|   _1| _2|
+-----+---+
| Jack| 32|
|Smith| 33|
+-----+---+

[Row(name='Jack', age=32), Row(name='Smith', age=33)]
+-----+---+
| name|age|
+-----+---+
| Jack| 32|
|Smith| 33|
+-----+---+
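
As a side note, column names can also be attached after the fact with DataFrame.toDF; a short sketch reusing the spark session and the list a from above:

df3 = spark.createDataFrame(a).toDF('name', 'age')  # rename _1/_2 to name/age
df3.show()  # prints the same table as df2 above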

Creating a DataFrame Object Using Row

Code


import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local[1]") \
        .appName("RDD Demo") \
        .getOrCreate()
sc = spark.sparkContext
#############################################
a = [('Jack', 32),('Smith', 33)]
rdd = sc.parallelize(a)  # create an RDD from the list a
from pyspark.sql import Row
Person = Row('name', 'age')  # create a Row class carrying the column names
person = rdd.map(lambda r: Person(*r))  # map is a transformation: it converts each tuple into a Person Row and returns a new RDD
print(person.collect())
df = spark.createDataFrame(person)  # create a DataFrame object from the RDD person
#[Row(name='Jack', age=32), Row(name='Smith', age=33)]
print(df.collect())
df.show()
# +-----+---+
# | name|age|
# +-----+---+
# | Jack| 32|
# |Smith| 33|
# +-----+---+
##############################################

Output

[Row(name='Jack', age=32), Row(name='Smith', age=33)]
[Row(name='Jack', age=32), Row(name='Smith', age=33)]
+-----+---+
| name|age|
+-----+---+
| Jack| 32|
|Smith| 33|
+-----+---+
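
Row also accepts keyword arguments directly, which skips the intermediate map step; a minimal sketch reusing the spark session from above:

from pyspark.sql import Row
rows = [Row(name='Jack', age=32), Row(name='Smith', age=33)]
df = spark.createDataFrame(rows)  # column names are taken from the Row fields
df.show()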

Creating a DataFrame Object with Typed Fields

In the DataFrames created above, the fields carry no explicitly specified types.

The code below creates a DataFrame object whose fields are explicitly typed.

Code

import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local[1]") \
        .appName("RDD Demo") \
        .getOrCreate()
sc = spark.sparkContext
#############################################
a = [('Jack', 32),('Smith', 33)]
rdd = sc.parallelize(a)
from pyspark.sql.types import *
# create a table schema that specifies the type of each field
# e.g. StringType() after "name" marks it as a string; True means the field is nullable
schema = StructType([
   StructField("name", StringType(), True),
   StructField("age", IntegerType(), True)])
df = spark.createDataFrame(rdd, schema)
#[Row(name='Jack', age=32), Row(name='Smith', age=33)]
print(df.collect())
df.show()
# +-----+---+
# | name|age|
# +-----+---+
# | Jack| 32|
# |Smith| 33|
# +-----+---+
df.printSchema()  # print the structure (schema) of df
# root
#  |-- name: string (nullable = true)
#  |-- age: integer (nullable = true)
##############################################

Output

[Row(name='Jack', age=32), Row(name='Smith', age=33)]
+-----+---+
| name|age|
+-----+---+
| Jack| 32|
|Smith| 33|
+-----+---+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
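
The same schema can also be assembled by chaining StructType.add, which some find easier to read; a sketch equivalent to the schema above:

from pyspark.sql.types import StructType, StringType, IntegerType
schema = (StructType()
   .add("name", StringType(), True)   # nullable string field
   .add("age", IntegerType(), True))  # nullable integer field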

A Simpler Way to Create a Typed DataFrame

StructType lets you build a typed DataFrame object, but it is somewhat verbose.

Here is a simpler way to create the same kind of typed DataFrame.

Code


import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local[1]") \
        .appName("RDD Demo") \
        .getOrCreate()
sc = spark.sparkContext
#############################################
a = [('Jack', 32),('Smith', 33)]
rdd = sc.parallelize(a)
df = spark.createDataFrame(rdd, "name: string, age: int")
# define the field types with a DDL-style string
#[Row(name='Jack', age=32), Row(name='Smith', age=33)]
print(df.collect())
df.show()
# +-----+---+
# | name|age|
# +-----+---+
# | Jack| 32|
# |Smith| 33|
# +-----+---+
df.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- age: integer (nullable = true)

rdd2 = rdd.map(lambda row: row[0])  # extract the first element of each tuple into a new RDD
df2 = spark.createDataFrame(rdd2, "string")  # build a single-column DataFrame of type string from rdd2
#[Row(value='Jack'), Row(value='Smith')]
print(df2.collect())
df2.show()
# +-----+
# |value|
# +-----+
# | Jack|
# |Smith|
# +-----+
df2.printSchema()
# root
#  |-- value: string (nullable = true)
##############################################

Output

[Row(name='Jack', age=32), Row(name='Smith', age=33)]
+-----+---+
| name|age|
+-----+---+
| Jack| 32|
|Smith| 33|
+-----+---+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

[Row(value='Jack'), Row(value='Smith')]
+-----+
|value|
+-----+
| Jack|
|Smith|
+-----+

root
 |-- value: string (nullable = true)
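
Finally, once a SparkSession is active, RDDs also gain a toDF shortcut that accepts the same DDL-style string; a minimal sketch, assuming the rdd defined above:

df3 = rdd.toDF("name: string, age: int")
df3.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- age: integer (nullable = true)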