我不确定这个问题本身是否正确。我为sql找到的解决方案在配置单元sql中不起作用,或者禁止递归。因此,我想用pyspark解决这个问题,需要一个解决方案或者至少是一些想法,如何解决这个问题。
我有一张原始的table,看起来像这样:
+--------+----------+
|customer|nr_tickets|
+--------+----------+
| A| 3|
| B| 1|
| C| 2|
+--------+----------+
这就是我想要的table:
+--------+
|customer|
+--------+
| A|
| A|
| A|
| B|
| C|
| C|
+--------+
你有什么建议吗?
事先非常感谢!
3条答案
按热度按时间55ooxyrt1#
同时我自己也找到了解决办法:
说明:dfs“table”和“test”在开头是相同的。所以“最大票数”就是最高的“票数”。它起作用了。我只是在纠结于最大数字的格式:
我不能在for循环的范围内使用结果,因为它是一个列表。所以我手动输入最高的数字。你知道我怎样才能把最大票数转换成正确的格式,这样循环范围就可以接受吗?
谢谢
cidc1ykv2#
为了
Spark2.4+
,使用array_repeat
与explode
.jgwigjjp3#
您可以通过对行(组)进行迭代来创建新的Dataframe。
第一个列有
customer
(Row(customer=a["customer"])
)重复nr_tickets
客户使用的次数range(int(a["nr_tickets"]))
```df_list + [Row(customer=a["customer"]) for T in range(int(a["nr_tickets"]))]
df= spark.createDataFrame(df_list)
from pyspark.sql import Row
df_list = []
for a in df.select(["customer","nr_tickets"]).collect():
df_list = df_list + [Row(customer=a["customer"]) for T in range(int(a["nr_tickets"]))]
df= spark.createDataFrame(df_list)
df.show()
from pyspark.sql import Row
from functools import reduce #python 3
df_list = [
[Row(customer=a["customer"])]*int(a["nr_tickets"])
for a in df.select(["customer","nr_tickets"]).collect()
]
df= spark.createDataFrame(reduce(lambda x,y: x+y,df_list))
df.show()
+--------+
|customer|
+--------+
| A|
| A|
| A|
| B|
| C|
| C|
+--------+