spark用另一个值替换rdd字段值

nom7f22z  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(521)

我是个有Spark的新手。
我可以使用以下方法查看elasticsearch数据库中第一个rdd的内容:

print(es_rdd.first())
>>>(u'1', {u'name': u'john'})

我还可以使用以下方法获取数据流所需的值:

kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list":brokers})
name=kvs.map(lambda x: x[1])
name.pprint()
>>>>robert

我打算用“robert”替换rdd“name”:“john”,然后用saveasnewapihadoopfile()在elasticsearch中插入新的rdd
我该怎么做?有没有办法把“罗伯特”Map成一个新的rdd?像。。

new_rdd=es_rdd.map(lambda item: {item[0]:name})

谢谢

33qvvth1

33qvvth11#

我们可以根据索引列表用另一个rdd替换rdd的一部分。例如,将(rdd)中的元素从1,2,3,4替换为2,3,4,4。

a = sc.parallelize([1,2,3,4])
repVals = sc.parallelize([2,3,4])
idx = sc.parallelize([0,1,2]) . # idx has the same number of values with repVals

a = a.zipWithIndex()
ref = idx.zip(repVals).collectAsMap() # create a dictionary of format {idex:repValue}

anew = a.map(lambda x:ref[x[1]] if x[1] in ref else x[0])
anew.collect()

结果表明[2,3,4,4]

相关问题