使用kafka时,我可以通过设置kafka producer的kafka.compression.codec属性来设置编解码器。
假设我在我的producer中使用snapy压缩,当使用某个kafka消费者从kafka消费消息时,我应该做些什么来解码snapy中的数据还是kafka消费者的一些内置功能?
在相关文档中,我找不到任何与kafka consumer中的编码相关的属性(它只与生产者相关)。
有人能把这个弄清楚吗?
使用kafka时,我可以通过设置kafka producer的kafka.compression.codec属性来设置编解码器。
假设我在我的producer中使用snapy压缩,当使用某个kafka消费者从kafka消费消息时,我应该做些什么来解码snapy中的数据还是kafka消费者的一些内置功能?
在相关文档中,我找不到任何与kafka consumer中的编码相关的属性(它只与生产者相关)。
有人能把这个弄清楚吗?
2条答案
按热度按时间1tu0hz3e1#
我对v0.8.1也有同样的问题,kafka中的这种压缩解压除了说消费者应该“透明地”解压它从来没有做过的压缩数据之外,没有很好的文档记录。
在kafka网站中使用consumeriterator的示例高级客户机仅适用于未压缩的数据。一旦我在producer client中启用了压缩,消息就永远不会进入下面的“while”循环。希望他们应该尽快解决这个问题,否则他们不应该声称这个功能,因为有些用户可能会使用kafka来传输需要批处理和压缩功能的大尺寸消息。
slwdgvem2#
据我所知,去压缩是由消费者自己负责的。在他们的官方维基页面上提到过
The consumer iterator transparently decompresses compressed data and only returns an uncompressed message
如本文所示,消费者的工作方式如下使用者有后台“fetcher”线程,这些线程连续地从代理中批量提取1mb的数据,并将其添加到内部阻塞队列中。使用者线程从这个阻塞队列中取出数据,解压并遍历消息
而且在doc页下也写了端到端的批量压缩
一批消息可以聚集在一起压缩并以这种形式发送到服务器。这批消息将以压缩形式写入,并将在日志中保持压缩状态,并且仅由使用者解压缩。
因此,解压部分似乎是在使用者it self中处理的,您只需使用
compression.codec
创建生产者时的producerconfig属性。我找不到任何例子或解释,说明了在消费端的任何解压方法。如果我错了,请纠正我。