Strategy for converting a correlated Hive query into PySpark DataFrame transformations?

vsikbqxv  asked 2022-11-05  in Hive

I need to convert the SQL query below into PySpark DataFrame transformations. The SELECT clause contains a correlated subquery. Is there any way to express this with PySpark DataFrame transformations? If you can share an article on this, it would be much appreciated.
Note: the acc_cap table is itself created from test_db.test_table by adding a prev_time column with a lag window function over the time column (a sketch of that step follows the query below).
The query:

SELECT
   A.id,
   "psmark" fid,
   (
      SELECT distinct psmark 
      from test_db.test_table 
      where id = A.id and time = A.prev_time and rnk=1
   )
   AS fromvalue
FROM acc_cap A;
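
For reference, a minimal sketch of how acc_cap could be derived from test_db.test_table as described in the note above, assuming the lag window is partitioned by id and ordered by time (that partitioning choice, and reading the table via spark.table, are assumptions, not part of the original post):

# Hypothetical sketch: build acc_cap by adding prev_time with a lag window,
# assuming the window is partitioned by id and ordered by time.
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
w = Window.partitionBy("id").orderBy("time")

acc_cap = (
    spark.table("test_db.test_table")                         # source table from the question
         .withColumn("prev_time", F.lag("time", 1).over(w))   # previous time per id
)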

ycggw6v2  1#

I used two sample DataFrames, acc_cap_df and test_table_df.
Below is the PySpark equivalent of the query:

>>> acc_cap_df=spark.read.csv("/path to/sample2.csv",header=True)
>>> acc_cap_df.show()
+---+-------+
| id|   time|
+---+-------+
|003| 174256|
|003| 174267|
|003|  17429|
|003|1747567|
|001|     10|
|001|     10|
|004|  12719|
|002|     11|
|002|    117|
|002|  11878|
+---+-------+

>>> from pyspark.sql import functions as F
>>> from pyspark.sql.functions import rank, lag
>>> from pyspark.sql.window import Window
>>> windowSpec = Window.partitionBy("id").orderBy("time")   # per-id window, ordered by time

>>> acc_cap_df=acc_cap_df.withColumn("rank",rank().over(windowSpec))

>>> acc_cap_df.show()

+---+-------+----+
| id|   time|rank|
+---+-------+----+
|003| 174256|   1|
|003| 174267|   2|
|003|  17429|   3|
|003|1747567|   4|
|001|     10|   1|
|001|     10|   1|
|004|  12719|   1|
|002|     11|   1|
|002|    117|   2|
|002|  11878|   3|
+---+-------+----+

>>> acc_cap_df=acc_cap_df.withColumn("prev_time",lag("time",2).over(windowSpec))

>>> acc_cap_df.show()

+---+-------+----+---------+
| id|   time|rank|prev_time|
+---+-------+----+---------+
|003| 174256|   1|     null|
|003| 174267|   2|   174256|
|003|  17429|   3|   174267|
|003|1747567|   4|    17429|
|001|     10|   1|     null|
|001|     10|   1|       10|
|004|  12719|   1|     null|
|002|     11|   1|     null|
|002|    117|   2|       11|
|002|  11878|   3|      117|
+---+-------+----+---------+

>>> test_table_df=spark.read.csv("/path to/sample1.csv",header=True)
>>> test_table_df.show()
+--------+---+----+
|  psmark| id|time|
+--------+---+----+
|  csvvsw|001|  10|
|csvvswfw|002|  11|
| csvvsgg|003|  12|
| csvvser|004|  13|
+--------+---+----+

>>> acc_cap_df=acc_cap_df.withColumnRenamed("id","acc_cap_id")

>>> acc_cap_df.show()
+----------+-------+----+
|acc_cap_id|   time|rank|
+----------+-------+----+
|       003| 174256|   1|
|       003| 174267|   2|
|       003|  17429|   3|
|       003|1747567|   4|
|       001|     10|   1|
|       001|     10|   1|
|       004|  12719|   1|
|       002|     11|   1|
|       002|    117|   2|
|       002|  11878|   3|
+----------+-------+----+

>>> df_join=test_table_df.join(acc_cap_df, (test_table_df.id == acc_cap_df.acc_cap_id) & (test_table_df.time == acc_cap_df.time)).filter(F.col("rank")=="1")
>>> df_join.show()
+--------+---+----+----------+----+----+
|  psmark| id|time|acc_cap_id|time|rank|
+--------+---+----+----------+----+----+
|  csvvsw|001|  10|       001|  10|   1|
|  csvvsw|001|  10|       001|  10|   1|
|csvvswfw|002|  11|       002|  11|   1|
+--------+---+----+----------+----+----+

>>> df_output=df_join.select('psmark','id').distinct()

>>> df_output=df_output.withColumnRenamed("psmark","fid")
>>> df_output.show()
+--------+---+
|     fid| id|
+--------+---+
|  csvvsw|001|
|csvvswfw|002|
+--------+---+
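
If you would rather keep the logic in SQL, the same join-based rewrite of the correlated subquery can be run with spark.sql after registering the two DataFrames as temp views. This is only a sketch mirroring the DataFrame steps above (the view names are illustrative), and it should return the same rows as df_output:

>>> acc_cap_df.createOrReplaceTempView("acc_cap")
>>> test_table_df.createOrReplaceTempView("test_table")
>>> df_sql = spark.sql("""
...     SELECT DISTINCT t.psmark AS fid, a.acc_cap_id AS id
...     FROM acc_cap a
...     JOIN test_table t
...       ON t.id = a.acc_cap_id AND t.time = a.time
...     WHERE a.rank = 1
... """)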
