我在databricks中创建了一个货币转换函数,当我调用函数时,它给出错误:
我试着遵循代码
from pyspark.sql.functions import lit
from pyspark.sql.functions import when, col
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
df_curr1 = spark.read.table("table")
def currency(from_currency,to_currency,date,rate_type):
df_select = df_curr1.select(df_curr1["col0"].alias("BaseCurrency"), df_curr1["col1"].alias("TargetCurrency"),df_curr1["col2"].alias("EffectiveDate"),df_curr1["col3"].alias("ExpressInBaseCurrency"),df_curr1["col4"].alias("Ratetype"),df_curr1["col5"].alias("Rate"))
#display(df_select)
df_select = df_select.withColumn(
"Rate",
when(
(df_select["ExpressInBaseCurrency"] == 2),
1/df_select["Rate"]
).otherwise(df_select["Rate"]) )
display(df_select)
if from_currency == to_currency :
rate = 1.0
elif df_select.where(col("BaseCurrency") == from_currency).count() > 0:
if df_select.where((df_select.BaseCurrency == from_currency ) & (df_select.TargetCurrency == to_currency ) & (df_select.EffectiveDate <= date) & (df_select.Ratetype == rate_type)).count() > 0:
selected_df = df_select.filter( (df_select.BaseCurrency == from_currency ) & (df_select.TargetCurrency == to_currency ) & (df_select.EffectiveDate <= date) & (df_select.Ratetype == rate_type) )
selected_df = selected_df.orderBy(selected_df["EffectiveDate"].desc())
selected_df = selected_df.limit(1)
rate = selected_df.select("Rate").head()[0]
else:
selected_df = df_select.filter( (df_select.BaseCurrency == from_currency ) & (df_select.TargetCurrency == to_currency ) & (df_select.EffectiveDate <= date) )
selected_df = selected_df.orderBy(selected_df["EffectiveDate"].desc())
selected_df = selected_df.limit(1)
rate = selected_df.select("Rate").head()[0]
elif df_select.where(col("BaseCurrency") == to_currency ).count() > 0:
if df_select.where((df_select.BaseCurrency == to_currency ) & (df_select.TargetCurrency == from_currency ) & (df_select.EffectiveDate <= date) & (df_select.Ratetype == rate_type)).count() > 0:
selected_df = df_select.filter( (df_select.BaseCurrency == to_currency ) & (df_select.TargetCurrency == from_currency) & (df_select.EffectiveDate <= date) & (df_select.Ratetype == rate_type) )
selected_df = selected_df.limit(1)
display(selected_df)
rate = selected_df.select("Rate").head()[0]
rate = 1/rate
else:
if df_select.where((df_select.BaseCurrency == to_currency ) & (df_select.TargetCurrency == from_currency) & (df_select.EffectiveDate <= date)).count()>0:
selected_df = df_select.filter( (df_select.BaseCurrency == to_currency ) & (df_select.TargetCurrency == from_currency) & (df_select.EffectiveDate <= date) )
selected_df = selected_df.limit(1)
display(selected_df)
rate = selected_df.select("Rate").head()[0]
rate = 1/rate
else:
selected_df = df_select.filter( (df_select.BaseCurrency == to_currency ) & (df_select.TargetCurrency == from_currency) )
selected_df = selected_df.limit(1)
display(selected_df)
rate = selected_df.select("Rate").head()[0]
rate = 1/rate
else:
if df_select.where((df_select.TargetCurrency == from_currency ) & (df_select.EffectiveDate <= date) & (df_select.Ratetype == rate_type)).count() > 0:
selected_df = df_select.filter((df_select.TargetCurrency == from_currency ) & (df_select.EffectiveDate <= date) & (df_select.Ratetype == rate_type))
selected_df = selected_df.orderBy(selected_df["EffectiveDate"].desc())
selected_df = selected_df.limit(1)
first_value = selected_df.select("Rate").head()[0]
else:
selected_df = df_select.filter((df_select.TargetCurrency == from_currency ) & (df_select.EffectiveDate <= date))
selected_df = selected_df.orderBy(selected_df["EffectiveDate"].desc())
selected_df = selected_df.limit(1)
first_value = selected_df.select("Rate").head()[0]
if df_select.where((df_select.TargetCurrency == to_currency) & (df_select.EffectiveDate <= date) & (df_select.Ratetype == rate_type)).count() > 0:
selected_df2 = df_select.filter((df_select.TargetCurrency == to_currency) & (df_select.EffectiveDate <= date) & (df_select.Ratetype == rate_type))
selected_df2 = selected_df2.orderBy(selected_df2["EffectiveDate"].desc())
selected_df2 = selected_df2.limit(1)
second_value = selected_df2.select("Rate").head()[0]
else:
selected_df2 = df_select.filter((df_select.TargetCurrency == to_currency) & (df_select.EffectiveDate <= date) )
selected_df2 = selected_df2.orderBy(selected_df2["EffectiveDate"].desc())
selected_df2 = selected_df2.limit(1)
second_value = selected_df2.select("Rate").head()[0]
rate = second_value / first_value
return rate
currency_udf = udf(currency, FloatType())
invoice_currency_col = df_fact2["TransactionCurrency"]
currency1_col = df_fact2["Currency1"]
document_date_col = df_fact2["DateofFirstContact"]
rate_type_col = df_fact2["OpportunityType"]
# Apply the currency function using withColumn
df_fact2 = df_fact2.withColumn("rate1", currency_udf(invoice_currency_col, currency1_col, document_date_col, rate_type_col))
字符串
我得到以下错误时,我执行上述代码
酸洗错误:无法序列化对象:运行时间错误:您似乎试图从广播变量、操作或转换中引用SparkContext。SparkContext只能在驱动程序上使用,不能在worker上运行的代码中使用。有关详细信息,请参阅SPARK-5063。
这里是输入数据
df_fact2
TransactionCurrency | Currency1 | DateofFirstContact | OpportunityType
AED | SAR | 2023-05-24T19:59:59.000+0000 | PUR
型
这里是预期输出
df_fact2
TransactionCurrency | Currency1 | DateofFirstContact | OpportunityType | rate1
AED | SAR | 2023-05-24T19:59:59.000+0000 | PUR | 1.02124
型
这里是df_curr1样本
col0 | col1 | col2 | col3 | col4 | col5
AED | AUD | 2022-12-31T18:30:00.000+0000 | 2 | PUR | 2.51325
AED | SAR | 2021-01-31T19:59:55.000+0000 | 1 | PUR | 1.02124
型
请帮助我哪里错了
1条答案
按热度按时间o3imoua41#
首先,当你使用UDF时,你不能使用spark上下文或引用它。像这样使用广播变量并使用它们。在这里,我正在将
df_curr1
转换为pandas并广播它。试试下面的代码块。
字符串
接下来,广播。
型
改变,你的功能如下。关于Pandas。
型
接下来,创建udf并调用函数。
型
输出量:
的数据