Redis streams数据丢失

5us2dqdw  于 2023-05-28  发布在  Redis
关注(0)|答案(1)|浏览(206)

我在Jedis客户端使用Redis,我有两个类,一个生产者类和一个消费者类。生产者类通过流发送的数据不能很好地被消费者类接收。似乎有数据丢失,但我不知道来源或如何减轻这个问题。My Streams生产者代码段:

Map<String, String> messageBody  = new HashMap<>();
        for(String basekey : args){
            Set<String> prices =  client.keys("RTSH:"+basekey +"*");
            for(String price_key : prices){
                messageBody.put("json_key", price_key);
                client.xadd(STREAMS_KEY, StreamEntryID.NEW_ENTRY, messageBody);
                System.out.println(messageBody);
            }
        }

我的流消费者片段:

HostAndPort config = new HostAndPort(Protocol.DEFAULT_HOST, 6379);
        PooledConnectionProvider provider = new PooledConnectionProvider(config);
        UnifiedJedis client = new UnifiedJedis(provider);
        XReadGroupParams xReadGroupParams = new XReadGroupParams();
        xReadGroupParams.block(0);
        xReadGroupParams.count(1);
        xReadGroupParams.noAck();
        try {
            client.xgroupCreate(STREAMS_KEY, "RTSH_consumers",StreamEntryID.LAST_ENTRY,true);
        }catch(Exception redisBusyException) {
            System.out.println( String.format("\t Group '%s' already exists", "application_1"));
        }
        Map<String, StreamEntryID> stream =  new HashMap<>();
        stream.put(STREAMS_KEY,StreamEntryID.UNRECEIVED_ENTRY);
        while(true) {
            List<Map.Entry<String, List<StreamEntry>>> messages = client.xreadGroup("RTSH_consumers", "consumer_RTSH", xReadGroupParams, stream);
            if (!messages.isEmpty()) {
                for (Map.Entry<String, List<StreamEntry>> entry : messages) {
                    String key = entry.getKey();//key is the stream name
                    List<StreamEntry> value = entry.getValue();
                    for (StreamEntry StreamEntry : value) {
                        String json_key = StreamEntry.getFields().get("json_key");
                        Object prices =  client.jsonGet(json_key);
                        System.out.println(prices);
                        System.out.println("\n");
                        client.xack(STREAMS_KEY, "RTSH_consumers", StreamEntry.getID());
                    }
                }
            }
        }

我试着减少数据量,但问题仍然存在。

7gcisfzg

7gcisfzg1#

xgroupCreate方法中,发送new StreamEntryID()而不是StreamEntryID.LAST_ENTRY。

相关问题