我想在flink中创建一个bucketingsink,它在同一个文件夹中写入所有文件,但是用当前时间戳设置文件名,而不是递增计数器。例如:
零件号:0-1514107452000.avro
零件号:0-1514021052000.avro
...
这是我的密码:
val sink = new BucketingSink[Tuple2[String, MyType]]("/tmp/flink/")
sink.setBucketer(new MyTypeBucketer(new SimpleDateFormat("yyyy-MM-dd--HH")))
sink.setInactiveBucketThreshold(120000) // this is 2 minutes
sink.setBatchSize(1024 * 1024 * 64) // this is 64 MB,
sink.setPendingSuffix(".avro")
val writer: AvroKeyValueSinkWriter[String, MyType] = new AvroKeyValueSinkWriter[String, MyType](parseAvroSinkProperties())
sink.setWriter(writer.duplicate())
def parseAvroSinkProperties(): util.Map[String, String] = {
var properties = new util.HashMap[String, String]()
val stringSchema = Schema.create(Type.STRING)
val myTypeSchema = myType.getClassSchema
val keySchema = stringSchema.toString
val valueSchema = myTypeSchema.toString
val compress = true
properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema)
properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema)
properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, compress.toString)
properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC)
properties
}
class MyTypeBucketer(dateFormatter: SimpleDateFormat) extends DateTimeBucketer[Tuple2[String, MyType]] {
override def getBucketPath(clock: Clock, basePath: Path, element: Tuple2[String, MyType]) = {
new Path(s"$basePath/${element.f1.getMyStringProp}")
}
有人知道吗?谢谢
暂无答案!
目前还没有任何答案,快来回答吧!