PySpark in Action: The sortByKey Operation

From CloudWiki

Introduction

The sortByKey operation is a transformation that sorts a key-value RDD by key.

It is called as: rdd.sortByKey(ascending=True, numPartitions=None, keyfunc=<function RDD.<lambda>>)

The keyfunc parameter (a function applied to each key before comparison) is optional.
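Because keyfunc only transforms each key before comparison, its effect can be sketched locally with Python's built-in sorted, without Spark at all. The snippet below is an illustrative analogy of the same semantics, not Spark code:

```python
# Local analogy of sortByKey's semantics using Python's sorted().
# sortByKey(ascending, numPartitions, keyfunc) orders (K, V) pairs
# by keyfunc(K); with no keyfunc, the raw key is compared.
pairs = [('b', 2), ('A', 1), ('C', 3)]

# Default behaviour: compare raw keys (uppercase sorts before lowercase
# by code point).
by_raw_key = sorted(pairs, key=lambda kv: kv[0])
print(by_raw_key)    # [('A', 1), ('C', 3), ('b', 2)]

# With a keyfunc such as str.lower, each key is transformed before
# comparison, giving a case-insensitive order.
by_lower_key = sorted(pairs, key=lambda kv: kv[0].lower())
print(by_lower_key)  # [('A', 1), ('b', 2), ('C', 3)]
```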

Code

import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local[1]") \
        .appName("RDD Demo") \
        .getOrCreate()
sc = spark.sparkContext
#############################################
x = [('a', 6), ('f', 2), ('c', 7), ('d', 4), ('e', 5)]
# Ascending sort by key, into a single partition
rdd = sc.parallelize(x).sortByKey(True, 1)
#[('a', 6), ('c', 7), ('d', 4), ('e', 5), ('f', 2)]
print(rdd.collect())
# Descending sort by key
rdd = sc.parallelize(x).sortByKey(False, 1)
#[('f', 2), ('e', 5), ('d', 4), ('c', 7), ('a', 6)]
print(rdd.collect())
x = [(['a','g'], 6), (['f','h'], 2), (['c','m'], 7), (['d','n'], 4), (['e','w'], 5)]
def f(x):
        # x is a key such as ['a','g']; sort by its second element
        print(x)
        return x[1]

# Descending sort using keyfunc f, i.e. by each key's second element
rdd = sc.parallelize(x).sortByKey(False, 1, f)
#[(['e', 'w'], 5), (['d', 'n'], 4), (['c', 'm'], 7), (['f', 'h'], 2), (['a', 'g'], 6)]
print(rdd.collect())
##############################################
sc.stop()

  • sortByKey requires the RDD's elements to be in key-value (K, V) format.
  • sc.parallelize(x).sortByKey(False, 1, f): sorts according to the function f; here the pairs are ordered by the second element of each key.
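The second bullet's keyfunc behaviour can be reproduced locally without a Spark session; the sketch below models it with Python's sorted (an assumed-equivalent local analogy, not the Spark implementation):

```python
# Local sketch of sc.parallelize(x).sortByKey(False, 1, f) where
# f returns key[1]: order pairs by each key's second element, descending.
x = [(['a', 'g'], 6), (['f', 'h'], 2), (['c', 'm'], 7),
     (['d', 'n'], 4), (['e', 'w'], 5)]

result = sorted(x, key=lambda kv: kv[0][1], reverse=True)
# Second key elements compare as 'w' > 'n' > 'm' > 'h' > 'g'
print(result)
# [(['e', 'w'], 5), (['d', 'n'], 4), (['c', 'm'], 7), (['f', 'h'], 2), (['a', 'g'], 6)]
```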

Output

[('a', 6), ('c', 7), ('d', 4), ('e', 5), ('f', 2)]
[('f', 2), ('e', 5), ('d', 4), ('c', 7), ('a', 6)]
['a', 'g']
['f', 'h']
['c', 'm']
['d', 'n']
['e', 'w']
[(['e', 'w'], 5), (['d', 'n'], 4), (['c', 'm'], 7), (['f', 'h'], 2), (['a', 'g'], 6)]