如何使用mqtt source connect更改消息的密钥

tmb3ates  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(274)

我有一个java应用程序将数据发送到mosquitto代理(mqtt)中,使用kafka connect,我将此数据从mqtt代理发送到kafka主题,但问题是,当mqtt source connect发送数据时,默认情况下,键总是kafka主题的名称,我需要更改它。我使用了smt(单消息转换),我可以更改密钥,但它是用base64编码的,你知道我怎样才能转换并只得到id的值吗?
我的连接器配置与转换:

name=MqttSourceConnector
connector.class=io.confluent.connect.mqtt.MqttSourceConnector
mqtt.qos=1
tasks.max=2
mqtt.clean.session.enabled=true
mqtt.server.uri=tcp://mosquitto-server:1883
mqtt.connect.timeout.seconds=30
key.converter.schemas.enable=false
value.converter.schemas.enable=false
mqtt.topics=mqtt_topic
mqtt.keepalive.interval.seconds=60
kafka.topic=kafka_topic
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.json.JsonConverter
transforms=createMap,createKey,extractInt
transforms.createMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.createMap.field=id
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extractInt.field=id

我的Kafka主题是:

Key { id: eyJpZCI6IlNCMDQiLCJ0ZW1wZXJhdHVyZSI6MjQuOTk2OTQyOTk0NDIyMTM4LCJodW1pZGl0eSI6MzkuNzUyNjQzNTk3MjcyMjYsInRpbWVzdGFtcCI6MTUzMjY4NTEzMTI2Nn0= }
Value { id: SB04, temperature: 24.996942994422138, humidity: 39.75264359727226, timestamp: 1532685131266 }

所以,我只需要将sb04值设置为键。你知道吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题