我创建了最简单的kafka接收器连接器配置,并使用confluent 4.1.0:
{
"connector.class":
"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type.name": "test-type",
"tasks.max": "1",
"topics": "dialogs",
"name": "elasticsearch-sink",
"key.ignore": "true",
"connection.url": "http://localhost:9200",
"schema.ignore": "true"
}
在主题中,我用json保存消息
{ "topics": "resd"}
但结果我得到了一个错误:
原因:org.apache.kafka.common.errors.serializationexception:反序列化id-1的avro消息时出错原因:org.apache.kafka.common.errors.serializationexception:未知的魔法字节!
2条答案
按热度按时间niknxzdl1#
发生此错误是因为它试图读取非汇合模式注册表编码的avro消息。
如果主题数据是avro,则需要使用schema注册表。
否则,如果topic数据是json,那么您已经在属性文件的键或值上启动了带有avroconverter的connect集群,您需要使用jsonconverter
7hiiyaii2#
正如cricket\u007所说,如果数据采用的是json反序列化格式,那么需要告诉connect使用json反序列化程序。将此添加到连接器配置中: