如何在pyspark中使用不同的输入参数多次调用函数

ovfsdjhp  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(115)

我有一个函数 f(df, index),它接受一个 PySpark DataFrame df 和一个索引作为输入参数,并返回一个带有附加列的 DataFrame。我有多个索引值,想对每个值运行该函数,并把所有输出存储在一个数组中。例如 index_all = df.select('index').distinct()。现在我想对 index_all 中的所有值运行该函数,如下所示:

total_results = []
# index_all is a DataFrame (df.select('index').distinct()); bring its rows to the
# driver first, then pass each row's value to f.
for row in index_all.collect():
    result = f(df, row["index"])
    # list.append mutates the list in place and returns None, so its return value
    # must not be assigned back to total_results.
    total_results.append(result)

如何使用 PySpark 尽可能高效地实现这一点?

mf98qq94

mf98qq941#

下面是一个改编自另一个回答的示例解决方案:

import sys
import threading
from multiprocessing.pool import ThreadPool
from time import sleep

import pyspark.sql.functions as F
from pyspark import SparkContext, SQLContext
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Entry point to Spark. SparkSession replaces the SQLContext API, which has been
# deprecated since Spark 3.0. appName labels this application in the Spark UI.
spark = SparkSession.builder.master("local").appName("multi-call-demo").getOrCreate()
sc = spark.sparkContext

# Sample rows: [index, value]. Several index values repeat so that the per-index
# counts computed later differ.
data1 = [
    ["01001", 1288770200],
    ["01002", 1278770000],
    ["01003", 1258780201],
    ["01003", 1238750044],
    ["01001", 1288770243],
    ["01001", 1278775030],
    ["01001", 1258780281],
    ["01003", 1238753044],
    ["01005", 1278770000],
    ["01006", 1258780201],
    ["01006", 1238750044],
]

df1Columns = ["index", "value"]
df1 = spark.createDataFrame(data=data1, schema=df1Columns)

columns_list = list(df1.columns)
print(columns_list)

# Distinct index values as plain Python objects on the driver. Reading the field
# straight from the collected Rows avoids converting to an RDD and running a
# Python lambda just to extract one column.
index_list = [row["index"] for row in df1.select("index").distinct().collect()]
print(index_list)

def operate_on_df(df: DataFrame, given_index: str) -> dict:
    """Count the rows of ``df`` whose ``index`` column equals ``given_index``.

    Stands in for an arbitrary per-index function ``f(df, index)``. Every call runs
    its own Spark job (a ``filter`` followed by the ``count`` action). When the real
    work is only a per-index aggregation, a single ``df.groupBy("index")`` pass is
    usually preferable to calling this once per index.

    Args:
        df: DataFrame containing an ``index`` column.
        given_index: Value compared against ``index``. In this demo the column is
            a string column (e.g. ``"01001"``), hence ``str`` rather than ``int``.

    Returns:
        ``{"given_index": given_index, "count_value": <number of matching rows>}``.
    """
    count_value = df.filter(F.col("index") == given_index).count()
    return {"given_index": given_index, "count_value": count_value}

# Sequential baseline: call operate_on_df once per distinct index and pair each
# result with the index it was computed for.
count_result_list = [(idx, operate_on_df(df1, idx)) for idx in index_list]

print(count_result_list)

def task(arg_tuple):
    """Thread-pool worker: forward a ``(df, index)`` pair to ``operate_on_df``.

    Only the first two items of ``arg_tuple`` are used.
    """
    frame, index_value = arg_tuple[0], arg_tuple[1]
    return operate_on_df(frame, index_value)

def run_multiple_jobs(num_jobs: int = 40):
    """Submit ``num_jobs`` calls of ``task`` to a thread pool and collect the results.

    Job ``k`` receives ``(df1, index_list[k % len(index_list)])``, so the distinct
    index values are cycled through in order. Each ``task`` call triggers a Spark
    job from its own driver thread, which lets Spark schedule them concurrently.

    Args:
        num_jobs: Number of ``(df1, index)`` tasks to submit (default 40).

    Returns:
        The values returned by ``task``, in submission order. Returns ``[]``
        immediately when ``index_list`` is empty.
    """
    if not index_list:
        # Nothing to cycle through; also avoids a ZeroDivisionError in the modulo.
        return []

    # Cycle over however many distinct indices exist. The previous hard-coded 5
    # raised IndexError for fewer than 5 indices and skipped any beyond the fifth.
    arg_list = [(df1, index_list[kk % len(index_list)]) for kk in range(num_jobs)]

    print("Argument list is as follows")
    print(arg_list)

    with ThreadPool() as pool:
        # map blocks until every task has finished and returns results in order,
        # so they are retrieved before the pool is terminated on leaving the block.
        results = pool.map(task, arg_list)
    print('All tasks are done')

    return results


returned_values = run_multiple_jobs()

print(returned_values)

描述性和自定义输出:

['index', 'value']
['01005', '01001', '01006', '01003', '01002']
[('01005', {'given_index': '01005', 'count_value': 1}), ('01001', {'given_index': '01001', 'count_value': 4}), ('01006', {'given_index': '01006', 'count_value': 2}), ('01003', {'given_index': '01003', 'count_value': 3}), ('01002', {'given_index': '01002', 'count_value': 1})]
Argument list is as follows
[(DataFrame[index: string, value: bigint], '01005'), (DataFrame[index: string, value: bigint], '01001'), (DataFrame[index: string, value: bigint], '01006'), (DataFrame[index: string, value: bigint], '01003'), (DataFrame[index: string, value: bigint], '01002'), (DataFrame[index: string, value: bigint], '01005'), (DataFrame[index: string, value: bigint], '01001'), (DataFrame[index: string, value: bigint], '01006'), (DataFrame[index: string, value: bigint], '01003'), (DataFrame[index: string, value: bigint], '01002'), (DataFrame[index: string, value: bigint], '01005'), (DataFrame[index: string, value: bigint], '01001'), (DataFrame[index: string, value: bigint], '01006'), (DataFrame[index: string, value: bigint], '01003'), (DataFrame[index: string, value: bigint], '01002'), (DataFrame[index: string, value: bigint], '01005'), (DataFrame[index: string, value: bigint], '01001'), (DataFrame[index: string, value: bigint], '01006'), (DataFrame[index: string, value: bigint], '01003'), (DataFrame[index: string, value: bigint], '01002'), (DataFrame[index: string, value: bigint], '01005'), (DataFrame[index: string, value: bigint], '01001'), (DataFrame[index: string, value: bigint], '01006'), (DataFrame[index: string, value: bigint], '01003'), (DataFrame[index: string, value: bigint], '01002'), (DataFrame[index: string, value: bigint], '01005'), (DataFrame[index: string, value: bigint], '01001'), (DataFrame[index: string, value: bigint], '01006'), (DataFrame[index: string, value: bigint], '01003'), (DataFrame[index: string, value: bigint], '01002'), (DataFrame[index: string, value: bigint], '01005'), (DataFrame[index: string, value: bigint], '01001'), (DataFrame[index: string, value: bigint], '01006'), (DataFrame[index: string, value: bigint], '01003'), (DataFrame[index: string, value: bigint], '01002'), (DataFrame[index: string, value: bigint], '01005'), (DataFrame[index: string, value: bigint], '01001'), (DataFrame[index: string, value: bigint], '01006'), (DataFrame[index: 
string, value: bigint], '01003'), (DataFrame[index: string, value: bigint], '01002')]
All tasks are done
[{'given_index': '01005', 'count_value': 1}, {'given_index': '01001', 'count_value': 4}, {'given_index': '01006', 'count_value': 2}, {'given_index': '01003', 'count_value': 3}, {'given_index': '01002', 'count_value': 1}, {'given_index': '01005', 'count_value': 1}, {'given_index': '01001', 'count_value': 4}, {'given_index': '01006', 'count_value': 2}, {'given_index': '01003', 'count_value': 3}, {'given_index': '01002', 'count_value': 1}, {'given_index': '01005', 'count_value': 1}, {'given_index': '01001', 'count_value': 4}, {'given_index': '01006', 'count_value': 2}, {'given_index': '01003', 'count_value': 3}, {'given_index': '01002', 'count_value': 1}, {'given_index': '01005', 'count_value': 1}, {'given_index': '01001', 'count_value': 4}, {'given_index': '01006', 'count_value': 2}, {'given_index': '01003', 'count_value': 3}, {'given_index': '01002', 'count_value': 1}, {'given_index': '01005', 'count_value': 1}, {'given_index': '01001', 'count_value': 4}, {'given_index': '01006', 'count_value': 2}, {'given_index': '01003', 'count_value': 3}, {'given_index': '01002', 'count_value': 1}, {'given_index': '01005', 'count_value': 1}, {'given_index': '01001', 'count_value': 4}, {'given_index': '01006', 'count_value': 2}, {'given_index': '01003', 'count_value': 3}, {'given_index': '01002', 'count_value': 1}, {'given_index': '01005', 'count_value': 1}, {'given_index': '01001', 'count_value': 4}, {'given_index': '01006', 'count_value': 2}, {'given_index': '01003', 'count_value': 3}, {'given_index': '01002', 'count_value': 1}, {'given_index': '01005', 'count_value': 1}, {'given_index': '01001', 'count_value': 4}, {'given_index': '01006', 'count_value': 2}, {'given_index': '01003', 'count_value': 3}, {'given_index': '01002', 'count_value': 1}]

相关问题