elasticsearchsinkconnector无法将数据反序列化到avro

bxgwgixi  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(446)

我创建了最简单的kafka接收器连接器配置,并使用confluent 4.1.0:

  1. {
  2. "connector.class":
  3. "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  4. "type.name": "test-type",
  5. "tasks.max": "1",
  6. "topics": "dialogs",
  7. "name": "elasticsearch-sink",
  8. "key.ignore": "true",
  9. "connection.url": "http://localhost:9200",
  10. "schema.ignore": "true"
  11. }

在主题中,我用json保存消息

  1. { "topics": "resd"}

但结果我得到了一个错误:
原因:org.apache.kafka.common.errors.serializationexception:反序列化id-1的avro消息时出错原因:org.apache.kafka.common.errors.serializationexception:未知的魔法字节!

niknxzdl

niknxzdl1#

发生此错误是因为它试图读取非汇合模式注册表编码的avro消息。
如果主题数据是avro,则需要使用schema注册表。
否则,如果topic数据是json,那么您已经在属性文件的键或值上启动了带有avroconverter的connect集群,您需要使用jsonconverter

7hiiyaii

7hiiyaii2#

正如cricket\u007所说,如果数据采用的是json反序列化格式,那么需要告诉connect使用json反序列化程序。将此添加到连接器配置中:

  1. "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  2. "value.converter.schemas.enable": "false",
  3. "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  4. "key.converter.schemas.enable": "false"

相关问题