我尝试在kafka streams(kafka 1.0.1)和spring cloud stream(2.0.0-build-snapshot)的帮助下实现一个简单的事件源服务。我的streamlistener方法只是读取与我的聚合的状态更改相对应的kstream事件,并将它们应用于聚合,并将最新状态保存在本地状态存储(kafka提供的状态存储)中。域事件消息也具有与聚合的uuid(字符串)相同的键。代码如下:
@StreamListener(Channels.EVENTS_INPUT_CHANNEL)
public void listen(KStream<String, DomainEvent> stream) {
Serde<DomainEvent> domainEventSerde = new JsonSerde<>(DomainEvent.class);
Serde<Slot> slotSerde = new JsonSerde<>(Slot.class);
stream
.groupByKey(Serialized.with(Serdes.String(), domainEventSerde))
.aggregate(
Slot::new,
(s, domainEvent, slot) -> slot.handle(domainEvent),
Materialized.<String, Slot, KeyValueStore<Bytes, byte[]>>
as(Repository.SNAPSHOTS_FOR_SLOTS)
.withKeySerde(Serdes.String()).withValueSerde(slotSerde)
);
}
上面的代码生成一个changelog主题(如预期的那样):slot service slots changelog。尽管它也创建了一个重新分区主题:插槽服务插槽重新分区。两个主题似乎有完全相同的消息(键和值)。我的理解是,如果没有对流执行键修改操作,则不需要重新分区。我是不是漏了什么?
更新:这可能不再需要了,因为sobychacko已经提供了解释,但是我确实尝试过不使用云流绑定,如下面所示,并且它没有创建重新分区主题:
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfiguration {
@Bean
KafkaTemplate<String, DomainEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
ProducerFactory<String,DomainEvent> producerFactory() {
return new DefaultKafkaProducerFactory<>(config());
}
private Map<String, Object> config() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return config;
}
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
StreamsConfig streamsConfig() {
Map<String, Object> config = new HashMap<>();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "slot-service");
return new StreamsConfig(config);
}
@Bean
KTable<String, Slot> kTable(KStreamBuilder builder) {
Serde<DomainEvent> domainEventSerde = new JsonSerde<>(DomainEvent.class);
Serde<Slot> slotSerde = new JsonSerde<>(Slot.class);
return
builder
.stream(Serdes.String(), domainEventSerde, Repository.SLOT_EVENTS)
.groupByKey(Serdes.String(), domainEventSerde)
.aggregate(
Slot::new,
(s, domainEvent, slot) -> slot.handle(domainEvent),
slotSerde,
Repository.SNAPSHOTS_FOR_SLOTS);
}
}
此外,生产商如下:
@Autowired
public Repository(KafkaTemplate<String, DomainEvent> kafkaTemplate, KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {
this.kafkaTemplate = kafkaTemplate;
this.kStreamBuilderFactoryBean = kStreamBuilderFactoryBean;
}
public void save(Slot slot) {
List<DomainEvent> newEvents = slot.getDirtyEvents();
newEvents.forEach(
domainEvent -> kafkaTemplate.send(SLOT_EVENTS, domainEvent.aggregateUUID().toString(),domainEvent)
);
slot.flushEvents();
}
更新2:
以下是云流的生产者代码:
public void save(Slot slot) {
List<DomainEvent> newEvents = slot.getDirtyEvents();
newEvents.forEach(domainEvent -> channels.eventsOutputChannel().send(MessageBuilder.withPayload(domainEvent)
.setHeader(KafkaHeaders.MESSAGE_KEY, slot.getUuid().toString()).build()));
slot.flushEvents();
}
1条答案
按热度按时间4ioopgfo1#
在调用方法之前有一个map()操作,我们在其中执行入站反序列化(我假设在上面的示例中禁用了本机反序列化)。正如马蒂亚斯指出的,如果
map()
操作,该操作设置一个标志并在随后的groupByKey()
将创建重新分区主题。所以,这是在你的情况下可能发生的事情,框架就是这样做的map
操作作为入站邮件转换的一部分。如果确实要避免创建此重新分区主题,可以启用nativeDecoding
然后使用Serde
由Kafka提供。这条路map
框架不调用操作。问题是JsonSerde
在spring cloud stream中,代码中使用的属性不容易用作serde属性,因为它需要类信息。在下一个版本的springcloudstream中,我们将改进这种情况。同时,您可以提供一个定制的serde。希望这有帮助。