使用pyspark对象框架比较数据块上的2个csv文件

t9eec4r0  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(118)

我有一些麻烦找到一个好的方法来比较两个csv文件使用databricks,所以这里的要点:
我有两个ddbb是supossed有完全相同的信息(剧透:他们没有),他们都有一个过程,创建4个CSV文件与信息的不同的事情,是那些我们需要比较。
我的第一个想法是做一个左反连接,以取出所有不在一个DFS中的行,然后我继续使用exceptAll,因为它似乎做同样的事情。有了这个,我收集了所有在df2上但不在df1中的行,反之亦然,这给了我太多的行来管理,所以我想知道是否有一种方法来逐行比较,并保持行之间的差异,例如,相同的名字姓氏和出生日期或类似的事情,确定一个特定的主题上的框架。
提前谢谢你们帮我解决这个问题。

fdbelqdn

fdbelqdn1#

我最终得到了这个函数,它对任何可能有同样问题的人都很有效:

def find_mismatch(df1, df2, key_list):
  
  def is_mismatch(col):
    values = (df1[col] != df2[col]) 
    return values | (df1[col].isNull() & df2[col].isNotNull()) | (df1[col].isNotNull() & df2[col].isNull())
  
  result = df1.join(df2, key_list, "outer")
  
  # Selecting original columns from both DataFrames
  select_columns = [result[i] for i in key_list]
  
  for col in df1.columns:
    if col not in key_list:
      select_columns.extend([
          df1[col].alias(col + '_df1'),
          df2[col].alias(col + '_df2'),
          is_mismatch(col).alias(col + '_mismatch')
      ])
  
  result = result.select(*select_columns)
  
  mismatched_cols = result.where(
      reduce(lambda acc, e: acc | e, [result[c + '_mismatch'] for c in df1.columns if c not in key_list], F.lit(False))
  )
  mismatch_count = mismatched_cols.count()
  return mismatch_count, mismatched_cols

相关问题