尝试了不同的文档,但无法使用lagom实现kafka消费者api。由于com.lightbend.lagom.scaladsl.server.lagomservercomponents中的成员lagomserver:lagomserver未在加载器类中定义,因此无法使用MessageBroker和获取对象创建。下面是我的加载器类的代码片段。
class ConsumerLoader extends LagomApplicationLoader {
override def load(context: LagomApplicationContext): LagomApplication =
new ConsumerApplication(context) with ConfigurationServiceLocatorComponents
override def describeService = Some(readDescriptor[ConsumerService])
}
abstract class ConsumerApplication(context: LagomApplicationContext)
extends LagomApplication(context)
with AhcWSComponents {
lazy val kafkaService = serviceClient.implement[ConsumerService]
}
请提供有关如何实现kafka消息消费者的有用文档链接。
2条答案
按热度按时间yfjy0ee71#
您可以尝试查看文档和lagomgithub存储库。另一个ServiceImpl包含kafka consumer的逻辑。适当的加载程序在另一个应用程序类的另一个servicespec.scala中定义。
ie3xauqp2#
我是这样做的:
当读写主题的服务是不同的服务或您只需要实现读卡器时:
添加主题方法
def topic: Topic[Envelope]
服务阅读器Kafka的服务:
此服务的加载项:
在impl中插入创建的服务:
订阅主题
如果需要,可为kafka配置插件
当您要从同一服务写入和读取时:
添加服务方法:
服务实现:
在装载机中,您需要扩展
with LagomKafkaComponents
并将此添加为服务lazy val kafka: ProfileService = serviceClient.implement[ProfileService]
```abstract class Application(context: LagomApplicationContext)
extends LagomApplication(context)
with CassandraPersistenceComponents
with LagomKafkaComponents
with AhcWSComponents {
override lazy val lagomServer = serverForReaderWriterService
lazy val kafka: ReaderWriterService = serviceClient.implement[ReaderWriterService]
persistentEntityRegistry.register(wire[PersistentEntity])
}