typescript nestjs为Kafka客户端设置分区数

pbpqsu0x  于 2023-04-07  发布在  TypeScript
关注(0)|答案(1)|浏览(147)

有什么方法可以将分区数量传递给Kafka客户端配置吗?
我正在使用nestjs的开箱即用的Kafka客户端,如下所示:生产者和消费者代码:
main.ts

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  app.connectMicroservice(
    {
      transport: Transport.KAFKA,
      options: {
        client: {
          clientId: 'app1',
          brokers: ['localhost:9092'],
          logLevel: logLevel.ERROR,
        },
        producer: {
          allowAutoTopicCreation: true,
          retry: {
            retries: 10,
            multiplier: 2,
          },
        },
        consumer: {
          groupId: 'app1',
        },
        run: {
          autoCommit: false,
        },
      },
    },
    {
      inheritAppConfig: true,
    },
  );
  await app.startAllMicroservices();
  await app.listen(3000);
}
bootstrap();

控制器

@MessagePattern('event1')
  async event1Handler(@Payload() data: any, @Ctx() context: KafkaContext) {
    console.log(`receive ${data.arg} - part: ${context.getPartition()}`);
    await context.getConsumer().commitOffsets([
      {
        topic: context.getTopic(),
        partition: context.getPartition(),
        offset: context.getMessage().offset,
      },
    ]);
  }

当生产者和消费者运行时,他们用1个分区创建了一个Kafka主题,我需要通过Kafka客户端为主题创建多个分区,有没有办法将分区的数量传递给Kafka客户端?
谢谢

qvk1mo1f

qvk1mo1f1#

修改您的Kafka server.properties以更改partitions=1 ...
否则,您需要手动使用KafkaJS AdminClient类来创建主题,与NestJS config分开(即在调用bootstrap()之前)

相关问题