llama_index [问题]:'Failed to send 1 objects in a batch of 1. Please inspect client.batch.failed_objects or collection.batch.failed_objects for the failed objects.'

qyuhtwio  于 2个月前  发布在  其他
关注(0)|答案(1)|浏览(23)

问题验证

  • 我已在文档和discord中寻找答案。

问题

import time
import weaviate
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.vector_stores.weaviate import WeaviateVectorStore
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core import StorageContext, Settings
from llama_index.readers.file import PyMuPDFReader
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv())
import nest_asyncio
nest_asyncio.apply()  # Only needed in Jupyter notebooks
weaviate_client = weaviate.connect_to_local()
weaviate_client.connect()
Settings.llm = OpenAI(temperature=0, model="gpt-4o")
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small", dimensions=512)
splitter = SentenceSplitter(chunk_size=512, chunk_overlap=100)
documents = SimpleDirectoryReader("./data1").load_data()
nodes = splitter.get_nodes_from_documents(documents)
print(nodes)
if weaviate_client.collections.exists("TextNode"):
    weaviate_client.collections.delete("TextNode")
schema = {
           "class": "TextNode",
           "properties": [
               {"name": "id_", "dataType": ["string"], },
               {"name": "embedding", "dataType": ["number[]"], },
               {"name": "file_path", "dataType": ["string"], },
               {"name": "file_name", "dataType": ["string"], },
               {"name": "file_type", "dataType": ["string"], },
               {"name": "file_size", "dataType": ["int"], },
               {"name": "creation_date", "dataType": ["string"], },
               {"name": "last_modified_date", "dataType": ["string"], },
               # {"name": "source", "dataType": ["string"], },
               {"name": "text", "dataType": ["text"], },
               {"name": "start_char_idx", "dataType": ["int"], },
               {"name": "end_char_idx", "dataType": ["int"], }
               # {"name": "metadata_str", "dataType": ["string"], },
               # {"name": "content", "dataType": ["text"], },
           ]
       }
weaviate_client.collections.create_from_dict(schema)
try:
    collection = weaviate_client.collections.get("TextNode")
    data_lines = []
    for node in nodes:
        embedding = Settings.embed_model.get_text_embedding(node.text)  # 生成嵌入
        node.embedding = embedding 
        properties = {
            "id": node.id_,
            "embedding": node.embedding,
            "file_path": node.metadata.get("file_path"),
            "file_name": node.metadata.get("file_name"),
            "file_type": node.metadata.get("file_type"),
            "file_size": node.metadata.get("file_size"),
            "creation_date": node.metadata.get("creation_date"),
            "last_modified_date": node.metadata.get("last_modified_date"),
            # "source": node.metadata.get("source"),
            "text": node.text,
            "start_char_idx": node.start_char_idx,
            "end_char_idx": node.end_char_idx,
            # "metadata_str": node.metadata_template,
            # "content": node.text,
        }
        data_lines.append(properties)
    print(data_lines)
    with collection.batch.dynamic() as batch:
        for data_line in data_lines:
            batch.add_object(properties=data_line)
    print("node insert completation!!!!!!!!!!!")
    vector_store = WeaviateVectorStore(weaviate_client=weaviate_client, index_name="TextNode")
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    index = VectorStoreIndex.from_vector_store(vector_store)
    print(index.index_struct)
    print(index.storage_context)

    query_engine = index.as_query_engine()

    while True:
        question = input("User: ")
        if question.strip() == "":
            break
        start_time = time.time()
        response = query_engine.query(question)
        end_time = time.time()
        print(f"Time taken: {end_time - start_time} seconds")
        print(f"AI: {response}")
finally:
    weaviate_client.close()

错误信息是:
{'message': '批量发送1个对象失败,请检查client.batch.failed_objects或collection.batch.failed_objects以获取失败的对象。'}
我应该如何解决它?谢谢

翻译结果:

问题验证

  • 我已经在文档和discord中寻找答案。

问题

import time
import weaviate
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.vector_stores.weaviate import WeaviateVectorStore
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core import StorageContext, Settings
from llama_index.readers.file import PyMuPDFReader
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv())
import nest_asyncio
nest_asyncio.apply()  # Only needed in Jupyter notebooks
weaviate_client = weaviate.connect_to_local()
weaviate_client.connect()
Settings.llm = OpenAI(temperature=0, model="gpt-4o")
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small", dimensions=512)
splitter = SentenceSplitter(chunk_size=512, chunk_overlap=100)
documents = SimpleDirectoryReader("./data1").load_data()
nodes = splitter.get_nodes_from_documents(documents)
print(nodes)
if weaviate_client.collections.exists("TextNode"):
    weaviate_client.collections.delete("TextNode")
schema = {
           "class": "TextNode",
           "properties": [
               {"name": "id_", "dataType": ["string"], },
               {"name": "embedding", "dataType": ["number[]"], },
               {"name": "file_path", "dataType": ["string"], },
               {"name": "file_name", "dataType": ["string"], },
               {"name": "file_type", "dataType": ["string"], },
               {"name": "file_size", "dataType": ["int"], },
               {"name": "creation_date", "dataType": ["string"], },
               {"name": "last_modified_date", "dataType": ["string"], },
               # {"name": "source", "dataType": ["string"], },
               {"name": "text", "dataType": ["text"], },
               {"name": "start_char_idx", "dataType": ["int"], },
               {"name": "end_char_idx", "dataType": ["int"], }
               # {"name": "metadata_str", "dataType": ["string"], },
               # {"name": "content", "dataType": ["text"], },
           ]
       }
weaviate_client.collections.create_from_dict(schema)
try:
    collection = weaviate_client.collections.get("TextNode")
    data_lines = []
    for node in nodes:
        embedding = Settings.embed_model.get_text_embedding(node.text)  # 生成嵌入
        node.embedding = embedding 
        properties = {
            "id": node.id_,
            "embedding": node.embedding,
            "file_path": node.metadata.get("file_path"),
            "file_name": node.metadata.get("file_name"),
            "file_type": node.metadata.get("file_type"),
            "file_size": node.metadata.get("file_size"),
            "creation_date": node.metadata.get("creation_date"),
            "last_modified_date": node.metadata.get("last_modified_date"),
            # "source": node.metadata.get("source"),
            "text": node.text,
            "start_char_idx": node.start_char_idx,
            "end_char_idx": node.end_char_idx,
            # "metadata_str": node.metadata_template,
            # "content": node.text,
        }
        data_lines.append(properties)
    print(data_lines)
    with collection.batch.dynamic() as batch:
        for data_line in data_lines:
            batch.add_object(properties=data_line)
    print("node insert completation!!!!!!!!!!!")
    vector_store = WeaviateVectorStore(weaviate_client=weaviate_client, index_name="TextNode")
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    index = VectorStoreIndex.from_vector_store(vector_store)
    print(index.index_struct)
    print(index.storage_context)

    query_engine = index.as_query_engine()

    while True:
        question = input("User: ")
        if question.strip() == "":
            break
        start_time = time.time()
        response = query_engine.query(question)
        end_time = time.time()
        print(f"Time taken: {end_time - start_time} seconds")
        print(f"AI: {response}")
finally:
    weaviate_client.close()

错误信息是:
{'message': '批量发送1个对象失败,请检查client.batch.failed_objects或collection.batch.failed_objects以获取失败的对象。'}
我应该如何解决它?谢谢

lxkprmvk

lxkprmvk1#

你好,这里有人愿意帮助你解决在等待人工维护者时遇到的错误、问题或贡献。

要解决在使用批处理过程将对象插入Weaviate集合时遇到的错误,请确保在 add_node 函数中正确使用批处理对象。以下是相关代码片段,展示了如何使用批处理过程添加节点:

def add_node(
    client: "WeaviateClient",
    node: BaseNode,
    class_name: str,
    batch: Optional[Any] = None,
    text_key: str = DEFAULT_TEXT_KEY,
) -> None:
    """Add node."""
    metadata = {}
    metadata[text_key] = node.get_content(metadata_mode=MetadataMode.NONE) or ""

    additional_metadata = node_to_metadata_dict(
        node, remove_text=True, flat_metadata=False
    )
    metadata.update(additional_metadata)

    vector = node.get_embedding()
    id = node.node_id

    # if batch object is provided (via a context manager), use that instead
    if batch is not None:
        batch.add_data_object(metadata, class_name, id, vector)
    else:
        client.batch.add_data_object(metadata, class_name, id, vector)

确保正确示例化并将 batch 对象传递给 add_node 函数。如果未提供批处理对象,则函数将回退到直接将对象插入集合中。

此外,你可以参考以下测试用例来了解如何在Weaviate中模拟和测试批处理过程:

import sys
from unittest.mock import MagicMock

from llama_index.legacy.schema import NodeRelationship, RelatedNodeInfo, TextNode
from llama_index.legacy.vector_stores.weaviate import WeaviateVectorStore

def test_weaviate_add() -> None:
    # mock import
    sys.modules["weaviate"] = MagicMock()
    weaviate_client = MagicMock()
    batch_context_manager = MagicMock()
    weaviate_client.batch.__enter__.return_value = batch_context_manager

    vector_store = WeaviateVectorStore(weaviate_client=weaviate_client)

    vector_store.add(
        [
            TextNode(
                text="test node text",
                id_="test node id",
                relationships={
                    NodeRelationship.SOURCE: RelatedNodeInfo(node_id="test doc id")
                },
                embedding=[0.5, 0.5],
            )
        ]
    )

    args, _ = batch_context_manager.add_data_object.call_args
    assert args[-1] == [0.5, 0.5]

此测试用例演示了如何模拟Weaviate客户端和批处理上下文管理器,以确保正确处理批处理过程。

此外, WeaviateVectorStore 类中的 add 方法展示了如何在上下文管理器中使用批处理过程:

def add(
    self,
    nodes: List[BaseNode],
    **add_kwargs: Any,
) -> List[str]:
    """Add nodes to index.

Args:
nodes: List[BaseNode]: list of nodes with embeddings

"""
    ids = [r.node_id for r in nodes]

    with self._client.batch.dynamic() as batch:
        for node in nodes:
            add_node(
                self._client,
                node,
                self.index_name,
                batch=batch,
                text_key=self.text_key,
            )
    return ids

此方法确保批处理过程得到正确管理,并高效地将节点添加到Weaviate集合中。

  • 要继续与 Dosu 交谈,请提及 @dosu。*

相关问题