使用pyspark对象框架比较数据块上的2个csv文件

t9eec4r0  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(125)

我有一些麻烦找到一个好的方法来比较两个csv文件使用databricks,所以这里的要点:
我有两个ddbb是supossed有完全相同的信息(剧透:他们没有),他们都有一个过程,创建4个CSV文件与信息的不同的事情,是那些我们需要比较。
我的第一个想法是做一个左反连接,以取出所有不在一个DFS中的行,然后我继续使用exceptAll,因为它似乎做同样的事情。有了这个,我收集了所有在df2上但不在df1中的行,反之亦然,这给了我太多的行来管理,所以我想知道是否有一种方法来逐行比较,并保持行之间的差异,例如,相同的名字姓氏和出生日期或类似的事情,确定一个特定的主题上的框架。
提前谢谢你们帮我解决这个问题。

fdbelqdn

fdbelqdn1#

我最终得到了这个函数,它对任何可能有同样问题的人都很有效:

  1. def find_mismatch(df1, df2, key_list):
  2. def is_mismatch(col):
  3. values = (df1[col] != df2[col])
  4. return values | (df1[col].isNull() & df2[col].isNotNull()) | (df1[col].isNotNull() & df2[col].isNull())
  5. result = df1.join(df2, key_list, "outer")
  6. # Selecting original columns from both DataFrames
  7. select_columns = [result[i] for i in key_list]
  8. for col in df1.columns:
  9. if col not in key_list:
  10. select_columns.extend([
  11. df1[col].alias(col + '_df1'),
  12. df2[col].alias(col + '_df2'),
  13. is_mismatch(col).alias(col + '_mismatch')
  14. ])
  15. result = result.select(*select_columns)
  16. mismatched_cols = result.where(
  17. reduce(lambda acc, e: acc | e, [result[c + '_mismatch'] for c in df1.columns if c not in key_list], F.lit(False))
  18. )
  19. mismatch_count = mismatched_cols.count()
  20. return mismatch_count, mismatched_cols
展开查看全部

相关问题