pyspark 如何进行两个或两个以上的因子联接?

lsmepo6l  于 2023-11-16  发布在  Spark
关注(0)|答案(1)|浏览(130)
  1. test_policies_rl_data = [
  2. (1, 'A', 0.5, 0.8),
  3. (2, 'B', 0.6, 0.9),
  4. (3, 'C', 0.7, 1.0)
  5. ]
  6. rates_table_data = [
  7. (1, 'A', 0.5, 0.8, null, 10, 0.5, 1, 100),
  8. (2, 'B', 0.6, 0.9, null, 20, 0.6, 2, 200),
  9. (3, 'C', 0.7, 1.0, null, 30, 0.7, 3, 300)
  10. ]
  11. test_policies_rl = spark.createDataFrame(test_policies_rl_data, ['version', 'sub_class', 'rf_1', 'rl_1'])
  12. rates_table = spark.createDataFrame(rates_table_data, ['version', 'cover', 'RF', 'rating_factor_1', 'rating_factor_2',
  13. 'rating_factor_amount', 'rating_factor_coefficient', 'rating_factor_level', 'rating_factor_rate'])
  14. test_policies_rl.createOrReplaceTempView("test_policies_rl")
  15. rates_table.createOrReplaceTempView("rates_table")
  16. num_columns = 3
  17. base_query = """
  18. SELECT
  19. t.*,
  20. r.rating_factor_amount AS rating_factor_amount_{},
  21. r.rating_factor_coefficient AS rating_factor_coefficient_{},
  22. r.rating_factor_level AS rating_factor_level_{},
  23. r.rating_factor_rate AS rating_factor_rate_{}
  24. FROM test_policies_rl t
  25. LEFT JOIN rates_table r
  26. ON t.version = r.version
  27. AND t.sub_class = r.cover
  28. AND t.rf_1 = r.RF
  29. AND t.rl_1 = r.rating_factor_1
  30. """
  31. for i in range(1, num_columns + 1):
  32. query = base_query.format(i, i, i, i)
  33. policies_coeff = spark.sql(query)
  34. policies_coeff.show()

字符串
如果我在test_policies_rl_data中有多个条件要加入eahc评级因子,该怎么办?
首先,rates_table是一个表的表,其中我可以使用以下命令访问每个评级因子rates:
rates_table价格表
所以当访问这些表时,
只有rating_factor_1列在费率表中被填充.但是,如果rating_factor_2列也被填充,我将不得不匹配它与test_policies_rl_data中的列之一,以获得正确的rating_factor_amount,系数和费率?
范例:

  1. test_policies_rl_data = [
  2. (1, 'A','private', 0.5, 0.8),
  3. (2, 'B', 'business',0.6, 0.9),
  4. (3, 'C', 'private',0.7, 1.0)
  5. ]
  6. rates_table_data = [
  7. (1, 'A', 0.5, 0.8, 'private', 10, 0.5, 1, 100),
  8. (2, 'B', 0.6, 0.9, 'business', 20, 0.6, 2, 200),
  9. (3, 'C', 0.7, 1.0, 'private', 30, 0.7, 3, 300),
  10. (3, 'C', 0.7, 1.0, 'business', 30, 0.7, 3, 300)
  11. ]
  12. test_policies_rl = spark.createDataFrame(test_policies_rl_data, ['version', 'sub_class', 'use,'rf_1', 'rl_1'])
  13. rates_table = spark.createDataFrame(rates_table_data, ['version', 'cover', 'RF', 'rating_factor_1', 'rating_factor_2',
  14. 'rating_factor_amount', 'rating_factor_coefficient', 'rating_factor_level', 'rating_factor_rate'])
  15. test_policies_rl.createOrReplaceTempView("test_policies_rl")
  16. rates_table.createOrReplaceTempView("rates_table")


所以在这种情况下,
rates_table中的rating_factor_2也应该匹配test_policies中的“use”列。我不确定如何使代码识别将要发生的2因子连接?因为有时它可能是1因子连接,有时它可能是2。

nwsw7zdq

nwsw7zdq1#

我尝试了下面的例子。

  1. test_policies_rl_data = [
  2. (1, 'A', 'private', 0.5, 0.8),
  3. (2, 'B', 'business', 0.6, 0.9),
  4. (3, 'C', 'private', 0.7, 1.0)
  5. ]
  6. rates_table_data = [
  7. (1, 'A', 0.5, 0.8, 'private', 10, 0.5, 1, 100),
  8. (2, 'B', 0.6, 0.9, 'business', 20, 0.6, 2, 200),
  9. (3, 'C', 0.7, 1.0, 'private', 30, 0.7, 3, 300),
  10. (3, 'C', 0.7, 1.0, 'business', 30, 0.7, 3, 300)
  11. ]
  12. test_policies_rl = spark.createDataFrame(test_policies_rl_data, ['version', 'sub_class', 'use', 'rf_1', 'rl_1'])
  13. rates_table = spark.createDataFrame(rates_table_data, ['version', 'cover', 'RF', 'rating_factor_1', 'rating_factor_2',
  14. 'rating_factor_amount', 'rating_factor_coefficient', 'rating_factor_level', 'rating_factor_rate'])
  15. test_policies_rl.createOrReplaceTempView("test_policies_rl")
  16. rates_table.createOrReplaceTempView("rates_table")
  17. num_columns = 3
  18. base_query = """
  19. SELECT
  20. t.*,
  21. r.rating_factor_amount AS rating_factor_amount_{},
  22. r.rating_factor_coefficient AS rating_factor_coefficient_{},
  23. r.rating_factor_level AS rating_factor_level_{},
  24. r.rating_factor_rate AS rating_factor_rate_{}
  25. FROM test_policies_rl t
  26. LEFT JOIN rates_table r
  27. ON t.version = r.version
  28. AND t.sub_class = r.cover
  29. AND t.rf_1 = r.RF
  30. AND t.rl_1 = r.rating_factor_1
  31. AND (t.use = r.rating_factor_2 OR (t.use = 'business' AND r.rating_factor_2 IS NULL))
  32. """
  33. for i in range(1, num_columns + 1):
  34. query = base_query.format(i, i, i, i)
  35. policies_coeff = spark.sql(query)
  36. policies_coeff.show()

字符串

  • 我为join添加了一个额外的条件,特别是针对use和rating_factor_2列。
    此条件检查这些列中的值是否匹配,或者如果rating_factor_2null,则检查use列是否为“*business business”。
  • 查询将能够处理给定条件的单因子和双因子联接。
  • 这样,它将创建DataFrames,将它们注册为临时视图,然后对DataFrames执行左连接操作。使用查询将输出循环每次迭代的结果,根据连接中指定的多个条件显示匹配的行。
  • 查询将能够处理给定条件的单因子和双因子联接。调整括号内的逻辑以满足特定的数据要求。
展开查看全部

相关问题