Kafka Connect S3 source connector configuration issue

vhipe2zx · posted 2021-06-04 in Kafka

I have uploaded some Avro messages from a topic, my.topic, to an Amazon S3 bucket, s3-bucket, using the Kafka Connect S3 sink connector. The sink connector is configured as follows:

  {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "key.converter": "org.apache.kafka.connect.converters.LongConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schemaregistry:8099",
    "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
    "tasks.max": "1",
    "topics": "my.topic",
    "s3.region": "eu-west-2",
    "s3.bucket.name": "s3-bucket",
    "flush.size": "5",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner"
  }
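
For reference, a config like this can be submitted through Connect's standard REST API (PUT /connectors/<name>/config). A minimal sketch in Python, assuming a worker listening on localhost:8083; the connector name tx-s3-sink is hypothetical, since the post never names the sink:

  import json
  import urllib.request

  # Minimal sketch: submit the sink config via the Connect REST API.
  # Assumes a Connect worker on localhost:8083; the connector name
  # "tx-s3-sink" is hypothetical.
  config = {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "key.converter": "org.apache.kafka.connect.converters.LongConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schemaregistry:8099",
      "topics": "my.topic",
      "s3.region": "eu-west-2",
      "s3.bucket.name": "s3-bucket",
      "flush.size": "5",
      "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
  }

  # PUT /connectors/<name>/config creates the connector or updates its config.
  req = urllib.request.Request(
      "http://localhost:8083/connectors/tx-s3-sink/config",
      data=json.dumps(config).encode("utf-8"),
      headers={"Content-Type": "application/json"},
      method="PUT",
  )
  with urllib.request.urlopen(req) as resp:
      print(resp.status, resp.read().decode())

(With flush.size set to 5, the sink commits one S3 object per 5 records from a topic partition, which is why the 5 messages below produced exactly one file.)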

This worked as expected. All the messages were identical records with the same schema version; I wrote 5 of them to the topic and saw a single S3 object in my bucket with the path

  /topics/my.topic/partition=0/my.topic+0+0000000000.avro
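
The object name follows the sink's <topic>+<kafkaPartition>+<startOffset>.<extension> convention. The source connector's logs further down report filename.regex = (.+)\+(\d+)\+.+$, and the file name does match that pattern, which can be sanity-checked directly (a quick Python check using only the two strings shown in this post):

  import re

  # filename.regex reported by the S3 source connector in the logs below.
  FILENAME_REGEX = re.compile(r"(.+)\+(\d+)\+.+$")

  name = "my.topic+0+0000000000.avro"
  m = FILENAME_REGEX.match(name)
  assert m is not None
  print(m.group(1), m.group(2))  # -> my.topic 0  (topic, kafka partition)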

Now I want to put these stored messages onto another, empty topic. I start an S3 source connector with the following configuration:

  {
    "confluent.topic.bootstrap.servers": "kafka:9092",
    "confluent.topic.replication.factor": 1,
    "connector.class": "io.confluent.connect.s3.source.S3SourceConnector",
    "s3.region": "eu-west-2",
    "s3.bucket.name": "s3-bucket",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": ".*",
    "transforms.AddPrefix.replacement": "recovery_$0"
  }
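
The RegexRouter SMT rewrites each record's topic name: the pattern must match the whole topic name, and $0 in the replacement is the entire match (Java regex syntax), so my.topic should be routed to recovery_my.topic. A small sketch of the equivalent rewrite in Python, where \g<0> plays the role of Java's $0:

  import re

  # Emulate RegexRouter: the pattern must match the *whole* topic name
  # (Java's Matcher.matches()), and the replacement may reference the
  # match -- "$0" in Java regex syntax, "\g<0>" in Python.
  def route_topic(topic: str, regex: str, replacement: str) -> str:
      m = re.fullmatch(regex, topic)
      return m.expand(replacement) if m else topic

  print(route_topic("my.topic", r".*", r"recovery_\g<0>"))
  # -> recovery_my.topic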

When I look at the logs produced by Kafka Connect (running in a Docker container), it seems happy: there are no errors, and it correctly identifies my bucket and the directory path inside it:

  /topics/my.topic/partition=0/

However, it never detects the file inside, and never writes anything to the expected recovery_my.topic topic. It repeatedly logs:

  kafka-connect | [2020-07-05 15:31:46,311] INFO PartitionCheckingTask - Checking if Partitions have changed. (io.confluent.connect.cloud.storage.source.util.PartitionCheckingTask)
  kafka-connect | [2020-07-05 15:31:47,963] INFO WorkerSourceTask{id=tx-s3-restore-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
  kafka-connect | [2020-07-05 15:31:47,964] INFO WorkerSourceTask{id=tx-s3-restore-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
  kafka-connect | [2020-07-05 15:31:50,483] INFO AvroDataConfig values:
  kafka-connect |     schemas.cache.config = 50
  kafka-connect |     enhanced.avro.schema.support = false
  kafka-connect |     connect.meta.data = true
  kafka-connect |  (io.confluent.connect.avro.AvroDataConfig)
  kafka-connect | [2020-07-05 15:31:50,483] INFO AvroDataConfig values:
  kafka-connect |     schemas.cache.config = 50
  kafka-connect |     enhanced.avro.schema.support = false
  kafka-connect |     connect.meta.data = true
  kafka-connect |  (io.confluent.connect.avro.AvroDataConfig)
  kafka-connect | [2020-07-05 15:31:50,537] INFO AvroDataConfig values:
  kafka-connect |     schemas.cache.config = 50
  kafka-connect |     enhanced.avro.schema.support = false
  kafka-connect |     connect.meta.data = true
  kafka-connect |  (io.confluent.connect.avro.AvroDataConfig)
  kafka-connect | [2020-07-05 15:31:50,589] INFO No new files ready after scan task assigned folders (io.confluent.connect.cloud.storage.source.StorageSourceTask)

which suggests it is ignoring the file for some reason. Below is the full S3 source connector configuration, extracted from the logs:

  kafka-connect | [2020-07-05 15:10:49,427] INFO S3SourceConnectorConfig values:
  kafka-connect |     behavior.on.error = fail
  kafka-connect |     confluent.license =
  kafka-connect |     confluent.topic = _confluent-command
  kafka-connect |     confluent.topic.bootstrap.servers = [kafka:9092]
  kafka-connect |     confluent.topic.replication.factor = 1
  kafka-connect |     directory.delim = /
  kafka-connect |     filename.regex = (.+)\+(\d+)\+.+$
  kafka-connect |     folders = [topics/my.topic/partition=0/]
  kafka-connect |     format.bytearray.extension = .bin
  kafka-connect |     format.bytearray.separator =
  kafka-connect |     format.class = class io.confluent.connect.s3.format.avro.AvroFormat
  kafka-connect |     partition.field.name = []
  kafka-connect |     partitioner.class = class io.confluent.connect.storage.partitioner.DefaultPartitioner
  kafka-connect |     path.format =
  kafka-connect |     record.batch.max.size = 200
  kafka-connect |     s3.bucket.name = s3-bucket
  kafka-connect |     s3.credentials.provider.class = class com.amazonaws.auth.DefaultAWSCredentialsProviderChain
  kafka-connect |     s3.http.send.expect.continue = true
  kafka-connect |     s3.part.retries = 3
  kafka-connect |     s3.poll.interval.ms = 60000
  kafka-connect |     s3.proxy.password = [hidden]
  kafka-connect |     s3.proxy.url =
  kafka-connect |     s3.proxy.user = null
  kafka-connect |     s3.region = eu-west-2
  kafka-connect |     s3.retry.backoff.ms = 200
  kafka-connect |     s3.sse.customer.key = [hidden]
  kafka-connect |     s3.ssea.name =
  kafka-connect |     s3.wan.mode = false
  kafka-connect |     schema.cache.size = 50
  kafka-connect |     store.url = null
  kafka-connect |     topics.dir = topics
  kafka-connect |  (io.confluent.connect.s3.source.S3SourceConnectorConfig)
  kafka-connect | [2020-07-05 15:10:49,428] INFO [Producer clientId=connector-producer-tx-s3-restore-0] Cluster ID: nlQYzBVYRbWozKk54-Qx_A (org.apache.kafka.clients.Metadata)
  kafka-connect | [2020-07-05 15:10:49,432] INFO AvroDataConfig values:
  kafka-connect |     schemas.cache.config = 50
  kafka-connect |     enhanced.avro.schema.support = false
  kafka-connect |     connect.meta.data = true
  kafka-connect |  (io.confluent.connect.avro.AvroDataConfig)
  kafka-connect | [2020-07-05 15:10:49,434] INFO Starting source connector task with assigned folders [topics/my.topic/partition=0/] using partitioner io.confluent.connect.storage.partitioner.DefaultPartitioner (io.confluent.connect.cloud.storage.source.StorageSourceTask)
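
One thing worth ruling out is an S3 visibility or credential problem: the task's assigned prefix (topics/my.topic/partition=0/) can be listed independently, using boto3's default credential chain as a rough counterpart of the com.amazonaws.auth.DefaultAWSCredentialsProviderChain shown above. A minimal sketch, assuming credentials resolve the same way in both environments:

  import boto3

  # List what the source task should see under its assigned folder.
  s3 = boto3.client("s3", region_name="eu-west-2")
  resp = s3.list_objects_v2(Bucket="s3-bucket",
                            Prefix="topics/my.topic/partition=0/")
  for obj in resp.get("Contents", []):
      print(obj["Key"], obj["Size"], obj["LastModified"])

If the .avro object shows up here but the connector still reports "No new files ready", the problem is likely in how the connector interprets the folder or file name rather than in S3 access itself.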

If anyone has any idea why my file is being ignored, I would be very grateful.
