我可以从spark程序而不是rdd编写纯文本hdfs(或本地)文件吗?

wf82jlnq  于 2021-05-29  发布在  Hadoop
关注(0)|答案(4)|浏览(279)

我有一个spark程序(scala)和一个 SparkContext . 我正在写一些文件 RDDsaveAsTextFile . 在我的本地机器上,我可以使用本地文件路径,它可以与本地文件系统一起工作。在我的集群上,它与hdfs一起工作。
我还想写其他任意文件作为处理的结果。我将它们作为本地计算机上的常规文件写入,但希望它们进入集群上的hdfs。 SparkContext 似乎有一些与文件相关的方法,但它们似乎都是输入而不是输出。
我该怎么做?

gzjq41n4

gzjq41n41#

以下是对我最有效的方法(使用spark 2.0):

val path = new Path("hdfs://namenode:8020/some/folder/myfile.txt")
val conf = new Configuration(spark.sparkContext.hadoopConfiguration)
conf.setInt("dfs.blocksize", 16 * 1024 * 1024) // 16MB HDFS Block Size
val fs = path.getFileSystem(conf)
if (fs.exists(path))
    fs.delete(path, true)
val out = new BufferedOutputStream(fs.create(path)))
val txt = "Some text to output"
out.write(txt.getBytes("UTF-8"))
out.flush()
out.close()
fs.close()
cygmwpex

cygmwpex2#

多亏了marios和kostya,但是从spark将文本文件写入hdfs的步骤很少。

// Hadoop Config is accessible from SparkContext
val fs = FileSystem.get(sparkContext.hadoopConfiguration); 

// Output file can be created from file system.
val output = fs.create(new Path(filename));

// But BufferedOutputStream must be used to output an actual text file.
val os = BufferedOutputStream(output)

os.write("Hello World".getBytes("UTF-8"))

os.close()

请注意 FSDataOutputStream ,这是一个java序列化的对象输出流,而不是文本输出流。这个 writeUTF 方法似乎写明文,但它实际上是一种包含额外字节的二进制序列化格式。

bhmjp9jg

bhmjp9jg3#

使用hdfsapi(hadoophdfs.jar),您可以为hdfs路径创建inputstream/outputstream,并使用常规java.io类对文件进行读写。例如:

URI uri = URI.create (“hdfs://host:port/file path”);
Configuration conf = new Configuration();
FileSystem file = FileSystem.get(uri, conf);
FSDataInputStream in = file.open(new Path(uri));

此代码也可以用于本地文件(更改 hdfs://file:// ).

4ngedf3f

4ngedf3f4#

将文件写入hdfs的一种简单方法是使用sequencefiles。这里使用的是本机hadoop api,而不是spark提供的api。
下面是一个简单的片段(在scala中):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.io._ 

val conf = new Configuration() // Hadoop configuration 
val sfwriter = SequenceFile.createWriter(conf,
              SequenceFile.Writer.file(new Path("hdfs://nn1.example.com/file1")),
              SequenceFile.Writer.keyClass(LongWritable.class),
              SequenceFile.Writer.valueClass(Text.class))
val lw = new LongWritable()
val txt = new Text()
lw.set(12)
text.set("hello")
sfwriter.append(lw, txt)
sfwriter.close()
...

万一你没有钥匙可以用 NullWritable.class 取而代之的是:

SequenceFile.Writer.keyClass(NullWritable.class)
sfwriter.append(NullWritable.get(), new Text("12345"));

相关问题