控制结构
分支
可以在工作流中定义多个分支,更根据表达式的值选择执行哪个分支。
使用CreateBranch()
方法定义分支
//创建两个分支 var branch1 = builder.CreateBranch() .StartWith<PrintMessage>() .Input(step => step.Message, data => "hi from 1") .Then<PrintMessage>() .Input(step => step.Message, data => "bye from 1"); var branch2 = builder.CreateBranch() .StartWith<PrintMessage>() .Input(step => step.Message, data => "hi from 2") .Then<PrintMessage>() .Input(step => step.Message, data => "bye from 2"); //如果data.value1的值为one,则执行branch1 //如果data.value1的值为two,则执行branch2 builder .StartWith<HelloWorld>() .Decide(data => data.Value1) .Branch((data, outcome) => data.Value1 == "one", branch1) .Branch((data, outcome) => data.Value1 == "two", branch2);
foreach
public class ForEachWorkflow : IWorkflow { public string Id => "Foreach"; public int Version => 1; public void Build(IWorkflowBuilder<object> builder) { builder .StartWith<SayHello>() .ForEach(data => new List<int>() { 1, 2, 3, 4 }) .Do(x => x .StartWith<DisplayContext>() .Input(step => step.Message, (data, context) => context.Item) .Then<DoSomething>()) .Then<SayGoodbye>(); } }
while
public class WhileWorkflow : IWorkflow<MyData> { public string Id => "While"; public int Version => 1; public void Build(IWorkflowBuilder<MyData> builder) { builder .StartWith<SayHello>() .While(data => data.Counter < 3) .Do(x => x .StartWith<DoSomething>() .Then<IncrementStep>() .Input(step => step.Value1, data => data.Counter) .Output(data => data.Counter, step => step.Value2)) .Then<SayGoodbye>(); } }
if
public class IfWorkflow : IWorkflow<MyData> { public void Build(IWorkflowBuilder<MyData> builder) { builder .StartWith<SayHello>() .If(data => data.Counter < 3).Do(then => then .StartWith<PrintMessage>() .Input(step => step.Message, data => "Value is less than 3") ) .If(data => data.Counter < 5).Do(then => then .StartWith<PrintMessage>() .Input(step => step.Message, data => "Value is less than 5") ) .Then<SayGoodbye>(); } }
并行
public class ParallelWorkflow : IWorkflow<MyData> { public string Id => "parallel-sample"; public int Version => 1; public void Build(IWorkflowBuilder<MyData> builder) { builder .StartWith<SayHello>() .Parallel() .Do(then => then.StartWith<Task1dot1>() .Then<Task1dot2>() .Do(then => then.StartWith<Task2dot1>() .Then<Task2dot2>() .Do(then => then.StartWith<Task3dot1>() .Then<Task3dot2>() .Join() .Then<SayGoodbye>(); } }
按计划执行
在未来按照计划执行,使得工作流在后台异步执行
builder .StartWith(context => Console.WriteLine("Hello")) .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule .StartWith(context => Console.WriteLine("Doing scheduled tasks")) ) .Then(context => Console.WriteLine("Doing normal tasks"));
延迟执行
延迟工作流的当前分支一段时间
builder .StartWith(context => Console.WriteLine("Hello")) .Delay(p => TimeSpan.FromSeconds(5) );
重复
设置一组重复的后台步骤,知道满足特定条件
builder .StartWith(context => Console.WriteLine("Hello")) .Recur(data => TimeSpan.FromSeconds(5), data => data.Counter > 5).Do(recur => recur .StartWith(context => Console.WriteLine("Doing recurring task")) ) .Then(context => Console.WriteLine("Carry on"));
案例 Parallel ForEach
steps
public class SayHello : StepBody { public override ExecutionResult Run(IStepExecutionContext context) { Console.WriteLine("Hello"); return ExecutionResult.Next(); } } public class DisplayContext : StepBody { public object Item { get; set; } public override ExecutionResult Run(IStepExecutionContext context) { Console.WriteLine($"Working on item {Item}"); return ExecutionResult.Next(); } } public class DoSomething : StepBody { public override ExecutionResult Run(IStepExecutionContext context) { Console.WriteLine("Doing something..."); return ExecutionResult.Next(); } } public class SayGoodbye : StepBody { public override ExecutionResult Run(IStepExecutionContext context) { Console.WriteLine("Goodbye"); return ExecutionResult.Next(); } }
workflow
public class ForEachWorkflow : IWorkflow { public string Id => "Foreach"; public int Version => 1; public void Build(IWorkflowBuilder<object> builder) { builder .StartWith<SayHello>() .ForEach(data => new List<int> { 1, 2, 3, 4 }) .Do(x => x .StartWith<DisplayContext>() .Input(step => step.Item, (data, context) => context.Item) .Then<DoSomething>()) .Then<SayGoodbye>(); } }
program
IServiceCollection services = new ServiceCollection(); services.AddLogging(); services.AddWorkflow(); var serviceProvider = services.BuildServiceProvider(); var host = serviceProvider.GetService<IWorkflowHost>(); host.RegisterWorkflow<ForEachWorkflow>(); host.Start(); Console.WriteLine("Starting workflow..."); string workflowId = host.StartWorkflow("Foreach").Result; Console.ReadLine(); host.Stop();
案例If
steps
public class PrintMessage : StepBody { public string Message { get; set; } public override ExecutionResult Run(IStepExecutionContext context) { Console.WriteLine(Message); return ExecutionResult.Next(); } } public class SayGoodbye : StepBody { public override ExecutionResult Run(IStepExecutionContext context) { Console.WriteLine("Goodbye"); return ExecutionResult.Next(); } } public class SayHello : StepBody { public override ExecutionResult Run(IStepExecutionContext context) { Console.WriteLine("Hello"); return ExecutionResult.Next(); } }
workflow
public class MyData { public int Counter { get; set; } } public class IfWorkflow : IWorkflow<MyData> { public string Id => "if-sample"; public int Version => 1; public void Build(IWorkflowBuilder<MyData> builder) { builder .StartWith<SayHello>() .If(data => data.Counter < 3).Do(then => then .StartWith<PrintMessage>() .Input(step => step.Message, data => "Value is less than 3") ) .If(data => data.Counter < 5).Do(then => then .StartWith<PrintMessage>() .Input(step => step.Message, data => "Value is less than 5") ) .Then<SayGoodbye>(); } }
program
IServiceCollection services = new ServiceCollection(); services.AddLogging(); services.AddWorkflow(); var serviceProvider = services.BuildServiceProvider(); var host = serviceProvider.GetService<IWorkflowHost>(); host.RegisterWorkflow<IfWorkflow, MyData>(); host.Start(); Console.WriteLine("Starting workflow..."); string workflowId = host.StartWorkflow("if-sample", new MyData { Counter = 4 }).Result; Console.ReadLine(); host.Stop();
案例 Parallel Tasks
steps
public class PrintMessage : StepBody { public string Message { get; set; } public override ExecutionResult Run(IStepExecutionContext context) { Console.WriteLine("当前线程:"+ Thread.GetCurrentProcessorId().ToString()+ "message:"+Message); return ExecutionResult.Next(); } } public class SayHello : StepBody { public override ExecutionResult Run(IStepExecutionContext context) { Console.WriteLine("Hello"); return ExecutionResult.Next(); } } public class SayGoodbye : StepBody { public override ExecutionResult Run(IStepExecutionContext context) { Console.WriteLine("Goodbye"); return ExecutionResult.Next(); } }
workflow
public class MyData { public int Counter { get; set; } } public class ParallelWorkflow : IWorkflow<MyData> { public string Id => "parallel-sample"; public int Version => 1; public void Build(IWorkflowBuilder<MyData> builder) { builder .StartWith<SayHello>() .Parallel() .Do(then => then.StartWith<PrintMessage>() .Input(step => step.Message, data => "Item 1.1") .Then<PrintMessage>() .Input(step => step.Message, data => "Item 1.2")) .Do(then => then.StartWith<PrintMessage>() .Input(step => step.Message, data => "Item 2.1") .Then<PrintMessage>() .Input(step => step.Message, data => "Item 2.2") .Then<PrintMessage>() .Input(step => step.Message, data => "Item 2.3")) .Do(then => then.StartWith<PrintMessage>() .Input(step => step.Message, data => "Item 3.1") .Then<PrintMessage>() .Input(step => step.Message, data => "Item 3.2")) .Join() .Then<SayGoodbye>(); } }
program
IServiceCollection services = new ServiceCollection(); services.AddLogging(); services.AddWorkflow(); var serviceProvider = services.BuildServiceProvider(); var host = serviceProvider.GetService<IWorkflowHost>(); var controller = serviceProvider.GetService<IWorkflowController>(); controller.RegisterWorkflow<ParallelWorkflow, MyData>();//使用容器进行注册 Console.WriteLine("Starting workflow..."); controller.StartWorkflow<MyData>("parallel-sample");//使用容器开始 Console.ReadLine(); host.Stop();
案例Schedule
workflow
class ScheduleWorkflow : IWorkflow { public string Id => "schedule-sample"; public int Version => 1; public void Build(IWorkflowBuilder<object> builder) { builder .StartWith(context => Console.WriteLine("Hello")) .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule .StartWith(context => Console.WriteLine("Doing scheduled tasks")) ) .Then(context => { Console.WriteLine("Doing normal tasks"); Console.WriteLine("等待5s"); }); } }
案例Recur
public class MyData { public int Counter { get; set; } } class RecurSampleWorkflow : IWorkflow<MyData> { public string Id => "recur-sample"; public int Version => 1; public void Build(IWorkflowBuilder<MyData> builder) { builder .StartWith(context => Console.WriteLine("Hello")) //5s一次,直到data.Counter > 5 .Recur(data => TimeSpan.FromSeconds(5), data => data.Counter > 5).Do(recur => recur .StartWith(context => Console.WriteLine("Doing recurring task")) ) .Then(context => Console.WriteLine("Carry on")); } }
案例Branch
steps
public class PrintMessage : StepBody { public string Message { get; set; } public override ExecutionResult Run(IStepExecutionContext context) { Console.WriteLine(Message); return ExecutionResult.Next(); } } public class SayHello : StepBody { public override ExecutionResult Run(IStepExecutionContext context) { Console.WriteLine("Hello"); return ExecutionResult.Next(); } }
workflow
public class MyData { public int Value { get; set; } } public class OutcomeWorkflow : IWorkflow<MyData> { public string Id => "outcome-sample"; public int Version => 1; public void Build(IWorkflowBuilder<MyData> builder) { var branch1 = builder.CreateBranch() .StartWith<PrintMessage>() .Input(step => step.Message, data => "hi from 1") .Then<PrintMessage>() .Input(step => step.Message, data => "bye from 1"); var branch2 = builder.CreateBranch() .StartWith<PrintMessage>() .Input(step => step.Message, data => "hi from 2") .Then<PrintMessage>() .Input(step => step.Message, data => "bye from 2"); //如果data.value1的值为one,则执行branch1 //如果data.value1的值为two,则执行branch2 builder .StartWith<SayHello>() .Decide(data => data.Value) .Branch(1, branch1) .Branch(2, branch2); } }
program
IServiceCollection services = new ServiceCollection(); services.AddLogging(); services.AddWorkflow(); var serviceProvider = services.BuildServiceProvider(); var host = serviceProvider.GetService<IWorkflowHost>(); host.RegisterWorkflow<OutcomeWorkflow, MyData>(); host.Start(); Console.WriteLine("Starting workflow..."); host.StartWorkflow("outcome-sample", new MyData { Value = 2 }); Console.ReadLine(); host.Stop();