ELK —— Logstash 将 MySQL 数据同步至 ElasticSearch

x33g5p2x  于2021-12-06 转载在 Logstash  
字(4.0k)|赞(0)|评价(0)|浏览(534)

Author:Gorit
Date:2021/4/7
Refer:各种同类文章参考融合 + 自己的思考总结
2021年发表博文: 16/50

一、搭建环境

官网介绍
下载地址

1.0 环境依赖

  1. windows 10 开发环境
  2. jdk1.8 环境(最低版本要求,我看到目前最新)
  3. 配置好 JAVA_HOME,以及 classpath
  4. 相同版本的 ELK (我目前用的是 7.10.0,最新版本的已经更新到了 7.12.0。一定要下载相同版本的,不然会出现莫名其妙的 BUG
  5. mysql-connector-java.jar (8.0 或者 5.5 都可以,这个从maven 仓库里面找,因为同步数据用的是 jdbc)
  6. ELK 三个下载好的软件放在一起,目录中不要出现 空格,中文什么的,也会出现莫名其妙的 BUG

1.1 搭建 ElasticSearch 环境

1.1.1 ElasticSearch 简介

ElasticSearch 是基于 Lucence 的分布式搜索引擎,也可以作为“数据库”存储一些数据,同类产品还有一个叫做 solr 的,这里就不做描述

1.1.2 启动 ElasticSearch

  1. 不了解 ES 的可以先看这篇 文章,毕竟ES 的概念还是挺多的

  2. ES 的项目结构

  1. ES 启动很简单,一键启动即可 bin/elasticsearch.bat
  2. ES 会占用 92009300 端口
[2021-04-07T15:02:36,121][INFO ][o.e.t.TransportService   ] [DESKTOP-8HFODO1] publish_address {127.0.0.1:9300}, bound_addresses {127.0.0.1:9300}, {[::1]:9300}

[2021-04-07T15:02:39,181][INFO ][o.e.h.AbstractHttpServerTransport] [DESKTOP-8HFODO1] publish_address {127.0.0.1:9200}, bound_addresses {127.0.0.1:9200}, {[::1]:9200}
  1. 打开游览器访问 http://localhost:9200,可以看到你的 ES 基本信息,说明你就搭建完成了

1.2 Logstash(多数据源采集系统)

  1. 项目结构

  1. 这里需要配置一些东西才能启动,并且需要启动参数才能解决。启动 Logstash logstash -f …/config/logstash-sample.conf 即可
  2. 但是看不到效果,因为要和 ES 配合使用才行

1.3 Kibana(可视化面板)

  1. 是一个纯前端项目,下载好后,项目结构如下

  1. 启动方式同 ES,在 bin/kibana.bat ,双击即可启动
  2. 输入 http://localhost:5601 即可看到 Kibana 的控制面板,但是发现页面全是英文的,但是 kibana 也是支持中文的。
  3. 进入 config/kibana.yml ,的最后一行

  1. 然后重新启动即可

  1. 进入工作页

二、Logstash 配置

2.1 配置数据库连接

  1. 将下载好的 mysql-connector-java.8.22.jar 拷贝到 lib/mysql/

  1. 进入 config 目录,拷贝 logstash-sample.conf 并重命名为 logstash.conf
  2. 查看 logstash.conf 的内容
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

# logstash 收集模块,从日志,数据库中采集数据
input {
  beats {
    port => 5044
  }
}

# logstash 输出模块,将采集好的数据同步至 ES
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    #user => "elastic"
    #password => "changeme"
  }
}
  1. 因此我们需要修改 input 中的内容
input {
  beats {
    port => 5044
  }
  # 可以在 logstash 控制台输入相对应的参数,来改变 output 的行为
  stdin {}
  jdbc {
  	type => "jdbc"
  	# 数据库连接地址,我的是 MySQL 8.0 的,所以连接必须带上时区
  	jdbc_connection_string => "jdbc:mysql://连接地址:3306/数据库名称?characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai"
  	# 数据库连接账号密码
	jdbc_user => "root"
    jdbc_password => "root"
    # MySQL依赖包路径,名称一定要对应;
    jdbc_driver_library => "../lib/mysql/mysql-connector-java-8.0.22.jar"
    # the name of the driver class for mysql
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # 数据库重连尝试次数
    connection_retry_attempts => "3"
    # 判断数据库连接是否可用,默认false不开启
    jdbc_validate_connection => "true"
    # 数据库连接可用校验超时时间,默认3600S
    jdbc_validation_timeout => "3600"
    # 开启分页查询(默认false不开启);
    jdbc_paging_enabled => "true"
    # 单次分页查询条数(默认100000,若字段较多且更新频率较高,建议调低此值);
    jdbc_page_size => "500"
    # statement为查询数据sql,如果sql较复杂,建议配通过statement_filepath配置sql文件的存放路径;
    # sql_last_value为内置的变量,存放上次查询结果中最后一条数据tracking_column的值,此处即为ModifyTime;
    # 这个你需要自己多尝试,执行 sql 文件
    #statement_filepath => "../lib/mysql/jdbc.sql"
	# 查询语句,高级一点的就是增加查询条件
    statement => "select * from `xxx`"
    # 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false);
    lowercase_column_names => false
    # Value can be any of: fatal,error,warn,info,debug,默认info;
    sql_log_level => warn
    # 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中;
    record_last_run => true
    # 需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值;
    use_column_value => true
    # 需要记录的字段,用于增量同步,需是数据库字段
    tracking_column => "ModifyTime"
    # Value can be any of: numeric,timestamp,Default value is "numeric"
    tracking_column_type => timestamp
    # record_last_run上次数据存放位置;
    last_run_metadata_path => "mysql/last_id.txt"
    # 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false;
    clean_run => false
    # 同步频率(分 时 天 月 年),默认每分钟同步一次; 定时任务中的 corn 表达式
    schedule => "* * * * *"
  }
}

2.2 配置同步 ES

output {
  elasticsearch {
  	# 作为数组可以存储集群数据
    hosts => ["http://localhost:9200"]
    # 索引名字
    index => "blog"
    # index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    # 数据的唯一索引,就是你查询的表的主键或者一个唯一 ID,自动替换为 ES 的 _id 字段
    document_id => "%{blog_id}"
  }
  stdout {
	  codec => json_lines
  }
}

2.3 重新启动

可以看到 MySQL 数据库中的内容已经同步过来了

三、下一步更新计划

可能考虑做一下 ELK 做日志系统吧

相关文章