My producer (kafka-client-1.0.1) cannot produce messages to a remote Kafka broker (v1.1.0) hosted on Google Cloud.
I configured the server.properties file by adding the following:
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://<Local-IP>:9092
But the result is the same.
My producer code looks like this:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerExample {
    private final static String TOPIC = "my-topic";
    private final static String BOOTSTRAP_SERVERS = "Public-IP:9092";

    // Builds a producer with Long keys and String values.
    private static Producer<Long, String> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
        props.put("key.serializer", LongSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    // Sends sendMessageCount records synchronously and prints their metadata.
    static void runProducer(final int sendMessageCount) throws Exception {
        final Producer<Long, String> producer = createProducer();
        long time = System.currentTimeMillis();
        try {
            for (long index = time; index < time + sendMessageCount; index++) {
                final ProducerRecord<Long, String> record =
                        new ProducerRecord<>(TOPIC, index, "Hello Suvro " + index);
                // get() blocks until the broker acknowledges the record or the send fails.
                RecordMetadata metadata = producer.send(record).get();
                long elapsedTime = System.currentTimeMillis() - time;
                System.out.printf("sent record(key=%s value=%s) " +
                        "meta(partition=%d, offset=%d) time=%d\n",
                        record.key(), record.value(), metadata.partition(),
                        metadata.offset(), elapsedTime);
            }
        } finally {
            // close() waits for any outstanding sends, so a separate flush() is not needed.
            producer.close();
        }
    }

    public static void main(String[] args) {
        try {
            runProducer(5);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
The error is as follows:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for my-topic-0: 30039 ms has passed since batch creation plus linger time
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
at in.gov.enam.etrade.notification.KafkaProducerExample.runProducer(KafkaProducerExample.java:39)
at in.gov.enam.etrade.notification.KafkaProducerExample.main(KafkaProducerExample.java:54)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for my-topic-0: 30039 ms has passed since batch creation plus linger time
However, everything works when ZooKeeper, the Kafka server, and the producer all run on the same machine against localhost.
I don't know what I'm missing or where I went wrong.
1 Answer
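I had a similar problem when pointing at a Kafka cluster deployed in Docker. It was caused by an access problem between the local machine and the remote Kafka cluster. If you can package your producer application and deploy/run it on the remote Google Cloud host, the producer should work fine.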
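
One way to narrow down an access problem like this from the local machine is a plain TCP check against the broker port. Below is a minimal sketch using only the JDK; the class name BrokerReachability and the default host and port are hypothetical placeholders, not anything from the question. Keep in mind that a Kafka client first contacts the bootstrap address and then connects to whatever host the broker advertises in advertised.listeners, so it may be worth running the check against both addresses. It only shows whether the port accepts connections; it does not speak the Kafka protocol.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachability {

    // Returns true if a TCP connection to host:port can be opened within timeoutMs.
    // Any I/O failure (refused, timed out, unknown host) is reported as false.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed defaults for illustration; pass the real host and port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: "
                + isReachable(host, port, 5000));
    }
}
```

If the bootstrap address is reachable but the advertised one is not, the producer can fetch metadata and then time out while sending, which would be consistent with the error in the question, though other causes (for example firewall rules) are possible too.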