How can I improve the throughput of a Kafka producer?

Posted by dffbzjpn on 2021-06-07 in Kafka

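I am building an Apache Kafka producer whose output is read by a Flink Kafka consumer. I need to generate 1 to 10 million messages per second. However, the rate I currently get is very low (at most 2,000 records per second per partition). I have a cluster of 3 brokers, each with 30 GB of memory. The topic has 10 partitions. Any suggestions?
Here is my producer code: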

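import java.net.Socket;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
// Assumption: getMillis() suggests Joda-Time's Instant rather than java.time.Instant.
import org.joda.time.Instant;

public class TempDataGenerator implements Runnable {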
private String topic = "try";
private String bootStrap_Servers = "kafka-node-01:9092,kafka-node-02:9092,kafka-node-03:9092";
public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    executor.execute(new TempDataGenerator());

}
public TempDataGenerator() {
}

private Producer<String, String> createProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            bootStrap_Servers);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
    props.put(ProducerConfig.ACKS_CONFIG,"0");
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"5000000000");
    props.put(ProducerConfig.BATCH_SIZE_CONFIG,"100000");

    return new KafkaProducer<>(props);
}
public void run() {

    final Producer<String, String> producer = createProducer();
    Socket soc = null;
    try {
        boolean active = true;
        int  generatedCount = 0,tempUserID=1;//the minimum tuple that any thread can generate
        while (active) {
            generatedCount = 0;

                /**
                 * generate per second
                 */
                for (long stop = Instant.now().getMillis()+1000; stop > Instant.now().getMillis(); ) { //generate tps

                    String msg = "{ID:" + generatedCount + ", msg: "+Instant.now().getMillis()+"}";
                    final ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, msg);
                    RecordMetadata metadata = producer.send(record).get();
                    producer.flush();
                    generatedCount++;

                }

        }

    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }

}

}
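
One likely contributor to the low rate is visible in the loop above: `producer.send(record).get()` waits for each record to complete before the next one is created, and `producer.flush()` then forces whatever is buffered out immediately, so every request carries a single record and `batch.size` never takes effect. The following is a minimal sketch of throughput-oriented settings, not a tuned configuration. The class name `ThroughputProducerConfig` and the specific values for `linger.ms`, `compression.type`, and `buffer.memory` are assumptions chosen for illustration; the property keys are standard Kafka producer settings, written as plain strings so the sketch compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

/**
 * Hypothetical helper (not part of the original question): collects
 * producer settings that favor batching over per-record latency.
 */
public class ThroughputProducerConfig {

    /** Builds producer properties for the given bootstrap server list. */
    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Same as the question: do not wait for broker acknowledgements.
        props.setProperty("acks", "0");
        // Same batch size as the question (bytes per partition batch).
        props.setProperty("batch.size", "100000");
        // Assumed value: wait up to 20 ms so batches have time to fill.
        props.setProperty("linger.ms", "20");
        // Assumed choice: compress batches to cut network traffic.
        props.setProperty("compression.type", "lz4");
        // Assumed value: 64 MiB buffer, small enough to fit in a typical heap.
        props.setProperty("buffer.memory", "67108864");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(
                "kafka-node-01:9092,kafka-node-02:9092,kafka-node-03:9092");
        // Print the settings; iteration order of Properties is not guaranteed.
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These properties can be passed to `new KafkaProducer<>(props)` in place of the ones built in `createProducer()`. For batching to help, the send loop would also need to stop blocking per record: call `producer.send(record, callback)` (or ignore the returned future) inside the loop, and call `producer.flush()` or `producer.close()` once at the end rather than after every message. Note too that `main` submits only one `TempDataGenerator` to the pool, so only one thread is producing; whether adding more threads helps would need to be measured.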
