在 Azure Databricks 中调用函数时出现问题

ymdaylpp  于 2023-08-07  发布在  其他
关注(0)|答案(1)|浏览(105)

我在 Databricks 中创建了一个货币转换函数，但在调用该函数时报错。
我尝试的代码如下：

from pyspark.sql.functions import lit
from pyspark.sql.functions import when, col
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Raw exchange-rate table (columns col0..col5), loaded as a Spark DataFrame.
df_curr1 = spark.table("table")

def currency(from_currency, to_currency, date, rate_type):
    """Return the rate for converting ``from_currency`` into ``to_currency``.

    Rates are read from the Spark DataFrame ``df_curr1``. Its raw columns
    col0..col5 are renamed to BaseCurrency, TargetCurrency, EffectiveDate,
    ExpressInBaseCurrency, Ratetype and Rate. Rows whose ExpressInBaseCurrency
    equals 2 have their Rate replaced by 1 / Rate.

    Strategy:

    1. ``from_currency == to_currency`` -> 1.0.
    2. ``from_currency`` appears as a BaseCurrency: the most recent direct
       rate (from -> to) effective on or before ``date``, preferring rows whose
       Ratetype equals ``rate_type``.
    3. Else, ``to_currency`` appears as a BaseCurrency: the inverse of the most
       recent reverse rate (to -> from) on or before ``date`` (preferring
       ``rate_type``); if no reverse rate exists on or before ``date``, the
       inverse of the earliest later one, i.e. the closest in time.
    4. Otherwise a cross rate: the latest rate with TargetCurrency ==
       ``to_currency`` divided by the latest rate with TargetCurrency ==
       ``from_currency`` (both on or before ``date``, preferring ``rate_type``).

    Returns:
        The rate, or None when the strategy selected above finds no matching
        row. Later strategies are not tried in that case.

    NOTE: the body runs Spark jobs on ``df_curr1``, so it must be called on the
    driver with plain Python values. It cannot be wrapped in ``udf``. Pickling
    it for the workers fails with the SPARK-5063 "could not serialize object"
    error, because Spark DataFrames cannot be used inside worker code.
    NOTE(review): ``date`` must be comparable with col2 (presumably a
    timestamp); confirm the column type.
    """
    rates = df_curr1.select(
        df_curr1["col0"].alias("BaseCurrency"),
        df_curr1["col1"].alias("TargetCurrency"),
        df_curr1["col2"].alias("EffectiveDate"),
        df_curr1["col3"].alias("ExpressInBaseCurrency"),
        df_curr1["col4"].alias("Ratetype"),
        df_curr1["col5"].alias("Rate"),
    )
    # Rows flagged ExpressInBaseCurrency == 2 are replaced by their reciprocal.
    rates = rates.withColumn(
        "Rate",
        when(rates["ExpressInBaseCurrency"] == 2, 1 / rates["Rate"]).otherwise(rates["Rate"]),
    )

    def latest_row(condition, newest_first=True):
        """Return Row(Rate) of the matching row nearest in time, or None if none match."""
        order = col("EffectiveDate").desc() if newest_first else col("EffectiveDate").asc()
        # head() without an argument returns a single Row, or None when nothing matches.
        return rates.filter(condition).orderBy(order).select("Rate").head()

    def rate_of(row):
        """Extract the Rate value from a Row returned by latest_row (None stays None)."""
        return None if row is None else row[0]

    if from_currency == to_currency:
        return 1.0

    on_or_before = col("EffectiveDate") <= date
    same_type = col("Ratetype") == rate_type

    # A Row is a non-empty tuple and therefore truthy, so each `or` below falls
    # through to the next, looser lookup only when head() matched nothing.
    if rates.where(col("BaseCurrency") == from_currency).first() is not None:
        direct = (
            (col("BaseCurrency") == from_currency)
            & (col("TargetCurrency") == to_currency)
            & on_or_before
        )
        return rate_of(latest_row(direct & same_type) or latest_row(direct))

    if rates.where(col("BaseCurrency") == to_currency).first() is not None:
        reverse = (col("BaseCurrency") == to_currency) & (col("TargetCurrency") == from_currency)
        row = (
            latest_row(reverse & on_or_before & same_type)
            or latest_row(reverse & on_or_before)
            # Nothing on or before `date`: take the earliest later rate (the closest one).
            or latest_row(reverse, newest_first=False)
        )
        rate = rate_of(row)
        return None if rate is None else 1 / rate

    def latest_for_target(target):
        """Latest Rate with TargetCurrency == target on or before `date`, preferring rate_type."""
        is_target = (col("TargetCurrency") == target) & on_or_before
        return rate_of(latest_row(is_target & same_type) or latest_row(is_target))

    # NOTE(review): the cross rate assumes both rows are quoted against the same
    # BaseCurrency; this is not checked here. Confirm against the rate table.
    first_value = latest_for_target(from_currency)
    second_value = latest_for_target(to_currency)
    if first_value is None or second_value is None:
        return None
    return second_value / first_value

# NOTE: this UDF is the source of the SPARK-5063 "could not serialize object"
# error. `currency` references the module-level Spark DataFrame `df_curr1` and
# runs Spark actions (count/head) on it. A UDF's function is pickled and
# executed on the workers, where a Spark DataFrame/SparkContext cannot be used,
# so Spark refuses to serialize it.
# Fix: give the UDF a plain in-memory lookup table instead (e.g. a pandas copy
# shipped via a broadcast variable), or compute the rates on the driver and
# join them back onto df_fact2.
currency_udf = udf(currency, FloatType())
invoice_currency_col = df_fact2["TransactionCurrency"]
currency1_col = df_fact2["Currency1"]
document_date_col = df_fact2["DateofFirstContact"]
rate_type_col = df_fact2["OpportunityType"]

# Apply the currency UDF per row via withColumn; presumably this is where the
# pickling error above surfaces, since the UDF is serialized when applied.
df_fact2 = df_fact2.withColumn("rate1", currency_udf(invoice_currency_col, currency1_col, document_date_col, rate_type_col))

执行上述代码时，我得到以下错误：
PicklingError: Could not serialize object: RuntimeError: 您似乎试图从广播变量、操作（action）或转换（transformation）中引用 SparkContext。SparkContext 只能在驱动程序（driver）上使用，不能在 worker 上运行的代码中使用。有关详细信息，请参阅 SPARK-5063。
这里是输入数据

df_fact2
TransactionCurrency | Currency1 | DateofFirstContact | OpportunityType
AED | SAR | 2023-05-24T19:59:59.000+0000 | PUR


这里是预期输出

df_fact2
TransactionCurrency | Currency1 | DateofFirstContact | OpportunityType | rate1
AED | SAR | 2023-05-24T19:59:59.000+0000 | PUR | 1.02124


这里是df_curr1样本

col0 | col1 | col2 | col3 | col4 | col5 
AED | AUD | 2022-12-31T18:30:00.000+0000 | 2 | PUR | 2.51325
AED | SAR | 2021-01-31T19:59:55.000+0000 | 1 | PUR | 1.02124


请帮我看看哪里出错了。

o3imoua4

o3imoua41#

首先，在 UDF 中不能使用或引用 SparkContext（Spark DataFrame 同样依赖它，因此也不能在 UDF 中使用）。应改用广播变量：这里先将由 df_curr1 得到的 df_select 转换为 pandas DataFrame，再将其广播。
请尝试下面的代码。

# Give the raw rate-table columns descriptive names: (source column, alias).
df_select = df_curr1.select(
    *[
        df_curr1[source].alias(alias)
        for source, alias in (
            ("col0", "BaseCurrency"),
            ("col1", "TargetCurrency"),
            ("col2", "EffectiveDate"),
            ("col3", "ExpressInBaseCurrency"),
            ("col4", "Ratetype"),
            ("col5", "Rate"),
        )
    ]
)

# Rows flagged ExpressInBaseCurrency == 2 are replaced by their reciprocal.
df_select = df_select.withColumn(
    "Rate",
    when(df_select["ExpressInBaseCurrency"] == 2, 1 / df_select["Rate"])
    .otherwise(df_select["Rate"]),
)

接下来，广播该表：

# Collect the normalised rate table to the driver as a pandas DataFrame and
# broadcast it. `currency_df` then reads this plain in-memory copy through
# `fact_ent_df_data.value` instead of touching a Spark DataFrame on the workers.
# NOTE(review): toPandas() pulls the whole table into driver memory. That is
# fine for a small rate table; confirm its size.
fact_ent_df_data = sc.broadcast(df_select.toPandas())


然后将函数改写为基于 pandas 的版本，如下所示：

import pandas as pd

def currency_df(from_currency, to_currency, date, rate_type):
    """Return the rate converting ``from_currency`` into ``to_currency`` as of ``date``.

    Intended to run inside a Spark UDF: it reads only the broadcast pandas copy
    of the rate table (``fact_ent_df_data.value``), never a Spark object.

    Lookup strategies, tried in order against rows effective on or before ``date``:

    1. direct pair  (Base=from, Target=to): most recent Rate;
    2. reverse pair (Base=to, Target=from): 1 / most recent Rate;
    3. cross rate: most recent (Target=to) Rate / most recent (Target=from) Rate.

    The strategies are first tried only on rows whose Ratetype equals
    ``rate_type``. If none of them yields a rate, they are retried on rows of
    any rate type, mirroring the fallback in the Spark-based ``currency``.

    Returns:
        1.0 when the currencies are equal. Otherwise the rate as a Python float,
        or None (null in Spark) when no rate is found or ``date`` is None.

    NOTE(review): ``date`` must be comparable with the EffectiveDate column
    (e.g. a datetime when the column is a timestamp); confirm the column type.
    """
    rates = fact_ent_df_data.value

    if from_currency == to_currency:
        return 1.0
    if date is None:
        # A null date has nothing to compare EffectiveDate against.
        return None

    dated = rates[rates["EffectiveDate"] <= date]

    def latest_rate(frame, mask):
        """Rate of the most recent row of ``frame`` selected by ``mask``, or None."""
        hits = frame[mask]
        if hits.empty:
            return None
        return hits.sort_values("EffectiveDate", ascending=False)["Rate"].iloc[0]

    def find_rate(frame):
        """Try the direct, reverse, then cross lookup on ``frame``; None if all fail."""
        base = frame["BaseCurrency"]
        target = frame["TargetCurrency"]

        direct = latest_rate(frame, (base == from_currency) & (target == to_currency))
        if direct is not None:
            return direct

        reverse = latest_rate(frame, (base == to_currency) & (target == from_currency))
        if reverse is not None:
            return 1 / reverse

        # NOTE(review): assumes both target rows are quoted against the same
        # base currency; this is not checked here. Confirm against the table.
        from_value = latest_rate(frame, target == from_currency)
        to_value = latest_rate(frame, target == to_currency)
        if from_value is None or to_value is None:
            return None
        return to_value / from_value

    rate = find_rate(dated[dated["Ratetype"] == rate_type])
    if rate is None:
        # Nothing found for the requested rate type: retry with any rate type.
        rate = find_rate(dated)
    # float(None) would raise TypeError and fail the whole Spark job; return
    # None instead so the UDF yields null for rows without a rate.
    return None if rate is None else float(rate)


接下来,创建udf并调用函数。

# Wrap the pandas-based lookup as a Spark UDF returning FloatType. It reads
# only the broadcast pandas table, so it can be pickled and sent to workers.
currency_pd_udf = udf(currency_df, FloatType())
# The Column objects defined earlier from df_fact2 supply
# (transaction currency, target currency, date, rate type) per row.
df_fact2 = df_fact2.withColumn("rate1",currency_pd_udf(invoice_currency_col, currency1_col, document_date_col, rate_type_col))
# `display` is presumably the Databricks notebook helper; it is not defined here.
display(df_fact2)


输出：


的数据

相关问题