kafka mqtt连接器如何将mqtt主题作为密钥发送?

nxagd54h  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(409)

我有一个mqtt代理和一个kafka代理正在运行,我使用了kafka连接器:https://github.com/landoop/stream-reactor,下一个配置:

name=Mqtt-Source
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.kcql=INSERT INTO test SELECT * FROM + WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter` WITHKEY(id)
connect.mqtt.connection.clean=true
connect.mqtt.connection.timeout=1000
connect.mqtt.connection.keep.alive=1000
connect.mqtt.client.id=test_mqtt_connector
connect.mqtt.converter.throw.on.error=true
connect.mqtt.hosts=tcp://mqtt-broker:1883
connect.mqtt.service.quality=1

在kql中,我定义了kafka应该作为键的消息字段,是否使用mqtt主题作为键?所以我不需要定义 WITHKEY() 在kql中。

xpszyzbs

xpszyzbs1#

我不知道landoop的kql,但是假设主题是消息值的一部分,您可以像这样将其移动到键

transforms=ReplaceKey,ExtractKey
transforms.ReplaceKey.type=org.apache.kafka.connect.transforms.ValueToKey

# change the field accordingly

transforms.ReplaceKey.fields=mqtt_topic
transforms.ExtractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key

# make sure this is the same field as above

transforms.ExtractKey.field=mqtt_topic

如果没有,则可以静态插入它

transforms=AddKey
transforms.AddKey.type=org.apache.kafka.connect.transforms.InsertField$Key

# The exclamation makes this a required field

transforms.AddKey.static.field=mqtt_topic!
transforms.AddKey.static.value="<<your topic name>>"

但是,上述方法可能不适用于 SELECT * FROM + ,从中选择所有mqtt主题

相关问题