如何使用lagom框架实现kafka消费者

6ljaweal  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(424)

尝试了不同的文档,但无法使用lagom实现kafka消费者api。由于com.lightbend.lagom.scaladsl.server.lagomservercomponents中的成员lagomserver:lagomserver未在加载器类中定义,因此无法使用MessageBroker和获取对象创建。下面是我的加载器类的代码片段。

class ConsumerLoader extends LagomApplicationLoader {

  override def load(context: LagomApplicationContext): LagomApplication =
    new ConsumerApplication(context) with ConfigurationServiceLocatorComponents

  override def describeService = Some(readDescriptor[ConsumerService])
}

abstract class ConsumerApplication(context: LagomApplicationContext)
  extends LagomApplication(context)
    with AhcWSComponents {

  lazy val kafkaService = serviceClient.implement[ConsumerService]
}

请提供有关如何实现kafka消息消费者的有用文档链接。

yfjy0ee7

yfjy0ee71#

您可以尝试查看文档和lagomgithub存储库。另一个ServiceImpl包含kafka consumer的逻辑。适当的加载程序在另一个应用程序类的另一个servicespec.scala中定义。

ie3xauqp

ie3xauqp2#

我是这样做的:
当读写主题的服务是不同的服务或您只需要实现读卡器时:
添加主题方法 def topic: Topic[Envelope] 服务阅读器

trait ReaderKafkaService extends Service {
  def topic1: Topic[Envelope]

  override final def descriptor: Descriptor = {

    named("kafka-reader")
      .withTopics(
        topic("topic-name", topic1)
      )
      .withAutoAcl(true)
  }
}

Kafka的服务:

ConsumerService extends Service {
  override final def descriptor: Descriptor = {

    named("consumer-service")
      .withAutoAcl(true)
  }
}

此服务的加载项:

lazy val kafkaService: ReaderKafkaService = serviceClient.implement[ReaderKafkaService]

在impl中插入创建的服务:

class ServiceImpl(
    kafkaService: ReaderKafkaService,
) extends ConsumerService

订阅主题

class ServiceImpl(
    kafkaService: ReaderKafkaService,
) extends ConsumerService {

  kafkaService.topic1.subscribe
    .withGroupId("group-1")
    .atLeastOnce(
      Flow[Envelope]
        .mapAsync(1) {
          case envelope: Envelope =>
              println(s" Message from topic: $envelope") 
              Future.successful(Done)
        }
        .recover {
          case e =>
            log.error(s"Invalid message $e")
            Done
        }
    )
}

如果需要,可为kafka配置插件

kafka {
  bootstrap.servers = "localhost:9092"
}

当您要从同一服务写入和读取时:
添加服务方法:

trait ReaderWriterService extends Service {
  def topic1: Topic[Envelope]

  override final def descriptor: Descriptor = {

    named("kafka-reader-writer")
      .withTopics(
        topic("topic-name", topic1)
      )
      .withAutoAcl(true)
  }
}

服务实现:

class ServiceImpl(
    kafkaService: ReaderWriterService,
) extends ReaderWriterService {

  kafkaService.topic1.subscribe
    .withGroupId("group-1")
    .atLeastOnce(
      Flow[Envelope]
        .mapAsync(1) {
          case envelope: String =>
              println(s" Message from topic: $envelope") 
              Future.successful(Done)
        }
        .recover {
          case e =>
            log.error(s"Invalid message $e")
            Done
        }
    )

  override def topic1(): Topic[String] =
    TopicProducer.singleStreamWithOffset { fromOffset =>
      persistentEntityRegistry
        .eventStream(Event.Tag, fromOffset)
        .map(ev => ("Hi world", ev.offset))
    }
}

在装载机中,您需要扩展 with LagomKafkaComponents 并将此添加为服务 lazy val kafka: ProfileService = serviceClient.implement[ProfileService] ```
abstract class Application(context: LagomApplicationContext)
extends LagomApplication(context)
with CassandraPersistenceComponents
with LagomKafkaComponents
with AhcWSComponents {

override lazy val lagomServer = serverForReaderWriterService

lazy val kafka: ReaderWriterService = serviceClient.implement[ReaderWriterService]

persistentEntityRegistry.register(wire[PersistentEntity])
}

相关问题