llama_index [问题]:如何在send_event函数中使用多线程?

fkvaft9z  于 23天前  发布在  其他
关注(0)|答案(3)|浏览(23)

问题验证

  • 我已经在文档和discord上寻找答案。

问题

这是我的代码

from llama_index.core.workflow import (
    Context,
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)

class Event0(Event):
    param0: str

class Event1(Event):
    param1: str

class Event2(Event):
    param2: str

class Event3(Event):
    param3: str

class MyWorkflow(Workflow):
    @step
    async def run_event0(ctx: Context, ev: StartEvent) -> Event1 | Event2 | None:
        ctx.session.send_event(Event0(param0="event0"))
        ctx.session.send_event(Event1(param1="event1"))

        return None

    @step
    async def run_event1(ctx: Context, ev: Event1) -> Event3:
        # do something
        return Event3(param3="event3")

    @step
    async def run_event2(ctx: Context, ev: Event2) -> Event3:
        # do something
        return Event3(param3="event3")

    @step
    async def run_event3(ctx: Context, ev: Event3) -> StopEvent | None:
        # do something
        events = ctx.collect_events(ev, [Event3] * 2)
        if events is None:
            return None

        # do something with events
        return StopEvent()

我想要同时运行run_event1和run_event2

63lcw9qa

63lcw9qa1#

要同时运行 run_event1run_event2,可以使用带有 num_workers 参数的 @step 装饰器。这允许多个步骤示例并行运行。以下是如何修改代码的方法:

在这段修改后的代码中,run_event1run_event2 步骤用 @step(num_workers=2) 装饰,这使得它们可以并行运行。run_event3 中的 ctx.collect_events 方法确保在继续执行 1(https://github.com/run-llama/llama_index/blob/main/docs/docs/examples/workflow/parallel_execution.ipynb)2(https://github.com/run-llama/llama_index/blob/main/docs/docs/module_guides/workflow/index.md) 之前等待 Event3 事件。

  • 要继续与 Dosu 对话,请提及 @dosu。*
ryhaxcpt

ryhaxcpt2#

你不能,但你可以并发运行,假设你的函数是正确异步的并且使用await,使用dosu正在谈论的num_workers参数。

qmb5sa22

qmb5sa223#

你不能,但你可以并发运行,假设你的函数是正确异步的并且使用await,使用dosu正在谈论的num_workers参数
它不起作用,我使用了以下代码:

import asyncio
import time

from llama_index.core.workflow import (
    Context,
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)

class Event1(Event):
    param1: str

class Event2(Event):
    param2: str

class Event3(Event):
    param3: str

class MyWorkflow(Workflow):
    @step
    async def run_event0(self, ctx: Context, ev: StartEvent) -> Event1 | Event2 | None:
        ctx.session.send_event(Event1(param1="event1"))
        ctx.session.send_event(Event2(param2="event2"))

        return None

    @step(num_workers=2)
    async def run_event1(self, ctx: Context, ev: Event1) -> Event3:
        # do something
        time.sleep(5)
        return Event3(param3="event3")

    @step(num_workers=2)
    async def run_event2(self, ctx: Context, ev: Event2) -> Event3:
        # do something
        time.sleep(5)
        return Event3(param3="event3")

    @step
    async def run_event3(self, ctx: Context, ev: Event3) -> StopEvent | None:
        # do something
        events = ctx.collect_events(ev, [Event3] * 2)
        if events is None:
            return None

        # do something with events
        return StopEvent()

async def main():
    wl = MyWorkflow()
    t0 = time.time()
    await wl.run()
    print(f"Elapsed time: {time.time() - t0} seconds")

if __name__ == "__main__":
    asyncio.run(main())

如果它起作用,它应该输出大约5秒钟,但输出了10秒钟。

相关问题