使用模糊匹配重复数据消除进行流处理的最佳实践

x8diyxa7  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(403)

我正在设计一个数据管道,从读取的平面文件开始。文件中的每一行都是一条记录。
一旦加载,每个记录都将被解析、转换和丰富。这与其他记录无关。
作为最后一步,我希望基于多个记录字段的模糊匹配来删除记录。为此,我想得到2个记录的所有组合。
目前我使用sql表作为缓冲区。我的表包含了所有的记录,我把表和它自己连接起来, on 关键字不同的条件下,名称与关键字的模糊匹配 sounds like :

CREATE TABLE temp_tblSoundsLikeName AS
SELECT DISTINCT clients1.client_name client_name1,
                clients1.client_id client_id1,
                clients2.client_name client_name2,
                clients2.client_id client_id2,
FROM tblClients clients1 
  JOIN tblClients clients2 
    ON clients1.client_name != clients2.client_name
       AND clients1.ban_id < clients2.ban_id
       AND SUBSTRING_INDEX(clients2.client_name,' ',1) SOUNDS LIKE SUBSTRING_INDEX(clients1.client_name,' ',1)

temp\tblsoundslikename中的记录表示重复项,我将把它们合并到tblclients中。
我在考虑使用Kafka流,这是我过去没有用过的。当一条消息 M (代表记录) R )说到重复数据消除主题,我希望我的应用程序使用它,并因此生成一条包含来自的信息的消息 R 从另一条信息 R' ,在哪里 R' 是过去5小时内到达重复数据消除阶段的任何消息。这些消息包含两条消息的组合,应该被发送到另一个主题,在那里它们可以通过匹配和模糊匹配条件进行过滤,最后一个阶段是合并重复的记录,并使用kafka connect jdbc将合并的记录推送到rdbms。
我不知道如何创建所有这些信息 R 以及 R' 组合。这可能吗?这是kafka流的一个很好的用例吗?

gt0wga4j

gt0wga4j1#

使用kafka的streams api进行重复数据消除的起点是 EventDeduplicationLambdaIntegrationTest.java 在https://github.com/confluentinc/kafka-streams-examples (confluent platform 3.3.0/apache kafka 0.11.0的直接链接:eventdeduplicationlambdaintegrationtest.java)。
方法 isDuplicate 控制是否将新事件视为重复事件:

private boolean isDuplicate(final E eventId) {
  long eventTime = context.timestamp();
  WindowStoreIterator<Long> timeIterator = eventIdStore.fetch(
      eventId,
      eventTime - leftDurationMs,
      eventTime + rightDurationMs);
  boolean isDuplicate = timeIterator.hasNext();
  timeIterator.close();
  return isDuplicate;

这个 eventIdStore 是一个所谓的“状态存储”,它允许您记住过去事件中的信息,以便您可以做出“重复的是/否”决策。
当一条消息m(代表记录r)到达重复数据消除主题时,我希望我的应用程序使用它,从而生成一条包含来自r和另一条消息r'的信息的消息,其中r'是过去5小时内到达重复数据消除阶段的任何消息。这些消息包含两条消息的组合,应该被发送到另一个主题,在那里它们可以通过匹配和模糊匹配条件进行过滤,最后一个阶段是合并重复的记录,并使用kafka connect jdbc将合并的记录推送到rdbms。
你有一个选择就是做“给一个新的 R ,让我们找到所有 R' 消息,然后在一个步骤中进行“重复数据消除”,即在一个处理步骤中执行此操作(类似于上面的示例,使用所谓的 Transformer ),而不是创建一堆新的下游消息,从而导致写放大( 1 * R => N * "(R/R')" 下游消息)。状态存储可用于跟踪所有以前的消息,包括各种 R' 你感兴趣的是什么时候 R 到达。

相关问题