Confluent S3 Sink Connector for Kafka无法使用MSK / MSK Connect连接到S3

fjaof16o  于 2023-05-21  发布在  Apache
关注(0)|答案(1)|浏览(149)

我正在尝试使用Apache MSK / MSK Connect和Confluent S3 Sink Connector将我的Kafka消息存储到S3。
连接器尝试连接S3时遇到超时错误。
为了排除权限问题,我给了连接器角色对S3的完全访问权限,但这并没有解决问题。
我使用的配置如下:

connector.class=io.confluent.connect.s3.S3SinkConnector
format.class=io.confluent.connect.s3.format.json.JsonFormat
flush.size=3
tasks.max=1
name=kafka-connect-s3
storage.class=io.confluent.connect.s3.storage.S3Storage
topics.regex=.*
s3.bucket.name=######-kafka-messages
[Worker-0bfbcc480ad565df0]  (io.confluent.connect.storage.partitioner.PartitionerConfig:361)
[Worker-0bfbcc480ad565df0] [2022-04-12 13:52:24,646] INFO [kafka-connect-s3|task-0] Returning new credentials provider based on the configured credentials provider class (io.confluent.connect.s3.storage.S3Storage:186)
[Worker-0bfbcc480ad565df0] [2022-04-12 13:53:05,352] ERROR [kafka-connect-s3|task-0] WorkerSinkTask{id=kafka-connect-s3-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
[Worker-0bfbcc480ad565df0] org.apache.kafka.connect.errors.ConnectException: com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to s3.ap-southeast-2.amazonaws.com:443 [s3.ap-southeast-2.amazonaws.com/52.95.131.12] failed: connect timed out
[Worker-0bfbcc480ad565df0]  at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:138)
[Worker-0bfbcc480ad565df0]  at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:308)
[Worker-0bfbcc480ad565df0]  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
[Worker-0bfbcc480ad565df0]  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
[Worker-0bfbcc480ad565df0]  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
[Worker-0bfbcc480ad565df0]  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[Worker-0bfbcc480ad565df0]  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[Worker-0bfbcc480ad565df0]  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[Worker-0bfbcc480ad565df0]  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[Worker-0bfbcc480ad565df0]  at java.base/java.lang.Thread.run(Thread.java:829)
[Worker-0bfbcc480ad565df0] Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to s3.ap-southeast-2.amazonaws.com:443 [s3.ap-southeast-2.amazonaws.com/52.95.131.12] failed: connect timed out
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1207)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1153)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5445)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5392)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.services.s3.AmazonS3Client.getAcl(AmazonS3Client.java:4050)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.services.s3.AmazonS3Client.getBucketAcl(AmazonS3Client.java:1273)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.services.s3.AmazonS3Client.getBucketAcl(AmazonS3Client.java:1263)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.services.s3.AmazonS3Client.doesBucketExistV2(AmazonS3Client.java:1401)
[Worker-0bfbcc480ad565df0]  at io.confluent.connect.s3.storage.S3Storage.bucketExists(S3Storage.java:197)
[Worker-0bfbcc480ad565df0]  at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:112)
[Worker-0bfbcc480ad565df0]  ... 9 more
[Worker-0bfbcc480ad565df0] Caused by: org.apache.http.conn.ConnectTimeoutException: Connect to s3.ap-southeast-2.amazonaws.com:443 [s3.ap-southeast-2.amazonaws.com/52.95.131.12] failed: connect timed out
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:151)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376)
[Worker-0bfbcc480ad565df0]  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[Worker-0bfbcc480ad565df0]  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[Worker-0bfbcc480ad565df0]  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[Worker-0bfbcc480ad565df0]  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.conn.$Proxy47.connect(Unknown Source)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1331)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
[Worker-0bfbcc480ad565df0]  ... 24 more
[Worker-0bfbcc480ad565df0] Caused by: java.net.SocketTimeoutException: connect timed out
[Worker-0bfbcc480ad565df0]  at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
[Worker-0bfbcc480ad565df0]  at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
[Worker-0bfbcc480ad565df0]  at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
[Worker-0bfbcc480ad565df0]  at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
[Worker-0bfbcc480ad565df0]  at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
[Worker-0bfbcc480ad565df0]  at java.base/java.net.Socket.connect(Socket.java:609)
[Worker-0bfbcc480ad565df0]  at org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:368)
[Worker-0bfbcc480ad565df0]  at com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:142)
[Worker-0bfbcc480ad565df0]  at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
[Worker-0bfbcc480ad565df0]  ... 40 more
[Worker-0bfbcc480ad565df0] [2022-04-12 13:53:05,354] INFO [kafka-connect-s3|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668)
[Worker-0bfbcc480ad565df0] [2022-04-12 13:53:05,354] INFO [kafka-connect-s3|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672)
[Worker-0bfbcc480ad565df0] [2022-04-12 13:53:05,354] INFO [kafka-connect-s3|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)
[Worker-0bfbcc480ad565df0] [2022-04-12 13:53:05,355] INFO [kafka-connect-s3|task-0] App info kafka.consumer for connector-consumer-kafka-connect-s3-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)

会出什么问题呢?子网可以访问Internet并连接Internet网关。网络ACL是默认设置。

svmlkihl

svmlkihl1#

您需要按照documentation中所述为S3创建一个端点。然后检查您分配给连接器的安全组是否具有外出权限。例如,您可以设置出站规则:“所有流量/所有协议/所有端口”> 0.0.0.0/0

相关问题