查找KafkaStream(KStream)对象中的最小/最大值

b4qexyjb  于 2022-12-26  发布在  Apache
关注(0)|答案(1)|浏览(135)

我有一个Kafka流应用程序和Avro模式的每一个主题,也为关键。关键主题模式是相同的。
现在,有一个KafkaStream(KStream)对象,它以已知的key对象作为key,还有一个value对象(从AvroSchema派生),它扩展了org.apache.avro.specific.SpecificRecordBase,但它可以是我的主题内容的任何avro模式。
KStream<CustomKey, ? extends SpecificRecordBase> myStream = ...
我想要实现的是在这个流上运行min和max函数。问题是我不知道?对象是什么,并且由于有30多个(将来还会增加)主题,我不想做开关用例。所以我有以下内容:

public KStream<CustomKey, ? extends SpecificRecordBase> max(
    final KStream<CustomKey, ? extends SpecificRecordBase> myStream,
    final String attributeName) {

    SpecificRecordBase maxValue = ...;
    myStream.foreach((key, value) -> {
      value.get(attributeName) // I want to find the max value for this attribute, 
                               // but at this point we don't know it's type and
                               // and we can't assign maxValue = value, because this is a lambda 
                               // function.
    });

    // find and return the max value
  }

我的问题是,如何计算myStreamattributeName属性上的最大值?

mf98qq94

mf98qq941#

它可以是主题内容的任何avro模式
然后你需要extends ClassWithMinMaxFields。否则,你将无法从通用的SpecificRecordBase对象中提取它。
另外,你的方法返回一个流,你不能return最小值/最大值,如果这是你的目标,你需要一个普通的消费者来扫描整个主题,从开始到(最终)结束。
要使用Streams API(正确地)执行此操作,您可以
1.我需要为每个值构建一个KTable,按键分组,然后根据需要扫描表以查找最小值/最大值。
1.使用aggregate DSL函数创建一个新主题,初始化为{"min": +Inf, "max": -Inf},然后在新记录上检查新旧记录,如果有新的最小值和/或最大值,则设置它们并返回新记录。然后,您仍然需要外部消费者来获取最近的最小/最大事件。
1.如果您有一致的Avro类型,则可以使用ksqlDB函数

相关问题