我正在设计一个数据管道,从读取的平面文件开始。文件中的每一行都是一条记录。
一旦加载,每个记录都将被解析、转换和丰富。这与其他记录无关。
作为最后一步,我希望基于多个记录字段的模糊匹配来删除记录。为此,我想得到2个记录的所有组合。
目前我使用sql表作为缓冲区。我的表包含了所有的记录,我把表和它自己连接起来, on
关键字不同的条件下,名称与关键字的模糊匹配 sounds like
:
CREATE TABLE temp_tblSoundsLikeName AS
SELECT DISTINCT clients1.client_name client_name1,
clients1.client_id client_id1,
clients2.client_name client_name2,
clients2.client_id client_id2,
FROM tblClients clients1
JOIN tblClients clients2
ON clients1.client_name != clients2.client_name
AND clients1.ban_id < clients2.ban_id
AND SUBSTRING_INDEX(clients2.client_name,' ',1) SOUNDS LIKE SUBSTRING_INDEX(clients1.client_name,' ',1)
temp\tblsoundslikename中的记录表示重复项,我将把它们合并到tblclients中。
我在考虑使用Kafka流,这是我过去没有用过的。当一条消息 M
(代表记录) R
)说到重复数据消除主题,我希望我的应用程序使用它,并因此生成一条包含来自的信息的消息 R
从另一条信息 R'
,在哪里 R'
是过去5小时内到达重复数据消除阶段的任何消息。这些消息包含两条消息的组合,应该被发送到另一个主题,在那里它们可以通过匹配和模糊匹配条件进行过滤,最后一个阶段是合并重复的记录,并使用kafka connect jdbc将合并的记录推送到rdbms。
我不知道如何创建所有这些信息 R
以及 R'
组合。这可能吗?这是kafka流的一个很好的用例吗?
1条答案
按热度按时间gt0wga4j1#
使用kafka的streams api进行重复数据消除的起点是
EventDeduplicationLambdaIntegrationTest.java
在https://github.com/confluentinc/kafka-streams-examples (confluent platform 3.3.0/apache kafka 0.11.0的直接链接:eventdeduplicationlambdaintegrationtest.java)。方法
isDuplicate
控制是否将新事件视为重复事件:这个
eventIdStore
是一个所谓的“状态存储”,它允许您记住过去事件中的信息,以便您可以做出“重复的是/否”决策。当一条消息m(代表记录r)到达重复数据消除主题时,我希望我的应用程序使用它,从而生成一条包含来自r和另一条消息r'的信息的消息,其中r'是过去5小时内到达重复数据消除阶段的任何消息。这些消息包含两条消息的组合,应该被发送到另一个主题,在那里它们可以通过匹配和模糊匹配条件进行过滤,最后一个阶段是合并重复的记录,并使用kafka connect jdbc将合并的记录推送到rdbms。
你有一个选择就是做“给一个新的
R
,让我们找到所有R'
消息,然后在一个步骤中进行“重复数据消除”,即在一个处理步骤中执行此操作(类似于上面的示例,使用所谓的Transformer
),而不是创建一堆新的下游消息,从而导致写放大(1 * R => N * "(R/R')"
下游消息)。状态存储可用于跟踪所有以前的消息,包括各种R'
你感兴趣的是什么时候R
到达。