apache-kafka Kafka连接RabbitMQ时无法使用插入字段转换:[字段插入]仅支持Struct对象,找到:[B

62lalag4  于 2022-11-01  发布在  Apache
关注(0)|答案(1)|浏览(108)

我正在尝试使用InsertField kafka连接转换和rabbitmq连接器。我的配置:

"config": {
        "connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
        "confluent.topic.bootstrap.servers": "kafka:29092",
        "topic.creation.default.replication.factor": 1,
        "topic.creation.default.partitions": 1,
        "tasks.max": "2",
        "kafka.topic": "test",
        "rabbitmq.queue": "events",
        "rabbitmq.host": "rabbitmq",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "transforms": "InsertField",
        "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertField.static.field": "MessageSource",
        "transforms.InsertField.static.value": "Kafka Connect framework"
    }

我也尝试过使用BytesArrayConverter作为值。使用python,我发送了如下消息:

msg = json.dumps(body)
        self.channel.basic_publish(exchange="", routing_key="events", body=msg)

其中,使用encode()将其转换为字节数组并不奏效。我收到的异常是:

Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [field insertion], found: [B
    at org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)
    at org.apache.kafka.connect.transforms.InsertField.applyWithSchema(InsertField.java:162)
    at org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:133)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 11 more

我理解这个错误,并认为使用JsonConverter可以解决它,但是我错了。我也使用过"value.converter.schemas.enable" : "false",但没有效果。我将感谢任何帮助。我不介意以json格式或字节格式发送数据,我只想在事件中添加一个键:值对。谢谢

4uqofj5v

4uqofj5v1#

如错误所示,您只能将字段插入到结构中。要从RabbitMQ String/Bytes架构中获取Struct,您必须在InsertField转换之前链接HoistField转换。
要从JSONConverter获取任何结构,JSON需要两个名为schemapayload的顶级字段,然后连接器需要

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"

https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
或者,使用Kafka标头获取“源”信息,而不是尝试将其注入值中

相关问题