Windows下搭建Kafka环境
来自CloudWiki
介绍
Kafka是当前一个非常流行的开源流处理平台,
也是一个具有高吞吐量的分布式订阅消息系统。
支持每秒数百万的消息处理。
- Kafka的运行需要安装JDK
- Kafka的运行依赖于Zookeeper,
Kafka的部署分为单机模式和分布式模式,单机模型可以利用Kafka自带的Zookeeper,如果是分布式模式,则需要采用外部的Zookeeper.
Kafka部署
下载Kafka
http://kafka.apache.org/downloads
解压kafka_2.12-2.5.1.tgz到 D:\kafka
配置信息:在D:\kafka\config目录中,
用文本编辑器打开server.properties 进行编辑
log.dirs=log.dirs=D:\kafka\data
启动Kafka,打开cmd,切换到D:\kafka\bin\windows 目录下
zookeeper-server-start.bat ..\..\config\zookeeper.properties
zookeeper运行之后,另开一个CMD窗口开启kafka:
D:\>cd D:\kafka\bin\windows
D:\kafka\bin\windows>kafka-server-start.bat ..\..\config\server.properties
输出:
[2021-08-01 18:15:39,184] INFO Kafka version: 2.5.1 (org.apache.kafka.common.utils.AppInfoParser) [2021-08-01 18:15:39,187] INFO Kafka commitId: 0efa8fb0f4c73d92 (org.apache.kafka.common.utils.AppInfoParser) [2021-08-01 18:15:39,191] INFO Kafka startTimeMs: 1627812939159 (org.apache.kafka.common.utils.AppInfoParser) [2021-08-01 18:15:39,203] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
安装python库
pip3 install kafka
pip3 install kafka-python
这是用python语言与kafka交互的库。
Kafka验证
生产消息
运行以下测试代码:
from kafka import KafkaProducer import time import random import json from json import dumps from datetime import datetime #连接kafka producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', \ value_serializer=lambda x: dumps(x).encode('utf-8')) #127.0.0.1:9092代表Kafka服务器地址 #value_serializer表示对业务数据value进行序列化操作 #这里利用json模块中的dumps将JSON格式的数据序列化,且编码指定为utf-8 #用循环构建一系列数据 key='streaming'.encode('utf-8') typeList =['鞋子','裤子','袜子','皮带','化妆品','背包','书籍', '零食','运动服','乐器'] #发送内容,必须是bytes类型 for i in range(0, 1000): vjson = {} for t in typeList: i = i + 1 vjson["bizId"] = str(i) vjson["type"] = t vjson["value"] = 1 #vjson["datetime"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print("Message to be sent: ", vjson) producer.send('kf2pyspark',value= vjson , key=key) #将产生的数据通过send方法发布到主题为kf2pyspark的Kafka消息队列上去 time.sleep(1) producer.flush() producer.close()
如果出现下述输出 ,说明kafka安装成功。
Message to be sent: {'bizId': '18', 'type': '化妆品', 'value': 1} Message to be sent: {'bizId': '19', 'type': '背包', 'value': 1} Message to be sent: {'bizId': '20', 'type': '书籍', 'value': 1} Message to be sent: {'bizId': '21', 'type': '零食', 'value': 1} Message to be sent: {'bizId': '22', 'type': '运动服', 'value': 1}
消费消息
import psutil from kafka import KafkaConsumer #订阅kafka上的消息,并打印 def bg_task(): consumer = KafkaConsumer('kf2pyspark', bootstrap_servers=['127.0.0.1:9092']) for msg in consumer: #print(msg) print(msg.value) bg_task()
输出:
b'{"bizId": "213", "type": "\\u4e50\\u5668", "value": -23}' b'{"bizId": "205", "type": "\\u978b\\u5b50", "value": -5}' b'{"bizId": "206", "type": "\\u88e4\\u5b50", "value": -36}' b'{"bizId": "207", "type": "\\u889c\\u5b50", "value": -29}' b'{"bizId": "208", "type": "\\u76ae\\u5e26", "value": 24}' b'{"bizId": "209", "type": "\\u5316\\u5986\\u54c1", "value":