我正在使用pykafka来消费消息,现在我正在使用balanced\u consumer来消费来自一个主题的消息。现在我必须使用来自另一个主题的消息,如果可能的话,可以优先使用来自不同主题的消息。我如何处理这个问题?可能还有其他python库吗?
ih99xse11#
我刚发了一篇关于这个问题的帖子。即使我使用的是java,您也会发现这里描述的概念对您的案例很有用。我们解决Kafka主题优先问题的方法是-我们开发了一种机制来优先考虑Kafka主题的消费。这样的机制将检查我们是要处理从kafka使用的消息,还是保留处理以备以后使用。我们在分区和布尔之间进行Map,如果需要的话,它会阻止每个分区的使用,topicpartitionlocks。封锁初步的,而继续消费从迟到的,创造了优先主题。timertask更新这个Map,我们的消费者检查他们是否被“允许”消费或者必须等待——正如您在WaitForLatePartitioniFeeded方法中看到的那样。
public class Prioritizer extends TimerTask { private Map<String, Boolean> topicPartitionLocks = new ConcurrentHashMap<>(); private Map<String, Long> topicPartitionLatestTimestamps = new ConcurrentHashMap<>(); @Override public void run(){ updateTopicPartitionLocks(); } private void updateTopicPartitionLocks() { Optional<Long> minValue = topicPartitionLatestTimestamps.values().stream().min((o1, o2) -> (int) (o1 - o2)); if(! minValue.isPresent()) { return; } Iterator it = topicPartitionLatestTimestamps.entrySet().iterator(); while (it.hasNext()) { Boolean shouldLock = false; Map.Entry<String, Long> pair = (Map.Entry)it.next(); String topicPartition = pair.getKey(); if(pair.getValue() > (minValue.get() + maxGap)) { shouldLock = true; if(isSameTopicAsMinPartition(minValue.get(), topicPartition)) { shouldLock = false; } } topicPartitionLocks.put(topicPartition, shouldLock); } } public boolean isLocked(String topicPartition) { return topicPartitionLocks.get(topicPartition).booleanValue(); }}
public class Prioritizer extends TimerTask {
private Map<String, Boolean> topicPartitionLocks = new ConcurrentHashMap<>();
private Map<String, Long> topicPartitionLatestTimestamps = new ConcurrentHashMap<>();
@Override
public void run(){
updateTopicPartitionLocks();
}
private void updateTopicPartitionLocks() {
Optional<Long> minValue = topicPartitionLatestTimestamps.values().stream().min((o1, o2) -> (int) (o1 - o2));
if(! minValue.isPresent()) {
return;
Iterator it = topicPartitionLatestTimestamps.entrySet().iterator();
while (it.hasNext()) {
Boolean shouldLock = false;
Map.Entry<String, Long> pair = (Map.Entry)it.next();
String topicPartition = pair.getKey();
if(pair.getValue() > (minValue.get() + maxGap)) {
shouldLock = true;
if(isSameTopicAsMinPartition(minValue.get(), topicPartition)) {
shouldLock = false;
topicPartitionLocks.put(topicPartition, shouldLock);
public boolean isLocked(String topicPartition) {
return topicPartitionLocks.get(topicPartition).booleanValue();
WaitForLatePartitioniFeeded方法
private void waitForLatePartitionIfNeeded(final String topic, int partition) { String topicPartition = topic + partition; prioritizer.getTopicPartitionLocks.putIfAbsent(topicPartition); while(prioritizer.isLocked(topicPartition)) { monitorWaitForLatePartitionTimes(topicPartition, startTime); Misc.sleep(timeToWaitBetweenGapToTardyPartitionChecks.get()); }}
private void waitForLatePartitionIfNeeded(final String topic, int partition) {
String topicPartition = topic + partition;
prioritizer.getTopicPartitionLocks.putIfAbsent(topicPartition);
while(prioritizer.isLocked(topicPartition)) {
monitorWaitForLatePartitionTimes(topicPartition, startTime);
Misc.sleep(timeToWaitBetweenGapToTardyPartitionChecks.get());
利用这一点,我们增加了再平衡,所以我们用以下定义来解决它:我们改变了Kafka的下一个配置
request.timeout.ms: 7300000 (~2hrs)max.poll.interval.ms: 7200000 (2hrs)
request.timeout.ms: 7300000 (~2hrs)
max.poll.interval.ms: 7200000 (2hrs)
有关该问题的图表和一般描述,请查看我的帖子:我是如何通过对Kafka主题进行优先级排序来解决Kafka信息中的延迟问题的祝你好运!
1条答案
按热度按时间ih99xse11#
我刚发了一篇关于这个问题的帖子。
即使我使用的是java,您也会发现这里描述的概念对您的案例很有用。
我们解决Kafka主题优先问题的方法是-
我们开发了一种机制来优先考虑Kafka主题的消费。这样的机制将检查我们是要处理从kafka使用的消息,还是保留处理以备以后使用。
我们在分区和布尔之间进行Map,如果需要的话,它会阻止每个分区的使用,topicpartitionlocks。封锁初步的,而继续消费从迟到的,创造了优先主题。timertask更新这个Map,我们的消费者检查他们是否被“允许”消费或者必须等待——正如您在WaitForLatePartitioniFeeded方法中看到的那样。
WaitForLatePartitioniFeeded方法
利用这一点,我们增加了再平衡,所以我们用以下定义来解决它:
我们改变了Kafka的下一个配置
有关该问题的图表和一般描述,请查看我的帖子:
我是如何通过对Kafka主题进行优先级排序来解决Kafka信息中的延迟问题的
祝你好运!