apache-kafka Kafka KStream到KStream的连接不适用于Avro SpecificRecords

xdnvmnnf  于 2022-11-01  发布在  Apache
关注(0)|答案(1)|浏览(103)

我想加入两个Avro SpecificRecords的KStream。我可以独立处理这两个流,但我不理解用来加入这两个流的代码。以下是我目前为止的代码:

KStream<String, RecordOne> recOneStream = streamsBuilder.stream(recOneTopic, Consumed.with(Serdes.String(), recOneSpecificSerde));
    KStream<String, RecordTwo> recTwoStream = streamsBuilder.stream(recTwoTopic, Consumed.with(Serdes.String(), recTwoSpecificSerde));

    // change key to match key of rec two
    KStream<String, RecordOne> recOneChangedKeyStream = recOneStream.selectKey((k, v) -> v.getKeyValue().toString());

    // folowing works
    recOneChangedKeyStream.peek((k, v) -> System.out.println("Key : " + k + " Value : " + v)); // output is as expected here

    // trying to make following work?
    KStream<String, JoinedRecord> joinedRecord = recOneChangedKeyStream.join(recTwoStream, (recOn, recTwo) -> {
                JoinedRecord jr = new JoinedRecord();
                jr.setFieldOne...
                return jr;
            },
            JoinWindows.of(Duration.ofSeconds(60)),
            // if I add following line the code breaks at compile time, if I don't add it then it breaks at runtime
            Joined.with(Serdes.String(), recOneSpecificSerde, recTwoSpecificSerde)
    );

所以问题出在Joined. with中。我想我正确地遵循了这个例子:confluent-example,因为它也以类似的方式使用Joined.with。但是,在我的示例中,我在IDE中看到了以下异常:

Cannot resolve method 'join(org.apache.kafka.streams.kstream.KStream<java.lang.String, RecordTwo>, <lambda expression>, org.apache.kafka.streams.kstream.JoinWindows, org.apache.kafka.streams.kstream.Joined<K,V,VO>)'

我的计算机上安装的confluent版本是confluent-7.2.2.tar.gz

nzkunb0c

nzkunb0c1#

如下面的屏幕截图所示,文档中声明应使用Joined.with。但在阅读IDE中的错误消息并继续GitHub examples之后,我注意到此文档未更新,因为所需参数的类型为StreamJoined,而不是Joined

相关问题