使用logstash将整个数据库保存到elasticsearch

hujrc8aj  于 2021-06-20  发布在  Mysql
关注(0)|答案(1)|浏览(386)

我是麋鹿的新手,目前正在做的是:
使用 configuration.conf 在logstash中设置jdbc的文件(输入>过滤>输出)
对于每个 MySQL 具有独立属性的查询 input{} 在logstash配置文件中
或使用 pipilines.yml 使separat config文件在单独的线程中运行(即每个mysql查询都有(存储在)不同的配置文件中)
运行命令 logstash -f config.conf (Windows)或者只是 logstash 对于管道
如何使用logstash获取数据库的所有表,并将它们索引到每个一次性,其中每个表的索引与mysql数据库(windows)中的表名同名。我可以以show tables的形式运行查询,获取list并使用for循环,为每个表定义.conf并将它们保存为.conf文件吗?但我该如何修改.yml文件呢?因为文件是.conf和.yml,而不是.py?
日志存储配置文件映像

czq61nw1

czq61nw11#

官方文档称“每个查询都必须有单独的jdbc”:配置多个sql语句
剧本如下:
gettablenames.py文件

import MySQLdb

# custom made class, Generate

from package_name.generate_conf_yml_logstash import Generate
connection = MySQLdb.connect(host="localhost:3306",
                              user="root/sa", password="password", db="database_name")
cursor = connection.cursor()
cursor.execute("show tables")
tables = cursor.fetchall()
application_name_tables = []
for table in tables:
    application_name_tables.append(table[0])
cursor.close()
connection.close()
Generate.save_files(application_name_tables)

生成\u conf \u yml \u logstash.py

import sys
import os.path

class Generate:
    @staticmethod
    def save_files(tables):

        # save config files
        save_path = "D:\folder_name"
        for table in tables:
            init_f = save_path + "\initial_logstash.conf"
            conf_f_name = table + ".conf"
            save_file = os.path.join(save_path, conf_f_name)
            with open(init_f, "r") as original:  # read only
                data = original.read()
                data = data.replace("table_name", table)
            with open(save_file, "w+") as conf:  # overwrite if exist
                conf.write(data)

        # save yml file
        yml_f_name = "init_logstash_pipelines.yml"
        save_file = os.path.join(save_path, yml_f_name)
        with open(save_file, "r") as original:
            data = original.read()
        with open(save_file, "a+") as yml:  # append
            for table in tables:
                data = data.replace("table", table)
                yml.write(data)
        sys.exit()

配置和yml文件示例如下:
初始化日志存储管道.yml

- pipeline.id: table
    path.config: "../config/table.conf"
    pipeline.workers: 1

初始\u logstash.conf

input {
  jdbc {
    jdbc_driver_library => "../logstash-6.3.0/logstash-core/lib/jars/mysql-connector-java-5.1.46-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://Mysql:3306/database_name"
    jdbc_user => "root"
    jdbc_password => "password"
    statement => "SELECT * from table_name"
    # schedule => "* * * * *"
  }
}
output {
    stdout { codec => json_lines }
    elasticsearch {
    hosts => ["localhost:9200"]
    index => "table_name"
    # as every table has diff. primary key, change this please
    document_id => "%{pk}"
    }
}

请随时更改初始配置,yml文件和代码根据您的需要

相关问题