Kafka 流数据过滤算法[关闭]

vlf7wbxs  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(117)

已关闭,此问题为opinion-based。它目前不接受回答。
**想改善这个问题吗?**更新问题,以便editing this post可以用事实和引用来回答。

14天前关闭
Improve this question
我正在使用Kafka Streams开发一个基于Kafka的流数据应用程序。
在这里,我的应用程序将在高峰时间(约10小时)处理约400 M的事件,在这里,我需要根据一些多个过滤条件过滤这些事件,并相应地路由它们,这些条件可以是逻辑EQUALS,NOTEQUALS,IN,NOTIN
这里的问题是,我想写一个时间复杂度为O(1)的过滤器,同时寻找一些算法,我遇到了布隆过滤器,但布隆过滤器的问题是,它可能会有误报,时间复杂度也是O(H)(这里H是哈希函数的数量)。
另一种方法,我正在考虑创建HashMap或实现Cockene缓存,其中键将是过滤条件值上的置换组合的字符串值,这里我担心堆内存利用率,因为置换将随着HashMap/Cache大小不断增加
考虑以下场景:

empId : 1
deptId : 2
filter_conditions = [
    {field: id, operaor:EQ, value: 1},
    {field: deptId, operaor:IN, value:1,2,3,4}
]

谁能给我一个最好的建议

ddrv8njm

ddrv8njm1#

我不知道你为什么要看像Bloom过滤器之类的数据结构,但首先让我们谈谈你的O(1)要求。
我知道你希望你的过滤器很快,但是考虑到你是从Kafka流中拉取记录,你是从流中阅读它们并将它们从字节转换成POJO,这个操作至少需要O(|R|)每个记录的时间,|R|记录的大小(以字节为单位)。
您的过滤应用程序不可能运行得比这更快。因为你能达到的最低可能复杂度是O(|R|),然后您可以花费|R|在每个滤波操作上节省时间,并且仍然实现最低可能的最坏情况复杂度。
如果您的条件都使用您列出的过滤运算符:在,不在,平等,不平等,那么你是幸运的。您可以将任何条件列表转换为一个过滤器,并在O(|R|)时间。
编译后的过滤器应包含以下结构:

  • 必须存在的Set<String> requiredFields字段。这些都是具有EQUALS或IN条件的字段。
  • 一个Map<String, Condition> conditions,对于过滤器中提到的每个字段,它包含应用于该字段的条件。
  • 每个Condition需要一个boolean inclusiveSet<String> valuesinclusive == true表示字段的值必须在values集合中。inclusive == false表示字段的值不能在values集合中。

请注意,无论您在特定字段上有多少个条件,所有这些条件都可以组合到一个Condition对象中。
使用这种表示法,我们可以在与记录大小成比例的时间内处理每个记录,而不考虑过滤条件的数量或大小。
对于每条记录:
1.检查每个字段,看看它是否在requiredFields中。对匹配项进行计数,如果小于requiredFields.size(),则记录不能通过过滤器。
1.查找每个字段的Condition。如果有,则检查该值是否在values集合中,然后根据inclusive的值确定是否使记录失败。
请注意,我们只迭代记录中的字段,并对每个字段执行两个O(1)操作。我们没有迭代任何与条件的数量或大小有关的东西。这就是为什么我们可以在O(|R|)时间。

相关问题