消费者未收到消息,kafka控制台,新消费者api,kafka 0.9

clj7thdc  于 2021-06-07  发布在  Kafka
关注(0)|答案(16)|浏览(447)

我正在为Kafka0.9.0.0做Kafka快速入门。
我有Zookeeper在听 localhost:2181 因为我跑了

bin/zookeeper-server-start.sh config/zookeeper.properties

我有一个经纪人在听 localhost:9092 因为我跑了

bin/kafka-server-start.sh config/server.properties

我有一个生产者张贴到主题“测试”,因为我跑

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
yello
is this thing on?
let's try another
gimme more

当我运行旧的api consumer时,它通过运行

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

然而,当我运行新的api消费者时,我在运行时什么也得不到

bin/kafka-console-consumer.sh --new-consumer --topic test --from-beginning \
    --bootstrap-server localhost:9092

是否可以使用新的api从控制台使用者订阅主题?我该怎么修?

628mspwn

628mspwn1#

从bin运行以下命令:

./kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

“test”是主题名

xnifntxz

xnifntxz2#

我刚刚遇到这个问题,解决办法是删除 /brokers 在zookeeper中,重新启动kafka节点。 bin/zookeeper-shell <zk-host>:2181 然后 rmr /brokers 不知道为什么这能解决问题。
当我启用调试日志记录时,我在使用者中反复看到此错误消息: 2017-07-07 01:20:12 DEBUG AbstractCoordinator:548 - Sending GroupCoordinator request for group test to broker xx.xx.xx.xx:9092 (id: 1007 rack: null) 2017-07-07 01:20:12 DEBUG AbstractCoordinator:559 - Received GroupCoordinator response ClientResponse(receivedTimeMs=1499390412231, latencyMs=84, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=13,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group test 2017-07-07 01:20:12 DEBUG AbstractCoordinator:581 - Group coordinator lookup for group test failed: The group coordinator is not available. 2017-07-07 01:20:12 DEBUG AbstractCoordinator:215 - Coordinator discovery failed for group test, refreshing metadata

svmlkihl

svmlkihl3#

对我来说,这篇文章中描述的解决方案是有效的-https://stackoverflow.com/a/51540528/7568227
检查是否

offsets.topic.replication.factor

(或可能与复制相关的其他配置参数)不大于代理数。这就是我的问题所在。
在此修复之后,不再需要使用--分区0。
否则,我建议遵循上述线程中描述的调试过程。

rqdpfwrv

rqdpfwrv4#

我在我的mac盒中遇到了同样的问题:当使用该命令时,控制台消费者不消费任何消息

kafka-console-consumer --bootstrap-server localhost:9095 --from-beginning --topic my-replicated-topic

但当我试着

kafka-console-consumer --bootstrap-server localhost:9095 --from-beginning --topic my-replicated-topic --partition 0

它列出了发送的消息。这是Kafka1.10.11中的错误吗?

falq053o

falq053o5#

在我看来,这是行不通的

kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

这很管用

kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic --partition 0

因为这个主题 __consumer_offsets 在不可接近的经纪人身上。基本上,我忘了复制它。重新定位 __consumer_offsets 解决了我的问题。

c2e8gylq

c2e8gylq6#

这个问题也会影响使用flume从kafka接收数据并将数据放入hdfs。
要解决上述问题:
阻止Kafka的经纪人
连接到zookeeper集群并移除/z节点
重启Kafka经纪公司
我们使用集群的kafka客户端版本和scala版本没有问题。zookeeper可能有关于代理主机的错误信息。
要验证操作:
在Kafka中创建主题。

$ kafka-console-consumer --bootstrap-server slavenode01.cdh.com:9092 --topic rkkrishnaa3210 --from-beginning

打开一个制作人频道并向其提供一些消息。

$ kafka-console-producer --broker-list slavenode03.cdh.com:9092 --topic rkkrishnaa3210

打开使用者频道以使用特定主题的消息。

$ kafka-console-consumer --bootstrap-server slavenode01.cdh.com:9092 --topic rkkrishnaa3210 --from-beginning

要在Flume中进行测试:
flume代理配置:

rk.sources  = source1
rk.channels = channel1
rk.sinks = sink1

rk.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
rk.sources.source1.zookeeperConnect = ip-20-0-21-161.ec2.internal:2181
rk.sources.source1.topic = rkkrishnaa321
rk.sources.source1.groupId = flume1
rk.sources.source1.channels = channel1
rk.sources.source1.interceptors = i1
rk.sources.source1.interceptors.i1.type = timestamp
rk.sources.source1.kafka.consumer.timeout.ms = 100
rk.channels.channel1.type = memory
rk.channels.channel1.capacity = 10000
rk.channels.channel1.transactionCapacity = 1000
rk.sinks.sink1.type = hdfs
rk.sinks.sink1.hdfs.path = /user/ce_rk/kafka/%{topic}/%y-%m-%d
rk.sinks.sink1.hdfs.rollInterval = 5
rk.sinks.sink1.hdfs.rollSize = 0
rk.sinks.sink1.hdfs.rollCount = 0
rk.sinks.sink1.hdfs.fileType = DataStream
rk.sinks.sink1.channel = channel1

运行flume代理:

flume-ng agent --conf . -f flume.conf -Dflume.root.logger=DEBUG,console -n rk

观察来自使用者的日志,该主题的消息是用hdfs编写的。

18/02/16 05:21:14 INFO internals.AbstractCoordinator: Successfully joined group flume1 with generation 1
18/02/16 05:21:14 INFO internals.ConsumerCoordinator: Setting newly assigned partitions [rkkrishnaa3210-0] for group flume1
18/02/16 05:21:14 INFO kafka.SourceRebalanceListener: topic rkkrishnaa3210 - partition 0 assigned.
18/02/16 05:21:14 INFO kafka.KafkaSource: Kafka source source1 started.
18/02/16 05:21:14 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
18/02/16 05:21:14 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: source1 started
18/02/16 05:21:41 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
18/02/16 05:21:42 INFO hdfs.BucketWriter: Creating /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp
18/02/16 05:21:48 INFO hdfs.BucketWriter: Closing /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp
18/02/16 05:21:48 INFO hdfs.BucketWriter: Renaming /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp to /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920
18/02/16 05:21:48 INFO hdfs.HDFSEventSink: Writer callback called.
pnwntuvh

pnwntuvh7#

在kafkaèu2.11-0.11.0.0中,zookeeper服务器已被弃用,它使用的是bootstrap服务器,它将使用代理ip地址和端口。如果您给出正确的代理参数,您将能够使用消息。
e、 g.$bin/kafka-console-consumer.sh--引导服务器:9093--主题测试--从头开始
我使用的是9093端口,对你来说可能会有所不同。
当做。

xmq68pz9

xmq68pz98#

就我而言, broker.id=1server.properties 这是个问题。
这应该是 broker.id=0 当您只使用一个kafka服务器进行开发时。
别忘了删除所有日志并重新启动zookeper和kafka
删除 /tmp/kafka-logs (定义见 server.properties 文件)
删除 [your_kafka_home]/logs 重新启动zookeper和kafka

4szc88ey

4szc88ey9#

你的本地主机是这里的foo。如果您将localhost字替换为实际的主机名,它应该可以工作。
这样地:
制作人

./bin/kafka-console-producer.sh --broker-list \
sandbox-hdp.hortonworks.com:9092 --topic test

消费者:

./bin/kafka-console-consumer.sh --topic test --from-beginning \    
--bootstrap-server bin/kafka-console-consumer.sh --new-consumer \
--topic test --from-beginning \
--bootstrap-server localhost:9092
ie3xauqp

ie3xauqp10#

我也有同样的问题,现在我明白了。
当您使用--zookeeper时,应该为它提供zookeeper地址作为参数。
当您使用--bootstrap服务器时,应该为它提供代理地址作为参数。

8ehkhllq

8ehkhllq11#

我遇到了一个问题,消费者在kafka_2.12-2.3.0.tgz中完成了执行。
尝试调试,但没有打印日志。
试着用Kafka2.12-2.2.2运行良好。运行良好。
试着从快速入门指南中运行zookeeper和kafka!

deyfvvtc

deyfvvtc12#

在我的mac上也有同样的问题。我检查了日志,发现以下错误。

Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). 
This error can be ignored if the cluster is starting up and not all brokers are up yet.

这可以通过将复制因子更改为1来解决。在server.properties中添加以下行并重新启动kafka/zookeeper。

offsets.topic.replication.factor=1
p4tfgftt

p4tfgftt13#

使用此选项:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

注意:删除 --new-consumer 从你的命令
参考请参见:https://kafka.apache.org/quickstart

ldxq2e6h

ldxq2e6h14#

在我的例子中,使用这两种方法都不起作用,然后我还增加了要调试的日志级别 config/log4j.properties ,启动控制台使用者

./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic MY_TOPIC

然后拿到下面的日志

[2018-03-11 12:11:25,711] DEBUG [MetadataCache brokerId=10] Error while fetching metadata for MY_TOPIC-3: leader not available (kafka.server.MetadataCache)

这里的要点是,我有两个kafka节点,但是其中一个节点坏了,由于某种原因,默认情况下,如果由于节点坏了而导致某个分区不可用(在这种情况下是分区3),kafka控制台使用者将不会使用它。在我的申请表里没有。
可能的解决方案是
启动下行经纪人
删除主题并再次创建它,这样所有分区都将放置在online broker节点上

bwleehnv

bwleehnv15#

你能这样试一下吗:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

相关问题