使用sc.parallelize inside map()或任何其他解决方案?

2w3kk1z5  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(473)

我有以下问题:我需要根据a列的每个id在b列中找到所有值的组合,并将结果作为dataframe返回
在下面的输入Dataframe示例中

A     B       
0       5    10       
1       1    20      
2       1    15       
3       3    50       
4       5    14       
5       1    30       
6       1    15       
7       3    33

我需要获得以下输出Dataframe(它用于graphx\graphframe)

src dist      A
0       10   14       5
1       50   33       3
2       20   15       1
3       30   15       1
4       20   30       1

到目前为止,我认为唯一的解决办法是:

df_result = df.drop_duplicates().\
               map(lambda (A,B):(A,[B])).\
               reduceByKey(lambda p, q: p + q).\
               map(lambda (A,B_values_array):(A,[k for k in itertools.combinations(B_values_array,2)]))

print df_result.take(3)

输出:[(1,[(20,15),(30,20),(30,15)],(5,[(10,14)],(3,[(50,33)]]
我被困在这里:(如何将它返回到我需要的Dataframe?一个想法是使用并行化:

import spark_sc

edges = df_result.map(lambda (A,B_pairs): spark_sc.sc.parallelize([(k[0],k[1],A) for k in B_pairs]))

为了 spark_sc 我还有一个文件名为spark\u sc.py

def init():
    global sc
    global sqlContext

    sc = SparkContext(conf=conf,
                  appName="blablabla",
                  pyFiles=['my_file_with_code.py'])

    sqlContext = SQLContext(sc)

但我的代码失败了:

AttributeError: 'module' object has no attribute 'sc'

如果我使用 spark_sc.sc() 不进入 map() 它起作用了。
你知道我在最后一步错过了什么吗?有没有可能使用 parallelize() ? 或者我需要完全不同的解决方案?谢谢!

e0bqpujr

e0bqpujr1#

您肯定需要另一个解决方案,它可以简单到:

from pyspark.sql.functions import greatest, least, col

df.alias("x").join(df.alias("y"), ["A"]).select(
    least("x.B", "y.B").alias("src"), greatest("x.B", "y.B").alias("dst"), "A"
).where(col("src") != col("dst")).distinct()

哪里:

df.alias("x").join(df.alias("y"), ["A"])

通过将表与自身连接起来 A ,

least("x.B", "y.B").alias("src")

以及

greatest("x.B", "y.B")

选择具有较低值的值 id 作为源,更高的id作为目标。最后:

where(col("src") != col("dst"))

删除自循环。
一般来说,不可能使用 SparkContext 从一个动作或一个转变(在你的情况下这样做没有任何意义)。

相关问题