我在sql server中有一个人数据库,其中有地址、许可证、亲属等表,大约有20个。所有表都具有每个人唯一的id参数。这些表中有数百万条记录。我需要使用公共id参数组合这些个人记录,并将其转换为带有一些列名更改的json表文件。这个json文件然后通过生产者推送到kafka。如果我能以kafka生产者作为item writer得到这个例子的话——很好,但真正的问题是理解如何利用spring批处理item reader、processor和item writer来创建复合json文件的策略和细节。这是我的第一个spring批处理应用程序,所以我对这个比较陌生。
我希望得到关于使用复合读取器或处理器的实现策略的建议,该读取器或处理器使用personid作为游标,并使用每个表的id查询每个表,将结果记录转换为json,并将其聚合为一个复合的关系json文件,其中根表persondata提供给kafka集群。
基本上我有一个数据源,为读者提供相同的数据库。我计划使用person表获取id和其他对person唯一的记录,并使用id作为其他19个表的where子句。将表中的每个resultset转换为json,最后合成json对象并写入kafka。
3条答案
按热度按时间2j4z5cfb1#
我们进行了类似的练习,将多个表中的100mn+行作为json的一种形式进行迁移,以便将其发布到消息总线。
其思想是创建一个视图,对数据进行非规范化,并使用jdbcpagingitemreader从该视图中读取数据。从一个源读取数据的开销较小。
反规范化数据时,请确保主表中没有多行。
示例-sql server-
上面的内容将为您提供json字符串中的依赖表数据,每个主表数据对应一行。一旦检索到数据,就可以使用gson或jackson将其转换为pojo。
我们试图避免使用jdbccursoritemreader,因为它将从内存中提取所有数据,并从中逐个读取数据。它不支持分页。
sbtkgmzw2#
我们在一个项目中有这样一个需求,并用以下方法解决了它。
在并行运行的splitflow中,我们有一个step for ever表,它将表的数据加载到文件中,并按公共id排序(这是可选的,但如果文件中有数据,则测试更容易)。
然后我们实现了自己的“mergereader”。这个mergereader对每个文件/表都有flatfileitemreaders(我们称之为datareaders)。所有这些flatfileitemreaders都用一个singleitempeakeableitemreader Package 。mergereader的read方法的逻辑如下:
如果您需要重新启动功能,您可以以某种方式实现itemstream,它可以跟踪每个datareader的当前readposition。
e0bqpujr3#
我使用这里描述的基于驱动查询的itemreaders使用模式来解决这个问题。
reader:只是jdbccursoritemreader的一个默认实现,使用sql获取
唯一的关系id(例如,从person-中选择id)
processor:使用这个长id作为输入,我使用spring的jdbctemplate实现的dao通过查询每个表获取特定id的数据(例如,select*from license where id=),并将结果以列表格式Map到person的pojo,然后转换为json对象(使用jackson),然后转换为字符串
writer:如果使用kafka,可以用json字符串写出文件,也可以将json字符串发布到主题中