使用 Node.js 和 webhdfs 模块将文件上传到 HDFS

ax6ht2ek  于 2021-06-04  发布在  Hadoop
关注(0)|答案(1)|浏览(652)

我正在尝试使用 Node.js 将文件上传到 HDFS。我使用的是 webhdfs 模块，但最终在 HDFS 上生成的文件是空的（而本地的 stupidfile.txt 并不是空的）。

// Upload a local file to HDFS by piping a local read stream into a
// webhdfs write stream.
var WebHDFS = require('webhdfs');
var hdfs = WebHDFS.createClient();
var fs = require('fs');

var localFilePath = "stupidfile.txt";
var remoteFilePath = "/user/cloudera/doesthiswork.txt";

var localFileStream = fs.createReadStream(localFilePath);
var remoteFileStream = hdfs.createWriteStream(remoteFilePath);

// A missing or unreadable local file emits 'error' on the read stream;
// without a listener Node rethrows it as an uncaught exception.
localFileStream.on('error', function onLocalError (err) {
  console.log("could not read local file");
  console.log(err);
});

// Register the remote handlers before starting the pipe so no event can be
// missed regardless of how quickly the stream settles.
remoteFileStream.on('error', function onError (err) {
  console.log("it failed");
  console.log(err);
});

// NOTE(review): the webhdfs createWriteStream source quoted below also emits
// 'finish' from its own 'error' handler, so "it is done!" alone does not
// prove the upload succeeded.
remoteFileStream.on('finish', function onFinish () {
  console.log("it is done!");
});

console.log("opening stream to HDFS");
localFileStream.pipe(remoteFileStream);

控制台输出

[cloudera@quickstart Documents]$ node hdfs-upload.js
opening stream to HDFS
it is done!

更新：下面是我在库中添加了 console.log() 的源代码，以及它输出到 stdout 的日志：

/**
 * Create writable stream for given path
 *
 * The returned object is the initial `request` for the create/append
 * operation. A readable source must be piped into it: on 'pipe' the source is
 * unpiped, paused and held until the server answers with a redirect
 * (`self._isRedirect(res)`), at which point the source is piped into a second
 * request sent to the redirect `location`.
 *
 * @example
 *
 * var WebHDFS = require('webhdfs');
 * var hdfs = WebHDFS.createClient();
 *
 * var localFileStream = fs.createReadStream('/path/to/local/file');
 * var remoteFileStream = hdfs.createWriteStream('/path/to/remote/file');
 *
 * localFileStream.pipe(remoteFileStream);
 *
 * remoteFileStream.on('error', function onError (err) {
 *   // Do something with the error
 * });
 *
 * remoteFileStream.on('finish', function onFinish () {
 *  // Upload is done
 * });
 *
 * @method createWriteStream
 * @fires WebHDFS#finish
 *
 * @param {String} path
 * @param {Boolean} [append] If set to true then append data to the file
 * @param {Object} [opts]
 *
 * @returns {Object} The initial request; emits 'error' and 'finish'
 *   ('finish' receives the upload response's `location` header when present).
 */
WebHDFS.prototype.createWriteStream = function createWriteStream (path, append, opts) {
  // Support the createWriteStream(path, opts) call form
  if (typeof append === 'object') {
    opts = append;
    append = false;
  }

  // Validate path
  if (!path || typeof path !== 'string') {
    throw new Error('path must be a string');
  }

  var endpoint = this._getOperationEndpoint(append ? 'append' : 'create', path, extend({
    overwrite: true,
    permissions: '0777'
  }, opts));

  var self = this;
  // Source stream captured by the 'pipe' handler; null until something is piped in
  var stream = null;
  var params = {
    method: append ? 'POST' : 'PUT',
    url: endpoint,
    json: true
  };

  var req = request(params, function (err, res, body) {
    // Handle redirect only if there was not an error (e.g. res is defined)
    if (res && self._isRedirect(res)) {
      // Without a piped source there is nothing to upload. Report that instead
      // of sending a bodiless upload request and then dereferencing null.
      if (!stream) {
        return req.emit('error', new Error('createWriteStream requires a readable stream to be piped into it'));
      }

      var upload = request(extend(params, { url: res.headers.location }), function (err, res, body) {
        if (err) {
          return req.emit('error', err);
        } else if (self._isError(res)) {
          return req.emit('error', self._parseError(body));
        }

        if (res.headers.hasOwnProperty('location')) {
          return req.emit('finish', res.headers.location);
        } else {
          return req.emit('finish');
        }
      });

      // Diagnostic output added while investigating the empty-file problem
      console.log(stream);

      // NOTE(review): if the source has already ended by this point, piping it
      // sends no data and the remote file is created empty — confirm that
      // pause() below actually holds data back on the Node version in use.
      stream.pipe(upload);
      stream.resume();
    }
  });

  // Handle possible server error
  req.on('data', function onError (data) {
    var error = self._parseError(data.toString());
    if (error) {
      // The source may not have been piped in yet
      if (stream) {
        stream.emit('error', error);
      }
      req.emit('error', error);
    }
  });

  req.on('error', function onError (err) {
    req.emit('finish'); // Request is finished
  });

  req.on('pipe', function onPipe (src) {
    // Unpipe initial request and send it without a body; the data goes to the
    // redirect target instead
    src.unpipe(req);
    req.end();

    // Pause read stream until the redirect location is known
    stream = src;
    stream.pause();
  });

  return req;
};

输出：

{ _readableState: 
   { objectMode: false,
     highWaterMark: 65536,
     buffer: [],
     length: 0,
     pipes: null,
     pipesCount: 0,
     flowing: true,
     ended: true,
     endEmitted: true,
     reading: false,
     sync: false,
     needReadable: false,
     emittedReadable: false,
     readableListening: false,
     defaultEncoding: 'utf8',
     ranOut: false,
     awaitDrain: 0,
     readingMore: false,
     decoder: null,
     encoding: null,
     resumeScheduled: false },
  readable: false,
  domain: null,
  _events: { end: [ [Function] ] },
  _maxListeners: undefined,
  path: 'stupidfile.txt',
  fd: null,
  flags: 'r',
  mode: 438,
  start: undefined,
  end: undefined,
  autoClose: true,
  pos: undefined,
  destroyed: true,
  closed: true }

运行 webhdfs 自带的测试时，有 3 个失败：

WebHDFS
    ✓ should make a directory
    ✓ should create and write data to a file (39ms)
    ✓ should append content to an existing file (44ms)
    ✓ should create and stream data to a file
    ✓ should append stream content to an existing file
    1) should open and read a file stream
    2) should open and read a file
    ✓ should list directory status
    ✓ should change file permissions
    3) should change file owner
    ✓ should rename file
    ✓ should check file existence
    ✓ should stat file
    ✓ should create symbolic link
    ✓ should delete file
    ✓ should delete directory recursively

  13 passing (308ms)
  3 failing

1) webhdfs应打开并读取文件流:

Uncaught AssertionError: "" must equal "random datamore random data"
  + expected - actual

  +random datamore random data

  at Request.<anonymous> (/home/cloudera/node_modules/webhdfs/test/webhdfs.js:77:49)
  at Request.emit (events.js:104:17)
  at Request.<anonymous> (/home/cloudera/node_modules/webhdfs/lib/webhdfs.js:627:9)
  at Request.emit (events.js:107:17)
  at Request.<anonymous> (/home/cloudera/node_modules/request/request.js:1057:14)
  at Request.emit (events.js:129:20)
  at IncomingMessage.<anonymous> (/home/cloudera/node_modules/request/request.js:998:12)
  at IncomingMessage.emit (events.js:129:20)
  at _stream_readable.js:903:16
  at process._tickCallback (node.js:343:11)

2) webhdfs应打开并读取文件:

Uncaught AssertionError: "" must equal "random datamore random data"
  + expected - actual

  +random datamore random data

  at /home/cloudera/node_modules/webhdfs/test/webhdfs.js:86:34
  at Request.<anonymous> (/home/cloudera/node_modules/webhdfs/lib/webhdfs.js:467:26)
  at Request.emit (events.js:104:17)
  at Request.<anonymous> (/home/cloudera/node_modules/webhdfs/lib/webhdfs.js:627:9)
  at Request.emit (events.js:107:17)
  at Request.<anonymous> (/home/cloudera/node_modules/request/request.js:1057:14)
  at Request.emit (events.js:129:20)
  at IncomingMessage.<anonymous> (/home/cloudera/node_modules/request/request.js:998:12)
  at IncomingMessage.emit (events.js:129:20)
  at _stream_readable.js:903:16
  at process._tickCallback (node.js:343:11)

3) webhdfs应更改文件所有者:

Uncaught AssertionError: {} must be null
      at /home/cloudera/node_modules/webhdfs/test/webhdfs.js:114:26
      at /home/cloudera/node_modules/webhdfs/lib/webhdfs.js:231:24
      at Request.onComplete [as _callback] (/home/cloudera/node_modules/webhdfs/lib/webhdfs.js:172:26)
      at Request.self.callback (/home/cloudera/node_modules/request/request.js:123:22)
      at Request.emit (events.js:110:17)
      at Request.<anonymous> (/home/cloudera/node_modules/request/request.js:1047:14)
      at Request.emit (events.js:129:20)
      at IncomingMessage.<anonymous> (/home/cloudera/node_modules/request/request.js:998:12)
      at IncomingMessage.emit (events.js:129:20)
      at _stream_readable.js:903:16
      at process._tickCallback (node.js:343:11)

npm ERR! Test failed.  See above f
fdx2calv

fdx2calv1#

我现在觉得自己很蠢，但万一哪天有人遇到同样的问题，我还是把答案留在这里。
问题出在 Node.js 的安装上。我之前是通过 git 仓库安装的 Node.js；改为从官方网站安装最新的稳定版本后，问题就解决了。

相关问题