对于我的poc,我使用sparksql2.4.x和kafka。我有一个来自Kafka主题的流媒体公司数据。以“公司id”、“创建日期”、“字段1”、“字段2”等作为字段的公司数据。让我们把它称为数据流。
我的Parquet文件里有公司的旧资料。i、 e.“hdfs://parquet/company,让我们说这是旧公司数据。
如果数据已经在hdfs://parquet/company“文件(旧公司数据)
怎么检查这个?
如果newcompanydatastream“field1”和oldcompanydatadf“field1”不相同,则执行tast2(即删除oldcompanydatadf记录并将newcompanydatastream“field1”记录添加到oldcompanydatadf中)
如果newcompanydatastream“field2”和oldcompanydatadf“field2”不相同,则执行tast2(即删除oldcompanydatadf记录并将newcompanydatastream“field2”记录添加到oldcompanydatadf中)
如何使用sparksql结构化流来实现这一点?
对于任何片段或建议,我都非常感激
暂无答案!
目前还没有任何答案,快来回答吧!