Producing 1 million messages per second with Kafka

svgewumm  posted on 2021-06-08  in  Kafka

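I need to show that a Kafka producer can produce 1 million messages per second to a Kafka cluster, and then evaluate its performance.
How can I produce 1 million messages per second?
This is the code I chose for the Kafka producer: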

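import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// The class name is not given in the original post; SensorProducer is a placeholder.
public class SensorProducer {

public static void main(String args[]){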

    Random rnd = new Random();

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    Producer<String, String> producer = new KafkaProducer<String, String>(props);
    int counter=0;
    int i = 0;
    while ( true ){
        TimeZone tz = TimeZone.getTimeZone("UTC");
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); // Quoted "Z" to indicate UTC, no timezone offset
        df.setTimeZone(tz);
        String nowAsISO = df.format(new Date());
        ++counter;

            final String message = "sensor" + i + ":" + Integer.toString(rnd.nextInt(10000)) + " " +  String.valueOf(rnd.nextDouble())+ " " +
                    "MyDevice" + " " +
                    "Sensor" + " " + "Sensing" + " " + "Property" + " " + "Unit" + " " + "9845A" + " " + nowAsISO ;

        try {
            producer.send(new ProducerRecord<String, String>("test", message), new Callback() {
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if ( e == null ){
                                System.out.println("Partition: " + recordMetadata.partition()
                                        + ", Offset: " + recordMetadata.offset()
                                        + ", timestamp: " + recordMetadata.timestamp());
                                System.out.println(message);
                            }
                            else {
                                e.printStackTrace();
                            }
                        }
                    }
            );

            i++;
            TimeUnit.SECONDS.sleep(1000); // sleeps 1000 seconds (about 17 minutes) after each message
        }
        catch ( InterruptedException e   ){
            System.out.println("I was interrupted.");
        }
    }

}

}
Thanks!

s6fujrry

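The librdkafka GitHub page has instructions for producing 850,000 msg/sec with 2 brokers:
https://github.com/edenhill/librdkafka/blob/master/introduction.md
If you have enough network bandwidth and disk I/O capacity, you should be able to get over 1 million messages per second with 3 brokers.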
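
To make the bandwidth point concrete, here is a minimal sketch, not a measured or recommended setup. It assumes messages of roughly 100 bytes (about the length of the string built in the question) and collects producer settings that lean toward throughput. The keys are standard Kafka producer configs, but the specific values, the class name ProducerTuningSketch, and both method names are assumptions chosen for illustration only.

```java
import java.util.Properties;

public class ProducerTuningSketch {

    // Throughput-leaning producer settings. The keys are standard Kafka
    // producer configs; the values are untested starting points (assumptions).
    public static Properties throughputProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "1");                 // leader-only ack: weaker durability than "all"
        props.put("batch.size", "131072");      // 128 KiB batches instead of the 16 KiB in the question
        props.put("linger.ms", "10");           // wait up to 10 ms so batches can fill
        props.put("compression.type", "lz4");   // fewer bytes on the network and on disk
        props.put("buffer.memory", "67108864"); // 64 MiB of buffer for unsent records
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Raw payload bandwidth in megabits per second (1 Mbit = 1,000,000 bits).
    // Ignores protocol overhead, replication traffic, and compression.
    public static double payloadMegabitsPerSecond(long messagesPerSecond, int bytesPerMessage) {
        return messagesPerSecond * (double) bytesPerMessage * 8 / 1_000_000.0;
    }

    public static void main(String[] args) {
        Properties props = throughputProps("localhost:9092");
        System.out.println("batch.size = " + props.getProperty("batch.size"));
        // Assumed message size: 100 bytes.
        System.out.println(payloadMegabitsPerSecond(1_000_000L, 100) + " Mbit/s of raw payload");
    }
}
```

Under the 100-byte assumption, 1 million messages per second is 800 Mbit/s of payload before any overhead, which is close to the capacity of a 1 Gbit/s link. The loop in the question also prints every record and sleeps between sends, so a throughput test would need both removed before any of these settings matter.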
