如何使用发布者生成消息(在React式kafka中)?

tcbh2hod  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(343)

我必须使用Kafka模块的代码:https://github.com/akka/reactive-kafka/blob/master/readme.md
我的代码以:

val kafka = new ReactiveKafka()

val kafkaIdpsMsgs: Publisher[StringKafkaMessage] = kafka.consume(
      ConsumerProperties(
        brokerList = kafkaHosts,
        zooKeeperHost = zkHosts,
        topic = "test",
        groupId = "idps-translator",
        decoder = new StringDecoder()
      ).readFromEndOfStream())

    val kafkaSamples: Subscriber[String] = kafka.publish(ProducerProperties(
      brokerList = kafkaHosts,
      topic = "test",
      encoder = new StringEncoder()
    ))

我想(由出版商)发一条消息。我要写什么代码才能实现它?

gpfsuwkq

gpfsuwkq1#

val done = Source(1 to 100)
  .map(_.toString)
  .map { elem =>
    new ProducerRecord[Array[Byte], String]("topic1", elem)
  }
  .runWith(Producer.plainSink(producerSettings))

您可以浏览以下文档:http://doc.akka.io/docs/akka-stream-kafka/current/producer.html

相关问题