kafka(合流平台)输入用于日志存储-中断消息编码

but5z9lq  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(302)

我有一个融合平台(版本4.1.1)。它被配置为从数据库中读取数据。配置如下:

name = source-mysql-requests
connection.url = jdbc:mysql://localhost:3306/Requests
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
connection.user =***
connection.password =***
mode = incrementing
incrementing.column.name = ID
tasks.max = 5
topic.prefix = requests_
poll.interval.ms = 1000
batch.max.rows = 100
table.poll.interval.ms = 1000

我还有一个日志(版本6.2.4)来阅读Kafka的相关主题。其配置如下:

kafka { 
    bootstrap_servers => "localhost:9092" 
    topics => ["requests_Operation"] 
    add_field => { "[@metadata][flag]" => "operation" }
}
output { 
 if [@metadata][flag] == "operation" { 
    stdout { 
        codec => rubydebug 
    } 
 } 
}

当我运行“kafka avro console consumer”进行测试时,会收到以下类型的消息:

{"ID":388625154,"ISSUER_ID":"8e427b6b-1176-4d4a-8090-915fedcef870","SERVICE_ID":"mercury-g2b.service:1.4","OPERATION":"prepareOutcomingConsignmentRequest","STATUS":"COMPLETED","RECEIVE_REQUEST_DATE":1525381951000,"PRODUCE_RESULT_DATE":1525381951000}

但在logstash我有一些可怕的和不可读的东西:

"\u0000\u0000\u0000\u0000\u0001����\u0002Hfdebfb95-218a-11e2-a69b-b499babae7ea.mercury-g2b.service:1.4DprepareOutcomingConsignmentRequest\u0012COMPLETED���X���X"

会出什么问题?

rdrgkggo

rdrgkggo1#

通过更改的配置,可以将kafka connect更改为不使用avro value.converter 以及 key.converter 例如,使用json。
否则,您将需要logstash来了解如何解释模式注册表编码的avro数据,并将其转换为人类可读的格式。
或者,您可以使用connect的elasticsearch或console sink,完全跳过logstash,假设这就是目标
您可以使用connect smt替换logstash add_field : operation 同时配置

相关问题