为什么这个processwindowfunction总是计算每个传入元素而不是一个窗口的所有元素?

zsbz8rwp  于 2021-06-24  发布在  Flink
关注(0)|答案(0)|浏览(368)

我正在努力建立一个apache flink流作业,它计算非常简单的物联网数据。它使用rabbitmq的(source),因此使用rmqsource。这很好,而且对这些数据的解析也很好。
然而,对于这个解析的数据流(triplet'string,double,long'(sensorid,value of'pm2,5',timestamp)类型),之后应用的函数似乎很奇怪。
首先,我想在传感器上设置流的密钥。
第二,我想创建一个窗口,其中包含每10秒或15秒根据id设置关键帧的所有元素。
第三,应该在这个窗口上执行一个非常基本的ProcessWindow函数,它只计算该窗口中的元素基本上,就像文档中的例子。
最后,processwindowfunction的输出应该打印到std.out。
你可以看到下面的相关部分。我使用jmeter和mqtt以及kafkameter插件发送测试数据,一次发送大约50个请求,然后看看会发生什么。
当我发送10个请求时,结果如下所示:

  1. nope :(
  2. nope :(
  3. nope :(
  4. nope :(
  5. nope :(
  6. nope :(
  7. nope :(
  8. nope :(
  9. nope :(
  10. nope :(

对于我的逻辑,这意味着processwindowfunction为每个值计算,而不是为一个窗口计算一次。
我现在的问题是:
有什么我不知道的吗?为什么它不只计算一次函数并输出一个窗口中的元素数呢(我已经尝试了几十种不同的版本,通读了所有相关的stackoverflow帖子,阅读了大量不同的教程,等等,都没有效果)。
打印到std.out是否与此有关(也尝试将其写入文件=>相同的行为)?
我也试过用这个 .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); 希望它能工作,但它没有。
非常感谢你的帮助。

  1. extractedDataStream
  2. .keyBy(t -> t.getValue0()) // keyed by sensor IDs
  3. //.timeWindow(Time.seconds(10))
  4. .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  5. .process(new DetectTooHighAirPollution())
  6. .print();
  7. // execute program
  8. env.execute("MQTT Detection StreamingJob");
  9. }
  10. public static class DetectTooHighAirPollution
  11. extends ProcessWindowFunction<Triplet<String, Double, Long>, String, String, TimeWindow> {
  12. @Override
  13. public void process(String key, Context context, Iterable<Triplet<String, Double, Long>> input, Collector<String> out) throws IOException {
  14. long count = 0;
  15. for (Triplet<String, Double, Long> i : input) {
  16. count++;
  17. }
  18. if (count > 1) {
  19. out.collect("yap :D!: " + count);
  20. } else {
  21. out.collect("nope :(");
  22. }
  23. }
  24. }
  25. }
  26. }

为了完整性,代码的其余部分将执行它应该执行的操作:
ps:我正在发送mqtt消息,有效负载是json对象,我现在“手动”解析它。
pps:已删除配置详细信息。

  1. import org.apache.flink.api.common.functions.RichMapFunction;
  2. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  3. import org.apache.flink.streaming.api.TimeCharacteristic;
  4. import org.apache.flink.streaming.api.datastream.DataStream;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
  7. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  8. import org.apache.flink.streaming.api.windowing.time.Time;
  9. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  10. import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
  11. import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
  12. import org.apache.flink.util.Collector;
  13. import org.javatuples.Triplet;
  14. import java.io.IOException;
  15. public class StreamingJob {
  16. public static void main(String[] args) throws Exception {
  17. // set up the streaming execution environment
  18. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  19. env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  20. //env.setParallelism(1);
  21. // Set up a configuration for the RabbitMQ Source
  22. final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
  23. .setHost("")
  24. .setPort()
  25. .setUserName("")
  26. .setPassword("")
  27. .setVirtualHost("")
  28. .build();
  29. // Initiating a Data Stream from RabbitMQ
  30. final DataStream<String> RMQstream = env
  31. .addSource(new RMQSource<String>(
  32. connectionConfig, // config for the RabbitMQ connection
  33. "", // name of the RabbitMQ queue to consume
  34. false, // use correlation ids; can be false if only at-least-once is required
  35. new SimpleStringSchema())) // deserialization schema to turn messages into Java objects
  36. .setParallelism(1); // parallel Source
  37. //Extraction of values of the Data Stream
  38. final DataStream<Triplet<String, Double, Long>> extractedDataStream = RMQstream.map(
  39. new RichMapFunction<String, Triplet<String, Double, Long>>() {
  40. @Override
  41. public Triplet<String, Double, Long> map(String s) throws Exception {
  42. // Extract the payload of the message
  43. String[] input = s.split(",");
  44. // Extract the sensor ID
  45. String sensorID = input[1];
  46. String unformattedID = sensorID.split(":")[1];
  47. String id = unformattedID.replaceAll(" ", "");
  48. // Extract longitude
  49. String sensorLONG = input[2];
  50. String unformattedLONGTD = sensorLONG.split(":")[1];
  51. String longtd = unformattedLONGTD.replaceAll(" ", "");
  52. // Extract latitude
  53. String sensorLAT = input[3];
  54. String unformattedLATD = sensorLAT.split(":")[1];
  55. String latd = unformattedLATD.replaceAll(" ", "");
  56. // Extract the particulate matter
  57. String sensorPM2 = input[6];
  58. String unformattedPM2 = sensorPM2.split(":")[1];
  59. String pm2String = unformattedPM2.replaceAll("[ }]+", "");
  60. double pm2 = Double.valueOf(pm2String).doubleValue();
  61. long ts = System.currentTimeMillis();
  62. Triplet<String, Double, Long> sensorData = Triplet.with(id, pm2, ts);
  63. return sensorData;
  64. }
  65. }
  66. );

再次感谢,希望有人经历过这一点,或者可以只是指出(可能是明显的)错误,我正在做。

更新21.11.2019

我找到了解决问题的办法。我误解了“键控流”的概念。对于我的用例来说,在应用processwindowfunction时从窗口中获取单个值的结果,我根本不需要“keyedstreams”。相反,我必须使用以下代码:
在我的例子中,'.keyby'确实为每个sensorid构造了一个窗口。因此,当100个传感器(100个不同的ID)在很短的时间跨度(毫秒)内发送请求时,我得到100个窗口和100个processwindowfunction结果。
这不是我想要的,所以我必须使用“.windowall”操作来获得一个包含流中所有元素的窗口。后来我不得不应用“processallwindowfunction”而不是“processwindowfunction”,瞧:它成功了d

  1. ...
  2. extractedDataStream
  3. //.filter(t -> t.getValue1() > 30) //This is just a use-case specific => 71/100 sensor requests have a higher value than 30.
  4. .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(15)))
  5. .process(new DetectTooHighAirPollution())
  6. .print();
  7. ...
  8. public static class DetectTooHighAirPollution
  9. extends ProcessAllWindowFunction<Triplet<String, Double, Long>, String, TimeWindow> {
  10. @Override
  11. public void process(Context context, Iterable<Triplet<String, Double, Long>> input, Collector<String> out) throws IOException {
  12. long count = 0;
  13. for (Triplet<String, Double, Long> i : input) {
  14. count++;
  15. }
  16. if (count >= 10) {
  17. out.collect(count + " Sensors, report a too high concentration of PM2!");
  18. } else {
  19. out.collect("Upps something went wrong :/");
  20. }
  21. }
  22. }

干杯!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题