如何在Temporal Python-SDK中编写Temporal worker Interceptor类

tvokkenx  于 2023-06-25  发布在  Python
关注(0)|答案(1)|浏览(143)

我正在尝试为我们用Python编写的时态工作流实现一个拦截器类。但由于文档并不完全可用,我尝试了多种方法来实现它。但到目前为止还没有成功。
下面是时态活动、工作流、拦截器类、工作者和工作流执行器的一些示例代码。

  • 活动.py*
from temporalio import activity

@activity.defn
async def activity1():
    print("Hi from activity 1")

@activity.defn
async def activity2():
    print("Hi from activity 2")
  • workflow.py*
from datetime import timedelta
from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from mactivities import activity1, activity2

@workflow.defn
class MyWorkflow1:

    @workflow.run
    async def run(self):
        output = await workflow.execute_activity(activity1, start_to_close_timeout=timedelta(5))
        return True


@workflow.defn
class MyWorkflow2:

    @workflow.run
    async def run(self):
        output = await workflow.execute_activity(activity2, start_to_close_timeout=timedelta(5))
        return True
  • my_interceptor.py*
from temporalio.worker import Interceptor

class MyInterceptor(Interceptor):  # NEED HELP IN THIS CLASS
    def __init__(self, next):
        super().__init__(next)

    async def intercept_activity(self, input):
        print("I am printing before activity execution start")
        return await.self.next.intercept_activity(input)

    async workflow_interceptor_class(self, input):
        print("Doing something before a workflow execution start")
        return None
  • run_workflows.py *
import asyncio
from temporalio.client import Client
from workflows import MyWorkflow1, MyWorkflow2

async def main():
    client = await Client.connect("localhost:7233")

    await client.execute_workflow(
                  MyWorkflow1.run,
                  id="123",
                  task_queue="MY_QUEUE"
                   )

    await client.execute_workflow(
                  MyWorkflow2.run,
                  id="123",
                  task_queue="MY_QUEUE"
                   )
    return True

if __name__=="__main__":
    asyncio.run(main())
  • run_worker.py*
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker

from activities import activity1, activity2
from workflow import MyWorkflow1, MyWorkflow2
from my_interceptor import MyInterceptor  # <--- NEED HELP IN IMPLEMENTING THIS CLASS

async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
             client,
             task_queue="MY_QUEUE",
             workflows=[MyWorkflow1, MyWorkflow2],
             activities=[activity1, activity2],
             interceptors=[MyInterceptor]  # <--- NEED HELP IN IMPLEMENTING THIS CLASS
              )
     await worker.run()

if __name__=="__main__":
    asyncio.run(main())

如果我使用我写的Interceptor类,我会得到异常。例如,workflow_interceptor_class需要一个参数“input”,但是当worker初始化时,它不可用。我的活动失败,并显示消息“正在完成失败的活动”等。
基本上实现拦截器类的方式是错误的。

我想要实现的目标是什么?

1.在开始执行工作流之前,打印该工作流的名称。
1.我将在一个变量中保存一些信息(执行时传递的参数)以备将来使用。

j0pj023g

j0pj023g1#

首先,你可能想看看以下关于如何在Temporal Python SDK中实现Activity和Workflow拦截器的示例:

  • 来自Temporal的Python示例库的示例;
  • 来自Temporal Python SDK测试套件的拦截器测试文件;
  • Temporal Python SDK存储库中Contribution目录中的OpenTelemetry Interceptor(请注意,最后一个比其他两个复杂得多)。

根据您提供的代码片段,我认为您可能错过了一个非常重要的细节:您提供给WorkerInterceptor实现本身实际上不会处理Workflow/Activity调用的拦截。
相反,您提供的Interceptor对象将用于构建具体拦截器链。Interceptor类的职责如下:

  • intercept_activity():示例化、链接并返回一个扩展ActivityInboundInterceptor的对象;如果不想拦截活动,只需返回next;
  • workflow_interceptor_class():返回一个扩展WorkflowInboundInterceptor的类;返回None如果你不想拦截工作流。

因此,拦截器的完整实现通常包含两个或三个类,例如可以命名为:

  • MyInterceptor,扩展Interceptor
  • MyActivityInboundInterceptor,它扩展了ActivityInboundInterceptor
  • MyWorkflowInboundInterceptor,扩展WorkflowInboundInterceptor

MyInterceptor的简单实现如下所示:

class MyInterceptor(Interceptor):
    def intercept_activity(
        self, next: ActivityInboundInterceptor
    ) -> ActivityInboundInterceptor:
        return MyActivityInboundInterceptor(next)

    def workflow_interceptor_class(
        self, input: WorkflowInterceptorClassInput
    ) -> Optional[Type[WorkflowInboundInterceptor]]:
        return MyWorkflowInboundInterceptor

话虽如此,我认为其余部分应该更容易到位,但只要添加一个评论,如果你需要更多的细节的东西。
顺便说一句,你可能认为所有这些体操都是不直观的,复杂的和无用的,但让我们不要停在这里,只是接受这是有强烈理由的。

相关问题