事件
工作流还可以设定等待外部事件
// Define the workflow (abridged snippet: only the Build method is shown).
public class EventSampleWorkflow : IWorkflow<MyDataClass>
{
    /// <summary>
    /// Builds a workflow that pauses until the external event "MyEvent" with key "0"
    /// is published, stores the event payload in the workflow data, and then hands a
    /// message containing that payload to a <see cref="CustomMessage"/> step.
    /// </summary>
    /// <param name="builder">Builder used to declare the workflow's steps.</param>
    public void Build(IWorkflowBuilder<MyDataClass> builder)
    {
        builder
            .StartWith(context => ExecutionResult.Next())
            // Wait for the "MyEvent" event whose key is "0".
            .WaitFor("MyEvent", data => "0")
                // Copy the event payload into the workflow data.
                .Output(data => data.Value1, step => step.EventData)
            .Then<CustomMessage>()
                .Input(step => step.Message, data => "The data from the event is " + data.Value1);
    }
}

// ...

// Publish the external event from the host:
// event name "MyEvent", event key "0", event data "hello".
host.PublishEvent("MyEvent", "0", "hello");
生效日期
还可以在等待事件时指定生效日期,以便响应过去已经发生的事件,或者只响应生效日期之后发生的事件。
案例
dataclass
/// <summary>
/// Data object carried by the event sample workflow.
/// </summary>
public class MyDataClass
{
    /// <summary>
    /// String value that the workflow fills from the received event data
    /// and later reads when composing the output message.
    /// </summary>
    public string Value1 { get; set; }
}
step
/// <summary>
/// Workflow step that writes its <see cref="Message"/> to the console.
/// </summary>
public class CustomMessage : StepBody
{
    /// <summary>
    /// Text to print; set through the step's input mapping in the workflow definition.
    /// </summary>
    public string Message { get; set; }

    /// <summary>
    /// Prints <see cref="Message"/> and reports that the workflow may proceed to the next step.
    /// </summary>
    /// <param name="context">Execution context of the running step (not used here).</param>
    /// <returns>The result of <c>ExecutionResult.Next()</c>.</returns>
    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine(Message);

        return ExecutionResult.Next();
    }
}
workflow
/// <summary>
/// Sample workflow that waits for the external event "MyEvent" keyed by its own
/// workflow instance id, then prints the data delivered with the event.
/// </summary>
public class EventSampleWorkflow : IWorkflow<MyDataClass>
{
    /// <summary>Identifier under which this workflow definition is registered and started.</summary>
    public string Id => "EventSampleWorkflow";

    /// <summary>Version of this workflow definition.</summary>
    public int Version => 1;

    /// <summary>
    /// Declares the steps: a no-op start step, a wait for "MyEvent", a
    /// <see cref="CustomMessage"/> step that prints the event data, and a final
    /// inline step that prints "workflow complete".
    /// </summary>
    /// <param name="builder">Builder used to declare the workflow's steps.</param>
    public void Build(IWorkflowBuilder<MyDataClass> builder)
    {
        // The event key is the workflow instance id, so the publisher must pass that id as the key.
        var waitForEvent = builder
            .StartWith(context => ExecutionResult.Next())
            .WaitFor("MyEvent", (data, context) => context.Workflow.Id);

        // Store the event payload in the workflow data.
        var printMessage = waitForEvent
            .Output(data => data.Value1, step => step.EventData)
            .Then<CustomMessage>();

        printMessage
            .Input(step => step.Message, data => "The data from the event is " + data.Value1)
            .Then(context => Console.WriteLine("workflow complete"));
    }
}
program
// Register logging and the workflow engine, then build the service provider.
IServiceCollection container = new ServiceCollection();
container.AddLogging();
container.AddWorkflow();
var provider = container.BuildServiceProvider();

// Resolve the workflow host, register the sample workflow and start the host.
var host = provider.GetService<IWorkflowHost>();
host.RegisterWorkflow<EventSampleWorkflow, MyDataClass>();
host.Start();

// Start version 1 of the workflow with an empty data object and block until its id is available.
var workflowId = host.StartWorkflow("EventSampleWorkflow", 1, new MyDataClass()).Result;

// Read a line from the console and publish it as "MyEvent", using the workflow id as the event key.
Console.WriteLine("Enter value to publish");
string payload = Console.ReadLine();
host.PublishEvent("MyEvent", workflowId, payload);

// Keep the process alive until Enter is pressed, then stop the host.
Console.ReadLine();
host.Stop();
活动
活动是指外部工作队列中的一项工作,工作流可以等待该工作完成后再继续执行。
// This workflow waits for the external activity "activity-1" to complete before continuing.
// It passes data.Value1 to the activity as input and copies the activity's result into data.Value2.
public class ActivityWorkflow : IWorkflow<MyData>
{
    /// <summary>
    /// Declares the steps: HelloWorld, a wait on the "activity-1" activity, and a
    /// PrintMessage step that receives the activity's result.
    /// </summary>
    /// <param name="builder">Builder used to declare the workflow's steps.</param>
    public void Build(IWorkflowBuilder<MyData> builder)
    {
        builder
            .StartWith<HelloWorld>()
            .Activity("activity-1", (data) => data.Value1)
                .Output(data => data.Value2, step => step.Result)
            .Then<PrintMessage>()
                .Input(step => step.Message, data => data.Value2);
    }
}

// ...

// Fetch a pending "activity-1" work item together with the data the workflow passed to it.
// Arguments: activity name, worker id (identifies the caller, not a workflow), timeout.
var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;

if (activity != null)
{
    Console.WriteLine(activity.Parameters);

    // Report success using the activity's token and supply the response data.
    host.SubmitActivitySuccess(activity.Token, "Some response data");
}
案例
dataclass
/// <summary>
/// Data object carried by the activity sample workflow.
/// </summary>
class MyData
{
    /// <summary>Request text that the workflow passes to the "get-approval" activity.</summary>
    public string Request { get; set; }

    /// <summary>Receives the result returned by the "get-approval" activity.</summary>
    public string ApprovedBy { get; set; }
}
steps
/// <summary>
/// Workflow step that prints "Hello world" to the console.
/// </summary>
public class HelloWorld : StepBody
{
    /// <summary>Prints the greeting and reports that the workflow may proceed.</summary>
    /// <param name="context">Execution context of the running step (not used here).</param>
    /// <returns>The result of <c>ExecutionResult.Next()</c>.</returns>
    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine("Hello world");

        return ExecutionResult.Next();
    }
}

/// <summary>
/// Workflow step that prints "Goodbye world" to the console.
/// </summary>
public class GoodbyeWorld : StepBody
{
    /// <summary>Prints the farewell and reports that the workflow may proceed.</summary>
    /// <param name="context">Execution context of the running step (not used here).</param>
    /// <returns>The result of <c>ExecutionResult.Next()</c>.</returns>
    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine("Goodbye world");

        return ExecutionResult.Next();
    }
}

/// <summary>
/// Workflow step that prints its <see cref="Message"/> to the console.
/// </summary>
public class CustomMessage : StepBody
{
    /// <summary>Text to print; set through the step's input mapping.</summary>
    public string Message { get; set; }

    /// <summary>Prints <see cref="Message"/> and reports that the workflow may proceed.</summary>
    /// <param name="context">Execution context of the running step (not used here).</param>
    /// <returns>The result of <c>ExecutionResult.Next()</c>.</returns>
    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine(Message);

        return ExecutionResult.Next();
    }
}
workflow
/// <summary>
/// Sample workflow that greets, waits for the external "get-approval" activity,
/// prints who approved the request, and then says goodbye.
/// </summary>
class ActivityWorkflow : IWorkflow<MyData>
{
    /// <summary>Identifier under which this workflow definition is registered and started.</summary>
    public string Id => "activity-sample";

    /// <summary>Version of this workflow definition.</summary>
    public int Version => 1;

    /// <summary>
    /// Declares the steps: HelloWorld, the "get-approval" activity, a CustomMessage
    /// step reporting the approver, and GoodbyeWorld.
    /// </summary>
    /// <param name="builder">Builder used to declare the workflow's steps.</param>
    public void Build(IWorkflowBuilder<MyData> builder)
    {
        // The request text is handed to the activity as its input.
        var approvalActivity = builder
            .StartWith<HelloWorld>()
            .Activity("get-approval", (data) => data.Request);

        // The activity's result is stored as the approver.
        var approvalMessage = approvalActivity
            .Output(data => data.ApprovedBy, step => step.Result)
            .Then<CustomMessage>();

        approvalMessage
            .Input(step => step.Message, data => "Approved by " + data.ApprovedBy)
            .Then<GoodbyeWorld>();
    }
}
program
// Register logging (console + debug output) and the workflow engine, then build the provider.
IServiceCollection services = new ServiceCollection();
services.AddLogging(cfg =>
{
    cfg.AddConsole();
    cfg.AddDebug();
});
services.AddWorkflow();
var serviceProvider = services.BuildServiceProvider();

// Resolve the workflow host, register the sample workflow and start the host.
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<ActivityWorkflow, MyData>();
host.Start();

// Start the workflow with the request to approve and block until it has been started.
Console.WriteLine("Starting workflow...");
host.StartWorkflow("activity-sample", new MyData { Request = "Spend $1,000,000" }).Wait();

// Fetch the pending "get-approval" activity, waiting up to one minute.
// The second argument is the id of the worker taking the activity, not a workflow id,
// so a worker name is passed here instead of the workflow instance id.
var approval = host.GetPendingActivity("get-approval", "worker1", TimeSpan.FromMinutes(1)).Result;

if (approval != null)
{
    Console.WriteLine("Approval required for " + approval.Parameters);

    // Complete the activity; the supplied value becomes the activity's result in the workflow.
    host.SubmitActivitySuccess(approval.Token, "John Smith");
}

// Keep the process alive until Enter is pressed, then stop the host.
Console.ReadLine();
host.Stop();
错误处理
每个步骤都可以配置自己的错误处理流程,可以稍后重试,暂停工作流或者终止工作流
/// <summary>
/// Declares a two-step workflow (HelloWorld, then GoodbyeWorld) in which the
/// HelloWorld step is configured with the Retry error-handling behavior and a
/// 10-minute retry interval.
/// </summary>
/// <param name="builder">Builder used to declare the workflow's steps.</param>
public void Build(IWorkflowBuilder<object> builder)
{
    var retryInterval = TimeSpan.FromMinutes(10);

    builder
        .StartWith<HelloWorld>()
            // Error handling is configured per step; this applies to HelloWorld only.
            .OnError(WorkflowErrorHandling.Retry, retryInterval)
        .Then<GoodbyeWorld>();
}
WorkflowHost服务还可以定义全局的错误处理,用于在全局级别拦截工作流步骤的异常
host.OnStepError += 处理异常事件的方法;