I am trying to use the InsertField Kafka Connect transform with the RabbitMQ source connector. My config:
"config": {
"connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
"confluent.topic.bootstrap.servers": "kafka:29092",
"topic.creation.default.replication.factor": 1,
"topic.creation.default.partitions": 1,
"tasks.max": "2",
"kafka.topic": "test",
"rabbitmq.queue": "events",
"rabbitmq.host": "rabbitmq",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "MessageSource",
"transforms.InsertField.static.value": "Kafka Connect framework"
}
I have also tried ByteArrayConverter as the value converter. From Python, I publish messages like this:
msg = json.dumps(body)
self.channel.basic_publish(exchange="", routing_key="events", body=msg)
Converting the body to a byte array with encode() didn't work either. The exception I get is:
Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [field insertion], found: [B
at org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)
at org.apache.kafka.connect.transforms.InsertField.applyWithSchema(InsertField.java:162)
at org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:133)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 11 more
I understand the error, and thought that using JsonConverter would fix it, but I was wrong. I also tried "value.converter.schemas.enable": "false", with no effect. I would appreciate any help. I don't mind sending the data as JSON or as bytes; I just want to add one key:value pair to each event. Thanks.
1 Answer
As the error says, you can only insert fields into a Struct. To get a Struct from the RabbitMQ String/Bytes schema, you must chain a HoistField transform before InsertField.
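As a sketch, the transform chain in your config would look something like this (the hoisted field name "payload" is just an illustration, pick whatever name you want the raw value wrapped under):

```json
"transforms": "HoistField,InsertField",
"transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.HoistField.field": "payload",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "MessageSource",
"transforms.InsertField.static.value": "Kafka Connect framework"
```

HoistField wraps the entire record value in a single-field Struct, which then satisfies InsertField's requirement.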
To get any Struct from JsonConverter, the JSON needs two top-level fields named schema and payload, and the converter needs "value.converter.schemas.enable": "true" — see https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
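On the publisher side, that envelope can be built like this (a minimal sketch; the assumption here is that your event body is a flat dict of string fields, so the inline schema declares every field as an optional-false string):

```python
import json

def wrap_with_schema(body: dict) -> str:
    """Wrap a flat dict of string fields in the schema/payload envelope
    that JsonConverter expects when schemas.enable is true."""
    envelope = {
        "schema": {
            "type": "struct",
            "fields": [
                {"field": name, "type": "string", "optional": False}
                for name in body
            ],
            "optional": False,
        },
        "payload": body,
    }
    return json.dumps(envelope)

msg = wrap_with_schema({"event": "user_signup"})
# then publish as before, e.g.:
# self.channel.basic_publish(exchange="", routing_key="events", body=msg)
```

With this envelope, JsonConverter produces a Struct value, and InsertField can add its static field directly (no HoistField needed).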
Alternatively, use Kafka record headers for the "source" information, rather than trying to inject it into the value.
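If your Connect runtime is recent enough to ship the InsertHeader SMT (Apache Kafka 3.0+), that approach would look roughly like this — a sketch, with "InsertSource" as an arbitrary transform alias:

```json
"transforms": "InsertSource",
"transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertHeader",
"transforms.InsertSource.header": "MessageSource",
"transforms.InsertSource.value.literal": "Kafka Connect framework"
```

This leaves the record value untouched, so it works regardless of whether the value is a Struct, a String, or raw bytes.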