scala—在群集模式下使用spark将文件写入本地系统

o7jaxewo  于 2021-05-29  发布在  Hadoop
关注(0)|答案(5)|浏览(653)

我知道这是使用spark的一种奇怪的方式,但是我正在尝试使用spark将Dataframe保存到本地文件系统(而不是hdfs),即使我正在使用spark cluster mode . 我知道我可以用 client mode 但我真的想跑进去 cluster mode 并且不关心应用程序将作为驱动程序在哪个节点(3个节点中的哪个节点)上运行。下面的代码是我试图做的伪代码。

// create dataframe
val df = Seq(Foo("John", "Doe"), Foo("Jane", "Doe")).toDF()
// save it to the local file system using 'file://' because it defaults to hdfs://
df.coalesce(1).rdd.saveAsTextFile(s"file://path/to/file")

这就是我提交spark申请的方式。 spark-submit --class sample.HBaseSparkRSample --master yarn-cluster hbase-spark-r-sample-assembly-1.0.jar 如果我在的话,这个很好用 local mode 但不在里面 yarn-cluster mode .
例如, java.io.IOException: Mkdirs failed to create file 与上述代码一起发生。
我改变了主意 df.coalesce(1) 分给 df.collect 并试图用普通scala保存一个文件,但结果是 Permission denied .
我也试过: spark-submitroot 用户 chown 预计起飞时间 yarn:yarn , yarn:hadoop , spark:sparkchmod 777 到相关目录
但运气不好。
我想这和 clusters , drivers and executors ,和 user 他正试图写入本地文件系统,但我自己却在解决这个问题。
我正在使用:
Spark:1.6.0-cdh5.8.2
斯卡拉:2.10.5
hadoop:2.6.0-cdh5.8.2
欢迎任何支持,并提前表示感谢。
我试过的一些文章:
“spark saveastextfile()导致为一半目录创建mkdirs失败”->尝试更改用户,但未更改任何内容
“无法将rdd作为文本文件保存到本地文件系统”-> chmod 没有帮助我

编辑(2016/11/25)

这是我得到的例外。

java.io.IOException: Mkdirs failed to create file:/home/foo/work/rhbase/r/input/input.csv/_temporary/0/_temporary/attempt_201611242024_0000_m_000000_0 (exists=false, cwd=file:/yarn/nm/usercache/foo/appcache/application_1478068613528_0143/container_e87_1478068613528_0143_01_000001)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:449)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:920)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:813)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:135)
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1193)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1185)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/11/24 20:24:12 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.IOException: Mkdirs failed to create file:/home/foo/work/rhbase/r/input/input.csv/_temporary/0/_temporary/attempt_201611242024_0000_m_000000_0 (exists=false, cwd=file:/yarn/nm/usercache/foo/appcache/application_1478068613528_0143/container_e87_1478068613528_0143_01_000001)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:449)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:920)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:813)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:135)
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1193)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1185)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
oaxa6hgo

oaxa6hgo1#

如果你以 yarn-cluster mode ,驱动程序将运行在由Yarn管理的任何机器中,因此如果 saveAsTextFile 具有本地文件路径,则它将输出存储在运行驱动程序的任何计算机中。
试着运行作业 yarn-client mode 所以驱动程序在客户机上运行

pxyaymoc

pxyaymoc2#

我要回答我自己的问题,因为最终,所有的答案似乎都不能解决我的问题。尽管如此,还是要谢谢你的回答,并给我指出我可以检查的替代方案。
我认为@ricardo最接近于提到spark应用程序的用户。我查过了 whoamiProcess("whoami") 用户是 yarn . 问题可能是我试图输出到 /home/foo/work/rhbase/r/input/input.csv 尽管 /home/foo/work/rhbase 属于 yarn:yarn , /home/foo 属于 foo:foo . 我没有详细检查,但这可能是原因 permission 问题。
当我击中 pwd 在我的spark应用程序中 Process("pwd") ,it输出 /yarn/path/to/somewhere . 所以我决定把我的文件输出到 /yarn/input.csv 尽管如此,它还是取得了成功 cluster mode .
我可能可以得出结论,这只是一个简单的许可问题。任何进一步的解决方案都是值得欢迎的,但现在,这就是我解决这个问题的方式。

mrphzbgm

mrphzbgm3#

检查您是否正在尝试使用spark服务以外的用户运行/写入文件。在这种情况下,可以通过预设目录acl来解决权限问题。例子:

setfacl -d -m group:spark:rwx /path/to/

(将“spark”修改为尝试写入文件的用户组)

ruarlubt

ruarlubt4#

请参阅spark文档以了解 --master 中的选项 spark-submit . --master local 应该在本地运行时使用。 --master yarn --deploy-mode cluster 应该是在实际运行在Yarn簇上时使用的。
参考这个和这个。

3j86kqsm

3j86kqsm5#

使用foreachpartition方法,然后为每个分区获取文件系统对象,并将一条记录逐一写入其中,下面是我在这里向hdfs编写的示例代码,您也可以使用本地文件系统

Dataset<String> ds=....

ds.toJavaRdd.foreachPartition(new VoidFunction<Iterator<String>>() {
    @Override
    public void call(Iterator<String> iterator) throws Exception {

    final FileSystem hdfsFileSystem = FileSystem.get(URI.create(finalOutPathLocation), hadoopConf);

    final FSDataOutputStream fsDataOutPutStream = hdfsFileSystem.exists(finalOutPath)
            ? hdfsFileSystem.append(finalOutPath) : hdfsFileSystem.create(finalOutPath);

    long processedRecCtr = 0;
    long failedRecsCtr = 0;

    while (iterator.hasNext()) {

        try {
            fsDataOutPutStream.writeUTF(iterator.next);
        } catch (Exception e) {
            failedRecsCtr++;
        }
        if (processedRecCtr % 3000 == 0) {
            LOGGER.info("Flushing Records");
            fsDataOutPutStream.flush();
        }
    }

    fsDataOutPutStream.close();
        }
});

相关问题