我有一个应用程序(KafkaConnect),这是生成我的avro文件到S3。这个文件是压缩与avro代码“snappy”。我试图阅读他们与javascript(我不是一个非常强大的javascript开发人员,因为你将能够猜到)。
我试着用avro-js或avsc作为库来帮助我,因为我找到的大多数在线例子都引用了它们,我发现最完整和最有用的例子是here。
无论如何,我发现的大多数例子都是使用snappy version 6,它似乎与版本7(最新版本)有点不同。我注意到的一个主要问题是,它现在提供了两种解压缩方法。一种带有sync,另一种返回一个promise,但没有一种可以接收回调函数。
无论如何,我认为这不是一个问题,因为我可以让它工作,无论如何,但我最好的例子来阅读这个文件将是这样的东西(与avsc)。
const avsc = require('avsc');
const avsc = require('avsc');
const snappy = require('snappy');
const codecs = {
snappy: function (buf, cb) {
// Avro appends checksums to compressed blocks, which we skip here.
const buffer = snappy.uncompressSync(buf.slice(0, buf.length - 4));
return cb(buffer);
}
};
avsc.createFileDecoder('person-10.snappy.avro', {codecs})
.on('metadata', function (writerType) {
console.log(writerType.name);
})
.on('data', function (obj) {
console.log('on data ');
console.log('obj');
})
.on('end', function () {
console.log('end');
});
无论如何,元数据的处理工作没有问题(我可以访问完整的模式信息),但数据总是在Uncaught Error: snappy codec decompression error
中失败
我正在寻找的人,由于某种原因与avro和snappy在最新版本的工作,并设法使这项工作。
因为我真的很难理解这一点,我创建了一个官方avsc repo的分支,并试图介绍我的例子,看看它是如何工作的,但如果更有用,我可以尝试创建一个更简单的可复制的场景。
1条答案
按热度按时间nlejzf6q1#
我使用的软件包的文档已经更新,现在问题已经解决。
https://github.com/mtth/avsc/wiki/API#class-blockdecoderopts
主要是我只是在如何调用回调函数和如何将缓冲区处理为snappy上犯了错误。
这是正确的方法(如文档所示)