在spark中并发附加到hdfs文件

5uzkadbs  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(378)

我得到的ex未能附加文件文件是繁忙的hdfs非Map减少
我通过spark从Kafka那里拿到唱片,放在Cassandra和hdfs里 stream.map(somefunc).saveToCassandra ```
stream.map(somefunc).foreachRDD(rdd =>
fs.append.write(rdd.collect.mkstring.getBytes)
fs.close)

hdfs中的复制因子是1,我使用一个节点集群spark独立集群和两个worker
我不想 `rdd.toDF.save("append")` 因为它有很多文件。任何想法。或者可能是hdfs有办法检查,如果文件忙不忙的话另一个任务呢?
mhd8tkvw

mhd8tkvw1#

我不想要rdd.todf.save(“append”),因为它会生成很多文件
使用 rdd.repartition(1).toDF.save("append") 将输出文件数减少到1

ars1skjm

ars1skjm2#

这对我也不好,它使每个rdd的文件,但我想要一个大文件和小时或天
所以现在我在集群上使用try-catch-finally方案

try {
fs.append.write(rdd.collect.mkstring.getBytes)
}
catch {
case ex: IOException => fs.wait(1000)
}
finally {
fs.close
}

但我认为我有例外,但它工作正常,我写10万msg到Kafka和文件上的hdfs也有,这样我控制这个,但我想,这样,如果ex,msgs不写,和fs.close

相关问题