我在mssql中有一些表,这些表每秒钟都会更新一次,查询或多或少是这样的
SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID
WHERE table2.UpdateTime >= ${lastUpdateTime} AND table2.G_ID > ${lastID}
假设selectinnerjoin查询结果为5条记录,如下所示。
如果查询第一次运行 ${lastUpdateTime}
以及 ${lastG_ID}
设置为0,将返回5条以下的记录。处理完记录后,查询将存储 max(G_ID)
i、 e.5和 max(UpdateTime)
i、 e.1512010479英寸 etl_stat
table。
G_ID UpdateTime ID Name T_NAME
-------------------------------------------------------------------
1 1512010470 12591225 DUMMY_DATA DUMMY_ID
2 1512096873 12591538 DUMMY_DATA DUMMY_ID
3 1512096875 12591539 DUMMY_DATA DUMMY_ID
4 1512010477 12591226 DUMMY_DATA DUMMY_ID
5 1512010479 12591227 DUMMY_DATA DUMMY_ID
如表所示,再添加5条新记录:
G_ID UpdateTime ID Name T_NAME
-------------------------------------------------------------------
1 1512010470 12591225 DUMMY_DATA DUMMY_ID
2 1512096873 12591538 DUMMY_DATA DUMMY_ID
3 1512096875 12591539 DUMMY_DATA DUMMY_ID
4 1512010477 12591226 DUMMY_DATA DUMMY_ID
5 1512010479 12591227 DUMMY_DATA DUMMY_ID
6 1512010480 12591230 DUMMY_DATA DUMMY_ID
7 1512010485 12591231 DUMMY_DATA DUMMY_ID
8 1512010490 12591232 DUMMY_DATA DUMMY_ID
9 1512010493 12591233 DUMMY_DATA DUMMY_ID
10 1512010500 12591234 DUMMY_DATA DUMMY_ID
查询将首先读取 max(G_ID)
以及 max(UpdateTime)
从 etl_stat table
并将框架查询如下 SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5
,因此查询只返回5条delta记录,如下所示。
G_ID UpdateTime ID Name T_NAME
-------------------------------------------------------------------
6 1512010480 12591230 DUMMY_DATA DUMMY_ID
7 1512010485 12591231 DUMMY_DATA DUMMY_ID
8 1512010490 12591232 DUMMY_DATA DUMMY_ID
9 1512010493 12591233 DUMMY_DATA DUMMY_ID
10 1512010500 12591234 DUMMY_DATA DUMMY_ID
因此,每次运行查询时,它都应该首先读取 max(G_ID)
以及 max(UpdateTime)
从 etl_stat
表和框架中的selectinnerjoin查询,如上图所示,并获取增量更改。
使用sparksql的as-is体系结构
我实现了上述用例,如下所示:
1) spark jdbc读取phoenix表以获得 max(G_ID)
以及 max(UpdateTime)
从 etl_stat
table。
2) sparkjdbc将selectinnerjoin查询框架如下 SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5
3) sparkjdbc运行步骤2内部连接查询,从mssqlserver读取delta消息,处理记录并插入hbase。
4) 成功插入hbase后,spark更新 etl_stat
最新版本的表格 G_ID
i、 e.10和 UpdateTime
i、 e.1512010500。
5) 此作业已安排为每1分钟运行一次。
使用nifi的准体系结构
我想将这个用例移到nifi,我想使用nifi从mssqldb读取记录并将这个记录发送给kafka。
在成功发布到kafka之后,nifi会在数据库中保存g\u id和updatetime。
消息到达kafka后,spark streaming将从kafka读取消息,并使用现有业务逻辑保存到hbase。
在每次运行时,nifi处理器都应该使用 max(G_ID)
以及 max(UpdateTime)
为了得到德尔塔的记录并出版给Kafka。
我是nifi/hdf的新手。我需要你的帮助和指导,以便使用nifi/hdf实现这一点。如果您对此用例有更好的解决方案/体系结构,请提出建议。
抱歉发了这么长的帖子。
1条答案
按热度按时间f45qwnt81#
您所描述的是jdbc-kafka-connect连接器开箱即用的功能。设置你的配置文件,加载它,然后开始。完成。Kafka连接是ApacheKafka的一部分。不需要额外的工具和技术。
您可能还需要考虑适当的变更数据捕获(cdc)。对于专有的rdbms(oracle、db2、mssql等),您可以使用goldengate、attunity、dbvisit等商业工具。对于开源rdbms(例如mysql、postgresql),您应该看看开源debezium工具。所有这些cdc工具都直接与Kafka集成。