我已经从ksql查询和inut流创建了一个表,该表由kafka主题支持。
本主题是使用kafka连接到s3。在这个主题中,我有大约1k msgs/秒。
该主题有6个分区和3个副本。
我有一个奇怪的产出比。Flume似乎很奇怪。这是我的监控:监控
您可以看到第一个图表显示输入比b/s,第二个输出比,第三个是使用burrow计算的延迟。
这是我的s3接收器属性文件:
{
"name": "sink-feature-static",
"config": {
"topics": "FEATURE_APP_STATIC",
"topics.dir": "users-features-stream",
"tasks.max": "6",
"consumer.override.auto.offset.reset": "latest",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"parquet.codec": "snappy",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'\'part_date\''=YYYY-MM-dd/'\'part_hour\''=HH",
"partition.duration.ms": "3600000",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://cp-schema-registry.schema-registry.svc.cluster.local:8081",
"flush.size": 1000000,
"s3.part.size": 5242880,
"rotate.interval.ms": "600000",
"rotate.schedule.interval.ms": "600000",
"locale": "fr-FR",
"timezone": "UTC",
"timestamp.extractor": "Record",
"schema.compatibility": "NONE",
"aws.secret.access.key": "secretkey",
"aws.access.key.id": "accesskey",
"s3.bucket.name": "feature-store-prod-kafka-test",
"s3.region": "eu-west-1"
}
}
下面是我在s3 bucket中观察到的:s3 bucket在这些文件中,我在parquet.snappy中有少量的消息(有时只有1有时更多…)。每个分区大约每秒2个文件(当我使用记录时间戳时,这是因为它正赶上滞后时间(我认为)。
我期待的是:
每1000000条消息(flush.size)或每10分钟(rotate.schedule.interval.ms)提交一次文件。
所以我期望(100万条消息>10分钟*1千米/秒):
每小时1/6(每10分钟)*6(分区数量)Parquet文件
2/或者如果我错了,里面至少有100万条信息的文件。。。
但没有观察到1/或2/。。。
我每小时在s3文件中有一个巨大的延迟和一个flush/commit(参见监控)。
“partition.duration.ms”:“3600000”会导致这样的结果吗?
我错在哪里?为什么我没有看到一个连续的输出刷新的数据,但这样的峰值?
谢谢!ré我的
1条答案
按热度按时间z3yyvxxp1#
是的,第一盘
partition.duration.ms
如果您希望每10分钟有一个s3对象,则为10分钟。第二,如果你真的不想设置小文件rotate.interval.ms=-1
以及rotate.schedule.interval.ms
10分钟(但你方不保证一次交货)。使用时
rotate.interval.ms
,发生的情况是,每次您收到的时间戳早于文件偏移量时,kafka connect会在每个小时的开始和结束时刷新导致非常小的文件,它确实确保在所有故障情况下都能准确地传递一次。