PySpark in Action: Kafka and Flask Code Implementation

From CloudWiki

Data Generation

kafka_producer.py is a data simulation program: it uses a loop to generate mock records and writes them to a specific Kafka topic.

Code

from kafka import KafkaProducer
import time
import random
import json
from json import dumps
from datetime import datetime
#Connect to Kafka
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', \
    value_serializer=lambda x: dumps(x).encode('utf-8'))
#127.0.0.1:9092 is the address of the Kafka server
#value_serializer serializes the business data (the value field)
#here json.dumps serializes the record to JSON, encoded as UTF-8

#Build a series of records with a loop
key='streaming'.encode('utf-8')
typeList =['鞋子','裤子','袜子','皮带','化妆品','背包','书籍',
    '零食','运动服','乐器']
#The content to send must be of type bytes
bizId = 0  #running record counter, kept separate from the loop variable
for i in range(0, 1000):
    vjson = {}
    for t in typeList:
        bizId = bizId + 1
        vjson["bizId"] = str(bizId)
        vjson["type"] = t
        vjson["value"] = 1
        #vjson["value"] = random.randint(-50,50)
        #Swap in this value to make the results more varied
        #vjson["datetime"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print("Message to be sent: ", vjson)
        producer.send('kf2pyspark',value= vjson , key=key)
        #send publishes each generated record to the Kafka topic kf2pyspark
        time.sleep(1)
producer.flush() 
producer.close()

  • In general, each message on a Kafka topic contains two fields: key and value
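
As a quick sanity check that the records really arrive on the topic, a minimal consumer sketch using the same kafka-python library can print each message's key and value (this snippet is illustrative and not part of the original program):

from kafka import KafkaConsumer

#Subscribe to the kf2pyspark topic and print each record's key and value
consumer = KafkaConsumer('kf2pyspark',
    bootstrap_servers='127.0.0.1:9092',
    auto_offset_reset='earliest')
for msg in consumer:
    print(msg.key.decode('utf-8'), msg.value.decode('utf-8'))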

Output

Message to be sent:  {'bizId': '1', 'type': '鞋子', 'value': 1}
Message to be sent:  {'bizId': '2', 'type': '裤子', 'value': 1}
Message to be sent:  {'bizId': '3', 'type': '袜子', 'value': 1}
Message to be sent:  {'bizId': '4', 'type': '皮带', 'value': 1}
Message to be sent:  {'bizId': '5', 'type': '化妆品', 'value': 1}
Message to be sent:  {'bizId': '6', 'type': '背包', 'value': 1}
Message to be sent:  {'bizId': '7', 'type': '书籍', 'value': 1}
Message to be sent:  {'bizId': '8', 'type': '零食', 'value': 1}
Message to be sent:  {'bizId': '9', 'type': '运动服', 'value': 1}
Message to be sent:  {'bizId': '10', 'type': '乐器', 'value': 1}

Message Subscription

Once messages have been published to the Kafka queue, downstream programs can subscribe to the relevant topic and receive the message data automatically.

Note: the following two jar files need to be downloaded in advance: spark-sql-kafka-0-10_2.11-2.4.8.jar and kafka-clients-2.5.1.jar.

Code

#import findspark
#findspark.init()
##############################################
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

KAFKA_TOPIC_READ = "kf2pyspark"#topic to subscribe to
KAFKA_WRITE_TOPIC = "pyspark2kf"#topic to publish to
KAFKA_SERVERS = '127.0.0.1:9092'#address of the Kafka service
JARS_PATH ="spark-sql-kafka-0-10_2.11-2.4.8.jar,kafka-clients-2.5.1.jar"
#jar files to load
if __name__ == "__main__":
    print("PySpark  Kafka  Started ...")
    spark = SparkSession \
        .builder \
        .appName("PySpark Kafka Demo") \
        .master("local[*]") \
        .config("spark.jars", JARS_PATH) \
        .config("spark.executor.extraClassPath", JARS_PATH) \
        .config("spark.executor.extraLibrary", JARS_PATH) \
        .config("spark.driver.extraClassPath", JARS_PATH) \
        .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    #Read the Kafka stream from the kf2pyspark topic
    sdf = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_SERVERS) \
        .option("subscribe", KAFKA_TOPIC_READ) \
        .option("startingOffsets", "latest") \
        .load()
    #Note the schema of the Kafka stream: the business data is carried in the value field
    sdf.printSchema()
    sdf = sdf.selectExpr("CAST(value AS STRING)")#cast the value field to STRING
   
    bizSchema = StructType() \
        .add("type", StringType()) \
        .add("value", IntegerType()) 
        
    bizDf = sdf.select(from_json(col("value"), bizSchema)\
        .alias("json"))#parse the string value column into a JSON structure

    #bizDf.printSchema()
    bizDf = bizDf.select("json.*")#keep only the business data, i.e. all fields under json
    #bizDf.printSchema()
    bizDfCalc = bizDf.groupBy("type")\
        .agg({'value': 'sum'}) \
        .select("type",col("sum(value)").alias("amount"))#group the business data by type and sum value
        
    #Call writeStream on bizDfCalc to create a write stream and publish it to the Kafka topic pyspark2kf
    #Data written to a Kafka topic must have a value field
    write_stream = bizDfCalc \
        .select(to_json(struct(col("amount"), col("type"))).alias("value"))\
        .writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_SERVERS) \
        .option("topic", KAFKA_WRITE_TOPIC) \
        .option("checkpointLocation", "checkpoints/") \
        .outputMode("complete") \
        .trigger(processingTime='2 seconds') \
        .queryName("sink2kafka")

    query = write_stream.start()
    query.awaitTermination()


Note: data published to a Kafka queue must contain a value field; otherwise an error is raised complaining that value cannot be found.
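
While developing, it can help to inspect the aggregation result before wiring it to the Kafka sink. A minimal sketch (not part of the original program) that writes the same bizDfCalc stream to Spark's built-in console sink instead:

#Debugging variant: print each micro-batch of the aggregation to the console
debug_query = bizDfCalc \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .option("truncate", "false") \
    .trigger(processingTime='2 seconds') \
    .start()
debug_query.awaitTermination()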

Submission

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.8 pyspark-kafka.py

Note: the --packages option must be provided, and its format is groupId:artifactId:version.
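
If the two jar files mentioned earlier have already been downloaded, an offline alternative (assuming they sit next to pyspark-kafka.py) is to pass them explicitly with --jars instead of resolving them via --packages:

spark-submit --jars spark-sql-kafka-0-10_2.11-2.4.8.jar,kafka-clients-2.5.1.jar pyspark-kafka.py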

Output

        [SUCCESSFUL ] org.xerial.snappy#snappy-java;1.1.8.2!snappy-java.jar(bundle) (737ms)
downloading file:/C:/Users/maxin/.m2/repository/org/slf4j/slf4j-api/1.7.16/slf4j-api-1.7.16.jar ...
        [SUCCESSFUL ] org.slf4j#slf4j-api;1.7.16!slf4j-api.jar (10ms)
:: resolution report :: resolve 11032ms :: artifacts dl 3315ms
        :: modules in use:
        org.apache.kafka#kafka-clients;2.0.0 from central in [default]
        org.apache.spark#spark-sql-kafka-0-10_2.11;2.4.8 from central in [default]
        org.lz4#lz4-java;1.4.0 from local-m2-cache in [default]
        org.slf4j#slf4j-api;1.7.16 from local-m2-cache in [default]
        org.spark-project.spark#unused;1.0.0 from local-m2-cache in [default]
        org.xerial.snappy#snappy-java;1.1.8.2 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   6   |   6   |   6   |   0   ||   6   |   6   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-e0fe7a47-adcc-4653-b75d-e4cabf9470ec
        confs: [default]
        6 artifacts copied, 0 already retrieved (4700kB/40ms)
PySpark  Kafka  Started ...
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

Building the Service

For setting up the Flask environment, see PySpark实战:Kafka和Flask环境搭建.

Project Structure

(Figure: Python21080302.png, project directory structure)

The templates directory contains index.html.

The static/js directory contains the following three files:

(Figure: Python21081701.png, contents of static/js)
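
Based on the file references in app.py and index.html below, the layout is roughly as follows (a reconstruction, since the figures are not reproduced here):

.
├── app.py
├── templates/
│   └── index.html
└── static/
    └── js/
        ├── jquery.js
        ├── socket.io.js
        └── echarts.min.js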

app.py

The main Flask program, app.py:

import psutil
import time
from threading import Lock
from flask import Flask, render_template #import the Flask modules
from flask_socketio import SocketIO #import the SocketIO module
from kafka import KafkaConsumer

async_mode = None
app = Flask(__name__) #create a Flask app object
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, async_mode=async_mode)
thread = None
thread_lock = Lock()

def bg_task():#background task, started by the connect handler below; it subscribes to messages on Kafka and pushes them to the client so the chart can be drawn
    consumer = KafkaConsumer('pyspark2kf', bootstrap_servers=['127.0.0.1:9092'])
    for msg in consumer:
        print(msg)
        socketio.emit('server_response', 
        {'data': str(msg.value,'utf-8'), 'isOk': 1}
        ,namespace='/kzstream')
        
 
@app.route('/')#handle requests: visits to the site root are routed to the index function below
def index():
    #located in the templates directory
    return render_template('index.html', async_mode=socketio.async_mode)

@socketio.on('connect', namespace='/kzstream')#listen for socket events: once a connection is established, start a background task (bg_task) to handle the messages
def mtest_connect():
    global thread
    with thread_lock:
        if thread is None:
            thread = socketio.start_background_task(bg_task)


if __name__ == '__main__':
    socketio.run(app, debug=True)

index.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="utf-8">
    <title>实时销量图</title>
    <script src="/static/js/jquery.js"></script>
    <script src="/static/js/socket.io.js"></script>
    <script src="/static/js/echarts.min.js"></script>
</head>
 
<body>
    <div id="kzChart" style="height:800px;width:99%;">
    </div>
 
    <script type="text/javascript">

    var myChart = echarts.init(document.getElementById('kzChart'));
    myChart.setOption({
        title: {
            text: '实时销量对比图'
        },
        tooltip: {},
        legend: {
            data:['销量']
        },
        xAxis: {
            data: []
        },
        yAxis: {},
        series: [{
            name: '销量',
            type: 'bar',
            data: []
        }]
    });
    var xdata = ["","","","","","","","","",""],
        ydata = [0,0,0,0,0,0,0,0,0,0]

    var update_mychart = function (data) {
        myChart.hideLoading();
        xdata.push(data.type);
        ydata.push(parseFloat(data.amount));
        if (xdata.length >= 10){
            xdata.shift();
            ydata.shift();
        }
        myChart.setOption({
            xAxis: {
                data: xdata
            },
            series: [{
                name: '销量', 
                data: ydata
            }]
        });
 
    };
    myChart.showLoading();
 
    $(document).ready(function() {
        namespace = '/kzstream';
        var socket = io.connect(location.protocol + '//' + document.domain + ':' + location.port + namespace);
        socket.on('server_response', function(res) {
            console.log(res)
            update_mychart(JSON.parse(res.data));
        });
 
    });
 
    </script>
</body>
</html>

Running the Project

With the Kafka service running, execute the following commands in turn:

python kafka_producer.py

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.8 pyspark-kafka.py

set FLASK_APP=app

flask run

Open http://127.0.0.1:5000/ in a browser.

(Figure: Python21080303.png, the real-time sales chart shown in the browser)