groupbykey创建重分区主题,即使没有密钥更改

rryofs0p  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(321)

我尝试在kafka streams(kafka 1.0.1)和spring cloud stream(2.0.0-build-snapshot)的帮助下实现一个简单的事件源服务。我的streamlistener方法只是读取与我的聚合的状态更改相对应的kstream事件,并将它们应用于聚合,并将最新状态保存在本地状态存储(kafka提供的状态存储)中。域事件消息也具有与聚合的uuid(字符串)相同的键。代码如下:

@StreamListener(Channels.EVENTS_INPUT_CHANNEL)
public void listen(KStream<String, DomainEvent> stream) {
    Serde<DomainEvent> domainEventSerde = new JsonSerde<>(DomainEvent.class);
    Serde<Slot> slotSerde = new JsonSerde<>(Slot.class);
    stream
        .groupByKey(Serialized.with(Serdes.String(), domainEventSerde))
        .aggregate(
                Slot::new, 
                (s, domainEvent, slot) -> slot.handle(domainEvent),
                Materialized.<String, Slot, KeyValueStore<Bytes, byte[]>>
                as(Repository.SNAPSHOTS_FOR_SLOTS)
                    .withKeySerde(Serdes.String()).withValueSerde(slotSerde)
        );
}

上面的代码生成一个changelog主题(如预期的那样):slot service slots changelog。尽管它也创建了一个重新分区主题:插槽服务插槽重新分区。两个主题似乎有完全相同的消息(键和值)。我的理解是,如果没有对流执行键修改操作,则不需要重新分区。我是不是漏了什么?
更新:这可能不再需要了,因为sobychacko已经提供了解释,但是我确实尝试过不使用云流绑定,如下面所示,并且它没有创建重新分区主题:

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfiguration {

    @Bean
    KafkaTemplate<String, DomainEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    ProducerFactory<String,DomainEvent> producerFactory() {
        return new DefaultKafkaProducerFactory<>(config());
    }

    private Map<String, Object> config() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return config;
    }

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    StreamsConfig streamsConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "slot-service");
        return new StreamsConfig(config);
    }

    @Bean
    KTable<String, Slot> kTable(KStreamBuilder builder) {
        Serde<DomainEvent> domainEventSerde = new JsonSerde<>(DomainEvent.class);
        Serde<Slot> slotSerde = new JsonSerde<>(Slot.class);

        return
                builder
                .stream(Serdes.String(), domainEventSerde, Repository.SLOT_EVENTS)
                .groupByKey(Serdes.String(), domainEventSerde)
                .aggregate(
                    Slot::new, 
                    (s, domainEvent, slot) -> slot.handle(domainEvent),
                    slotSerde,
                    Repository.SNAPSHOTS_FOR_SLOTS);
    }

    }

此外,生产商如下:

@Autowired
    public Repository(KafkaTemplate<String, DomainEvent> kafkaTemplate, KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {
        this.kafkaTemplate = kafkaTemplate;
        this.kStreamBuilderFactoryBean = kStreamBuilderFactoryBean;
    }

    public void save(Slot slot) {
        List<DomainEvent> newEvents = slot.getDirtyEvents();
        newEvents.forEach(
            domainEvent -> kafkaTemplate.send(SLOT_EVENTS, domainEvent.aggregateUUID().toString(),domainEvent) 
        );
        slot.flushEvents();
    }

更新2:
以下是云流的生产者代码:

public void save(Slot slot) {
        List<DomainEvent> newEvents = slot.getDirtyEvents();
        newEvents.forEach(domainEvent -> channels.eventsOutputChannel().send(MessageBuilder.withPayload(domainEvent)
                .setHeader(KafkaHeaders.MESSAGE_KEY, slot.getUuid().toString()).build()));
        slot.flushEvents();
    }
4ioopgfo

4ioopgfo1#

在调用方法之前有一个map()操作,我们在其中执行入站反序列化(我假设在上面的示例中禁用了本机反序列化)。正如马蒂亚斯指出的,如果 map() 操作,该操作设置一个标志并在随后的 groupByKey() 将创建重新分区主题。所以,这是在你的情况下可能发生的事情,框架就是这样做的 map 操作作为入站邮件转换的一部分。如果确实要避免创建此重新分区主题,可以启用 nativeDecoding 然后使用 Serde 由Kafka提供。这条路 map 框架不调用操作。问题是 JsonSerde 在spring cloud stream中,代码中使用的属性不容易用作serde属性,因为它需要类信息。在下一个版本的springcloudstream中,我们将改进这种情况。同时,您可以提供一个定制的serde。希望这有帮助。

相关问题