java—如何在间隔运行的作业中异步处理来自kafka的对象的Map

2admgd59  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(312)

示例:kafka消费者每秒接收很少的用户对象记录。传入用户还有“timeinterval”参数。

User: {name: "xyz", age: 23, timeInterval: 30}

基于“时间间隔”,我需要动态运行一个作业,该作业在指定的时间间隔内连续运行。我可以有不同的间隔来与用户对象,如10秒,15秒30秒。
初步设计:
数据结构:

Map<Integer, HashMap<Integer, ArrayList<User>>> intervalUserMapMap;

在作业中,我需要处理按年龄分组的用户。我将根据时间间隔将所有用户添加到Map中,并根据年龄将内部Map添加到Map中。
由于可能会有多个作业在不同的时间间隔运行,我需要在不同的线程上运行。
问题:
当作业运行时,kafka不断地将许多用户推到程序中。“IntervaluerMapMapMap”中的hashmaps将被更新。这可能会导致两个线程访问同一Map时出现问题。
我可以用 ConcurrentHashMap 但我担心,如果将更多的密钥散列到将被锁定的同一个bucket中,那么Map将不会更新,因为线程将在应用程序的生命周期内连续运行。
需要帮助吗
我的设计正确吗。如果有什么可以改进的,请建议。
如果设计是好的,那么我应该如何更新这个多线程模型中的Map。
编辑:我试过的代码

@Component
public class UsersConsumer {

    private Map<Integer, HashMap<Integer, List<User>>> intervalAgeMapMap = new HashMap<>();

    @KafkaListener(groupId = "users", topics = "users", containerFactory = "usersKafkaListenerContainerFactory")
    public void listenGroupUsers(User user) {
        System.out.println(user);
        HashMap<Integer, List<User>> ageUserMap = null;
        if (intervalAgeMapMap.containsKey(user.getInterval())) {
            ageUserMap = intervalAgeMapMap.get(user.getInterval());
            if (ageUserMap.containsKey(user.getAge())) {
                List<User> userList = ageUserMap.get(user.getAge());
                userList.add(user);
            } else {
                List<User> userList = new ArrayList<User>();
                userList.add(user);
                ageUserMap.put(user.getAge(), userList);
            }
        } else {
            ageUserMap = new HashMap<>();
            List<User> userList = new ArrayList<User>();
            userList.add(user);
            ageUserMap.put(user.getAge(), userList);
            intervalAgeMapMap.put(user.getInterval(), ageUserMap);
            CompletableFuture.runAsync(() -> {
                processUsers(user.getInterval());
            });
        }
    }

    public void processUsers(int interval) {
        while (true) {
            HashMap<Integer, List<User>> ageUserListMap = intervalAgeMapMap.get(interval);
            for (Entry<Integer, List<User>> ageUserListSet : ageUserListMap.entrySet()) {
                List<User> userList = ageUserListSet.getValue();
                // process Users
            }
            try {
                Thread.sleep(interval);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

提前谢谢。

3bygqnnd

3bygqnnd1#

对于Kafka的分批工作,你不应该睡在消费者的线程,因为它会重新平衡集团定期,给经纪人带来压力。
所以,这里没有理由使用SpringKafka注解。您需要按照设置的时间表手动打开和关闭使用者
如果没有排序限制/处理是幂等的,您还可以将所有作业不断地推入优先级队列

相关问题