我正在本地pc上运行confluent-oss-5.0.0-2.11 kafka服务器,带有默认服务器属性(来自etc/kafka),并使用以下命令创建了主题test1和test2
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 –topic
以下是我拥有的环境属性
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
props.put(BATCH_SIZE_CONFIG, 16384);
props.put(LINGER_MS_CONFIG, 10);
props.put(BUFFER_MEMORY_CONFIG, 33554432);
props.put(METADATA_MAX_AGE_CONFIG, "10");
这是制片人的工作
String padded = RandomStringUtils.random(2000, true, true);
for(int i=0;i<1000000;i++) {
kafkaProducer.send("a" + i, "aa" + padded + i, "test1");
kafkaProducer.send("a" + i, "bb" + padded + i, "test2");
}
kafkaProducer.flush();
以下是消费者所做的
KTable<String, String> a = builder.table("test");
KTable<String, String> b = builder.table("test1");
a.join(b, new ValueJoiner<String, String, String>() {
@Override
public String apply(String value1, String value2) {
return "a" + value1;
}
}).toStream().to("finalTopic");
下面是我如何观察“最终”人口的表现
AtomicInteger counter = new AtomicInteger();
builder.<String, String>stream("finalTopic").peek((key, value) -> {
if(counter.incrementAndGet()%1000 == 0) {
logger.info("date {}, final join key {}, value size {}, joins performed {}", System.currentTimeMillis(), key, value.length(), counter.get());
}
});
我设法得到大约55000条信息每秒得到上述信息到不同的Kafka主题使用生产者。
然而,在用户端,消息被填充到“finaltopic”中的速率大约是每秒110条消息。
感谢您的指点!
暂无答案!
目前还没有任何答案,快来回答吧!