alpakka s3 "multipartUpload" does not upload files

agyaoht7  asked on 2021-06-06  in Kafka

I have a question about alpakka-kafka + alpakka-s3 integration. Alpakka S3 multipartUpload does not seem to upload any file when I use the alpakka-kafka source:

  kafkaSource ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
  bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
  bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore

However, as soon as I add .take(100) after the Kafka source, everything works fine:

  kafkaSource.take(100) ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
  bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
  bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore

Any help would be greatly appreciated. Thanks in advance!
Here is the full code snippet:

  // Source
  val kafkaSource: Source[(CommittableOffset, Array[Byte]), Consumer.Control] = {
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics(prefixedTopics))
      .map(committableMessage => (committableMessage.committableOffset, committableMessage.record.value))
      .watchTermination() { (mat, f: Future[Done]) =>
        f.foreach { _ =>
          log.debug("consumer source shutdown, consumerId={}, group={}, topics={}", consumerId, group, prefixedTopics.mkString(", "))
        }
        mat
      }
  }

  // Flow
  val commitFlow: Flow[CommittableOffset, Done, NotUsed] = {
    Flow[CommittableOffset]
      .groupedWithin(batchingSize, batchingInterval)
      .map(group => group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) => batch.updated(elem) })
      .mapAsync(parallelism = 3) { msg =>
        log.debug("committing offset, msg={}", msg)
        msg.commitScaladsl().map { result =>
          log.debug("committed offset, msg={}", msg)
          result
        }
      }
  }

  private val kafkaMsgToByteStringFlow = Flow[KafkaMessage[Any]].map(x => ByteString(x.msg + "\n"))

  private val kafkaMsgToOffsetFlow = {
    implicit val askTimeout: Timeout = Timeout(5.seconds)
    Flow[KafkaMessage[Any]].mapAsync(parallelism = 5) { elem =>
      Future(elem.offset)
    }
  }

  // Sink
  val s3Sink = {
    val BUCKET = "test-data"
    s3Client.multipartUpload(BUCKET, s"tmp/data.txt")
  }

  // Doesn't work... (no files are showing up on S3)
  kafkaSource ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
  bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
  bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore

  // This one works...
  kafkaSource.take(100) ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
  bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
  bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore

2ledvvac1#

  private def running: Receive = {
    case Subscribe(subscriberId) =>
      val kafkaSubscriber = new KafkaSubscriber(
        serviceName = "akka_kafka_subscriber",
        group = kafkaConfig.group,
        topics = kafkaConfig.subscriberTopics,
        system = system,
        configurationProperties = Seq(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest")
      )
      RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._
        val bcast = builder.add(Broadcast[KafkaMessage[Any]](2))
        kafkaSource ~> kafkaSubscriber.serializer.deserializeFlow ~> kafkaSubscriber.filterTypeFlow[Any] ~> bcast.in
        bcast.out(0) ~> kafkaMsgToStringFlow
          .groupedWithin(BATCH_SIZE, BATCH_DURATION)
          .map(group => group.foldLeft(new StringBuilder()) { (batch, elem) => batch.append(elem) })
          .mapAsync(parallelism = 3) { data =>
            self ? ReadyForUpload(ByteString(data.toString()), UUID.randomUUID().toString, subscriberId)
          } ~> Sink.ignore
        bcast.out(1) ~> kafkaMsgToOffsetFlow ~> kafkaSubscriber.commitFlow ~> Sink.ignore
        ClosedShape
      }).withAttributes(ActorAttributes.supervisionStrategy(decider)).run()
      sender ! "subscription started"

    case ready: ReadyForUpload =>
      println("==========================Got ReadyForUpload: " + ready.fileName)
      val BUCKET = "S3_BUCKET"
      Source.single(ready.data).runWith(s3Client.multipartUpload(BUCKET, s"tmp/${ready.fileName}_${ready.subscriberId}.txt"))
      sender() ! "Done"
  }

sdnqo3pr2#

Actually, it does upload. The problem is that S3 must receive a completion request before the upload is finalized; only then does the file become available in the bucket. I bet that the Kafka source without take(n) never stops producing data downstream, so the sink never sends the completion request to S3: the stream never actually completes, and the sink keeps expecting more data to upload before it will finalize the upload.
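To make that concrete, here is a minimal sketch reusing kafkaSource, s3Client and log from the question (the bucket and key are placeholders): the multipartUpload sink materializes a future that completes only once the stream itself completes, which is when the final complete-multipart-upload request is sent and the object shows up in the bucket.

  // Sketch only: a bounded stream, so the sink can finish the multipart upload.
  val upload = kafkaSource
    .take(100)                                    // stream completes after 100 messages
    .map { case (_, bytes) => ByteString(bytes) ++ ByteString("\n") }
    .runWith(s3Client.multipartUpload("test-data", "tmp/data.txt"))

  // The materialized Future[MultipartUploadResult] succeeds only after the
  // completion request, i.e. once the object is visible in the bucket.
  upload.foreach(result => log.debug("upload completed, location={}", result.location))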
You can't keep uploading everything into a single file anyway, so my suggestion is: group the kafkaSource messages and send a compressed Array[Byte] to the sink. The trick is that you have to create one sink per file instead of using a single sink, as sketched below.
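A rough sketch of that suggestion, again reusing kafkaSource and s3Client from the question. The batch size, interval, bucket and key naming are illustrative, and offset committing and compression are omitted for brevity: each batch becomes a finite source run into its own multipartUpload sink, so every file gets its own completion request.

  import java.util.UUID
  import scala.concurrent.duration._

  kafkaSource
    .map { case (_, bytes) => ByteString(bytes) ++ ByteString("\n") }
    .groupedWithin(10000, 5.minutes)                   // one batch per uploaded file
    .mapAsync(parallelism = 1) { batch =>
      val key = s"tmp/data_${UUID.randomUUID()}.txt"   // a new S3 object per batch
      // finite source => the per-file upload completes and the object appears
      Source(batch).runWith(s3Client.multipartUpload("test-data", key))
    }
    .runWith(Sink.ignore)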
