复杂的事件处理—我可以在ApacheFlink中打印datasteam< t>的单个元素而不使用内置的print()函数吗

sbdsn5lh  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(416)

我正在尝试打印在flink中检测到的警告值
//为每个匹配的警告模式生成温度警告

  1. DataStream<TemperatureEvent> warnings = tempPatternStream.select(
  2. (Map<String, MonitoringEvent> pattern) -> {
  3. TemperatureEvent first = (TemperatureEvent) pattern.get("first");
  4. return new TemperatureEvent(first.getRackID(), first.getTemperature()) ;
  5. }
  6. );
  7. // Print the warning and alert events to stdout
  8. warnings.print();

我得到如下输出(根据eventsource函数的tostring)

  1. Rack id = 99 and temprature = 76.0

有人能告诉我,有没有什么方法可以不使用print打印datastream的值?一个例子是,如果我只想打印温度,如何访问数据流中的各个元素。
提前谢谢

tkclm6bt

tkclm6bt1#

我已经找到了访问单个元素的方法,假设我们有一个数据流

  1. HeartRate<Integer,Integer>

它有两个属性

  1. private Integer Patient_id ;
  2. private Integer HR;

//使用自定义函数生成datasteam

  1. DataStream<HREvent> hrEventDataStream = envrionment
  2. .addSource(new HRGenerator()).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());

假设您已经使用custom函数生成了datasteam,现在我们可以打印heartrateevent的各个元素的值,如下所示

  1. hrEventDataStream.keyBy(new KeySelector<HREvent, Integer>() {
  2. @Override
  3. public Integer getKey(HREvent hrEvent) throws Exception {
  4. return hrEvent.getPatient_id();
  5. }
  6. })
  7. .window(TumblingEventTimeWindows.of(milliseconds(10)))
  8. .apply(new WindowFunction<HREvent, Object, Integer, TimeWindow>() {
  9. @Override
  10. public void apply(Integer integer, TimeWindow timeWindow, Iterable<HREvent> iterable, Collector<Object> collector) throws Exception {
  11. for(HREvent in : iterable){
  12. System.out.println("Patient id = " + in.getPatient_id() + " Heart Rate = " + in.getHR());
  13. }//for
  14. }//apply
  15. });

希望有帮助!

展开查看全部

相关问题