Is there a way in pandas to get values from a different dataframe's cells based on the current dataframe's columns?

cig3rfwq · posted 2023-06-20 in Other

As in the example below, I am looking up each operator's logout time.
I have one source with each truck operator's login timestamp.
Another source has the truck's logout timestamps (in this source, the logout timestamps are not tied to any operator).
Here is sample code that builds both sources:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
import pyspark.sql.functions as sf
from pyspark.sql.window import Window
from pyspark.sql.types import (DoubleType, StringType, StructField, StructType, ArrayType, MapType, TimestampType, LongType)

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

columns1 = ["truck","operator","firstlogin","lead_login","firstlogin_unix","leadlogin_unix"]
data1 = [("t1","1", "2023-06-02T00:17:02.095+0000", "2023-06-02T01:57:31.868+0000",1685665022,1685671051),
    ("t1","2", "2023-06-02T01:57:31.868+0000","2023-06-02T02:25:55.484+0000",1685671051,1685672755),
    ("t1","3", "2023-06-02T02:25:55.484+0000","2023-06-02T13:56:47.373+0000",1685672755,1685714207),
    ("t1","1", "2023-06-02T13:56:47.373+0000","2023-06-02T23:53:39.829+0000",1685714207,1685750019),
    ("t1","4", "2023-06-02T23:53:39.829+0000",None,1685750019,None),
    ("t2","A", "2023-06-02T14:00:00.373+0000","2023-06-02T23:53:39.829+0000",1685714418,1685750019)]

df_op_login = spark.createDataFrame(data=data1,schema=columns1)

df_op_login = df_op_login.select(
    col('truck'),col('operator'),col('firstlogin'),col('lead_login'),
    col('firstlogin_unix').cast(LongType()),col('leadlogin_unix').cast(LongType())
)

df_op_login.show(truncate=False)

columns2 = ["truck","log_out_ts","log_out_unix_ts"]
data2 = [("t1","2023-06-02T00:02:50.242000",1685664170),
    ("t1","2023-06-02T01:50:42.436000",1685670642),
    ("t1","2023-06-02T01:53:31.231000",1685670811),
    ("t1","2023-06-02T02:00:27.855000",1685671227),
    ("t1","2023-06-02T02:46:20.058000",1685673980),
    ("t1","2023-06-02T05:57:15.370000",1685685435),
    ("t1","2023-06-02T06:06:30.526000",1685685990),
    ("t1","2023-06-02T06:15:16.062000",1685686516),
    ("t1","2023-06-02T10:26:53.795000",1685701613),    
    ("t1","2023-06-02T10:33:43.012000",1685702023),
    ("t2","2023-06-02T15:35:43.012000",1685720143)]

df_op_logout = spark.createDataFrame(data=data2,schema=columns2)

df_op_logout = df_op_logout.select(
    col('truck'),col('log_out_ts'),col('log_out_unix_ts').cast(LongType())
)

df_op_logout.show(truncate=False)

Now I created a UDF to fetch the logout timestamp that falls between 'firstlogin_unix' and 'leadlogin_unix', but I am blocked by the error: cannot resolve 'firstlogin_unix' given input columns: [log_out_ts, log_out_unix_ts, truck];

w =  Window.partitionBy("truck")

def get_logout(x, y, z):  
  df_op_logout_tmp = (
      df_op_logout
      .filter(col('truck') == x)
      .filter(col('log_out_unix_ts') >= y)
      .filter(col('log_out_unix_ts') <= z)
      .withColumn('max_ts', sf.max('log_out_unix_ts').over(w))
      .filter(sf.col('log_out_unix_ts') == sf.col('max_ts') )
  )
  if df_op_logout_tmp.count() > 0:
    return df_op_logout_tmp.collect()[0]['log_out_ts']
  else:
    return None
  
# get_logout('t1', 1685665022, 1685671051)

df_op_final = (
    df_op_login
    # .withColumn('op_logout',sf.lit(get_logout('t1', 1685665022, 1685671051)))
    .withColumn('op_logout',sf.lit(get_logout(col('truck'), col('firstlogin_unix'), col('leadlogin_unix'))))
)

display(df_op_final)
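The error is an analysis failure rather than a data problem: get_logout is an ordinary Python function, so it runs immediately on the driver, and col('truck'), col('firstlogin_unix') and col('leadlogin_unix') arrive as unresolved Column expressions, not values. Spark then has to resolve 'firstlogin_unix' against df_op_logout, whose schema does not contain that column. A minimal reproduction, assuming the DataFrames defined above:

# col('firstlogin_unix') is an unresolved expression; using it to filter
# df_op_logout makes Spark look up 'firstlogin_unix' in df_op_logout's schema.
df_op_logout.filter(col('log_out_unix_ts') >= col('firstlogin_unix')).show()
# -> AnalysisException: cannot resolve 'firstlogin_unix' given input columns:
#    [truck, log_out_ts, log_out_unix_ts]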

Expected output:
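(The original post showed this as a color-coded image, which has not survived here; the table below is reconstructed by applying the logic described next to the sample data.)

+-----+--------+----------------------------+----------------------------+--------------------------+
|truck|operator|firstlogin                  |lead_login                  |op_logout                 |
+-----+--------+----------------------------+----------------------------+--------------------------+
|t1   |1       |2023-06-02T00:17:02.095+0000|2023-06-02T01:57:31.868+0000|2023-06-02T01:53:31.231000|
|t1   |2       |2023-06-02T01:57:31.868+0000|2023-06-02T02:25:55.484+0000|2023-06-02T02:00:27.855000|
|t1   |3       |2023-06-02T02:25:55.484+0000|2023-06-02T13:56:47.373+0000|2023-06-02T10:33:43.012000|
|t1   |1       |2023-06-02T13:56:47.373+0000|2023-06-02T23:53:39.829+0000|null                      |
|t1   |4       |2023-06-02T23:53:39.829+0000|null                        |null                      |
|t2   |A       |2023-06-02T14:00:00.373+0000|2023-06-02T23:53:39.829+0000|2023-06-02T15:35:43.012000|
+-----+--------+----------------------------+----------------------------+--------------------------+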

Logic: a truck can be operated by several operators. For a given truck, I am trying to find the logout timestamp that falls between one operator's login timestamp and the next operator's login (lead_login). For example, on truck t1, operator 1's first window is [1685665022, 1685671051]; the latest logout inside that window is 1685670811, i.e. 2023-06-02T01:53:31.231000.

46scxncf #1

Does this answer your question?

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.functions import max
from pyspark.sql.types import StringType
from pyspark.sql.window import Window

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

# Create the DataFrames (reusing data1/columns1 and data2/columns2 from the question)
df_op_login = spark.createDataFrame(data1, schema=columns1)
df_op_logout = spark.createDataFrame(data2, schema=columns2)

# Create a window
w = Window.partitionBy("truck")

# Define the lookup function (wrapped as a UDF below)
def get_logout(x, y, z):
    df_op_logout_tmp = (
        df_op_logout
        .filter(col('truck') == x)
        .filter(col('log_out_unix_ts') >= y)
        .filter(col('log_out_unix_ts') <= z)
        .withColumn('max_ts', max('log_out_unix_ts').over(w))
        .filter(col('log_out_unix_ts') == col('max_ts'))
    )
    if df_op_logout_tmp.count() > 0:
        return df_op_logout_tmp.collect()[0]['log_out_ts']
    else:
        return None

# Wrap the function as a UDF and apply it. Note: log_out_ts is stored as a
# string, so the declared return type is StringType (a Python string returned
# from a UDF declared TimestampType would come back as null).
get_logout_udf = udf(get_logout, StringType())
df_op_final = df_op_login.withColumn(
    'op_logout',
    get_logout_udf(col('truck'), col('firstlogin_unix'), col('leadlogin_unix'))
)

# Display the results
df_op_final.show(truncate=False)
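One caveat with the UDF route: get_logout runs DataFrame operations against df_op_logout, and those are only valid on the driver; inside a UDF the function executes on the executors, where no SparkSession is available, so this approach typically fails at runtime even once the analysis error is gone. A non-equi join avoids both problems. A sketch, assuming the DataFrames built in the question:

from pyspark.sql import functions as sf

# Left-join each login window to the logouts that fall inside it, then keep
# the latest logout per window. The ISO-8601 strings in log_out_ts sort
# chronologically, so max('log_out_ts') picks the latest one.
joined = (
    df_op_login.alias('login')
    .join(
        df_op_logout.alias('logout'),
        (sf.col('login.truck') == sf.col('logout.truck'))
        & (sf.col('logout.log_out_unix_ts') >= sf.col('login.firstlogin_unix'))
        & (sf.col('logout.log_out_unix_ts') <= sf.col('login.leadlogin_unix')),
        'left',
    )
    .groupBy('login.truck', 'login.operator', 'login.firstlogin',
             'login.lead_login', 'login.firstlogin_unix', 'login.leadlogin_unix')
    .agg(sf.max('logout.log_out_ts').alias('op_logout'))
)

joined.show(truncate=False)

Rows with a null leadlogin_unix (or with no logout in the window) keep a null op_logout thanks to the left join. On the sample data this reproduces the expected output shown in the question.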
