kafka connector record writer由于缺少可分配的内存而卡在s3outputstream中,但不会在空闲数小时后失败

lnvxswe2  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(214)

我有一种行为我不知道该怎么改变。我正在测试s3Kafka同步连接器,我的主题中只有很少的数据。
目前,我可以通过使用kafka管理器看到主题中有数据,但我的连接器读取数据,从不移动偏移,也从不将其推送到kafka。在其他主题中,这是有效的,但在这个特定的主题中,它不是。我认为这与超时有关,但我找不到要设置的正确配置属性,因此刷新速度要快一点。
这是我的配置:

curl -X PUT -s -o /dev/null -H ""Content-Type:application/json""
      http://localhost:$$CONNECT_REST_PORT/connectors/s3_connector_doc_cmg/config
      \
        -d '{
          ""connector.class"": ""io.confluent.connect.s3.S3SinkConnector"",
          ""storage.class"": ""io.confluent.connect.s3.storage.S3Storage"",
          ""s3.region"": ""us-east-1"",
          ""s3.bucket.name"": ""confluent-pipeline"",
          ""topics.dir"": ""topics"",
          ""topics"": ""com.acp.bde.doc_cmg"",
          ""flush.size"": ""25"",
          ""rotate.interval.ms"": ""5000"",
          ""auto.register.schemas"": ""false"",
          ""tasks.max"": ""1"",
          ""s3.part.size"": ""5242880"",
          ""timezone"": ""UTC"",
          ""parquet.codec"": ""snappy"",
          ""offset.flush.interval.ms"": ""5000"",
          ""offset.flush.timeout.ms"": ""1000"",
          ""s3.credentials.provider.class"": ""com.amazonaws.auth.DefaultAWSCredentialsProviderChain"",
          ""format.class"": ""io.confluent.connect.s3.format.avro.AvroFormat"",
          ""value.converter"": ""com.insight.connect.protobuf.ProtobufConverter"",
          ""key.converter"": ""org.apache.kafka.connect.storage.StringConverter"",
          ""partitioner.class"": ""io.confluent.connect.storage.partitioner.DailyPartitioner"",
          ""locale"": ""de-CH"",
          ""timezone"": ""Europe/Zurich"",
          ""store.url"": ""http://minio-server-svc:9000/""
        }'"

这是我在日志中看到的:

[2020-10-23 10:35:47,594] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+1+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 10:35:48,017] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+3+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 10:35:48,075] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+2+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 11:35:37,989] INFO [Worker clientId=connect-1, groupId=kafka-connect-01] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

所以他们已经开放了将近一个小时,什么都没有发生,我想知道我的配置是完全不好,或者有一些属性和配置,我需要这样的数据推送速度更快。
更新:我仍然没有得到正确的修复,但它似乎实际上是一个内存不足的问题。
程序就卡在这一行this.buffer=bytebuffer.allocate(this.partsize);
https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/s3outputstream.java#l85
困扰我的是,它根本没有抱怨,只是停留在那里。它应该不会因为内存不足而崩溃吗?或者内存不能更快地释放?基本上它可以在通话中停留3到4个小时而没有任何反馈。
我仍然认为我的配置可能有问题,但我不知道我应该看什么或在哪里。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题