我需要证明kafka producer能够每秒向kafka集群产生100万条消息,然后评估其性能。
如何实现每秒生成100万条消息?
j'ai une选择了commeçKafka的制作人:
public static void main(String args[]){
Random rnd = new Random();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
int counter=0;
int i = 0;
while ( true ){
TimeZone tz = TimeZone.getTimeZone("UTC");
DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm.sss'Z'"); // Quoted "Z" to indicate UTC, no timezone offset
df.setTimeZone(tz);
String nowAsISO = df.format(new Date());
++counter;
final String message = "sensor" + i + ":" + Integer.toString(rnd.nextInt(10000)) + " " + String.valueOf(rnd.nextDouble())+ " " +
"MyDevice" + " " +
"Sensor" + " " + "Sensing" + " " + "Property" + " " + "Unit" + " " + "9845A" + " " + nowAsISO ;
try {
producer.send(new ProducerRecord<String, String>("test", message), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if ( e == null ){
System.out.println("Partition: "+recordMetadata.partition()
+", Offset" + recordMetadata.offset()
+ ", timestamp: " + recordMetadata.timestamp());
System.out.println(message);
}
else {
e.printStackTrace();
}
}
}
);
i++;
TimeUnit.SECONDS.sleep(1000);
}
catch ( InterruptedException e ){
System.out.println("I was interrupted.");
}
}
}
}
谢谢!!
1条答案
按热度按时间s6fujrry1#
在librdkafka github页面上,有一些指令可以使用2个代理生成850000 msg/sec
https://github.com/edenhill/librdkafka/blob/master/introduction.md
如果您有足够的网络带宽和磁盘i/o容量,您应该能够通过3个代理每秒获得超过100万条消息。