PySpark实战:lookup操作

来自CloudWiki
跳转至: 导航搜索

介绍

lookup操作是一个变换算子,

rdd.lookup(key)

可以根据key值从RDD中查找到相关的元素

代码


import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local[1]") \
        .appName("RDD Demo") \
        .getOrCreate();
sc = spark.sparkContext
#############################################
a = range(100)
rdd = sc.parallelize(zip(a, a), 2)
#[32]
print(rdd.lookup(32))
sorted = rdd.sortByKey()
#[32]
print(sorted.lookup(32))
# []
print(sorted.lookup(200))
rdd = sc.parallelize([('a', 'b'), ('c', 'd')])
#['b']
print(rdd.lookup('a'))
rdd = sc.parallelize([['a', 'b'], ['c', 'd']])
#['b']
print(rdd.lookup('a'))
rdd = sc.parallelize([(('a', 'b'), 'c')])
#['c']
print(rdd.lookup(('a', 'b')))
#错误
#rdd = sc.parallelize([1,2,3]).lookup(1)
##############################################
sc.stop()
  • sorted = rdd.sortByKey() 在大数据的情况下,排序后的lookup操作查找速度更快。
  • Python3+环境下,在Spark集群上使用distinct(),reduceByKey()和join()等几个函数时,可能会触发PYTHONHASHSEED 异常,即Randomness of hash of string should be disabled via PYTHONHASHSEED ,此时可以在spark-defaults.conf并加入:spark.executorEnv.PYTHONHASHSEED=0 (未验证)

输出

[32]
[Stage 3:>                                                      [Stage 3:=============================>                                                                                         [32]
[]
['b']
['b']
['c']