我有一个kafkastreams拓扑,其中有一个处理器api。在处理器内部,有一个调用外部api的逻辑。如果api返回503,则需要重试尝试的消息。现在,我尝试将此消息推送到另一个kafka主题&使用“标点”方法每分钟从失败的主题中提取一批消息,然后重试。有没有更好的方法解决这个问题?。
3hvapo4f1#
另一种不同但健壮的方法是使用状态存储。它们被Kafka作为压缩的变更日志主题支持。您可以将失败的消息存储在状态存储中,并通过调用schedule(标点符号)来处理它们,然后删除所有成功处理的消息。例如:
public class MyProcessor { private final long schedulerIntervalMs = 60000; private final String entityStoreName = "failed-message-store"; private KeyValueStore<String, Object> entityStore; @Override public void init(ProcessorContext context) { this.entityStore = (KeyValueStore) context().getStateStore(entityStoreName); context().schedule(Duration.ofMillis(this.schedulerIntervalMs), PunctuationType.WALL_CLOCK_TIME, timestamp -> processFailedMessagesStore()); } @Override public void process(String key, Object value) { boolean apiCallSuccessful = // call API if (!apiCallSuccesfull) { entityStore.put(key, value); } } private void processFailedMessagesStore() { try (KeyValueIterator<String, Object> allItems = entityStore.all()) { allItems.forEachRemaining(item -> { boolean successfullyProcessed = // re-process if (successfullyProcessed) { entityStore.delete(item.key); } }); } } }
1条答案
按热度按时间3hvapo4f1#
另一种不同但健壮的方法是使用状态存储。它们被Kafka作为压缩的变更日志主题支持。
您可以将失败的消息存储在状态存储中,并通过调用schedule(标点符号)来处理它们,然后删除所有成功处理的消息。
例如: