每当我试图将消费者对象作为参数传递给一个提交给dask的函数时,我得到了一个pickle错误。我使用confluent_kafka来创建消费者,但我相信使用kafka-python时也会发生同样的情况。有什么方法可以解决这个问题吗?谢谢。
dw1jzc5e1#
你可能会对streamz感兴趣,它集成了Kafka和dask。您可能会对RapidsAI的this blog感兴趣,它展示了在GPU的帮助下每秒可以处理多少Kafka事件。如果不使用streamz,则需要在每个worker上重新创建客户端,或者作为某个全局对象,或者在每个任务内(后者会产生开销)。
1条答案
按热度按时间dw1jzc5e1#
你可能会对streamz感兴趣,它集成了Kafka和dask。
您可能会对RapidsAI的this blog感兴趣,它展示了在GPU的帮助下每秒可以处理多少Kafka事件。
如果不使用streamz,则需要在每个worker上重新创建客户端,或者作为某个全局对象,或者在每个任务内(后者会产生开销)。