Java TimeoutException when producing to a remote Kafka broker with kafka-clients 1.0.1

hjqgdpho · Posted 2021-06-07 · in Kafka

The producer API (kafka-clients 1.0.1) fails to produce messages to a remote Kafka broker (v1.1.0) hosted in Google Cloud.
I configured the broker's server.properties file by adding the following:

listeners=PLAINTEXT://0.0.0.0:9092

advertised.listeners=PLAINTEXT://<Local-IP>:9092

But the result was the same.
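For reference, advertised.listeners is the address the broker returns to clients in its metadata, and the client connects to that address for every subsequent request, so a client running outside the Google Cloud network usually needs the broker to advertise an address it can actually reach. A minimal sketch, assuming the VM's external IP or DNS name is used (the placeholder below is not from the original post):

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://<External-IP-or-DNS>:9092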
My producer code looks like this:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerExample {

    private final static String TOPIC = "my-topic";
    private final static String BOOTSTRAP_SERVERS = "Public-IP:9092";

    private static Producer<Long, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    static void runProducer(final int sendMessageCount) throws Exception {
        final Producer<Long, String> producer = createProducer();
        long time = System.currentTimeMillis();
        try {
            for (long index = time; index < time + sendMessageCount; index++) {
                final ProducerRecord<Long, String> record =
                        new ProducerRecord<>(TOPIC, index, "Hello Suvro " + index);
                // send() is asynchronous; get() blocks until the broker acknowledges the record
                RecordMetadata metadata = producer.send(record).get();
                long elapsedTime = System.currentTimeMillis() - time;
                System.out.printf("sent record(key=%s value=%s) " +
                                "meta(partition=%d, offset=%d) time=%d\n",
                        record.key(), record.value(), metadata.partition(),
                        metadata.offset(), elapsedTime);
            }
        } finally {
            producer.flush();
            producer.close();
        }
    }

    public static void main(String[] args) {
        try {
            runProducer(5);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

The error is the following:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for my-topic-0: 30039 ms has passed since batch creation plus linger time
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
at in.gov.enam.etrade.notification.KafkaProducerExample.runProducer(KafkaProducerExample.java:39)
at in.gov.enam.etrade.notification.KafkaProducerExample.main(KafkaProducerExample.java:54)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for my-topic-0: 30039 ms has passed since batch creation plus linger time
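For context: in kafka-clients 1.0.x this error means a batch waited in the producer's buffer for longer than request.timeout.ms (default 30000 ms, which matches the roughly 30 s in the message) without being acknowledged by the broker. Raising the producer timeouts, as in the sketch below (standard settings added to the Properties built in createProducer()), only helps if the broker is reachable but slow; it does not fix a broker the client cannot reach at all.

// Diagnostic only: give the broker more time before expiring batches or failing send()
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); // default is 30000 ms
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000");       // how long send() may block waiting for metadata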

However, everything worked when ZooKeeper, the Kafka server, and the producer code all ran on the same machine using localhost.
I don't know what I'm missing or where I went wrong.

omhiaaxx · Answer #1

I had a similar problem pointing to a Kafka cluster deployed in Docker. It was caused by a connectivity problem between the local machine and the remote Kafka cluster. If you can package your producer application and deploy/run it on the remote Google Cloud instance, the producer should work fine.
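One quick way to confirm a connectivity problem is a plain TCP check from the machine that runs the producer. A minimal sketch; the Public-IP placeholder is the same one used in the question:

import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachabilityCheck {
    public static void main(String[] args) {
        String host = "Public-IP"; // the broker address from the question
        int port = 9092;
        try (Socket socket = new Socket()) {
            // Attempt a raw TCP connection with a 5-second timeout
            socket.connect(new InetSocketAddress(host, port), 5000);
            System.out.println("TCP connection to " + host + ":" + port + " succeeded");
        } catch (Exception e) {
            System.out.println("TCP connection to " + host + ":" + port + " failed: " + e);
        }
    }
}

Note that a successful connection to the bootstrap address alone is not sufficient: after the first metadata response the client reconnects to whatever host and port the broker advertises, so the advertised address must also be reachable from the client.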
