从tar packing到webhdfs的nodejs管道错误

wsxa1bj1  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(421)

我目前正在构建一个节点应用程序,当我的服务没有运行时,它使用hadoop作为数据的长期存储。由于预期的传输量和最少的处理时间是首选,数据不会写入磁盘,而是直接通过管道传输到我打算用它做的事情。
我收到以下错误:

\nodejs_host\node_modules\webhdfs\lib\webhdfs.js:588
    src.unpipe(req);
        ^

TypeError: src.unpipe is not a function
    at Request.onPipe (\nodejs_host\node_modules\webhdfs\lib\webhdfs.js:588:9)
    at emitOne (events.js:101:20)
    at Request.emit (events.js:188:7)
    at Pack.Stream.pipe (stream.js:103:8)
    at Object.hadoop.putServer (\nodejs_host\hadoop.js:37:29)
    at Object.<anonymous> (\nodejs_host\hadoop.js:39:8)
    at Module._compile (module.js:541:32)
    at Object.Module._extensions..js (module.js:550:10)
    at Module.load (module.js:458:32)
    at tryModuleLoad (module.js:417:12)

我的代码基于以下文档:
https://github.com/npm/node-tar/blob/master/examples/packer.jshttps://github.com/harrisiirak/webhdfs/blob/master/readme.md(写入远程文件)
这是我写的代码:

var webhdfs = require('webhdfs');
var fs = require('fs');
var tar = require('tar');
var fstream = require('fstream');

var hdfs = webhdfs.createClient({
    path: '/webhdfs/v1',
    // private
});

var hadoop = {}

hadoop.putServer = function(userid, svcid, serverDirectory, callback){  
    var readStream = fstream.Reader({path: serverDirectory, type: 'Directory'})
    var writeStream = hdfs.createWriteStream('/services/' + userid + '/' + svcid + '.tar')
    var packer = tar.Pack({noProprietary: true})

    packer.on('error', function(){console.error(err), callback(err, false)})
    readStream.on('error', function(){console.error(err), callback(err, false)})
    writeStream.on('error', function(){console.error(err), callback(err, false)})
    writeStream.on('finish', function(){callback(null, true)})

    readStream.pipe(packer).pipe(writeStream);
}
hadoop.putServer('1', '1', 'C:/test', function(){console.log('hadoop.putServer test done')});

文件显示这应该是可行的,有人能告诉我我做错了什么吗?
在lib干过一番\webhdfs:588 here

req.on('pipe', function onPipe (src) {
// Pause read stream
stream = src;
stream.pause();

// This is not an elegant solution but here we go
// Basically we don't allow pipe() method to resume reading input
// and set internal _readableState.flowing to false
canResume = false;
stream.on('resume', function () {
  if (!canResume) {
    stream._readableState.flowing = false;
  }
});

// Unpipe initial request
src.unpipe(req); // <-- Line 588
req.end();
});
zed5wv10

zed5wv101#

好吧,我在github页面上查看了这些模块的问题,发现有人提到放弃tar包来使用tarfs。给了它一个机会,并立即工作:)
因此,如果有人对webhdfs和tar有相关的问题,请看tar-fshttps://github.com/mafintosh/tar-fs

相关问题