Python消费Kafka数据到ElasticSearch代替Logstash

来自CloudWiki
Heart讨论 | 贡献2020年8月16日 (日) 09:34的版本
跳转至: 导航搜索

Python消费Kafka数据到ElasticSearch代替Logstash

ElasticSearch介绍
Elasticsearch是-个开源的分布式、RESTful 风格的搜索和数据分析引擎,它的底层是开源库Apache Lucene。 Lucene可以说是当下最先进、高性能、全功能的搜索|擎库一无论是开源还是私有,但它也仅仅只是一个库。为了充分发挥其功能,你需要使用Java并将Lucene直接集成到应用程序中。更糟糕的是 ,您可能需要获得信息检索学位才能了解其工作原理,因为Lucene非常复杂。 为了解决Lucene使用时的繁复性,于是Elasticsearch便应运而生。它使用Java编写,内部采用Lucene做索引与搜索,但是它的目标是使全文检索变得更简单,简单来说,就是对Lucene做了一层封装,它提供了一套简单-致的RESTful API来帮助我们实现存储和检索。 当然,Elasticsearch 不仅仅是Lucene,且也不仅仅只是一个全文搜索引擎。它可


ELK日志系统改进之ELFK
因为我们的主要业务的开发语言是PHP,PHP产生的日志并不多,但是PHP毕竟是解释性的语言,运行效率并不高,但是我们公司业务并发却非常高。并发至少有10万以上。 有些业务是Java,比如位置上报的业务,微服务也是公司自己开发的,可能是框架也不完善,不像Spring Boot哪样成熟,打出的日志特别多,-个Java的微服务每天就要产生就几个T的数据。有些微服务的日志还是info级别的。 随着时间的积累,公司的业务系统每天产生至少5个T的日志,日志已经有肌百T以及有PB级别的日志量了。 同时大数据部门]也是查ElasticSearch集群的接口,导致ElasticSearch的压力特别大。这样导致有时候查询历史日志会很慢,如果在业务的高峰期,如果ElasticSearch的压励过大的话, Logstash送给ElasticSearch的数据会无响应,所以会有丢数据的风险目前采用的Filbeat + Logstash+ ElasticSearch+ Kibana的架构已经无法满足需求了。 于是我们想到使用MQ进行缓冲,消息队列进行缓冲那应该选哪个产品了,消息中间件考虑的几个软件有Redis, Rabitmq, ActiveMq, Kafka等 ,对于这几个的考虑我们毫不犹豫地选择了Kafka,因为Kafak的吞吐量此其他都高,Kafka性能远超过 ActiveMQ、RabbitMQ等


实验环境介绍

操作系统:CentOS7.6
ES版本:7.3.0
filebat版本:7.3.0
Kafka版本:2.0.0
日志路径:/home/data/logs

Kafka安装配置:

因为Kafka是依赖Java的,所以我们要想把Java环境安装好: 测试命令 java -version

2020-08-16 170858.png

Kafka的配置文件直接用默认的,不需要其他处理,我们这里直接默认配置文件让它跑起来。

mkdir /opt/tools
cd /opt/tools
wget https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz  //运行wget下载速度太慢,可以使用迅雷下载完成后,xftp上传到linux
mkdir /opt/application/kafka
mv kafka_2.11-2.0.0/opt/application/kafka/

启动:

cd /opt/application/kafka/kafka_2.11-2.0.0/
bin/zookeeper-server-start.sh config/zookeeper.properties   //启动内置Zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties //使用守护进程启动

 bin/kafka-server-start.sh config/server.properties  //前台启动Kafka
 bin/kafka-server-start.sh -daemon config/server.properties daemon方式启动Kafka

Kafka常用命令介绍:

./kafka-topics.sh --list --zookeeper localhost:2181  //topic描述
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic xxxxxx //创建topic
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic javadaemon --from-beginning //控制台消费topic的数据输出
./kafka-console-producer.sh --broker-list localhost:9092 --topic xxxxxxxx  //生产数据
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list master:9092 --topic javadaemon --time -1  //查看topic对应的消息数量