我正在为一个个人项目试验Apache Flink,我一直在努力将结果流输出到StdOut并将其发送到Kafka主题orders-output
。
我的目标是在3分钟的滚动时间窗口内计算每个产品的价格字段的总和。基本上,Apache flink作业,从两个Kafka源流(orders-a
和orders-b
)接收JSON格式的字符串订单,将它们连接在一起,获得形状为(product_name, product_price (double)
的元组,然后按产品对其进行分组,应用3分钟的滚动窗口,并使用ReduceFunction
计算该窗口中每个产品的价格总和。代码如下:
FlinkKafkaConsumer<String> consumerA = new FlinkKafkaConsumer<>("orders-a", new SimpleStringSchema(), props);
FlinkKafkaConsumer<String> consumerB = new FlinkKafkaConsumer<>("orders-b", new SimpleStringSchema(), props);
DataStream<String> streamA = env.addSource(consumerA);
DataStream<String> streamB = env.addSource(consumerB);
DataStream<Tuple2<String,Double>> mergedOrders = streamA
.union(streamB)
.map(new MapFunction<String, Tuple2<String, Double>>() {
@Override
public Tuple2<String, Double> map(String s) throws Exception {
return DataHelper.getTuple(s);
}
});
DataStream<Tuple2<String, Double>> totals = mergedOrders
.keyBy(value -> value.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new ReduceFunction<Tuple2<String, Double>>() {
public Tuple2<String, Double> reduce(Tuple2<String, Double> v1, Tuple2<String, Double> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});
DataStream<String> result = totals.map(new MapFunction<Tuple2<String, Double>, String>() {
@Override
public String map(Tuple2<String, Double> stringDoubleTuple2) throws Exception {
LOG.info(stringDoubleTuple2.toString());
return stringDoubleTuple2.toString();
}
});
result.print();
result.addSink(new FlinkKafkaProducer<>("orders-output", new SimpleStringSchema(), props));
env.execute("Product revenues per 3m");
这段代码完成了前面所描述的工作,DataHelper只是一个自定义助手类,我创建它来帮助我将接收到的订单从JSON字符串转换为Tuple2和其他类型。作业启动后运行良好(我在本地运行所有内容),我甚至可以看到数据在Flink UI中接收(见下图)。
问题是,我在StdOut(在终端和Flink UI中)和Kafka输出主题中都没有看到任何结果(我在另一个终端中独立启动了orders-output
的消费者,但我没有收到任何结果)。
我会很感激你的帮助,因为我已经被困在这两天了。
2条答案
按热度按时间xurqigkl1#
我可能不会回答你的问题,但我可能会帮助你找到问题所在。
首先,FlinkKafkaProducer和FlinkKafkaConsumer已弃用,请使用KafkaSink + KafkaSource。第二,我没有看到使用指定的时间策略(事件或处理),但也许不必明确说明(不确定我只使用事件时间)。
关于问题:您可以清楚地看到,数据正在进入您的最后一个操作符,该操作符执行窗口,Map和下沉。如果你想确定这些函数中的哪个函数有问题,你可以进行自定义链接,并将每个函数Map到一个独立的操作符(请参阅https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/overview/#task-chaining-and-resource-groups)。
我可以看到,你添加了日志到最后的Map功能之前沉没,但你看不到任何正在记录。如果你的日志配置是正确的,那么你知道,数据被卡在窗口函数中。我想到的唯一原因是,为什么这会是一个问题,因为Flink的时间不会提前,所以它不会关闭任何窗口,所以数据不会被处理。您可以在管道中添加更多函数,只是出于调试的原因,这样您就可以记录管道中的实际时间并查看数据的位置(或使用那些自定义链)。
你可以在窗口之前的keyby函数之后添加函数,在窗口之后你已经记录了元素。通过这种方式,您可以确定数据到达但不进一步进行的确切位置。比Flink的时间更长。您可以尝试创建一些计时器并重写onTimer方法,以查看时间是否向前移动。
最后要添加的-您可以检查操作员的指标。它可以通过Web UI访问。检查numRecordsOut(或类似的东西)的最后一个操作符,看看它是否工作。BytesSent对于接收操作符将为零,因为它不是“发送”数据到下一个操作符,而是接收它们
q3qa4bjr2#
您正在使用事件时间窗口,但我不知道您在哪里设置水印策略(它还为您的记录分配时间戳)。如果传入的记录没有任何时间戳,则需要使用处理时间窗口。
另外,正如Dominik21所指出的,您应该使用较新的KafkaSource和KafkaSink。