Bug描述
当使用 num_workers
创建 IngestionPipeline 时,管道的工作分配如下:
with ProcessPoolExecutor(max_workers=num_workers) as p:
node_batches = self._node_batcher(
num_batches=num_workers, nodes=nodes_to_run
)
tasks = [
loop.run_in_executor(
p,
partial(
arun_transformations_wrapper,
transformations=self.transformations,
in_place=in_place,
cache=self.cache if not self.disable_cache else None,
cache_collection=cache_collection,
),
batch,
)
for batch in node_batches
]
result: List[List[BaseNode]] = await asyncio.gather(*tasks)
nodes = reduce(lambda x, y: x + y, result, [])
这里的问题是,这个部分函数需要可序列化才能在另一个线程中使用。对于一些不可序列化的转换可能会有问题,但对于我简单的解析和嵌入情况来说效果很好。问题出现在我使用了 Redis IngestionCache。缓存本身不可序列化,因此这将抛出异常。
我认为问题特别在于 _redis_client
不可序列化。
IngestionCache(collection='llama_cache', cache=<llama_index.storage.kvstore.redis.base.RedisKVStore object at 0x304725d10>, nodes_key='nodes')
我认为一个潜在的解决方法是将初始化缓存所需的参数传递给每个线程,并让每个线程重新初始化其自己的缓存对象。这将需要重写 IngestionPipeline 以接受 cache_factory 而不是对象本身。
版本
0.10.52
重现步骤
只需创建一个使用 Redis IngestionCache 的 IngestionPipeline:
embed_model = AzureOpenAIEmbedding(
model=os.environ["AZURE_EMBEDDING_MODEL"],
deployment_name=os.environ["AZURE_EMBEDDING_DEPLOYMENT"],
azure_endpoint=os.environ["AZURE_EMBEDDING_ENDPOINT"],
api_key=os.environ["AZURE_EMBEDDING_API_KEY"],
api_version=os.environ["AZURE_EMBEDDING_API_VERSION"],
)
ingestion_cache = IngestionCache(
cache=RedisKVStore(
redis_client=Redis(**redis_kwargs),
async_redis_client=AsyncRedis(**redis_kwargs),
)
)
pipeline = IngestionPipeline(
vector_store=vector_store,
docstore=document_store,
transformations=[Settings.embed_model],
cache=ingestion_cache,
)
await pipeline.arun(
documents=documents, num_workers=multiprocessing.cpu_count()
)
相关日志/回溯
- 无响应*
7条答案
按热度按时间6ju8rftf1#
这就是为什么我犹豫是否要在摄取管道中添加多线程--大量的对象对pickle不友好😅 我不确定这里的修复会是什么样子。也许如果变换和缓存在部分之外被访问。像这样(感觉有些hacky,但可能有效)
neskvpey2#
这也是我为什么在摄取管道中添加多线程时犹豫的原因——大量的对象对pickle不友好😅 我不确定这里的修复方法会是什么样子。也许如果变换和缓存是在partial之外访问的。像这样(感觉有些hacky,但可能有效)
我认为这不会起作用,因为在函数内部定义的局部函数和对象无法被pickle。😕
我认为一个潜在的解决方法是将初始化缓存所需的参数传递给每个线程,并让每个线程重新初始化其自己的缓存对象。这将需要重写IngestionPipeline,使其接受一个cache_factory而不是示例本身。我会尝试这个方法并看看结果如何。我认为这将有助于解决缓存问题,但变换将是一个完全不同的问题。
ubbxdtey3#
我觉得这会起作用...让我试试上面的方法:)
3pvhb19x4#
我感觉这实际上会起作用...让我尝试上面的方法:
无法pickle本地对象😕
8e2ybdfx5#
lol just hit that too. Hmm
zynd9foi6#
我刚刚也试了。嗯
但是即使是我提出的方案可能也不会奏效,因为缓存的种类繁多。我们实际上需要传递的不仅仅是你的
IngestionCache
的类型和kwargs,还需要传递你的IngestionCache
的KVStore
的类型和kwargs,一直到Redis
客户端。这个方案可以工作,但太混乱了。7kjnsjlb7#
我同意。我相当确定有一种笨拙的方法可以做到这一点,绕过pickling...捣鼓一些东西