我们正在尝试将mqtt源连接器链接到XDB接收器连接器。目前前者运行良好,但后者给出了以下例外情况:
org.apache.kafka.connect.errors.connectexception:由于不可恢复的异常而退出workersinktask。在org.apache.kafka.connect.runtime.workersinktask.delivermessages(workersinktask。java:484)在org.apache.kafka.connect.runtime.workersinktask.poll(workersinktask。java:265)在org.apache.kafka.connect.runtime.workersinktask.iteration(workersinktask。java:182)在org.apache.kafka.connect.runtime.workersinktask.execute(workersinktask。java:150)在org.apache.kafka.connect.runtime.workertask.dorun(工作任务。java:146)在org.apache.kafka.connect.runtime.workertask.run(workertask。java:190)在java.util.concurrent.executors$runnableadapter.call(executors。java:511)在java.util.concurrent.futuretask.run(futuretask。java:266)在java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor。java:1149)在java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor。java:624)在java.lang.thread.run(线程。java:748)
这是XDB配置文件
connector.class=com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector
connect.influx.url=http://localhost:8086
connect.influx.db=iot
tasks.max=1
topics=simMetrics
connect.influx.kcql=INSERT INTO sensorMetrics SELECT * FROM simMetrics WITHTIMESTAMP sys_time()
name=influxdb-sink
connect.influx.username=""
这是消息结构:
{“timestamp”:1524572345184,“partition”:0,“key”:{“topic”:“machine/sensor/mytopic/test”,“id”:“1”},“offset”:0,“topic”:“simmetrics”,“value”:{“metrics”:{“buzzer”:0,“led”:0,“water”:false,“buzzer\u timestamp”:1524571762798,“temperature\u timestamp”:1524571762816,“water\u timestamp”:1524571762835,“fan”:0,“light”:500,“温度”:27.371554588194957,“资产名称”:“simopcua”,“风扇时间戳”:1524571762791,“灯光时间戳”:1524571762808,“led时间戳”:1524571762827}
mqtt源连接器配置:
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
name=mqtt-source
connect.mqtt.kcql=INSERT INTO simMetrics SELECT * FROM machine/sensor/mytopic/test WITHCONVERTER=com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter
connect.mqtt.service.quality=1
connect.mqtt.hosts=tcp://192.168.208.203:1884
更新
我们发现问题出在温度值格式上。由于我们没有配置字段的类型,influxdb将温度值理解为double。所有带有小数点分隔符的值都被正确保存,当kafka发送值时出现问题,没有小数部分,忽略了小数点分隔符。我们如何解决这个问题?
ps:实际的解决方法是将所有输入温度加上0.00000001。
暂无答案!
目前还没有任何答案,快来回答吧!