scala—尝试在patternstream上选择时出现“输入不匹配:应为元组类型”

30byixjq  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(435)

我在测试新的Flink1.0.0功能时遇到了一些问题。我一直在修补cep,但还没有运行一个简单的演示代码:

  1. val pattern : Pattern[TrafficEvent, _] = Pattern.begin[TrafficEvent]("start")
  2. val patternStream = CEP.pattern(stream.javaStream, pattern);
  3. class MyPatternSelectFunction extends PatternSelectFunction[TrafficEvent, TrafficEvent] {
  4. override def select(pattern : java.util.Map[String, TrafficEvent]) : TrafficEvent ={
  5. pattern.get("start")
  6. }
  7. }
  8. val alerts = patternStream.select(new MyPatternSelectFunction())

代码编译得很好,maven没有显示任何警告。trafficevent是一个只有几个简单字段的类,stream是该类的scala数据流。当代码在flink上运行时,错误就会出现。它运行一秒钟,然后代码退出并显示以下错误消息:
程序已完成,但出现以下异常:

  1. Input mismatch: Tuple type expected.
  2. org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:878)
  3. org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:302)
  4. org.apache.flink.cep.PatternStream.select(PatternStream.java:64)
  5. com.demo.DemoTraffic$.main(DemoTraffic.scala:311)

我试图通过构建这样一个静态类,将功能转移到java中(也许从scala调用api时会遇到一些奇怪的问题):

  1. public static DataStream<DemoTraffic.trafficEvent> getStreamByPattern(DataStream<DemoTraffic.trafficEvent> stream) {
  2. Pattern<DemoTraffic.trafficEvent, ?> pattern = Pattern.<DemoTraffic.trafficEvent>begin("start");
  3. PatternStream<DemoTraffic.trafficEvent> patternStream = CEP.pattern(stream, pattern);
  4. DataStream<DemoTraffic.trafficEvent> rvalue = patternStream.select(new PatternSelectFunction<DemoTraffic.trafficEvent, DemoTraffic.trafficEvent>() {
  5. @Override
  6. public DemoTraffic.trafficEvent select(Map<String, DemoTraffic.trafficEvent> pattern) throws Exception {
  7. return pattern.get("start");
  8. }
  9. });
  10. return rvalue;
  11. }

但结果完全相同,并且在patternstream.select行中抛出相同的错误。关于我可以尝试什么或我做错了什么有什么提示吗?正如你所看到的,这个模式非常愚蠢,它只用于测试purpouse。它只接受所有事件,并返回该事件作为响应。flink是1.0.0,使用scala 2.10版本。
谢谢

xsuvu9jc

xsuvu9jc1#

我想 TrafficEvent 是一个scala案例类。cep库是为flink的javaapi编写的,因此还不支持scala case类。
作为一种变通方法,您可以将case类转换为普通的scala类。
还有一个jira票证跟踪cepscalaapi的开发。

相关问题