From 63f4a2bdbc307774e0cf63c8b15f0cbba6a9020b Mon Sep 17 00:00:00 2001 From: dingsongjie Date: Thu, 5 Mar 2020 14:58:43 +0800 Subject: [PATCH] 添加 sagas 核心测试项目 --- Pole.sln | 7 +++++++ samples/apis/Backet.Api/Program.cs | 8 ++++---- samples/apis/SagasTest.Api/Controllers/ValuesController.cs | 45 +++++++++++++++++++++++++++++++++++++++++++++ samples/apis/SagasTest.Api/Program.cs | 24 ++++++++++++++++++++++++ samples/apis/SagasTest.Api/Properties/launchSettings.json | 30 ++++++++++++++++++++++++++++++ samples/apis/SagasTest.Api/SagasTest.Api.csproj | 18 ++++++++++++++++++ samples/apis/SagasTest.Api/Startup.cs | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ samples/apis/SagasTest.Api/appsettings.Development.json | 9 +++++++++ samples/apis/SagasTest.Api/appsettings.json | 8 ++++++++ src/Pole.Core/Processor/PendingMessageRetryProcessor.cs | 3 ++- src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs | 2 +- src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs | 2 +- src/Pole.Sagas/Core/Abstraction/IActivity.cs | 13 +++++++++++++ src/Pole.Sagas/Core/Abstraction/IActivityFinder.cs | 12 ++++++++++++ src/Pole.Sagas/Core/Abstraction/IEventSender.cs | 17 +++++++++++++++++ src/Pole.Sagas/Core/Abstraction/ISagaFactory.cs | 11 +++++++++++ src/Pole.Sagas/Core/ActivityCompensateResult.cs | 15 +++++++++++++++ src/Pole.Sagas/Core/ActivityExecuteResult.cs | 14 +++++++++++++- src/Pole.Sagas/Core/ActivityFinder.cs | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Sagas/Core/ActivityStatus.cs | 15 +++++++++++++++ src/Pole.Sagas/Core/ActivityWapper.cs | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Sagas/Core/EventSender.cs | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Sagas/Core/Exceptions/ActivityNameIrregularException.cs | 14 ++++++++++++++ src/Pole.Sagas/Core/Exceptions/ActivityNameRepeatedException.cs | 14 ++++++++++++++ src/Pole.Sagas/Core/Exceptions/ActivityNotFoundByNameException.cs | 14 ++++++++++++++ src/Pole.Sagas/Core/Exceptions/ActivityNotFoundByTypeException.cs | 14 ++++++++++++++ src/Pole.Sagas/Core/IActivity.cs | 13 ------------- src/Pole.Sagas/Core/ISaga.cs | 5 ++++- src/Pole.Sagas/Core/ISagaFactory.cs | 11 ----------- src/Pole.Sagas/Core/PoleSagasOption.cs | 12 ++++++++++++ src/Pole.Sagas/Core/Saga.cs | 139 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----- src/Pole.Sagas/Core/SagaFactory.cs | 26 +++++++++++++++++--------- src/Pole.Sagas/Core/SagaResult.cs | 20 ++++++++++++++++++++ src/Pole.Sagas/Pole.Sagas.csproj | 1 - src/Pole.Sagas/PoleSagaServiceCollectionExtensions.cs | 29 +++++++++++++++++++++++++++++ 35 files changed, 731 insertions(+), 48 deletions(-) create mode 100644 samples/apis/SagasTest.Api/Controllers/ValuesController.cs create mode 100644 samples/apis/SagasTest.Api/Program.cs create mode 100644 samples/apis/SagasTest.Api/Properties/launchSettings.json create mode 100644 samples/apis/SagasTest.Api/SagasTest.Api.csproj create mode 100644 samples/apis/SagasTest.Api/Startup.cs create mode 100644 samples/apis/SagasTest.Api/appsettings.Development.json create mode 100644 samples/apis/SagasTest.Api/appsettings.json create mode 100644 src/Pole.Sagas/Core/Abstraction/IActivity.cs create mode 100644 src/Pole.Sagas/Core/Abstraction/IActivityFinder.cs create mode 100644 src/Pole.Sagas/Core/Abstraction/IEventSender.cs create mode 100644 src/Pole.Sagas/Core/Abstraction/ISagaFactory.cs create mode 100644 src/Pole.Sagas/Core/ActivityCompensateResult.cs create mode 100644 src/Pole.Sagas/Core/ActivityFinder.cs create mode 100644 src/Pole.Sagas/Core/ActivityStatus.cs create mode 100644 src/Pole.Sagas/Core/ActivityWapper.cs create mode 100644 src/Pole.Sagas/Core/EventSender.cs create mode 100644 src/Pole.Sagas/Core/Exceptions/ActivityNameIrregularException.cs create mode 100644 src/Pole.Sagas/Core/Exceptions/ActivityNameRepeatedException.cs create mode 100644 src/Pole.Sagas/Core/Exceptions/ActivityNotFoundByNameException.cs create mode 100644 src/Pole.Sagas/Core/Exceptions/ActivityNotFoundByTypeException.cs delete mode 100644 src/Pole.Sagas/Core/IActivity.cs delete mode 100644 src/Pole.Sagas/Core/ISagaFactory.cs create mode 100644 src/Pole.Sagas/Core/PoleSagasOption.cs create mode 100644 src/Pole.Sagas/Core/SagaResult.cs create mode 100644 src/Pole.Sagas/PoleSagaServiceCollectionExtensions.cs diff --git a/Pole.sln b/Pole.sln index e623eff..4ffabfb 100644 --- a/Pole.sln +++ b/Pole.sln @@ -43,6 +43,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.Sagas", "src\Pole.Saga EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.Sagas.Storage.PostgreSql", "src\Pole.Sagas.Storage.PostgreSql\Pole.Sagas.Storage.PostgreSql.csproj", "{9505BDFC-395B-4257-AEB3-2B44611147A4}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SagasTest.Api", "samples\apis\SagasTest.Api\SagasTest.Api.csproj", "{6138197E-6202-4E1B-9458-3CBEE60A36F9}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -105,6 +107,10 @@ Global {9505BDFC-395B-4257-AEB3-2B44611147A4}.Debug|Any CPU.Build.0 = Debug|Any CPU {9505BDFC-395B-4257-AEB3-2B44611147A4}.Release|Any CPU.ActiveCfg = Release|Any CPU {9505BDFC-395B-4257-AEB3-2B44611147A4}.Release|Any CPU.Build.0 = Release|Any CPU + {6138197E-6202-4E1B-9458-3CBEE60A36F9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6138197E-6202-4E1B-9458-3CBEE60A36F9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6138197E-6202-4E1B-9458-3CBEE60A36F9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6138197E-6202-4E1B-9458-3CBEE60A36F9}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -127,6 +133,7 @@ Global {FB3D2F52-123A-4606-B682-9159BD7913AE} = {655E719B-4A3E-467C-A541-E0770AB81DE1} {1F06D877-E4EC-4908-9057-38EDCE5E54E6} = {9932C965-8B38-4F70-9E43-86DC56860E2B} {9505BDFC-395B-4257-AEB3-2B44611147A4} = {9932C965-8B38-4F70-9E43-86DC56860E2B} + {6138197E-6202-4E1B-9458-3CBEE60A36F9} = {475116FC-DEEC-4255-94E4-AE7B8C85038D} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {DB0775A3-F293-4043-ADB7-72BAC081E87E} diff --git a/samples/apis/Backet.Api/Program.cs b/samples/apis/Backet.Api/Program.cs index 8f4c69e..f75fb39 100644 --- a/samples/apis/Backet.Api/Program.cs +++ b/samples/apis/Backet.Api/Program.cs @@ -39,10 +39,10 @@ namespace Backet.Api webBuilder.UseStartup(); webBuilder.UseKestrel(option => { - option.ListenAnyIP(81, config => - { - config.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http1; - }); + //option.ListenAnyIP(81, config => + //{ + // config.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http1; + //}); option.ListenAnyIP(82, config => { config.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http2; diff --git a/samples/apis/SagasTest.Api/Controllers/ValuesController.cs b/samples/apis/SagasTest.Api/Controllers/ValuesController.cs new file mode 100644 index 0000000..0e31cf8 --- /dev/null +++ b/samples/apis/SagasTest.Api/Controllers/ValuesController.cs @@ -0,0 +1,45 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Mvc; + +namespace SagasTest.Api.Controllers +{ + [Route("api/[controller]")] + [ApiController] + public class ValuesController : ControllerBase + { + // GET api/values + [HttpGet] + public ActionResult> Get() + { + return new string[] { "value1", "value2" }; + } + + // GET api/values/5 + [HttpGet("{id}")] + public ActionResult Get(int id) + { + return "value"; + } + + // POST api/values + [HttpPost] + public void Post([FromBody] string value) + { + } + + // PUT api/values/5 + [HttpPut("{id}")] + public void Put(int id, [FromBody] string value) + { + } + + // DELETE api/values/5 + [HttpDelete("{id}")] + public void Delete(int id) + { + } + } +} diff --git a/samples/apis/SagasTest.Api/Program.cs b/samples/apis/SagasTest.Api/Program.cs new file mode 100644 index 0000000..136e58c --- /dev/null +++ b/samples/apis/SagasTest.Api/Program.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.AspNetCore; +using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; + +namespace SagasTest.Api +{ + public class Program + { + public static void Main(string[] args) + { + CreateWebHostBuilder(args).Build().Run(); + } + + public static IWebHostBuilder CreateWebHostBuilder(string[] args) => + WebHost.CreateDefaultBuilder(args) + .UseStartup(); + } +} diff --git a/samples/apis/SagasTest.Api/Properties/launchSettings.json b/samples/apis/SagasTest.Api/Properties/launchSettings.json new file mode 100644 index 0000000..926fbc1 --- /dev/null +++ b/samples/apis/SagasTest.Api/Properties/launchSettings.json @@ -0,0 +1,30 @@ +{ + "$schema": "http://json.schemastore.org/launchsettings.json", + "iisSettings": { + "windowsAuthentication": false, + "anonymousAuthentication": true, + "iisExpress": { + "applicationUrl": "http://localhost:63500", + "sslPort": 44362 + } + }, + "profiles": { + "IIS Express": { + "commandName": "IISExpress", + "launchBrowser": true, + "launchUrl": "api/values", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + }, + "SagasTest.Api": { + "commandName": "Project", + "launchBrowser": true, + "launchUrl": "api/values", + "applicationUrl": "https://localhost:5001;http://localhost:5000", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + } + } +} \ No newline at end of file diff --git a/samples/apis/SagasTest.Api/SagasTest.Api.csproj b/samples/apis/SagasTest.Api/SagasTest.Api.csproj new file mode 100644 index 0000000..6b5a7cc --- /dev/null +++ b/samples/apis/SagasTest.Api/SagasTest.Api.csproj @@ -0,0 +1,18 @@ + + + + netcoreapp3.1 + InProcess + + + + + + + + + + + + + diff --git a/samples/apis/SagasTest.Api/Startup.cs b/samples/apis/SagasTest.Api/Startup.cs new file mode 100644 index 0000000..1c1142f --- /dev/null +++ b/samples/apis/SagasTest.Api/Startup.cs @@ -0,0 +1,55 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.HttpsPolicy; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace SagasTest.Api +{ + public class Startup + { + private IConfiguration Configuration { get; } + private IWebHostEnvironment Environment { get; } + public Startup(IConfiguration configuration, IWebHostEnvironment env) + { + Configuration = configuration; + Environment = env; + } + + // This method gets called by the runtime. Use this method to add services to the container. + public void ConfigureServices(IServiceCollection services) + { + services.AddControllers(); + services.AddPole(config => + { + config.AddSagas(); + }); + } + + // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. + public void Configure(IApplicationBuilder app) + { + if (Environment.IsDevelopment()) + { + app.UseDeveloperExceptionPage(); + } + + app.UsePole(); + app.UseRouting(); + + + app.UseEndpoints(endpoints => + { + endpoints.MapDefaultControllerRoute(); + }); + } + } +} diff --git a/samples/apis/SagasTest.Api/appsettings.Development.json b/samples/apis/SagasTest.Api/appsettings.Development.json new file mode 100644 index 0000000..e203e94 --- /dev/null +++ b/samples/apis/SagasTest.Api/appsettings.Development.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Debug", + "System": "Information", + "Microsoft": "Information" + } + } +} diff --git a/samples/apis/SagasTest.Api/appsettings.json b/samples/apis/SagasTest.Api/appsettings.json new file mode 100644 index 0000000..def9159 --- /dev/null +++ b/samples/apis/SagasTest.Api/appsettings.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Warning" + } + }, + "AllowedHosts": "*" +} diff --git a/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs b/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs index 54db6d7..d12faec 100644 --- a/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs +++ b/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs @@ -68,7 +68,8 @@ namespace Pole.Core.Processor var bytes = bytesTransport.GetBytes(); if (pendingMessage.Retries > producerOptions.MaxFailedRetryCount) { - pendingMessage.ExpiresAt = DateTime.UtcNow; + pendingMessage.StatusName = nameof(EventStatus.Failed); + continue; } pendingMessage.Retries++; var targetName = producerContainer.GetTargetName(pendingMessage.Name); diff --git a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs index ad518b2..92d562a 100644 --- a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs +++ b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs @@ -143,7 +143,7 @@ namespace Pole.EventBus.RabbitMQ { retryTimes++; ea.BasicProperties.Headers[Consts.ConsumerRetryTimesStr] = retryTimes.ToString(); - ea.BasicProperties.Headers[Consts.ConsumerExceptionDetailsStr] = exception.InnerException?.Message + exception.StackTrace ?? exception.Message + exception.StackTrace; + ea.BasicProperties.Headers[Consts.ConsumerExceptionDetailsStr] = exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace; await Task.Delay((int)Math.Pow(2, retryTimes) * 1000).ContinueWith((task) => { using var channel = Client.PullChannel(); diff --git a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs index ed76495..5759e2e 100644 --- a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs +++ b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs @@ -79,7 +79,7 @@ $"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusN { var fourMinAgo = DateTime.UtcNow.AddMinutes(-4).ToString("O"); var sql = - $"SELECT * FROM {tableName} WHERE \"Retries\"<{producerOptions.MaxFailedRetryCount} AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{EventStatus.Failed}' OR \"StatusName\"='{EventStatus.Pending}') for update skip locked LIMIT 200;"; + $"SELECT * FROM {tableName} WHERE \"Retries\"<{producerOptions.MaxFailedRetryCount} AND \"Added\"<'{fourMinAgo}' AND \"StatusName\"='{EventStatus.Pending}' for update skip locked LIMIT 200;"; var result = new List(); using var connection = new NpgsqlConnection(options.ConnectionString); diff --git a/src/Pole.Sagas/Core/Abstraction/IActivity.cs b/src/Pole.Sagas/Core/Abstraction/IActivity.cs new file mode 100644 index 0000000..7df9804 --- /dev/null +++ b/src/Pole.Sagas/Core/Abstraction/IActivity.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.Sagas.Core.Abstraction +{ + public interface IActivity + { + Task Execute(TData data); + Task Compensate(TData data); + } +} diff --git a/src/Pole.Sagas/Core/Abstraction/IActivityFinder.cs b/src/Pole.Sagas/Core/Abstraction/IActivityFinder.cs new file mode 100644 index 0000000..ca724c4 --- /dev/null +++ b/src/Pole.Sagas/Core/Abstraction/IActivityFinder.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Core.Abstraction +{ + public interface IActivityFinder + { + Type FindType(string name); + string GetName(Type type); + } +} diff --git a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs new file mode 100644 index 0000000..85b9a70 --- /dev/null +++ b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs @@ -0,0 +1,17 @@ +using System; +using System.Threading.Tasks; + +namespace Pole.Sagas.Core.Abstraction +{ + public interface IEventSender + { + Task SagaStarted(string sagaId, string serviceName); + Task SagaEnded(string sagaId,DateTime ExpiresAt); + Task ActivityStarted(string activityId, string sagaId, DateTime activityTimeoutTime, string parameterContent); + Task ActivityRetried(string activityId, string status, int retries,string resultContent); + Task ActivityExecuteAborted(string activityId,string resultContent,string errors); + Task ActivityCompensateAborted(string activityId,string sagaId,string errors); + Task ActivityEnded(string activityId, string resultContent); + Task ActivityCompensated(string activityId); + } +} diff --git a/src/Pole.Sagas/Core/Abstraction/ISagaFactory.cs b/src/Pole.Sagas/Core/Abstraction/ISagaFactory.cs new file mode 100644 index 0000000..e02ac24 --- /dev/null +++ b/src/Pole.Sagas/Core/Abstraction/ISagaFactory.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Core.Abstraction +{ + public interface ISagaFactory + { + ISaga CreateSaga(); + } +} diff --git a/src/Pole.Sagas/Core/ActivityCompensateResult.cs b/src/Pole.Sagas/Core/ActivityCompensateResult.cs new file mode 100644 index 0000000..611b60c --- /dev/null +++ b/src/Pole.Sagas/Core/ActivityCompensateResult.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Core +{ + public class ActivityCompensateResult + { + /// + /// If not success , this activity will be aborted , and current saga will compensate all previous activities + /// + public bool IsSuccess { get; set; } + public string Message { get; set; } + } +} diff --git a/src/Pole.Sagas/Core/ActivityExecuteResult.cs b/src/Pole.Sagas/Core/ActivityExecuteResult.cs index 93f7b76..e758284 100644 --- a/src/Pole.Sagas/Core/ActivityExecuteResult.cs +++ b/src/Pole.Sagas/Core/ActivityExecuteResult.cs @@ -7,6 +7,18 @@ namespace Pole.Sagas.Core public class ActivityExecuteResult { public bool IsSuccess { get; set; } - public string Message { get; set; } + public object Result { get; set; } + public string Errors { get; set; } + + public static implicit operator SagaResult(ActivityExecuteResult activity) + { + return new SagaResult + { + IsSuccess = activity.IsSuccess, + Result = default(object), + HasException = !string.IsNullOrEmpty(activity.Errors), + ExceptionMessages = activity.Errors + }; + } } } diff --git a/src/Pole.Sagas/Core/ActivityFinder.cs b/src/Pole.Sagas/Core/ActivityFinder.cs new file mode 100644 index 0000000..5ed6380 --- /dev/null +++ b/src/Pole.Sagas/Core/ActivityFinder.cs @@ -0,0 +1,56 @@ +using Microsoft.Extensions.Logging; +using Pole.Core.Utils; +using Pole.Sagas.Core.Abstraction; +using Pole.Sagas.Core.Exceptions; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Pole.Sagas.Core +{ + class ActivityFinder : IActivityFinder + { + private readonly ConcurrentDictionary nameDict = new ConcurrentDictionary(); + private readonly ConcurrentDictionary typeDict = new ConcurrentDictionary(); + readonly ILogger logger; + public ActivityFinder(ILogger logger) + { + this.logger = logger; + var baseActivityType = typeof(IActivity<>); + foreach (var assembly in AssemblyHelper.GetAssemblies(this.logger)) + { + foreach (var type in assembly.GetTypes().Where(m => m.IsGenericType && m.GetGenericTypeDefinition() == baseActivityType && !m.IsAbstract)) + { + if (!type.FullName.EndsWith("Activity")) + { + throw new ActivityNameIrregularException(type); + } + var activityName = type.Name.Substring(0, type.Name.Length - "Activity".Length); + typeDict.TryAdd(type, activityName); + + if (!nameDict.TryAdd(activityName, type)) + { + throw new ActivityNameRepeatedException(activityName); + } + } + } + } + public Type FindType(string name) + { + if (nameDict.TryGetValue(name, out Type type)) + { + return type; + } + throw new ActivityNotFoundByNameException(name); + } + + public string GetName(Type type) + { + if (!typeDict.TryGetValue(type, out var value)) + throw new ActivityNotFoundByTypeException(type); + return value; + } + } +} diff --git a/src/Pole.Sagas/Core/ActivityStatus.cs b/src/Pole.Sagas/Core/ActivityStatus.cs new file mode 100644 index 0000000..3c7c567 --- /dev/null +++ b/src/Pole.Sagas/Core/ActivityStatus.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Core +{ + public enum ActivityStatus + { + NotStarted, + Executed, + Compensated, + ExecuteAborted, + CompensateAborted, + } +} diff --git a/src/Pole.Sagas/Core/ActivityWapper.cs b/src/Pole.Sagas/Core/ActivityWapper.cs new file mode 100644 index 0000000..c0fc712 --- /dev/null +++ b/src/Pole.Sagas/Core/ActivityWapper.cs @@ -0,0 +1,52 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Linq.Expressions; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.Sagas.Core +{ + public class ActivityWapper + { + public string Id { get; set; } + public Type ActivityType { get; set; } + public Type ActivityDataType { get; set; } + public object DataObj { get; set; } + public int Order { get; set; } + public ActivityStatus ActivityState { get; set; } + public IServiceProvider ServiceProvider { get; set; } + public DateTime TimeOut { get; set; } + public Task InvokeExecute() + { + var activityObjParams = Expression.Parameter(typeof(object), "activity"); + var activityParams = Expression.Convert(activityObjParams, ActivityType); + var dataObjParams = Expression.Parameter(typeof(object), "data"); + var dataParams = Expression.Convert(dataObjParams, ActivityDataType); + var method = ActivityType.GetMethod("Execute", new Type[] { ActivityDataType }); + var body = Expression.Call(activityObjParams, method, activityObjParams, dataParams); + var func = Expression.Lambda>>(body, activityObjParams, dataObjParams).Compile(); + + using (var scope = ServiceProvider.CreateScope()) + { + var activity = scope.ServiceProvider.GetRequiredService(ActivityType); + return func(activity, DataObj); + } + } + public Task InvokeCompensate() + { + var activityObjParams = Expression.Parameter(typeof(object), "activity"); + var activityParams = Expression.Convert(activityObjParams, ActivityType); + var dataObjParams = Expression.Parameter(typeof(object), "data"); + var dataParams = Expression.Convert(dataObjParams, ActivityDataType); + var method = ActivityType.GetMethod("Compensate", new Type[] { ActivityDataType }); + var body = Expression.Call(activityObjParams, method, activityObjParams, dataParams); + var func = Expression.Lambda>(body, activityObjParams, dataObjParams).Compile(); + using (var scope = ServiceProvider.CreateScope()) + { + var activity = scope.ServiceProvider.GetRequiredService(ActivityType); + return func(activity, DataObj); + } + } + } +} diff --git a/src/Pole.Sagas/Core/EventSender.cs b/src/Pole.Sagas/Core/EventSender.cs new file mode 100644 index 0000000..6a6d401 --- /dev/null +++ b/src/Pole.Sagas/Core/EventSender.cs @@ -0,0 +1,51 @@ +using Pole.Sagas.Core.Abstraction; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.Sagas.Core +{ + class EventSender : IEventSender + { + public Task ActivityCompensateAborted(string activityId, string sagaId, string errors) + { + return Task.CompletedTask; + } + + public Task ActivityCompensated(string activityId) + { + return Task.CompletedTask; + } + + public Task ActivityEnded(string activityId, string resultContent) + { + return Task.CompletedTask; + } + + public Task ActivityExecuteAborted(string activityId, string resultContent, string errors) + { + return Task.CompletedTask; + } + + public Task ActivityRetried(string activityId, string status, int retries, string resultContent) + { + return Task.CompletedTask; + } + + public Task ActivityStarted(string activityId, string sagaId, DateTime activityTimeoutTime, string parameterContent) + { + return Task.CompletedTask; + } + + public Task SagaEnded(string sagaId, DateTime ExpiresAt) + { + return Task.CompletedTask; + } + + public Task SagaStarted(string sagaId, string serviceName) + { + return Task.CompletedTask; + } + } +} diff --git a/src/Pole.Sagas/Core/Exceptions/ActivityNameIrregularException.cs b/src/Pole.Sagas/Core/Exceptions/ActivityNameIrregularException.cs new file mode 100644 index 0000000..b953e1d --- /dev/null +++ b/src/Pole.Sagas/Core/Exceptions/ActivityNameIrregularException.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Core.Exceptions +{ + public class ActivityNameIrregularException:Exception + { + public ActivityNameIrregularException(Type activityType):base($"Activity : {activityType.FullName} irregular naming") + { + + } + } +} diff --git a/src/Pole.Sagas/Core/Exceptions/ActivityNameRepeatedException.cs b/src/Pole.Sagas/Core/Exceptions/ActivityNameRepeatedException.cs new file mode 100644 index 0000000..ee09151 --- /dev/null +++ b/src/Pole.Sagas/Core/Exceptions/ActivityNameRepeatedException.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Core.Exceptions +{ + public class ActivityNameRepeatedException : Exception + { + public ActivityNameRepeatedException(string name):base($"Activity :{name} already exists") + { + + } + } +} diff --git a/src/Pole.Sagas/Core/Exceptions/ActivityNotFoundByNameException.cs b/src/Pole.Sagas/Core/Exceptions/ActivityNotFoundByNameException.cs new file mode 100644 index 0000000..8c190f0 --- /dev/null +++ b/src/Pole.Sagas/Core/Exceptions/ActivityNotFoundByNameException.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Core.Exceptions +{ + public class ActivityNotFoundByNameException:Exception + { + public ActivityNotFoundByNameException(string activityName) : base($"Activity not found by name: {activityName} ") + { + + } + } +} diff --git a/src/Pole.Sagas/Core/Exceptions/ActivityNotFoundByTypeException.cs b/src/Pole.Sagas/Core/Exceptions/ActivityNotFoundByTypeException.cs new file mode 100644 index 0000000..2dd4b24 --- /dev/null +++ b/src/Pole.Sagas/Core/Exceptions/ActivityNotFoundByTypeException.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Core.Exceptions +{ + public class ActivityNotFoundByTypeException : Exception + { + public ActivityNotFoundByTypeException(Type activityType) : base($"Activity not found by type: {activityType.FullName}") + { + + } + } +} diff --git a/src/Pole.Sagas/Core/IActivity.cs b/src/Pole.Sagas/Core/IActivity.cs deleted file mode 100644 index b186b54..0000000 --- a/src/Pole.Sagas/Core/IActivity.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.Sagas.Core -{ - public interface IActivity - { - Task Execute(TData data); - Task Compensate(TData data); - } -} diff --git a/src/Pole.Sagas/Core/ISaga.cs b/src/Pole.Sagas/Core/ISaga.cs index 25a52be..eac3fa1 100644 --- a/src/Pole.Sagas/Core/ISaga.cs +++ b/src/Pole.Sagas/Core/ISaga.cs @@ -1,11 +1,14 @@ using System; using System.Collections.Generic; using System.Text; +using System.Threading.Tasks; namespace Pole.Sagas.Core { public interface ISaga { - void AddActivity(IActivity activity); + string Id { get; } + void AddActivity(string activityName, TData data); + Task GetResult(); } } diff --git a/src/Pole.Sagas/Core/ISagaFactory.cs b/src/Pole.Sagas/Core/ISagaFactory.cs deleted file mode 100644 index 2ad9dd9..0000000 --- a/src/Pole.Sagas/Core/ISagaFactory.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Sagas.Core -{ - public interface ISagaFactory - { - TSaga CreateSaga(TimeSpan timeOut) where TSaga : ISaga; - } -} diff --git a/src/Pole.Sagas/Core/PoleSagasOption.cs b/src/Pole.Sagas/Core/PoleSagasOption.cs new file mode 100644 index 0000000..05cf12f --- /dev/null +++ b/src/Pole.Sagas/Core/PoleSagasOption.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Core +{ + public class PoleSagasOption + { + public string ServiceName { get; set; } + public int CompeletedSagaExpiredAfterSeconds { get; set; } = 60 * 10; + } +} diff --git a/src/Pole.Sagas/Core/Saga.cs b/src/Pole.Sagas/Core/Saga.cs index 19f02ad..31f3d2e 100644 --- a/src/Pole.Sagas/Core/Saga.cs +++ b/src/Pole.Sagas/Core/Saga.cs @@ -1,15 +1,144 @@ -using System; +using Pole.Core.Serialization; +using Pole.Core.Utils.Abstraction; +using Pole.Sagas.Core.Exceptions; +using System; using System.Collections.Generic; using System.Text; +using System.Threading.Tasks; namespace Pole.Sagas.Core { - class Saga : ISaga + public class Saga : ISaga { - private System.Collections.Concurrent.ConcurrentQueue<> - public void AddActivity(IActivity activity) + private List activities = new List(); + private IServiceProvider serviceProvider; + private IEventSender eventSender; + private ISnowflakeIdGenerator snowflakeIdGenerator; + private PoleSagasOption poleSagasOption; + private int _currentMaxOrder = 0; + private int _currentExecuteOrder = 0; + private int _currentCompensateOrder = 0; + private ISerializer serializer; + public string Id { get; } + + public Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer) + { + this.snowflakeIdGenerator = snowflakeIdGenerator; + this.serviceProvider = serviceProvider; + this.eventSender = eventSender; + this.poleSagasOption = poleSagasOption; + this.serializer = serializer; + Id = snowflakeIdGenerator.NextId(); + } + + public void AddActivity(string activityName, TData data) + { + _currentMaxOrder++; + ActivityWapper activityWapper = new ActivityWapper + { + ActivityDataType = typeof(TData), + ActivityState = ActivityStatus.NotStarted, + ActivityType = data.GetType(), + DataObj = data, + Order = _currentMaxOrder, + ServiceProvider = serviceProvider + }; + activities.Add(activityWapper); + } + + public async Task GetResult() + { + await eventSender.SagaStarted(Id, poleSagasOption.ServiceName); + + var executeActivity = GetNextExecuteActivity(); + if (executeActivity == null) + { + var expiresAt = DateTime.UtcNow.AddSeconds(poleSagasOption.CompeletedSagaExpiredAfterSeconds); + await eventSender.SagaEnded(Id, expiresAt); + return SagaResult.SuccessResult; + } + var result = await RecursiveExecuteActivity(executeActivity); + return result; + } + + private ActivityWapper GetNextExecuteActivity() + { + if (_currentExecuteOrder == _currentMaxOrder) + { + return null; + } + _currentExecuteOrder++; + return activities[_currentExecuteOrder]; + } + private ActivityWapper GetNextCompensateActivity() + { + _currentCompensateOrder--; + if (_currentExecuteOrder == 0) + { + return null; + } + + return activities[_currentCompensateOrder]; + } + private async Task RecursiveCompensateActivity(ActivityWapper activityWapper) + { + var activityId = activityWapper.Id; + try + { + //var jsonContent = serializer.Serialize(activityWapper.DataObj, activityWapper.ActivityDataType); + //await eventSender.ActivityStarted(activityId, Id, activityWapper.TimeOut, jsonContent); + await activityWapper.InvokeCompensate(); + await eventSender.ActivityCompensated(activityId); + var compensateActivity = GetNextCompensateActivity(); + if (compensateActivity == null) + { + return; + } + await RecursiveCompensateActivity(compensateActivity); + } + catch (Exception exception) + { + await eventSender.ActivityCompensateAborted(activityId, Id, exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace); + } + } + private async Task RecursiveExecuteActivity(ActivityWapper activityWapper) { - throw new NotImplementedException(); + var activityId = snowflakeIdGenerator.NextId(); + activityWapper.Id = activityId; + try + { + var jsonContent = serializer.Serialize(activityWapper.DataObj, activityWapper.ActivityDataType); + await eventSender.ActivityStarted(activityId, Id, activityWapper.TimeOut, jsonContent); + var result = await activityWapper.InvokeExecute(); + if (!result.IsSuccess) + { + await eventSender.ActivityExecuteAborted(activityId, serializer.Serialize(result.Result), string.Empty); + _currentCompensateOrder = _currentExecuteOrder; + var compensateActivity = GetNextCompensateActivity(); + await RecursiveCompensateActivity(compensateActivity); + return result; + } + await eventSender.ActivityEnded(activityId, string.Empty); + var executeActivity = GetNextExecuteActivity(); + if (executeActivity == null) + { + return result; + } + else + { + return await RecursiveExecuteActivity(executeActivity); + } + } + catch (Exception exception) + { + var errors = exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace; + await eventSender.ActivityExecuteAborted(activityId, string.Empty, errors); + return new ActivityExecuteResult + { + IsSuccess = false, + Errors = errors + }; + } } } } diff --git a/src/Pole.Sagas/Core/SagaFactory.cs b/src/Pole.Sagas/Core/SagaFactory.cs index 834de0b..54fc460 100644 --- a/src/Pole.Sagas/Core/SagaFactory.cs +++ b/src/Pole.Sagas/Core/SagaFactory.cs @@ -1,4 +1,7 @@ -using Pole.Core.Utils.Abstraction; +using Microsoft.Extensions.Options; +using Pole.Core.Serialization; +using Pole.Core.Utils.Abstraction; +using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; using System.Text; @@ -7,18 +10,23 @@ namespace Pole.Sagas.Core { class SagaFactory : ISagaFactory { - private readonly ISnowflakeIdGenerator _snowflakeIdGenerator; - public SagaFactory(ISnowflakeIdGenerator snowflakeIdGenerator) + private readonly ISnowflakeIdGenerator snowflakeIdGenerator; + private readonly IServiceProvider serviceProvider; + private readonly IEventSender eventSender; + private readonly PoleSagasOption poleSagasOption; + private readonly ISerializer serializer; + public SagaFactory(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, IOptions poleSagasOption, ISerializer serializer) { - _snowflakeIdGenerator = snowflakeIdGenerator; + this.snowflakeIdGenerator = snowflakeIdGenerator; + this.serviceProvider = serviceProvider; + this.eventSender = eventSender; + this.poleSagasOption = poleSagasOption.Value; + this.serializer = serializer; } - public TSaga CreateSaga(TimeSpan timeOut) where TSaga : ISaga + public ISaga CreateSaga() { - var name = typeof(TSaga).FullName; - var SagaFlow = SagasCollection.Get(name); - var newId = _snowflakeIdGenerator.NextId(); - throw new NotImplementedException(); + return new Saga(snowflakeIdGenerator, serviceProvider, eventSender, poleSagasOption, serializer); } } } diff --git a/src/Pole.Sagas/Core/SagaResult.cs b/src/Pole.Sagas/Core/SagaResult.cs new file mode 100644 index 0000000..cd12fad --- /dev/null +++ b/src/Pole.Sagas/Core/SagaResult.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Core +{ + public class SagaResult + { + public bool IsSuccess { get; set; } + public bool HasException { get; set; } + public object Result { get; set; } + public string ExceptionMessages { get; set; } = string.Empty; + + public static SagaResult SuccessResult = new SagaResult + { + IsSuccess = true, + Result = default + }; + } +} diff --git a/src/Pole.Sagas/Pole.Sagas.csproj b/src/Pole.Sagas/Pole.Sagas.csproj index 167d5f2..7cfb216 100644 --- a/src/Pole.Sagas/Pole.Sagas.csproj +++ b/src/Pole.Sagas/Pole.Sagas.csproj @@ -5,7 +5,6 @@ - diff --git a/src/Pole.Sagas/PoleSagaServiceCollectionExtensions.cs b/src/Pole.Sagas/PoleSagaServiceCollectionExtensions.cs new file mode 100644 index 0000000..26f2fb1 --- /dev/null +++ b/src/Pole.Sagas/PoleSagaServiceCollectionExtensions.cs @@ -0,0 +1,29 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Microsoft.Extensions.DependencyInjection; +using Pole.Core; +using Pole.Sagas.Core; +using Pole.Sagas.Core.Abstraction; + +namespace Microsoft.Extensions.DependencyInjection +{ + public static class PoleSagaServiceCollectionExtensions + { + public static void AddSagas(this StartupConfig startupOption, Action rabbitConfigAction) + { + startupOption.Services.Configure(rabbitConfigAction); + startupOption.Services.AddSingleton(); + startupOption.Services.AddSingleton(); + startupOption.Services.AddSingleton(); + } + public static void AddSagas(this StartupConfig startupOption) + { + Action action = option => { }; + startupOption.Services.Configure(action); + startupOption.Services.AddSingleton(); + startupOption.Services.AddSingleton(); + startupOption.Services.AddSingleton(); + } + } +} -- libgit2 0.25.0