我正在尝试为我们用Python编写的时态工作流实现一个拦截器类。但由于文档并不完全可用,我尝试了多种方法来实现它。但到目前为止还没有成功。
下面是时态活动、工作流、拦截器类、工作者和工作流执行器的一些示例代码。
- 活动.py*
from temporalio import activity
@activity.defn
async def activity1():
print("Hi from activity 1")
@activity.defn
async def activity2():
print("Hi from activity 2")
- workflow.py*
from datetime import timedelta
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
from mactivities import activity1, activity2
@workflow.defn
class MyWorkflow1:
@workflow.run
async def run(self):
output = await workflow.execute_activity(activity1, start_to_close_timeout=timedelta(5))
return True
@workflow.defn
class MyWorkflow2:
@workflow.run
async def run(self):
output = await workflow.execute_activity(activity2, start_to_close_timeout=timedelta(5))
return True
- my_interceptor.py*
from temporalio.worker import Interceptor
class MyInterceptor(Interceptor): # NEED HELP IN THIS CLASS
def __init__(self, next):
super().__init__(next)
async def intercept_activity(self, input):
print("I am printing before activity execution start")
return await.self.next.intercept_activity(input)
async workflow_interceptor_class(self, input):
print("Doing something before a workflow execution start")
return None
- run_workflows.py *
import asyncio
from temporalio.client import Client
from workflows import MyWorkflow1, MyWorkflow2
async def main():
client = await Client.connect("localhost:7233")
await client.execute_workflow(
MyWorkflow1.run,
id="123",
task_queue="MY_QUEUE"
)
await client.execute_workflow(
MyWorkflow2.run,
id="123",
task_queue="MY_QUEUE"
)
return True
if __name__=="__main__":
asyncio.run(main())
- run_worker.py*
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from activities import activity1, activity2
from workflow import MyWorkflow1, MyWorkflow2
from my_interceptor import MyInterceptor # <--- NEED HELP IN IMPLEMENTING THIS CLASS
async def main():
client = await Client.connect("localhost:7233")
worker = Worker(
client,
task_queue="MY_QUEUE",
workflows=[MyWorkflow1, MyWorkflow2],
activities=[activity1, activity2],
interceptors=[MyInterceptor] # <--- NEED HELP IN IMPLEMENTING THIS CLASS
)
await worker.run()
if __name__=="__main__":
asyncio.run(main())
如果我使用我写的Interceptor类,我会得到异常。例如,workflow_interceptor_class需要一个参数“input”,但是当worker初始化时,它不可用。我的活动失败,并显示消息“正在完成失败的活动”等。
基本上实现拦截器类的方式是错误的。
我想要实现的目标是什么?
1.在开始执行工作流之前,打印该工作流的名称。
1.我将在一个变量中保存一些信息(执行时传递的参数)以备将来使用。
1条答案
按热度按时间j0pj023g1#
首先,你可能想看看以下关于如何在Temporal Python SDK中实现Activity和Workflow拦截器的示例:
根据您提供的代码片段,我认为您可能错过了一个非常重要的细节:您提供给
Worker
的Interceptor
实现本身实际上不会处理Workflow/Activity调用的拦截。相反,您提供的
Interceptor
对象将用于构建具体拦截器链。Interceptor
类的职责如下:intercept_activity()
:示例化、链接并返回一个扩展ActivityInboundInterceptor
的对象;如果不想拦截活动,只需返回next
;workflow_interceptor_class()
:返回一个扩展WorkflowInboundInterceptor
的类;返回None
如果你不想拦截工作流。因此,拦截器的完整实现通常包含两个或三个类,例如可以命名为:
MyInterceptor
,扩展Interceptor
MyActivityInboundInterceptor
,它扩展了ActivityInboundInterceptor
MyWorkflowInboundInterceptor
,扩展WorkflowInboundInterceptor
MyInterceptor
的简单实现如下所示:话虽如此,我认为其余部分应该更容易到位,但只要添加一个评论,如果你需要更多的细节的东西。
顺便说一句,你可能认为所有这些体操都是不直观的,复杂的和无用的,但让我们不要停在这里,只是接受这是有强烈理由的。