我必须使用Kafka模块的代码:https://github.com/akka/reactive-kafka/blob/master/readme.md
我的代码以:
val kafka = new ReactiveKafka()
val kafkaIdpsMsgs: Publisher[StringKafkaMessage] = kafka.consume(
ConsumerProperties(
brokerList = kafkaHosts,
zooKeeperHost = zkHosts,
topic = "test",
groupId = "idps-translator",
decoder = new StringDecoder()
).readFromEndOfStream())
val kafkaSamples: Subscriber[String] = kafka.publish(ProducerProperties(
brokerList = kafkaHosts,
topic = "test",
encoder = new StringEncoder()
))
我想(由出版商)发一条消息。我要写什么代码才能实现它?
1条答案
按热度按时间gpfsuwkq1#
您可以浏览以下文档:http://doc.akka.io/docs/akka-stream-kafka/current/producer.html