我正在尝试使用akka stream将多个数据发送给kafka制作者,同时我编写了制作者本身,但正在为如何使用akka streamio来获取多个文件而挣扎,这些文件将是我要发送给我的kafka制作者的数据这是我的代码:
object App {
def main(args: Array[String]): Unit = {
val file = Paths.get("233339.8.1231731728115136.1722327129833578.log")
// val file = Paths.get("example.csv")
//
// val foreach: Future[IOResult] = FileIO.fromPath(file)
// .to(Sink.ignore)
// .run()
println("Hello from producer")
implicit val system:ActorSystem = ActorSystem("producer-example")
implicit val materializer:Materializer = ActorMaterializer()
val producerSettings = ProducerSettings(system,new StringSerializer,new StringSerializer)
val done: Future[Done] =
Source(1 to 955)
.map(value => new ProducerRecord[String, String]("test-topic", s"$file : $value"))
.runWith(Producer.plainSink(producerSettings))
implicit val ec: ExecutionContextExecutor = system.dispatcher
done onComplete {
case Success(_) => println("Done"); system.terminate()
case Failure(err) => println(err.toString); system.terminate()
}
}
}
1条答案
按热度按时间xtfmy6hx1#
给定多个文件名:
可以创建一个
Source
它使用flatMapConcat
:这将发射固定大小的信号
ByteString
所有的价值观chunkSize
长度,除了可能较小的最后一个值。如果要用分隔符分隔行,则可以使用
Framing
: