Commit 1fffdf16 by dingsongjie

完善逻辑与测试

parent e519b36d
......@@ -5,6 +5,7 @@ using Pole.Core.Grains;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Backet.Api.Grains
......
......@@ -4,6 +4,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
......@@ -15,12 +16,12 @@ namespace SagasTest.Api.Activities
{
this.httpClientFactory = httpClientFactory;
}
public Task Compensate(Transaction1Dto data)
public Task Compensate(Transaction1Dto data,CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
public async Task<ActivityExecuteResult> Execute(Transaction1Dto data)
public async Task<ActivityExecuteResult> Execute(Transaction1Dto data, CancellationToken cancellationToken)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
......
......@@ -4,6 +4,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
......@@ -16,14 +17,14 @@ namespace SagasTest.Api.Activities
this.httpClientFactory = httpClientFactory;
}
public async Task Compensate(Transaction1Dto data)
public async Task Compensate(Transaction1Dto data, CancellationToken cancellationToken)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
var result = await httpclient.GetAsync("api/OutGoingMock/Transaction1RollBack");
}
public Task<ActivityExecuteResult> Execute(Transaction1Dto data)
public Task<ActivityExecuteResult> Execute(Transaction1Dto data, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
......
......@@ -4,6 +4,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
......@@ -15,18 +16,18 @@ namespace SagasTest.Api.Activities
{
this.httpClientFactory = httpClientFactory;
}
public async Task Compensate(Transaction1Dto data)
public async Task Compensate(Transaction1Dto data, CancellationToken cancellationToken)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
var result = await httpclient.GetAsync("api/OutGoingMock/Transaction1RollBack");
}
public async Task<ActivityExecuteResult> Execute(Transaction1Dto data)
public async Task<ActivityExecuteResult> Execute(Transaction1Dto data, CancellationToken cancellationToken)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
var result = await httpclient.GetAsync("api/OutGoingMock/Transaction1Ok");
var result = await httpclient.GetAsync("api/OutGoingMock/Transaction1Ok", cancellationToken);
return ActivityExecuteResult.Success;
}
}
......
......@@ -4,6 +4,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
......@@ -16,14 +17,14 @@ namespace SagasTest.Api.Activities
this.httpClientFactory = httpClientFactory;
}
public async Task Compensate(Transaction1Dto data)
public async Task Compensate(Transaction1Dto data, CancellationToken cancellationToken)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
var result = await httpclient.GetAsync("api/OutGoingMock/Transaction1RollBack");
}
public async Task<ActivityExecuteResult> Execute(Transaction1Dto data)
public async Task<ActivityExecuteResult> Execute(Transaction1Dto data, CancellationToken cancellationToken)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
......
......@@ -4,6 +4,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
......@@ -15,12 +16,12 @@ namespace SagasTest.Api.Activities
{
this.httpClientFactory = httpClientFactory;
}
public Task Compensate(Transaction2Dto data)
public Task Compensate(Transaction2Dto data, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
public async Task<ActivityExecuteResult> Execute(Transaction2Dto data)
public async Task<ActivityExecuteResult> Execute(Transaction2Dto data, CancellationToken cancellationToken)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
......
......@@ -4,6 +4,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
......@@ -16,14 +17,14 @@ namespace SagasTest.Api.Activities
this.httpClientFactory = httpClientFactory;
}
public async Task Compensate(Transaction2Dto data)
public async Task Compensate(Transaction2Dto data, CancellationToken cancellationToken)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
var result = await httpclient.GetAsync("api/OutGoingMock/Transaction2RollBack");
}
public Task<ActivityExecuteResult> Execute(Transaction2Dto data)
public Task<ActivityExecuteResult> Execute(Transaction2Dto data, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
......
......@@ -4,6 +4,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
......@@ -15,14 +16,14 @@ namespace SagasTest.Api.Activities
{
this.httpClientFactory = httpClientFactory;
}
public async Task Compensate(Transaction2Dto data)
public async Task Compensate(Transaction2Dto data, CancellationToken cancellationToken)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
var result = await httpclient.GetAsync("api/OutGoingMock/Transaction2RollBack");
}
public async Task<ActivityExecuteResult> Execute(Transaction2Dto data)
public async Task<ActivityExecuteResult> Execute(Transaction2Dto data, CancellationToken cancellationToken)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
......
......@@ -4,6 +4,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
......@@ -16,14 +17,14 @@ namespace SagasTest.Api.Activities
this.httpClientFactory = httpClientFactory;
}
public async Task Compensate(Transaction2Dto data)
public async Task Compensate(Transaction2Dto data, CancellationToken cancellationToken)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
var result = await httpclient.GetAsync("api/OutGoingMock/Transaction2RollBack");
}
public async Task<ActivityExecuteResult> Execute(Transaction2Dto data)
public async Task<ActivityExecuteResult> Execute(Transaction2Dto data, CancellationToken cancellationToken)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
......
......@@ -3,19 +3,20 @@ using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
{
public class Transaction3ExceptionActivity : IActivity<Transaction3Dto>
{
public Task Compensate(Transaction3Dto data)
public Task Compensate(Transaction3Dto data, CancellationToken cancellationToken)
{
Console.WriteLine("Transaction3 Rollback");
return Task.CompletedTask;
}
public Task<ActivityExecuteResult> Execute(Transaction3Dto data)
public Task<ActivityExecuteResult> Execute(Transaction3Dto data, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
......
......@@ -3,19 +3,20 @@ using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
{
public class Transaction3HasResultActivity : IActivity<Transaction3Dto>
{
public Task Compensate(Transaction3Dto data)
public Task Compensate(Transaction3Dto data, CancellationToken cancellationToken)
{
Console.WriteLine("Transaction3 Rollback");
return Task.CompletedTask;
}
public Task<ActivityExecuteResult> Execute(Transaction3Dto data)
public Task<ActivityExecuteResult> Execute(Transaction3Dto data, CancellationToken cancellationToken)
{
Console.WriteLine("Transaction3 commit");
var result = new ActivityExecuteResult
......
......@@ -3,19 +3,20 @@ using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
{
public class Transaction3ReturnFalseActivity : IActivity<Transaction3Dto>
{
public Task Compensate(Transaction3Dto data)
public Task Compensate(Transaction3Dto data, CancellationToken cancellationToken)
{
Console.WriteLine("Transaction3 Rollback");
return Task.CompletedTask;
}
public Task<ActivityExecuteResult> Execute(Transaction3Dto data)
public Task<ActivityExecuteResult> Execute(Transaction3Dto data, CancellationToken cancellationToken)
{
Console.WriteLine("Transaction3 commit");
var result = new ActivityExecuteResult
......
......@@ -24,8 +24,8 @@ namespace SagasTest.Api.Controllers
var sagas = sagaFactory.CreateSaga();
sagas.AddActivity("Transaction1Ok", new Transaction1Dto { Id = 1, Name = "22" });
sagas.AddActivity("Transaction2Ok", new Transaction2Dto { Price = 1, Message = "我们" });
sagas.AddActivity("Transaction3HasResult", new Transaction3Dto { Age = 1, Name = "333" });
sagas.AddActivity("Transaction2Ok", new Transaction2Dto { Price = 1, Message = "我们" });
sagas.AddActivity("Transaction3HasResult", new Transaction3Dto { Age = 1, Name = "333" });
var result = await sagas.GetResult();
}
......@@ -117,29 +117,5 @@ namespace SagasTest.Api.Controllers
var result = await sagas.GetResult();
}
// GET api/values/5
[HttpGet("{id}")]
public ActionResult<string> Get(int id)
{
return "value";
}
// POST api/values
[HttpPost]
public void Post([FromBody] string value)
{
}
// PUT api/values/5
[HttpPut("{id}")]
public void Put(int id, [FromBody] string value)
{
}
// DELETE api/values/5
[HttpDelete("{id}")]
public void Delete(int id)
{
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Sagas.Core.Abstraction
{
public interface IActivity<TData>
{
Task<ActivityExecuteResult> Execute(TData data);
Task Compensate(TData data);
Task<ActivityExecuteResult> Execute(TData data ,CancellationToken cancellationToken);
Task Compensate(TData data, CancellationToken cancellationToken);
}
}
......@@ -6,12 +6,14 @@ namespace Pole.Sagas.Core.Abstraction
public interface IEventSender
{
Task SagaStarted(string sagaId, string serviceName);
Task SagaEnded(string sagaId,DateTime ExpiresAt);
Task ActivityExecuteStarted(string activityId, string sagaId, DateTime activityTimeoutTime, string parameterContent,int order);
Task ActivityRetried(string activityId, string status, int retries,string resultContent);
Task ActivityExecuteAborted(string activityId,string resultContent,string errors);
Task ActivityCompensateAborted(string activityId,string sagaId,string errors);
Task SagaEnded(string sagaId, DateTime ExpiresAt);
Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, string parameterContent, int order);
Task ActivityRetried(string activityId, string status, int retries, string resultContent);
Task ActivityExecuteAborted(string activityId, string errors);
Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
Task ActivityEnded(string activityId, string resultContent);
Task ActivityCompensated(string activityId);
Task ActivityExecuteOvertime(string activityId, string sagaId, string errors);
Task ActivityRevoked(string activityId);
}
}
......@@ -7,9 +7,13 @@ namespace Pole.Sagas.Core
public enum ActivityStatus
{
NotStarted,
Executing,
Executed,
Compensating,
Compensated,
ExecuteAborted,
Revoked,
CompensateAborted,
Overtime
}
}
......@@ -3,6 +3,7 @@ using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Sagas.Core
......@@ -16,21 +17,23 @@ namespace Pole.Sagas.Core
public int Order { get; set; }
public ActivityStatus ActivityState { get; set; }
public IServiceProvider ServiceProvider { get; set; }
public DateTime TimeOut { get; set; }
public int TimeOutSeconds { get; set; }
public CancellationTokenSource CancellationTokenSource { get; set; }
public Task<ActivityExecuteResult> InvokeExecute()
{
var activityObjParams = Expression.Parameter(typeof(object), "activity");
var activityParams = Expression.Convert(activityObjParams, ActivityType);
var dataObjParams = Expression.Parameter(typeof(object), "data");
var dataParams = Expression.Convert(dataObjParams, ActivityDataType);
var method = ActivityType.GetMethod("Execute", new Type[] { ActivityDataType });
var body = Expression.Call(activityParams, method, dataParams);
var func = Expression.Lambda<Func<object, object, Task<ActivityExecuteResult>>>(body, activityObjParams, dataObjParams).Compile();
var cancellationTokenParams = Expression.Parameter(typeof(CancellationToken), "ct");
var method = ActivityType.GetMethod("Execute", new Type[] { ActivityDataType, typeof(CancellationToken) });
var body = Expression.Call(activityParams, method, dataParams, cancellationTokenParams);
var func = Expression.Lambda<Func<object, object, CancellationToken, Task<ActivityExecuteResult>>>(body, true, activityObjParams, dataObjParams, cancellationTokenParams).Compile();
using (var scope = ServiceProvider.CreateScope())
{
var activity = scope.ServiceProvider.GetRequiredService(ActivityType);
return func(activity, DataObj);
return func(activity, DataObj, CancellationTokenSource.Token);
}
}
public Task InvokeCompensate()
......@@ -39,14 +42,15 @@ namespace Pole.Sagas.Core
var activityParams = Expression.Convert(activityObjParams, ActivityType);
var dataObjParams = Expression.Parameter(typeof(object), "data");
var dataParams = Expression.Convert(dataObjParams, ActivityDataType);
var method = ActivityType.GetMethod("Compensate", new Type[] { ActivityDataType });
var body = Expression.Call(activityParams, method, dataParams);
var func = Expression.Lambda<Func<object, object, Task>>(body, activityObjParams, dataObjParams).Compile();
var cancellationTokenParams = Expression.Parameter(typeof(CancellationToken), "ct");
var method = ActivityType.GetMethod("Compensate", new Type[] { ActivityDataType, typeof(CancellationToken) });
var body = Expression.Call(activityParams, method, dataParams, cancellationTokenParams);
var func = Expression.Lambda<Func<object, object, CancellationToken, Task>>(body, activityObjParams, dataObjParams, cancellationTokenParams).Compile();
using (var scope = ServiceProvider.CreateScope())
{
var activity = scope.ServiceProvider.GetRequiredService(ActivityType);
return func(activity, DataObj);
return func(activity, DataObj, CancellationTokenSource.Token);
}
}
}
......
......@@ -23,7 +23,7 @@ namespace Pole.Sagas.Core
return Task.CompletedTask;
}
public Task ActivityExecuteAborted(string activityId, string resultContent, string errors)
public Task ActivityExecuteAborted(string activityId, string errors)
{
return Task.CompletedTask;
}
......@@ -33,7 +33,7 @@ namespace Pole.Sagas.Core
return Task.CompletedTask;
}
public Task ActivityExecuteStarted(string activityId, string sagaId, DateTime activityTimeoutTime, string parameterContent, int order)
public Task ActivityExecuteStarted(string activityId, string sagaId, int timeoutSeconds, string parameterContent, int order)
{
return Task.CompletedTask;
}
......@@ -47,5 +47,15 @@ namespace Pole.Sagas.Core
{
return Task.CompletedTask;
}
public Task ActivityExecuteOvertime(string activityId, string sagaId, string errors)
{
return Task.CompletedTask;
}
public Task ActivityRevoked(string activityId)
{
throw new NotImplementedException();
}
}
}
......@@ -8,7 +8,7 @@ namespace Pole.Sagas.Core
public interface ISaga
{
string Id { get; }
void AddActivity(string activityName, object data);
void AddActivity(string activityName, object data,int timeOutSeconds=2);
Task<SagaResult> GetResult();
}
}
......@@ -18,13 +18,22 @@ namespace Pole.Sagas.Core
private ISnowflakeIdGenerator snowflakeIdGenerator;
private IActivityFinder activityFinder;
private PoleSagasOption poleSagasOption;
private int _currentMaxOrder = 0;
public int CurrentMaxOrder
{
get { return activities.Count; }
}
/// <summary>
/// 如果 等于 -1 说明已经在执行补偿操作,此时这个值已经没有意义
/// </summary>
private int _currentExecuteOrder = 0;
private int _currentCompensateOrder = 0;
/// <summary>
/// 如果 等于 -1 说明已经还未执行补偿操作,此时这个值没有意义
/// </summary>
private int _currentCompensateOrder = -1;
private ISerializer serializer;
public string Id { get; }
public Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder)
internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder)
{
this.snowflakeIdGenerator = snowflakeIdGenerator;
this.serviceProvider = serviceProvider;
......@@ -35,7 +44,7 @@ namespace Pole.Sagas.Core
Id = snowflakeIdGenerator.NextId();
}
public void AddActivity(string activityName, object data)
public void AddActivity(string activityName, object data, int timeOutSeconds = 2)
{
var targetActivityType = activityFinder.FindType(activityName);
......@@ -45,15 +54,15 @@ namespace Pole.Sagas.Core
throw new ActivityImplementIrregularException(activityName);
}
var dataType = activityInterface.GetGenericArguments()[0];
_currentMaxOrder++;
ActivityWapper activityWapper = new ActivityWapper
{
ActivityDataType = dataType,
ActivityState = ActivityStatus.NotStarted,
ActivityType = targetActivityType,
DataObj = data,
Order = _currentMaxOrder,
ServiceProvider = serviceProvider
Order = CurrentMaxOrder,
ServiceProvider = serviceProvider,
TimeOutSeconds = 2,
};
activities.Add(activityWapper);
}
......@@ -75,7 +84,7 @@ namespace Pole.Sagas.Core
private ActivityWapper GetNextExecuteActivity()
{
if (_currentExecuteOrder == _currentMaxOrder)
if (_currentExecuteOrder == CurrentMaxOrder)
{
return null;
}
......@@ -115,15 +124,16 @@ namespace Pole.Sagas.Core
{
var activityId = snowflakeIdGenerator.NextId();
activityWapper.Id = activityId;
activityWapper.CancellationTokenSource = new System.Threading.CancellationTokenSource(2 * 1000);
try
{
var jsonContent = serializer.Serialize(activityWapper.DataObj, activityWapper.ActivityDataType);
await eventSender.ActivityExecuteStarted(activityId, Id, activityWapper.TimeOut, jsonContent, activityWapper.Order);
await eventSender.ActivityExecuteStarted(activityId, Id, activityWapper.TimeOutSeconds, jsonContent, activityWapper.Order);
var result = await activityWapper.InvokeExecute();
if (!result.IsSuccess)
{
await eventSender.ActivityExecuteAborted(activityId, serializer.Serialize(result.Result), string.Empty);
await CompensateActivity(result);
await eventSender.ActivityRevoked(activityId);
await CompensateActivity(result,_currentExecuteOrder);
return result;
}
await eventSender.ActivityEnded(activityId, string.Empty);
......@@ -139,20 +149,37 @@ namespace Pole.Sagas.Core
}
catch (Exception exception)
{
var errors = exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace;
var result = new ActivityExecuteResult
if (activityWapper.CancellationTokenSource.Token.IsCancellationRequested)
{
var errors = exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace;
var result = new ActivityExecuteResult
{
IsSuccess = false,
Errors = errors
};
await eventSender.ActivityExecuteOvertime(activityId, Id, errors);
// 超时的时候 需要首先补偿这个超时的操作
return await CompensateActivity(result,_currentExecuteOrder+1);
}
else
{
IsSuccess = false,
Errors = errors
};
await eventSender.ActivityExecuteAborted(activityId, string.Empty, errors);
return await CompensateActivity(result);
var errors = exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace;
var result = new ActivityExecuteResult
{
IsSuccess = false,
Errors = errors
};
await eventSender.ActivityExecuteAborted(activityId, errors);
// 出错的时候 需要首先补偿这个出错的操作
return await CompensateActivity(result, _currentExecuteOrder + 1);
}
}
}
private async Task<ActivityExecuteResult> CompensateActivity(ActivityExecuteResult result)
private async Task<ActivityExecuteResult> CompensateActivity(ActivityExecuteResult result,int currentCompensateOrder)
{
_currentCompensateOrder = _currentExecuteOrder;
_currentCompensateOrder = currentCompensateOrder;
_currentExecuteOrder = -1;
var compensateActivity = GetNextCompensateActivity();
if (compensateActivity == null)
{
......
......@@ -30,5 +30,9 @@ namespace Pole.Sagas.Core
{
return new Saga(snowflakeIdGenerator, serviceProvider, eventSender, poleSagasOption, serializer, activityFinder);
}
internal ISaga CreateSaga(string id)
{
return new Saga(snowflakeIdGenerator, serviceProvider, eventSender, poleSagasOption, serializer, activityFinder);
}
}
}
......@@ -8,6 +8,7 @@ namespace Pole.Sagas.Core
{
Started,
Ended,
Error
Error,
Overtime
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment