Apache Flink: window function on an AllWindowedStream - combining Kafka topics

Posted by 7gcisfzg on 2021-06-08 in Kafka

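I am trying to combine two Kafka topics using a single Kafka consumer subscribed to a list of topics, and then convert the JSON strings in the stream into POJOs. To merge them into one fat JSON after keyBy (on the event-time field), I plan to use a windowed stream and apply a window function to it. Assume that Topic-A and Topic-B can be joined on event time, and that exactly one pair (Topic-A JSON, Topic-B JSON) arrives for any given event time. So my plan is to use countWindow(2) after keyBy on eventTime.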
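I have two questions about this:
Is this approach suitable for merging the topics and creating a single JSON?
The window function on the AllWindowedStream does not seem to work; any hints would be appreciated.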
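To make the intended pairing concrete, here is a plain-Java simulation (not Flink code) of what keyBy(eventTime) followed by countWindow(2) does for each key, plus the merge step. Event, PairByEventTime and offer are hypothetical names standing in for TopicPojo and the window function, and the merged result is a Map rather than real JSON so the sketch has no dependencies.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for TopicPojo: only the fields this sketch needs.
final class Event {
    final String topic;    // "Topic-A" or "Topic-B"
    final long eventTime;  // join key, assumed to be shared by exactly one A/B pair
    final String payload;  // the original JSON string, kept opaque here

    Event(String topic, long eventTime, String payload) {
        this.topic = topic;
        this.eventTime = eventTime;
        this.payload = payload;
    }
}

// Simulates keyBy(eventTime).countWindow(2): buffer records per key and
// fire (then purge) once a key has collected two of them.
final class PairByEventTime {
    private final Map<Long, List<Event>> buffers = new HashMap<>();

    /** Returns the merged record when this event completes a pair, otherwise null. */
    Map<String, String> offer(Event e) {
        List<Event> buf = buffers.computeIfAbsent(e.eventTime, k -> new ArrayList<>());
        buf.add(e);
        if (buf.size() < 2) {
            return null; // the window for this key is not full yet
        }
        buffers.remove(e.eventTime); // purge the window once it has fired
        Map<String, String> merged = new LinkedHashMap<>();
        merged.put("eventTime", Long.toString(e.eventTime));
        for (Event part : buf) {
            merged.put(part.topic, part.payload); // one entry per source topic
        }
        return merged;
    }
}
```

Feeding A@1, A@2 and then B@1 returns null, null, and finally a map holding both the Topic-A and Topic-B payloads for event time 1. In this sketch a record whose partner never arrives stays buffered forever; a count window on its own has no timeout either, so that case is worth planning for.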
Code snippet:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
logger.info("Flink Stream Window Charger has started");

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
properties.setProperty("group.id", "group-0011");
properties.setProperty("auto.offset.reset", "smallest");

List<String> names = new ArrayList<>();
names.add("Topic-A");
names.add("Topic-B");

DataStream<String> stream = env.addSource(new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));

// Key each record by its own event-time field (an instance call, not a static TopicPojo.getEventTime())
DataStream<TopicPojo> pojo = stream.map(new Deserializer()).keyBy(p -> p.getEventTime());

List<String> where = new ArrayList<String>();

AllWindowedStream<String, GlobalWindow> data_window = pojo.flatMap(new Tokenizer()).countWindowAll(2);

DataStream<String> data_charging = data_window.apply(new MyWindowFunction());

data_charging.addSink(new SinkFunction<String>() {
    public void invoke(String value) throws Exception {
        // Yet to be implemented - merge two POJOs into one
    }
});

try {
    env.execute();
} catch (Exception e) {
    return;
}
}
}

class Tokenizer implements FlatMapFunction<TopicPojo, String> {
    private static final long serialVersionUID = 1L;
    // Reuse one mapper instead of creating a new one per record
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
        // Serialize each POJO back into a JSON string
        out.collect(MAPPER.writeValueAsString(value));
    }
}

class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
    @Override
    public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2, Collector<String> out)
            throws Exception {
        int count = 0;
        for (TopicPojo in : arg2) {
            count++;
        }
        // Test result - to be modified
        out.collect("Window: " + window + " count: " + count);
    }
}

class Deserializer implements MapFunction<String, TopicPojo> {
    private static final long serialVersionUID = 1L;
    // Reuse one mapper instead of creating a new one per record
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public TopicPojo map(String value) throws IOException {
        System.out.println(value);
        try {
            return MAPPER.readValue(value, TopicPojo.class);
        } catch (IOException e) {
            // JsonParseException and JsonMappingException are both IOExceptions;
            // keep the original exception as the cause
            throw new IOException("Failed to deserialize JSON object.", e);
        }
    }
}

I get this error:
The method apply(AllWindowFunction) in the type AllWindowedStream is not applicable for the arguments (MyWindowFunction)

Answer by m528fe3b:

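AllWindowedStream is a non-keyed stream, so AllWindowedStream.apply expects an AllWindowFunction, whose apply method has no key parameter; MyWindowFunction implements WindowFunction, which does take a key. Because you are windowing a keyed stream, your data_window should be a keyed windowed stream (a WindowedStream from keyBy(...).countWindow(2)) rather than the result of countWindowAll(2). Note that the flatMap after keyBy returns an ordinary DataStream, so the keying is already lost before countWindowAll is called, and that MyWindowFunction's input type (TopicPojo) must also match the element type of the windowed stream, which is String after Tokenizer.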
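Why the keyed variant matters can be simulated in plain Java (again not Flink code; WindowPairing, pairAll and pairByKey are hypothetical names, and each element is just an event time). With Topic-A and Topic-B records interleaved, a single global count window of 2 pairs whatever two records arrive consecutively, while a per-key count window of 2 only pairs records that share an event time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation contrasting the two window flavours on a stream of event times.
final class WindowPairing {
    // Like countWindowAll(2): one global window, so any two consecutive records pair up.
    static List<List<Long>> pairAll(List<Long> eventTimes) {
        List<List<Long>> fired = new ArrayList<>();
        List<Long> window = new ArrayList<>();
        for (Long t : eventTimes) {
            window.add(t);
            if (window.size() == 2) {
                fired.add(window);
                window = new ArrayList<>(); // start a fresh window after firing
            }
        }
        return fired;
    }

    // Like keyBy(t).countWindow(2): one window per key, so only equal keys pair up.
    static List<List<Long>> pairByKey(List<Long> eventTimes) {
        List<List<Long>> fired = new ArrayList<>();
        Map<Long, List<Long>> windows = new HashMap<>();
        for (Long t : eventTimes) {
            List<Long> window = windows.computeIfAbsent(t, k -> new ArrayList<>());
            window.add(t);
            if (window.size() == 2) {
                fired.add(window);
                windows.remove(t); // purge this key's window after firing
            }
        }
        return fired;
    }
}
```

For the input 1, 2, 1, 2 (A@1, A@2, B@1, B@2), pairAll yields [[1, 2], [1, 2]], mixing different event times, while pairByKey yields [[1, 1], [2, 2]]. In Flink terms the keyed variant is pojo.keyBy(...).countWindow(2), and WindowedStream.apply accepts a WindowFunction<IN, OUT, KEY, W> with the key parameter that MyWindowFunction already declares.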
