I am using Confluent Kafka Connect (confluentinc/cp-kafka-connect-base:5.4.1) to read messages from a topic, with the org.apache.kafka.connect.converters.ByteArrayConverter converter.
I want to create one file per minute of messages from the Kafka topic, starting from the last (earliest) message. By doing this, since I begin reading at that message and "put" one minute's worth of messages into each file, I expect one file to be pushed to the S3 bucket every minute.
To do this, I configured the worker with "consumer.override.auto.offset.reset": "earliest", "rotate.interval.ms": "60000", "timestamp.extractor": "Record", and "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner".
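To sanity-check my understanding of these settings, here is a minimal, simplified sketch (my own mental model, not the connector's actual code) of how I expect "rotate.interval.ms" to interact with "timestamp.extractor": "Record": the sink keeps an open file per topic-partition and rolls it when a new record's timestamp is at least one interval past the first record's timestamp in that file. The partition count and record cadence below are made up for illustration.

from dataclasses import dataclass, field
from typing import List, Optional

ROTATE_INTERVAL_MS = 60_000  # mirrors "rotate.interval.ms": "60000"


@dataclass
class PartitionFile:
    # Record timestamp of the first record in the currently open file
    first_record_ts: Optional[int] = None
    buffered: List[bytes] = field(default_factory=list)
    files_written: int = 0  # how many objects this topic-partition has rolled so far

    def append(self, record_ts: int, value: bytes) -> None:
        # Close the open file once an incoming record's timestamp is at least
        # ROTATE_INTERVAL_MS past the first record's timestamp in that file.
        if (self.first_record_ts is not None
                and record_ts - self.first_record_ts >= ROTATE_INTERVAL_MS):
            self.files_written += 1
            self.buffered.clear()
            self.first_record_ts = None
        if self.first_record_ts is None:
            self.first_record_ts = record_ts
        self.buffered.append(value)


# One writer per topic-partition: with several partitions, each minute of record
# timestamps can yield one object per partition rather than one object overall.
writers = {partition: PartitionFile() for partition in range(3)}
for ts in range(0, 180_000, 1_000):  # three minutes of records, one per second
    for writer in writers.values():
        writer.append(ts, b"payload")
print({p: w.files_written for p, w in writers.items()})  # -> {0: 2, 1: 2, 2: 2}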
When I look at the S3 bucket, I see 20+ files per minute. Is there something I am missing or misinterpreting? The "s3.part.size" limit is never reached. The full worker configuration is:
{
  "name": "worker name",
  "config": {
    "_comment": "The S3 sink connector class",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "_comment": "The total number of Connect tasks to spawn (with implicit upper limit the number of topic-partitions)",
    "tasks.max": "1",
    "_comment": "Which topics to export to S3",
    "topics": "comma separated topics",
    "_comment": "Start reading the topics from the earliest message, this means ignore the messages pushed before the initialization of the consumer worker",
    "consumer.override.auto.offset.reset": "earliest",
    "_comment": "The S3 bucket that will be used by this connector instance",
    "s3.bucket.name": "S3 bucket name",
    "_comment": "The AWS region where the S3 bucket is located",
    "s3.region": "us-east-1",
    "_comment": "The size in bytes of a single part in a multipart upload. The last part is of s3.part.size bytes or less. This property does not affect the total size of an S3 object uploaded by the S3 connector",
    "s3.part.size": "5242880",
    "_comment": "The maximum number of Kafka records contained in a single S3 object. Here a high value to allow the time-based partitioner to take precedence",
    "flush.size": "300000",
    "_comment": "Kafka Connect converter used to deserialize keys",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "_comment": "Kafka Connect converter used to deserialize values",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "_comment": "The type of storage for this storage cloud connector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "_comment": "The storage format of the objects uploaded to S3",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "_comment": "The class used to partition records in objects to S3. Here, partitioning based on time is used.",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "_comment": "The locale used by the time-based partitioner to encode the date string",
    "locale": "en",
    "_comment": "Setting the timezone of the timestamps is also required by the time-based partitioner",
    "timezone": "UTC",
    "_comment": "Root directory name of the S3 bucket to store the files",
    "topics.dir": "S3 bucket root directory",
    "_comment": "The date-based part of the S3 object key",
    "path.format": "'date'=YYYY-MM-dd/'hour'=HH",
    "_comment": "The duration that aligns with the path format defined above",
    "partition.duration.ms": "3600000",
    "_comment": "The interval between timestamps that is sufficient to upload a new object to S3.",
    "rotate.interval.ms": "60000",
    "_comment": "The class to use to derive the timestamp for each record. Here Kafka record timestamps are used",
    "timestamp.extractor": "Record"
  }
}
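For completeness, this is roughly how I register the connector with the Kafka Connect REST API; the worker URL and the local file name below are placeholders for my setup.

import json
import requests

CONNECT_URL = "http://localhost:8083"  # placeholder: Connect worker REST endpoint

# The JSON above, saved locally as s3-sink.json (placeholder file name).
# Note: when parsed, the repeated "_comment" keys collapse to the last value.
with open("s3-sink.json") as f:
    connector = json.load(f)

# POST /connectors creates the connector; a 409 response means the name already exists.
resp = requests.post(
    f"{CONNECT_URL}/connectors",
    headers={"Content-Type": "application/json"},
    json=connector,
)
resp.raise_for_status()
print("created connector:", resp.json()["name"])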