我正在使用apache flink尝试从kafka获取json记录到influxdb,在这个过程中将它们从一个json记录拆分为多个influxdb点。
我找到了 flatMap
变换,感觉它符合目的。核心代码如下所示:
DataStream<InfluxDBPoint> dataStream = stream.flatMap(new FlatMapFunction<JsonConsumerRecord, InfluxDBPoint>() {
@Override
public void flatMap(JsonConsumerRecord record, Collector<InfluxDBPoint> out) throws Exception {
Iterator<Entry<String, JsonNode>> iterator = //...
while (iterator.hasNext()) {
// extract point from input
InfluxDBPoint point = //...
out.collect(point);
}
}
});
出于某种原因,我只得到一个收集到的点流到数据库中。
即使我打印出所有Map的条目,它似乎也能正常工作: dataStream.print()
产量:
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@144fd091
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@57256d1
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@28c38504
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@2d3a66b3
我误解了吗 flatMap
或者流入连接器中可能有一些错误?
1条答案
按热度按时间r7knjye21#
这个问题实际上与这样一个事实有关:一个序列(由它的标记集和度量定义,如图所示)在流入中一次只能有一个点,因此,即使我的字段不同,最终点也会覆盖所有具有相同时间值的前一个点。