llama_index [Bug]:多工作线程IngestionPipeline与缓存不兼容,

knpiaxh1  于 2个月前  发布在  其他
关注(0)|答案(7)|浏览(30)

Bug描述

当使用 num_workers 创建 IngestionPipeline 时,管道的工作分配如下:

with ProcessPoolExecutor(max_workers=num_workers) as p:
    node_batches = self._node_batcher(
        num_batches=num_workers, nodes=nodes_to_run
    )
    tasks = [
        loop.run_in_executor(
            p,
            partial(
                arun_transformations_wrapper,
                transformations=self.transformations,
                in_place=in_place,
                cache=self.cache if not self.disable_cache else None,
                cache_collection=cache_collection,
            ),
            batch,
        )
        for batch in node_batches
    ]
    result: List[List[BaseNode]] = await asyncio.gather(*tasks)
    nodes = reduce(lambda x, y: x + y, result, [])

这里的问题是,这个部分函数需要可序列化才能在另一个线程中使用。对于一些不可序列化的转换可能会有问题,但对于我简单的解析和嵌入情况来说效果很好。问题出现在我使用了 Redis IngestionCache。缓存本身不可序列化,因此这将抛出异常。
我认为问题特别在于 _redis_client 不可序列化。

IngestionCache(collection='llama_cache', cache=<llama_index.storage.kvstore.redis.base.RedisKVStore object at 0x304725d10>, nodes_key='nodes')

我认为一个潜在的解决方法是将初始化缓存所需的参数传递给每个线程,并让每个线程重新初始化其自己的缓存对象。这将需要重写 IngestionPipeline 以接受 cache_factory 而不是对象本身。

版本

0.10.52

重现步骤

只需创建一个使用 Redis IngestionCache 的 IngestionPipeline:

embed_model = AzureOpenAIEmbedding(
    model=os.environ["AZURE_EMBEDDING_MODEL"],
    deployment_name=os.environ["AZURE_EMBEDDING_DEPLOYMENT"],
    azure_endpoint=os.environ["AZURE_EMBEDDING_ENDPOINT"],
    api_key=os.environ["AZURE_EMBEDDING_API_KEY"],
    api_version=os.environ["AZURE_EMBEDDING_API_VERSION"],
)

ingestion_cache = IngestionCache(
    cache=RedisKVStore(
        redis_client=Redis(**redis_kwargs),
        async_redis_client=AsyncRedis(**redis_kwargs),
    )
)

pipeline = IngestionPipeline(
    vector_store=vector_store,
    docstore=document_store,
    transformations=[Settings.embed_model],
    cache=ingestion_cache,
)

await pipeline.arun(
    documents=documents, num_workers=multiprocessing.cpu_count()
)

相关日志/回溯

  • 无响应*
6ju8rftf

6ju8rftf1#

这就是为什么我犹豫是否要在摄取管道中添加多线程--大量的对象对pickle不友好😅 我不确定这里的修复会是什么样子。也许如果变换和缓存在部分之外被访问。像这样(感觉有些hacky,但可能有效)

def get_arun_transformations_wrapper(transformations, cache):
  def arun_transformations_wrapper(...):
    <use the transformations and cache>

  return arun_transformations_wrapper
    
...

wrapper_fn = get_arun_transformations_wrapper(self.transformations, self.cache)
tasks = [
        loop.run_in_executor(
            p,
            wrapper_fn,
            batch,
        )
        for batch in node_batches
    ]
neskvpey

neskvpey2#

这也是我为什么在摄取管道中添加多线程时犹豫的原因——大量的对象对pickle不友好😅 我不确定这里的修复方法会是什么样子。也许如果变换和缓存是在partial之外访问的。像这样(感觉有些hacky,但可能有效)

def get_arun_transformations_wrapper(transformations, cache):
  def arun_transformations_wrapper(...):
    <use the transformations and cache>

  return arun_transformations_wrapper
    
...

wrapper_fn = get_arun_transformations_wrapper(self.transformations, self.cache)
tasks = [
        loop.run_in_executor(
            p,
            wrapper_fn,
            batch,
        )
        for batch in node_batches
    ]

我认为这不会起作用,因为在函数内部定义的局部函数和对象无法被pickle。😕
我认为一个潜在的解决方法是将初始化缓存所需的参数传递给每个线程,并让每个线程重新初始化其自己的缓存对象。这将需要重写IngestionPipeline,使其接受一个cache_factory而不是示例本身。我会尝试这个方法并看看结果如何。我认为这将有助于解决缓存问题,但变换将是一个完全不同的问题。

ubbxdtey

ubbxdtey3#

我觉得这会起作用...让我试试上面的方法:)

3pvhb19x

3pvhb19x4#

我感觉这实际上会起作用...让我尝试上面的方法:

[2024-07-04T17:02:56.074Z] AttributeError: Can't pickle local object 'get_arun_transformations_wrapper.<locals>.arun_transformations_wrapper'
[2024-07-04T17:02:56.075Z] An unexpected error occurred: Can't pickle local object 'get_arun_transformations_wrapper.<locals>.arun_transformations_wrapper'
[2024-07-04T17:02:56.075Z] concurrent.futures.process._RemoteTraceback: 
[2024-07-04T17:02:56.075Z] """
[2024-07-04T17:02:56.075Z] Traceback (most recent call last):
[2024-07-04T17:02:56.075Z]   File "/Users/falven/.pyenv/versions/3.11.9/lib/python3.11/multiprocessing/queues.py", line 244, in _feed
[2024-07-04T17:02:56.075Z]     obj = _ForkingPickler.dumps(obj)
[2024-07-04T17:02:56.075Z]           ^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-07-04T17:02:56.075Z]   File "/Users/falven/.pyenv/versions/3.11.9/lib/python3.11/multiprocessing/reduction.py", line 51, in dumps
[2024-07-04T17:02:56.075Z]     cls(buf, protocol).dump(obj)
[2024-07-04T17:02:56.075Z] AttributeError: Can't pickle local object 'get_arun_transformations_wrapper.<locals>.arun_transformations_wrapper'
[2024-07-04T17:02:56.075Z] """
[2024-07-04T17:02:56.075Z] 
[2024-07-04T17:02:56.075Z] The above exception was the direct cause of the following exception:
[2024-07-04T17:02:56.075Z] 
[2024-07-04T17:02:56.075Z] Traceback (most recent call last):
[2024-07-04T17:02:56.075Z]   File "/Users/falven/Source/AJG/src/ingestion/ingest/__init__.py", line 116, in ingest
[2024-07-04T17:02:56.075Z]     await asyncio.gather(*ingestion_tasks)
[2024-07-04T17:02:56.075Z]   File "/Users/falven/Source/AJG/src/ingestion/.venv/lib/python3.11/site-packages/llama_index/core/instrumentation/dispatcher.py", line 255, in async_wrapper
[2024-07-04T17:02:56.075Z]     result = await func(*args, **kwargs)
[2024-07-04T17:02:56.075Z]              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-07-04T17:02:56.075Z]   File "/Users/falven/Source/AJG/src/ingestion/.venv/lib/python3.11/site-packages/llama_index/core/ingestion/pipeline.py", line 752, in arun
[2024-07-04T17:02:56.075Z]     result: List[List[BaseNode]] = await asyncio.gather(*tasks)
[2024-07-04T17:02:56.075Z]                                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-07-04T17:02:56.075Z]   File "/Users/falven/.pyenv/versions/3.11.9/lib/python3.11/multiprocessing/queues.py", line 244, in _feed
[2024-07-04T17:02:56.075Z]     obj = _ForkingPickler.dumps(obj)
[2024-07-04T17:02:56.075Z]           ^^^^^^^^^^^^^^^^^^^^^^^^^^
[2024-07-04T17:02:56.075Z]   File "/Users/falven/.pyenv/versions/3.11.9/lib/python3.11/multiprocessing/reduction.py", line 51, in dumps
[2024-07-04T17:02:56.075Z]     cls(buf, protocol).dump(obj)
[2024-07-04T17:02:56.075Z] AttributeError: Can't pickle local object 'get_arun_transformations_wrapper.<locals>.arun_transformations_wrapper'

无法pickle本地对象😕

8e2ybdfx

8e2ybdfx5#

lol just hit that too. Hmm

zynd9foi

zynd9foi6#

我刚刚也试了。嗯
但是即使是我提出的方案可能也不会奏效,因为缓存的种类繁多。我们实际上需要传递的不仅仅是你的 IngestionCache 的类型和kwargs,还需要传递你的 IngestionCacheKVStore 的类型和kwargs,一直到 Redis 客户端。这个方案可以工作,但太混乱了。

7kjnsjlb

7kjnsjlb7#

我同意。我相当确定有一种笨拙的方法可以做到这一点,绕过pickling...捣鼓一些东西

相关问题