PySpark实战:lookup操作
来自CloudWiki
介绍
lookup操作是一个变换算子,
rdd.lookup(key)
可以根据key值从RDD中查找到相关的元素
代码
import findspark findspark.init() ############################################## from pyspark.sql import SparkSession spark = SparkSession.builder \ .master("local[1]") \ .appName("RDD Demo") \ .getOrCreate(); sc = spark.sparkContext ############################################# a = range(100) rdd = sc.parallelize(zip(a, a), 2) #[32] print(rdd.lookup(32)) sorted = rdd.sortByKey() #[32] print(sorted.lookup(32)) # [] print(sorted.lookup(200)) rdd = sc.parallelize([('a', 'b'), ('c', 'd')]) #['b'] print(rdd.lookup('a')) rdd = sc.parallelize([['a', 'b'], ['c', 'd']]) #['b'] print(rdd.lookup('a')) rdd = sc.parallelize([(('a', 'b'), 'c')]) #['c'] print(rdd.lookup(('a', 'b'))) #错误 #rdd = sc.parallelize([1,2,3]).lookup(1) ############################################## sc.stop()
- sorted = rdd.sortByKey() 在大数据的情况下,排序后的lookup操作查找速度更快。
- Python3+环境下,在Spark集群上使用distinct(),reduceByKey()和join()等几个函数时,可能会触发PYTHONHASHSEED 异常,即Randomness of hash of string should be disabled via PYTHONHASHSEED ,此时可以在spark-defaults.conf并加入:spark.executorEnv.PYTHONHASHSEED=0 (未验证)
输出
[32] [Stage 3:> [Stage 3:=============================> [32] [] ['b'] ['b'] ['c']