llama_index [Bug]: LlamaIndex: Parallelizing the ingestion pipeline: httpx.ReadTimeout: Error while running coroutine

ffvjumwh · posted 10 months ago · in Other
Follow (0) | Answers (2) | Views (102)

Bug Description

I wrote a chatbot with no frontend (questions are written into a prompt.txt file and the program returns the answers line by line). After loading llama2-7b locally with Ollama, I used LlamaIndex (RAG) to adapt the pretrained model. In the transformation stage I parallelized the IngestionPipeline execution. Serial execution causes no problems, but with parallel execution I get a series of timeout exceptions. Here is my code:

import torch
import chromadb
import time
import os
import cProfile
import pstats
from pstats import SortKey
from llama_index.llms.ollama import Ollama
from llama_index.llms.openai import OpenAI
from llama_index.core import Settings
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.node_parser import SimpleNodeParser
from llama_index.core import ServiceContext
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core import StorageContext
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.core.extractors import TitleExtractor
from llama_index.core.postprocessor import LongContextReorder
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.evaluation import (FaithfulnessEvaluator,
                                         QueryResponseEvaluator,
                                         DatasetGenerator,
                                         RelevancyEvaluator)
import asyncio
import nest_asyncio

nest_asyncio.apply()

# load the uploaded files
def data_loading():
    reader = SimpleDirectoryReader(input_dir="data/", recursive=True)
    documents = reader.load_data(num_workers=4)
    print("finish reading files\n")
    return documents

# RAG
def building_reg_pipeline():
    # LLM, Llama2
    Settings.llm = Ollama(model="llama2",
                          request_timeout=300.0,
                          device_map="cpu")
    # (rest of the pipeline/query-engine construction was truncated in the original post)

# query the LLM via the query engine
def querying(prompt, query_engine):
    start_time = time.time()
    response = query_engine.query(prompt)
    query_time = time.time() - start_time
    print("Total query time : ", query_time, "\n")
    return response

# load prompts (our questions)
def prompt_loading():
    with open("prompt/prompt.txt", "r") as prompt_file:
        lines = prompt_file.readlines()
        for line in lines:
            yield line.rstrip()

# auto-generate prompts (questions), not finished yet
def auto_generator_prompt(documents):
    data_generator = DatasetGenerator.from_documents(documents)
    eval_questions = data_generator.generate_questions_from_nodes()
    yield eval_questions

def main():
    # build the query engine
    query_engine = building_reg_pipeline()
    # performance evaluation of data loading (reads a previously saved cProfile dump named "newstas")
    p = pstats.Stats("newstas")
    p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)

if __name__ == '__main__':
    main()
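The body of building_reg_pipeline is cut off in the post above, so the script as shown never actually builds a query engine. Based purely on the imports already present (chromadb, ChromaVectorStore, StorageContext, VectorStoreIndex, OllamaEmbedding), a minimal sketch of how such a function is typically completed might look like the following; the Chroma path and collection name are made up for illustration:

def building_reg_pipeline():
    # LLM, Llama2 (as in the original snippet)
    Settings.llm = Ollama(model="llama2", request_timeout=300.0, device_map="cpu")
    # use local Ollama embeddings so no OpenAI key is required (assumption)
    Settings.embed_model = OllamaEmbedding(model_name="llama2")
    # persist vectors in Chroma; "./chroma_db" and "rag_collection" are placeholder names
    db = chromadb.PersistentClient(path="./chroma_db")
    collection = db.get_or_create_collection("rag_collection")
    vector_store = ChromaVectorStore(chroma_collection=collection)
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    # index the loaded documents and expose a query engine
    documents = data_loading()
    index = VectorStoreIndex.from_documents(documents, storage_context=storage_context)
    return index.as_query_engine()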

2ekbmq32 · 1#

Ollama timed out. You can extend the timeout by increasing it, e.g. llm = Ollama(..., request_timeout=3600.0).
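Applied to the script above, that is a one-line change; a minimal sketch:

from llama_index.core import Settings
from llama_index.llms.ollama import Ollama

# one hour instead of five minutes; CPU-only inference can easily exceed 300 s per request
Settings.llm = Ollama(model="llama2", request_timeout=3600.0, device_map="cpu")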


nhaq1z21 · 2#

To resolve the httpx.ReadTimeout error in the parallel ingestion pipeline, you can adjust the num_workers parameter to control the number of concurrent requests. Setting it too high can lead to timeouts or rate limiting. Here is the relevant code snippet:

def build_pipeline():
    llm = Ollama(model="llama2-7b", request_timeout=300.0)
    transformations = [
        SentenceSplitter(chunk_size=512, chunk_overlap=20),
        TitleExtractor(llm=llm),  # pass the LLM explicitly so the extractor uses the Ollama timeout
        OllamaEmbedding(model_name="llama2-7b"),
    ]
    return IngestionPipeline(transformations=transformations)

Additionally, adding a delay between runs can help prevent rate limiting and timeouts:

import time

times = []
for _ in range(3):
    time.sleep(30)  # help prevent rate-limits/timeouts, keeps each run fair
    pipeline = build_pipeline()
    start = time.time()
    nodes = pipeline.run(documents=documents, num_workers=8)  # adjust num_workers as needed
    end = time.time()
    times.append(end - start)
print(f"Average time: {sum(times) / len(times)}")

By tuning num_workers carefully and adding delays, you can mitigate httpx.ReadTimeout errors. Also make sure the number of workers does not exceed the number of CPUs available on your system, as shown in the IngestionPipeline class implementation:

if num_workers and num_workers > 1:
    if num_workers > multiprocessing.cpu_count():
        warnings.warn(
            "Specified num_workers exceed number of CPUs in the system. "
            "Setting `num_workers` down to the maximum CPU count."
        )
    with multiprocessing.get_context("spawn").Pool(num_workers) as p:
        node_batches = self._node_batcher(
            num_batches=num_workers, nodes=nodes_to_run
        )
        nodes_parallel = p.starmap(
            run_transformations,
            zip(
                node_batches,
                repeat(self.transformations),
                repeat(in_place),
                repeat(self.cache if not self.disable_cache else None),
                repeat(cache_collection),
            ),
        )
        nodes = reduce(lambda x, y: x + y, nodes_parallel, [])
else:
    nodes = run_transformations(
        nodes_to_run,
        self.transformations,
        show_progress=show_progress,
        cache=self.cache if not self.disable_cache else None,
        cache_collection=cache_collection,
        in_place=in_place,
        **kwargs,
    )
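The warning above only caps the value after the fact; one way to respect the limit up front is to clamp the requested worker count yourself before calling run. safe_num_workers below is a hypothetical helper, not part of the library:

import multiprocessing

def safe_num_workers(requested: int) -> int:
    # never request more workers than there are CPUs
    return max(1, min(requested, multiprocessing.cpu_count()))

nodes = pipeline.run(documents=documents, num_workers=safe_num_workers(8))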

By following these guidelines, you can effectively manage httpx.ReadTimeout errors in the parallel ingestion pipeline.

Sources:
1. https://github.com/run-llama/llama_index/blob/main/docs/docs/examples/ingestion/async_ingestion_pipeline.ipynb
2. https://github.com/run-llama/llama_index/blob/main/llama-index-core/llama_index/core/ingestion/pipeline.py
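The first link above demonstrates the async variant of the pipeline, which matches the "Error while running coroutine" in the title. Assuming your llama-index-core version exposes IngestionPipeline.arun, a minimal sketch reusing build_pipeline from earlier:

import asyncio
import nest_asyncio

nest_asyncio.apply()  # the original script already applies this

async def ingest(documents):
    pipeline = build_pipeline()
    # awaits the transformations concurrently instead of forking worker processes
    return await pipeline.arun(documents=documents)

nodes = asyncio.run(ingest(documents))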

To continue the conversation with Dosu, mention @dosu.
