Kafka:如何执行检查和单位转换?

bqjvbblv  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(196)

我有一个Kafka蒸汽执行货币和地理聚合,与特定的时间。
当某一特定货币的总和发生变化时,Kafka制作者会发送新的金额。
英国、法国、意大利都收到了这笔款项,但有些国家的目标货币有所不同。
交易以不同的金额进行(例如,欧元移到英国,需要转换),因此,在对英国进行汇总之前需要进行转换(欧元需要转换成英镑)。
具体如下:

private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {

  public void process(String key,
                    Context context,
                    Iterable<Double> averages,
                    Collector<Tuple2<String, Double>> out) {
      Double average = averages.iterator().next();
      out.collect(new Tuple2<>(key, average));
  }
}

然而,并非所有针对英国的货币数据都以英镑为单位。因此,我想进行检查
“我的金额是英镑吗?”
如果“是”我转换,我“否”我不转换。
我也想确定时间是正确的,如果不转换成英国时间。
我怎么能这么做?
我能在同一个班做这个吗?
我是否应该反过来为每个国家创建一个特定的生产者/消费者?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题