From 92eeb084da9a32d9d0493dc1244a66a5093aab2d Mon Sep 17 00:00:00 2001 From: dingsongjie Date: Wed, 11 Mar 2020 16:50:54 +0800 Subject: [PATCH] 完善功能 --- samples/apis/SagasTest.Api/Activities/Transaction1CompensateErrorActivity.cs | 3 ++- samples/apis/SagasTest.Api/Activities/Transaction1ExceptionActivity.cs | 3 ++- samples/apis/SagasTest.Api/Activities/Transaction1OkActivity.cs | 3 ++- samples/apis/SagasTest.Api/Activities/Transaction1ReturnFalseActivity.cs | 3 ++- samples/apis/SagasTest.Api/Activities/Transaction2CompensateErrorActivity.cs | 3 ++- samples/apis/SagasTest.Api/Activities/Transaction2ExceptionActivity.cs | 3 ++- samples/apis/SagasTest.Api/Activities/Transaction2OkActivity.cs | 3 ++- samples/apis/SagasTest.Api/Activities/Transaction2ReturnFalseActivity.cs | 3 ++- samples/apis/SagasTest.Api/Activities/Transaction3ExceptionActivity.cs | 3 ++- samples/apis/SagasTest.Api/Activities/Transaction3HasResultActivity.cs | 3 ++- samples/apis/SagasTest.Api/Activities/Transaction3ReturnFalseActivity.cs | 3 ++- samples/apis/SagasTest.Api/Controllers/SagasTestController.cs | 1 + src/Pole.Sagas.Client/Abstraction/IActivity.cs | 15 +++++++++++++++ src/Pole.Sagas.Client/Abstraction/IActivityFinder.cs | 12 ++++++++++++ src/Pole.Sagas.Client/Abstraction/IEventSender.cs | 19 +++++++++++++++++++ src/Pole.Sagas.Client/Abstraction/ISaga.cs | 15 +++++++++++++++ src/Pole.Sagas.Client/Abstraction/ISagaFactory.cs | 11 +++++++++++ src/Pole.Sagas.Client/ActivityFinder.cs | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Sagas.Client/ActivityWapper.cs | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Sagas.Client/EventSender.cs | 151 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs | 99 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs | 13 +++++++++++-- src/Pole.Sagas.Client/PoleSagasOption.cs | 15 +++++++++++++++ src/Pole.Sagas.Client/Saga.cs | 242 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Sagas.Client/SagaFactory.cs | 36 ++++++++++++++++++++++++++++++++++++ src/Pole.Sagas.Client/SagaRestorer.cs | 39 +++++++++++++++++++++++++++++++++++++++ src/Pole.Sagas.Server/ISagasBuffer.cs | 2 +- src/Pole.Sagas.Server/PoleSagasServerOption.cs | 3 ++- src/Pole.Sagas.Server/Processor/NotEndedSagasFetchProcessor.cs | 3 ++- src/Pole.Sagas.Server/SagasBuffer.cs | 2 +- src/Pole.Sagas.Server/Services/SagaService.cs | 68 +++++++++++++++++++++++++++++++++++++++----------------------------- src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs | 7 ++++--- src/Pole.Sagas/Core/Abstraction/IActivity.cs | 14 -------------- src/Pole.Sagas/Core/Abstraction/IActivityFinder.cs | 12 ------------ src/Pole.Sagas/Core/Abstraction/IEventSender.cs | 19 ------------------- src/Pole.Sagas/Core/Abstraction/ISagaFactory.cs | 11 ----------- src/Pole.Sagas/Core/Abstraction/ISagaInvoker.cs | 10 ---------- src/Pole.Sagas/Core/Abstraction/ISagaStorage.cs | 26 ++++++++++++++++++++++++++ src/Pole.Sagas/Core/ActivityEntity.cs | 1 + src/Pole.Sagas/Core/ActivityFinder.cs | 57 --------------------------------------------------------- src/Pole.Sagas/Core/ActivityWapper.cs | 61 ------------------------------------------------------------- src/Pole.Sagas/Core/EventSender.cs | 150 ------------------------------------------------------------------------------------------------------------------------------------------------------ src/Pole.Sagas/Core/Exceptions/ActivityImplementIrregularException.cs | 2 +- src/Pole.Sagas/Core/Exceptions/ActivityNotFoundWhenCompensateRetryException.cs | 14 ++++++++++++++ src/Pole.Sagas/Core/ISaga.cs | 14 -------------- src/Pole.Sagas/Core/ISagaStorage.cs | 26 -------------------------- src/Pole.Sagas/Core/PoleSagasOption.cs | 14 -------------- src/Pole.Sagas/Core/Saga.cs | 209 ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- src/Pole.Sagas/Core/SagaEntity.cs | 2 +- src/Pole.Sagas/Core/SagaFactory.cs | 38 -------------------------------------- src/Pole.Sagas/Protos/saga.proto | 5 ++--- 51 files changed, 902 insertions(+), 689 deletions(-) create mode 100644 src/Pole.Sagas.Client/Abstraction/IActivity.cs create mode 100644 src/Pole.Sagas.Client/Abstraction/IActivityFinder.cs create mode 100644 src/Pole.Sagas.Client/Abstraction/IEventSender.cs create mode 100644 src/Pole.Sagas.Client/Abstraction/ISaga.cs create mode 100644 src/Pole.Sagas.Client/Abstraction/ISagaFactory.cs create mode 100644 src/Pole.Sagas.Client/ActivityFinder.cs create mode 100644 src/Pole.Sagas.Client/ActivityWapper.cs create mode 100644 src/Pole.Sagas.Client/EventSender.cs create mode 100644 src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs create mode 100644 src/Pole.Sagas.Client/PoleSagasOption.cs create mode 100644 src/Pole.Sagas.Client/Saga.cs create mode 100644 src/Pole.Sagas.Client/SagaFactory.cs create mode 100644 src/Pole.Sagas.Client/SagaRestorer.cs delete mode 100644 src/Pole.Sagas/Core/Abstraction/IActivity.cs delete mode 100644 src/Pole.Sagas/Core/Abstraction/IActivityFinder.cs delete mode 100644 src/Pole.Sagas/Core/Abstraction/IEventSender.cs delete mode 100644 src/Pole.Sagas/Core/Abstraction/ISagaFactory.cs delete mode 100644 src/Pole.Sagas/Core/Abstraction/ISagaInvoker.cs create mode 100644 src/Pole.Sagas/Core/Abstraction/ISagaStorage.cs delete mode 100644 src/Pole.Sagas/Core/ActivityFinder.cs delete mode 100644 src/Pole.Sagas/Core/ActivityWapper.cs delete mode 100644 src/Pole.Sagas/Core/EventSender.cs create mode 100644 src/Pole.Sagas/Core/Exceptions/ActivityNotFoundWhenCompensateRetryException.cs delete mode 100644 src/Pole.Sagas/Core/ISaga.cs delete mode 100644 src/Pole.Sagas/Core/ISagaStorage.cs delete mode 100644 src/Pole.Sagas/Core/PoleSagasOption.cs delete mode 100644 src/Pole.Sagas/Core/Saga.cs delete mode 100644 src/Pole.Sagas/Core/SagaFactory.cs diff --git a/samples/apis/SagasTest.Api/Activities/Transaction1CompensateErrorActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction1CompensateErrorActivity.cs index 6d164d8..d5f11fa 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction1CompensateErrorActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction1CompensateErrorActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction1ExceptionActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction1ExceptionActivity.cs index 18797dc..8defe62 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction1ExceptionActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction1ExceptionActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction1OkActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction1OkActivity.cs index ec2930d..10f1c3d 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction1OkActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction1OkActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction1ReturnFalseActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction1ReturnFalseActivity.cs index 6f446a4..66dfd26 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction1ReturnFalseActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction1ReturnFalseActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction2CompensateErrorActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction2CompensateErrorActivity.cs index d75dddf..5786bc3 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction2CompensateErrorActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction2CompensateErrorActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction2ExceptionActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction2ExceptionActivity.cs index 7e3610a..385d74d 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction2ExceptionActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction2ExceptionActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction2OkActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction2OkActivity.cs index ef68f52..baad6cd 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction2OkActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction2OkActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction2ReturnFalseActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction2ReturnFalseActivity.cs index 0da07ab..8b1a86f 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction2ReturnFalseActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction2ReturnFalseActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction3ExceptionActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction3ExceptionActivity.cs index e071511..50e2a59 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction3ExceptionActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction3ExceptionActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction3HasResultActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction3HasResultActivity.cs index 74f1470..2bbc783 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction3HasResultActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction3HasResultActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction3ReturnFalseActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction3ReturnFalseActivity.cs index af083f7..84bed97 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction3ReturnFalseActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction3ReturnFalseActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Controllers/SagasTestController.cs b/samples/apis/SagasTest.Api/Controllers/SagasTestController.cs index 7912332..af61da5 100644 --- a/samples/apis/SagasTest.Api/Controllers/SagasTestController.cs +++ b/samples/apis/SagasTest.Api/Controllers/SagasTestController.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; +using Pole.Sagas.Client.Abstraction; using Pole.Sagas.Core.Abstraction; using SagasTest.Api.Activities; diff --git a/src/Pole.Sagas.Client/Abstraction/IActivity.cs b/src/Pole.Sagas.Client/Abstraction/IActivity.cs new file mode 100644 index 0000000..c5995e8 --- /dev/null +++ b/src/Pole.Sagas.Client/Abstraction/IActivity.cs @@ -0,0 +1,15 @@ +using Pole.Sagas.Core; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.Sagas.Client.Abstraction +{ + public interface IActivity + { + Task Execute(TData data ,CancellationToken cancellationToken); + Task Compensate(TData data, CancellationToken cancellationToken); + } +} diff --git a/src/Pole.Sagas.Client/Abstraction/IActivityFinder.cs b/src/Pole.Sagas.Client/Abstraction/IActivityFinder.cs new file mode 100644 index 0000000..c742d9c --- /dev/null +++ b/src/Pole.Sagas.Client/Abstraction/IActivityFinder.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Client.Abstraction +{ + public interface IActivityFinder + { + Type FindType(string name); + string GetName(Type type); + } +} diff --git a/src/Pole.Sagas.Client/Abstraction/IEventSender.cs b/src/Pole.Sagas.Client/Abstraction/IEventSender.cs new file mode 100644 index 0000000..31befd7 --- /dev/null +++ b/src/Pole.Sagas.Client/Abstraction/IEventSender.cs @@ -0,0 +1,19 @@ +using System; +using System.Threading.Tasks; + +namespace Pole.Sagas.Client.Abstraction +{ + public interface IEventSender + { + Task SagaStarted(string sagaId, string serviceName, DateTime addTime); + Task SagaEnded(string sagaId, DateTime ExpiresAt); + Task ActivityExecuting(string activityId,string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime,int executeTimes); + Task ActivityExecuteAborted(string activityId); + Task ActivityCompensateAborted(string activityId, string sagaId, string errors); + Task ActivityExecuted(string activityId); + Task ActivityCompensated(string activityId); + Task ActivityExecuteOvertime(string activityId,string name,byte [] parameterData,DateTime addTime); + Task ActivityRevoked(string activityId); + Task ActivityCompensating(string activityId,int compensateTimes); + } +} diff --git a/src/Pole.Sagas.Client/Abstraction/ISaga.cs b/src/Pole.Sagas.Client/Abstraction/ISaga.cs new file mode 100644 index 0000000..8f99545 --- /dev/null +++ b/src/Pole.Sagas.Client/Abstraction/ISaga.cs @@ -0,0 +1,15 @@ +using Pole.Sagas.Core; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.Sagas.Client.Abstraction +{ + public interface ISaga + { + string Id { get; } + void AddActivity(string activityName, object data,int timeOutSeconds=2); + Task GetResult(); + } +} diff --git a/src/Pole.Sagas.Client/Abstraction/ISagaFactory.cs b/src/Pole.Sagas.Client/Abstraction/ISagaFactory.cs new file mode 100644 index 0000000..3e10991 --- /dev/null +++ b/src/Pole.Sagas.Client/Abstraction/ISagaFactory.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Client.Abstraction +{ + public interface ISagaFactory + { + ISaga CreateSaga(); + } +} diff --git a/src/Pole.Sagas.Client/ActivityFinder.cs b/src/Pole.Sagas.Client/ActivityFinder.cs new file mode 100644 index 0000000..4e5cbaf --- /dev/null +++ b/src/Pole.Sagas.Client/ActivityFinder.cs @@ -0,0 +1,58 @@ +using Microsoft.Extensions.Logging; +using Pole.Core.Utils; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core.Abstraction; +using Pole.Sagas.Core.Exceptions; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Pole.Sagas.Client +{ + public class ActivityFinder : IActivityFinder + { + private readonly ConcurrentDictionary nameDict = new ConcurrentDictionary(); + private readonly ConcurrentDictionary typeDict = new ConcurrentDictionary(); + readonly ILogger logger; + public ActivityFinder(ILogger logger) + { + this.logger = logger; + var baseActivityType = typeof(IActivity<>); + foreach (var assembly in AssemblyHelper.GetAssemblies(this.logger)) + { + + foreach (var type in assembly.GetTypes().Where(m => m.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == baseActivityType) && m.IsClass && !m.IsAbstract)) + { + if (!type.FullName.EndsWith("Activity")) + { + throw new ActivityNameIrregularException(type); + } + var activityName = type.Name.Substring(0, type.Name.Length - "Activity".Length); + typeDict.TryAdd(type, activityName); + + if (!nameDict.TryAdd(activityName, type)) + { + throw new ActivityNameRepeatedException(activityName); + } + } + } + } + public Type FindType(string name) + { + if (nameDict.TryGetValue(name, out Type type)) + { + return type; + } + throw new ActivityNotFoundByNameException(name); + } + + public string GetName(Type type) + { + if (!typeDict.TryGetValue(type, out var value)) + throw new ActivityNotFoundByTypeException(type); + return value; + } + } +} diff --git a/src/Pole.Sagas.Client/ActivityWapper.cs b/src/Pole.Sagas.Client/ActivityWapper.cs new file mode 100644 index 0000000..aaf6d1b --- /dev/null +++ b/src/Pole.Sagas.Client/ActivityWapper.cs @@ -0,0 +1,62 @@ +using Microsoft.Extensions.DependencyInjection; +using Pole.Sagas.Core; +using System; +using System.Collections.Generic; +using System.Linq.Expressions; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.Sagas.Client +{ + public class ActivityWapper + { + public string Id { get; set; } + public string Name { get; set; } + public Type ActivityType { get; set; } + public Type ActivityDataType { get; set; } + public object DataObj { get; set; } + public int Order { get; set; } + public int ExecuteTimes { get; set; } + public int CompensateTimes { get; set; } + public ActivityStatus ActivityStatus { get; set; } + public IServiceProvider ServiceProvider { get; set; } + + public int TimeOutSeconds { get; set; } + public CancellationTokenSource CancellationTokenSource { get; set; } + public Task InvokeExecute() + { + var activityObjParams = Expression.Parameter(typeof(object), "activity"); + var activityParams = Expression.Convert(activityObjParams, ActivityType); + var dataObjParams = Expression.Parameter(typeof(object), "data"); + var dataParams = Expression.Convert(dataObjParams, ActivityDataType); + var cancellationTokenParams = Expression.Parameter(typeof(CancellationToken), "ct"); + var method = ActivityType.GetMethod("Execute", new Type[] { ActivityDataType, typeof(CancellationToken) }); + var body = Expression.Call(activityParams, method, dataParams, cancellationTokenParams); + var func = Expression.Lambda>>(body, true, activityObjParams, dataObjParams, cancellationTokenParams).Compile(); + + using (var scope = ServiceProvider.CreateScope()) + { + var activity = scope.ServiceProvider.GetRequiredService(ActivityType); + return func(activity, DataObj, CancellationTokenSource.Token); + } + } + public Task InvokeCompensate() + { + var activityObjParams = Expression.Parameter(typeof(object), "activity"); + var activityParams = Expression.Convert(activityObjParams, ActivityType); + var dataObjParams = Expression.Parameter(typeof(object), "data"); + var dataParams = Expression.Convert(dataObjParams, ActivityDataType); + var cancellationTokenParams = Expression.Parameter(typeof(CancellationToken), "ct"); + var method = ActivityType.GetMethod("Compensate", new Type[] { ActivityDataType, typeof(CancellationToken) }); + var body = Expression.Call(activityParams, method, dataParams, cancellationTokenParams); + var func = Expression.Lambda>(body, activityObjParams, dataObjParams, cancellationTokenParams).Compile(); + + using (var scope = ServiceProvider.CreateScope()) + { + var activity = scope.ServiceProvider.GetRequiredService(ActivityType); + return func(activity, DataObj, CancellationTokenSource.Token); + } + } + } +} diff --git a/src/Pole.Sagas.Client/EventSender.cs b/src/Pole.Sagas.Client/EventSender.cs new file mode 100644 index 0000000..95bd85b --- /dev/null +++ b/src/Pole.Sagas.Client/EventSender.cs @@ -0,0 +1,151 @@ +using Grpc.Net.Client; +using Microsoft.Extensions.Options; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core.Abstraction; +using Pole.Sagas.Core.Exceptions; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using static Pole.Sagas.Server.Grpc.Saga; + +namespace Pole.Sagas.Client +{ + public class EventSender : IEventSender + { + private readonly SagaClient sagaClient; + public EventSender(SagaClient sagaClient) + { + this.sagaClient = sagaClient; + } + public async Task ActivityCompensateAborted(string activityId, string sagaId, string errors) + { + var result = await sagaClient.ActivityCompensateAbortedAsync(new Server.Grpc.ActivityCompensateAbortedRequest + { + ActivityId = activityId, + Errors = errors, + SagaId = sagaId + }); + if (!result.IsSuccess) + { + throw new SagasServerException(result.Errors); + } + } + + public async Task ActivityCompensated(string activityId) + { + var result = await sagaClient.ActivityCompensatedAsync(new Server.Grpc.ActivityCompensatedRequest + { + ActivityId = activityId, + }); + if (!result.IsSuccess) + { + throw new SagasServerException(result.Errors); + } + } + + public async Task ActivityExecuted(string activityId) + { + var result = await sagaClient.ActivityExecutedAsync(new Server.Grpc.ActivityExecutedRequest + { + ActivityId = activityId, + }); + if (!result.IsSuccess) + { + throw new SagasServerException(result.Errors); + } + } + + public async Task ActivityExecuteAborted(string activityId) + { + var result = await sagaClient.ActivityExecuteAbortedAsync(new Server.Grpc.ActivityExecuteAbortedRequest + { + ActivityId = activityId + }); + if (!result.IsSuccess) + { + throw new SagasServerException(result.Errors); + } + } + + public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime, int executeTimes) + { + var result = await sagaClient.ActivityExecutingAsync(new Server.Grpc.ActivityExecutingRequest + { + ActivityId = activityId, + ActivityName = activityName, + AddTime = addTime.ToString("yyyy-MM-dd HH:mm:ss.fff"), + Order = order, + ParameterData = Google.Protobuf.ByteString.CopyFrom(parameterData), + SagaId = sagaId, + }); + if (!result.IsSuccess) + { + throw new SagasServerException(result.Errors); + } + } + + public async Task SagaEnded(string sagaId, DateTime ExpiresAt) + { + var result = await sagaClient.SagaEndedAsync(new Server.Grpc.SagaEndedRequest + { + SagaId = sagaId, + ExpiresAt = ExpiresAt.ToString("yyyy-MM-dd HH:mm:ss.fff"), + }); + if (!result.IsSuccess) + { + throw new SagasServerException(result.Errors); + } + } + + public async Task SagaStarted(string sagaId, string serviceName, DateTime addTime) + { + var result = await sagaClient.SagaStartedAsync(new Server.Grpc.SagaStartedRequest + { + SagaId = sagaId, + ServiceName = serviceName, + AddTime = addTime.ToString("yyyy-MM-dd HH:mm:ss.fff"), + }); + if (!result.IsSuccess) + { + throw new SagasServerException(result.Errors); + } + } + + public async Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime) + { + var result = await sagaClient.ActivityExecuteOvertimeAsync(new Server.Grpc.ActivityExecuteOvertimeRequest + { + ActivityId = activityId, + }); + if (!result.IsSuccess) + { + throw new SagasServerException(result.Errors); + } + } + + public async Task ActivityRevoked(string activityId) + { + var result = await sagaClient.ActivityRevokedAsync(new Server.Grpc.ActivityRevokedRequest + { + ActivityId = activityId, + }); + if (!result.IsSuccess) + { + throw new SagasServerException(result.Errors); + } + } + + public async Task ActivityCompensating(string activityId, int compensateTimes) + { + var result = await sagaClient.ActivityCompensatingAsync(new Server.Grpc.ActivityCompensatingRequest + { + ActivityId = activityId, + }); + if (!result.IsSuccess) + { + throw new SagasServerException(result.Errors); + } + } + } +} diff --git a/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs b/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs new file mode 100644 index 0000000..24a795b --- /dev/null +++ b/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs @@ -0,0 +1,99 @@ +using Grpc.Core; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Options; +using Pole.Core.Serialization; +using Pole.Core.Utils.Abstraction; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using static Pole.Sagas.Server.Grpc.Saga; + +namespace Pole.Sagas.Client +{ + public class NotEndedSagasCompensateRetryBackgroundService : IHostedService + { + private readonly PoleSagasOption options; + private readonly SagaClient sagaClient; + private readonly SagaRestorer sagaRestorer; + public NotEndedSagasCompensateRetryBackgroundService(IOptions options, SagaClient sagaClient, IServiceProvider serviceProvider) + { + this.options = options.Value; + this.sagaClient = sagaClient; + sagaRestorer = new SagaRestorer(serviceProvider.GetRequiredService(), serviceProvider, serviceProvider.GetRequiredService(), this.options, serviceProvider.GetRequiredService(), serviceProvider.GetRequiredService()); + } + + public async Task StartAsync(CancellationToken cancellationToken) + { + using (var stream = sagaClient.GetSagas(new Pole.Sagas.Server.Grpc.GetSagasRequest { Limit = options.PreSagasGrpcStreamingResponseLimitCount, ServiceName = options.ServiceName })) + { + while (await stream.ResponseStream.MoveNext(cancellationToken)) + { + if (stream.ResponseStream.Current.IsSuccess) + { + var sagas = stream.ResponseStream.Current.Sagas.Select(m => + { + var result = new SagaEntity + { + Id = m.Id, + }; + result.ActivityEntities = m.Activities.Select(n => new ActivityEntity + { + CompensateTimes = n.CompensateTimes, + ExecuteTimes = n.ExecuteTimes, + Id = n.Id, + Name = n.Id, + Order = n.Order, + ParameterData = n.ParameterData.ToByteArray(), + SagaId = n.SagaId, + Status = n.Status + }).ToList(); + return result; + }).ToList(); + sagas.ForEach(async sagaEntity => + { + var saga = sagaRestorer.CreateSaga(sagaEntity); + await saga.Compensate(); + }); + } + } + //await foreach (var getSagasResponse in stream.ResponseStream.ReadAllAsync(cancellationToken)) + //{ + // if (getSagasResponse.IsSuccess) + // { + // var sagas = getSagasResponse.Sagas.Select(m => + // { + // var result = new SagaEntity + // { + // Id = m.Id, + // }; + // result.ActivityEntities = m.Activities.Select(n => new ActivityEntity + // { + // CompensateTimes = n.CompensateTimes, + // ExecuteTimes = n.ExecuteTimes, + // Id = n.Id, + // Name = n.Id, + // Order = n.Order, + // ParameterData = n.ParameterData.ToByteArray(), + // SagaId = n.SagaId, + // Status = n.Status + // }).ToList(); + // return result; + // }); + + // } + //} + } + } + + public Task StopAsync(CancellationToken cancellationToken) + { + throw new NotImplementedException(); + } + } +} diff --git a/src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs b/src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs index 3c053c8..61964d0 100644 --- a/src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs +++ b/src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs @@ -2,10 +2,13 @@ using System.Collections.Generic; using System.Linq; using System.Text; +using Grpc.Core; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Pole.Core; using Pole.Core.Utils; +using Pole.Sagas.Client; +using Pole.Sagas.Client.Abstraction; using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using Pole.Sagas.Core.Exceptions; @@ -21,14 +24,20 @@ namespace Microsoft.Extensions.DependencyInjection startupOption.Services.AddSingleton(); startupOption.Services.AddSingleton(); startupOption.Services.AddSingleton(); - using(var provider = startupOption.Services.BuildServiceProvider()) + PoleSagasOption sagasOption = null; + using (var provider = startupOption.Services.BuildServiceProvider()) { - var sagasOption = provider.GetRequiredService>().Value; + sagasOption = provider.GetRequiredService>().Value; startupOption.Services.AddGrpcClient(o => { o.Address = new Uri(sagasOption.SagasServerHost); }); } + RegisterActivities(startupOption); + } + + private static void RegisterActivities(StartupConfig startupOption) + { var baseActivityType = typeof(IActivity<>); foreach (var assembly in AssemblyHelper.GetAssemblies()) { diff --git a/src/Pole.Sagas.Client/PoleSagasOption.cs b/src/Pole.Sagas.Client/PoleSagasOption.cs new file mode 100644 index 0000000..77a25d5 --- /dev/null +++ b/src/Pole.Sagas.Client/PoleSagasOption.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Client +{ + public class PoleSagasOption + { + public string ServiceName { get; set; } + public int PreSagasGrpcStreamingResponseLimitCount { get; set; } = 20; + public int CompeletedSagaExpiredAfterSeconds { get; set; } = 60 * 10; + public int SagasTimeOutSeconds { get; set; } = 60; + public string SagasServerHost { get; set; } + } +} diff --git a/src/Pole.Sagas.Client/Saga.cs b/src/Pole.Sagas.Client/Saga.cs new file mode 100644 index 0000000..9110042 --- /dev/null +++ b/src/Pole.Sagas.Client/Saga.cs @@ -0,0 +1,242 @@ +using Pole.Core.Serialization; +using Pole.Core.Utils.Abstraction; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; +using Pole.Sagas.Core.Abstraction; +using Pole.Sagas.Core.Exceptions; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.Sagas.Client +{ + public class Saga : ISaga + { + private List activities = new List(); + private IServiceProvider serviceProvider; + private IEventSender eventSender; + private ISnowflakeIdGenerator snowflakeIdGenerator; + private IActivityFinder activityFinder; + private PoleSagasOption poleSagasOption; + private int CurrentMaxOrder + { + get { return activities.Count; } + } + /// + /// 如果 等于 -1 说明已经在执行补偿操作,此时这个值已经没有意义 + /// + private int currentExecuteOrder = 0; + /// + /// 如果 等于 -1 说明已经还未执行补偿操作,此时这个值没有意义 + /// + private int currentCompensateOrder = -1; + private ISerializer serializer; + public string Id { get; } + + internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder) + { + this.snowflakeIdGenerator = snowflakeIdGenerator; + this.serviceProvider = serviceProvider; + this.eventSender = eventSender; + this.poleSagasOption = poleSagasOption; + this.serializer = serializer; + this.activityFinder = activityFinder; + Id = snowflakeIdGenerator.NextId(); + } + internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder, string id) + { + this.snowflakeIdGenerator = snowflakeIdGenerator; + this.serviceProvider = serviceProvider; + this.eventSender = eventSender; + this.poleSagasOption = poleSagasOption; + this.serializer = serializer; + this.activityFinder = activityFinder; + Id = id; + this.currentExecuteOrder = -1; + } + + public void AddActivity(string activityName, object data, int timeOutSeconds = 2) + { + var targetActivityType = activityFinder.FindType(activityName); + + var activityInterface = targetActivityType.GetInterfaces().FirstOrDefault(); + if (!activityInterface.IsGenericType) + { + throw new ActivityImplementIrregularException(activityName); + } + var dataType = activityInterface.GetGenericArguments()[0]; + ActivityWapper activityWapper = new ActivityWapper + { + Name = activityName, + ActivityDataType = dataType, + ActivityStatus = ActivityStatus.NotStarted, + ActivityType = targetActivityType, + DataObj = data, + Order = CurrentMaxOrder, + ServiceProvider = serviceProvider, + TimeOutSeconds = 2, + }; + activities.Add(activityWapper); + } + internal void AddActivity(string activityName, string activityStatus, object data, int order, int timeOutSeconds = 2) + { + var targetActivityType = activityFinder.FindType(activityName); + + var activityInterface = targetActivityType.GetInterfaces().FirstOrDefault(); + if (!activityInterface.IsGenericType) + { + throw new ActivityNotFoundWhenCompensateRetryException(activityName); + } + var dataType = activityInterface.GetGenericArguments()[0]; + ActivityWapper activityWapper = new ActivityWapper + { + Name = activityName, + ActivityDataType = dataType, + ActivityStatus = (ActivityStatus)Enum.Parse(typeof(ActivityStatus), activityStatus), + ActivityType = targetActivityType, + DataObj = data, + Order = order, + ServiceProvider = serviceProvider, + TimeOutSeconds = 2, + }; + activities.Add(activityWapper); + } + + public async Task GetResult() + { + await eventSender.SagaStarted(Id, poleSagasOption.ServiceName, DateTime.UtcNow); + var executeActivity = GetNextExecuteActivity(); + if (executeActivity == null) + { + var expiresAt = DateTime.UtcNow.AddSeconds(poleSagasOption.CompeletedSagaExpiredAfterSeconds); + await eventSender.SagaEnded(Id, expiresAt); + return SagaResult.SuccessResult; + } + var result = await RecursiveExecuteActivity(executeActivity); + return result; + } + internal async Task Compensate() + { + this.currentCompensateOrder = CurrentMaxOrder+1; + var compensateActivity = GetNextCompensateActivity(); + if (compensateActivity == null) + { + return ; + } + await RecursiveCompensateActivity(compensateActivity); + } + + private ActivityWapper GetNextExecuteActivity() + { + if (currentExecuteOrder == CurrentMaxOrder) + { + return null; + } + currentExecuteOrder++; + return activities[currentExecuteOrder - 1]; + } + private ActivityWapper GetNextCompensateActivity() + { + currentCompensateOrder--; + if (currentCompensateOrder == 0) + { + return null; + } + + return activities[currentCompensateOrder - 1]; + } + private async Task RecursiveCompensateActivity(ActivityWapper activityWapper) + { + var activityId = activityWapper.Id; + try + { + await eventSender.ActivityCompensating(activityId, activityWapper.CompensateTimes); + await activityWapper.InvokeCompensate(); + await eventSender.ActivityCompensated(activityId); + var compensateActivity = GetNextCompensateActivity(); + if (compensateActivity == null) + { + return; + } + await RecursiveCompensateActivity(compensateActivity); + } + catch (Exception exception) + { + await eventSender.ActivityCompensateAborted(activityId, Id, exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace); + } + } + private async Task RecursiveExecuteActivity(ActivityWapper activityWapper) + { + var activityId = snowflakeIdGenerator.NextId(); + activityWapper.Id = activityId; + activityWapper.ExecuteTimes++; + activityWapper.CancellationTokenSource = new System.Threading.CancellationTokenSource(2 * 1000); + try + { + var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType); + await eventSender.ActivityExecuting(activityId, activityWapper.Name, Id, bytesContent, activityWapper.Order, DateTime.UtcNow, activityWapper.ExecuteTimes); + var result = await activityWapper.InvokeExecute(); + if (!result.IsSuccess) + { + await eventSender.ActivityRevoked(activityId); + await CompensateActivity(result, currentExecuteOrder); + return result; + } + await eventSender.ActivityExecuted(activityId); + var executeActivity = GetNextExecuteActivity(); + if (executeActivity == null) + { + return result; + } + else + { + return await RecursiveExecuteActivity(executeActivity); + } + } + catch (Exception exception) + { + if (activityWapper.CancellationTokenSource.Token.IsCancellationRequested) + { + var errors = exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace; + var result = new ActivityExecuteResult + { + IsSuccess = false, + Errors = errors + }; + var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType); + await eventSender.ActivityExecuteOvertime(activityId, activityWapper.Name, bytesContent, DateTime.UtcNow); + // 超时的时候 需要首先补偿这个超时的操作 + return await CompensateActivity(result, currentExecuteOrder + 1); + } + else + { + var errors = exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace; + var result = new ActivityExecuteResult + { + IsSuccess = false, + Errors = errors + }; + await eventSender.ActivityExecuteAborted(activityId); + // 出错的时候 需要首先补偿这个出错的操作 + return await CompensateActivity(result, currentExecuteOrder + 1); + } + } + } + + private async Task CompensateActivity(ActivityExecuteResult result, int currentCompensateOrder) + { + this.currentCompensateOrder = currentCompensateOrder; + currentExecuteOrder = -1; + var compensateActivity = GetNextCompensateActivity(); + if (compensateActivity == null) + { + return result; + } + await RecursiveCompensateActivity(compensateActivity); + return result; + } + } +} diff --git a/src/Pole.Sagas.Client/SagaFactory.cs b/src/Pole.Sagas.Client/SagaFactory.cs new file mode 100644 index 0000000..1cf6035 --- /dev/null +++ b/src/Pole.Sagas.Client/SagaFactory.cs @@ -0,0 +1,36 @@ +using Microsoft.Extensions.Options; +using Pole.Core.Serialization; +using Pole.Core.Utils.Abstraction; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; +using Pole.Sagas.Core.Abstraction; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Client +{ + public class SagaFactory : ISagaFactory + { + private readonly ISnowflakeIdGenerator snowflakeIdGenerator; + private readonly IServiceProvider serviceProvider; + private readonly IEventSender eventSender; + private readonly PoleSagasOption poleSagasOption; + private readonly ISerializer serializer; + private readonly IActivityFinder activityFinder; + public SagaFactory(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, IOptions poleSagasOption, ISerializer serializer, IActivityFinder activityFinder) + { + this.snowflakeIdGenerator = snowflakeIdGenerator; + this.serviceProvider = serviceProvider; + this.eventSender = eventSender; + this.poleSagasOption = poleSagasOption.Value; + this.serializer = serializer; + this.activityFinder = activityFinder; + } + + public ISaga CreateSaga() + { + return new Saga(snowflakeIdGenerator, serviceProvider, eventSender, poleSagasOption, serializer, activityFinder); + } + } +} diff --git a/src/Pole.Sagas.Client/SagaRestorer.cs b/src/Pole.Sagas.Client/SagaRestorer.cs new file mode 100644 index 0000000..2e46c4a --- /dev/null +++ b/src/Pole.Sagas.Client/SagaRestorer.cs @@ -0,0 +1,39 @@ +using Microsoft.Extensions.Options; +using Pole.Core.Serialization; +using Pole.Core.Utils.Abstraction; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Client +{ + class SagaRestorer + { + private readonly ISnowflakeIdGenerator snowflakeIdGenerator; + private readonly IServiceProvider serviceProvider; + private readonly IEventSender eventSender; + private readonly PoleSagasOption poleSagasOption; + private readonly ISerializer serializer; + private readonly IActivityFinder activityFinder; + public SagaRestorer(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder) + { + this.snowflakeIdGenerator = snowflakeIdGenerator; + this.serviceProvider = serviceProvider; + this.eventSender = eventSender; + this.poleSagasOption = poleSagasOption; + this.serializer = serializer; + this.activityFinder = activityFinder; + } + internal Saga CreateSaga(SagaEntity sagaEntity) + { + var saga = new Saga(snowflakeIdGenerator, serviceProvider, eventSender, poleSagasOption, serializer, activityFinder, sagaEntity.Id); + foreach (var activity in sagaEntity.ActivityEntities) + { + saga.AddActivity(activity.Name, activity.Status, activity.ParameterData, activity.Order); + } + return saga; + } + } +} diff --git a/src/Pole.Sagas.Server/ISagasBuffer.cs b/src/Pole.Sagas.Server/ISagasBuffer.cs index e9bb538..8eac43f 100644 --- a/src/Pole.Sagas.Server/ISagasBuffer.cs +++ b/src/Pole.Sagas.Server/ISagasBuffer.cs @@ -9,7 +9,7 @@ namespace Pole.Sagas.Server { public interface ISagasBuffer { - Task> GetSagas(string serviceName, DateTime dateTime, int limit); + Task> GetSagas(string serviceName, int limit); Task AddSagas(IAsyncEnumerable sagasGroupEntities); } } diff --git a/src/Pole.Sagas.Server/PoleSagasServerOption.cs b/src/Pole.Sagas.Server/PoleSagasServerOption.cs index 20a8d1c..cb80b5b 100644 --- a/src/Pole.Sagas.Server/PoleSagasServerOption.cs +++ b/src/Pole.Sagas.Server/PoleSagasServerOption.cs @@ -6,7 +6,8 @@ namespace Pole.Sagas.Server { public class PoleSagasServerOption { - public int NotEndedSagasFetchIntervalSeconds { get; set; } = 10; + public int NotEndedSagasFetchIntervalSeconds { get; set; } = 30; + public int GetSagasGrpcStreamingResponseDelaySeconds { get; set; } = 20; public int ExpiredDataBulkDeleteIntervalSeconds { get; set; } = 10*60; public int ExpiredDataDeleteBatchCount { get; set; } = 1000; public int ExpiredDataPreBulkDeleteDelaySeconds { get; set; } = 3; diff --git a/src/Pole.Sagas.Server/Processor/NotEndedSagasFetchProcessor.cs b/src/Pole.Sagas.Server/Processor/NotEndedSagasFetchProcessor.cs index b8d02fc..32fa570 100644 --- a/src/Pole.Sagas.Server/Processor/NotEndedSagasFetchProcessor.cs +++ b/src/Pole.Sagas.Server/Processor/NotEndedSagasFetchProcessor.cs @@ -2,6 +2,7 @@ using Microsoft.Extensions.Options; using Pole.Core.Processor; using Pole.Sagas.Core; +using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; using System.Text; @@ -45,7 +46,7 @@ namespace Pole.Sagas.Server.Processor private async Task ProcessInternal() { var addTimeFilter = DateTime.UtcNow.AddMinutes(-4); - var sagas = sagaStorage.GetSagas(addTimeFilter, 500); + var sagas = sagaStorage.GetSagas(addTimeFilter, 2000); await sagasBuffer.AddSagas(sagas); } } diff --git a/src/Pole.Sagas.Server/SagasBuffer.cs b/src/Pole.Sagas.Server/SagasBuffer.cs index 0fb6854..20dc0ba 100644 --- a/src/Pole.Sagas.Server/SagasBuffer.cs +++ b/src/Pole.Sagas.Server/SagasBuffer.cs @@ -49,7 +49,7 @@ namespace Pole.Sagas.Server } } - public async Task> GetSagas(string serviceName, DateTime dateTime, int limit) + public async Task> GetSagas(string serviceName, int limit) { try { diff --git a/src/Pole.Sagas.Server/Services/SagaService.cs b/src/Pole.Sagas.Server/Services/SagaService.cs index 727fa6f..dfd3a2a 100644 --- a/src/Pole.Sagas.Server/Services/SagaService.cs +++ b/src/Pole.Sagas.Server/Services/SagaService.cs @@ -1,6 +1,8 @@ using Google.Protobuf; using Grpc.Core; +using Microsoft.Extensions.Options; using Pole.Sagas.Core; +using Pole.Sagas.Core.Abstraction; using Pole.Sagas.Server.Grpc; using System; using System.Collections.Generic; @@ -14,10 +16,12 @@ namespace Pole.Sagas.Server.Services { private readonly ISagaStorage sagaStorage; private readonly ISagasBuffer sagasBuffer; - public SagaService(ISagaStorage sagaStorage, ISagasBuffer sagasBuffer) + private readonly PoleSagasServerOption poleSagasServerOption; + public SagaService(ISagaStorage sagaStorage, ISagasBuffer sagasBuffer, IOptions poleSagasServerOption) { this.sagaStorage = sagaStorage; this.sagasBuffer = sagasBuffer; + this.poleSagasServerOption = poleSagasServerOption.Value; } public override async Task ActivityCompensateAborted(ActivityCompensateAbortedRequest request, ServerCallContext context) { @@ -159,40 +163,46 @@ namespace Pole.Sagas.Server.Services } return commonResponse; } - public override async Task GetSagas(GetSagasRequest request, ServerCallContext context) + public override async Task GetSagas(GetSagasRequest request, IServerStreamWriter responseStream, ServerCallContext context) { - GetSagasResponse getSagasResponse = new GetSagasResponse(); - try + while (!context.CancellationToken.IsCancellationRequested) { - var sagaEntities = await sagasBuffer.GetSagas(request.ServiceName, Convert.ToDateTime(request.AddTime), request.Limit); - var sagaDtoes = sagaEntities.Select(m => + await Task.Delay(poleSagasServerOption.GetSagasGrpcStreamingResponseDelaySeconds*1000); + + GetSagasResponse getSagasResponse = new GetSagasResponse(); + try { - var result = new GetSagasResponse.Types.Saga + var sagaEntities = await sagasBuffer.GetSagas(request.ServiceName, request.Limit); + var sagaDtoes = sagaEntities.Select(m => { - Id = m.Id, - }; - result.Activities.Add(m.ActivityEntities.Select(n => new GetSagasResponse.Types.Saga.Types.Activity - { - CompensateTimes = n.CompensateTimes, - ExecuteTimes = n.ExecuteTimes, - Id = n.Id, - Name = n.Id, - Order = n.Order, - ParameterData = ByteString.CopyFrom(n.ParameterData), - SagaId = n.SagaId, - Status = n.Status - })); - return result; - }); - getSagasResponse.Sagas.Add(sagaDtoes); - getSagasResponse.IsSuccess = true; - } - catch (Exception ex) - { - getSagasResponse.Errors = CombineError(ex); + var result = new GetSagasResponse.Types.Saga + { + Id = m.Id, + }; + result.Activities.Add(m.ActivityEntities.Select(n => new GetSagasResponse.Types.Saga.Types.Activity + { + CompensateTimes = n.CompensateTimes, + ExecuteTimes = n.ExecuteTimes, + Id = n.Id, + Name = n.Id, + Order = n.Order, + ParameterData = ByteString.CopyFrom(n.ParameterData), + SagaId = n.SagaId, + Status = n.Status + })); + return result; + }); + getSagasResponse.Sagas.Add(sagaDtoes); + getSagasResponse.IsSuccess = true; + } + catch (Exception ex) + { + getSagasResponse.Errors = CombineError(ex); + } + await responseStream.WriteAsync(getSagasResponse); } - return getSagasResponse; } + private string CombineError(Exception exception) { return exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace; diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs index 9609673..4db0c32 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs @@ -34,7 +34,7 @@ namespace Pole.Sagas.Storage.PostgreSql using (var tansaction = await connection.BeginTransactionAsync()) { var updateActivitySql = -$"UPDATE {activityTableName} SET \"Status\"=@Status,\"Errors\"=@Errors WHERE \"Id\" = @Id"; +$"UPDATE {activityTableName} SET \"Status\"=@Status,\"Errors\"=@Errors, \"CompensateTimes\"=\"CompensateTimes\"+1 WHERE \"Id\" = @Id"; await connection.ExecuteAsync(updateActivitySql, new { Id = activityId, @@ -230,11 +230,12 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status ,\"CompensateTimes\"=@Compen using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) { var updateActivitySql = -$"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"ExecuteTimes\",activities.\"CompensateTimes\",activities.\"Name\" from \"Activities\" as activities inner join(select \"Id\",\"ServiceName\" from \"Sagas\" where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\""; +$"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"ExecuteTimes\",activities.\"CompensateTimes\",activities.\"Name\" from \"Activities\" as activities inner join(select \"Id\",\"ServiceName\" from \"Sagas\" where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\" and activities.\"Status\" != @Status "; var activities = await connection.QueryAsync(updateActivitySql, new { AddTime = dateTime, Limit = limit, + Status = nameof(ActivityStatus.Compensated) }); var groupedByServiceNameActivities = activities.GroupBy(m => m.ServiceName); foreach (var groupedByServiceName in groupedByServiceNameActivities) @@ -271,7 +272,7 @@ $"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"I } } - public Task DeleteExpiredData(string tableName, DateTime ExpiredAt, int batchCount) + public Task DeleteExpiredData(string tableName, DateTime ExpiredAt, int batchCount) { using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) { diff --git a/src/Pole.Sagas/Core/Abstraction/IActivity.cs b/src/Pole.Sagas/Core/Abstraction/IActivity.cs deleted file mode 100644 index bfc4ecf..0000000 --- a/src/Pole.Sagas/Core/Abstraction/IActivity.cs +++ /dev/null @@ -1,14 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Sagas.Core.Abstraction -{ - public interface IActivity - { - Task Execute(TData data ,CancellationToken cancellationToken); - Task Compensate(TData data, CancellationToken cancellationToken); - } -} diff --git a/src/Pole.Sagas/Core/Abstraction/IActivityFinder.cs b/src/Pole.Sagas/Core/Abstraction/IActivityFinder.cs deleted file mode 100644 index ca724c4..0000000 --- a/src/Pole.Sagas/Core/Abstraction/IActivityFinder.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Sagas.Core.Abstraction -{ - public interface IActivityFinder - { - Type FindType(string name); - string GetName(Type type); - } -} diff --git a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs deleted file mode 100644 index d4cb597..0000000 --- a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs +++ /dev/null @@ -1,19 +0,0 @@ -using System; -using System.Threading.Tasks; - -namespace Pole.Sagas.Core.Abstraction -{ - public interface IEventSender - { - Task SagaStarted(string sagaId, string serviceName, DateTime addTime); - Task SagaEnded(string sagaId, DateTime ExpiresAt); - Task ActivityExecuting(string activityId,string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime,int executeTimes); - Task ActivityExecuteAborted(string activityId); - Task ActivityCompensateAborted(string activityId, string sagaId, string errors); - Task ActivityExecuted(string activityId); - Task ActivityCompensated(string activityId); - Task ActivityExecuteOvertime(string activityId,string name,byte [] parameterData,DateTime addTime); - Task ActivityRevoked(string activityId); - Task ActivityCompensating(string activityId,int compensateTimes); - } -} diff --git a/src/Pole.Sagas/Core/Abstraction/ISagaFactory.cs b/src/Pole.Sagas/Core/Abstraction/ISagaFactory.cs deleted file mode 100644 index e02ac24..0000000 --- a/src/Pole.Sagas/Core/Abstraction/ISagaFactory.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Sagas.Core.Abstraction -{ - public interface ISagaFactory - { - ISaga CreateSaga(); - } -} diff --git a/src/Pole.Sagas/Core/Abstraction/ISagaInvoker.cs b/src/Pole.Sagas/Core/Abstraction/ISagaInvoker.cs deleted file mode 100644 index 69629b0..0000000 --- a/src/Pole.Sagas/Core/Abstraction/ISagaInvoker.cs +++ /dev/null @@ -1,10 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Sagas.Core.Abstraction -{ - public interface ISagaInvoker - { - } -} diff --git a/src/Pole.Sagas/Core/Abstraction/ISagaStorage.cs b/src/Pole.Sagas/Core/Abstraction/ISagaStorage.cs new file mode 100644 index 0000000..8a7aca0 --- /dev/null +++ b/src/Pole.Sagas/Core/Abstraction/ISagaStorage.cs @@ -0,0 +1,26 @@ +using Google.Protobuf.Collections; +using Pole.Sagas.Core; +using Pole.Sagas.Server.Grpc; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.Sagas.Core.Abstraction +{ + public interface ISagaStorage + { + Task SagaStarted(string sagaId, string serviceName,DateTime addTime); + Task SagaEnded(string sagaId, DateTime ExpiresAt); + Task ActivityExecuting(string activityId, string activityName,string sagaId, byte[] ParameterData, int order,DateTime addTime,int executeTimes); + Task ActivityExecuteAborted(string activityId); + Task ActivityCompensateAborted(string activityId, string sagaId, string errors); + Task ActivityExecuted(string activityId); + Task ActivityCompensated(string activityId); + Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime); + Task ActivityRevoked(string activityId); + Task ActivityCompensating(string activityId, int compensateTimes); + IAsyncEnumerable GetSagas(DateTime dateTime, int limit); + Task DeleteExpiredData(string tableName,DateTime ExpiredAt, int batchCount); + } +} diff --git a/src/Pole.Sagas/Core/ActivityEntity.cs b/src/Pole.Sagas/Core/ActivityEntity.cs index 8c98e1f..fea4a46 100644 --- a/src/Pole.Sagas/Core/ActivityEntity.cs +++ b/src/Pole.Sagas/Core/ActivityEntity.cs @@ -7,6 +7,7 @@ namespace Pole.Sagas.Core public class ActivityEntity { public string Id { get; set; } + public string Name { get; set; } public string SagaId { get; set; } public int Order { get; set; } public string Status { get; set; } diff --git a/src/Pole.Sagas/Core/ActivityFinder.cs b/src/Pole.Sagas/Core/ActivityFinder.cs deleted file mode 100644 index 17d4f7a..0000000 --- a/src/Pole.Sagas/Core/ActivityFinder.cs +++ /dev/null @@ -1,57 +0,0 @@ -using Microsoft.Extensions.Logging; -using Pole.Core.Utils; -using Pole.Sagas.Core.Abstraction; -using Pole.Sagas.Core.Exceptions; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Text; - -namespace Pole.Sagas.Core -{ - public class ActivityFinder : IActivityFinder - { - private readonly ConcurrentDictionary nameDict = new ConcurrentDictionary(); - private readonly ConcurrentDictionary typeDict = new ConcurrentDictionary(); - readonly ILogger logger; - public ActivityFinder(ILogger logger) - { - this.logger = logger; - var baseActivityType = typeof(IActivity<>); - foreach (var assembly in AssemblyHelper.GetAssemblies(this.logger)) - { - - foreach (var type in assembly.GetTypes().Where(m => m.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == baseActivityType) && m.IsClass && !m.IsAbstract)) - { - if (!type.FullName.EndsWith("Activity")) - { - throw new ActivityNameIrregularException(type); - } - var activityName = type.Name.Substring(0, type.Name.Length - "Activity".Length); - typeDict.TryAdd(type, activityName); - - if (!nameDict.TryAdd(activityName, type)) - { - throw new ActivityNameRepeatedException(activityName); - } - } - } - } - public Type FindType(string name) - { - if (nameDict.TryGetValue(name, out Type type)) - { - return type; - } - throw new ActivityNotFoundByNameException(name); - } - - public string GetName(Type type) - { - if (!typeDict.TryGetValue(type, out var value)) - throw new ActivityNotFoundByTypeException(type); - return value; - } - } -} diff --git a/src/Pole.Sagas/Core/ActivityWapper.cs b/src/Pole.Sagas/Core/ActivityWapper.cs deleted file mode 100644 index fae7597..0000000 --- a/src/Pole.Sagas/Core/ActivityWapper.cs +++ /dev/null @@ -1,61 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using System; -using System.Collections.Generic; -using System.Linq.Expressions; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Sagas.Core -{ - public class ActivityWapper - { - public string Id { get; set; } - public string Name { get; set; } - public Type ActivityType { get; set; } - public Type ActivityDataType { get; set; } - public object DataObj { get; set; } - public int Order { get; set; } - public int ExecuteTimes { get; set; } - public int CompensateTimes { get; set; } - public ActivityStatus ActivityStatus { get; set; } - public IServiceProvider ServiceProvider { get; set; } - - public int TimeOutSeconds { get; set; } - public CancellationTokenSource CancellationTokenSource { get; set; } - public Task InvokeExecute() - { - var activityObjParams = Expression.Parameter(typeof(object), "activity"); - var activityParams = Expression.Convert(activityObjParams, ActivityType); - var dataObjParams = Expression.Parameter(typeof(object), "data"); - var dataParams = Expression.Convert(dataObjParams, ActivityDataType); - var cancellationTokenParams = Expression.Parameter(typeof(CancellationToken), "ct"); - var method = ActivityType.GetMethod("Execute", new Type[] { ActivityDataType, typeof(CancellationToken) }); - var body = Expression.Call(activityParams, method, dataParams, cancellationTokenParams); - var func = Expression.Lambda>>(body, true, activityObjParams, dataObjParams, cancellationTokenParams).Compile(); - - using (var scope = ServiceProvider.CreateScope()) - { - var activity = scope.ServiceProvider.GetRequiredService(ActivityType); - return func(activity, DataObj, CancellationTokenSource.Token); - } - } - public Task InvokeCompensate() - { - var activityObjParams = Expression.Parameter(typeof(object), "activity"); - var activityParams = Expression.Convert(activityObjParams, ActivityType); - var dataObjParams = Expression.Parameter(typeof(object), "data"); - var dataParams = Expression.Convert(dataObjParams, ActivityDataType); - var cancellationTokenParams = Expression.Parameter(typeof(CancellationToken), "ct"); - var method = ActivityType.GetMethod("Compensate", new Type[] { ActivityDataType, typeof(CancellationToken) }); - var body = Expression.Call(activityParams, method, dataParams, cancellationTokenParams); - var func = Expression.Lambda>(body, activityObjParams, dataObjParams, cancellationTokenParams).Compile(); - - using (var scope = ServiceProvider.CreateScope()) - { - var activity = scope.ServiceProvider.GetRequiredService(ActivityType); - return func(activity, DataObj, CancellationTokenSource.Token); - } - } - } -} diff --git a/src/Pole.Sagas/Core/EventSender.cs b/src/Pole.Sagas/Core/EventSender.cs deleted file mode 100644 index 0ddbe6c..0000000 --- a/src/Pole.Sagas/Core/EventSender.cs +++ /dev/null @@ -1,150 +0,0 @@ -using Grpc.Net.Client; -using Microsoft.Extensions.Options; -using Pole.Sagas.Core.Abstraction; -using Pole.Sagas.Core.Exceptions; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; -using static Pole.Sagas.Server.Grpc.Saga; - -namespace Pole.Sagas.Core -{ - public class EventSender : IEventSender - { - private readonly SagaClient sagaClient; - public EventSender(SagaClient sagaClient) - { - this.sagaClient = sagaClient; - } - public async Task ActivityCompensateAborted(string activityId, string sagaId, string errors) - { - var result = await sagaClient.ActivityCompensateAbortedAsync(new Server.Grpc.ActivityCompensateAbortedRequest - { - ActivityId = activityId, - Errors = errors, - SagaId = sagaId - }); - if (!result.IsSuccess) - { - throw new SagasServerException(result.Errors); - } - } - - public async Task ActivityCompensated(string activityId) - { - var result = await sagaClient.ActivityCompensatedAsync(new Server.Grpc.ActivityCompensatedRequest - { - ActivityId = activityId, - }); - if (!result.IsSuccess) - { - throw new SagasServerException(result.Errors); - } - } - - public async Task ActivityExecuted(string activityId) - { - var result = await sagaClient.ActivityExecutedAsync(new Server.Grpc.ActivityExecutedRequest - { - ActivityId = activityId, - }); - if (!result.IsSuccess) - { - throw new SagasServerException(result.Errors); - } - } - - public async Task ActivityExecuteAborted(string activityId) - { - var result = await sagaClient.ActivityExecuteAbortedAsync(new Server.Grpc.ActivityExecuteAbortedRequest - { - ActivityId = activityId - }); - if (!result.IsSuccess) - { - throw new SagasServerException(result.Errors); - } - } - - public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime, int executeTimes) - { - var result = await sagaClient.ActivityExecutingAsync(new Server.Grpc.ActivityExecutingRequest - { - ActivityId = activityId, - ActivityName = activityName, - AddTime = addTime.ToString("yyyy-MM-dd HH:mm:ss.fff"), - Order = order, - ParameterData = Google.Protobuf.ByteString.CopyFrom(parameterData), - SagaId = sagaId, - }); - if (!result.IsSuccess) - { - throw new SagasServerException(result.Errors); - } - } - - public async Task SagaEnded(string sagaId, DateTime ExpiresAt) - { - var result = await sagaClient.SagaEndedAsync(new Server.Grpc.SagaEndedRequest - { - SagaId = sagaId, - ExpiresAt = ExpiresAt.ToString("yyyy-MM-dd HH:mm:ss.fff"), - }); - if (!result.IsSuccess) - { - throw new SagasServerException(result.Errors); - } - } - - public async Task SagaStarted(string sagaId, string serviceName, DateTime addTime) - { - var result = await sagaClient.SagaStartedAsync(new Server.Grpc.SagaStartedRequest - { - SagaId = sagaId, - ServiceName = serviceName, - AddTime = addTime.ToString("yyyy-MM-dd HH:mm:ss.fff"), - }); - if (!result.IsSuccess) - { - throw new SagasServerException(result.Errors); - } - } - - public async Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime) - { - var result = await sagaClient.ActivityExecuteOvertimeAsync(new Server.Grpc.ActivityExecuteOvertimeRequest - { - ActivityId = activityId, - }); - if (!result.IsSuccess) - { - throw new SagasServerException(result.Errors); - } - } - - public async Task ActivityRevoked(string activityId) - { - var result = await sagaClient.ActivityRevokedAsync(new Server.Grpc.ActivityRevokedRequest - { - ActivityId = activityId, - }); - if (!result.IsSuccess) - { - throw new SagasServerException(result.Errors); - } - } - - public async Task ActivityCompensating(string activityId, int compensateTimes) - { - var result = await sagaClient.ActivityCompensatingAsync(new Server.Grpc.ActivityCompensatingRequest - { - ActivityId = activityId, - }); - if (!result.IsSuccess) - { - throw new SagasServerException(result.Errors); - } - } - } -} diff --git a/src/Pole.Sagas/Core/Exceptions/ActivityImplementIrregularException.cs b/src/Pole.Sagas/Core/Exceptions/ActivityImplementIrregularException.cs index abc428b..3003c90 100644 --- a/src/Pole.Sagas/Core/Exceptions/ActivityImplementIrregularException.cs +++ b/src/Pole.Sagas/Core/Exceptions/ActivityImplementIrregularException.cs @@ -4,7 +4,7 @@ using System.Text; namespace Pole.Sagas.Core.Exceptions { - class ActivityImplementIrregularException: Exception + public class ActivityImplementIrregularException : Exception { public ActivityImplementIrregularException(string name) : base($"Activity name :{name }must have and only inherit from IActivity<>") { diff --git a/src/Pole.Sagas/Core/Exceptions/ActivityNotFoundWhenCompensateRetryException.cs b/src/Pole.Sagas/Core/Exceptions/ActivityNotFoundWhenCompensateRetryException.cs new file mode 100644 index 0000000..7e7126d --- /dev/null +++ b/src/Pole.Sagas/Core/Exceptions/ActivityNotFoundWhenCompensateRetryException.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Core.Exceptions +{ + public class ActivityNotFoundWhenCompensateRetryException : Exception + { + public ActivityNotFoundWhenCompensateRetryException(string activityName):base($"Activity:{activityName} NotFound When Compensate Retry") + { + + } + } +} diff --git a/src/Pole.Sagas/Core/ISaga.cs b/src/Pole.Sagas/Core/ISaga.cs deleted file mode 100644 index 58686c0..0000000 --- a/src/Pole.Sagas/Core/ISaga.cs +++ /dev/null @@ -1,14 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.Sagas.Core -{ - public interface ISaga - { - string Id { get; } - void AddActivity(string activityName, object data,int timeOutSeconds=2); - Task GetResult(); - } -} diff --git a/src/Pole.Sagas/Core/ISagaStorage.cs b/src/Pole.Sagas/Core/ISagaStorage.cs deleted file mode 100644 index 1e6e747..0000000 --- a/src/Pole.Sagas/Core/ISagaStorage.cs +++ /dev/null @@ -1,26 +0,0 @@ -using Google.Protobuf.Collections; -using Pole.Sagas.Core; -using Pole.Sagas.Server.Grpc; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.Sagas.Core -{ - public interface ISagaStorage - { - Task SagaStarted(string sagaId, string serviceName,DateTime addTime); - Task SagaEnded(string sagaId, DateTime ExpiresAt); - Task ActivityExecuting(string activityId, string activityName,string sagaId, byte[] ParameterData, int order,DateTime addTime,int executeTimes); - Task ActivityExecuteAborted(string activityId); - Task ActivityCompensateAborted(string activityId, string sagaId, string errors); - Task ActivityExecuted(string activityId); - Task ActivityCompensated(string activityId); - Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime); - Task ActivityRevoked(string activityId); - Task ActivityCompensating(string activityId, int compensateTimes); - IAsyncEnumerable GetSagas(DateTime dateTime, int limit); - Task DeleteExpiredData(string tableName,DateTime ExpiredAt, int batchCount); - } -} diff --git a/src/Pole.Sagas/Core/PoleSagasOption.cs b/src/Pole.Sagas/Core/PoleSagasOption.cs deleted file mode 100644 index adb205e..0000000 --- a/src/Pole.Sagas/Core/PoleSagasOption.cs +++ /dev/null @@ -1,14 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Sagas.Core -{ - public class PoleSagasOption - { - public string ServiceName { get; set; } - public int CompeletedSagaExpiredAfterSeconds { get; set; } = 60 * 10; - public int SagasTimeOutSeconds { get; set; } = 60; - public string SagasServerHost { get; set; } - } -} diff --git a/src/Pole.Sagas/Core/Saga.cs b/src/Pole.Sagas/Core/Saga.cs deleted file mode 100644 index 9ef4ae7..0000000 --- a/src/Pole.Sagas/Core/Saga.cs +++ /dev/null @@ -1,209 +0,0 @@ -using Pole.Core.Serialization; -using Pole.Core.Utils.Abstraction; -using Pole.Sagas.Core.Abstraction; -using Pole.Sagas.Core.Exceptions; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Sagas.Core -{ - public class Saga : ISaga - { - private List activities = new List(); - private IServiceProvider serviceProvider; - private IEventSender eventSender; - private ISnowflakeIdGenerator snowflakeIdGenerator; - private IActivityFinder activityFinder; - private PoleSagasOption poleSagasOption; - private int CurrentMaxOrder - { - get { return activities.Count; } - } - /// - /// 如果 等于 -1 说明已经在执行补偿操作,此时这个值已经没有意义 - /// - private int currentExecuteOrder = 0; - /// - /// 如果 等于 -1 说明已经还未执行补偿操作,此时这个值没有意义 - /// - private int currentCompensateOrder = -1; - private ISerializer serializer; - public string Id { get; } - - internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder) - { - this.snowflakeIdGenerator = snowflakeIdGenerator; - this.serviceProvider = serviceProvider; - this.eventSender = eventSender; - this.poleSagasOption = poleSagasOption; - this.serializer = serializer; - this.activityFinder = activityFinder; - Id = snowflakeIdGenerator.NextId(); - } - internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder, int currentExecuteOrder, int currentCompensateOrder, List activities) - { - this.snowflakeIdGenerator = snowflakeIdGenerator; - this.serviceProvider = serviceProvider; - this.eventSender = eventSender; - this.poleSagasOption = poleSagasOption; - this.serializer = serializer; - this.activityFinder = activityFinder; - Id = snowflakeIdGenerator.NextId(); - this.currentExecuteOrder = currentExecuteOrder; - this.currentCompensateOrder = currentCompensateOrder; - this.activities = activities; - } - - public void AddActivity(string activityName, object data, int timeOutSeconds = 2) - { - var targetActivityType = activityFinder.FindType(activityName); - - var activityInterface = targetActivityType.GetInterfaces().FirstOrDefault(); - if (!activityInterface.IsGenericType) - { - throw new ActivityImplementIrregularException(activityName); - } - var dataType = activityInterface.GetGenericArguments()[0]; - ActivityWapper activityWapper = new ActivityWapper - { - Name = activityName, - ActivityDataType = dataType, - ActivityStatus = ActivityStatus.NotStarted, - ActivityType = targetActivityType, - DataObj = data, - Order = CurrentMaxOrder, - ServiceProvider = serviceProvider, - TimeOutSeconds = 2, - }; - activities.Add(activityWapper); - } - - public async Task GetResult() - { - await eventSender.SagaStarted(Id, poleSagasOption.ServiceName, DateTime.UtcNow); - var executeActivity = GetNextExecuteActivity(); - if (executeActivity == null) - { - var expiresAt = DateTime.UtcNow.AddSeconds(poleSagasOption.CompeletedSagaExpiredAfterSeconds); - await eventSender.SagaEnded(Id, expiresAt); - return SagaResult.SuccessResult; - } - var result = await RecursiveExecuteActivity(executeActivity); - return result; - } - - private ActivityWapper GetNextExecuteActivity() - { - if (currentExecuteOrder == CurrentMaxOrder) - { - return null; - } - currentExecuteOrder++; - return activities[currentExecuteOrder - 1]; - } - private ActivityWapper GetNextCompensateActivity() - { - currentCompensateOrder--; - if (currentCompensateOrder == 0) - { - return null; - } - - return activities[currentCompensateOrder - 1]; - } - private async Task RecursiveCompensateActivity(ActivityWapper activityWapper) - { - var activityId = activityWapper.Id; - try - { - await eventSender.ActivityCompensating(activityId, activityWapper.CompensateTimes); - await activityWapper.InvokeCompensate(); - await eventSender.ActivityCompensated(activityId); - var compensateActivity = GetNextCompensateActivity(); - if (compensateActivity == null) - { - return; - } - await RecursiveCompensateActivity(compensateActivity); - } - catch (Exception exception) - { - await eventSender.ActivityCompensateAborted(activityId, Id, exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace); - } - } - private async Task RecursiveExecuteActivity(ActivityWapper activityWapper) - { - var activityId = snowflakeIdGenerator.NextId(); - activityWapper.Id = activityId; - activityWapper.ExecuteTimes++; - activityWapper.CancellationTokenSource = new System.Threading.CancellationTokenSource(2 * 1000); - try - { - var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType); - await eventSender.ActivityExecuting(activityId, activityWapper.Name, Id, bytesContent, activityWapper.Order, DateTime.UtcNow, activityWapper.ExecuteTimes); - var result = await activityWapper.InvokeExecute(); - if (!result.IsSuccess) - { - await eventSender.ActivityRevoked(activityId); - await CompensateActivity(result, currentExecuteOrder); - return result; - } - await eventSender.ActivityExecuted(activityId); - var executeActivity = GetNextExecuteActivity(); - if (executeActivity == null) - { - return result; - } - else - { - return await RecursiveExecuteActivity(executeActivity); - } - } - catch (Exception exception) - { - if (activityWapper.CancellationTokenSource.Token.IsCancellationRequested) - { - var errors = exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace; - var result = new ActivityExecuteResult - { - IsSuccess = false, - Errors = errors - }; - var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType); - await eventSender.ActivityExecuteOvertime(activityId, activityWapper.Name, bytesContent,DateTime.UtcNow); - // 超时的时候 需要首先补偿这个超时的操作 - return await CompensateActivity(result, currentExecuteOrder + 1); - } - else - { - var errors = exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace; - var result = new ActivityExecuteResult - { - IsSuccess = false, - Errors = errors - }; - await eventSender.ActivityExecuteAborted(activityId); - // 出错的时候 需要首先补偿这个出错的操作 - return await CompensateActivity(result, currentExecuteOrder + 1); - } - } - } - - private async Task CompensateActivity(ActivityExecuteResult result, int currentCompensateOrder) - { - this.currentCompensateOrder = currentCompensateOrder; - currentExecuteOrder = -1; - var compensateActivity = GetNextCompensateActivity(); - if (compensateActivity == null) - { - return result; - } - await RecursiveCompensateActivity(compensateActivity); - return result; - } - } -} diff --git a/src/Pole.Sagas/Core/SagaEntity.cs b/src/Pole.Sagas/Core/SagaEntity.cs index 216916d..a683231 100644 --- a/src/Pole.Sagas/Core/SagaEntity.cs +++ b/src/Pole.Sagas/Core/SagaEntity.cs @@ -8,7 +8,7 @@ namespace Pole.Sagas.Core { public string Id { get; set; } public string ServiceName { get; set; } - public List ActivityEntities { get; set; } + public List ActivityEntities { get; set; } public string Status { get; set; } public DateTime? ExpiresAt { get; set; } public DateTime AddTime { get; set; } diff --git a/src/Pole.Sagas/Core/SagaFactory.cs b/src/Pole.Sagas/Core/SagaFactory.cs deleted file mode 100644 index 627b00d..0000000 --- a/src/Pole.Sagas/Core/SagaFactory.cs +++ /dev/null @@ -1,38 +0,0 @@ -using Microsoft.Extensions.Options; -using Pole.Core.Serialization; -using Pole.Core.Utils.Abstraction; -using Pole.Sagas.Core.Abstraction; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Sagas.Core -{ - public class SagaFactory : ISagaFactory - { - private readonly ISnowflakeIdGenerator snowflakeIdGenerator; - private readonly IServiceProvider serviceProvider; - private readonly IEventSender eventSender; - private readonly PoleSagasOption poleSagasOption; - private readonly ISerializer serializer; - private readonly IActivityFinder activityFinder; - public SagaFactory(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, IOptions poleSagasOption, ISerializer serializer, IActivityFinder activityFinder) - { - this.snowflakeIdGenerator = snowflakeIdGenerator; - this.serviceProvider = serviceProvider; - this.eventSender = eventSender; - this.poleSagasOption = poleSagasOption.Value; - this.serializer = serializer; - this.activityFinder = activityFinder; - } - - public ISaga CreateSaga() - { - return new Saga(snowflakeIdGenerator, serviceProvider, eventSender, poleSagasOption, serializer, activityFinder); - } - internal ISaga CreateSaga(string id) - { - return new Saga(snowflakeIdGenerator, serviceProvider, eventSender, poleSagasOption, serializer, activityFinder); - } - } -} diff --git a/src/Pole.Sagas/Protos/saga.proto b/src/Pole.Sagas/Protos/saga.proto index c91e985..2f42d05 100644 --- a/src/Pole.Sagas/Protos/saga.proto +++ b/src/Pole.Sagas/Protos/saga.proto @@ -15,7 +15,7 @@ service Saga { rpc ActivityExecuteOvertime (ActivityExecuteOvertimeRequest) returns (CommonResponse); rpc ActivityRevoked (ActivityRevokedRequest) returns (CommonResponse); rpc ActivityCompensating (ActivityCompensatingRequest) returns (CommonResponse); - rpc GetSagas (GetSagasRequest) returns (GetSagasResponse); + rpc GetSagas (GetSagasRequest) returns (stream GetSagasResponse); } message CommonResponse{ @@ -70,8 +70,7 @@ message ActivityCompensatingRequest { } message GetSagasRequest{ string serviceName = 1; - string addTime = 2; - int32 limit = 3; + int32 limit = 2; } message GetSagasResponse{ bool isSuccess = 1; -- libgit2 0.25.0