从kafka ktable中正确删除记录

z18hc3ub  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(337)

我想从ktable中正确删除对象。
我有以下流处理数据:

input
.map(this::deserialize)
.filter(((key, event) ->
    Arrays.asList(
        "AssessmentInitialized",
        "AssessmentRenamed",
        "AssessmentDeleted"
    ).contains(event.eventType())))
.map( ... do mapping  ... )
.groupBy((key, event) -> key, Serialized.with(Serdes.String(), domainEventSerde))
.aggregate(
    Assessment::new,
    (key, event, assessment) -> assessment.handleEvent(event),
    Materialized.<String, Assessment, KeyValueStore<Bytes, byte[]>>as(ASSESSMENTS_STORE)
        .withKeySerde(Serdes.String())
        .withValueSerde(assessmentSerde)
);

以及 assessment.handleEvent(event) 退货 null 当它处理 AssessmentDeleted 事件。塞德是 this.assessmentSerde = new JsonSerde<>(Assessment.class, objectMapper); 其中mapper是默认的com.fasterxml.jackson.databind.objectmapperbean。
此流处理i事件后,我在kafka ktable changelog中看到以下值:

{
"topic": "events-consumer-v1-assessments-store-changelog",
"key": "5d6d7a70-c907-460f-ac88-69705f079939",
"value": "ée",
"partition": 0,
"offset": 2
}

而且它看起来不像我想要的东西。这种删除对象的方法正确吗?我希望如果我将null作为值推送到聚合操作中,它会被删除,但看起来像是留下了一些垃圾,我不确定它的Map程序是否有问题,删除方式是否不正确,ktable行为是否正确。

g2ieeal7

g2ieeal71#

在您的情况下,问题似乎出现在检查工具中。因为某种原因它反序列化了 null 值不正确。先用Kafka工具检查总是好的( kafka-console-consumer.sh , kafka-console-producer.sh ).

相关问题