通量风暴配置中的lambda函数

63lcw9qa  于 2021-06-24  发布在  Storm
关注(0)|答案(0)|浏览(255)

我需要将lambda函数作为参数传递给fluxyaml配置文件中的构造函数或方法(由apachestorm使用)。
例如,假设我想创建 org.apache.storm.kafka.spout.ByTopicRecordTranslator . 它有一个具有以下签名的构造函数:

public ByTopicRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields)

在java中,我会这样示例化它:

ByTopicRecordTranslator<String, String> translator = new ByTopicRecordTranslator<>((K) -> new Values(K.value()), new Fields("oneOrMoreStrings"));

但我似乎不知道如何在fluxyaml配置文件中这样做。以下是我尝试的失败代码:

- id: "field"
    className: "org.apache.storm.tuple.Fields"
    constructorArgs:
      - ["oneOrMoreStrings"]

  - id: "KafkaMessageTranslator"
    className: "org.apache.storm.kafka.spout.ByTopicRecordTranslator"
    constructorArgs:
      - (K) -> new Values(K.value())
      - ref: "field"

以下是我收到的错误:

Exception in thread "main" java.lang.IllegalArgumentException: Couldn't find a suitable constructor for class 'org.apache.storm.kafka.spout.ByTopicRecordTranslator' with arguments '[(K) -> new Values(K.value()), [oneOrMoreStrings]]'.
    at org.apache.storm.flux.FluxBuilder.buildObject(FluxBuilder.java:358)
    at org.apache.storm.flux.FluxBuilder.buildComponents(FluxBuilder.java:421)
    at org.apache.storm.flux.FluxBuilder.buildTopology(FluxBuilder.java:101)
    at some.package.KafkaConsumerTestTopology.main(KafkaConsumerTestTopology.java:20)

我在文件里找不到关于它的任何东西。我很确定这和我如何格式化lambda有关。
如何将lambda函数作为参数传递给fluxyaml配置文件中的构造函数或方法?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题