pyspark 流水线RDD和普通RDD有什么区别?

dpiehjr4  于 2023-01-20  发布在  Spark
关注(0)|答案(1)|浏览(186)

现在,我尝试使用下面的代码从hdfs加载数据,并尝试使用函数“jsonParse”将值连接到普通csv,以便我认为我可以根据需要获得普通字符串RDD,但似乎此方法不起作用,当我尝试打印一些记录时,它告诉我“data”变量是不可迭代的PipelinedRDD,任何人都可以告诉我如何才能得到'数据'(正常的rdd)想要的,谢谢:

def jsonParse(x):
    s=json.loads(x)
    print "ssssssssssss"+s['age']+","+s['sex']+","+s['xueya']+","+s['danguchun']+","+s['na']+","+s['k']+","+s['yaowu']
    return s['age']+","+s['sex']+","+s['xueya']+","+s['danguchun']+","+s['na']+","+s['k']+","+s['yaowu']

conf = SparkConf()
sc = SparkContext(conf=conf)
hc = HiveContext(sc)
#json=sc.textFile('hdfs://hacluster/new')
json=hc.sql("select * from default.test_yj_200 limit 1000").toJSON()

data=json.map(jsonParse)
wtlkbnrh

wtlkbnrh1#

流水线RDD

流水线RDD操作被流水线化并发送给工作者;这段代码是从上到下执行的。2它是RDD的一个子类。

快速部署数据库

表示可并行处理的常量、已分区元素集合。

相关问题