我正在与Kafka合作,有人要求我验证发送给Kafka的信息,但我不喜欢我认为的解决方案,这就是为什么我希望有人能就此给我建议。
我们有许多生产商在我们的控制之外,所以他们可以发送任何形式的信息,我们可以发送多达8000万条记录,他们应该在不到2小时内处理。我被要求:
验证格式(json,因为它必须与mongodb兼容)。
验证发送的某些字段。
重命名某些字段
最后两个请求将使用mongodb中存储的参数完成。所有这些都应该在假设我们不是唯一一个生成消费者的人的情况下完成,因此应该有一个“简单”的调用来调用我们的服务来进行验证。有什么想法吗?
1条答案
按热度按时间dz6r00yl1#
这通常是与Kafka流的工作。
你有“原始”输入主题,你的制作者在其中发送事件。然后streams作业从这些主题中读取数据,并将有效记录写入“干净”主题。在流中,您可以执行各种处理来检查记录或在需要时对其进行丰富。
您可能还希望将坏记录写入死信队列主题,以便检查发生这些情况的原因。
然后,您的消费者可以阅读干净的主题,以确保他们只看到经过验证的数据。
此解决方案为记录增加了一些延迟,因为它们必须在到达消费者之前进行“处理”。您还希望在kafka集群附近运行streams作业,因为根据您要验证的内容,它可能需要接收大量数据。
另请参阅使用kafka的streamsapi处理坏消息,其中详细介绍了其中的一些概念。