Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统

x33g5p2x  于2021-09-19 转载在 Kafka  
字(11.1k)|赞(0)|评价(0)|浏览(860)

一、前言

随着时间的积累,日志数据会越来越多,当你需要查看并分析庞杂的日志数据时,可通过 Filebeat+Kafka+Logstash+Elasticsearch 采集日志数据到 Elasticsearch(简称ES)中,并通过 Kibana 进行可视化展示与分析。

本文介绍具体的实现方法。

二、背景信息

Kafka 是一种分布式、高吞吐、可扩展的消息队列服务,广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。在实际应用场景中,为了满足大数据实时检索的需求,一般可以使用 Filebeat 采集日志数据,将 Kafka 作为 Filebeat 的输出端。Kafka 实时接收到 Filebeat 采集的数据后,以 Logstash 作为输出端输出。输出到 Logstash 中的数据在格式或内容上可能不能满足你的需求,此时可以通过 Logstash 的 filter 插件过滤数据。最后将满足需求的数据输出到 ES 中进行分布式检索,并通过 Kibana 进行数据分析与展示。

简单处理流程如下:

三、操作流程

  1. 准备工作
  • 完成环境准备
  • 包括创建对应服务
  • 安装 Filebeat 。
  • 配置 Filebeat:配置 Filebeat 的 input 为系统日志,outpu 为 Kafka,将日志数据采集到 Kafka 的指定 Topic 中。
  • 配置 Logstash 管道:配置 Logstash 管道的 input 为 Kafka,output 为ES,使用 Logstash 消费 Topic 中的数据并传输到ES 中。
  • 查看日志消费状态:在消息队列 Kafka 中查看日志数据的消费的状态,验证日志数据是否采集成功。
  • 通过 Kibana 过滤日志数据:在 Kibana 控制台的 Discover 页面,通过 Filter 过滤出 Kafka 相关的日志。

四、准备工作

CenterOS 7.6 版本,推荐 8G 以上内存。

1、Docker 环境

执行命令如下:

  1. # 在 docker 节点执行
  2. # 腾讯云 docker hub 镜像
  3. # export REGISTRY_MIRROR="https://mirror.ccs.tencentyun.com"
  4. # DaoCloud 镜像
  5. # export REGISTRY_MIRROR="http://f1361db2.m.daocloud.io"
  6. # 阿里云 docker hub 镜像
  7. export REGISTRY_MIRROR=https://registry.cn-hangzhou.aliyuncs.com
  8. # 安装 docker
  9. # 参考文档如下
  10. # https://docs.docker.com/install/linux/docker-ce/centos/
  11. # https://docs.docker.com/install/linux/linux-postinstall/
  12. # 卸载旧版本
  13. yum remove -y docker \
  14. docker-client \
  15. docker-client-latest \
  16. docker-ce-cli \
  17. docker-common \
  18. docker-latest \
  19. docker-latest-logrotate \
  20. docker-logrotate \
  21. docker-selinux \
  22. docker-engine-selinux \
  23. docker-engine
  24. # 设置 yum repository
  25. yum install -y yum-utils \
  26. device-mapper-persistent-data \
  27. lvm2
  28. yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
  29. # 安装并启动 docker
  30. yum install -y docker-ce-19.03.11 docker-ce-cli-19.03.11 containerd.io-1.2.13
  31. mkdir /etc/docker || true
  32. cat > /etc/docker/daemon.json <<EOF { "registry-mirrors": ["${REGISTRY_MIRROR}"], "exec-opts": ["native.cgroupdriver=systemd"], "log-driver": "json-file", "log-opts": { "max-size": "100m" }, "storage-driver": "overlay2", "storage-opts": [ "overlay2.override_kernel_check=true" ] } EOF
  33. mkdir -p /etc/systemd/system/docker.service.d
  34. # Restart Docker
  35. systemctl daemon-reload
  36. systemctl enable docker
  37. systemctl restart docker
  38. # 关闭 防火墙
  39. systemctl stop firewalld
  40. systemctl disable firewalld
  41. # 关闭 SeLinux
  42. setenforce 0
  43. sed -i "s/SELINUX=enforcing/SELINUX=disabled/g" /etc/selinux/config
  44. # 关闭 swap
  45. swapoff -a
  46. yes | cp /etc/fstab /etc/fstab_bak
  47. cat /etc/fstab_bak |grep -v swap > /etc/fstab

验证下 docker info:

  1. [root@vm-1]# docker info
  2. Client:
  3. Debug Mode: false
  4. Server:
  5. Containers: 16
  6. Running: 11
  7. Paused: 0
  8. Stopped: 5
  9. Images: 22
  10. Server Version: 19.03.11
  11. Storage Driver: overlay2
  12. Backing Filesystem: xfs
  13. Supports d_type: true
  14. Native Overlay Diff: true
  15. Logging Driver: json-file
  16. Cgroup Driver: systemd
  17. Plugins:
  18. Volume: local
  19. Network: bridge host ipvlan macvlan null overlay
  20. Log: awslogs fluentd gcplogs gelf journald json-file local logentries splunk syslog
  21. Swarm: inactive
  22. Runtimes: runc
  23. Default Runtime: runc
  24. Init Binary: docker-init
  25. containerd version: 7ad184331fa3e55e52b890ea95e65ba581ae3429
  26. runc version: dc9208a3303feef5b3839f4323d9beb36df0a9dd
  27. init version: fec3683
  28. Security Options:
  29. seccomp
  30. Profile: default
  31. Kernel Version: 3.10.0-1127.el7.x86_64
  32. Operating System: CentOS Linux 7 (Core)
  33. OSType: linux
  34. Architecture: x86_64
  35. CPUs: 4
  36. Total Memory: 11.58GiB
  37. Name: vm-autotest-server
  38. ID: KQ5B:KAG5:LLB5:CUD4:NQZX:4GHL:5XLY:FM7X:KRJ5:X3WK:42GV:QLON
  39. Docker Root Dir: /var/lib/docker
  40. Debug Mode: false
  41. Registry: https://index.docker.io/v1/
  42. Labels:
  43. Experimental: false
  44. Insecure Registries:
  45. 172.16.62.179:5000
  46. 127.0.0.0/8
  47. Registry Mirrors:
  48. https://registry.cn-hangzhou.aliyuncs.com/
  49. Live Restore Enabled: false

2、Docker Compose 环境

Docker Compose是一个用于定义和运行多个 docker 容器应用的工具。使用 Compose 你可以用 YAML 文件来配置你的应用服务,然后使用一个命令,你就可以部署你配置的所有服务了。

  1. # 下载 Docker Compose
  2. sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
  3. # 修改该文件的权限为可执行
  4. chmod +x /usr/local/bin/docker-compose
  5. # 验证信息
  6. docker-compose --version

3、版本准备

组件版本部署方式
elasticsearch7.6.2Docker Compose
logstash7.6.2Docker Compose
kibana7.6.2Docker Compose
zookeeperlatestDocker Compose
kafkalatestDocker Compose
filebeat7.4.2二进制

4、环境初始化

执行命令如下:

  1. # 需要设置系统内核参数,否则 ES 会因为内存不足无法启动
  2. # 改变设置
  3. sysctl -w vm.max_map_count=262144
  4. # 使之立即生效
  5. sysctl -p
  6. # 创建 logstash 目录,并将 Logstash 的配置文件 logstash.conf 拷贝到该目录
  7. mkdir -p /mydata/logstash
  8. # 需要创建 elasticsearch/data 目录并设置权限,否则 ES 会因为无权限访问而启动失败
  9. mkdir -p /mydata/elasticsearch/data/
  10. chmod 777 /mydata/elasticsearch/data/

5、服务安装

docker-compose.yml 文件内容为:

  1. version: '3'
  2. services:
  3. elasticsearch:
  4. image: elasticsearch:7.6.2
  5. container_name: elasticsearch
  6. user: root
  7. environment:
  8. - "cluster.name=elasticsearch" #设置集群名称为elasticsearch
  9. - "discovery.type=single-node" #以单一节点模式启动
  10. volumes:
  11. - /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins #插件文件挂载
  12. - /mydata/elasticsearch/data:/usr/share/elasticsearch/data #数据文件挂载
  13. - /etc/localtime:/etc/localtime:ro
  14. - /usr/share/zoneinfo:/usr/share/zoneinfo
  15. ports:
  16. - 9200:9200
  17. - 9300:9300
  18. networks:
  19. - elastic
  20. logstash:
  21. image: logstash:7.6.2
  22. container_name: logstash
  23. environment:
  24. - TZ=Asia/Shanghai
  25. volumes:
  26. - /mydata/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf #挂载logstash的配置文件
  27. depends_on:
  28. - elasticsearch #kibana在elasticsearch启动之后再启动
  29. links:
  30. - elasticsearch:es #可以用es这个域名访问elasticsearch服务
  31. ports:
  32. - 5044:5044
  33. networks:
  34. - elastic
  35. kibana:
  36. image: kibana:7.6.2
  37. container_name: kibana
  38. links:
  39. - elasticsearch:es #可以用es这个域名访问elasticsearch服务
  40. depends_on:
  41. - elasticsearch #kibana在elasticsearch启动之后再启动
  42. environment:
  43. - "elasticsearch.hosts=http://es:9200" #设置访问elasticsearch的地址
  44. - /etc/localtime:/etc/localtime:ro
  45. - /usr/share/zoneinfo:/usr/share/zoneinfo
  46. ports:
  47. - 5601:5601
  48. networks:
  49. - elastic
  50. zookeeper:
  51. image: wurstmeister/zookeeper
  52. container_name: zookeeper
  53. volumes:
  54. - /mydata/zookeeper/data:/data
  55. - /mydata/zookeeper/log:/datalog
  56. - /etc/localtime:/etc/localtime:ro
  57. - /usr/share/zoneinfo:/usr/share/zoneinfo
  58. networks:
  59. - elastic
  60. ports:
  61. - "2181:2181"
  62. kafka:
  63. container_name: kafka
  64. image: wurstmeister/kafka
  65. depends_on:
  66. - zookeeper
  67. volumes:
  68. - /var/run/docker.sock:/var/run/docker.sock
  69. - /mydata/kafka:/kafka
  70. - /etc/localtime:/etc/localtime:ro
  71. - /etc/localtime:/etc/localtime:ro
  72. links:
  73. - zookeeper
  74. ports:
  75. - "9092:9092"
  76. networks:
  77. - elastic
  78. environment:
  79. - KAFKA_LISTENERS=INTERNAL://kafka:9092, OUT://kafka:29092
  80. - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:9092, OUT://kafka:29092
  81. - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,OUT:PLAINTEXT
  82. - KAFKA_INTER_BROKER_LISTENER_NAME=OUT
  83. - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
  84. - KAFKA_MESSAGE_MAX_BYTES=2000000
  85. - KAFKA_CREATE_TOPICS=logs:1:1
  86. networks:
  87. elastic:

将该文件上传的 linux 服务器上,执行 docker-compose up 命令即可启动所有服务。

  1. [root@vm-1]# docker-compose -f docker-compose.yml up -d
  2. [root@vm-1]# docker-compose ps
  3. Name Command State Ports
  4. -----------------------------------------------------------------------------------------------------------
  5. elasticsearch /usr/local/bin/docker-entr ... Up 0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp
  6. kafka start-kafka.sh Up 0.0.0.0:9092->9092/tcp
  7. kibana /usr/local/bin/dumb-init - ... Up 0.0.0.0:5601->5601/tcp
  8. logstash /usr/local/bin/docker-entr ... Up 0.0.0.0:5044->5044/tcp, 9600/tcp
  9. zookeeper /bin/sh -c /usr/sbin/sshd ... Up 0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
  10. [root@vm-autotest-server elk]#

filebeat 客户端安装方式:

  1. curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.4.2-linux-x86_64.tar.gz
  2. tar xzvf filebeat-7.4.2-linux-x86_64.tar.gz
  3. cd filebeat-7.4.2-linux-x86_64

6、服务设置

当所有依赖服务启动完成后,需要对以下服务进行一些设置。

  1. # elasticsearch 需要安装中文分词器 IKAnalyzer,并重新启动。
  2. docker exec -it elasticsearch /bin/bash
  3. #此命令需要在容器中运行
  4. elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.6.2/elasticsearch-analysis-ik-7.6.2.zip
  5. docker restart elasticsearch
  6. # logstas h需要安装 json_lines 插件,并重新启动。
  7. docker exec -it logstash /bin/bash
  8. logstash-plugin install logstash-codec-json_lines
  9. docker restart logstash

五、配置 Filebeat

修改 filebeat.yml 文件内容

  1. ilebeat.inputs:
  2. - type: log
  3. enabled: true
  4. paths:
  5. - /var/log/nginx/*.log
  6. filebeat.config.modules:
  7. path: ${path.config}/modules.d/*.yml
  8. reload.enabled: false
  9. setup.template.settings:
  10. index.number_of_shards: 1
  11. setup.dashboards.enabled: false
  12. setup.kibana:
  13. host: "http://kafka:5601"
  14. output.kafka:
  15. hosts: ["kafka:9092"]
  16. topic: 'logs'
  17. codec.json:
  18. pretty: false

参数说明:

参数说明
type输入类型。设置为log,表示输入源为日志。
enabled设置配置是否生效。true表示生效,false表示不生效。
paths需要监控的日志文件的路径。多个日志可在当前路径下另起一行写入日志文件路径。
hosts消息队列Kafka实例的接入点。
topic日志输出到消息队列Kafka的Topic,请指定为已创建的Topic。

注意:
客户端 hosts 添加 kafka 对应 server 的 ip 地址 以及 filebeat 配置建议使用 ansible。

  1. [root@vm-1# cat /etc/hosts
  2. 172.16.62.179 kafka
  3. # 客户端启动服务
  4. [root@vm-1#./filebeat &

更多配置请参见:

六、配置 Logstash 管道

修改 logstash.conf 内容:

  1. input {
  2. # # 来源beats
  3. # beats {
  4. # 端口
  5. # port => "5044"
  6. # }
  7. kafka {
  8. bootstrap_servers => "kafka:29092"
  9. topics => ["logs"]
  10. group_id => "logstash"
  11. codec => json
  12. }
  13. }
  14. # 分析、过滤插件,可以多个
  15. # filter {
  16. # grok {
  17. # match => { "message" => "%{COMBINEDAPACHELOG}"}
  18. # }
  19. # geoip {
  20. # source => "clientip"
  21. # }
  22. # }
  23. output {
  24. # 选择elasticsearch
  25. elasticsearch {
  26. hosts => ["http://es:9200"]
  27. #index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  28. index => "logs-%{+YYYY.MM.dd}"
  29. }
  30. }

input 参数说明:

参数说明
bootstrap_servers消息队列 Kafka 实例的接入点
group_id指定已创建的 Consumer Group 的名称。
topics指定为已创建的 Topic 的名称,需要与 Filebeat 中配置的 Topic 名称保持一致。
codec设置为 json,表示解析 JSON 格式的字段,便于在 Kibana 中分析。

output 参数说明:

参数说明
hostsES的访问地址,取值为http://<es内网地址>:9200
user访问 ES 的用户名,默认为 elastic。
password访问 ES 的密码。
index索引名称。设置为 /*/logs‐%{+YYYY.MM.dd} //*表示索引名称以 logs 为前缀,以日期为后缀,例如 logs-2021.09.28

注意:
logstash 中最为关键的地方在于 filter,为了调试 filter 的配置。

更多配置请参见:

七、查看 kafka 日志消费状态

操作命令如下:

  1. # 进入容器
  2. docker exec -it kafka bash
  3. # kafka 默认安装在 /opt/kafka
  4. cd opt/kafka
  5. # 要想查询消费数据,必须要指定组
  6. bash-5.1# bin/kafka-consumer-groups.sh --bootstrap-server 172.16.62.179:9092 --list
  7. logstash
  8. # 查看 topic
  9. bash-5.1# bin/kafka-topics.sh --list --zookeeper 172.16.62.179:2181
  10. __consumer_offsets
  11. logs
  12. # 查看消费情况
  13. bash-5.1# bin/kafka-consumer-groupsdescribe --bootstrap-server 172.16.62.179:9092 --group logstash
  14. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  15. logstash logs 0 107335 107335 0 logstash-0-c6d82a1c-0f14-4372-b49f-8cd476f54d90 /172.19.0.2 logstash-0
  16. #参数解释:
  17. #--describe 显示详细信息
  18. #--bootstrap-server 指定kafka连接地址
  19. #--group 指定组。

字段解释:

TOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-ID
topic名字分区id当前已消费的条数总条数未消费的条数消费id主机ip客户端id

从上面的信息可以看出,topic 为 logs 总共消费了 107335 条信息, 未消费的条数为 0。也就是说,消费数据没有积压的情况.

八、查看 ES 内容

通过 elasticsearch-head 插件查看 ES 中是否收到了由 logstash 发送过来的日志

九、通过 Kibana 过滤日志数据

1、创建 index-pattern

打开 es,进入首页后,点击“connect to your Elasticsearch index”

填入 es 中的索引名,支持正则匹配,输入 Index pattern(本文使用 logs-/*),单击 Next step。

选择“@timestamp”作为时间过滤字段,然后点击“create index pattern”:

创建完成后:

2、查看日志

在左侧导航栏,单击 Discover。

从页面左侧的下拉列表中,选择已创建的索引模式(logs-/*)。
在页面右上角,选择一段时间,查看对应时间段内的 Filebeat 采集的日志数据。

十、小结

在企业实际项目中,elk 是比较成熟且广泛使用的技术方案。logstash 性能稍弱于 filebeat,一般不直接运行于采集点,推荐使用filebeat。在日志进入elk前,从经验性角度,前置 kafka,一方面作为队列和缓冲,另一方面提供了统一的入口渠道。

源码地址:

相关文章