producer.send在.map中不起作用

u3r8eeie  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(198)

我正在制作一个应用程序,从elasticsearch获取数据并将其发送给kafka。但是producer.send()函数在map内部不起作用,但是在map外部,一切都能正常工作

val f1 = ElasticsearchSource
  .create(
    indexName = "products",
    typeName = "product",
    query = """{"match_all": {}}"""
  )
  .map { message: OutgoingMessage[spray.json.JsObject] =>
    val product = message.source
    producer.send(new ProducerRecord("test", product))
    println("publishing message ")
    IncomingMessage(Some(message.id), message.source)
  }
  .runWith(Sink.seq)

原因可能是什么?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题