我有一个具有以下模式的Dataframe:
root
|-- user_id: string (nullable = true)
|-- user_loans_arr: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- loan_date: string (nullable = true)
| | |-- loan_amount: string (nullable = true)
|-- new_loan: struct (nullable = true)
| |-- loan_date : string (nullable = true)
| |-- loan_amount : string (nullable = true)
我想使用一个自定义项,它将user\u loans\u arr和new\u loans作为输入,并将new\u loans结构添加到现有的user\u loans\u arr中。然后,从user\u loans\u arr中删除所有贷款日期早于12个月的元素。
提前谢谢。
2条答案
按热度按时间eit6fx6z1#
您需要将数组和结构列作为数组或结构传递给udf。我更喜欢把它作为struct传递。在那里,您可以操作元素并返回数组类型。
结果是
本例将所有内容都视为int,因为字符串是date,所以在udf中必须使用python的datetime函数进行比较。
ryhaxcpt2#
如果
spark >= 2.4
那么您不需要自定义项,请检查下面的示例-加载输入数据
按以下要求加工
将user\u loans\u arr和new\u loans作为输入,并将new\u loans结构添加到现有user\u loans\u arr中。然后,从user\u loans\u arr中删除贷款日期早于12个月的所有元素。
spark >= 2.4
```df.withColumn("user_loans_arr",
expr(
"""
|FILTER(array_union(user_loans_arr, array(new_loan)),
| x -> months_between(current_date(), to_date(x.loan_date)) < 12)
""".stripMargin))
.show(false)
`spark < 2.4`
// spark < 2.4
val outputSchema = df.schema("user_loans_arr").dataType