我使用的是scala 2.11和akka streams kafka 0.17。
我有一条小溪:
一 Source
是使用 Source.actorRef
. 这里,actor被安排以一定的时间间隔运行,并连续生成消息,这些消息被发送到流中。
我附上了一份 Producer
作为一个 Flow
. 制作人推动 ProducerMessage.Message
Kafka的主题。
一些数据库操作。
我在构建 ProducerMessage.Message
,看起来像:
final case class Message[K, V, +PassThrough](
record: ProducerRecord[K, V],
passThrough: PassThrough
)
我很容易通过考试 record
包含实际消息的参数。但我不知道接下来要传递什么 passThrough
参数。根据文件:
这个 passThrough
字段可以保存通过 Consumer#flow
包括在 Result
. 当需要向下游操作传递某些上下文时,这非常有用。这可以通过unzip/zip来完成,但这更方便。例如,它可以是一个 ConsumerMessage.CommittableOffset
或者 ConsumerMessage.CommittableOffsetBatch
可以稍后在流中提交。
在我的例子中,没有任何Kafka消费者订阅Kafka主题并生成 Source
( comittableSource
或者 plainSource
)为了我的小溪。在这种情况下,我将传递文档中描述的消费者补偿。但在我的例子中,一个演员正在模拟这样一个消费者。那意味着我没有权限 ConsumerMessage.CommittableOffset
. 那我要把什么传给 passThrough
参数在这里?在这种情况下,最好的做法是什么?
1条答案
按热度按时间s4chpxco1#
把我的问题转达给
reactive-kafka
团队,我得到答案了。基本上,他们所说的是,如果你没有一个用例pass through
任何情况下,您都可以尝试将其设置为none或notused,或者仅设置空字符串“”。另外请注意,如果您使用
Producer.plainSink
您不需要构造ProducerMessage.Message
. 然后,你可以直接构造一个KafkaProducerRecord
. 那个ProducerMessage.Message
case类只是一个容器,用于pass through
是需要的。除了要传递的元素之外,它只包含一个kafkaProducerRecord
.