Windows下搭建Kafka环境

来自CloudWiki
跳转至: 导航搜索

介绍

Kafka是当前一个非常流行的开源流处理平台,

也是一个具有高吞吐量的分布式订阅消息系统。

支持每秒数百万的消息处理。

  • Kafka的运行需要安装JDK
  • Kafka的运行依赖于Zookeeper,

Kafka的部署分为单机模式和分布式模式,单机模型可以利用Kafka自带的Zookeeper,如果是分布式模式,则需要采用外部的Zookeeper.

Kafka部署

下载Kafka

http://kafka.apache.org/downloads

解压kafka_2.12-2.5.1.tgz到 D:\kafka

配置信息:在D:\kafka\config目录中,

用文本编辑器打开server.properties 进行编辑

log.dirs=log.dirs=D:\kafka\data

启动Kafka,打开cmd,切换到D:\kafka\bin\windows 目录下

zookeeper-server-start.bat ..\..\config\zookeeper.properties

zookeeper运行之后,另开一个CMD窗口开启kafka:

D:\>cd D:\kafka\bin\windows

D:\kafka\bin\windows>kafka-server-start.bat ..\..\config\server.properties

输出:

[2021-08-01 18:15:39,184] INFO Kafka version: 2.5.1 (org.apache.kafka.common.utils.AppInfoParser)
[2021-08-01 18:15:39,187] INFO Kafka commitId: 0efa8fb0f4c73d92 (org.apache.kafka.common.utils.AppInfoParser)
[2021-08-01 18:15:39,191] INFO Kafka startTimeMs: 1627812939159 (org.apache.kafka.common.utils.AppInfoParser)
[2021-08-01 18:15:39,203] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

安装python库

pip3 install kafka

pip3 install kafka-python

这是用python语言与kafka交互的库。

Kafka验证

生产消息

运行以下测试代码:

from kafka import KafkaProducer
import time
import random
import json
from json import dumps
from datetime import datetime
#连接kafka
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', \
    value_serializer=lambda x: dumps(x).encode('utf-8'))
#127.0.0.1:9092代表Kafka服务器地址
#value_serializer表示对业务数据value进行序列化操作
#这里利用json模块中的dumps将JSON格式的数据序列化,且编码指定为utf-8

#用循环构建一系列数据
key='streaming'.encode('utf-8')
typeList =['鞋子','裤子','袜子','皮带','化妆品','背包','书籍',
    '零食','运动服','乐器']
#发送内容,必须是bytes类型
for i in range(0, 1000):
    vjson = {}
    for t in typeList:
        i = i + 1
        vjson["bizId"] = str(i)
        vjson["type"] = t
        vjson["value"] = 1
        #vjson["datetime"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print("Message to be sent: ", vjson)
        producer.send('kf2pyspark',value= vjson , key=key)
        #将产生的数据通过send方法发布到主题为kf2pyspark的Kafka消息队列上去
        time.sleep(1)
producer.flush() 
producer.close()


如果出现下述输出 ,说明kafka安装成功。

 
Message to be sent:  {'bizId': '18', 'type': '化妆品', 'value': 1}
Message to be sent:  {'bizId': '19', 'type': '背包', 'value': 1}
Message to be sent:  {'bizId': '20', 'type': '书籍', 'value': 1}
Message to be sent:  {'bizId': '21', 'type': '零食', 'value': 1}
Message to be sent:  {'bizId': '22', 'type': '运动服', 'value': 1}

消费消息

import psutil

from kafka import KafkaConsumer

#订阅kafka上的消息,并打印
def bg_task():
    consumer = KafkaConsumer('kf2pyspark', bootstrap_servers=['127.0.0.1:9092'])
    for msg in consumer:
        #print(msg)
        print(msg.value)

bg_task()


输出:

b'{"bizId": "213", "type": "\\u4e50\\u5668", "value": -23}'
b'{"bizId": "205", "type": "\\u978b\\u5b50", "value": -5}'
b'{"bizId": "206", "type": "\\u88e4\\u5b50", "value": -36}'
b'{"bizId": "207", "type": "\\u889c\\u5b50", "value": -29}'
b'{"bizId": "208", "type": "\\u76ae\\u5e26", "value": 24}'
b'{"bizId": "209", "type": "\\u5316\\u5986\\u54c1", "value":