我在和萨姆扎处理Kafka主题的信息。有些消息将来会带有时间戳,我想把处理推迟到时间戳之后。同时,我想继续处理其他传入的消息。
我想做的就是 Task
对消息进行排队并实现 WindowableTask
定期检查消息的时间戳是否允许处理它们。基本思路如下:
public class MyTask implements StreamTask, WindowableTask {
private HashSet<MyMessage> waitingMessages = new HashSet<>();
@Override
public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
byte[] message = (byte[]) incomingMessageEnvelope.getMessage();
MyMessage parsedMessage = MyMessage.parseFrom(message);
if (parsedMessage.getValidFromDateTime().isBeforeNow()) {
// Do the processing
} else {
waitingMessages.add(parsedMessage);
}
}
@Override
public void window(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
for (MyMessage message : waitingMessages) {
if (message.getValidFromDateTime().isBeforeNow()) {
// Do the processing and remove the message from the set
}
}
}
}
这显然有一些缺点。当我重新部署我的任务时,我会在内存中丢失等待的消息。所以我想知道用samza延迟消息处理的最佳实践。我是否需要一次又一次地将消息重新发送到同一主题,直到我最终能够处理它们?我们说的是延迟处理几分钟,最多1-2个小时。
2条答案
按热度按时间8ehkhllq1#
在处理消息队列时,需要记住的一点是,它们在系统中执行非常特定的功能:当处理器忙于处理前面的消息时,它们保存消息。正常运行的消息队列将按需传递消息。这意味着一旦消息到达队列的头部,队列上的下一次拉取将产生消息。
请注意,延迟不是等式中可配置的部分。相反,延迟是带有队列的系统的输出变量。事实上,利特尔定律对此提供了一些有趣的见解。
因此,在需要延迟的系统中(例如,连接/等待并行操作完成),您应该考虑其他方法。通常,可查询数据库在这个特定示例中是有意义的。如果您发现自己将消息保留在队列中一段预先设置的时间,那么实际上您是在将消息队列用作数据库—它并不是为提供此功能而设计的。这不仅有风险,而且极有可能损害您的messagebroker的性能。
owfi6suc2#
我认为您可以使用samza的键值存储来保存任务示例的状态,而不是保存在内存中
Set
. 它应该看起来像:如果重新部署任务,samza应该重新创建键值存储的状态(samza将值保存在与键值存储相关的特殊kafka主题中)。当然,您需要为您的商店提供一些额外的配置(例如,在上面的示例中)
messages-store
).您可以在此处阅读有关key value store的信息(对于最新的samza版本):https://samza.apache.org/learn/documentation/0.14/container/state-management.html