Kafka: How can I use spring-cloud-stream output bindings in a unit test?

Posted by nbysray5 on 2023-01-08 in Apache

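I am using spring cloud stream to transform a Kafka topic and write the resulting data to a table with .toTable(). The input and output bindings are set in application.yaml. This works fine against a Kafka cluster, but not in my current test setup.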

@Configuration
public class ObjectTopology {

    @Bean
    public static Serde<Object> objSerde() {
        return new ProtobufSerde<>(Object.parser());
    }

    @Bean
    public Function<KStream<String, Object>, KTable<String, Object>> obj() {

        return objKStream -> objKStream
                .transform((TransformerSupplier<String, Object, KeyValue<String, Object>>) SomeTransformer::new)
                .toTable();
    }
}

application.yaml:

spring.cloud.stream.bindings:
    obj-in-0:
        destination: input-name
    obj-out-0:
        destination: output-name

In the code below, how can I access the KTable produced by "toTable"? Is there a way to use the spring-cloud-stream bindings in my unit test?

ObjectTopology objectTopology = new ObjectTopology();
StreamsBuilder streamsBuilder = new StreamsBuilder();

Serde<String> keySerde = Serdes.String();
Serde<Object> valueSerde = ObjectTopology.objSerde(); // objSerde() is a static method on ObjectTopology

KStream<String, Object> objKStream = streamsBuilder.stream("input-topic-name", Consumed.with(keySerde, valueSerde));

objectTopology.obj().apply(objKStream);

TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build());

TestInputTopic<String, Object> objTestInputTopic = topologyTestDriver.createInputTopic("input-topic-name", keySerde.serializer(), valueSerde.serializer());
KeyValueStore<String, Object> objStore = topologyTestDriver.getKeyValueStore("???"); // I would like to use the name defined by the output binding in application.yaml "output-name"

Object object = createObject();

objTestInputTopic.pipeInput("elem_0", object);

Object result = objStore.get("elem_0");
assertThat(result).isEqualTo(object);

tsm1rwdh #1

toTable creates an internal topic. You should probably use topologyTestDriver.createOutputTopic to read from it.
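A minimal sketch of the createOutputTopic approach, under some assumptions: String serdes stand in for the protobuf serde, the transform step is left out, and the explicit toStream().to("output-name") is my guess at a stand-in for what the obj-out-0 binding does when the binder is running. A bare TopologyTestDriver does not read application.yaml, so the test has to write to the topic itself. The class name OutputTopicSketch is hypothetical.

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class OutputTopicSketch {

    // Pipes one record through stream -> toTable() -> output topic
    // and returns everything that arrived on the output topic.
    public static Map<String, String> run() {
        Serde<String> serde = Serdes.String(); // assumption: stands in for the protobuf serde
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> input =
                builder.stream("input-name", Consumed.with(serde, serde));
        KTable<String, String> table = input.toTable();

        // Assumption: imitate the output binding by writing the table's
        // changelog stream to the destination named in application.yaml.
        table.toStream().to("output-name", Produced.with(serde, serde));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "output-topic-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "input-name", serde.serializer(), serde.serializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                    "output-name", serde.deserializer(), serde.deserializer());

            in.pipeInput("elem_0", "value_0");
            return out.readKeyValuesToMap(); // latest value per key
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the question's test this would mean calling toStream().to(...) on the KTable returned by objectTopology.obj().apply(objKStream) before building the topology.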
For naming, see the toTable overloads that take a Named or Materialized argument: https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/kstream/KStream.html#toTable--
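If the goal is to call getKeyValueStore as in the question, the table needs an explicit store name. Here is a hedged sketch using toTable(Materialized.as(...)). The store name "obj-store" and the class name NamedStoreSketch are made up for illustration, and String serdes again replace the protobuf serde. Note that "output-name" in application.yaml is a topic destination, not a state store name, so it cannot be passed to getKeyValueStore unless the store is deliberately given that name.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class NamedStoreSketch {

    // Pipes one record into a named, materialized table and
    // returns the value stored under the same key.
    public static String run() {
        Serde<String> serde = Serdes.String(); // assumption: stands in for the protobuf serde
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> input =
                builder.stream("input-name", Consumed.with(serde, serde));

        // Naming the store makes it queryable from the test driver.
        input.toTable(
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("obj-store")
                        .withKeySerde(serde)
                        .withValueSerde(serde));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "named-store-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "input-name", serde.serializer(), serde.serializer());
            KeyValueStore<String, String> store = driver.getKeyValueStore("obj-store");

            in.pipeInput("elem_0", "value_0");
            return store.get("elem_0");
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Applied to the question, this would require changing .toTable() in ObjectTopology to the Materialized variant, which also names the store in production.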
