PySpark实战:intersection操作
来自CloudWiki
介绍
intersection操作是一个变换算子,
调用形式为rdd.intersection(otherRDD),
它返回一个此RDD和另一个otherRDD的交集。
代码
import findspark findspark.init() ############################################## from pyspark.sql import SparkSession spark = SparkSession.builder \ .master("local[1]") \ .appName("RDD Demo") \ .getOrCreate(); sc = spark.sparkContext ############################################# rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5]) rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8]) ret = rdd1.intersection(rdd2).collect() #[2, 1, 3] print(ret) rdd1 = sc.parallelize([("a", 1), ("b", 1), ("a", 2),("b", 3)]) rdd2 = sc.parallelize([("a", 2), ("b", 1), ("e", 5)]) ret = rdd1.intersection(rdd2).collect() #[('b', 1), ('a', 2)] print(ret) ############################################## sc.stop()
输出
[2, 1, 3]
[('b', 1), ('a', 2)]