我有以下问题:我需要根据a列的每个id在b列中找到所有值的组合,并将结果作为dataframe返回
在下面的输入Dataframe示例中
A B
0 5 10
1 1 20
2 1 15
3 3 50
4 5 14
5 1 30
6 1 15
7 3 33
我需要获得以下输出Dataframe(它用于graphx\graphframe)
src dist A
0 10 14 5
1 50 33 3
2 20 15 1
3 30 15 1
4 20 30 1
到目前为止,我认为唯一的解决办法是:
df_result = df.drop_duplicates().\
map(lambda (A,B):(A,[B])).\
reduceByKey(lambda p, q: p + q).\
map(lambda (A,B_values_array):(A,[k for k in itertools.combinations(B_values_array,2)]))
print df_result.take(3)
输出:[(1,[(20,15),(30,20),(30,15)],(5,[(10,14)],(3,[(50,33)]]
我被困在这里:(如何将它返回到我需要的Dataframe?一个想法是使用并行化:
import spark_sc
edges = df_result.map(lambda (A,B_pairs): spark_sc.sc.parallelize([(k[0],k[1],A) for k in B_pairs]))
为了 spark_sc
我还有一个文件名为spark\u sc.py
def init():
global sc
global sqlContext
sc = SparkContext(conf=conf,
appName="blablabla",
pyFiles=['my_file_with_code.py'])
sqlContext = SQLContext(sc)
但我的代码失败了:
AttributeError: 'module' object has no attribute 'sc'
如果我使用 spark_sc.sc()
不进入 map()
它起作用了。
你知道我在最后一步错过了什么吗?有没有可能使用 parallelize()
? 或者我需要完全不同的解决方案?谢谢!
1条答案
按热度按时间e0bqpujr1#
您肯定需要另一个解决方案,它可以简单到:
哪里:
通过将表与自身连接起来
A
,以及
选择具有较低值的值
id
作为源,更高的id作为目标。最后:删除自循环。
一般来说,不可能使用
SparkContext
从一个动作或一个转变(在你的情况下这样做没有任何意义)。