如何使用循环sparksql以编程方式将记录附加到配置单元表?

dohp0rv5  于 2021-06-25  发布在  Hive
关注(0)|答案(1)|浏览(400)

我写了下面的Spark逻辑。
高级:代码循环遍历一些数据,将一些记录分批拉回来,对这些记录应用一些逻辑,并将输出附加到运行时创建的另一个表中。作业成功完成,但表为空。
详细说明:代码应该创建一个带有3个名称的sparkDataframe。对于每个名称,代码构造一个查询,使用名称作为过滤条件,对返回的数据应用一些逻辑,并将其存储在新的sparkDataframe(output\u spark\u df)中。然后,将此Dataframe转换为临时表,然后使用spark.sql将结果插入myu数据库myu results。我的\数据库。我的\结果应该有三次加载到它的数据。尽管作业已成功完成,但我的\数据库。我的\结果仍为空。
任何指导都将不胜感激。

if __name__ == "__main__":
    spark = SparkSession.builder.appName('batch_job').config("spark.kryoserializer.buffer.max", "2047mb").config("spark.sql.broadcastTimeout", "-1").config("spark.sql.autoBroadcastJoinThreshold","-1").getOrCreate()

    # Set up hive table to capture results
    #-------------------------------------
    spark.sql("DROP TABLE IF EXISTS my_database.my_results")
    spark.sql("CREATE TABLE IF NOT EXISTS my_database.my_results (field1 STRING, field2 INT) STORED AS PARQUET")

    names = spark.sql("select distinct name from my_database.my_input where name IN ('mike','jane','ryan')")

    for n in names:
        input_spark_df = spark.sql("select * from my_database.my_input where name = '{}'".format(n))
        .
        .
        .
        <APPLY LOGIC>
        .
        .
        .
        output_spark_df = <logic applied>

        # Capture output and append to pre-created hive table
        #----------------------------------------------------
        output_spark_df.registerTempTable("results")
        spark.sql("INSERT INTO TABLE my_database.my_results  SELECT * FROM results")

    spark.stop()
iklwldmw

iklwldmw1#

names 仍然是代码中的一个Dataframe,因为在Dataframe上循环会导致for循环中没有匹配的记录。
使 names 变量作为我们需要做的列表 flatMap and collect 创建一个列表,然后在列表上循环。 Fix: ```

create names list

names=spark.sql("select distinct id as id from default.i").
rdd.
flatMap(lambda z:z).
collect()

to print values in the list

for n in names:
print(n)
`Example with sample data:`

sample data

spark.sql("select distinct id as id from default.i").show()

+---+

| id|

+---+

| 1|

| 2|

| 3|

+---+

creating a list

names=spark.sql("select distinct id as id from default.i").flatMap(lambda z:z).collect()

looping over the list

for n in names:
spark.sql("select * from default.i where id = '{}'".format(n)).show()

result

+---+

| id|

+---+

| 1|

+---+

+---+

| id|

+---+

| 2|

+---+

+---+

| id|

+---+

| 3|

+---+

相关问题