PySpark in Action: Kafka and Flask Code Implementation
Data Generation
kafka_producer.py is a data-simulation program: it uses a loop to generate mock data and writes it to a specific Kafka topic.
Code
from kafka import KafkaProducer
import time
import random
import json
from json import dumps
from datetime import datetime

# Connect to Kafka
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092',
                         value_serializer=lambda x: dumps(x).encode('utf-8'))
# 127.0.0.1:9092 is the Kafka broker address.
# value_serializer controls how the message value is serialized:
# here json.dumps turns the dict into a JSON string, which is then UTF-8 encoded.

# Build a series of messages in a loop
key = 'streaming'.encode('utf-8')
typeList = ['鞋子', '裤子', '袜子', '皮带', '化妆品', '背包', '书籍',
            '零食', '运动服', '乐器']
# Product categories: shoes, trousers, socks, belt, cosmetics, backpack,
# books, snacks, sportswear, musical instruments

# The payload sent to Kafka must be bytes
for i in range(0, 1000):
    vjson = {}
    for t in typeList:
        i = i + 1
        vjson["bizId"] = str(i)
        vjson["type"] = t
        vjson["value"] = 1
        # vjson["value"] = random.randint(-50, 50)  # randomize the value for more varied results
        # vjson["datetime"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print("Message to be sent: ", vjson)
        producer.send('kf2pyspark', value=vjson, key=key)
        # send() publishes each record to the Kafka topic 'kf2pyspark'
        time.sleep(1)

producer.flush()
producer.close()
- In general, a message on a Kafka topic contains two fields: a key and a value.
Output
Message to be sent:  {'bizId': '1', 'type': '鞋子', 'value': 1}
Message to be sent:  {'bizId': '2', 'type': '裤子', 'value': 1}
Message to be sent:  {'bizId': '3', 'type': '袜子', 'value': 1}
Message to be sent:  {'bizId': '4', 'type': '皮带', 'value': 1}
Message to be sent:  {'bizId': '5', 'type': '化妆品', 'value': 1}
Message to be sent:  {'bizId': '6', 'type': '背包', 'value': 1}
Message to be sent:  {'bizId': '7', 'type': '书籍', 'value': 1}
Message to be sent:  {'bizId': '8', 'type': '零食', 'value': 1}
Message to be sent:  {'bizId': '9', 'type': '运动服', 'value': 1}
Message to be sent:  {'bizId': '10', 'type': '乐器', 'value': 1}
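To spot-check that these messages are actually reaching the broker, the console consumer that ships with Kafka can be pointed at the topic (the exact script path depends on your installation, e.g. bin/kafka-console-consumer.sh on Linux or bin\windows\kafka-console-consumer.bat on Windows):

kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic kf2pyspark --from-beginning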
Message Subscription
Once messages have been published to the Kafka queue, downstream consumers can subscribe to the relevant topic and automatically receive the message data.
Note: the following two JAR files need to be downloaded in advance: spark-sql-kafka-0-10_2.11-2.4.8.jar and kafka-clients-2.5.1.jar.
Code
# import findspark
# findspark.init()
##############################################
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

KAFKA_TOPIC_READ = "kf2pyspark"    # topic to subscribe to
KAFKA_WRITE_TOPIC = "pyspark2kf"   # topic to publish to
KAFKA_SERVERS = '127.0.0.1:9092'   # Kafka broker address
JARS_PATH = "spark-sql-kafka-0-10_2.11-2.4.8.jar,kafka-clients-2.5.1.jar"  # JARs to load

if __name__ == "__main__":
    print("PySpark Kafka Started ...")

    spark = SparkSession \
        .builder \
        .appName("PySpark Kafka Demo") \
        .master("local[*]") \
        .config("spark.jars", JARS_PATH) \
        .config("spark.executor.extraClassPath", JARS_PATH) \
        .config("spark.executor.extraLibrary", JARS_PATH) \
        .config("spark.driver.extraClassPath", JARS_PATH) \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    # Read the Kafka stream from the kf2pyspark topic
    sdf = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_SERVERS) \
        .option("subscribe", KAFKA_TOPIC_READ) \
        .option("startingOffsets", "latest") \
        .load()

    # Note the schema of a Kafka stream: the business payload sits in the value column
    sdf.printSchema()
    sdf = sdf.selectExpr("CAST(value AS STRING)")  # cast the binary value column to a string

    bizSchema = StructType() \
        .add("type", StringType()) \
        .add("value", IntegerType())

    bizDf = sdf.select(from_json(col("value"), bizSchema)
                       .alias("json"))  # parse the string value column as JSON
    # bizDf.printSchema()
    bizDf = bizDf.select("json.*")  # keep only the business fields, i.e. everything under json
    # bizDf.printSchema()

    bizDfCalc = bizDf.groupBy("type") \
        .agg({'value': 'sum'}) \
        .select("type", col("sum(value)").alias("amount"))  # group the business data by type and sum the values

    # Create a write stream on bizDfCalc and publish the results to the pyspark2kf topic.
    # Data written to a Kafka topic must have a value column.
    write_stream = bizDfCalc \
        .select(to_json(struct(col("amount"), col("type"))).alias("value")) \
        .writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_SERVERS) \
        .option("topic", KAFKA_WRITE_TOPIC) \
        .option("checkpointLocation", "checkpoints/") \
        .outputMode("complete") \
        .trigger(processingTime='2 seconds') \
        .queryName("sink2kafka")

    query = write_stream.start()
    query.awaitTermination()
Note: data published to the Kafka queue must contain a value field; otherwise Spark reports an error saying the value column cannot be found.
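The value column is the only mandatory one; Spark's Kafka sink also picks up an optional key column if you want each outgoing record to carry a message key. A minimal sketch of that variant, reusing bizDfCalc from the code above (write_df is just a local name used here):

    write_df = bizDfCalc \
        .select(col("type").alias("key"),                                    # optional Kafka message key
                to_json(struct(col("amount"), col("type"))).alias("value"))  # required Kafka message value

write_df can then be fed into the same writeStream configuration shown above.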
Submitting the Job
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.8 pyspark-kafka.py
Note: the --packages option must be provided, and its value must follow the Maven coordinate format groupId:artifactId:version (here: org.apache.spark, spark-sql-kafka-0-10_2.11, 2.4.8).
Output
[SUCCESSFUL ] org.xerial.snappy#snappy-java;1.1.8.2!snappy-java.jar(bundle) (737ms)
downloading file:/C:/Users/maxin/.m2/repository/org/slf4j/slf4j-api/1.7.16/slf4j-api-1.7.16.jar ...
    [SUCCESSFUL ] org.slf4j#slf4j-api;1.7.16!slf4j-api.jar (10ms)
:: resolution report :: resolve 11032ms :: artifacts dl 3315ms
    :: modules in use:
    org.apache.kafka#kafka-clients;2.0.0 from central in [default]
    org.apache.spark#spark-sql-kafka-0-10_2.11;2.4.8 from central in [default]
    org.lz4#lz4-java;1.4.0 from local-m2-cache in [default]
    org.slf4j#slf4j-api;1.7.16 from local-m2-cache in [default]
    org.spark-project.spark#unused;1.0.0 from local-m2-cache in [default]
    org.xerial.snappy#snappy-java;1.1.8.2 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   6   |   6   |   6   |   0   ||   6   |   6   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-e0fe7a47-adcc-4653-b75d-e4cabf9470ec
    confs: [default]
    6 artifacts copied, 0 already retrieved (4700kB/40ms)
PySpark Kafka Started ...
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
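Before wiring up the Flask front end, you can confirm that aggregated results are reaching the pyspark2kf topic with a short standalone consumer. A minimal sketch, assuming the kafka-python package is installed and the broker is still at 127.0.0.1:9092:

from kafka import KafkaConsumer
import json

# Subscribe to the topic that the Spark job writes its aggregates to
consumer = KafkaConsumer('pyspark2kf', bootstrap_servers=['127.0.0.1:9092'])
for msg in consumer:
    record = json.loads(msg.value.decode('utf-8'))   # e.g. {"amount": 12, "type": "鞋子"}
    print(record["type"], record["amount"])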
Building the Service
For setting up the Flask environment, see PySpark实战:Kafka和Flask环境搭建 (PySpark in Action: Kafka and Flask Environment Setup).
Project Structure
The templates directory holds index.html.
The static/js directory holds the following three files: jquery.js, socket.io.js, and echarts.min.js.
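A layout along the following lines is assumed (inferred from the files referenced in this article; the project root name is arbitrary):

project/
├── kafka_producer.py      # data simulator (Kafka producer)
├── pyspark-kafka.py       # Spark Structured Streaming job
├── app.py                 # Flask + Socket.IO server
├── templates/
│   └── index.html
└── static/
    └── js/
        ├── jquery.js
        ├── socket.io.js
        └── echarts.min.js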
app.py
The main Flask program, app.py:
import psutil
import time
from threading import Lock
from flask import Flask, render_template   # Flask modules
from flask_socketio import SocketIO        # Socket.IO module
from kafka import KafkaConsumer

async_mode = None
app = Flask(__name__)                       # create the Flask app object
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, async_mode=async_mode)
thread = None
thread_lock = Lock()

def bg_task():
    # Background task, started by the connect handler below: it subscribes to the
    # Kafka topic and pushes each message to the client so the chart can be drawn.
    consumer = KafkaConsumer('pyspark2kf',
                             bootstrap_servers=['127.0.0.1:9092'])
    for msg in consumer:
        print(msg)
        socketio.emit('server_response',
                      {'data': str(msg.value, 'utf-8'), 'isOk': 1},
                      namespace='/kzstream')

@app.route('/')   # requests to the site root are handled by the decorated index() function
def index():
    # index.html lives in the templates directory
    return render_template('index.html', async_mode=socketio.async_mode)

@socketio.on('connect', namespace='/kzstream')
def mtest_connect():
    # Listen for socket connections; once a connection is established,
    # start a background task (bg_task) to handle the messages.
    global thread
    with thread_lock:
        if thread is None:
            thread = socketio.start_background_task(bg_task)

if __name__ == '__main__':
    socketio.run(app, debug=True)
index.html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="utf-8">
    <title>实时销量图</title>   <!-- "Real-time sales chart" -->
    <script src="/static/js/jquery.js"></script>
    <script src="/static/js/socket.io.js"></script>
    <script src="/static/js/echarts.min.js"></script>
</head>
<body>
    <div id="kzChart" style="height:800px;width:99%;"></div>
    <script type="text/javascript">
        var myChart = echarts.init(document.getElementById('kzChart'));
        myChart.setOption({
            title: { text: '实时销量对比图' },   // "Real-time sales comparison chart"
            tooltip: {},
            legend: { data: ['销量'] },          // legend: "sales volume"
            xAxis: { data: [] },
            yAxis: {},
            series: [{ name: '销量', type: 'bar', data: [] }]
        });

        var xdata = ["","","","","","","","","",""],
            ydata = [0,0,0,0,0,0,0,0,0,0];

        var update_mychart = function (data) {
            myChart.hideLoading();
            xdata.push(data.type);
            ydata.push(parseFloat(data.amount));
            if (xdata.length >= 10) {
                xdata.shift();
                ydata.shift();
            }
            myChart.setOption({
                xAxis: { data: xdata },
                series: [{ name: '销量', data: ydata }]
            });
        };

        myChart.showLoading();

        $(document).ready(function() {
            namespace = '/kzstream';
            var socket = io.connect(location.protocol + '//' + document.domain +
                                    ':' + location.port + namespace);
            socket.on('server_response', function(res) {
                console.log(res);
                update_mychart(JSON.parse(res.data));
            });
        });
    </script>
</body>
</html>
Running the Project
With the Kafka service running, execute the following commands, each in its own terminal:
python kafka_producer.py
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.8 pyspark-kafka.py
set FLASK_APP=app
flask run
Then open http://127.0.0.1:5000/ in a browser.