如何在ApacheFlink中平面Map到数据库?

p8h8hvxi  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(362)

我正在使用apache flink尝试从kafka获取json记录到influxdb,在这个过程中将它们从一个json记录拆分为多个influxdb点。
我找到了 flatMap 变换,感觉它符合目的。核心代码如下所示:

DataStream<InfluxDBPoint> dataStream = stream.flatMap(new FlatMapFunction<JsonConsumerRecord, InfluxDBPoint>() {
    @Override
    public void flatMap(JsonConsumerRecord record, Collector<InfluxDBPoint> out) throws Exception {
        Iterator<Entry<String, JsonNode>> iterator = //...

        while (iterator.hasNext()) {
            // extract point from input
            InfluxDBPoint point = //...

            out.collect(point);
        }
    }
});

出于某种原因,我只得到一个收集到的点流到数据库中。
即使我打印出所有Map的条目,它似乎也能正常工作: dataStream.print() 产量:

org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@144fd091
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@57256d1
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@28c38504
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@2d3a66b3

我误解了吗 flatMap 或者流入连接器中可能有一些错误?

r7knjye2

r7knjye21#

这个问题实际上与这样一个事实有关:一个序列(由它的标记集和度量定义,如图所示)在流入中一次只能有一个点,因此,即使我的字段不同,最终点也会覆盖所有具有相同时间值的前一个点。

相关问题