llama_index [Bug]: unexpected keyword argument when running transformations

wydwbb8l · posted 2 months ago in Other

Bug description

In the run() method (around line 542 of the llama_index/core/ingestion/pipeline.py file), the show_progress argument is passed to run_transformations(), but that function does not accept it.
The call to run_transformations() inside the run() method:

```python
nodes = run_transformations(
    nodes_to_run,
    self.transformations,
    show_progress=show_progress,
    cache=self.cache if not self.disable_cache else None,
    cache_collection=cache_collection,
    in_place=in_place,
    **kwargs,
)
```

The run_transformations() signature:

```python
def run_transformations(
    nodes: List[BaseNode],
    transformations: Sequence[TransformComponent],
    in_place: bool = True,
    cache: Optional[IngestionCache] = None,
    cache_collection: Optional[str] = None,
    **kwargs: Any,
) -> List[BaseNode]:
```
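As a side note on how that keyword travels: anything not named in run_transformations' own signature lands in **kwargs and is forwarded verbatim to each transform's __call__. A minimal sketch of that forwarding, using stand-in transform classes rather than the real llama_index API:

```python
# Simplified stand-in for run_transformations' kwargs forwarding: every
# keyword it doesn't consume itself is passed on to each transform.
def run_transformations(nodes, transformations, **kwargs):
    for transform in transformations:
        nodes = transform(nodes, **kwargs)  # show_progress ends up here
    return nodes

class GoodTransform:
    def __call__(self, nodes, **kwargs):  # absorbs show_progress etc.
        return nodes

class BadTransform:
    def __call__(self, nodes):  # no **kwargs -> rejects show_progress
        return nodes

run_transformations([1], [GoodTransform()], show_progress=True)  # fine
try:
    run_transformations([1], [BadTransform()], show_progress=True)
except TypeError as e:
    print(e)  # mirrors the reported TypeError
```

So whether the call succeeds depends on every transform in the list accepting arbitrary keywords, not just on run_transformations itself.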

Version

0.10.55

Steps to reproduce

Run a pipeline ingestion.
My code:

```python
pipeline = IngestionPipeline(
    transformations=[
        sentence_split,
        text_cleaner,
        self.embed_model
    ],
    vector_store=self.vector_store,
    docstore=self.docstore
)
pipeline.run(documents=documents)
```

Relevant log/traceback

```text
Traceback (most recent call last):
  File "C:\data\git\.......\test.py", line 110, in <module>
    data_ingestion.main()
  File "C:\data\git\.......\test.py", line 105, in main
    self.create_index_local()
  File "C:\data\git\.......\test.py", line 83, in create_index_local
    nodes = self.run_pipeline(documents)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\data\git\.......\test.py", line 101, in run_pipeline
    nodes = pipeline.run(documents=documents)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 230, in wrapper
    result = func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\ingestion\pipeline.py", line 122, in run_transformations
    nodes = transform(nodes, **kwargs)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 221, in wrapper
    bound_args = inspect.signature(func).bind(*args, **kwargs)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\inspect.py", line 3242, in bind
    return self._bind(args, kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\inspect.py", line 3231, in _bind
    raise TypeError(
TypeError: got an unexpected keyword argument 'show_progress'
```
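The last frames show where the rejection actually happens: the instrumentation dispatcher validates the call with inspect.signature().bind() before invoking the transform, and bind() raises a TypeError for any keyword the callable's signature cannot accept. A standalone illustration of that check (the transform here is a hypothetical stand-in, not a real llama_index component):

```python
import inspect

def transform(nodes):  # hypothetical transform whose signature lacks **kwargs
    return nodes

# the same validation the dispatcher performs before calling the transform
try:
    inspect.signature(transform).bind([1, 2], show_progress=True)
except TypeError as e:
    print(e)  # got an unexpected keyword argument 'show_progress'
```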

uxhixvfz1#

To resolve the TypeError caused by passing show_progress to run_transformations(), remove the show_progress argument from the run_transformations() call inside the IngestionPipeline class's run method.
Here is the modified relevant section:

```python
def run(
    self,
    show_progress: bool = False,
    documents: Optional[List[Document]] = None,
    nodes: Optional[List[BaseNode]] = None,
    cache_collection: Optional[str] = None,
    in_place: bool = True,
    store_doc_text: bool = True,
    num_workers: Optional[int] = None,
    **kwargs: Any,
) -> Sequence[BaseNode]:
    """
    Args:
        show_progress (bool, optional): Shows execution progress bar(s). Defaults to False.
        documents (Optional[List[Document]], optional): Set of documents to be transformed. Defaults to None.
        nodes (Optional[List[BaseNode]], optional): Set of nodes to be transformed. Defaults to None.
        cache_collection (Optional[str], optional): Cache for transformations. Defaults to None.
        in_place (bool, optional): Whether transformations creates a new list for transformed nodes or modifies the
            array passed to run_transformations. Defaults to True.
        num_workers (Optional[int], optional): The number of parallel processes to use.
            If set to None, then sequential compute is used. Defaults to None.

    Returns:
        Sequence[BaseNode]: The set of transformed Nodes/Documents
    """
    input_nodes = self._prepare_inputs(documents, nodes)

    # check if we need to dedup
    if self.docstore is not None and self.vector_store is not None:
        if self.docstore_strategy in (
            DocstoreStrategy.UPSERTS,
            DocstoreStrategy.UPSERTS_AND_DELETE,
        ):
            nodes_to_run = self._handle_upserts(
                input_nodes, store_doc_text=store_doc_text
            )
        elif self.docstore_strategy == DocstoreStrategy.DUPLICATES_ONLY:
            nodes_to_run = self._handle_duplicates(
                input_nodes, store_doc_text=store_doc_text
            )
        else:
            raise ValueError(f"Invalid docstore strategy: {self.docstore_strategy}")
    elif self.docstore is not None and self.vector_store is None:
        if self.docstore_strategy == DocstoreStrategy.UPSERTS:
            print(
                "Docstore strategy set to upserts, but no vector store. "
                "Switching to duplicates_only strategy."
            )
            self.docstore_strategy = DocstoreStrategy.DUPLICATES_ONLY
        elif self.docstore_strategy == DocstoreStrategy.UPSERTS_AND_DELETE:
            print(
                "Docstore strategy set to upserts and delete, but no vector store. "
                "Switching to duplicates_only strategy."
            )
            self.docstore_strategy = DocstoreStrategy.DUPLICATES_ONLY
        nodes_to_run = self._handle_duplicates(
            input_nodes, store_doc_text=store_doc_text
        )
    else:
        nodes_to_run = input_nodes

    if num_workers and num_workers > 1:
        if num_workers > multiprocessing.cpu_count():
            warnings.warn(
                "Specified num_workers exceed number of CPUs in the system. "
                "Setting `num_workers` down to the maximum CPU count."
            )
        with multiprocessing.get_context("spawn").Pool(num_workers) as p:
            node_batches = self._node_batcher(
                num_batches=num_workers, nodes=nodes_to_run
            )
            nodes_parallel = p.starmap(
                run_transformations,
                zip(
                    node_batches,
                    repeat(self.transformations),
                    repeat(in_place),
                    repeat(self.cache if not self.disable_cache else None),
                    repeat(cache_collection),
                ),
            )
            nodes = reduce(lambda x, y: x + y, nodes_parallel, [])
    else:
        nodes = run_transformations(
            nodes_to_run,
            self.transformations,
            cache=self.cache if not self.disable_cache else None,
            cache_collection=cache_collection,
            in_place=in_place,
            **kwargs,
        )

    if self.vector_store is not None:
        self.vector_store.add([n for n in nodes if n.embedding is not None])

    return nodes
```

By removing the `show_progress` argument from the `run_transformations` call, you ensure that only accepted arguments are passed, which resolves the `TypeError` [[1]](https://github.com/run-llama/llama_index/blob/main/llama-index-legacy/llama_index/legacy/ingestion/pipeline.py)[[2]](https://github.com/run-llama/llama_index/blob/main/llama-index-core/llama_index/core/ingestion/pipeline.py).

*To continue the conversation with [Dosu](https://dosu.dev), mention @dosu.*

9nvpjoqh2#

show_progress ends up in **kwargs: Any.
It looks like one of your transformations does not accept show_progress.
Which sentence splitter (sentence_split) and text cleaner (text_cleaner) are you using? Can you share the class names or some code?


iqjalb3h3#

Yes, here is my code:

```python
def run_pipeline(self, documents: List[Document]) -> Sequence[BaseNode]:
    sentence_split = SentenceSplitter(chunk_size=Configs.chunk_size, chunk_overlap=Configs.chunk_overlap)
    text_cleaner = TextCleaner()

    pipeline = IngestionPipeline(
        transformations=[
            sentence_split,
            text_cleaner,
            self.embed_model
        ]
    )
    nodes = pipeline.run(documents=documents)
    return nodes

class TextCleaner(TransformComponent):
    def __init__(self):
        self.nlp = spacy.load(Configs.spacy_language_fr)
        self.stop_words = set(stopwords.words('french') + stopwords.words('english'))
        self.punctuations = set(['.', ',', '/', '!', '?', ';', ':', '(', ')', '[', ']', '-', '_', '%'])

    def __call__(self, nodes):
        for node in nodes:
            node.text = self.clean_text(node.text)
        return nodes

    def clean_text(self, text: str) -> str:
        doc = self.nlp(text.lower())
        keywords = [token.lemma_ for token in doc if token.text not in self.stop_words and token.text not in self.punctuations and not token.is_digit and len(token.text) > 1]
        clean_data = ' '.join(keywords)
        return clean_data
```

I think the error comes from the llama_index library, because in the run() method, when run_transformations() is called, it tries to pass show_progress.
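Consistent with the second answer's diagnosis, the TypeError points at TextCleaner.__call__, which takes only nodes and so has nowhere to put the forwarded show_progress keyword. A hedged sketch of the fix, with spacy/nltk, Configs, and the node class replaced by minimal stand-ins so it runs on its own:

```python
# Stand-in for a llama_index node: only the .text attribute is used here.
class Node:
    def __init__(self, text):
        self.text = text

class TextCleaner:
    def __init__(self):
        self.punctuations = set('.,/!?;:()[]-_%')

    # **kwargs absorbs pipeline options such as show_progress,
    # which is the change that avoids the reported TypeError.
    def __call__(self, nodes, **kwargs):
        for node in nodes:
            node.text = self.clean_text(node.text)
        return nodes

    # Simplified cleaning: lowercase, drop punctuation tokens, digits,
    # and single characters (no spacy lemmatization in this sketch).
    def clean_text(self, text):
        tokens = [t for t in text.lower().split()
                  if t not in self.punctuations and not t.isdigit() and len(t) > 1]
        return ' '.join(tokens)

nodes = TextCleaner()([Node("Hello , World 42")], show_progress=True)
print(nodes[0].text)  # hello world
```

In the original code, adding `**kwargs` to `TextCleaner.__call__` is the one-line change; the rest of the class can stay as it is.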
