llama_index [Bug]: LlamaIndex: Parallelizing the ingestion pipeline: httpx.ReadTimeout: error while running coroutine

ffvjumwh · posted 2 months ago in Other

Bug Description

I wrote a chatbot without a front-end interface (questions are asked by writing them into prompt.txt, and the program returns the answers line by line). After loading llama2-7b locally with Ollama, I used LlamaIndex (RAG) to augment the pretrained model. In the transformation stage, I parallelized the execution of the IngestionPipeline. Serial execution causes no problems, but parallel execution raises a series of timeout-related exceptions. Here is my code:

import torch
import chromadb
import time
import os
import cProfile
import pstats
from pstats import SortKey
from llama_index.llms.ollama import Ollama
from llama_index.llms.openai import OpenAI
from llama_index.core import Settings
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.node_parser import SimpleNodeParser
from llama_index.core import ServiceContext
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core import StorageContext
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.core.extractors import TitleExtractor
from llama_index.core.postprocessor import LongContextReorder
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.evaluation import (FaithfulnessEvaluator,
                                         QueryResponseEvaluator,
                                         DatasetGenerator,
                                         RelevancyEvaluator)
import asyncio
import nest_asyncio
nest_asyncio.apply()

# load for the uploaded files

def data_loading():
    reader = SimpleDirectoryReader(input_dir="data/", recursive=True)
    documents = reader.load_data(num_workers=4)
    print("finish reading files\n")
    return documents

# RAG

def building_reg_pipeline():
    # LLM, Llama2
    Settings.llm = Ollama(model="llama2",
                          request_timeout=300.0,
                          device_map="cpu")

# query from the LLM via query engine

def querying(prompt, query_engine):
    start_time = time.time()
    response = query_engine.query(prompt)
    query_time = time.time() - start_time
    print("Total query time : ", query_time, "\n")
    return response

# load for prompts (our questions)

def prompt_loading():
    with open("prompt/prompt.txt", "r") as prompt_file:
        lines = prompt_file.readlines()
        for line in lines:
            yield line.rstrip()

# load for auto-generated prompts (questions), not finished yet

def auto_generator_prompt(documents):
    data_generator = DatasetGenerator.from_documents(documents)
    eval_questions = data_generator.generate_questions_from_nodes()
    yield eval_questions

def main():
    # load for query_engine
    query_engine = building_reg_pipeline()
    # performance evaluation of data loading
    p = pstats.Stats("newstas")
    p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)

if __name__ == '__main__':
    main()
2ekbmq32 · #1

Ollama is timing out. You can extend the timeout by raising request_timeout, e.g. llm = Ollama(..., request_timeout=3600.0).
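For example, with the setup from the question (a minimal sketch; only the request_timeout value changes, everything else is taken from the original code):

    from llama_index.core import Settings
    from llama_index.llms.ollama import Ollama

    # Raise the client-side timeout from 300 s to one hour so that slow
    # local generation no longer trips httpx.ReadTimeout during ingestion.
    Settings.llm = Ollama(model="llama2", request_timeout=3600.0)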

nhaq1z21 · #2

To resolve the httpx.ReadTimeout error in the parallel ingestion pipeline, you can adjust the num_workers parameter to control the number of concurrent requests. Setting it too high can lead to timeouts or rate limiting. Here is the relevant code snippet:

def build_pipeline():
    llm = Ollama(model="llama2-7b", request_timeout=300.0)

    transformations = [
        # split documents into 512-token chunks with 20 tokens of overlap
        SentenceSplitter(chunk_size=512, chunk_overlap=20),
        # extract a title per node (this issues LLM calls)
        TitleExtractor(),
        # embed each chunk via the local Ollama server
        OllamaEmbedding(model_name="llama2-7b")
    ]

    return IngestionPipeline(transformations=transformations)

Additionally, adding a delay between runs can help prevent rate limits and timeouts:

import time

times = []
for _ in range(3):
    time.sleep(30)  # help prevent rate-limits/timeouts, keeps each run fair
    pipeline = build_pipeline()
    start = time.time()
    nodes = pipeline.run(documents=documents, num_workers=8)  # Adjust num_workers as needed
    end = time.time()
    times.append(end - start)

print(f"Average time: {sum(times) / len(times)}")

By carefully tuning num_workers and adding delays, you can mitigate httpx.ReadTimeout errors. Also make sure the number of workers does not exceed the number of CPUs available on your system, as the IngestionPipeline class implementation shows (a caller-side sketch follows the snippet):

if num_workers and num_workers > 1:
    if num_workers > multiprocessing.cpu_count():
        warnings.warn(
            "Specified num_workers exceed number of CPUs in the system. "
            "Setting `num_workers` down to the maximum CPU count."
        )
    with multiprocessing.get_context("spawn").Pool(num_workers) as p:
        node_batches = self._node_batcher(
            num_batches=num_workers, nodes=nodes_to_run
        )
        nodes_parallel = p.starmap(
            run_transformations,
            zip(
                node_batches,
                repeat(self.transformations),
                repeat(in_place),
                repeat(self.cache if not self.disable_cache else None),
                repeat(cache_collection),
            ),
        )
        nodes = reduce(lambda x, y: x + y, nodes_parallel, [])
else:
    nodes = run_transformations(
        nodes_to_run,
        self.transformations,
        show_progress=show_progress,
        cache=self.cache if not self.disable_cache else None,
        cache_collection=cache_collection,
        in_place=in_place,
        **kwargs,
    )
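A minimal caller-side sketch of that guideline, reusing build_pipeline from above (capping explicitly is an assumption for illustration; as the snippet shows, the library would otherwise warn and lower the value itself):

    import multiprocessing

    # Never ask for more workers than there are CPU cores; the pipeline
    # spawns one process per worker, so extra workers only add overhead.
    num_workers = min(8, multiprocessing.cpu_count())

    pipeline = build_pipeline()
    nodes = pipeline.run(documents=documents, num_workers=num_workers)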

By following these guidelines, you can effectively mitigate httpx.ReadTimeout errors in a parallel ingestion pipeline. Sources: 1 (https://github.com/run-llama/llama_index/blob/main/docs/docs/examples/ingestion/async_ingestion_pipeline.ipynb), 2 (https://github.com/run-llama/llama_index/blob/main/llama-index-core/llama_index/core/ingestion/pipeline.py).

  • To continue the conversation with Dosu, mention @dosu.
