How do I fetch all data from multiple tables listed in a table in PySpark?

tpxzln5u · posted 2023-10-15 in Spark

Using PySpark/SQL, I have a table (MAIN_TABLE) that contains three columns:

DATABASE_NAME
TABLE_NAME
SOURCE_TYPE

I want to fetch all data from the actual databases and tables named in the DATABASE_NAME and TABLE_NAME columns of the main table. However, I only want to grab data from tables where SOURCE_TYPE = 'STANDARD'; nothing else should be picked up.
What I need is essentially a union of the data from all the tables listed in MAIN_TABLE where SOURCE_TYPE = 'STANDARD', provided they meet certain conditions. I tried running the code below, but it did not grab the data from all the tables listed in MAIN_TABLE with SOURCE_TYPE = 'STANDARD'. Am I missing something?

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Create a SparkSession
spark = SparkSession.builder.appName("InsertData").getOrCreate()

# Filter the tables where SOURCE_TYPE = 'STANDARD'
config_df = spark.table("MAIN_TABLE").filter("SOURCE_TYPE = 'STANDARD'")

# Initialize an empty DataFrame to store the result
result_df = None

# Loop through the filtered tables
for row in config_df.collect():
    database_name = row["database_name"]
    table_name = row["table_name"]

    # Generate a dynamic SQL query to select data from the source table
    sql_query = f"""
        SELECT
            header.profile,
            attributes.id,
            header.location,
            'SOURCE_TYPE' as source_type,
            header.actionname as actionname,
            transform.date,
            header.ip,
            header.country,
            '{database_name}' as source_database_name,
            '{table_name}' as source_event_name
        FROM {database_name}.{table_name}
        """

    # Execute the SQL query and create a DataFrame
    source_data_df = spark.sql(sql_query)

    # Union the source_data_df with the result_df
    if result_df is None:
        result_df = source_data_df
    else:
        result_df = result_df.unionAll(source_data_df)

# Insert the combined data into MAIN.NEW_RESULTED_TABLE
result_df.write.mode("append").saveAsTable("MAIN.NEW_RESULTED_TABLE")

# Stop the SparkSession
spark.stop()

Am I doing something wrong when trying to fetch all data from the tables where SOURCE_TYPE = 'ACTUAL'?


mkh04yzy1#

In your code you filter MAIN_TABLE on SOURCE_TYPE = 'STANDARD'. In your explanation, however, you mention that you want to **select data from tables where SOURCE_TYPE = 'ACTUAL'**. If that is the case, you should adjust the filter condition accordingly. Also note that the row keys must match the case of the column names: your columns are DATABASE_NAME and TABLE_NAME, and Row field access by name is case-sensitive, so row["database_name"] will raise an error.
Here is the modified code, filtering on SOURCE_TYPE = 'ACTUAL'; try the following:

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("InsertData").getOrCreate()

# Filter the tables where SOURCE_TYPE = 'ACTUAL'
config_df = spark.table("MAIN_TABLE").filter("SOURCE_TYPE = 'ACTUAL'")

# Initialize an empty DataFrame to store the result
result_df = None

# Loop through the filtered tables
for row in config_df.collect():
    database_name = row["DATABASE_NAME"]  # Row field lookup is case-sensitive
    table_name = row["TABLE_NAME"]

    # Generate a dynamic SQL query to select data from the source table
    sql_query = f"""
        SELECT
            header.profile,
            attributes.id,
            header.location,
            'ACTUAL' as source_type,
            header.actionname as actionname,
            transform.date,
            header.ip,
            header.country,
            '{database_name}' as source_database_name,
            '{table_name}' as source_event_name
        FROM {database_name}.{table_name}
        """

    # Execute the SQL query and create a DataFrame
    source_data_df = spark.sql(sql_query)

    # Union the source_data_df with the result_df
    if result_df is None:
        result_df = source_data_df
    else:
        result_df = result_df.union(source_data_df)  # unionAll is deprecated

# Insert the combined data into MAIN.NEW_RESULTED_TABLE
result_df.write.mode("append").saveAsTable("MAIN.NEW_RESULTED_TABLE")

# Stop the SparkSession
spark.stop()

Make sure the SOURCE_TYPE condition matches your requirement, whether that is 'STANDARD' or 'ACTUAL'.
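As a side note, the `result_df = None` bookkeeping can be replaced with a fold over the list of per-table DataFrames. Below is a minimal sketch using `functools.reduce` with `DataFrame.unionByName` (which matches columns by name rather than by position); `union_all` is a helper name made up here, not a Spark API:

```python
from functools import reduce

def union_all(dfs):
    """Union a non-empty list of DataFrames, matching columns by name."""
    return reduce(lambda left, right: left.unionByName(right), dfs)

# Hypothetical usage with the loop above (build_query is a stand-in for
# constructing the per-table SELECT string):
#   frames = [spark.sql(build_query(r["DATABASE_NAME"], r["TABLE_NAME"]))
#             for r in config_df.collect()]
#   result_df = union_all(frames)
```

Because `reduce` raises on an empty list, you still want to check that the filtered config actually returned rows before calling it.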
