hadoop hdfs:读取正在写入的序列文件

ui7jx7zq  于 2021-06-04  发布在  Hadoop
关注(0)|答案(4)|浏览(451)

我正在使用hadoop1.0.3。
我将一个hadoop序列文件中的日志写入hdfs,在每一组日志之后调用syncfs(),但我从不关闭该文件(除非我执行每日滚动)。
我要保证的是,当文件仍在编写时,读者可以使用该文件。
我可以通过fsdatainputstream读取序列文件的字节,但是如果我尝试使用sequencefile.reader.next(key,val),它会在第一次调用时返回false。
我知道数据在文件中,因为我可以用fsdatainputstream或cat命令读取它,并且我100%确定调用了syncfs()。
我检查了namenode和datanode日志,没有错误或警告。
为什么sequencefile.reader无法读取我当前正在写入的文件?

uwopmtnx

uwopmtnx1#

我遇到了一个类似的问题,下面是我如何解决的:http://mail-archives.apache.org/mod_mbox/hadoop-common-user/201303.mbox/%3ccaltsbby+lx6fikutgsybs5olxxzbvun0wvw_a5jbexy98hjfig@mail.gmail.com%3e

hs1ihplo

hs1ihplo2#

所以我碰到了同样的问题,经过一段时间的调查,我想出了下面的解决办法。
因此,问题是由于序列文件创建的内部实现,以及它使用的是每64mbs块更新的文件长度。
因此,我创建了以下类来创建reader,并用自己的类 Package 了hadoop fs,同时重写get length方法以返回文件长度:

public class SequenceFileUtil {

    public SequenceFile.Reader createReader(Configuration conf, Path path) throws IOException {

        WrappedFileSystem fileSystem = new WrappedFileSystem(FileSystem.get(conf));

        return new SequenceFile.Reader(fileSystem, path, conf);
    }

    private class WrappedFileSystem extends FileSystem
    {
        private final FileSystem nestedFs;

        public WrappedFileSystem(FileSystem fs){
            this.nestedFs = fs;
        }

        @Override
        public URI getUri() {
            return nestedFs.getUri();
        }

        @Override
        public FSDataInputStream open(Path f, int bufferSize) throws IOException {
            return nestedFs.open(f,bufferSize);
        }

        @Override
        public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
            return nestedFs.create(f, permission,overwrite,bufferSize, replication, blockSize, progress);
        }

        @Override
        public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
            return nestedFs.append(f, bufferSize, progress);
        }

        @Override
        public boolean rename(Path src, Path dst) throws IOException {
            return nestedFs.rename(src, dst);
        }

        @Override
        public boolean delete(Path path) throws IOException {
            return nestedFs.delete(path);
        }

        @Override
        public boolean delete(Path f, boolean recursive) throws IOException {
            return nestedFs.delete(f, recursive);
        }

        @Override
        public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
            return nestedFs.listStatus(f);
        }

        @Override
        public void setWorkingDirectory(Path new_dir) {
            nestedFs.setWorkingDirectory(new_dir);
        }

        @Override
        public Path getWorkingDirectory() {
            return nestedFs.getWorkingDirectory();
        }

        @Override
        public boolean mkdirs(Path f, FsPermission permission) throws IOException {
            return nestedFs.mkdirs(f, permission);
        }

        @Override
        public FileStatus getFileStatus(Path f) throws IOException {
            return nestedFs.getFileStatus(f);
        }

        @Override
        public long getLength(Path f) throws IOException {

            DFSClient.DFSInputStream open =  new DFSClient(nestedFs.getConf()).open(f.toUri().getPath());
            long fileLength = open.getFileLength();
            long length = nestedFs.getLength(f);

            if (length < fileLength){
                //We might have uncompleted blocks
                return fileLength;
            }

            return length;
        }

    }
}
zysjyyx4

zysjyyx43#

您不能确保读操作完全写入数据节点端的磁盘。您可以在的文档中看到这一点 DFSClient#DFSOutputStream.sync() 其中规定:

All data is written out to datanodes. It is not guaranteed that data has
  been flushed to persistent store on the datanode. Block allocations are
  persisted on namenode.

因此它基本上用当前信息更新namenode的块Map,并将数据发送到datanode。因为您不能将数据刷新到datanode上的磁盘,但是您直接从datanode读取数据,所以您到达了一个时间段,在该时间段中,数据在某个地方被缓冲并且不可访问。因此,您的sequencefile读取器将认为数据流已完成(或为空),并且无法读取返回false到反序列化进程的其他字节。
如果块被完全接收,datanode会将数据写入磁盘(它是预先写入的,但不能从外部读取)。因此,一旦达到块大小或文件已提前关闭,就可以读取文件,从而最终确定块。这在分布式环境中是完全有意义的,因为你的作者可能会死而不能正确地完成一个块——这是一个一致性的问题。
因此,解决方法是使块的大小非常小,这样块就可以更频繁地完成。但这不是那么有效,我希望应该很清楚,你的要求不适合hdfs。

gab6jxml

gab6jxml4#

sequencefile.reader无法读取正在写入的文件的原因是它使用文件长度来执行其魔术。
在写入第一个块时,文件长度保持为0,并且仅在块已满时更新(默认为64mb)。然后文件大小被固定在64mb,直到第二个块被完全写入,依此类推。。。
这意味着您不能使用sequencefile.reader读取序列文件中最后一个不完整的块,即使原始数据可以直接使用fsinputstream读取。
关闭文件也会修复文件长度,但在我的情况下,我需要在关闭文件之前读取它们。

相关问题