I keep seeing the log output below in my console. Messages are consumed successfully whenever I publish one, but this happens continuously: the consumer just keeps printing these logs over and over.
10:18:06.884 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Sending asynchronous auto-commit of offsets {shayona-0=OffsetAndMetadata{offset=11349, leaderEpoch=0, metadata=''}}
10:18:06.884 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Sending OFFSET_COMMIT request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=consumer-test-1, correlationId=1093) and timeout 30000 to node 2147482646: {group_id=test,generation_id=18,member_id=consumer-test-1-52154059-bfce-41f8-b05e-2e6973910aa9,group_instance_id=null,topics=[{name=shayona,partitions=[{partition_index=0,committed_offset=11349,committed_leader_epoch=0,committed_metadata=,_tagged_fields={}}],_tagged_fields={}}],_tagged_fields={}}
10:18:06.886 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Received OFFSET_COMMIT response from node 2147482646 for request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=consumer-test-1, correlationId=1093): OffsetCommitResponseData(throttleTimeMs=0, topics=[OffsetCommitResponseTopic(name='shayona', partitions=[OffsetCommitResponsePartition(partitionIndex=0, errorCode=0)])])
10:18:06.886 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Committed offset 11349 for partition shayona-0
10:18:06.886 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-1, groupId=test] Completed asynchronous auto-commit of offsets {shayona-0=OffsetAndMetadata{offset=11349, leaderEpoch=0, metadata=''}}
10:18:07.177 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Received FETCH response from node 1001 for request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=consumer-test-1, correlationId=1092): org.apache.kafka.common.requests.FetchResponse@2c715e84
10:18:07.177 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-test-1, groupId=test] Node 1001 sent an incremental fetch response with throttleTimeMs = 0 for session 1022872780 with 0 response partition(s), 1 implied partition(s)
10:18:07.177 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-1, groupId=test] Added READ_UNCOMMITTED fetch request for partition shayona-0 at position FetchPosition{offset=11349, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1001 rack: null)], epoch=0}} to node localhost:9092 (id: 1001 rack: null)
10:18:07.177 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-test-1, groupId=test] Built incremental fetch (sessionId=1022872780, epoch=1035) for node 1001. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
10:18:07.177 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-test-1, groupId=test] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(shayona-0)) to broker localhost:9092 (id: 1001 rack: null)
10:18:07.177 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-1, groupId=test] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=consumer-test-1, correlationId=1094) and timeout 30000 to node 1001: {replica_id=-1,max_wait_ms=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=1022872780,session_epoch=1035,topics=[],forgotten_topics_data=[],rack_id=,_tagged_fields={}}
Please help me figure out what is going on. My consumer application is shown below.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("group.id", "test");
// Offsets are auto-committed once per second, which matches the OFFSET_COMMIT entries in the logs above.
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("shayona"));

while (true) {
    // Each poll issues a FETCH request to the broker, which also shows up in the logs.
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
    }
}
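As far as I can tell, all of these messages are DEBUG-level output from the Kafka client internals: the OFFSET_COMMIT entries line up with auto.commit.interval.ms=1000, and the FETCH entries come from the poll loop, so the consumer itself seems to be working. If they are harmless, would raising the log level for the org.apache.kafka loggers be the right way to silence them? A minimal sketch of what I mean, assuming logback-classic is the SLF4J backend (I have not verified which backend is on my classpath):

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import org.slf4j.LoggerFactory;

// Raise the Kafka client loggers from DEBUG to INFO so the per-second
// auto-commit and fetch messages are no longer printed.
// Assumes logback-classic; the cast would fail with a different SLF4J backend.
Logger kafkaLogger = (Logger) LoggerFactory.getLogger("org.apache.kafka");
kafkaLogger.setLevel(Level.INFO);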