有没有Kafka连接器,从csv读取并转换成avro,然后再推到主题?我经历了众所周知的事情https://github.com/jcustenborder/kafka-connect-spooldir,但它只是阅读和推到主题。我计划修改我的自定义使用的代码库,但在我作出更改之前,我只是想检查是否已经有这样的连接器可用。
dpiehjr41#
kafka-connect-spooldir 完全按照你描述的做。运行它时,只需将kafka connect设置为使用avro转换器。例如:
kafka-connect-spooldir
"key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://schema-registry:8081",
看到了吗https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained 有关转换器和连接器的关系的详细信息。根据您的评论进行编辑:当我使用kafka控制台时,我将数据视为 103693(2018-03-11T09:19:17Z Sugar - assa8.7 当我使用kafka avro控制台时,用户格式是 {"order_id":{"string":"1035"},"customer_id":{"string":"93"},"order_ts":{"string":"2018-03-11T09:19:17Z"},"product":{"string":"Sugar - assa"},"order_total_usd":{"string":"8.7"}}. 这表明它是关于你的主题的avro数据。我的意思是 kafka-avro-console-consumer 它对二进制avro数据进行解码并以普通格式呈现。输出来自 kafka-console-consumer 显示了原始avro,其中的一些部分看起来是可读的( Sugar - assa )但其他人显然不是( 103693 )
103693(2018-03-11T09:19:17Z Sugar - assa8.7
{"order_id":{"string":"1035"},"customer_id":{"string":"93"},"order_ts":{"string":"2018-03-11T09:19:17Z"},"product":{"string":"Sugar - assa"},"order_total_usd":{"string":"8.7"}}.
kafka-avro-console-consumer
kafka-console-consumer
Sugar - assa
103693
1条答案
按热度按时间dpiehjr41#
kafka-connect-spooldir
完全按照你描述的做。运行它时,只需将kafka connect设置为使用avro转换器。例如:看到了吗https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained 有关转换器和连接器的关系的详细信息。
根据您的评论进行编辑:
当我使用kafka控制台时,我将数据视为
103693(2018-03-11T09:19:17Z Sugar - assa8.7
当我使用kafka avro控制台时,用户格式是{"order_id":{"string":"1035"},"customer_id":{"string":"93"},"order_ts":{"string":"2018-03-11T09:19:17Z"},"product":{"string":"Sugar - assa"},"order_total_usd":{"string":"8.7"}}.
这表明它是关于你的主题的avro数据。我的意思是kafka-avro-console-consumer
它对二进制avro数据进行解码并以普通格式呈现。输出来自kafka-console-consumer
显示了原始avro,其中的一些部分看起来是可读的(Sugar - assa
)但其他人显然不是(103693
)