When connecting to my Kafka server, I get a loop:
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Initiating connection to node kafka1:9092 (id: -1 rack: null) using address kafka1/192.168.2.50
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Completed connection to node -1. Fetching API versions.
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Initiating API versions fetch from node -1.
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Recorded API versions for node -1: (Produce(0): 0 to 8 [usable: 8], Fetch(1): 0 to 11 [usable: 11], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 9 [usable: 9], LeaderAndIsr(4): 0 to 4 [usable: 4], StopReplica(5): 0 to 2 [usable: 2], UpdateMetadata(6): 0 to 6 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 7 [usable: 7], FindCoordinator(10): 0 to 3 [usable: 3], JoinGroup(11): 0 to 7 [usable: 7], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 3 [usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 5 [usable: 5], DeleteTopics(20): 0 to 4 [usable: 4], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 3 [usable: 3], OffsetForLeaderEpoch(23): 0 to 3 [usable: 3], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 2 [usable: 2], CreateAcls(30): 0 to 2 [usable: 2], DeleteAcls(31): 0 to 2 [usable: 2], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 2 [usable: 2], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 2 [usable: 2], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 [usable: 
0], OffsetDelete(47): 0 [usable: 0])
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='GGBD_SILC_AVRO')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node kafka1:9092 (id: -1 rack: null)
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Updating last seen epoch from null to 0 for partition GGBD_SILC_AVRO-0
[m[30m[30/07/2020 14:19:56] [INFO ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Cluster ID: nkxqo9NPSmWLKsk9dpI0wg
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Updated cluster metadata updateVersion 2 to MetadataCache{clusterId='nkxqo9NPSmWLKsk9dpI0wg', nodes={1=kafka1:9092 (id: 1 rack: null), 2=kafka2:9092 (id: 2 rack: null), 3=kafka3:9092 (id: 3 rack: null)}, partitions=[PartitionMetadata(, error=NONE, partition=GGBD_SILC_AVRO-0, leader=Optional[3], leaderEpoch=Optional[0], replicas=3,1, isr=3,1, offlineReplicas=)], controller=kafka1:9092 (id: 1 rack: null)}
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Received FindCoordinator response ClientResponse(receivedTimeMs=1596111596454, latencyMs=88, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-KOSILCConsumer-1, correlationId=0), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1, host='kafka1', port=9092))
[m[30m[30/07/2020 14:19:56] [INFO ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Discovered group coordinator kafka1:9092 (id: 2147483646 rack: null)
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Initiating connection to node kafka1:9092 (id: 2147483646 rack: null) using address kafka1/192.168.2.50
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Executing onJoinPrepare with generation -1 and memberId
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Disabling heartbeat thread
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Heartbeat thread started
[m[30m[30/07/2020 14:19:56] [INFO ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] (Re-)joining group
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Joining group with current subscription: [GGBD_SILC_AVRO]
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Sending JoinGroup (JoinGroupRequestData(groupId='KOSILCConsumer', sessionTimeoutMs=60000, rebalanceTimeoutMs=300000, memberId='', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 1, 0, 0, 0, 1, 0, 14, 71, 71, 66, 68, 95, 83, 73, 76, 67, 95, 65, 86, 82, 79, -1, -1, -1, -1, 0, 0, 0, 0])])) to coordinator kafka1:9092 (id: 2147483646 rack: null)
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2147483646
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Completed connection to node 2147483646. Fetching API versions.
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Initiating API versions fetch from node 2147483646.
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Recorded API versions for node 2147483646: (Produce(0): 0 to 8 [usable: 8], Fetch(1): 0 to 11 [usable: 11], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 9 [usable: 9], LeaderAndIsr(4): 0 to 4 [usable: 4], StopReplica(5): 0 to 2 [usable: 2], UpdateMetadata(6): 0 to 6 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 7 [usable: 7], FindCoordinator(10): 0 to 3 [usable: 3], JoinGroup(11): 0 to 7 [usable: 7], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 3 [usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 5 [usable: 5], DeleteTopics(20): 0 to 4 [usable: 4], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 3 [usable: 3], OffsetForLeaderEpoch(23): 0 to 3 [usable: 3], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 2 [usable: 2], CreateAcls(30): 0 to 2 [usable: 2], DeleteAcls(31): 0 to 2 [usable: 2], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 2 [usable: 2], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 2 [usable: 2], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 
[usable: 0], OffsetDelete(47): 0 [usable: 0])
[m[30m[30/07/2020 14:19:56] [INFO ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Disabling heartbeat thread
[m[30m[30/07/2020 14:19:56] [INFO ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] (Re-)joining group
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Joining group with current subscription: [GGBD_SILC_AVRO]
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Sending JoinGroup (JoinGroupRequestData(groupId='KOSILCConsumer', sessionTimeoutMs=60000, rebalanceTimeoutMs=300000, memberId='consumer-KOSILCConsumer-1-b0ed09d8-e444-4266-bd5b-e76e7eae01f2', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 1, 0, 0, 0, 1, 0, 14, 71, 71, 66, 68, 95, 83, 73, 76, 67, 95, 65, 86, 82, 79, -1, -1, -1, -1, 0, 0, 0, 0])])) to coordinator kafka1:9092 (id: 2147483646 rack: null)
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Received successful JoinGroup response: JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=5, protocolType='consumer', protocolName='range', leader='consumer-KOSILCConsumer-1-b0ed09d8-e444-4266-bd5b-e76e7eae01f2', memberId='consumer-KOSILCConsumer-1-b0ed09d8-e444-4266-bd5b-e76e7eae01f2', members=[JoinGroupResponseMember(memberId='consumer-KOSILCConsumer-1-b0ed09d8-e444-4266-bd5b-e76e7eae01f2', groupInstanceId=null, metadata=[0, 1, 0, 0, 0, 1, 0, 14, 71, 71, 66, 68, 95, 83, 73, 76, 67, 95, 65, 86, 82, 79, -1, -1, -1, -1, 0, 0, 0, 0])])
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Performing assignment using strategy range with subscriptions {consumer-KOSILCConsumer-1-b0ed09d8-e444-4266-bd5b-e76e7eae01f2=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Subscription@68987a0f}
[m[30m[30/07/2020 14:19:56] [INFO ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Finished assignment for group at generation 5: {consumer-KOSILCConsumer-1-b0ed09d8-e444-4266-bd5b-e76e7eae01f2=Assignment(partitions=[GGBD_SILC_AVRO-0])}
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Sending leader SyncGroup to coordinator kafka1:9092 (id: 2147483646 rack: null) at generation Generation{generationId=5, memberId='consumer-KOSILCConsumer-1-b0ed09d8-e444-4266-bd5b-e76e7eae01f2', protocol='range'}: SyncGroupRequestData(groupId='KOSILCConsumer', generationId=5, memberId='consumer-KOSILCConsumer-1-b0ed09d8-e444-4266-bd5b-e76e7eae01f2', groupInstanceId=null, protocolType='consumer', protocolName='range', assignments=[SyncGroupRequestAssignment(memberId='consumer-KOSILCConsumer-1-b0ed09d8-e444-4266-bd5b-e76e7eae01f2', assignment=[0, 1, 0, 0, 0, 1, 0, 14, 71, 71, 66, 68, 95, 83, 73, 76, 67, 95, 65, 86, 82, 79, 0, 0, 0, 1, 0, 0, 0, 0, -1, -1, -1, -1])])
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Received successful SyncGroup response: org.apache.kafka.common.requests.SyncGroupResponse@79cf691
[m[30m[30/07/2020 14:19:56] [INFO ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Successfully joined group with generation 5
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Enabling heartbeat thread
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Executing onJoinComplete with generation 5 and memberId consumer-KOSILCConsumer-1-b0ed09d8-e444-4266-bd5b-e76e7eae01f2
[m[30m[30/07/2020 14:19:56] [INFO ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Adding newly assigned partitions: GGBD_SILC_AVRO-0
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Fetching committed offsets for partitions: [GGBD_SILC_AVRO-0]
[m[30m[30/07/2020 14:19:56] [INFO ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Setting offset for partition GGBD_SILC_AVRO-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka3:9092 (id: 3 rack: null)], epoch=0}}
onPartitionsAssigned: Tread:29 Particion: 0 Offset: 0
[30m[30/07/2020 14:19:56] [INFO ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Seeking to offset 437 for partition GGBD_SILC_AVRO-0
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Added READ_COMMITTED fetch request for partition GGBD_SILC_AVRO-0 at position FetchPosition{offset=437, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka3:9092 (id: 3 rack: null)], epoch=0}} to node kafka3:9092 (id: 3 rack: null)
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Built full fetch (sessionId=INVALID, epoch=INITIAL) for node 3 with 1 partition(s).
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Sending READ_COMMITTED FullFetchRequest(GGBD_SILC_AVRO-0) to broker kafka3:9092 (id: 3 rack: null)
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Initiating connection to node kafka3:9092 (id: 3 rack: null) using address kafka3/192.168.2.52
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 3
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Completed connection to node 3. Fetching API versions.
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Initiating API versions fetch from node 3.
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Recorded API versions for node 3: (Produce(0): 0 to 8 [usable: 8], Fetch(1): 0 to 11 [usable: 11], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 9 [usable: 9], LeaderAndIsr(4): 0 to 4 [usable: 4], StopReplica(5): 0 to 2 [usable: 2], UpdateMetadata(6): 0 to 6 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 7 [usable: 7], FindCoordinator(10): 0 to 3 [usable: 3], JoinGroup(11): 0 to 7 [usable: 7], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 3 [usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 5 [usable: 5], DeleteTopics(20): 0 to 4 [usable: 4], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 3 [usable: 3], OffsetForLeaderEpoch(23): 0 to 3 [usable: 3], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 2 [usable: 2], CreateAcls(30): 0 to 2 [usable: 2], DeleteAcls(31): 0 to 2 [usable: 2], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 2 [usable: 2], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 2 [usable: 2], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 [usable: 0], 
OffsetDelete(47): 0 [usable: 0])
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Node 3 sent a full fetch response that created a new incremental fetch session 732253096 with 1 response partition(s)
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Fetch READ_COMMITTED at offset 437 for partition GGBD_SILC_AVRO-0 returned fetch data (error=OFFSET_OUT_OF_RANGE, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=0)
[m[30m[30/07/2020 14:19:56] [INFO ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Fetch offset 437 is out of range for partition GGBD_SILC_AVRO-0, resetting offset
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={GGBD_SILC_AVRO-0={timestamp: -2, maxNumOffsets: 1, currentLeaderEpoch: Optional[0]}}, isolationLevel=READ_COMMITTED) to broker kafka3:9092 (id: 3 rack: null)
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Handling ListOffsetResponse response for GGBD_SILC_AVRO-0. Fetched offset 0, timestamp -1
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Not replacing existing epoch 0 with new epoch 0 for partition GGBD_SILC_AVRO-0
[m[30m[30/07/2020 14:19:56] [INFO ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Resetting offset for partition GGBD_SILC_AVRO-0 to offset 0.
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Added READ_COMMITTED fetch request for partition GGBD_SILC_AVRO-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka3:9092 (id: 3 rack: null)], epoch=0}} to node kafka3:9092 (id: 3 rack: null)
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Built incremental fetch (sessionId=732253096, epoch=1) for node 3. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
[m[32m[30/07/2020 14:19:56] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Sending READ_COMMITTED IncrementalFetchRequest(toSend=(GGBD_SILC_AVRO-0), toForget=(), implied=()) to broker kafka3:9092 (id: 3 rack: null)
[m[32m[30/07/2020 14:19:57] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Node 0 sent an incremental fetch response with throttleTimeMs = 3 for session 732253096 with 1 response partition(s)
[m[32m[30/07/2020 14:19:57] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Fetch READ_COMMITTED at offset 0 for partition GGBD_SILC_AVRO-0 returned fetch data (error=NONE, highWaterMark=0, lastStableOffset = 0, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = [], recordsSizeInBytes=0)
[m[32m[30/07/2020 14:19:57] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Added READ_COMMITTED fetch request for partition GGBD_SILC_AVRO-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka3:9092 (id: 3 rack: null)], epoch=0}} to node kafka3:9092 (id: 3 rack: null)
[m[32m[30/07/2020 14:19:57] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Built incremental fetch (sessionId=732253096, epoch=2) for node 3. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
[m[32m[30/07/2020 14:19:57] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Sending READ_COMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(GGBD_SILC_AVRO-0)) to broker kafka3:9092 (id: 3 rack: null)
[m[32m[30/07/2020 14:19:57] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Committed offset 0 for partition GGBD_SILC_AVRO-0
[m[32m[30/07/2020 14:19:57] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Node 0 sent an incremental fetch response with throttleTimeMs = 3 for session 732253096 with 0 response partition(s), 1 implied partition(s)
[m[32m[30/07/2020 14:19:57] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Added READ_COMMITTED fetch request for partition GGBD_SILC_AVRO-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka3:9092 (id: 3 rack: null)], epoch=0}} to node kafka3:9092 (id: 3 rack: null)
[m[32m[30/07/2020 14:19:57] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Built incremental fetch (sessionId=732253096, epoch=3) for node 3. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
[m[32m[30/07/2020 14:19:57] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Sending READ_COMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(GGBD_SILC_AVRO-0)) to broker kafka3:9092 (id: 3 rack: null)
[m[32m[30/07/2020 14:19:58] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Node 0 sent an incremental fetch response with throttleTimeMs = 3 for session 732253096 with 0 response partition(s), 1 implied partition(s)
[m[32m[30/07/2020 14:19:58] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Added READ_COMMITTED fetch request for partition GGBD_SILC_AVRO-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka3:9092 (id: 3 rack: null)], epoch=0}} to node kafka3:9092 (id: 3 rack: null)
[m[32m[30/07/2020 14:19:58] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Built incremental fetch (sessionId=732253096, epoch=4) for node 3. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
[m[32m[30/07/2020 14:19:58] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Sending READ_COMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(GGBD_SILC_AVRO-0)) to broker kafka3:9092 (id: 3 rack: null)
[m[32m[30/07/2020 14:19:58] [DEBUG ] [Consumer clientId=consumer-KOSILCConsumer-1, groupId=KOSILCConsumer] Committed offset 0 for partition GGBD_SILC_AVRO-0
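I did not change anything in my Kafka 2.0.0 installation. This loop started suddenly.
After reading related threads, I upgraded Kafka to version kafka_2.13-2.5.0 to fix this bug: https://issues.apache.org/jira/browse/kafka-8052 but it is not the same problem, and it still persists.
I am now using client version 2.5.0 and a fresh installation of Kafka version 2.13-2.5.0.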
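A minimal sketch of how the `Seeking to offset 437`, `OFFSET_OUT_OF_RANGE`, and `Resetting offset ... to offset 0` sequence in the log above can be read. It assumes the reset follows an earliest policy (the `ListOffsetRequest` in the log uses timestamp -2) and treats the reported high watermark of 0 as the end of the log. The class and method names are hypothetical and are not part of the Kafka client API, and the range check is a simplification of what the broker does.

```java
// Hypothetical helper for illustration only; not part of the Kafka client API.
final class OffsetRangeCheck {

    // Simplified rule: a fetch position is valid when it lies between the
    // log start offset and the log end offset (inclusive). Fetching exactly
    // at the log end is valid and simply returns no records.
    static boolean isInRange(long offset, long logStart, long logEnd) {
        return offset >= logStart && offset <= logEnd;
    }

    // Mimics an "earliest" reset: keep the requested offset when it is
    // valid, otherwise fall back to the log start offset.
    static long resolveEarliest(long requested, long logStart, long logEnd) {
        return isInRange(requested, logStart, logEnd) ? requested : logStart;
    }

    public static void main(String[] args) {
        // Values taken from the log: seek to 437 on a partition whose
        // logStartOffset and highWaterMark are both reported as 0.
        System.out.println(isInRange(437L, 0L, 0L));       // prints false
        System.out.println(resolveEarliest(437L, 0L, 0L)); // prints 0
    }
}
```

With these values the partition looks empty, so each later fetch at offset 0 returns no records and offset 0 is committed again, which appears to match the repeating lines at the end of the log.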
Can anyone tell me why this happens and how to fix it?
Thanks.