如何在pyspark中将两个rdd合并为一个

3lxsmp7m  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(1484)

我有两个 RDD 想要合并成一个 RDD 具体如下:

rdd_1 = ['a1', 'a2', 'a3', 'a4', 'a5', ]
rdd_2 = ['b1', 'b2', 'b3', 'b4', 'b5', ]

# concat and combine these two rdd into one

rdd = ['a1_b1', 'a2_b2', 'a3_b3', 'a4_b4', 'a5_b5']

我知道我可以改变这两个 RDD 进入 DataFrame 把它粘进去 spark.sql 如下所示:

df = df.withColumn('col1_col2', concat(col('col1'), lit(' '), col('col2')))

但对于十亿级的样品来说,这是不够有效的。
所以我想知道有没有更快的办法进去 RRD 编程。

brqmpdu1

brqmpdu11#

从列表中创建RDD,然后对两个RDD进行压缩,然后使用map和join迭代这个和concat。

rd1 = sc.parallelize(['a1', 'a2', 'a3', 'a4', 'a5', ])
rd2 = sc.parallelize(['b1', 'b2', 'b3', 'b4', 'b5', ])
rd1.zip(rd2).map(lambda x: x[0]+'_'+x[1]).collect()
rd1.zip(rd2).map(lambda x: '_'.join(x)).collect()
rd1.zip(rd2).map('_'.join).collect()

['a1_b1', 'a2_b2', 'a3_b3', 'a4_b4', 'a5_b5']
a9wyjsp7

a9wyjsp72#

我想我们应该齐心协力加入:

rdd_1.zip(rdd_2).map(lambda x : '_'.join(x)).collect()

或者没有 lambda :

rdd_1.zip(rdd_2).map('_'.join).collect()

例子:

rdd_1 = spark.sparkContext.parallelize(['a1', 'a2', 'a3', 'a4', 'a5', ])
rdd_2 = spark.sparkContext.parallelize(['b1', 'b2', 'b3', 'b4', 'b5', ])

rdd_1.zip(rdd_2).map(lambda x : '_'.join(x)).collect()
['a1_b1', 'a2_b2', 'a3_b3', 'a4_b4', 'a5_b5']

相关问题