Python消费Kafka数据到ElasticSearch代替Logstash

来自CloudWiki
跳转至: 导航搜索

Python消费Kafka数据到ElasticSearch代替Logstash

ElasticSearch介绍
Elasticsearch是-个开源的分布式、RESTful 风格的搜索和数据分析引擎,它的底层是开源库Apache Lucene。
Lucene可以说是当下最先进、高性能、全功能的搜索|擎库一无论是开源还是私有,但它也仅仅只是一个库。为了充分发挥其功能,你需要使用Java并将Lucene直接集成到应用程序中。更糟糕的是 ,您可能需要获得信息检索学位才能了解其工作原理,因为Lucene非常复杂。
为了解决Lucene使用时的繁复性,于是Elasticsearch便应运而生。它使用Java编写,内部采用Lucene做索引与搜索,但是它的目标是使全文检索变得更简单,简单来说,就是对Lucene做了一层封装,它提供了一套简单-致的RESTful API来帮助我们实现存储和检索。



ELK日志系统改进之ELFK
因为我们的主要业务的开发语言是PHP,PHP产生的日志并不多,但是PHP毕竟是解释性的语言,运行效率并不高,但是我们公司业务并发却非常高。并发至少有10万以上。
有些业务是Java,比如位置上报的业务,微服务也是公司自己开发的,可能是框架也不完善,不像Spring Boot哪样成熟,打出的日志特别多,-个Java的微服务每天就要产生就几个T的数据。有些微服务的日志还是info级别的。
随着时间的积累,公司的业务系统每天产生至少5个T的日志,日志已经有肌百T以及有PB级别的日志量了。
同时大数据部门]也是查ElasticSearch集群的接口,导致ElasticSearch的压力特别大。这样导致有时候查询历史日志会很慢,如果在业务的高峰期,如果ElasticSearch的压励过大的话, Logstash送给ElasticSearch的数据会无响应,所以会有丢数据的风险目前采用的Filbeat + Logstash+ ElasticSearch+ Kibana的架构已经无法满足需求了。
于是我们想到使用MQ进行缓冲,消息队列进行缓冲那应该选哪个产品了,消息中间件考虑的几个软件有Redis, Rabitmq, ActiveMq, Kafka等 ,对于这几个的考虑我们毫不犹豫地选择了Kafka,因为Kafak的吞吐量此其他都高,Kafka性能远超过 ActiveMQ、RabbitMQ等.


实验环境介绍

操作系统:CentOS7.6
ES版本:7.3.0
filebat版本:7.3.0
Kafka版本:2.0.0
日志路径:/home/data/logs

ElasticSearch安装

因为Kafka和ElasticSearch都是依赖Java的,所以我们要想把Java环境安装好: 测试命令 java -version

2020-08-16 170858.png


ElasticSearch下载安装包:

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.3.0-linux-x86_64.tar.gz

文件打开数据限制解决
vi /etc/security/limits.conf

*                soft    nofile          640000
*                hard    nofile          640000

elasticsearch用户有内存限制调整
vi /etc/sysctl.conf

vm.max_map_count=262144

使用sudo sysctl -p 生效

创建启动用户,目的是为了安全着想启动Elastic最好用普通用户启动

useradd www
mkdir /opt/tools
tar zxvf elasticsearch-7.3.0-linux-x86_64.tar.gz
mkdir /opt/application/elk
mv elasticsearch-7.3.0 /opt/application/elk
创建数据和日志目录
mkdir -p /data/to/logs
mkdir -p /data/to/data
授权启动用户
chown -R www:www /data/to
chown -R www:www /opt/

vim /opt/application/elk/elasticsearch-7.3.0/config/elasticsearch.yml

network.host: 0.0.0.0 
http.port: 9200
cluster.initial_ master_ nodes: ["vm201"]
node.name: vm201
path.data: /data/to/data
path.logs: /datal/o/logs
http.cors .enabled: true
http.cors allow-origin:“*”

切换到普通用户www进行启动

cd /opt/application/elk/elasticsearch-7.3.0/bin

./elasticsearch #前台启动
./elasticsearch -d #后台启动


Kafka安装配置:

Kafka的配置文件直接用默认的,不需要其他处理,我们这里直接默认配置文件让它跑起来。

mkdir /opt/tools
cd /opt/tools
wget https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz  //运行wget下载速度太慢,可以使用迅雷下载完成后,xftp上传到linux
mkdir /opt/application/kafka
mv kafka_2.11-2.0.0/opt/application/kafka/

启动Kafka:

cd /opt/application/kafka/kafka_2.11-2.0.0/
bin/zookeeper-server-start.sh config/zookeeper.properties   //启动内置Zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties //使用守护进程启动

 bin/kafka-server-start.sh config/server.properties  //前台启动Kafka
 bin/kafka-server-start.sh -daemon config/server.properties daemon方式启动Kafka

Kafka常用命令介绍:

./kafka-topics.sh --list --zookeeper localhost:2181  //topic描述
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic xxxxxx //创建topic
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic javadaemon --from-beginning //控制台消费topic的数据输出
./kafka-console-producer.sh --broker-list localhost:9092 --topic xxxxxxxx  //生产数据
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list master:9092 --topic javadaemon --time -1  //查看topic对应的消息数量


我们创建一个topic命名为javadaemon 用于后续测试

 ./kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic javadaemon

查看是否创建成功:

./kafka-topics.sh --list --zookeeper master:2181


Filebeat的安装和配置

因为之前ElasticSearch采用的是7.3.0的版本,那么其他Elastic 家族的其他组件要一致。所以Filbeat也是7.3.0。

mkdir /opt/tools/
cd /opt/tools
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.3.0-linux-x86_64.tar.gz  //运行wget下载速度太慢,可以使用迅雷下载完成后,xftp上传到linux
tar xzf filebeat-7.3.0-linux-x86_64.tar.gz
mv filebeat-7.3.0-linux-x86_64 /opt/application/elk/
创建日志目录
mkdir -p /home/data/filebeat/logs
创建filbeat配置文件目录
mkdir /home/data/filebeat/conf.d/
创建配置文件
touch /home/data/filebeat/conf.d/daemon.yml

Filebeat配置:

采集日志手机到Kafka, 我们首先配置filebeat.yml

打开filebeat.yml配置文件 添加如下配置

#输出到Kafka
output.kafka:
  hosts: ["master:9092"]
  topic: 'javadaemon'
  compression: gzip
  partition.round_robin:
   reachable_only: false
  required_acks: 1
  max_message_bytes: 1000000
  worker: 3
  bulk_max_size: 10240
  version: 2.0.0
#日志配置
logging.level: info
logging.to_files: true
logging.files:
 path: /home/data/filebeat/logs/
 name: filebeat
 keepfiles: 7
 permissions: 0644
#采集配置导入
filebeat.config.inputs:
   enabled: true
   path: /home/data/filebeat/conf.d/*.yml  #这个路径就是采集日志配置文件


vi /home/data/filebeat/conf.d/daemon.yml

- type: log
  paths:
  - /home/data/logs/*/*.log
  exclude_files: ['\.gz$','\.access\.log.*','.*debug.*','.*multiline.*']
  fields:
   type: daemon
   topic: javadaemon
  fields_under_root: true
  tail_files: false
  ignore_older: 5m
  scan_frequency: 1s
  processors:
   - drop_fields:
       fields: ['input','offset','prospector']


启动Filebeat

./filebeat test config -c filebeat.yml  #测试配置文件是否正常
./filebeat -c filebeat.yml &  #启动并后台运行

ELFK组件功能分析
上面的配置文件就讲到这里,因为我们要编写程序代替L ogstash,所以我们先要把各个组件的内容弄清楚,那样我们才知道如何编写功能代码。对于上面的组件配置功能,总结和代码相关的功能如下:

Filbeat主要相关的功能:
(1) output.kafka于输出到Kafka中的相关功能。
(2) filebeat.yml主配置文件中的%{[topic]}于输出到指定的topic,其中以daemon.yml为例就是输出到kakfa的log-alert的topic中。
(3) filebeat采集的是/home/data/logs 下的日志文件, 組还打了type标签为daemon


接下来开始Python代码编写

首先安装Python依赖

pip3 install kafka-python

代码如下:

#coding:utf-8
from kafka import KafkaConsumer
from elasticsearch import Elasticsearch
import json
import time
from multiprocessing import Process
# 计算当前的时间
l_date = time.strftime("%Y.%m.%d", time.localtime())
local_date=str(l_date)
boot_server=['172.26.142.43:9092']
##读es
es = Elasticsearch('http://172.26.142.43:9200')
## 定义kafka到elastic的函数
def kfk2es(consumer):
   for msg in consumer:
       print("1")
       j_data = msg.value.decode('utf-8')  # 流编码成utf-8字符串
       dict_data = json.loads(j_data)
       #新建一个字典my_dict_data
       my_dict_data = dict_data
       message =dict_data['message']
       keywords =['CRITICAL','ERROR','error','Error','WARNING','Warning','warning']
       for k in keywords:
           if k in message:
               my_dict_data['msg_type'] = k
               break
       index_1 = dict_data["log"]["file"]["path"]
       #print(index_1)
       index_2 = index_1.split("/")[4]
       #print(index_2)
       index = index_2 + "-" + dict_data["topic"] + "-" + local_date
       print(index)
       ## 先判断有没有索引,没有就创建日期类的索引
       aa = es.indices.exists(index=index)
       print(dict_data)
       print("索引是:" + index)
       if aa:
           print("索引存在...不需要再创建了")
       else:
           print("创建索引")
           res = es.indices.create(index=index)
       # 读取kafka的数据写入elastic
       es.index(index=index, doc_type="_doc", body=my_dict_data)
kfk2es(consumer)


接下来启动脚本

python3 kfk2es.py

测试往日志目录里写数据:

echo "ABC" >> /home/data/logs/xxxxx/1.log

然后查看偏移量,发现偏移量增加了1,说明写入了topic:
./kafka-run-class.sh kafka.tools.GetOffsetShell -broker-list master:9092 --topic javadaemon

2020-08-16 183011.png


我们查看es里有没有创建索引命令:

curl '127.0.0.1:9200/_cat/indices?v'

通过浏览器查询安装elasticsearch-head插件方便观看:
2020-08-16 183950.png



附上几个查询方法
ES查询某个服务的日志情况
指定ID号查询mysql日志单条记录:

curl 'master:9200/mysql-javadaemon-2020.08.15/_doc/GUvN8XMBBYudbxkWxOHa?pretty=true'

列出mysql索引下的全部日志记录:

curl 'master:9200/mysql-javadaemon-2020.08.15/_doc/_search?pretty=true'

ES查询最近5分钟/24h的日志情况

curl  'master:9200/mysql-javadaemon-2020.08.16/_doc/_search?pretty' -H 'content-Type:application/json' -d '
{
 "query":{
"range":{
"post_date":{
"gte":"now-5m","lt":"now"
}
}
}
}'

ES统计不同级别日志的数量
列出Error级别的日志(GET查询):

curl -XGET 'master:9200/mysql-javadaemon-2020.08.15/_doc/_search?q=msg_type:Error'

列出Waring级别的日志(GET查询): 回显信息value为出现Waring的数量

curl  'master:9200/mysql-javadaemon-2020.08.15/_doc/_search?pretty' -H 'content-Type:application/json' -d '
{
"query":{"match":{"msg_type":"Warning"}},
"from":1,
"size":1
}'

ES统计不同服务的异常情况(发现哪个服务最容易出错)