“Python消费Kafka数据到ElasticSearch代替Logstash”的版本间的差异
第4行: | 第4行: | ||
'''ElasticSearch介绍'''<br> | '''ElasticSearch介绍'''<br> | ||
− | Elasticsearch是-个开源的分布式、RESTful 风格的搜索和数据分析引擎,它的底层是开源库Apache Lucene。 | + | Elasticsearch是-个开源的分布式、RESTful 风格的搜索和数据分析引擎,它的底层是开源库Apache Lucene。<br> |
− | Lucene可以说是当下最先进、高性能、全功能的搜索|擎库一无论是开源还是私有,但它也仅仅只是一个库。为了充分发挥其功能,你需要使用Java并将Lucene直接集成到应用程序中。更糟糕的是 ,您可能需要获得信息检索学位才能了解其工作原理,因为Lucene非常复杂。 | + | Lucene可以说是当下最先进、高性能、全功能的搜索|擎库一无论是开源还是私有,但它也仅仅只是一个库。为了充分发挥其功能,你需要使用Java并将Lucene直接集成到应用程序中。更糟糕的是 ,您可能需要获得信息检索学位才能了解其工作原理,因为Lucene非常复杂。<br> |
− | 为了解决Lucene使用时的繁复性,于是Elasticsearch便应运而生。它使用Java编写,内部采用Lucene做索引与搜索,但是它的目标是使全文检索变得更简单,简单来说,就是对Lucene做了一层封装,它提供了一套简单-致的RESTful API来帮助我们实现存储和检索。 | + | 为了解决Lucene使用时的繁复性,于是Elasticsearch便应运而生。它使用Java编写,内部采用Lucene做索引与搜索,但是它的目标是使全文检索变得更简单,简单来说,就是对Lucene做了一层封装,它提供了一套简单-致的RESTful API来帮助我们实现存储和检索。<br> |
− | + | ||
'''ELK日志系统改进之ELFK''' <br> | '''ELK日志系统改进之ELFK''' <br> | ||
− | 因为我们的主要业务的开发语言是PHP,PHP产生的日志并不多,但是PHP毕竟是解释性的语言,运行效率并不高,但是我们公司业务并发却非常高。并发至少有10万以上。 | + | 因为我们的主要业务的开发语言是PHP,PHP产生的日志并不多,但是PHP毕竟是解释性的语言,运行效率并不高,但是我们公司业务并发却非常高。并发至少有10万以上。<br> |
− | 有些业务是Java,比如位置上报的业务,微服务也是公司自己开发的,可能是框架也不完善,不像Spring Boot哪样成熟,打出的日志特别多,-个Java的微服务每天就要产生就几个T的数据。有些微服务的日志还是info级别的。 | + | 有些业务是Java,比如位置上报的业务,微服务也是公司自己开发的,可能是框架也不完善,不像Spring Boot哪样成熟,打出的日志特别多,-个Java的微服务每天就要产生就几个T的数据。有些微服务的日志还是info级别的。<br> |
− | 随着时间的积累,公司的业务系统每天产生至少5个T的日志,日志已经有肌百T以及有PB级别的日志量了。 | + | 随着时间的积累,公司的业务系统每天产生至少5个T的日志,日志已经有肌百T以及有PB级别的日志量了。<br> |
− | 同时大数据部门]也是查ElasticSearch集群的接口,导致ElasticSearch的压力特别大。这样导致有时候查询历史日志会很慢,如果在业务的高峰期,如果ElasticSearch的压励过大的话, Logstash送给ElasticSearch的数据会无响应,所以会有丢数据的风险目前采用的Filbeat + Logstash+ ElasticSearch+ Kibana的架构已经无法满足需求了。 | + | 同时大数据部门]也是查ElasticSearch集群的接口,导致ElasticSearch的压力特别大。这样导致有时候查询历史日志会很慢,如果在业务的高峰期,如果ElasticSearch的压励过大的话, Logstash送给ElasticSearch的数据会无响应,所以会有丢数据的风险目前采用的Filbeat + Logstash+ ElasticSearch+ Kibana的架构已经无法满足需求了。<br> |
− | 于是我们想到使用MQ进行缓冲,消息队列进行缓冲那应该选哪个产品了,消息中间件考虑的几个软件有Redis, Rabitmq, ActiveMq, Kafka等 ,对于这几个的考虑我们毫不犹豫地选择了Kafka,因为Kafak的吞吐量此其他都高,Kafka性能远超过 ActiveMQ、RabbitMQ等 | + | 于是我们想到使用MQ进行缓冲,消息队列进行缓冲那应该选哪个产品了,消息中间件考虑的几个软件有Redis, Rabitmq, ActiveMq, Kafka等 ,对于这几个的考虑我们毫不犹豫地选择了Kafka,因为Kafak的吞吐量此其他都高,Kafka性能远超过 ActiveMQ、RabbitMQ等.<br> |
第42行: | 第42行: | ||
mv kafka_2.11-2.0.0/opt/application/kafka/ | mv kafka_2.11-2.0.0/opt/application/kafka/ | ||
− | ''' | + | '''启动Kafka:''' |
cd /opt/application/kafka/kafka_2.11-2.0.0/ | cd /opt/application/kafka/kafka_2.11-2.0.0/ | ||
第133行: | 第133行: | ||
'''启动Filebeat''' | '''启动Filebeat''' | ||
+ | |||
+ | ./filebeat test config -c filebeat.yml #测试配置文件是否正常 | ||
+ | ./filebeat -c filebeat.yml & #启动并后台运行 | ||
+ | |||
+ | '''ELFK组件功能分析'''<br> | ||
+ | 上面的配置文件就讲到这里,因为我们要编写程序代替L ogstash,所以我们先要把各个组件的内容弄清楚,那样我们才知道如何编写功能代码。对于上面的组件配置功能,总结和代码相关的功能如下:<br> | ||
+ | |||
+ | Filbeat主要相关的功能:<br> | ||
+ | (1) output.kafka于输出到Kafka中的相关功能。<br> | ||
+ | (2) filebeat.yml主配置文件中的%{[topic]}于输出到指定的topic,其中以daemon.yml为例就是输出到kakfa的log-alert的topic中。<br> | ||
+ | (3) filebeat采集的是/home/data/logs 下的日志文件, 組还打了type标签为daemon<br> |
2020年8月16日 (日) 10:17的版本
Python消费Kafka数据到ElasticSearch代替Logstash
ElasticSearch介绍
Elasticsearch是-个开源的分布式、RESTful 风格的搜索和数据分析引擎,它的底层是开源库Apache Lucene。
Lucene可以说是当下最先进、高性能、全功能的搜索|擎库一无论是开源还是私有,但它也仅仅只是一个库。为了充分发挥其功能,你需要使用Java并将Lucene直接集成到应用程序中。更糟糕的是 ,您可能需要获得信息检索学位才能了解其工作原理,因为Lucene非常复杂。
为了解决Lucene使用时的繁复性,于是Elasticsearch便应运而生。它使用Java编写,内部采用Lucene做索引与搜索,但是它的目标是使全文检索变得更简单,简单来说,就是对Lucene做了一层封装,它提供了一套简单-致的RESTful API来帮助我们实现存储和检索。
ELK日志系统改进之ELFK
因为我们的主要业务的开发语言是PHP,PHP产生的日志并不多,但是PHP毕竟是解释性的语言,运行效率并不高,但是我们公司业务并发却非常高。并发至少有10万以上。
有些业务是Java,比如位置上报的业务,微服务也是公司自己开发的,可能是框架也不完善,不像Spring Boot哪样成熟,打出的日志特别多,-个Java的微服务每天就要产生就几个T的数据。有些微服务的日志还是info级别的。
随着时间的积累,公司的业务系统每天产生至少5个T的日志,日志已经有肌百T以及有PB级别的日志量了。
同时大数据部门]也是查ElasticSearch集群的接口,导致ElasticSearch的压力特别大。这样导致有时候查询历史日志会很慢,如果在业务的高峰期,如果ElasticSearch的压励过大的话, Logstash送给ElasticSearch的数据会无响应,所以会有丢数据的风险目前采用的Filbeat + Logstash+ ElasticSearch+ Kibana的架构已经无法满足需求了。
于是我们想到使用MQ进行缓冲,消息队列进行缓冲那应该选哪个产品了,消息中间件考虑的几个软件有Redis, Rabitmq, ActiveMq, Kafka等 ,对于这几个的考虑我们毫不犹豫地选择了Kafka,因为Kafak的吞吐量此其他都高,Kafka性能远超过 ActiveMQ、RabbitMQ等.
实验环境介绍
操作系统:CentOS7.6 ES版本:7.3.0 filebat版本:7.3.0 Kafka版本:2.0.0 日志路径:/home/data/logs
Kafka安装配置:
因为Kafka是依赖Java的,所以我们要想把Java环境安装好: 测试命令 java -version
Kafka的配置文件直接用默认的,不需要其他处理,我们这里直接默认配置文件让它跑起来。
mkdir /opt/tools cd /opt/tools wget https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz //运行wget下载速度太慢,可以使用迅雷下载完成后,xftp上传到linux mkdir /opt/application/kafka mv kafka_2.11-2.0.0/opt/application/kafka/
启动Kafka:
cd /opt/application/kafka/kafka_2.11-2.0.0/ bin/zookeeper-server-start.sh config/zookeeper.properties //启动内置Zookeeper bin/zookeeper-server-start.sh -daemon config/zookeeper.properties //使用守护进程启动 bin/kafka-server-start.sh config/server.properties //前台启动Kafka bin/kafka-server-start.sh -daemon config/server.properties daemon方式启动Kafka
Kafka常用命令介绍:
./kafka-topics.sh --list --zookeeper localhost:2181 //topic描述 ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic xxxxxx //创建topic ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic javadaemon --from-beginning //控制台消费topic的数据输出 ./kafka-console-producer.sh --broker-list localhost:9092 --topic xxxxxxxx //生产数据 ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list master:9092 --topic javadaemon --time -1 //查看topic对应的消息数量
我们创建一个topic命名为javadaemon 用于后续测试
./kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic javadaemon
查看是否创建成功:
./kafka-topics.sh --list --zookeeper master:2181
Filebeat的安装和配置
因为之前ElasticSearch采用的是7.3.0的版本,那么其他Elastic 家族的其他组件要一致。所以Filbeat也是7.3.0。
mkdir /opt/tools/ cd /opt/tools wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.3.0-linux-x86_64.tar.gz //运行wget下载速度太慢,可以使用迅雷下载完成后,xftp上传到linux tar xzf filebeat-7.3.0-linux-x86_64.tar.gz mv filebeat-7.3.0-linux-x86_64 /opt/application/elk/ 创建日志目录 mkdir -p /home/data/filebeat/logs 创建filbeat配置文件目录 mkdir /home/data/filebeat/conf.d/ 创建配置文件 touch /home/data/filebeat/conf.d/daemon.yml
Filebeat配置:
采集日志手机到Kafka, 我们首先配置filebeat.yml
打开filebeat.yml配置文件 添加如下配置
#输出到Kafka output.kafka: hosts: ["master:9092"] topic: 'javadaemon' compression: gzip partition.round_robin: reachable_only: false required_acks: 1 max_message_bytes: 1000000 worker: 3 bulk_max_size: 10240 version: 2.0.0 #日志配置 logging.level: info logging.to_files: true logging.files: path: /home/data/filebeat/logs/ name: filebeat keepfiles: 7 permissions: 0644 #采集配置导入 filebeat.config.inputs: enabled: true path: /home/data/filebeat/conf.d/*.yml #这个路径就是采集日志配置文件
vi /home/data/filebeat/conf.d/daemon.yml
- type: log paths: - /home/data/logs/*/*.log exclude_files: ['\.gz$','\.access\.log.*','.*debug.*','.*multiline.*'] fields: type: daemon topic: javadaemon fields_under_root: true tail_files: false ignore_older: 5m scan_frequency: 1s processors: - drop_fields: fields: ['input','offset','prospector']
启动Filebeat
./filebeat test config -c filebeat.yml #测试配置文件是否正常 ./filebeat -c filebeat.yml & #启动并后台运行
ELFK组件功能分析
上面的配置文件就讲到这里,因为我们要编写程序代替L ogstash,所以我们先要把各个组件的内容弄清楚,那样我们才知道如何编写功能代码。对于上面的组件配置功能,总结和代码相关的功能如下:
Filbeat主要相关的功能:
(1) output.kafka于输出到Kafka中的相关功能。
(2) filebeat.yml主配置文件中的%{[topic]}于输出到指定的topic,其中以daemon.yml为例就是输出到kakfa的log-alert的topic中。
(3) filebeat采集的是/home/data/logs 下的日志文件, 組还打了type标签为daemon