我正在使用akkastreamsKafka库与Kafka经纪人进行交互。我有以下具有无限缓冲区的流定义:
def producerStream[T: MessageType](producerProperties: Map[String, String]) = {
val streamSource = source[T](producerProperties)
val streamFlow = flow[T](producerProperties)
val streamSink = sink(producerProperties)
streamSource.via(streamFlow).to(streamSink).run()
}
我从actor示例中调用producerstream。我有一个类型的消息:
case class InitiateStream[T](publisher: ActorRef, anyMessage: T)
在我的actor中,我有以下接收块:
override def receive: Receive = super.receive orElse {
case StartProducerStream(publisherActor, DefaultMessage) =>
producerStream[DefaultMessage](cfg.producerProps)
// context.become(active)
case other => println(s"SHIT !! Got unknown message: $other")
}
发布者最终将获得应该推送到相应Kafka主题的消息。
现在我的问题是,我是否需要费心关闭producerstream?
我正在考虑在启动流时立即执行context.been(active(producerstream)),在active方法中,我将根据destroysteam消息处理流终止。需要这个吗?你们怎么看?
1条答案
按热度按时间wooyq4lh1#
如果使用akka kafka使用者,则源将无限期地发出事件,直到主题中的事件可用为止。为什么要关闭生产商?
我´我建议您拨打以下电话开始您的来源:
对于主动行为: