kafka remote aws consumer.poll

vd8tlhqk  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(429)

嗨,我一直在努力学习Kafka和有问题,我的远程投票/消费者。
我已经在aws ec2示例中用私有和公共ip设置了kafka。我的server.properties如下所示。

listeners=PLAINTEXT://172.31.31.58:9092  #AWS Private IP

advertised.listeners=PLAINTEXT://35.??.??.??:9092 #AWS Public IP Masked

我的aws ec2安全组配置为允许任何端口上的任何ip流量用于测试目的。
当我在ec2示例中使用以下脚本在本地生成/使用消息时,它工作得非常好

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

但当我试图从运行JavaAPI的远程笔记本eclipse代码连接到同一个kafka示例时,我的代码永远挂在consumer.poll(100)中。我做错什么了吗?

Properties props = new Properties();
props.put("bootstrap.servers", "35.??.??.??:9092");//my aws public ip configured in advertised.listeners
props.put("group.id", "test123");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("test"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());

     }
}
qgelzfjb

qgelzfjb1#

你确定它还挂着吗 poll() ? 或者是 poll() 只是归还一个空的 ConsumerRecords 它在空中循环 while(true) ?
默认情况下,如果尚未提交组的任何偏移量,则使用者从主题的末尾开始,因此它将只接收新消息。在这种情况下,如果要使用主题中已有的消息,则需要设置 auto.offset.resetearliest (就像您在控制台中使用 --from-beginning )
编辑:
如果真的卡住了 poll() ,可能是连接问题。要找到答案,最好的方法是在启用日志记录的情况下运行客户端。创建包含以下内容的文件:

log4j.rootLogger=DEBUG, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

从你的客户开始 -Dlog4j.configuration=file:PATH_TO_FILE

相关问题