我在Nest.js项目中使用Kafka.js。这是我初始化KafkaClient的方式:
@Module({
...
providers: [{
provide: 'KAFKA_CLIENT',
useFactory: async (configService: KafkaClientConfigService) => {
const kafkaOptions = configService.getKafkaOptions();
return ClientProxyFactory.create(kafkaOptions);
},
inject: [KafkaClientConfigService],
},
]
...
})
字符串
现在我将KafkaClient注入到我的控制器中,我希望使用间隔来消费消息。虽然有一种方法可以使用Kafka.js使用consumer.pause()
来实现这一点,但我在KafkaClient中找不到任何引用。
是否有任何选项可以通过暂停或限制消费者来实现这一点?
1条答案
按热度按时间ldxq2e6h1#
ClientKafka
有一个protected consumer
属性。在消费者本身,你可以调用pause()
和resume()
。为了使用这些方法,我目前正在用一个自定义类扩展
ClientKafka
,并将其用作提供程序。字符串
请确保使用自定义类作为app.module.ts中的提供程序:
型
或者使用
useFactory
来注入依赖项。