如何使用scala中的akka流将多个文件发送给kafka生产者

jaxagkaj  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(393)

我正在尝试使用akka stream将多个数据发送给kafka制作者,同时我编写了制作者本身,但正在为如何使用akka streamio来获取多个文件而挣扎,这些文件将是我要发送给我的kafka制作者的数据这是我的代码:

object App {

  def main(args: Array[String]): Unit = {
    val file = Paths.get("233339.8.1231731728115136.1722327129833578.log")
//    val file = Paths.get("example.csv")
//
//    val foreach: Future[IOResult] = FileIO.fromPath(file)
//      .to(Sink.ignore)
//      .run()

    println("Hello from producer")

    implicit val system:ActorSystem = ActorSystem("producer-example")
    implicit val materializer:Materializer = ActorMaterializer()

    val producerSettings = ProducerSettings(system,new StringSerializer,new StringSerializer)

    val done: Future[Done] =
      Source(1 to 955)
        .map(value => new ProducerRecord[String, String]("test-topic", s"$file : $value"))
        .runWith(Producer.plainSink(producerSettings))

    implicit val ec: ExecutionContextExecutor = system.dispatcher
    done onComplete  {
      case Success(_) => println("Done"); system.terminate()
      case Failure(err) => println(err.toString); system.terminate()
    }

  }

}
xtfmy6hx

xtfmy6hx1#

给定多个文件名:

val fileNames : Iterable[String] = ???

可以创建一个 Source 它使用 flatMapConcat :

val chunkSize = 8192

val chunkSource : Source[ByteString, _] = 
  Source.apply(fileNames)
        .map(fileName => Paths get fileName)
        .flatMapConcat(path => FileIO.fromPath(path, chunkSize))

这将发射固定大小的信号 ByteString 所有的价值观 chunkSize 长度,除了可能较小的最后一个值。
如果要用分隔符分隔行,则可以使用 Framing :

val delimiter : ByteString = ???

val maxFrameLength : Int = ???

val framingSource : Source[ByteString, _] =
  chunkSource.via(Framing.delimiter(delimiter, maxFrameLength))

相关问题