我是个有Spark的新手。
我可以使用以下方法查看elasticsearch数据库中第一个rdd的内容:
print(es_rdd.first())
>>>(u'1', {u'name': u'john'})
我还可以使用以下方法获取数据流所需的值:
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list":brokers})
name=kvs.map(lambda x: x[1])
name.pprint()
>>>>robert
我打算用“robert”替换rdd“name”:“john”,然后用saveasnewapihadoopfile()在elasticsearch中插入新的rdd
我该怎么做?有没有办法把“罗伯特”Map成一个新的rdd?像。。
new_rdd=es_rdd.map(lambda item: {item[0]:name})
谢谢
1条答案
按热度按时间33qvvth11#
我们可以根据索引列表用另一个rdd替换rdd的一部分。例如,将(rdd)中的元素从1,2,3,4替换为2,3,4,4。
结果表明[2,3,4,4]