PySpark实战:PySpark Shell 建立RDD
来自CloudWiki
实训步骤
打开Spark
cd /root/wmtools/spark-3.1.3-bin-hadoop2.7/bin
./pyspark
建立RDD实例
PySpark Shell 默认会自动创建sc对象和spark对象
sc对象是SparkContext的实例,spark对象是SparkSession的实例
Using Python version 3.7.5 (default, May 25 2021 14:04:16) SparkSession available as 'spark'. >>> rdd = sc.parallelize(["hello world","hello spark"]); >>> rdd2 = rdd.flatMap(lambda line:line.split(" ")); >>> rdd3 = rdd2.map(lambda word:(word,1)); >>> rdd5 = rdd3.reduceByKey(lambda a, b : a + b); >>> rdd5.collect(); [Stage 0:> (0 [('world', 1), ('hello', 2), ('spark', 1)] >>> quit(); [root@localhost bin]#
- sc.parallelize: 创建RDD,接受一个集合,返回一个包含多行内容的RDD
- rdd.flatMap: 接受一个函数,用于遍历处理RDD中每行文本内容
- lambda line:line.split(" "): 冒号左边表示输入参数,右边表示函数里面执行的处理逻辑
- rdd2.map(lambda word:(word,1)):map操作会遍历集合中的每个单词,对遍历到其中的每一个单词,执行lamda函数,把单词word转换成元组(word,1)
- rdd3.reduceByKey(lambda a,b: a+b):执行reduceByKey操作,这个操作会把所有RDD元素按照key进行分组,然后对于分组后的元素,执行规约操作。