pyspark: Spark streams produce inconsistent/out-of-order results per batch when writing to a database

Posted by x9ybnkn6 on 2023-08-02 in Spark

**Problem:** I receive data for multiple tables/schemas in a single stream. After separating the data, I open a parallel writeStream for each table.

The function I use in foreachBatch is:

def writeToAurora(df, batch_id, tableName):
    # Called by foreachBatch for every micro-batch of one table's stream
    df = df.persist()
    stagingTable = f'{tableName.lower()}_delta'
    
    df.write \
            .mode("overwrite") \
            .format("jdbc") \
            .option("truncate", "true") \
            .option("driver", DB_conf['DRIVER']) \
            .option("batchsize", 1000) \
            .option("url", DB_conf['URL']) \
            .option("dbtable", stagingTable) \
            .option("user", DB_conf['USER_ID']) \
            .option("password", DB_conf['PASSWORD']) \
            .save() 
    df.unpersist()

The logic for opening multiple writeStreams is:

from pyspark.sql import functions as F

data_df = spark.readStream.format("kinesis") \
    .option("streamName", stream_name) \
    .option("startingPosition", initial_position) \
    .load()

                 
# Split the stream into one DataFrame per table
distinctTables = ['Table1', 'Table2', 'Table3']
tablesDF = {table: data_df.filter(f"TableName = '{table}'") for table in distinctTables}

# Start one writeStream per table
for table, tableDF in tablesDF.items():
    df = tableDF.withColumn('csvData', F.from_csv('finalData', schema=tableSchema[table], options={'sep': '|','quote': '"'}))\
        .select('csvData.*')

    vars()[table+'_query'] = df.writeStream\
                .trigger(processingTime='120 seconds') \
                .foreachBatch(lambda fdf, batch_id: writeToAurora(fdf, batch_id, table)) \
                .option("checkpointLocation", f"s3://{bucket}/temporary/checkpoint/{table}")\
                .start()
                
                
for table in tablesDF.keys():
    eval(table+'_query').awaitTermination()

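**Issue:** When I run the code above, Table1's data is sometimes loaded into Table2, and the mix-up is different on every run. The mapping between each DataFrame and the table it should be loaded into is not preserved.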

I need help understanding why this happens.

**Answer 1** (ffscu2ro)

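This is caused by late binding in the lambda passed to foreachBatch. The lambda does not capture the value the loop variable had when the lambda was created; it looks the variable up each time Spark calls it for a micro-batch. By then the loop has moved on, so the function receives whatever table name the variable holds at that moment.
For example, the following code tries to write every table to "t2" and fails (in practice only the "t2" table is written, but it receives "t0" data):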

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToTable(df, epochId, table_name):
  df.write.mode("overwrite").saveAsTable(f"custanwo.dsci.stream_test_{table_name}")

data_df = spark.readStream.format("rate").load()
data_df = (data_df
           .selectExpr("value % 10 as key")
           .groupBy("key")
           .count()
           .withColumn("t", concat(lit("t"),(col("key")%3).astype("string")))
)

table_names = ["t0", "t1", "t2"]
table_df = {t: data_df.filter(f"t = '{t}'") for t in table_names}

for t, df in table_df.items():
    vars()[f"{t}_query"] = (df
                            .writeStream
                            # BUG: t is looked up when the batch runs, not when the lambda is created
                            .foreachBatch(lambda df, epochId: writeToTable(df, epochId, t))
                            .outputMode("update")
                            .start()
                            )
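The effect can be reproduced in plain Python, without Spark. In this minimal sketch (an illustration, not PySpark code), a list of stored callbacks stands in for the queries registered with foreachBatch, and the callbacks are invoked only after the loop has finished. The f-string filter condition is evaluated immediately, so each filter is correct, while the lambda reads t only when called, so every callback reports "t2". With real streaming queries the batches run while the driver script keeps executing, so a callback may also observe an intermediate value; note too that the question's final for table in tablesDF.keys() loop reassigns the same table variable. Either could plausibly explain why the mix-up changes from run to run.

```python
# Plain-Python stand-in for foreachBatch: callbacks are stored now and
# invoked later, after the loop that created them has finished.
registered = []

table_names = ["t0", "t1", "t2"]
for t in table_names:
    # The f-string is evaluated immediately, so each filter condition is correct...
    condition = f"t = '{t}'"
    # ...but the lambda only looks up `t` when it is eventually called.
    registered.append((condition, lambda batch_id: t))

# Simulate each query's micro-batch firing after the loop has ended.
results = [(condition, callback(0)) for condition, callback in registered]
print(results)
# [("t = 't0'", 't2'), ("t = 't1'", 't2'), ("t = 't2'", 't2')]
```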

There are several ways to fix this. One is to use functools.partial:

from functools import partial
from pyspark.sql.functions import col, concat, lit

def writeToTable(df, epochId, table_name):
  df.write.mode("overwrite").saveAsTable(f"custanwo.dsci.stream_test_{table_name}")

data_df = spark.readStream.format("rate").load()
data_df = (data_df
           .selectExpr("value % 10 as key")
           .groupBy("key")
           .count()
           .withColumn("t", concat(lit("t"),(col("key")%3).astype("string")))
) 

table_names = ["t0", "t1", "t2"]
table_df = {t: data_df.filter(f"t = '{t}'") for t in table_names}

for t, df in table_df.items():
    vars()[f"{t}_query"] = (df
                            .writeStream
                            .foreachBatch(partial(writeToTable, table_name=t))
                            .outputMode("update")
                            .start()
                            )
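Both partial and a default-argument lambda (another common Python idiom, not shown in the original answer) bind the table name at the moment the function is created. The plain-Python sketch below checks this; write_to_table is a hypothetical stand-in that only reports where a batch would go. It assumes, as the PySpark documentation describes, that foreachBatch calls the function with the batch DataFrame and the batch id as its two positional arguments, which is why binding table_name as a keyword works.

```python
from functools import partial


def write_to_table(df, epoch_id, table_name):
    # Hypothetical stand-in for writeToTable/writeToAurora:
    # report which table this batch would be written to.
    return f"{df} -> {table_name}"


with_partial = []
with_default = []
for t in ["t0", "t1", "t2"]:
    # partial stores the current value of t as a keyword argument.
    with_partial.append(partial(write_to_table, table_name=t))
    # A default argument is evaluated once, when the lambda is created.
    with_default.append(lambda df, epoch_id, t=t: write_to_table(df, epoch_id, t))

# Simulate foreachBatch calling each function as fn(batch_df, batch_id)
# after the loop has finished.
partial_results = [f(f"df{i}", 0) for i, f in enumerate(with_partial)]
default_results = [f(f"df{i}", 0) for i, f in enumerate(with_default)]
print(partial_results)  # ['df0 -> t0', 'df1 -> t1', 'df2 -> t2']
print(default_results)  # ['df0 -> t0', 'df1 -> t1', 'df2 -> t2']
```

With the default-argument form, the extra parameter could in principle be overridden by a caller passing a third argument; since foreachBatch passes only two, either form behaves the same here.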


In your code, rewrite the writeStream as:

vars()[table+'_query'] = df.writeStream\
                .trigger(processingTime='120 seconds') \
                .foreachBatch(partial(writeToAurora, tableName = table)) \
                .option("checkpointLocation", f"s3://{bucket}/temporary/checkpoint/{table}")\
                .start()

**Answer 2** (m1m5dgzv)

def writeToAurora(df, batch_id, tableName):
    df = df.withColumn("TableName", F.lit(tableName))  # Add the TableName column to the DataFrame
    df = df.persist()
    stagingTable = f'{str(tableName.lower())}_delta'
    
    df.write \
        .mode("overwrite") \
        .format("jdbc") \
        .option("truncate", "true") \
        .option("driver", DB_conf['DRIVER']) \
        .option("batchsize", 1000) \
        .option("url", DB_conf['URL']) \
        .option("dbtable", stagingTable) \
        .option("user", DB_conf['USER_ID']) \
        .option("password", DB_conf['PASSWORD']) \
        .save() 
    df.unpersist()

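With this change, the DataFrame df passed to writeToAurora will have an additional column named "TableName" containing the name of the table the data belongs to. The writeToAurora function then uses this information to write the data to the appropriate staging table in Aurora.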
