我有一个融合平台(版本4.1.1)。它被配置为从数据库中读取数据。配置如下:
name = source-mysql-requests
connection.url = jdbc:mysql://localhost:3306/Requests
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
connection.user =***
connection.password =***
mode = incrementing
incrementing.column.name = ID
tasks.max = 5
topic.prefix = requests_
poll.interval.ms = 1000
batch.max.rows = 100
table.poll.interval.ms = 1000
我还有一个日志(版本6.2.4)来阅读Kafka的相关主题。其配置如下:
kafka {
bootstrap_servers => "localhost:9092"
topics => ["requests_Operation"]
add_field => { "[@metadata][flag]" => "operation" }
}
output {
if [@metadata][flag] == "operation" {
stdout {
codec => rubydebug
}
}
}
当我运行“kafka avro console consumer”进行测试时,会收到以下类型的消息:
{"ID":388625154,"ISSUER_ID":"8e427b6b-1176-4d4a-8090-915fedcef870","SERVICE_ID":"mercury-g2b.service:1.4","OPERATION":"prepareOutcomingConsignmentRequest","STATUS":"COMPLETED","RECEIVE_REQUEST_DATE":1525381951000,"PRODUCE_RESULT_DATE":1525381951000}
但在logstash我有一些可怕的和不可读的东西:
"\u0000\u0000\u0000\u0000\u0001����\u0002Hfdebfb95-218a-11e2-a69b-b499babae7ea.mercury-g2b.service:1.4DprepareOutcomingConsignmentRequest\u0012COMPLETED���X���X"
会出什么问题?
1条答案
按热度按时间rdrgkggo1#
通过更改的配置,可以将kafka connect更改为不使用avro
value.converter
以及key.converter
例如,使用json。否则,您将需要logstash来了解如何解释模式注册表编码的avro数据,并将其转换为人类可读的格式。
或者,您可以使用connect的elasticsearch或console sink,完全跳过logstash,假设这就是目标
您可以使用connect smt替换logstash
add_field : operation
同时配置