PySpark实战:Spark SQL基本用法

来自CloudWiki
跳转至: 导航搜索

介绍

Spark SQL可以用类似SQL的语句对dataframe对象中的数据进行各种操作。

比如查询、过滤、分组统计和变换等。

示例

代码

import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local[1]") \
        .appName("RDD Demo") \
        .getOrCreate();
sc = spark.sparkContext
#############################################
a = [('Jack', 32),('Smith', 33),('李四', 36)]
rdd = sc.parallelize(a)
df = spark.createDataFrame(rdd, "name: string, age: int")
#3
print("记录条数:")
print(df.count())
#创建数据表user
df.createOrReplaceTempView("user")
#打印查询结果
df2 = spark.sql("select count(*) as counter from user")
df2.show()
# +-------+
# |counter|
# +-------+
# |      3|
# +-------+
df2 = spark.sql("select *,age+1 as next from user where age < 36")
df2.show()
# +-----+---+----+
# | name|age|next|
# +-----+---+----+
# | Jack| 32|  33|
# |Smith| 33|  34|
# +-----+---+----+
##############################################

结果

记录条数:
3
+-------+
|counter|
+-------+
|      3|
+-------+

+-----+---+----+
| name|age|next|
+-----+---+----+
| Jack| 32|  33|
|Smith| 33|  34|
+-----+---+----+

使用自定义函数扩展SQL功能

有些函数非常复杂,

内置的SQL函数不方便解决

能不能支持自定义函数 来扩展SQL功能呢?

代码


import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local[1]") \
        .appName("RDD Demo") \
        .config("spark.sql.shuffle.partitions",4) \
        .getOrCreate();
sc = spark.sparkContext
#############################################
#获取spark上下文设置
conf = sc.getConf().get("spark.sql.shuffle.partitions")
print(conf)

#定义自定义函数
strlen = spark.udf.register("strLen", lambda x: len(x))
#利用spark.udf.register方法注册了一个自定义的用户函数strLen,它的逻辑是lambda x:len(x),即返回参数的长度

#创建数据表user
a = [('Jack', 32),('Smith', 33),('李四', 36)]
rdd = sc.parallelize(a)
df = spark.createDataFrame(rdd, "name: string, age: int")
df.createOrReplaceTempView("user")

#利用自定义函数进行查询
df2 = spark.sql("select *,strLen(name) as len from user")
df2.show()
# +-----+---+---+
# | name|age|len|
# +-----+---+---+
# | Jack| 32|  4|
# |Smith| 33|  5|
# | 李四| 36|  2|
# +-----+---+---+

输出

4
+-----+---+---+
| name|age|len|
+-----+---+---+
| Jack| 32|  4|
|Smith| 33|  5|
| 李四| 36|  2|
+-----+---+---+