从pyspark运行大量的配置单元查询

b91juud3  于 2021-06-26  发布在  Hive
关注(0)|答案(1)|浏览(634)

我希望执行大量的配置单元查询并将结果存储在Dataframe中。
我有一个非常大的数据集,结构如下:

+-------------------+-------------------+---------+--------+--------+
|         visid_high|          visid_low|visit_num|genderid|count(1)|
+-------------------+-------------------+---------+--------+--------+
|3666627339384069624| 693073552020244687|       24|       2|      14|
|1104606287317036885|3578924774645377283|        2|       2|       8|
|3102893676414472155|4502736478394082631|        1|       2|      11|
| 811298620687176957|4311066360872821354|       17|       2|       6|
|5221837665223655432| 474971729978862555|       38|       2|       4|
+-------------------+-------------------+---------+--------+--------+

我想创建一个派生的dataframe,它使用每一行作为辅助查询的输入:

result_set = []
for session in sessions.collect()[:100]:
    query = "SELECT prop8,count(1) FROM hit_data WHERE dt = {0} AND visid_high = {1} AND visid_low = {2} AND visit_num = {3} group by prop8".format(date,session['visid_high'],session['visid_low'],session['visit_num'])
    result = hc.sql(query).collect()
    result_set.append(result)

这在一百行中可以正常工作,但会导致livy在负载较高时超时。
我试过使用map或foreach:

def f(session):
    query = "SELECT prop8,count(1) FROM hit_data WHERE dt = {0} AND visid_high = {1} AND visid_low = {2} AND visit_num = {3} group by prop8".format(date,session.visid_high,session.visid_low,session.visit_num)
    return hc.sql(query)

test = sampleRdd.map(f)

造成 PicklingError: Could not serialize object: TypeError: 'JavaPackage' object is not callable . 我从这个答案和这个答案中了解到spark上下文对象是不可序列化的。
我没有尝试先生成所有查询,然后运行批处理,因为我从这个问题了解到批处理查询不受支持。
我该怎么做?

o2rvlv0m

o2rvlv0m1#

我想要的是:
通过编写适当的联接一次性查询所有必需的数据
添加自定义列,基于使用 pyspark.sql.functions.when() 以及 df.withColumn() ,那么
使用 df.groupBy() 以及 pyspark.sql.functions.sum() 我想我还没有完全意识到spark处理Dataframe很懒。支持的工作方式是定义大型Dataframe,然后进行适当的转换。spark将尝试在最后一秒一次性执行数据检索和转换,并将其分发。我试图预先限制范围,这导致了不受支持的功能。

相关问题