.Net Core工作流WorkFlowCore

x33g5p2x  于9个月前 转载在 .NET  
字(10.7k)|赞(0)|评价(0)|浏览(403)

前言

WorkFlowCore 是一个针对 .NetCore 的轻量级的工作流引擎,提供了FluentAPI、多任务、持久化以及并行处理的功能,适合于小型工作流、责任链的需求开发。支持工作流长期运行,提供了各种持久化方式。

本篇开发环境为 .Net7 ,此处不演示 Jsonyaml 配置,详细文档请查看 官方文档项目源码地址

 一、安装与基础使用

通过以下命令安装

  1. Install-Package WorkflowCore

然后注入 WorkFlowCore

  1. builder.Services.AddWorkflow();

WorkFlowCore 主要分为两部分:步骤工作流

** 步骤**

** **多个步骤组成一个工作流,每个步骤都可以有输入并产生输出,这些输出可以传递回其所在的工作流。通过创建继承抽象类StepBody或StepBodyAsync的类,并且实现Run或RunAsync方法来定义步骤,很明显它们的区别是是否异步

  1. public classFirstStepBody: StepBody
  2. {
  3. public overrideExecutionResult Run(IStepExecutionContext context)
  4. {
  5. Console.WriteLine("Hello world!First");
  6. returnExecutionResult.Next();
  7. }
  8. }

工作流

通过继承 IWorkflow 接口定义一个工作流,接口只有 IdVersionBuild 方法(内部可以执行多个步骤),工作流主机使用这些信息来标识工作流

  1. public classMyWorkflow :IWorkflow
  2. {
  3. public string Id => "HelloWorld";
  4. public int Version => 1;
  5. public void Build(IWorkflowBuilder<object>builder)
  6. {
  7. builder
  8. .StartWith<FirstStepBody>()
  9. .Then<FirstStepBody>();
  10. }
  11. }

工作流如果想使用必须在工作流主机中通过 RegisterWorkflow() 方法注册,并且通过 Start() 方法启动主机,当然也可以通过 Stop() 方法停止工作流。执行工作流需要使用 StartWorkflow() 方法,参数为工作流类的 Id ,如下

  1. [ApiController]
  2. [Route("[controller]")]
  3. public classWeatherForecastController : ControllerBase
  4. {
  5. private readonlyIWorkflowHost _workflowHost;
  6. publicWeatherForecastController(IWorkflowHost workflowHost)
  7. {
  8. _workflowHost =workflowHost;
  9. }
  10. [HttpGet(Name = "get")]
  11. publicContentResult Get()
  12. {
  13. if (!_workflowHost.Registry.IsRegistered("HelloWorld",1))
  14. {
  15. _workflowHost.RegisterWorkflow<MyWorkflow>();
  16. }
  17. _workflowHost.Start();
  18. _workflowHost.StartWorkflow("HelloWorld");
  19. //host.Stop();
  20. return Content("ok");
  21. }
  22. }

当然也可以在构建 web 服务的时候统一注册,然后就可以直接执行啦

  1. var host = app.Services.GetService<IWorkflowHost>();
  2. host.RegisterWorkflow<MyWorkflow>();
  3. host.Start();

二、在步骤之间传递参数

每个步骤都是一个黑盒,因此它们支持输入和输出。这些输入和输出可以映射到一个数据类,该数据类定义与每个工作流实例相关的自定义数据。

以下示例显示了如何定义步骤的输入和输出,然后显示了如何使用内部数据的类型化类定义工作流,以及如何将输入和输出映射到自定义数据类的属性。

  1. //步骤包含属性,并且计算
  2. public classFirstStepBody: StepBody
  3. {
  4. public int Input1 { get; set; }
  5. public int Input2 { get; set; }
  6. public int Output { get; set; }
  7. public overrideExecutionResult Run(IStepExecutionContext context)
  8. {
  9. Output = Input1 +Input2;
  10. Console.WriteLine(Output);
  11. returnExecutionResult.Next();
  12. }
  13. }
  14. //工作流包含输入输出的赋值
  15. public class MyWorkflow :IWorkflow<MyDataClass>{
  16. public string Id => "HelloWorld";
  17. public int Version => 1;
  18. public void Build(IWorkflowBuilder<MyDataClass>builder)
  19. {
  20. builder
  21. .StartWith<FirstStepBody>()
  22. .Input(step => step.Input1,data =>data.Value1)
  23. .Input(step => step.Input2, data => 100)
  24. .Output(data => data.Answer, step =>step.Output)
  25. .Then<FirstStepBody>()
  26. .Input(step => step.Input1, data =>data.Value1)
  27. .Input(step => step.Input2, data =>data.Answer)
  28. .Output(data => data.Answer, step =>step.Output);
  29. }
  30. }
  31. //工作流的属性类
  32. public classMyDataClass
  33. {
  34. public int Value1 { get; set; }
  35. public int Value2 { get; set; }
  36. public int Answer { get; set; }
  37. }
  38. //执行工作流传入参数
  39. MyDataClass myDataClass = newMyDataClass();
  40. myDataClass.Value1 = 100;
  41. myDataClass.Value2 = 200;
  42. //不传入myDataClass则每次执行都是新的数据对象
  43. _workflowHost.StartWorkflow("HelloWorld", myDataClass);

从上述例子可以看到工作流可以定义一个初始的类作为参数传入,每个步骤可以有自己的属性字段去接收参数(可以是工作流类的字段,也可以是固定值),可以用 Input 方法传入, Output 方法输出赋值。如果在工作流执行时不传入参数每次执行都是新的对象的默认值,比如在 StartWorkflow 方法中不传 myDataClass ,运行结果是 100100 ,否则是 200300

三、外部事件

工作流可以使用 WaitFor 方法进行等待,通过外部触发此事件,将事件产生的数据传递给工作流,并且让工作流继续执行下面的步骤。示例如下:

  1. public class MyWorkflow :IWorkflow<MyDataClass>{
  2. //省略。。。。
  3. public void Build(IWorkflowBuilder<MyDataClass>builder)
  4. {
  5. builder
  6. .StartWith<FirstStepBody>()
  7. .Input(step => step.Input1,data =>data.Value1)
  8. .Input(step => step.Input2, data => 100)
  9. .Output(data => data.Answer, step =>step.Output)
  10. .WaitFor("MyEvent",key => "EventKey")
  11. .Output(data => data.Answer,step =>step.EventData)
  12. .Then<FirstStepBody>()
  13. .Input(step => step.Input1, data =>data.Value1)
  14. .Input(step => step.Input2, data =>data.Answer)
  15. .Output(data => data.Answer, step =>step.Output);
  16. }
  17. }
  18. //。。。
  19. [HttpGet(Name = "get")]
  20. publicContentResult Get()
  21. {
  22. MyDataClass myDataClass = newMyDataClass();
  23. myDataClass.Value1 = 100;
  24. myDataClass.Value2 = 200;
  25. _workflowHost.StartWorkflow("HelloWorld", myDataClass);
  26. return Content("ok");
  27. }
  28.   [HttpPost(Name = "event")]
  29. publicContentResult PublishEvent()
  30.   {
  31.     _workflowHost.PublishEvent("MyEvent", "EventKey", 200);
  32. return Content("ok");
  33.   }

使用 WaitFor 方法可以使工作流等待监听指定事件的执行,有两个入参事件名称事件关键字。通过工作流主机去触发 PublishEvent 执行指定的事件,有三个入参触发事件名称触发事件关键字和事件参数

需要执行事件,工作流才会继续下一步,如下动图演示:

可以为等待事件设置有效时间,在有效时间之前执行事件是不会继续下一步流程的,只有当大于有效时间之后执行事件才会继续下一步步骤。如下代码设置,为工作流执行时间一天后执行事件才会继续执行,否则就等待不动。

  1. WaitFor("MyEvent",key => "EventKey", data => DateTime.Now.AddDays(1))

四、活动

活动被定义为在工作流中可以被等待的外部工作队列中的步骤。

在本例中,工作流将等待活动 activity-1 ,直到活动完成才继续工作流。它还将 data.Value1 的值传递给活动,然后将活动的结果映射到 data.Value2

然后我们创建一个 worker 来处理活动项的队列。它使用 GetPendingActivity 方法来获取工作流正在等待的活动和数据。

  1. //.....
  2. builder
  3. .StartWith<FirstStepBody>()
  4. .Input(step => step.Input1,data =>data.Value1)
  5. .Input(step => step.Input2, data => 100)
  6. .Output(data => data.Answer, step =>step.Output)
  7. .Activity("activity-1", (data) =>data.Value1)
  8. .Output(data => data.Value2, step =>step.Result)
  9. .Then<FirstStepBody>()
  10. .Input(step => step.Input1, data =>data.Value1)
  11. .Input(step => step.Input2, data =>data.Answer)
  12. .Output(data => data.Answer, step =>step.Output);
  13. //....
  14. [HttpPost(Name = "active")]
  15. publicContentResult PublishEvent()
  16.    {
  17. var activity = _workflowHost.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
  18. if (activity != null)
  19.     {
  20.       Console.WriteLine(activity.Parameters);
  21.       _workflowHost.SubmitActivitySuccess(activity.Token, 100);
  22.     }
  23. return Content("ok");
  24.    }

活动可以看作一个等待的步骤可以传入参数和输出参数,和事件的区别是事件不能输入参数而是单纯的等待。

五、错误处理

每个步骤都可以配置自己的错误处理行为,可以在以后重试、挂起工作流或终止工作流。

  1. public void Build(IWorkflowBuilder<object>builder)
  2. {
  3. builder
  4. .StartWith<HelloWorld>()
  5. .OnError(WorkflowErrorHandling.Retry,TimeSpan.FromMinutes(10))
  6. .Then<GoodbyeWorld>();
  7. }

六、流程控制

工作流的流程控制包括分支、循环等各种操作

决策分支

在工作流中定义多个独立分支,并根据表达式值选择满足条件的分支执行。

使用 IWorkflowBuilderCreateBranch 方法定义分支。然后我们可以使用 branch 方法选择一个分支。

选择表达式将与通过 branch 方法列出的分支相匹配,匹配的分支将安排执行。匹配多个分支将导致并行分支运行。

如果 data.Value1 的值为 1 ,则此工作流将选择 branch1 ,如果为 2 ,则选择 branch2

  1. var branch1 =builder.CreateBranch()
  2. .StartWith<PrintMessage>()
  3. .Input(step => step.Message, data => "hi from 1")
  4. .Then<PrintMessage>()
  5. .Input(step => step.Message, data => "bye from 1");
  6.   var branch2 =builder.CreateBranch()
  7. .StartWith<PrintMessage>()
  8. .Input(step => step.Message, data => "hi from 2")
  9. .Then<PrintMessage>()
  10. .Input(step => step.Message, data => "bye from 2");
  11.   builder
  12. .StartWith<HelloWorld>()
  13. .Decide(data =>data.Value1)
  14. .Branch((data, outcome) => data.Value1 == "one", branch1)
  15. .Branch((data, outcome) => data.Value1 == "two", branch2);

并行ForEach

使用 ForEach 方法启动并行 for 循环

  1. public classForEachWorkflow : IWorkflow
  2.   {
  3.   public string Id => "Foreach";
  4.   public int Version => 1;
  5.   public void Build(IWorkflowBuilder<object>builder)
  6.   {
  7.   builder
  8.   .StartWith<SayHello>()
  9.   .ForEach(data => new List<int>() { 1, 2, 3, 4})
  10.   .Do(x =>x
  11.   .StartWith<DisplayContext>()
  12.   .Input(step => step.Message, (data, context) =>context.Item)
  13.   .Then<DoSomething>())
  14.   .Then<SayGoodbye>();
  15.   }
  16.   }

While循环

使用 While 方法启动 while 循环

  1. public class WhileWorkflow : IWorkflow<MyData>{
  2.   public string Id => "While";
  3.   public int Version => 1;
  4.   public void Build(IWorkflowBuilder<MyData>builder)
  5.   {
  6.   builder
  7.   .StartWith<SayHello>()
  8.   .While(data => data.Counter < 3)
  9.   .Do(x =>x
  10.   .StartWith<DoSomething>()
  11.   .Then<IncrementStep>()
  12.   .Input(step => step.Value1, data =>data.Counter)
  13.   .Output(data => data.Counter, step =>step.Value2))
  14.   .Then<SayGoodbye>();
  15.   }
  16.   }

If判断

使用 If 方法执行 if 判断

  1. public class IfWorkflow : IWorkflow<MyData>{
  2.   public void Build(IWorkflowBuilder<MyData>builder)
  3.   {
  4.   builder
  5.   .StartWith<SayHello>()
  6.   .If(data => data.Counter < 3).Do(then =>then
  7.    .StartWith<PrintMessage>()
  8.   .Input(step => step.Message, data => "Value is less than 3")
  9.   )
  10.   .If(data => data.Counter < 5).Do(then =>then
  11.   .StartWith<PrintMessage>()
  12.   .Input(step => step.Message, data => "Value is less than 5")
  13.   )
  14.   .Then<SayGoodbye>();
  15.   }
  16.   }

并行

使用 Parallel 方法并行执行任务

  1. public class ParallelWorkflow : IWorkflow<MyData>{
  2.   public string Id => "parallel-sample";
  3.   public int Version => 1;
  4.   public void Build(IWorkflowBuilder<MyData>builder)
  5.   {
  6.   builder
  7.   .StartWith<SayHello>()
  8.   .Parallel()
  9.   .Do(then =>then.StartWith<Task1dot1>()
  10.   .Then<Task1dot2>()
  11.   .Do(then =>then.StartWith<Task2dot1>()
  12.   .Then<Task2dot2>().Join()
  13.   .Then<SayGoodbye>();
  14. }
  15. }

Schedule

使用 Schedule 方法在工作流中注册在指定时间后执行的异步方法

  1. builder
  2. .StartWith(context => Console.WriteLine("Hello"))
  3. .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule =>schedule
  4. .StartWith(context => Console.WriteLine("Doing scheduled tasks"))
  5. )
  6. .Then(context => Console.WriteLine("Doing normal tasks"));

Recur

使用 Recure 方法在工作流中设置一组重复的后台步骤,直到满足特定条件为止

  1. builder
  2. .StartWith(context => Console.WriteLine("Hello"))
  3. .Recur(data => TimeSpan.FromSeconds(5), data => data.Counter > 5).Do(recur =>recur
  4. .StartWith(context => Console.WriteLine("Doing recurring task"))
  5. )
  6. .Then(context => Console.WriteLine("Carry on"));

七、Saga transaction 

saga 允许在 saga transaction 中封装一系列步骤,并为每一个步骤提供补偿步骤,使用 CompensateWith 方法在对应的步骤后面添加补偿步骤,补偿步骤将会在步骤抛出异常的时候触发。

如下示例,步骤 Task2 如果抛出一个异常,那么补偿步骤 UndoTask2UndoTask1 将被触发。

  1. builder
  2. .StartWith(context => Console.WriteLine("Begin"))
  3. .Saga(saga =>saga
  4. .StartWith<Task1>()
  5. .CompensateWith<UndoTask1>()
  6. .Then<Task2>()
  7. .CompensateWith<UndoTask2>()
  8. .Then<Task3>()
  9. .CompensateWith<UndoTask3>()
  10. )
  11. .CompensateWith<CleanUp>()
  12. .Then(context => Console.WriteLine("End"));

也可以指定重试策略,在指定时间间隔后重试。

  1. builder
  2. .StartWith(context => Console.WriteLine("Begin"))
  3. .Saga(saga =>saga
  4. .StartWith<Task1>()
  5. .CompensateWith<UndoTask1>()
  6. .Then<Task2>()
  7. .CompensateWith<UndoTask2>()
  8. .Then<Task3>()
  9. .CompensateWith<UndoTask3>()
  10. )
  11. .OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5))
  12. .Then(context => Console.WriteLine("End"));

八、持久化

可以使用 RedisMongdbSqlserver 等持久化,具体可以看文档,此处使用 Redis ,先安装 nuget

  1. Install-Package WorkflowCore.Providers.Redis

然后注入就可以了

  1. builder.Services.AddWorkflow(cfg =>{
  2. cfg.UseRedisPersistence("localhost:6379", "app-name");
  3. cfg.UseRedisLocking("localhost:6379");
  4. cfg.UseRedisQueues("localhost:6379", "app-name");
  5. cfg.UseRedisEventHub("localhost:6379", "channel-name");
  6. //cfg.UseMongoDB(@"mongodb://mongo:27017", "workflow");
  7. //cfg.UseElasticsearch(new ConnectionSettings(new Uri("http://elastic:9200")), "workflows");
  8. });

运行打开可以看到

相关文章

最新文章

更多