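My Kafka consumer cannot connect to the server when using SASL with SCRAM-SHA-256. Below are the consumer configuration and the debug output. I am using spring-kafka 2.1.11.RELEASE, so the kafka-clients version is 1.0.2. I have opened my firewall for the other party's server IP, and they have added my IP to their whitelist. Consumer configuration: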
auto.commit.interval.ms = 1000
auto.offset.reset = earliest
bootstrap.servers = [114.xxx.xxx.xxx:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = xxx
heartbeat.interval.ms = 100
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 10
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = SCRAM-SHA-256
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
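For readers who want to reproduce the setup, here is a minimal sketch of how the SASL-related entries in the dump above might be assembled in Java. The class name `ScramConsumerProps`, the `build` helper, and the sample credentials are hypothetical. The real `sasl.jaas.config` value is hidden in the dump, so the JAAS string is only the usual `ScramLoginModule` form, not the asker's actual setting.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer properties matching the SASL settings in the dump.
final class ScramConsumerProps {

    static Properties build(String bootstrapServers, String groupId,
                            String username, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // SASL over an unencrypted connection, as in the dump above.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        // Assumed JAAS entry; the real value is shown as [hidden] in the dump.
        props.put("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"%s\" password=\"%s\";",
                username, password));
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder credentials, for illustration only.
        Properties props = build("114.xxx.xxx.xxx:9092", "xxx", "demo-user", "demo-secret");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting `Properties` can be passed to a `KafkaConsumer` constructor, or the same keys can be supplied through the spring-kafka consumer factory.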
Debug output:
2023-12-22 23:54:58.051 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient [Consumer clientId=consumer-1, groupId=xxx] Give up sending metadata request since no node is available
2023-12-22 23:54:58.102 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient [Consumer clientId=consumer-1, groupId=xxx] Initialize connection to node 114.xxx.xxx.xxx:9092 (id: -1 rack: null) for sending metadata request
2023-12-22 23:54:58.102 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient [Consumer clientId=consumer-1, groupId=xxx] Initiating connection to node 114.xxx.xxx.xxx:9092 (id: -1 rack: null)
2023-12-22 23:54:58.102 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 SaslClientAuthenticator Set SASL client state to SEND_APIVERSIONS_REQUEST
2023-12-22 23:54:58.102 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 SaslClientAuthenticator Creating SaslClient: client=null;service=kafka;serviceHostname=kafka154;mechs=[SCRAM-SHA-256]
2023-12-22 23:54:58.103 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 ScramSaslClient Setting SASL/SCRAM_SHA_256 client state to SEND_CLIENT_FIRST_MESSAGE
2023-12-22 23:54:58.109 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 Selector [Consumer clientId=consumer-1, groupId=xxx] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
2023-12-22 23:54:58.109 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 SaslClientAuthenticator Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
2023-12-22 23:54:58.109 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient [Consumer clientId=consumer-1, groupId=xxx] Completed connection to node -1. Fetching API versions.
2023-12-22 23:55:02.115 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 Selector [Consumer clientId=consumer-1, groupId=xxx] Connection with kafka154/114.xxx.xxx.xxx disconnected
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:104)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:122)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:306)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:392)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:180)
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:81)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:474)
at org.apache.kafka.common.network.Selector.poll(Selector.java:412)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:258)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:230)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:221)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:153)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:228)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:284)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:712)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
2023-12-22 23:55:02.116 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient [Consumer clientId=consumer-1, groupId=xxx] Node -1 disconnected.
2023-12-22 23:55:02.116 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient [Consumer clientId=consumer-1, groupId=xxx] Give up sending metadata request since no node is available
Any idea what is causing this?
1 Answer
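The cause has been found: the egress IP of our server room was not on the other party's Kafka whitelist. I had asked them to whitelist an address once before, but I had not noticed that this machine's egress IP was not among the addresses they added. Because I could establish a TCP connection to their broker, I did not consider an IP restriction at first. It turns out their whitelist is not a firewall-style whitelist (otherwise the TCP connection should not have been established at all); the broker side accepts the connection and then drops it. If you run into a similar problem, confirm the network side first.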
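To tell "blocked at the firewall" apart from "connected, then dropped", a small probe can help. The following is a minimal sketch, not a definitive tool: `BrokerProbe`, its `Result` enum, and the `probe` client id are hypothetical names. It assumes the broker accepts an ApiVersions request (API key 18, version 0) as the first request on the connection, which is what the client in the log above sends before the SASL exchange.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical probe: separates TCP-level failures from drops after the connection is up.
final class BrokerProbe {

    public enum Result { CONNECT_FAILED, NO_RESPONSE_AFTER_CONNECT, RESPONDED }

    public static Result probe(String host, int port, int timeoutMs) {
        Socket socket = new Socket();
        try {
            try {
                socket.connect(new InetSocketAddress(host, port), timeoutMs);
            } catch (IOException e) {
                // Refused or timed out: consistent with a firewall-style block.
                return Result.CONNECT_FAILED;
            }
            try {
                socket.setSoTimeout(timeoutMs);
                byte[] clientId = "probe".getBytes(StandardCharsets.UTF_8);
                DataOutputStream out = new DataOutputStream(socket.getOutputStream());
                out.writeInt(2 + 2 + 4 + 2 + clientId.length); // request size
                out.writeShort(18);                            // api_key: ApiVersions
                out.writeShort(0);                             // api_version
                out.writeInt(1);                               // correlation_id
                out.writeShort(clientId.length);               // client_id length
                out.write(clientId);                           // client_id bytes
                out.flush();
                // Any readable length prefix means the peer answered the request.
                new DataInputStream(socket.getInputStream()).readInt();
                return Result.RESPONDED;
            } catch (IOException e) {
                // Reset, EOF, or read timeout after a successful connect,
                // as seen in the "Connection reset by peer" trace above.
                return Result.NO_RESPONSE_AFTER_CONNECT;
            }
        } finally {
            try {
                socket.close();
            } catch (IOException ignored) {
                // Nothing useful to do if close fails.
            }
        }
    }

    public static void main(String[] args) {
        // Usage: java BrokerProbe <host> <port>
        System.out.println(probe(args[0], Integer.parseInt(args[1]), 5000));
    }
}
```

A `NO_RESPONSE_AFTER_CONNECT` result from the affected machine, compared with `RESPONDED` from a machine whose egress IP is known to be whitelisted, points at an IP restriction on the broker side rather than at the SASL configuration.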