Join on two tables where the file name has extra strings: use a regex to strip the extra strings from the file name and join

7eumitmz asked on 2021-06-24 in Hive

I have two tables and need to join them on table_name and file_name respectively. The problem is that table_name contains some extra strings compared to file_name in table 2.
Using a regex, how can I remove the extra strings from table_name so that it matches file_name in table 2?

    TABLE 1:
    table_name audit_record_count
    Immunology_COVID-19_Treatment_202006221630_01.csv 1260124
    Immunology_COVID-19_Trial_Design_202006221630_01.csv 2173762
    Immunology_COVID-19_Planned_Treatment_202006221630_01.csv 1350135
    Immunology_COVID-19_Patient_Characteristic_202006221630_01.csv 2173762
    Immunology_COVID-19_Intervention_Type_202006221630_01.csv 2173762
    Immunology_COVID-19_Arm_202006221630_01.csv 4
    Immunology_COVID-19_Actual_Treatment_202006221630_01.csv 2173762
    Immunology_COVID-19_Publication_202006221630_01.csv 2173762
    Immunology_COVID-19_Outcome_202006221630_01.csv 2173762
    Immunology_COVID-19_Intervention_Type_Factor_202006221630_01.csv 2173762
    Immunology_COVID-19_Inclusion_Criteria_202006221630_01.csv 2173762
    Immunology_COVID-19_Curation_202006221630_01.csv 2173762

    TABLE 2:
    file_name csv_record_count
    Treatment 1260124
    Trial_Design 2173762
    Planned_Treatment 1350135
    Patient_Characteristic 2173762
    Intervention_Type 2173762
    Arm 4
    Actual_Treatment 2173762
    Publication 2173762
    Outcome 2173762
    Intervention_Type_Factor 2173762
    Inclusion_Criteria 2173762
    Curation 2173762

What I have tried:

    from pyspark.sql.functions import col

    audit_file_df = (
        spark.read.csv(
            f"s3://{config['raw_bucket']}/{config['landing_directory']}/{config['audit_file']}/{watermark_timestamp}*.csv",
            header=False, inferSchema=True)
        .withColumnRenamed("_c0", "table_name")
        .withColumnRenamed("_c1", "audit_record_count")
        # Strips the trailing timestamp/.csv part, but the leading "Immunology_COVID-19_" prefix remains
        .selectExpr(
            "regexp_extract(table_name, '^(.(?!(\\\\d{12}_\\\\d{2,4}.csv|\\\\d{12}.csv)))*', 0) AS table_name",
            'audit_record_count')
    )
    print("audit_file_df :", audit_file_df)
    audit_file_df.show()

    validation_df = audit_file_df.join(
        schema_validation_df,
        on=audit_file_df['table_name'] == schema_validation_df['file_name'],
        how='inner'
    ).withColumn("count_match", col('audit_record_count') == col('csv_record_count'))
    print("Record validation result")
    validation_df.show()

I can remove the timestamp from table_name, but I cannot extract file_name so that the join condition works.
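
For illustration, that timestamp-stripping step can be sketched as below; it uses regexp_replace with a pattern assumed from the file names listed above, not the exact expression from the attempt:

    from pyspark.sql import functions as F

    # Illustrative only: dropping just the trailing "_<12-digit timestamp>_<nn>.csv"
    # leaves values like "Immunology_COVID-19_Treatment", which still do not match file_name.
    audit_file_df = audit_file_df.withColumn(
        "table_name",
        F.regexp_replace("table_name", r"_\d{12}_\d{2}\.csv$", "")
    )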

ADDITION:

Immunology_COVID-19 is not fixed; it may change for another file. The format of table_name is:

    TA_Indication_data_timestamp_nn.csv

rqenqsqc 1#

Create a column in table 1 that contains the data part:

    # Capture everything between the second underscore and the trailing "_<timestamp>_<nn>.csv"
    df = df.withColumn('data', F.regexp_extract(F.col('table_name'), r'.*?_.*?_(.*)_\d{12}_\d{2}\.csv', 1))

giving:

    +----------------------------------------------------------------+---------+------------------------+
    |table_name                                                      |audit_rec|data                    |
    +----------------------------------------------------------------+---------+------------------------+
    |Immunology_COVID-19_Treatment_202006221630_01.csv               |1260124  |Treatment               |
    |Immunology_COVID-19_Trial_Design_202006221630_01.csv            |2173762  |Trial_Design            |
    |Immunology_COVID-19_Planned_Treatment_202006221630_01.csv       |1350135  |Planned_Treatment       |
    |Immunology_COVID-19_Patient_Characteristic_202006221630_01.csv  |2173762  |Patient_Characteristic  |
    |Immunology_COVID-19_Intervention_Type_202006221630_01.csv       |2173762  |Intervention_Type       |
    |Immunology_COVID-19_Arm_202006221630_01.csv                     |4        |Arm                     |
    |Immunology_COVID-19_Actual_Treatment_202006221630_01.csv        |2173762  |Actual_Treatment        |
    |Immunology_COVID-19_Publication_202006221630_01.csv             |2173762  |Publication             |
    |Immunology_COVID-19_Outcome_202006221630_01.csv                 |2173762  |Outcome                 |
    |Immunology_COVID-19_Intervention_Type_Factor_202006221630_01.csv|2173762  |Intervention_Type_Factor|
    |Immunology_COVID-19_Inclusion_Criteria_202006221630_01.csv      |2173762  |Inclusion_Criteria      |
    |Immunology_COVID-19_Curation_202006221630_01.csv                |2173762  |Curation                |
    +----------------------------------------------------------------+---------+------------------------+

You can then use table1.data and table2.file_name to continue with the audit checks you already gave in your question.
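
A possible continuation, sketched under the assumption that the audit count column is still named audit_record_count as in the question (the sample output above shows it abbreviated) and that schema_validation_df is the question's second table:

    from pyspark.sql.functions import col

    # Join the cleaned "data" values against the expected counts and flag any mismatch.
    validation_df = (
        df.join(schema_validation_df, df['data'] == schema_validation_df['file_name'], 'inner')
          .withColumn('count_match', col('audit_record_count') == col('csv_record_count'))
    )
    validation_df.show(truncate=False)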
The tricky part of the regexp is the use of non-greedy quantifiers, because the data part itself can contain underscore characters.
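
To illustrate that point (a standalone check with Python's re module, not part of the original answer): with greedy quantifiers the leading .*_ segments swallow part of a multi-underscore data value, while the non-greedy version stops at the first two underscores:

    import re

    name = 'Immunology_COVID-19_Intervention_Type_Factor_202006221630_01.csv'

    greedy = re.match(r'.*_.*_(.*)_\d{12}_\d{2}\.csv', name)
    non_greedy = re.match(r'.*?_.*?_(.*)_\d{12}_\d{2}\.csv', name)

    print(greedy.group(1))      # 'Factor' -- the greedy .*_ consumed too much
    print(non_greedy.group(1))  # 'Intervention_Type_Factor'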
