我正在制作一个应用程序,从elasticsearch获取数据并将其发送给kafka。但是producer.send()函数在map内部不起作用,但是在map外部,一切都能正常工作
val f1 = ElasticsearchSource
.create(
indexName = "products",
typeName = "product",
query = """{"match_all": {}}"""
)
.map { message: OutgoingMessage[spray.json.JsObject] =>
val product = message.source
producer.send(new ProducerRecord("test", product))
println("publishing message ")
IncomingMessage(Some(message.id), message.source)
}
.runWith(Sink.seq)
原因可能是什么?
暂无答案!
目前还没有任何答案,快来回答吧!