What is the best way to manage concurrency when running langflow on ephemeral instances?

hof1towb · posted 6 months ago in Other
Followers (0) | Answers (5) | Views (72)

Background

I'm running langflow using Google Cloud Run. This works well for my use case which involves very infrequent but bursty (100,000s of tokens per second) requests. It's connected to a postgres database hosted elsewhere.
I've created a solution which works, but it's hacky and inelegant and I'm hoping to get some suggestions on some better ideas.

The issue

My issue is that the docker image as it stands doesn't allow write access to the /app/data/.cache folder by default, which langflow requires to run. The gcs-fuse adaptor isn't POSIX compliant and doesn't provide concurrency control. This means that when more than one worker or more than one instance is running, I get lots of errors like this:
'IO Error: Could not set lock on file "/app/data/.cache/"'

My solution

I've created a separate Dockerfile and script which creates a ramdisk mounted at /app/data . When a container instance is started, the ramdisk is created and all writes go to RAM.
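For illustration, the per-instance scratch-storage idea can be approximated in pure Python: each instance creates a disposable temporary directory and points the app at it through an environment variable. Note that LANGFLOW_CONFIG_DIR is an assumption here — check which setting your langflow version actually reads for its cache location.

```python
import os
import tempfile

def make_ephemeral_cache(prefix: str = "langflow-cache-") -> str:
    """Create a disposable per-instance scratch directory and point the
    app at it, mimicking the ramdisk approach: every instance writes its
    cache to private storage that vanishes with the container."""
    cache_dir = tempfile.mkdtemp(prefix=prefix)
    # LANGFLOW_CONFIG_DIR is an assumption -- verify the variable name
    # against your langflow version's settings.
    os.environ["LANGFLOW_CONFIG_DIR"] = cache_dir
    return cache_dir

cache_dir = make_ephemeral_cache()
print(os.path.isdir(cache_dir))  # → True
```

Unlike a tmpfs ramdisk this writes to the container's filesystem layer, but on Cloud Run that layer is already an in-memory filesystem, so the effect is similar without the custom mount step.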
From what I can see, this solution has the following limitations:

  • Any logs and chat histories created in that instance are lost as soon as the container instance shuts down.
  • Creating the ramdisk for each instance slows down the start up times.

Any ideas on how I could approach this in a better way?

sxpgvts3 #1

Hi @h-arnold! I'm here to help you with any bugs, questions, or contributions. Let's work through this together.
To manage ephemeral Langflow instances on Google Cloud Run and work around the /app/data/.cache folder issue, you can use Kubernetes and Helm charts for a more robust solution. Here are the steps you can follow:

  • Deploy with Kubernetes:
  • Deploying Langflow with Kubernetes lets you manage multiple instances and handle concurrency more effectively.
  • Follow the steps in the Kubernetes deployment guide to set up a Kubernetes cluster and install Langflow with the Helm chart.
  • Persistent storage:
  • Instead of using a ramdisk for the /app/data/.cache folder, configure persistent storage.
  • Use an external PostgreSQL database for data persistence. Update the values.yaml file to configure the external database:
postgresql:
  enabled: true
  fullnameOverride: "langflow-ide-postgresql-service"
  auth:
    username: "langflow"
    password: "langflow-postgres"
    database: "langflow-db"

langflow:
  backend:
    externalDatabase:
      enabled: true
      driver:
        value: "postgresql"
      host:
        value: "langflow-ide-postgresql-service"
      port:
        value: "5432"
      database:
        value: "langflow-db"
      user:
        value: "langflow"
      password:
        valueFrom:
          secretKeyRef:
            key: "password"
            name: "langflow-ide-postgresql-service"
    sqlite:
      enabled: false
  • Scaling:
  • Increase the replica count of the Langflow backend and frontend services to handle higher load and improve concurrency.
  • Update the replicaCount values in the values.yaml file:
langflow:
  backend:
    replicaCount: 3
  frontend:
    replicaCount: 3
  • Resource management:
  • Configure resource requests and limits so that every instance has enough resources and to prevent resource contention.
  • Update the resources values in the values.yaml file:
langflow:
  backend:
    resources:
      requests:
        memory: "2Gi"
        cpu: "1000m"
  frontend:
    resources:
      requests:
        memory: "1Gi"
        cpu: "1000m"

By following these steps, you can improve concurrency management and data persistence for ephemeral Langflow instances running on Google Cloud Run. 1(https://github.com/langflow-ai/langflow/blob/main/docs/docs/deployment/kubernetes.md)
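As a sanity check of the externalDatabase values above, they assemble into a standard SQLAlchemy-style PostgreSQL connection URL. A minimal sketch (credentials copied from the sample values, which you should of course replace in a real deployment):

```python
def postgres_url(user: str, password: str, host: str, port: str,
                 database: str, driver: str = "postgresql") -> str:
    # Assemble the connection URL implied by the Helm values above.
    return f"{driver}://{user}:{password}@{host}:{port}/{database}"

url = postgres_url("langflow", "langflow-postgres",
                   "langflow-ide-postgresql-service", "5432", "langflow-db")
print(url)
# → postgresql://langflow:langflow-postgres@langflow-ide-postgresql-service:5432/langflow-db
```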

  • To continue the conversation with Dosu, mention @dosu.
zf9nrax1 #2

The monitoring service appears to use its own duckdb instance rather than postgresql, and that looks hardcoded to me. Is that the only write access .cache is needed for?

eit6fx6z #3

The monitoring service is the main component in Langflow that needs write access to the /app/data/.cache folder. You can see this in the MonitorService class, which handles database operations such as adding rows, deleting messages, and updating messages, all of which write to the monitor.duckdb file located in the cache directory.
However, other components also need write access to the cache directory. For example, the utility functions in src/backend/langflow/services/cache/utils.py also perform writes. These include save_binary_file, save_uploaded_file, and clear_old_cache_files, which are responsible for saving files and managing cached files in the cache directory.
To handle this more effectively, you can ensure that only the necessary components have write access to that directory, either by setting appropriate filesystem permissions or by configuring the services with the required permissions. 1(https://github.com/langflow-ai/langflow/blob/dev/src/backend/base/langflow/services/monitor/service.py) 2(https://github.com/logspace-ai/langflow/blob/dev/src/backend/base/langflow/services/monitor/service.py) 3(https://github.com/logspace-ai/langflow/blob/dev/src/backend/langflow/services/cache/utils.py) 4(https://github.com/langflow-ai/langflow/blob/dev/src/backend/base/langflow/services/cache/utils.py) 5(https://github.com/logspace-ai/langflow/blob/dev/src/backend/base/langflow/services/cache/utils.py)
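To illustrate the kind of cache-directory writes involved, here is a simplified, stdlib-only sketch of what a clear_old_cache_files-style helper does. This is an approximation for illustration, not langflow's actual implementation:

```python
import os
import time

def clear_old_cache_files(cache_dir: str, max_age_seconds: float) -> int:
    """Remove files in cache_dir older than max_age_seconds; return how many."""
    removed = 0
    now = time.time()
    for name in os.listdir(cache_dir):
        path = os.path.join(cache_dir, name)
        # Only regular files are cleaned up; subdirectories are left alone.
        if os.path.isfile(path) and now - os.path.getmtime(path) > max_age_seconds:
            os.remove(path)
            removed += 1
    return removed
```

Every call like this both reads and deletes entries under the cache directory, which is exactly the kind of access that fails on a non-POSIX, lock-free mount such as gcs-fuse.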

  • To continue the conversation with Dosu, mention @dosu.
ej83mcc0 #4

Based on the code you have, can you find a way to modify it so that all of this moves into a database? For example, changing which database the MonitorService class uses, or saving the binary files to a document database?

bprjcwpo #5

To move all of the MonitorService class's write operations out of the /app/data/.cache folder and into a database, you can replace the DuckDB operations with operations against a document database such as MongoDB. Here is an example of how to modify the MonitorService class to use MongoDB instead of DuckDB:

  1. Install the pymongo package if it isn't already installed:
pip install pymongo
  2. Modify the MonitorService class to use MongoDB:
from datetime import datetime
from typing import TYPE_CHECKING, Optional, Union

from loguru import logger
from pymongo import MongoClient
from pymongo.collection import Collection

from langflow.services.base import Service
from langflow.services.monitor.schema import MessageModel, TransactionModel, VertexBuildModel

if TYPE_CHECKING:
    from langflow.services.settings.manager import SettingsService

class MonitorService(Service):
    name = "monitor_service"

    def __init__(self, settings_service: "SettingsService"):
        self.settings_service = settings_service
        self.client = MongoClient("mongodb://localhost:27017/")
        self.db = self.client["monitor_service"]
        self.table_map = {
            "transactions": self.db["transactions"],
            "messages": self.db["messages"],
            "vertex_builds": self.db["vertex_builds"],
        }

        try:
            self.ensure_tables_exist()
        except Exception as e:
            logger.exception(f"Error initializing monitor service: {e}")

    def ensure_tables_exist(self):
        # MongoDB creates collections automatically when you first insert a document
        pass

    def add_row(
        self,
        table_name: str,
        data: Union[dict, TransactionModel, MessageModel, VertexBuildModel],
    ):
        collection: Collection = self.table_map.get(table_name)
        if collection is None:
            raise ValueError(f"Unknown table name: {table_name}")

        if isinstance(data, dict):
            collection.insert_one(data)
        else:
            collection.insert_one(data.dict())

    def load_table_as_dataframe(self, table_name):
        collection: Collection = self.table_map.get(table_name)
        if collection is None:
            raise ValueError(f"Unknown table name: {table_name}")

        import pandas as pd
        return pd.DataFrame(list(collection.find()))

    @staticmethod
    def get_timestamp():
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def get_vertex_builds(
        self,
        flow_id: Optional[str] = None,
        vertex_id: Optional[str] = None,
        valid: Optional[bool] = None,
        order_by: Optional[str] = "timestamp",
    ):
        query = {}
        if flow_id:
            query["flow_id"] = flow_id
        if vertex_id:
            query["_id"] = vertex_id
        if valid is not None:
            query["valid"] = valid

        collection: Collection = self.table_map["vertex_builds"]
        cursor = collection.find(query).sort(order_by)
        return list(cursor)

    def delete_vertex_builds(self, flow_id: Optional[str] = None):
        query = {}
        if flow_id:
            query["flow_id"] = flow_id

        collection: Collection = self.table_map["vertex_builds"]
        collection.delete_many(query)

    def delete_messages(self, session_id: str):
        query = {"session_id": session_id}

        collection: Collection = self.table_map["messages"]
        collection.delete_many(query)

    def add_message(self, message: MessageModel):
        self.add_row("messages", message)

    def get_messages(
        self,
        sender: Optional[str] = None,
        sender_name: Optional[str] = None,
        session_id: Optional[str] = None,
        order_by: Optional[str] = "timestamp",
        order: Optional[str] = "DESC",
        limit: Optional[int] = None,
    ):
        query = {}
        if sender:
            query["sender"] = sender
        if sender_name:
            query["sender_name"] = sender_name
        if session_id:
            query["session_id"] = session_id

        collection: Collection = self.table_map["messages"]
        cursor = collection.find(query).sort(order_by, -1 if order == "DESC" else 1)
        if limit is not None:
            cursor = cursor.limit(limit)
        return list(cursor)

    def get_transactions(
        self,
        source: Optional[str] = None,
        target: Optional[str] = None,
        status: Optional[str] = None,
        order_by: Optional[str] = "timestamp",
    ):
        query = {}
        if source:
            query["source"] = source
        if target:
            query["target"] = target
        if status:
            query["status"] = status

        collection: Collection = self.table_map["transactions"]
        cursor = collection.find(query).sort(order_by)
        return list(cursor)

This code replaces the DuckDB operations with MongoDB operations, ensuring that all write operations are moved into a MongoDB database instead of the local cache folder. 1(https://github.com/logspace-ai/langflow/blob/dev/src/backend/base/langflow/services/monitor/service.py) 2(https://github.com/langflow-ai/langflow/blob/dev/src/backend/base/langflow/services/monitor/service.py)
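The optional-filter pattern used by get_messages above (build a MongoDB query dict only from the filters that were actually supplied) can be exercised in isolation, without a running MongoDB:

```python
from typing import Optional

def build_message_query(sender: Optional[str] = None,
                        sender_name: Optional[str] = None,
                        session_id: Optional[str] = None) -> dict:
    # Mirrors the query construction inside MonitorService.get_messages:
    # only filters that were provided end up in the query document.
    query = {}
    if sender:
        query["sender"] = sender
    if sender_name:
        query["sender_name"] = sender_name
    if session_id:
        query["session_id"] = session_id
    return query

print(build_message_query(session_id="abc123"))  # → {'session_id': 'abc123'}
```

An empty query dict matches every document in the collection, which is why get_messages with no arguments returns all messages.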

  • To continue the conversation with Dosu, mention @dosu.
