PySpark实战:PySpark Shell 建立RDD

来自CloudWiki
跳转至: 导航搜索

实训步骤

打开Spark

cd /root/wmtools/spark-3.1.3-bin-hadoop2.7/bin

./pyspark

建立RDD实例

PySpark Shell 默认会自动创建sc对象和spark对象

sc对象是SparkContext的实例,spark对象是SparkSession的实例

Using Python version 3.7.5 (default, May 25 2021 14:04:16)
SparkSession available as 'spark'.
>>> rdd = sc.parallelize(["hello world","hello spark"]);
>>> rdd2 = rdd.flatMap(lambda line:line.split(" "));
>>> rdd3 = rdd2.map(lambda word:(word,1));
>>> rdd5 = rdd3.reduceByKey(lambda a, b : a + b);
>>> rdd5.collect();
[Stage 0:>                                                          (0                                                                        [('world', 1), ('hello', 2), ('spark', 1)]
>>> quit();
[root@localhost bin]#

  • sc.parallelize: 创建RDD,接受一个集合,返回一个包含多行内容的RDD
  • rdd.flatMap: 接受一个函数,用于遍历处理RDD中每行文本内容
  • lambda line:line.split(" "): 冒号左边表示输入参数,右边表示函数里面执行的处理逻辑
  • rdd2.map(lambda word:(word,1)):map操作会遍历集合中的每个单词,对遍历到其中的每一个单词,执行lamda函数,把单词word转换成元组(word,1)
  • rdd3.reduceByKey(lambda a,b: a+b):执行reduceByKey操作,这个操作会把所有RDD元素按照key进行分组,然后对于分组后的元素,执行规约操作。