示例:kafka消费者每秒接收很少的用户对象记录。传入用户还有“timeinterval”参数。
User: {name: "xyz", age: 23, timeInterval: 30}
基于“时间间隔”,我需要动态运行一个作业,该作业在指定的时间间隔内连续运行。我可以有不同的间隔来与用户对象,如10秒,15秒30秒。
初步设计:
数据结构:
Map<Integer, HashMap<Integer, ArrayList<User>>> intervalUserMapMap;
在作业中,我需要处理按年龄分组的用户。我将根据时间间隔将所有用户添加到Map中,并根据年龄将内部Map添加到Map中。
由于可能会有多个作业在不同的时间间隔运行,我需要在不同的线程上运行。
问题:
当作业运行时,kafka不断地将许多用户推到程序中。“IntervaluerMapMapMap”中的hashmaps将被更新。这可能会导致两个线程访问同一Map时出现问题。
我可以用 ConcurrentHashMap
但我担心,如果将更多的密钥散列到将被锁定的同一个bucket中,那么Map将不会更新,因为线程将在应用程序的生命周期内连续运行。
需要帮助吗
我的设计正确吗。如果有什么可以改进的,请建议。
如果设计是好的,那么我应该如何更新这个多线程模型中的Map。
编辑:我试过的代码
@Component
public class UsersConsumer {
private Map<Integer, HashMap<Integer, List<User>>> intervalAgeMapMap = new HashMap<>();
@KafkaListener(groupId = "users", topics = "users", containerFactory = "usersKafkaListenerContainerFactory")
public void listenGroupUsers(User user) {
System.out.println(user);
HashMap<Integer, List<User>> ageUserMap = null;
if (intervalAgeMapMap.containsKey(user.getInterval())) {
ageUserMap = intervalAgeMapMap.get(user.getInterval());
if (ageUserMap.containsKey(user.getAge())) {
List<User> userList = ageUserMap.get(user.getAge());
userList.add(user);
} else {
List<User> userList = new ArrayList<User>();
userList.add(user);
ageUserMap.put(user.getAge(), userList);
}
} else {
ageUserMap = new HashMap<>();
List<User> userList = new ArrayList<User>();
userList.add(user);
ageUserMap.put(user.getAge(), userList);
intervalAgeMapMap.put(user.getInterval(), ageUserMap);
CompletableFuture.runAsync(() -> {
processUsers(user.getInterval());
});
}
}
public void processUsers(int interval) {
while (true) {
HashMap<Integer, List<User>> ageUserListMap = intervalAgeMapMap.get(interval);
for (Entry<Integer, List<User>> ageUserListSet : ageUserListMap.entrySet()) {
List<User> userList = ageUserListSet.getValue();
// process Users
}
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
提前谢谢。
1条答案
按热度按时间3bygqnnd1#
对于Kafka的分批工作,你不应该睡在消费者的线程,因为它会重新平衡集团定期,给经纪人带来压力。
所以,这里没有理由使用SpringKafka注解。您需要按照设置的时间表手动打开和关闭使用者
如果没有排序限制/处理是幂等的,您还可以将所有作业不断地推入优先级队列