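I'm using Redis through the Jedis client, and I have two classes: a producer and a consumer. The data the producer sends through a stream is not fully received by the consumer. Some entries appear to be lost, but I don't know where the loss happens or how to mitigate it.
My stream producer snippet: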
Map<String, String> messageBody = new HashMap<>();
for (String basekey : args) {
    Set<String> prices = client.keys("RTSH:" + basekey + "*");
    for (String price_key : prices) {
        messageBody.put("json_key", price_key);
        client.xadd(STREAMS_KEY, StreamEntryID.NEW_ENTRY, messageBody);
        System.out.println(messageBody);
    }
}
My stream consumer snippet:
HostAndPort config = new HostAndPort(Protocol.DEFAULT_HOST, 6379);
PooledConnectionProvider provider = new PooledConnectionProvider(config);
UnifiedJedis client = new UnifiedJedis(provider);
XReadGroupParams xReadGroupParams = new XReadGroupParams();
xReadGroupParams.block(0);  // block until an entry arrives
xReadGroupParams.count(1);  // read one entry per call
xReadGroupParams.noAck();   // NOACK: delivered entries are not tracked as pending
try {
    // LAST_ENTRY is "$"; the last argument creates the stream if it does not exist
    client.xgroupCreate(STREAMS_KEY, "RTSH_consumers", StreamEntryID.LAST_ENTRY, true);
} catch (Exception redisBusyException) {
    System.out.println(String.format("\t Group '%s' already exists", "RTSH_consumers"));
}
Map<String, StreamEntryID> stream = new HashMap<>();
stream.put(STREAMS_KEY, StreamEntryID.UNRECEIVED_ENTRY);
while (true) {
    List<Map.Entry<String, List<StreamEntry>>> messages = client.xreadGroup("RTSH_consumers", "consumer_RTSH", xReadGroupParams, stream);
    if (!messages.isEmpty()) {
        for (Map.Entry<String, List<StreamEntry>> entry : messages) {
            String key = entry.getKey(); // key is the stream name
            List<StreamEntry> value = entry.getValue();
            for (StreamEntry streamEntry : value) {
                String json_key = streamEntry.getFields().get("json_key");
                Object prices = client.jsonGet(json_key);
                System.out.println(prices);
                System.out.println("\n");
                client.xack(STREAMS_KEY, "RTSH_consumers", streamEntry.getID());
            }
        }
    }
}
I tried reducing the amount of data, but the problem persists.
1 Answer
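In the xgroupCreate call, pass new StreamEntryID() instead of StreamEntryID.LAST_ENTRY. LAST_ENTRY maps to the special ID $, so the group only receives entries added after the group was created, and anything the producer wrote earlier is skipped. new StreamEntryID() is 0-0, so the group starts from the beginning of the stream.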
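To see why the group's start ID matters, here is a toy in-memory model in plain Java. It is not Redis or Jedis: ToyStream, ToyGroup and run are hypothetical names made up for this sketch, and entry IDs are simplified to list positions. It assumes the producer wrote some entries before the consumer created its group, which is presumably what happens in the question.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: mimics how a consumer group's start ID decides which
// entries a "read new entries" call delivers. Not the Redis implementation.
class GroupStartIdDemo {

    /** Minimal stream: entry n has ID n, starting at 1. */
    static final class ToyStream {
        private final List<String> entries = new ArrayList<>();

        void add(String value) {
            entries.add(value);
        }

        /** ID of the newest entry, or 0 when the stream is empty. */
        int lastId() {
            return entries.size();
        }
    }

    /** Minimal consumer group: remembers the last ID it has delivered. */
    static final class ToyGroup {
        private final ToyStream stream;
        private int lastDelivered;

        ToyGroup(ToyStream stream, int startAfterId) {
            this.stream = stream;
            this.lastDelivered = startAfterId;
        }

        /** Returns every entry with an ID greater than the last delivered one. */
        List<String> readNew() {
            int end = stream.entries.size();
            List<String> out = new ArrayList<>(stream.entries.subList(lastDelivered, end));
            lastDelivered = end;
            return out;
        }
    }

    /** Produces two entries, creates the group, produces one more, then reads. */
    static List<String> run(boolean startFromBeginning) {
        ToyStream stream = new ToyStream();
        stream.add("RTSH:a"); // written before the group exists
        stream.add("RTSH:b"); // written before the group exists

        // "0-0" style start (0) versus "$" style start (current last ID).
        int startId = startFromBeginning ? 0 : stream.lastId();
        ToyGroup group = new ToyGroup(stream, startId);

        stream.add("RTSH:c"); // written after the group exists
        return group.readNew();
    }

    public static void main(String[] args) {
        System.out.println("start at $   -> " + run(false)); // [RTSH:c]
        System.out.println("start at 0-0 -> " + run(true));  // [RTSH:a, RTSH:b, RTSH:c]
    }
}
```

With the $-style start the two early entries are never delivered, which looks like data loss from the consumer's side. With the 0-0-style start the group sees everything that is still in the stream.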