因此,在我们的Kafka主题中,有将近100gb的数据。我们正在运行spark结构化流来获取s3中的数据当数据达到10gb时,流媒体运行良好,我们能够在s3中获得数据。但对于100gb,Kafka的数据流需要很长时间。问:spark streaming如何读取Kafka的数据?它是否从当前偏移量获取全部数据?还是按一定的批量生产?
whhtz7ly1#
spark将与其他Kafka消费者一样,分批解决消费者群体的问题。因此,它从最后消耗的偏移量中获取尽可能多的数据(基于各种kafka消费者设置)。理论上,如果您有相同数量的分区,并且提交间隔与10gb相同,那么执行100gb只需要10x的时间。你还没有说现在需要多长时间,但对一些人来说,1分钟对10分钟可能看起来像“永远”,当然。我建议您使用 kafka-consumer-groups 命令行工具与burrow或remora之类的工具结合使用。。。如果您注意到延迟的上升趋势,那么spark消耗记录的速度就不够快。为了克服这个问题,第一种选择是确保spark执行器的数量均匀地消耗所有kafka分区。您还需要确保除了在消耗和写入记录之间进行简单的过滤器和Map之外,没有进行主要的数据转换,因为这也会引入滞后。对于非spark方法,我想指出的是,汇合的s3连接器也是batch-y,因为它只会周期性地刷新到s3,但是消耗本身仍然比spark更接近实时。不过,如果堆足够大并且flush配置设置为大值,我可以验证它是否能够写入非常大的s3文件(大小为几gb)。pinterest的secor是另一个不需要手动编码的选项
kafka-consumer-groups
1条答案
按热度按时间whhtz7ly1#
spark将与其他Kafka消费者一样,分批解决消费者群体的问题。因此,它从最后消耗的偏移量中获取尽可能多的数据(基于各种kafka消费者设置)。理论上,如果您有相同数量的分区,并且提交间隔与10gb相同,那么执行100gb只需要10x的时间。你还没有说现在需要多长时间,但对一些人来说,1分钟对10分钟可能看起来像“永远”,当然。
我建议您使用
kafka-consumer-groups
命令行工具与burrow或remora之类的工具结合使用。。。如果您注意到延迟的上升趋势,那么spark消耗记录的速度就不够快。为了克服这个问题,第一种选择是确保spark执行器的数量均匀地消耗所有kafka分区。您还需要确保除了在消耗和写入记录之间进行简单的过滤器和Map之外,没有进行主要的数据转换,因为这也会引入滞后。
对于非spark方法,我想指出的是,汇合的s3连接器也是batch-y,因为它只会周期性地刷新到s3,但是消耗本身仍然比spark更接近实时。不过,如果堆足够大并且flush配置设置为大值,我可以验证它是否能够写入非常大的s3文件(大小为几gb)。
pinterest的secor是另一个不需要手动编码的选项