在行类型的pyspark中向配置单元表插入值

iyfjxgzm  于 2021-06-27  发布在  Hive
关注(0)|答案(1)|浏览(375)

我对Pypark的工作还不熟悉。我有一个函数,它计算查询的最大值,并插入行类型的最大值,以及其他两个值date和product name。

def findCount(query, prod_date, prod_name):
        count = query.agg({"count": "max"}).collect()[0] (returns Row(max(count)=Decimal('1.0000000000')))
        reopen = hc.sql('insert into details values(row_date, row_name, count)')
        print(=count)

这是调用函数的代码:

for row in aggs_list:
        prod_date= row.date
        prod_name = row.product_name
        query = prod_load.filter((col("date") == prod_date) & (col("prod_name") == row_name))
        findCount(query, prod_date, prod_name)

这是我尝试过的,但不起作用。有没有更有效的方法?

lymnna71

lymnna711#

您可能应该远离行类型,这通常意味着您已经收集了所有数据给驱动程序。如果是这样,就没有理由使用spark,因为您没有利用并行计算环境。
您可以使用spark sql完成以下任务: max_data = spark.sql("SELECT product_name, max(count), product_date FROM table") 至于插入数据库(我猜您使用的是 hc 大多数人会每天运行作业,并将结果写入一个日期分区表,如下所示:
第一个寄存器临时配置单元表 max_data.registerTempTable("md") 然后覆盖分区 spark.sql("INSERT OVERWRITE new_table PARTITION(dt=product_date) SELECT * FROM md")

相关问题