我想设计一个简单的数据管道,就像S3中的文件被增量加载到PostgreSQL表中的表中。加载数据的逻辑,可能是一个简单的复制命令,或者是一个python脚本,将csv导入dataframe并加载,或者是一个aws胶水作业,等等。每天在某个特定的时间执行数据管道,并将数据从csv文件加载到数据库表中。如果在某一天,由于一些源系统问题或其他原因,csv文件中出现了额外的列,该怎么办?我该怎么处理?
如果它是一个删除表和加载逻辑,我想我可以提取列名和动态创建的表与列名匹配。但这是一个增量负荷。想探索不同的方法来解决这个问题。
1条答案
按热度按时间jyztefdp1#
常见的数据仓库解决方案是将传入的行作为JSON对象存储在staging表中,其中一列包含作为JSON对象的数据行,另一列包含唯一键。附加行可能包括关于行的元数据,诸如源和接收行时的时间戳。
然后将对staging表运行一个查询,以重新创建具有列名和类型转换的列。然后,该查询可以引入各种质量检查。
这样你就可以有一个设置,如果在数据以JSON形式着陆时添加或删除一个列,然后检查读取质量,你的下游系统不会中断。