我对Pypark的工作还不熟悉。我有一个函数,它计算查询的最大值,并插入行类型的最大值,以及其他两个值date和product name。
def findCount(query, prod_date, prod_name):
count = query.agg({"count": "max"}).collect()[0] (returns Row(max(count)=Decimal('1.0000000000')))
reopen = hc.sql('insert into details values(row_date, row_name, count)')
print(=count)
这是调用函数的代码:
for row in aggs_list:
prod_date= row.date
prod_name = row.product_name
query = prod_load.filter((col("date") == prod_date) & (col("prod_name") == row_name))
findCount(query, prod_date, prod_name)
这是我尝试过的,但不起作用。有没有更有效的方法?
1条答案
按热度按时间lymnna711#
您可能应该远离行类型,这通常意味着您已经收集了所有数据给驱动程序。如果是这样,就没有理由使用spark,因为您没有利用并行计算环境。
您可以使用spark sql完成以下任务:
max_data = spark.sql("SELECT product_name, max(count), product_date FROM table")
至于插入数据库(我猜您使用的是hc
大多数人会每天运行作业,并将结果写入一个日期分区表,如下所示:第一个寄存器临时配置单元表
max_data.registerTempTable("md")
然后覆盖分区spark.sql("INSERT OVERWRITE new_table PARTITION(dt=product_date) SELECT * FROM md")