我得到的ex未能附加文件文件是繁忙的hdfs非Map减少
我通过spark从Kafka那里拿到唱片,放在Cassandra和hdfs里 stream.map(somefunc).saveToCassandra
```
stream.map(somefunc).foreachRDD(rdd =>
fs.append.write(rdd.collect.mkstring.getBytes)
fs.close)
hdfs中的复制因子是1,我使用一个节点集群spark独立集群和两个worker
我不想 `rdd.toDF.save("append")` 因为它有很多文件。任何想法。或者可能是hdfs有办法检查,如果文件忙不忙的话另一个任务呢?
2条答案
按热度按时间mhd8tkvw1#
我不想要rdd.todf.save(“append”),因为它会生成很多文件
使用
rdd.repartition(1).toDF.save("append")
将输出文件数减少到1ars1skjm2#
这对我也不好,它使每个rdd的文件,但我想要一个大文件和小时或天
所以现在我在集群上使用try-catch-finally方案
但我认为我有例外,但它工作正常,我写10万msg到Kafka和文件上的hdfs也有,这样我控制这个,但我想,这样,如果ex,msgs不写,和fs.close