beam管道:kafka到hdfs的时间桶

iyr7buue  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(367)

我试图烘焙一个非常简单的管道,读取Kafka事件流( KafkaIO.read )并将完全相同的事件写入hdfs,按小时将每个事件拼凑在一起(小时是从事件的时间戳字段读取的,而不是处理时间)。
无法对事件的时间戳进行任何假设(即使99%的时间是实时的,它们也可能跨越多天),并且绝对没有关于事件顺序的信息。我的第一个尝试是创建一个在处理时间内运行的管道。
我的管道如下所示:

val kafkaReader = KafkaIO.read[String, String]()
  .withBootstrapServers(options.getKafkaBootstrapServers)
  .withTopic(options.getKafkaInputTopic)
  .withKeyDeserializer(classOf[StringDeserializer])
  .withValueDeserializer(classOf[StringDeserializer])
  .updateConsumerProperties(
    ImmutableMap.of("receive.buffer.bytes", Integer.valueOf(16 * 1024 * 1024))
  )
  .commitOffsetsInFinalize()
  .withoutMetadata()

val keyed = p.apply(kafkaReader)
  .apply(Values.create[String]())
  .apply(new WindowedByWatermark(options.getBatchSize))
  .apply(ParDo.of[String, CustomEvent](new CustomEvent))

val outfolder = FileSystems.matchNewResource(options.getHdfsOutputPath, true)

    keyed.apply(
  "write to HDFS",
  FileIO.writeDynamic[Integer, CustomEvent]()
    .by(new SerializableFunction[CustomEvent, Integer] {
      override def apply(input: CustomEvent): Integer = {
        new Instant(event.eventTime * 1000L).toDateTime.withMinuteOfHour(0).withSecondOfMinute(0)
        (eventZeroHoured.getMillis / 1000).toInt
      }
    })
    .via(Contextful.fn(new SerializableFunction[CustomEvent, String] {
      override def apply(input: CustomEvent): String = {
        convertEventToStr(input)
      }
    }), TextIO.sink())
    .withNaming(new SerializableFunction[Integer, FileNaming] {
      override def apply(bucket: Integer): FileNaming = {
        new BucketedFileNaming(outfolder, bucket, withTiming = true)
      }
    })
    .withDestinationCoder(StringUtf8Coder.of())
    .to(options.getHdfsOutputPath)
    .withTempDirectory("hdfs://tlap/tmp/gulptmp")
    .withNumShards(1)
    .withCompression(Compression.GZIP)
)

这是我的windowedbywatermark:

class WindowedByWatermark(bucketSize: Int = 5000000) extends PTransform[PCollection[String], PCollection[String]] {

  val window: Window[String] = Window
    .into[String](FixedWindows.of(Duration.standardMinutes(10)))
    .triggering(
      AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterPane.elementCountAtLeast(bucketSize))
    )
    .withAllowedLateness(Duration.standardMinutes(30))
    .discardingFiredPanes()

  override def expand(input: PCollection[String]): PCollection[String] = {
    input.apply("window", window)
  }
}

管道运行完美无瑕,但由于写入阶段(由 writeDynamic ). 大多数事件都是实时发生的,因此它们属于同一时间。我也试着用小时和分钟来计算数据,没有太多帮助。
在经历了几天的痛苦之后,我决定用一个 bucketingSink 而且性能非常好。

val stream = env
  .addSource(new FlinkKafkaConsumer011[String](options.kafkaInputTopic, new SimpleStringSchema(), properties))
  .addSink(bucketingSink(options.hdfsOutputPath, options.batchSize))

根据我的分析(甚至使用jmx),beam中的线程在hdfs的写入阶段等待(这会导致管道暂停从kafka检索数据)。
因此,我有以下问题:
有没有可能把扣环按下去 bucketingSink 你也在梁里吗?
有没有一个更聪明的方法来实现相同的光束?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题