我正在尝试使用 Node.js 将文件上传到 HDFS。我使用的是 webhdfs 模块,但最终出现在 HDFS 上的文件是空的(而本地的 stupidfile.txt 并不是空的)。
var WebHDFS = require('webhdfs');
var fs = require('fs');

// Local file to upload and the HDFS path it should be written to.
var localFilePath = "stupidfile.txt";
var remoteFilePath = "/user/cloudera/doesthiswork.txt";

var hdfs = WebHDFS.createClient();
var localFileStream = fs.createReadStream(localFilePath);
var remoteFileStream = hdfs.createWriteStream(remoteFilePath);

// A read stream without an 'error' listener throws when it fails (for example
// when the local file does not exist), so report the problem instead of
// letting the process crash with an unhandled 'error' event.
localFileStream.on('error', function onReadError (err) {
  console.log("could not read " + localFilePath);
  console.log(err);
});

remoteFileStream.on('error', function onError (err) {
  // The upload to HDFS failed
  console.log("it failed");
  console.log(err);
});

remoteFileStream.on('finish', function onFinish () {
  // Upload is done
  console.log("it is done!");
});

// All listeners are registered before the transfer is started.
console.log("opening stream to HDFS");
localFileStream.pipe(remoteFileStream);
控制台输出
[cloudera@quickstart Documents]$ node hdfs-upload.js
opening stream to HDFS
it is done!
更新:下面是我添加了 console.log() 的源代码,以及它输出到 stdout 的日志:
/**
* Create a writable stream for the given remote path.
*
* An initial request is sent to the 'create' (PUT) or 'append' (POST)
* endpoint. A source stream piped into the returned request is unpiped and
* paused right away; when the initial response is a redirect, a second
* request is sent to the redirect location and the source stream is piped
* into that one instead.
*
* @example
*
* var fs = require('fs');
* var WebHDFS = require('webhdfs');
* var hdfs = WebHDFS.createClient();
*
* var localFileStream = fs.createReadStream('/path/to/local/file');
* var remoteFileStream = hdfs.createWriteStream('/path/to/remote/file');
*
* localFileStream.pipe(remoteFileStream);
*
* remoteFileStream.on('error', function onError (err) {
* // Do something with the error
* });
*
* remoteFileStream.on('finish', function onFinish () {
* // Upload is done
* });
*
* @method createWriteStream
* @fires WebHDFS#finish
*
* @param {String} path Remote path; must be a non-empty string
* @param {Boolean} [append] If set to true then append data to the file
* @param {Object} [opts] Extra endpoint options, merged over the defaults
* `overwrite: true` and `permissions: '0777'`. May be passed as the second
* argument in place of `append`.
*
* @throws {Error} If path is empty or not a string
*
* @returns {Object} The initial request object; 'finish' and 'error' events
* are emitted on it
*/
WebHDFS.prototype.createWriteStream = function createWriteStream (path, append, opts) {
// Support the createWriteStream(path, opts) call form
if (typeof append === 'object') {
opts = append;
append = false;
}
// Validate path
if (!path || typeof path !== 'string') {
throw new Error('path must be a string');
}
// Build the operation endpoint; caller supplied opts extend the defaults
var endpoint = this._getOperationEndpoint(append ? 'append' : 'create', path, extend({
overwrite: true,
permissions: '0777'
}, opts));
var self = this;
// Source stream piped into the request; assigned in the 'pipe' handler below
var stream = null;
var params = {
method: append ? 'POST' : 'PUT',
url: endpoint,
json: true
};
var req = request(params, function (err, res, body) {
// Handle redirect only if there was not an error (e.g. res is defined)
if (res && self._isRedirect(res)) {
// Second request, sent to the redirect location; the outcome of the
// upload is reported through events on the initial request (req)
var upload = request(extend(params, { url: res.headers.location }), function (err, res, body) {
if (err) {
return req.emit('error', err);
} else if (self._isError(res)) {
return req.emit('error', self._parseError(body));
}
// Pass the location header along with 'finish' when the response has one
if (res.headers.hasOwnProperty('location')) {
return req.emit('finish', res.headers.location);
} else {
return req.emit('finish');
}
});
// Debug output: dump the state of the source stream before it is piped
console.log(stream);
// NOTE(review): stream is only set in the 'pipe' handler; if nothing was
// piped into req it is still null here and stream.pipe() would throw —
// confirm whether usage without pipe() needs to be supported.
// Send the source stream to the upload request and let it flow again
// (it was paused in the 'pipe' handler)
stream.pipe(upload);
stream.resume();
}
});
// Handle possible server error
// Data received on the initial request is run through _parseError; a truthy
// result is emitted as 'error' on both the source stream and the request
req.on('data', function onError (data) {
var error = self._parseError(data.toString());
if (error) {
stream.emit('error', error);
req.emit('error', error);
}
});
// Every 'error' on the request is followed by a 'finish' event
req.on('error', function onError (err) {
req.emit('finish'); // Request is finished
});
// When a source stream is piped into the request, detach it and end the
// initial request, then keep the source paused until it is piped into the
// upload request in the redirect branch above
req.on('pipe', function onPipe (src) {
// Unpipe initial request
src.unpipe(req);
req.end();
// Pause read stream
stream = src;
stream.pause();
});
return req;
};
console.log(stream) 的输出:
{ _readableState:
{ objectMode: false,
highWaterMark: 65536,
buffer: [],
length: 0,
pipes: null,
pipesCount: 0,
flowing: true,
ended: true,
endEmitted: true,
reading: false,
sync: false,
needReadable: false,
emittedReadable: false,
readableListening: false,
defaultEncoding: 'utf8',
ranOut: false,
awaitDrain: 0,
readingMore: false,
decoder: null,
encoding: null,
resumeScheduled: false },
readable: false,
domain: null,
_events: { end: [ [Function] ] },
_maxListeners: undefined,
path: 'stupidfile.txt',
fd: null,
flags: 'r',
mode: 438,
start: undefined,
end: undefined,
autoClose: true,
pos: undefined,
destroyed: true,
closed: true }
运行webhdfs测试时,其中3个失败:
WebHDFS
✓ should make a directory
✓ should create and write data to a file (39ms)
✓ should append content to an existing file (44ms)
✓ should create and stream data to a file
✓ should append stream content to an existing file
1) should open and read a file stream
2) should open and read a file
✓ should list directory status
✓ should change file permissions
3) should change file owner
✓ should rename file
✓ should check file existence
✓ should stat file
✓ should create symbolic link
✓ should delete file
✓ should delete directory recursively
13 passing (308ms)
3 failing
1) WebHDFS should open and read a file stream:
Uncaught AssertionError: "" must equal "random datamore random data"
+ expected - actual
+random datamore random data
at Request.<anonymous> (/home/cloudera/node_modules/webhdfs/test/webhdfs.js:77:49)
at Request.emit (events.js:104:17)
at Request.<anonymous> (/home/cloudera/node_modules/webhdfs/lib/webhdfs.js:627:9)
at Request.emit (events.js:107:17)
at Request.<anonymous> (/home/cloudera/node_modules/request/request.js:1057:14)
at Request.emit (events.js:129:20)
at IncomingMessage.<anonymous> (/home/cloudera/node_modules/request/request.js:998:12)
at IncomingMessage.emit (events.js:129:20)
at _stream_readable.js:903:16
at process._tickCallback (node.js:343:11)
2) WebHDFS should open and read a file:
Uncaught AssertionError: "" must equal "random datamore random data"
+ expected - actual
+random datamore random data
at /home/cloudera/node_modules/webhdfs/test/webhdfs.js:86:34
at Request.<anonymous> (/home/cloudera/node_modules/webhdfs/lib/webhdfs.js:467:26)
at Request.emit (events.js:104:17)
at Request.<anonymous> (/home/cloudera/node_modules/webhdfs/lib/webhdfs.js:627:9)
at Request.emit (events.js:107:17)
at Request.<anonymous> (/home/cloudera/node_modules/request/request.js:1057:14)
at Request.emit (events.js:129:20)
at IncomingMessage.<anonymous> (/home/cloudera/node_modules/request/request.js:998:12)
at IncomingMessage.emit (events.js:129:20)
at _stream_readable.js:903:16
at process._tickCallback (node.js:343:11)
3) WebHDFS should change file owner:
Uncaught AssertionError: {} must be null
at /home/cloudera/node_modules/webhdfs/test/webhdfs.js:114:26
at /home/cloudera/node_modules/webhdfs/lib/webhdfs.js:231:24
at Request.onComplete [as _callback] (/home/cloudera/node_modules/webhdfs/lib/webhdfs.js:172:26)
at Request.self.callback (/home/cloudera/node_modules/request/request.js:123:22)
at Request.emit (events.js:110:17)
at Request.<anonymous> (/home/cloudera/node_modules/request/request.js:1047:14)
at Request.emit (events.js:129:20)
at IncomingMessage.<anonymous> (/home/cloudera/node_modules/request/request.js:998:12)
at IncomingMessage.emit (events.js:129:20)
at _stream_readable.js:903:16
at process._tickCallback (node.js:343:11)
npm ERR! Test failed.  See above for more details.
1条答案
按热度按时间fdx2calv1#
我现在觉得自己很蠢,但还是把这个问题留在这里,以防将来有人碰到同样的情况。
问题出在 Node 的安装上。我之前是从 git 仓库安装的 Node;改为从官方网站安装最新的稳定版本后,问题就解决了。