Reading a Kafka topic starting from the last message with Kafka Connect

2q5ifsrm posted on 2021-06-04 in Kafka

I am using Confluent Kafka Connect (confluentinc/cp-kafka-connect-base:5.4.1) to read messages from a topic, with the org.apache.kafka.connect.converters.ByteArrayConverter converter.
I want to create one file per minute's worth of messages from the Kafka topic, starting from the last (earliest) message. By doing this, I expect a file to be pushed to the S3 bucket every minute, since I start reading from the topic's last message and "put" one minute of messages into each file.
To do this, I configured the worker with "consumer.override.auto.offset.reset": "earliest", "rotate.interval.ms": "60000" and "timestamp.extractor": "Record", together with "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner".
When I look at the S3 bucket, I see 20+ files per minute. Is there something I am missing or interpreting wrong? The "s3.part.size" is never reached. Full worker config:

{
  "name": "worker name",
  "config": {
    "_comment": "The S3 sink connector class",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "_comment": "The total number of Connect tasks to spawn (with implicit upper limit the number of topic-partitions)",
    "tasks.max": "1",
    "_comment": "Which topics to export to S3",
    "topics": "comma separated topics",
    "_comment": "Start reading the topics from the earliest message, this means ignore the messages pushed before the initialization of the consumer worker",
    "consumer.override.auto.offset.reset": "earliest",
    "_comment": "The S3 bucket that will be used by this connector instance",
    "s3.bucket.name": "S3 bucket name",
    "_comment": "The AWS region where the S3 bucket is located",
    "s3.region": "us-east-1",
    "_comment": "The size in bytes of a single part in a multipart upload. The last part is of s3.part.size bytes or less. This property does not affect the total size of an S3 object uploaded by the S3 connector",
    "s3.part.size": "5242880",
    "_comment": "The maximum number of Kafka records contained in a single S3 object. Here a high value to allow for time-based partition to take precedence",
    "flush.size": "300000",
    "_comment": "Kafka Connect converter used to deserialize keys",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "_comment": "Kafka Connect converter used to deserialize values",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "_comment": "The type of storage for this storage cloud connector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "_comment": "The storage format of the objects uploaded to S3",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "_comment": "The class used to partition records in objects to S3. Here, partitioning based on time is used.",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "_comment": "The locale used by the time-based partitioner to encode the date string",
    "locale": "en",
    "_comment": "Setting the timezone of the timestamps is also required by the time-based partitioner",
    "timezone": "UTC",
    "_comment": "Root directory name of the S3 bucket to store the files",
    "topics.dir": "S3 bucket root directory",
    "_comment": "The date-based part of the S3 object key",
    "path.format": "'date'=YYYY-MM-dd/'hour'=HH",
    "_comment": "The duration that aligns with the path format defined above",
    "partition.duration.ms": "3600000",
    "_comment": "The interval between timestamps that is sufficient to upload a new object to S3.",
    "rotate.interval.ms": "60000",
    "_comment": "The class to use to derive the timestamp for each record. Here Kafka record timestamps are used",
    "timestamp.extractor": "Record"
  }
}
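
For reference, a definition like the one above (a top-level "name" plus a "config" map) is what gets registered with the Kafka Connect REST API. A minimal sketch in Python, assuming the Connect worker's REST endpoint is http://localhost:8083 and the JSON above is saved as s3-sink.json (both are illustrative assumptions, not values from my setup):

import json
import requests  # assumes the requests library is installed

CONNECT_URL = "http://localhost:8083"  # assumed Connect REST endpoint

# Load the connector definition shown above: {"name": ..., "config": {...}}
with open("s3-sink.json") as f:  # hypothetical file name
    connector = json.load(f)

# POST /connectors registers the connector; Connect answers 201 Created
resp = requests.post(
    CONNECT_URL + "/connectors",
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
resp.raise_for_status()
print(resp.json())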
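
Also, since "timestamp.extractor": "Record" makes the sink rotate on record timestamps rather than wall-clock time, it can be worth inspecting the timestamps actually stored on the topic. A minimal sketch, assuming the confluent-kafka Python client, a broker at localhost:9092 and a topic named my-topic (all illustrative assumptions):

from datetime import datetime, timezone
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # assumed broker address
    "group.id": "timestamp-inspector",      # throwaway consumer group
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["my-topic"])            # hypothetical topic name

try:
    for _ in range(100):                    # sample the first records
        msg = consumer.poll(5.0)
        if msg is None or msg.error():
            continue
        # msg.timestamp() returns (timestamp_type, timestamp_in_ms); this is
        # the value the Record extractor feeds to the time-based partitioner
        ts_type, ts_ms = msg.timestamp()
        print(msg.partition(), msg.offset(),
              datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc))
finally:
    consumer.close()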

No answers yet!
