Kafka 修改来自拦截器的RPC上下文数据?

gpnt7bae  于 2023-11-16  发布在  Apache
关注(0)|答案(2)|浏览(181)

可以修改基于HTTP的上下文的请求有效载荷(例如此方法),但是否有方法修改基于RPC的上下文的有效载荷?
在我的例子中,我的NestJS设置是一个监听特定Kafka主题(Customer)的微服务。消息是使用Avro编码的,所以我试图创建一个拦截器来全局解码来自Kafka的每条消息。

// inside intercept function
const rpcContext = context.switchToRpc().getContext();
const kafkaMessageBuffer: Buffer = context.switchToRpc().getData();
const decodedMessage = await this.schemaRegistry.decode(kafkaMessageBuffer);

// My attempt to replace the value (which is a buffer) with the decoded data
rpcContext.args[0]['value'] = decodedMessage;

字符串
这不起作用,因为args是受保护的属性,所以我不能修改它。
还有什么其他方法可以全局解码所有Kafka消息?
我能想到的其他选择:

  • 创建自定义管道以转换控制器中的数据:
  • @Payload(AvroTransformPipe) message: CustomerMessage
  • 在将解码的消息传递给服务之前,在控制器中手动解码消息
dtcbnfnu

dtcbnfnu1#

我遇到了同样的问题。我的解决方案是在全局拦截器中,动态执行de方法,而不使用next.handle():

const rpcHost: RpcArgumentsHost = context.switchToRpc();
const dataWrapper = rpcHost.getData();
const user = dataWrapper.user;
const data = dataWrapper.data;

const instance = app.get(context.getClass());

const resultPromise: Promise<any> =
  instance[context.getHandler().name](data);

return from(resultPromise);

字符串

vawmfj5a

vawmfj5a2#

最后我使用了一个全局管道。

transform(value: any, metadata: ArgumentMetadata): any {
  if (metadata.type !== 'body') {
    return value;
  }

  return value instanceof Buffer ? this.schemaRegistry.decode(value) : null;
}

字符串
这个解决方案并不完美,因为它将应用于每个装饰器(因此需要进行metadata.type检查),而且我不确定如何将其针对特定的装饰器(在我的示例中,我只想针对@Payload())。如果应用程序是带有HTTP端点的混合应用程序,它也可能会有问题。

相关问题