kafka connect filestreamsink连接器删除了引号,并将json消息的冒号改为等号

kcugc4gi  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(360)

摘要

当我和游戏制作人进行流媒体时

{"id":1337,"status":"example_topic_1 success"}

我从我的文件流消费者那里得到这个

/数据/示例\u主题\u 1.txt

{id=1337, status=example_topic_1 success}

这对我来说是个大问题,因为如果不对引号的位置进行假设,就无法恢复原始的json消息。如何将消息输出到文件中,同时保留引号?

详细信息

首先,启动文件接收器连接器。


# sh bin/connect-standalone.sh \

>   config/worker.properties \
>   config/connect-file-sink-example_topic_1.properties

其次,我启动console consumer(也内置于kafka中),这样我就可以轻松直观地确认消息是否正确通过。


# sh bin/kafka-console-consumer.sh \

>   --bootstrap-server kafka_broker:9092 \
>   --topic example_topic_1

最后,我启动一个控制台生成器来发送消息,然后输入一条消息。


# sh bin/kafka-console-producer.sh \

>   --broker-list kafka_broker:9092 \
>   --topic example_topic_1

从控制台使用者,消息正确弹出,并带有引号。

{"id":1337,"status":"example_topic_1 success"}

但我从我的FileStreamLink消费者那里得到了这样的信息:

/数据/示例\u主题\u 1.txt

{id=1337, status=example_topic_1 success}

我的配置

配置/worker.properties

offset.storage.file.filename=/tmp/example.offsets

bootstrap.servers=kafka_broker:9092
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

config/connect-file-sink-example\主题\ 1.properties

name=file-sink-example_topic_1
connector.class=FileStreamSink
tasks.max=1
file=/data/example_topic_1.txt
topics=example_topic_1
qoefvg9y

qoefvg9y1#

由于您实际上并不想解析json数据,而是直接将其作为一个文本块传递,因此需要使用stringconverter:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

本文详细介绍了转换器的细微差别:https://rmoff.net/2019/05/08/when-a-kafka-connect-converter-is-not-a-converter/. 这显示了一个示例,说明您正在尝试做什么,尽管使用 kafkacat 代替控制台生产者/消费者。

相关问题