From 1fffdf167bdf863f025bb4e25f948c6b52e6cb72 Mon Sep 17 00:00:00 2001 From: dingsongjie Date: Fri, 6 Mar 2020 17:37:35 +0800 Subject: [PATCH] 完善逻辑与测试 --- samples/apis/Backet.Api/Grains/BacketGrain.cs | 1 + samples/apis/SagasTest.Api/Activities/Transaction1CompensateErrorActivity.cs | 5 +++-- samples/apis/SagasTest.Api/Activities/Transaction1ExceptionActivity.cs | 5 +++-- samples/apis/SagasTest.Api/Activities/Transaction1OkActivity.cs | 7 ++++--- samples/apis/SagasTest.Api/Activities/Transaction1ReturnFalseActivity.cs | 5 +++-- samples/apis/SagasTest.Api/Activities/Transaction2CompensateErrorActivity.cs | 5 +++-- samples/apis/SagasTest.Api/Activities/Transaction2ExceptionActivity.cs | 5 +++-- samples/apis/SagasTest.Api/Activities/Transaction2OkActivity.cs | 5 +++-- samples/apis/SagasTest.Api/Activities/Transaction2ReturnFalseActivity.cs | 5 +++-- samples/apis/SagasTest.Api/Activities/Transaction3ExceptionActivity.cs | 5 +++-- samples/apis/SagasTest.Api/Activities/Transaction3HasResultActivity.cs | 5 +++-- samples/apis/SagasTest.Api/Activities/Transaction3ReturnFalseActivity.cs | 5 +++-- samples/apis/SagasTest.Api/Controllers/SagasTestController.cs | 28 ++-------------------------- src/Pole.Sagas/Core/Abstraction/IActivity.cs | 5 +++-- src/Pole.Sagas/Core/Abstraction/IEventSender.cs | 12 +++++++----- src/Pole.Sagas/Core/ActivityStatus.cs | 4 ++++ src/Pole.Sagas/Core/ActivityWapper.cs | 22 +++++++++++++--------- src/Pole.Sagas/Core/EventSender.cs | 14 ++++++++++++-- src/Pole.Sagas/Core/ISaga.cs | 2 +- src/Pole.Sagas/Core/Saga.cs | 67 +++++++++++++++++++++++++++++++++++++++++++++++-------------------- src/Pole.Sagas/Core/SagaFactory.cs | 4 ++++ src/Pole.Sagas/Core/SagaStatus.cs | 3 ++- 22 files changed, 130 insertions(+), 89 deletions(-) diff --git a/samples/apis/Backet.Api/Grains/BacketGrain.cs b/samples/apis/Backet.Api/Grains/BacketGrain.cs index 8a12dc5..3e72821 100644 --- a/samples/apis/Backet.Api/Grains/BacketGrain.cs +++ b/samples/apis/Backet.Api/Grains/BacketGrain.cs @@ -5,6 +5,7 @@ using Pole.Core.Grains; using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; namespace Backet.Api.Grains diff --git a/samples/apis/SagasTest.Api/Activities/Transaction1CompensateErrorActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction1CompensateErrorActivity.cs index ddad902..6d164d8 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction1CompensateErrorActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction1CompensateErrorActivity.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.Linq; using System.Net.Http; +using System.Threading; using System.Threading.Tasks; namespace SagasTest.Api.Activities @@ -15,12 +16,12 @@ namespace SagasTest.Api.Activities { this.httpClientFactory = httpClientFactory; } - public Task Compensate(Transaction1Dto data) + public Task Compensate(Transaction1Dto data,CancellationToken cancellationToken) { throw new NotImplementedException(); } - public async Task Execute(Transaction1Dto data) + public async Task Execute(Transaction1Dto data, CancellationToken cancellationToken) { var httpclient = httpClientFactory.CreateClient(); httpclient.BaseAddress = new Uri("http://localhost:5000"); diff --git a/samples/apis/SagasTest.Api/Activities/Transaction1ExceptionActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction1ExceptionActivity.cs index 1f64ab8..18797dc 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction1ExceptionActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction1ExceptionActivity.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.Linq; using System.Net.Http; +using System.Threading; using System.Threading.Tasks; namespace SagasTest.Api.Activities @@ -16,14 +17,14 @@ namespace SagasTest.Api.Activities this.httpClientFactory = httpClientFactory; } - public async Task Compensate(Transaction1Dto data) + public async Task Compensate(Transaction1Dto data, CancellationToken cancellationToken) { var httpclient = httpClientFactory.CreateClient(); httpclient.BaseAddress = new Uri("http://localhost:5000"); var result = await httpclient.GetAsync("api/OutGoingMock/Transaction1RollBack"); } - public Task Execute(Transaction1Dto data) + public Task Execute(Transaction1Dto data, CancellationToken cancellationToken) { throw new NotImplementedException(); } diff --git a/samples/apis/SagasTest.Api/Activities/Transaction1OkActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction1OkActivity.cs index df23b98..ec2930d 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction1OkActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction1OkActivity.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.Linq; using System.Net.Http; +using System.Threading; using System.Threading.Tasks; namespace SagasTest.Api.Activities @@ -15,18 +16,18 @@ namespace SagasTest.Api.Activities { this.httpClientFactory = httpClientFactory; } - public async Task Compensate(Transaction1Dto data) + public async Task Compensate(Transaction1Dto data, CancellationToken cancellationToken) { var httpclient = httpClientFactory.CreateClient(); httpclient.BaseAddress = new Uri("http://localhost:5000"); var result = await httpclient.GetAsync("api/OutGoingMock/Transaction1RollBack"); } - public async Task Execute(Transaction1Dto data) + public async Task Execute(Transaction1Dto data, CancellationToken cancellationToken) { var httpclient = httpClientFactory.CreateClient(); httpclient.BaseAddress = new Uri("http://localhost:5000"); - var result = await httpclient.GetAsync("api/OutGoingMock/Transaction1Ok"); + var result = await httpclient.GetAsync("api/OutGoingMock/Transaction1Ok", cancellationToken); return ActivityExecuteResult.Success; } } diff --git a/samples/apis/SagasTest.Api/Activities/Transaction1ReturnFalseActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction1ReturnFalseActivity.cs index 4758b41..6f446a4 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction1ReturnFalseActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction1ReturnFalseActivity.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.Linq; using System.Net.Http; +using System.Threading; using System.Threading.Tasks; namespace SagasTest.Api.Activities @@ -16,14 +17,14 @@ namespace SagasTest.Api.Activities this.httpClientFactory = httpClientFactory; } - public async Task Compensate(Transaction1Dto data) + public async Task Compensate(Transaction1Dto data, CancellationToken cancellationToken) { var httpclient = httpClientFactory.CreateClient(); httpclient.BaseAddress = new Uri("http://localhost:5000"); var result = await httpclient.GetAsync("api/OutGoingMock/Transaction1RollBack"); } - public async Task Execute(Transaction1Dto data) + public async Task Execute(Transaction1Dto data, CancellationToken cancellationToken) { var httpclient = httpClientFactory.CreateClient(); httpclient.BaseAddress = new Uri("http://localhost:5000"); diff --git a/samples/apis/SagasTest.Api/Activities/Transaction2CompensateErrorActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction2CompensateErrorActivity.cs index 85af77c..d75dddf 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction2CompensateErrorActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction2CompensateErrorActivity.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.Linq; using System.Net.Http; +using System.Threading; using System.Threading.Tasks; namespace SagasTest.Api.Activities @@ -15,12 +16,12 @@ namespace SagasTest.Api.Activities { this.httpClientFactory = httpClientFactory; } - public Task Compensate(Transaction2Dto data) + public Task Compensate(Transaction2Dto data, CancellationToken cancellationToken) { throw new NotImplementedException(); } - public async Task Execute(Transaction2Dto data) + public async Task Execute(Transaction2Dto data, CancellationToken cancellationToken) { var httpclient = httpClientFactory.CreateClient(); httpclient.BaseAddress = new Uri("http://localhost:5000"); diff --git a/samples/apis/SagasTest.Api/Activities/Transaction2ExceptionActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction2ExceptionActivity.cs index 4118a8b..7e3610a 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction2ExceptionActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction2ExceptionActivity.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.Linq; using System.Net.Http; +using System.Threading; using System.Threading.Tasks; namespace SagasTest.Api.Activities @@ -16,14 +17,14 @@ namespace SagasTest.Api.Activities this.httpClientFactory = httpClientFactory; } - public async Task Compensate(Transaction2Dto data) + public async Task Compensate(Transaction2Dto data, CancellationToken cancellationToken) { var httpclient = httpClientFactory.CreateClient(); httpclient.BaseAddress = new Uri("http://localhost:5000"); var result = await httpclient.GetAsync("api/OutGoingMock/Transaction2RollBack"); } - public Task Execute(Transaction2Dto data) + public Task Execute(Transaction2Dto data, CancellationToken cancellationToken) { throw new NotImplementedException(); } diff --git a/samples/apis/SagasTest.Api/Activities/Transaction2OkActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction2OkActivity.cs index 24c19e4..ef68f52 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction2OkActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction2OkActivity.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.Linq; using System.Net.Http; +using System.Threading; using System.Threading.Tasks; namespace SagasTest.Api.Activities @@ -15,14 +16,14 @@ namespace SagasTest.Api.Activities { this.httpClientFactory = httpClientFactory; } - public async Task Compensate(Transaction2Dto data) + public async Task Compensate(Transaction2Dto data, CancellationToken cancellationToken) { var httpclient = httpClientFactory.CreateClient(); httpclient.BaseAddress = new Uri("http://localhost:5000"); var result = await httpclient.GetAsync("api/OutGoingMock/Transaction2RollBack"); } - public async Task Execute(Transaction2Dto data) + public async Task Execute(Transaction2Dto data, CancellationToken cancellationToken) { var httpclient = httpClientFactory.CreateClient(); httpclient.BaseAddress = new Uri("http://localhost:5000"); diff --git a/samples/apis/SagasTest.Api/Activities/Transaction2ReturnFalseActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction2ReturnFalseActivity.cs index 0e86cf7..0da07ab 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction2ReturnFalseActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction2ReturnFalseActivity.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.Linq; using System.Net.Http; +using System.Threading; using System.Threading.Tasks; namespace SagasTest.Api.Activities @@ -16,14 +17,14 @@ namespace SagasTest.Api.Activities this.httpClientFactory = httpClientFactory; } - public async Task Compensate(Transaction2Dto data) + public async Task Compensate(Transaction2Dto data, CancellationToken cancellationToken) { var httpclient = httpClientFactory.CreateClient(); httpclient.BaseAddress = new Uri("http://localhost:5000"); var result = await httpclient.GetAsync("api/OutGoingMock/Transaction2RollBack"); } - public async Task Execute(Transaction2Dto data) + public async Task Execute(Transaction2Dto data, CancellationToken cancellationToken) { var httpclient = httpClientFactory.CreateClient(); httpclient.BaseAddress = new Uri("http://localhost:5000"); diff --git a/samples/apis/SagasTest.Api/Activities/Transaction3ExceptionActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction3ExceptionActivity.cs index 57b3da2..e071511 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction3ExceptionActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction3ExceptionActivity.cs @@ -3,19 +3,20 @@ using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; namespace SagasTest.Api.Activities { public class Transaction3ExceptionActivity : IActivity { - public Task Compensate(Transaction3Dto data) + public Task Compensate(Transaction3Dto data, CancellationToken cancellationToken) { Console.WriteLine("Transaction3 Rollback"); return Task.CompletedTask; } - public Task Execute(Transaction3Dto data) + public Task Execute(Transaction3Dto data, CancellationToken cancellationToken) { throw new NotImplementedException(); } diff --git a/samples/apis/SagasTest.Api/Activities/Transaction3HasResultActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction3HasResultActivity.cs index 0ae948f..74f1470 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction3HasResultActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction3HasResultActivity.cs @@ -3,19 +3,20 @@ using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; namespace SagasTest.Api.Activities { public class Transaction3HasResultActivity : IActivity { - public Task Compensate(Transaction3Dto data) + public Task Compensate(Transaction3Dto data, CancellationToken cancellationToken) { Console.WriteLine("Transaction3 Rollback"); return Task.CompletedTask; } - public Task Execute(Transaction3Dto data) + public Task Execute(Transaction3Dto data, CancellationToken cancellationToken) { Console.WriteLine("Transaction3 commit"); var result = new ActivityExecuteResult diff --git a/samples/apis/SagasTest.Api/Activities/Transaction3ReturnFalseActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction3ReturnFalseActivity.cs index d5b7d4e..af083f7 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction3ReturnFalseActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction3ReturnFalseActivity.cs @@ -3,19 +3,20 @@ using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; namespace SagasTest.Api.Activities { public class Transaction3ReturnFalseActivity : IActivity { - public Task Compensate(Transaction3Dto data) + public Task Compensate(Transaction3Dto data, CancellationToken cancellationToken) { Console.WriteLine("Transaction3 Rollback"); return Task.CompletedTask; } - public Task Execute(Transaction3Dto data) + public Task Execute(Transaction3Dto data, CancellationToken cancellationToken) { Console.WriteLine("Transaction3 commit"); var result = new ActivityExecuteResult diff --git a/samples/apis/SagasTest.Api/Controllers/SagasTestController.cs b/samples/apis/SagasTest.Api/Controllers/SagasTestController.cs index 70a0bc5..7912332 100644 --- a/samples/apis/SagasTest.Api/Controllers/SagasTestController.cs +++ b/samples/apis/SagasTest.Api/Controllers/SagasTestController.cs @@ -24,8 +24,8 @@ namespace SagasTest.Api.Controllers var sagas = sagaFactory.CreateSaga(); sagas.AddActivity("Transaction1Ok", new Transaction1Dto { Id = 1, Name = "22" }); - sagas.AddActivity("Transaction2Ok", new Transaction2Dto { Price = 1, Message = "我们" }); - sagas.AddActivity("Transaction3HasResult", new Transaction3Dto { Age = 1, Name = "333" }); + sagas.AddActivity("Transaction2Ok", new Transaction2Dto { Price = 1, Message = "我们" }); + sagas.AddActivity("Transaction3HasResult", new Transaction3Dto { Age = 1, Name = "333" }); var result = await sagas.GetResult(); } @@ -117,29 +117,5 @@ namespace SagasTest.Api.Controllers var result = await sagas.GetResult(); } - // GET api/values/5 - [HttpGet("{id}")] - public ActionResult Get(int id) - { - return "value"; - } - - // POST api/values - [HttpPost] - public void Post([FromBody] string value) - { - } - - // PUT api/values/5 - [HttpPut("{id}")] - public void Put(int id, [FromBody] string value) - { - } - - // DELETE api/values/5 - [HttpDelete("{id}")] - public void Delete(int id) - { - } } } diff --git a/src/Pole.Sagas/Core/Abstraction/IActivity.cs b/src/Pole.Sagas/Core/Abstraction/IActivity.cs index 7df9804..bfc4ecf 100644 --- a/src/Pole.Sagas/Core/Abstraction/IActivity.cs +++ b/src/Pole.Sagas/Core/Abstraction/IActivity.cs @@ -1,13 +1,14 @@ using System; using System.Collections.Generic; using System.Text; +using System.Threading; using System.Threading.Tasks; namespace Pole.Sagas.Core.Abstraction { public interface IActivity { - Task Execute(TData data); - Task Compensate(TData data); + Task Execute(TData data ,CancellationToken cancellationToken); + Task Compensate(TData data, CancellationToken cancellationToken); } } diff --git a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs index 7aaf7c8..515318b 100644 --- a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs +++ b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs @@ -6,12 +6,14 @@ namespace Pole.Sagas.Core.Abstraction public interface IEventSender { Task SagaStarted(string sagaId, string serviceName); - Task SagaEnded(string sagaId,DateTime ExpiresAt); - Task ActivityExecuteStarted(string activityId, string sagaId, DateTime activityTimeoutTime, string parameterContent,int order); - Task ActivityRetried(string activityId, string status, int retries,string resultContent); - Task ActivityExecuteAborted(string activityId,string resultContent,string errors); - Task ActivityCompensateAborted(string activityId,string sagaId,string errors); + Task SagaEnded(string sagaId, DateTime ExpiresAt); + Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, string parameterContent, int order); + Task ActivityRetried(string activityId, string status, int retries, string resultContent); + Task ActivityExecuteAborted(string activityId, string errors); + Task ActivityCompensateAborted(string activityId, string sagaId, string errors); Task ActivityEnded(string activityId, string resultContent); Task ActivityCompensated(string activityId); + Task ActivityExecuteOvertime(string activityId, string sagaId, string errors); + Task ActivityRevoked(string activityId); } } diff --git a/src/Pole.Sagas/Core/ActivityStatus.cs b/src/Pole.Sagas/Core/ActivityStatus.cs index 3c7c567..e7ed9fb 100644 --- a/src/Pole.Sagas/Core/ActivityStatus.cs +++ b/src/Pole.Sagas/Core/ActivityStatus.cs @@ -7,9 +7,13 @@ namespace Pole.Sagas.Core public enum ActivityStatus { NotStarted, + Executing, Executed, + Compensating, Compensated, ExecuteAborted, + Revoked, CompensateAborted, + Overtime } } diff --git a/src/Pole.Sagas/Core/ActivityWapper.cs b/src/Pole.Sagas/Core/ActivityWapper.cs index a3dbbab..52c4486 100644 --- a/src/Pole.Sagas/Core/ActivityWapper.cs +++ b/src/Pole.Sagas/Core/ActivityWapper.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; using System.Linq.Expressions; using System.Text; +using System.Threading; using System.Threading.Tasks; namespace Pole.Sagas.Core @@ -16,21 +17,23 @@ namespace Pole.Sagas.Core public int Order { get; set; } public ActivityStatus ActivityState { get; set; } public IServiceProvider ServiceProvider { get; set; } - public DateTime TimeOut { get; set; } + public int TimeOutSeconds { get; set; } + public CancellationTokenSource CancellationTokenSource { get; set; } public Task InvokeExecute() { var activityObjParams = Expression.Parameter(typeof(object), "activity"); var activityParams = Expression.Convert(activityObjParams, ActivityType); var dataObjParams = Expression.Parameter(typeof(object), "data"); var dataParams = Expression.Convert(dataObjParams, ActivityDataType); - var method = ActivityType.GetMethod("Execute", new Type[] { ActivityDataType }); - var body = Expression.Call(activityParams, method, dataParams); - var func = Expression.Lambda>>(body, activityObjParams, dataObjParams).Compile(); + var cancellationTokenParams = Expression.Parameter(typeof(CancellationToken), "ct"); + var method = ActivityType.GetMethod("Execute", new Type[] { ActivityDataType, typeof(CancellationToken) }); + var body = Expression.Call(activityParams, method, dataParams, cancellationTokenParams); + var func = Expression.Lambda>>(body, true, activityObjParams, dataObjParams, cancellationTokenParams).Compile(); using (var scope = ServiceProvider.CreateScope()) { var activity = scope.ServiceProvider.GetRequiredService(ActivityType); - return func(activity, DataObj); + return func(activity, DataObj, CancellationTokenSource.Token); } } public Task InvokeCompensate() @@ -39,14 +42,15 @@ namespace Pole.Sagas.Core var activityParams = Expression.Convert(activityObjParams, ActivityType); var dataObjParams = Expression.Parameter(typeof(object), "data"); var dataParams = Expression.Convert(dataObjParams, ActivityDataType); - var method = ActivityType.GetMethod("Compensate", new Type[] { ActivityDataType }); - var body = Expression.Call(activityParams, method, dataParams); - var func = Expression.Lambda>(body, activityObjParams, dataObjParams).Compile(); + var cancellationTokenParams = Expression.Parameter(typeof(CancellationToken), "ct"); + var method = ActivityType.GetMethod("Compensate", new Type[] { ActivityDataType, typeof(CancellationToken) }); + var body = Expression.Call(activityParams, method, dataParams, cancellationTokenParams); + var func = Expression.Lambda>(body, activityObjParams, dataObjParams, cancellationTokenParams).Compile(); using (var scope = ServiceProvider.CreateScope()) { var activity = scope.ServiceProvider.GetRequiredService(ActivityType); - return func(activity, DataObj); + return func(activity, DataObj, CancellationTokenSource.Token); } } } diff --git a/src/Pole.Sagas/Core/EventSender.cs b/src/Pole.Sagas/Core/EventSender.cs index f81602c..bb400e8 100644 --- a/src/Pole.Sagas/Core/EventSender.cs +++ b/src/Pole.Sagas/Core/EventSender.cs @@ -23,7 +23,7 @@ namespace Pole.Sagas.Core return Task.CompletedTask; } - public Task ActivityExecuteAborted(string activityId, string resultContent, string errors) + public Task ActivityExecuteAborted(string activityId, string errors) { return Task.CompletedTask; } @@ -33,7 +33,7 @@ namespace Pole.Sagas.Core return Task.CompletedTask; } - public Task ActivityExecuteStarted(string activityId, string sagaId, DateTime activityTimeoutTime, string parameterContent, int order) + public Task ActivityExecuteStarted(string activityId, string sagaId, int timeoutSeconds, string parameterContent, int order) { return Task.CompletedTask; } @@ -47,5 +47,15 @@ namespace Pole.Sagas.Core { return Task.CompletedTask; } + + public Task ActivityExecuteOvertime(string activityId, string sagaId, string errors) + { + return Task.CompletedTask; + } + + public Task ActivityRevoked(string activityId) + { + throw new NotImplementedException(); + } } } diff --git a/src/Pole.Sagas/Core/ISaga.cs b/src/Pole.Sagas/Core/ISaga.cs index 47fd2ba..58686c0 100644 --- a/src/Pole.Sagas/Core/ISaga.cs +++ b/src/Pole.Sagas/Core/ISaga.cs @@ -8,7 +8,7 @@ namespace Pole.Sagas.Core public interface ISaga { string Id { get; } - void AddActivity(string activityName, object data); + void AddActivity(string activityName, object data,int timeOutSeconds=2); Task GetResult(); } } diff --git a/src/Pole.Sagas/Core/Saga.cs b/src/Pole.Sagas/Core/Saga.cs index 259f46b..72b794e 100644 --- a/src/Pole.Sagas/Core/Saga.cs +++ b/src/Pole.Sagas/Core/Saga.cs @@ -18,13 +18,22 @@ namespace Pole.Sagas.Core private ISnowflakeIdGenerator snowflakeIdGenerator; private IActivityFinder activityFinder; private PoleSagasOption poleSagasOption; - private int _currentMaxOrder = 0; + public int CurrentMaxOrder + { + get { return activities.Count; } + } + /// + /// 如果 等于 -1 说明已经在执行补偿操作,此时这个值已经没有意义 + /// private int _currentExecuteOrder = 0; - private int _currentCompensateOrder = 0; + /// + /// 如果 等于 -1 说明已经还未执行补偿操作,此时这个值没有意义 + /// + private int _currentCompensateOrder = -1; private ISerializer serializer; public string Id { get; } - public Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder) + internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder) { this.snowflakeIdGenerator = snowflakeIdGenerator; this.serviceProvider = serviceProvider; @@ -35,7 +44,7 @@ namespace Pole.Sagas.Core Id = snowflakeIdGenerator.NextId(); } - public void AddActivity(string activityName, object data) + public void AddActivity(string activityName, object data, int timeOutSeconds = 2) { var targetActivityType = activityFinder.FindType(activityName); @@ -45,15 +54,15 @@ namespace Pole.Sagas.Core throw new ActivityImplementIrregularException(activityName); } var dataType = activityInterface.GetGenericArguments()[0]; - _currentMaxOrder++; ActivityWapper activityWapper = new ActivityWapper { ActivityDataType = dataType, ActivityState = ActivityStatus.NotStarted, ActivityType = targetActivityType, DataObj = data, - Order = _currentMaxOrder, - ServiceProvider = serviceProvider + Order = CurrentMaxOrder, + ServiceProvider = serviceProvider, + TimeOutSeconds = 2, }; activities.Add(activityWapper); } @@ -75,7 +84,7 @@ namespace Pole.Sagas.Core private ActivityWapper GetNextExecuteActivity() { - if (_currentExecuteOrder == _currentMaxOrder) + if (_currentExecuteOrder == CurrentMaxOrder) { return null; } @@ -115,15 +124,16 @@ namespace Pole.Sagas.Core { var activityId = snowflakeIdGenerator.NextId(); activityWapper.Id = activityId; + activityWapper.CancellationTokenSource = new System.Threading.CancellationTokenSource(2 * 1000); try { var jsonContent = serializer.Serialize(activityWapper.DataObj, activityWapper.ActivityDataType); - await eventSender.ActivityExecuteStarted(activityId, Id, activityWapper.TimeOut, jsonContent, activityWapper.Order); + await eventSender.ActivityExecuteStarted(activityId, Id, activityWapper.TimeOutSeconds, jsonContent, activityWapper.Order); var result = await activityWapper.InvokeExecute(); if (!result.IsSuccess) { - await eventSender.ActivityExecuteAborted(activityId, serializer.Serialize(result.Result), string.Empty); - await CompensateActivity(result); + await eventSender.ActivityRevoked(activityId); + await CompensateActivity(result,_currentExecuteOrder); return result; } await eventSender.ActivityEnded(activityId, string.Empty); @@ -139,20 +149,37 @@ namespace Pole.Sagas.Core } catch (Exception exception) { - var errors = exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace; - var result = new ActivityExecuteResult + if (activityWapper.CancellationTokenSource.Token.IsCancellationRequested) + { + var errors = exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace; + var result = new ActivityExecuteResult + { + IsSuccess = false, + Errors = errors + }; + await eventSender.ActivityExecuteOvertime(activityId, Id, errors); + // 超时的时候 需要首先补偿这个超时的操作 + return await CompensateActivity(result,_currentExecuteOrder+1); + } + else { - IsSuccess = false, - Errors = errors - }; - await eventSender.ActivityExecuteAborted(activityId, string.Empty, errors); - return await CompensateActivity(result); + var errors = exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace; + var result = new ActivityExecuteResult + { + IsSuccess = false, + Errors = errors + }; + await eventSender.ActivityExecuteAborted(activityId, errors); + // 出错的时候 需要首先补偿这个出错的操作 + return await CompensateActivity(result, _currentExecuteOrder + 1); + } } } - private async Task CompensateActivity(ActivityExecuteResult result) + private async Task CompensateActivity(ActivityExecuteResult result,int currentCompensateOrder) { - _currentCompensateOrder = _currentExecuteOrder; + _currentCompensateOrder = currentCompensateOrder; + _currentExecuteOrder = -1; var compensateActivity = GetNextCompensateActivity(); if (compensateActivity == null) { diff --git a/src/Pole.Sagas/Core/SagaFactory.cs b/src/Pole.Sagas/Core/SagaFactory.cs index ab6585c..aa9cb9e 100644 --- a/src/Pole.Sagas/Core/SagaFactory.cs +++ b/src/Pole.Sagas/Core/SagaFactory.cs @@ -30,5 +30,9 @@ namespace Pole.Sagas.Core { return new Saga(snowflakeIdGenerator, serviceProvider, eventSender, poleSagasOption, serializer, activityFinder); } + internal ISaga CreateSaga(string id) + { + return new Saga(snowflakeIdGenerator, serviceProvider, eventSender, poleSagasOption, serializer, activityFinder); + } } } diff --git a/src/Pole.Sagas/Core/SagaStatus.cs b/src/Pole.Sagas/Core/SagaStatus.cs index 59940dd..cf04215 100644 --- a/src/Pole.Sagas/Core/SagaStatus.cs +++ b/src/Pole.Sagas/Core/SagaStatus.cs @@ -8,6 +8,7 @@ namespace Pole.Sagas.Core { Started, Ended, - Error + Error, + Overtime } } -- libgit2 0.25.0