diff --git a/samples/apis/SagasTest.Api/Activities/Transaction1CompensateErrorActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction1CompensateErrorActivity.cs index 6d164d8..d5f11fa 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction1CompensateErrorActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction1CompensateErrorActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction1ExceptionActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction1ExceptionActivity.cs index 18797dc..8defe62 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction1ExceptionActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction1ExceptionActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction1OkActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction1OkActivity.cs index ec2930d..10f1c3d 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction1OkActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction1OkActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction1ReturnFalseActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction1ReturnFalseActivity.cs index 6f446a4..66dfd26 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction1ReturnFalseActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction1ReturnFalseActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction2CompensateErrorActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction2CompensateErrorActivity.cs index d75dddf..5786bc3 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction2CompensateErrorActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction2CompensateErrorActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction2ExceptionActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction2ExceptionActivity.cs index 7e3610a..385d74d 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction2ExceptionActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction2ExceptionActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction2OkActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction2OkActivity.cs index ef68f52..baad6cd 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction2OkActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction2OkActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction2ReturnFalseActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction2ReturnFalseActivity.cs index 0da07ab..8b1a86f 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction2ReturnFalseActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction2ReturnFalseActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction3ExceptionActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction3ExceptionActivity.cs index e071511..50e2a59 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction3ExceptionActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction3ExceptionActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction3HasResultActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction3HasResultActivity.cs index 74f1470..2bbc783 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction3HasResultActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction3HasResultActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction3ReturnFalseActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction3ReturnFalseActivity.cs index af083f7..84bed97 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction3ReturnFalseActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction3ReturnFalseActivity.cs @@ -1,4 +1,5 @@ -using Pole.Sagas.Core; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; diff --git a/samples/apis/SagasTest.Api/Controllers/SagasTestController.cs b/samples/apis/SagasTest.Api/Controllers/SagasTestController.cs index 7912332..af61da5 100644 --- a/samples/apis/SagasTest.Api/Controllers/SagasTestController.cs +++ b/samples/apis/SagasTest.Api/Controllers/SagasTestController.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; +using Pole.Sagas.Client.Abstraction; using Pole.Sagas.Core.Abstraction; using SagasTest.Api.Activities; diff --git a/src/Pole.Sagas/Core/Abstraction/IActivity.cs b/src/Pole.Sagas.Client/Abstraction/IActivity.cs similarity index 82% rename from src/Pole.Sagas/Core/Abstraction/IActivity.cs rename to src/Pole.Sagas.Client/Abstraction/IActivity.cs index bfc4ecf..c5995e8 100644 --- a/src/Pole.Sagas/Core/Abstraction/IActivity.cs +++ b/src/Pole.Sagas.Client/Abstraction/IActivity.cs @@ -1,10 +1,11 @@ -using System; +using Pole.Sagas.Core; +using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; -namespace Pole.Sagas.Core.Abstraction +namespace Pole.Sagas.Client.Abstraction { public interface IActivity { diff --git a/src/Pole.Sagas/Core/Abstraction/IActivityFinder.cs b/src/Pole.Sagas.Client/Abstraction/IActivityFinder.cs similarity index 90% rename from src/Pole.Sagas/Core/Abstraction/IActivityFinder.cs rename to src/Pole.Sagas.Client/Abstraction/IActivityFinder.cs index ca724c4..c742d9c 100644 --- a/src/Pole.Sagas/Core/Abstraction/IActivityFinder.cs +++ b/src/Pole.Sagas.Client/Abstraction/IActivityFinder.cs @@ -2,7 +2,7 @@ using System.Collections.Generic; using System.Text; -namespace Pole.Sagas.Core.Abstraction +namespace Pole.Sagas.Client.Abstraction { public interface IActivityFinder { diff --git a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs b/src/Pole.Sagas.Client/Abstraction/IEventSender.cs similarity index 95% rename from src/Pole.Sagas/Core/Abstraction/IEventSender.cs rename to src/Pole.Sagas.Client/Abstraction/IEventSender.cs index d4cb597..31befd7 100644 --- a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs +++ b/src/Pole.Sagas.Client/Abstraction/IEventSender.cs @@ -1,7 +1,7 @@ using System; using System.Threading.Tasks; -namespace Pole.Sagas.Core.Abstraction +namespace Pole.Sagas.Client.Abstraction { public interface IEventSender { diff --git a/src/Pole.Sagas/Core/ISaga.cs b/src/Pole.Sagas.Client/Abstraction/ISaga.cs similarity index 81% rename from src/Pole.Sagas/Core/ISaga.cs rename to src/Pole.Sagas.Client/Abstraction/ISaga.cs index 58686c0..8f99545 100644 --- a/src/Pole.Sagas/Core/ISaga.cs +++ b/src/Pole.Sagas.Client/Abstraction/ISaga.cs @@ -1,9 +1,10 @@ -using System; +using Pole.Sagas.Core; +using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; -namespace Pole.Sagas.Core +namespace Pole.Sagas.Client.Abstraction { public interface ISaga { diff --git a/src/Pole.Sagas/Core/Abstraction/ISagaFactory.cs b/src/Pole.Sagas.Client/Abstraction/ISagaFactory.cs similarity index 90% rename from src/Pole.Sagas/Core/Abstraction/ISagaFactory.cs rename to src/Pole.Sagas.Client/Abstraction/ISagaFactory.cs index e02ac24..3e10991 100644 --- a/src/Pole.Sagas/Core/Abstraction/ISagaFactory.cs +++ b/src/Pole.Sagas.Client/Abstraction/ISagaFactory.cs @@ -2,7 +2,7 @@ using System.Collections.Generic; using System.Text; -namespace Pole.Sagas.Core.Abstraction +namespace Pole.Sagas.Client.Abstraction { public interface ISagaFactory { diff --git a/src/Pole.Sagas/Core/ActivityFinder.cs b/src/Pole.Sagas.Client/ActivityFinder.cs similarity index 97% rename from src/Pole.Sagas/Core/ActivityFinder.cs rename to src/Pole.Sagas.Client/ActivityFinder.cs index 17d4f7a..4e5cbaf 100644 --- a/src/Pole.Sagas/Core/ActivityFinder.cs +++ b/src/Pole.Sagas.Client/ActivityFinder.cs @@ -1,5 +1,6 @@ using Microsoft.Extensions.Logging; using Pole.Core.Utils; +using Pole.Sagas.Client.Abstraction; using Pole.Sagas.Core.Abstraction; using Pole.Sagas.Core.Exceptions; using System; @@ -8,7 +9,7 @@ using System.Collections.Generic; using System.Linq; using System.Text; -namespace Pole.Sagas.Core +namespace Pole.Sagas.Client { public class ActivityFinder : IActivityFinder { diff --git a/src/Pole.Sagas/Core/ActivityWapper.cs b/src/Pole.Sagas.Client/ActivityWapper.cs similarity index 97% rename from src/Pole.Sagas/Core/ActivityWapper.cs rename to src/Pole.Sagas.Client/ActivityWapper.cs index fae7597..aaf6d1b 100644 --- a/src/Pole.Sagas/Core/ActivityWapper.cs +++ b/src/Pole.Sagas.Client/ActivityWapper.cs @@ -1,4 +1,5 @@ using Microsoft.Extensions.DependencyInjection; +using Pole.Sagas.Core; using System; using System.Collections.Generic; using System.Linq.Expressions; @@ -6,7 +7,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; -namespace Pole.Sagas.Core +namespace Pole.Sagas.Client { public class ActivityWapper { diff --git a/src/Pole.Sagas/Core/EventSender.cs b/src/Pole.Sagas.Client/EventSender.cs similarity index 96% rename from src/Pole.Sagas/Core/EventSender.cs rename to src/Pole.Sagas.Client/EventSender.cs index 0ddbe6c..95bd85b 100644 --- a/src/Pole.Sagas/Core/EventSender.cs +++ b/src/Pole.Sagas.Client/EventSender.cs @@ -1,5 +1,6 @@ using Grpc.Net.Client; using Microsoft.Extensions.Options; +using Pole.Sagas.Client.Abstraction; using Pole.Sagas.Core.Abstraction; using Pole.Sagas.Core.Exceptions; using System; @@ -8,7 +9,7 @@ using System.Text; using System.Threading.Tasks; using static Pole.Sagas.Server.Grpc.Saga; -namespace Pole.Sagas.Core +namespace Pole.Sagas.Client { public class EventSender : IEventSender { diff --git a/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs b/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs new file mode 100644 index 0000000..24a795b --- /dev/null +++ b/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs @@ -0,0 +1,99 @@ +using Grpc.Core; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Options; +using Pole.Core.Serialization; +using Pole.Core.Utils.Abstraction; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using static Pole.Sagas.Server.Grpc.Saga; + +namespace Pole.Sagas.Client +{ + public class NotEndedSagasCompensateRetryBackgroundService : IHostedService + { + private readonly PoleSagasOption options; + private readonly SagaClient sagaClient; + private readonly SagaRestorer sagaRestorer; + public NotEndedSagasCompensateRetryBackgroundService(IOptions options, SagaClient sagaClient, IServiceProvider serviceProvider) + { + this.options = options.Value; + this.sagaClient = sagaClient; + sagaRestorer = new SagaRestorer(serviceProvider.GetRequiredService(), serviceProvider, serviceProvider.GetRequiredService(), this.options, serviceProvider.GetRequiredService(), serviceProvider.GetRequiredService()); + } + + public async Task StartAsync(CancellationToken cancellationToken) + { + using (var stream = sagaClient.GetSagas(new Pole.Sagas.Server.Grpc.GetSagasRequest { Limit = options.PreSagasGrpcStreamingResponseLimitCount, ServiceName = options.ServiceName })) + { + while (await stream.ResponseStream.MoveNext(cancellationToken)) + { + if (stream.ResponseStream.Current.IsSuccess) + { + var sagas = stream.ResponseStream.Current.Sagas.Select(m => + { + var result = new SagaEntity + { + Id = m.Id, + }; + result.ActivityEntities = m.Activities.Select(n => new ActivityEntity + { + CompensateTimes = n.CompensateTimes, + ExecuteTimes = n.ExecuteTimes, + Id = n.Id, + Name = n.Id, + Order = n.Order, + ParameterData = n.ParameterData.ToByteArray(), + SagaId = n.SagaId, + Status = n.Status + }).ToList(); + return result; + }).ToList(); + sagas.ForEach(async sagaEntity => + { + var saga = sagaRestorer.CreateSaga(sagaEntity); + await saga.Compensate(); + }); + } + } + //await foreach (var getSagasResponse in stream.ResponseStream.ReadAllAsync(cancellationToken)) + //{ + // if (getSagasResponse.IsSuccess) + // { + // var sagas = getSagasResponse.Sagas.Select(m => + // { + // var result = new SagaEntity + // { + // Id = m.Id, + // }; + // result.ActivityEntities = m.Activities.Select(n => new ActivityEntity + // { + // CompensateTimes = n.CompensateTimes, + // ExecuteTimes = n.ExecuteTimes, + // Id = n.Id, + // Name = n.Id, + // Order = n.Order, + // ParameterData = n.ParameterData.ToByteArray(), + // SagaId = n.SagaId, + // Status = n.Status + // }).ToList(); + // return result; + // }); + + // } + //} + } + } + + public Task StopAsync(CancellationToken cancellationToken) + { + throw new NotImplementedException(); + } + } +} diff --git a/src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs b/src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs index 3c053c8..61964d0 100644 --- a/src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs +++ b/src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs @@ -2,10 +2,13 @@ using System.Collections.Generic; using System.Linq; using System.Text; +using Grpc.Core; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Pole.Core; using Pole.Core.Utils; +using Pole.Sagas.Client; +using Pole.Sagas.Client.Abstraction; using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using Pole.Sagas.Core.Exceptions; @@ -21,14 +24,20 @@ namespace Microsoft.Extensions.DependencyInjection startupOption.Services.AddSingleton(); startupOption.Services.AddSingleton(); startupOption.Services.AddSingleton(); - using(var provider = startupOption.Services.BuildServiceProvider()) + PoleSagasOption sagasOption = null; + using (var provider = startupOption.Services.BuildServiceProvider()) { - var sagasOption = provider.GetRequiredService>().Value; + sagasOption = provider.GetRequiredService>().Value; startupOption.Services.AddGrpcClient(o => { o.Address = new Uri(sagasOption.SagasServerHost); }); } + RegisterActivities(startupOption); + } + + private static void RegisterActivities(StartupConfig startupOption) + { var baseActivityType = typeof(IActivity<>); foreach (var assembly in AssemblyHelper.GetAssemblies()) { diff --git a/src/Pole.Sagas/Core/PoleSagasOption.cs b/src/Pole.Sagas.Client/PoleSagasOption.cs similarity index 88% rename from src/Pole.Sagas/Core/PoleSagasOption.cs rename to src/Pole.Sagas.Client/PoleSagasOption.cs index adb205e..77a25d5 100644 --- a/src/Pole.Sagas/Core/PoleSagasOption.cs +++ b/src/Pole.Sagas.Client/PoleSagasOption.cs @@ -2,11 +2,12 @@ using System.Collections.Generic; using System.Text; -namespace Pole.Sagas.Core +namespace Pole.Sagas.Client { public class PoleSagasOption { public string ServiceName { get; set; } + public int PreSagasGrpcStreamingResponseLimitCount { get; set; } = 20; public int CompeletedSagaExpiredAfterSeconds { get; set; } = 60 * 10; public int SagasTimeOutSeconds { get; set; } = 60; public string SagasServerHost { get; set; } diff --git a/src/Pole.Sagas/Core/Saga.cs b/src/Pole.Sagas.Client/Saga.cs similarity index 77% rename from src/Pole.Sagas/Core/Saga.cs rename to src/Pole.Sagas.Client/Saga.cs index 9ef4ae7..9110042 100644 --- a/src/Pole.Sagas/Core/Saga.cs +++ b/src/Pole.Sagas.Client/Saga.cs @@ -1,5 +1,7 @@ using Pole.Core.Serialization; using Pole.Core.Utils.Abstraction; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using Pole.Sagas.Core.Exceptions; using System; @@ -9,7 +11,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; -namespace Pole.Sagas.Core +namespace Pole.Sagas.Client { public class Saga : ISaga { @@ -44,7 +46,7 @@ namespace Pole.Sagas.Core this.activityFinder = activityFinder; Id = snowflakeIdGenerator.NextId(); } - internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder, int currentExecuteOrder, int currentCompensateOrder, List activities) + internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder, string id) { this.snowflakeIdGenerator = snowflakeIdGenerator; this.serviceProvider = serviceProvider; @@ -52,10 +54,8 @@ namespace Pole.Sagas.Core this.poleSagasOption = poleSagasOption; this.serializer = serializer; this.activityFinder = activityFinder; - Id = snowflakeIdGenerator.NextId(); - this.currentExecuteOrder = currentExecuteOrder; - this.currentCompensateOrder = currentCompensateOrder; - this.activities = activities; + Id = id; + this.currentExecuteOrder = -1; } public void AddActivity(string activityName, object data, int timeOutSeconds = 2) @@ -81,6 +81,29 @@ namespace Pole.Sagas.Core }; activities.Add(activityWapper); } + internal void AddActivity(string activityName, string activityStatus, object data, int order, int timeOutSeconds = 2) + { + var targetActivityType = activityFinder.FindType(activityName); + + var activityInterface = targetActivityType.GetInterfaces().FirstOrDefault(); + if (!activityInterface.IsGenericType) + { + throw new ActivityNotFoundWhenCompensateRetryException(activityName); + } + var dataType = activityInterface.GetGenericArguments()[0]; + ActivityWapper activityWapper = new ActivityWapper + { + Name = activityName, + ActivityDataType = dataType, + ActivityStatus = (ActivityStatus)Enum.Parse(typeof(ActivityStatus), activityStatus), + ActivityType = targetActivityType, + DataObj = data, + Order = order, + ServiceProvider = serviceProvider, + TimeOutSeconds = 2, + }; + activities.Add(activityWapper); + } public async Task GetResult() { @@ -95,6 +118,16 @@ namespace Pole.Sagas.Core var result = await RecursiveExecuteActivity(executeActivity); return result; } + internal async Task Compensate() + { + this.currentCompensateOrder = CurrentMaxOrder+1; + var compensateActivity = GetNextCompensateActivity(); + if (compensateActivity == null) + { + return ; + } + await RecursiveCompensateActivity(compensateActivity); + } private ActivityWapper GetNextExecuteActivity() { @@ -174,7 +207,7 @@ namespace Pole.Sagas.Core Errors = errors }; var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType); - await eventSender.ActivityExecuteOvertime(activityId, activityWapper.Name, bytesContent,DateTime.UtcNow); + await eventSender.ActivityExecuteOvertime(activityId, activityWapper.Name, bytesContent, DateTime.UtcNow); // 超时的时候 需要首先补偿这个超时的操作 return await CompensateActivity(result, currentExecuteOrder + 1); } diff --git a/src/Pole.Sagas/Core/SagaFactory.cs b/src/Pole.Sagas.Client/SagaFactory.cs similarity index 88% rename from src/Pole.Sagas/Core/SagaFactory.cs rename to src/Pole.Sagas.Client/SagaFactory.cs index 627b00d..1cf6035 100644 --- a/src/Pole.Sagas/Core/SagaFactory.cs +++ b/src/Pole.Sagas.Client/SagaFactory.cs @@ -1,12 +1,14 @@ using Microsoft.Extensions.Options; using Pole.Core.Serialization; using Pole.Core.Utils.Abstraction; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; using System.Text; -namespace Pole.Sagas.Core +namespace Pole.Sagas.Client { public class SagaFactory : ISagaFactory { @@ -30,9 +32,5 @@ namespace Pole.Sagas.Core { return new Saga(snowflakeIdGenerator, serviceProvider, eventSender, poleSagasOption, serializer, activityFinder); } - internal ISaga CreateSaga(string id) - { - return new Saga(snowflakeIdGenerator, serviceProvider, eventSender, poleSagasOption, serializer, activityFinder); - } } } diff --git a/src/Pole.Sagas.Client/SagaRestorer.cs b/src/Pole.Sagas.Client/SagaRestorer.cs new file mode 100644 index 0000000..2e46c4a --- /dev/null +++ b/src/Pole.Sagas.Client/SagaRestorer.cs @@ -0,0 +1,39 @@ +using Microsoft.Extensions.Options; +using Pole.Core.Serialization; +using Pole.Core.Utils.Abstraction; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Client +{ + class SagaRestorer + { + private readonly ISnowflakeIdGenerator snowflakeIdGenerator; + private readonly IServiceProvider serviceProvider; + private readonly IEventSender eventSender; + private readonly PoleSagasOption poleSagasOption; + private readonly ISerializer serializer; + private readonly IActivityFinder activityFinder; + public SagaRestorer(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder) + { + this.snowflakeIdGenerator = snowflakeIdGenerator; + this.serviceProvider = serviceProvider; + this.eventSender = eventSender; + this.poleSagasOption = poleSagasOption; + this.serializer = serializer; + this.activityFinder = activityFinder; + } + internal Saga CreateSaga(SagaEntity sagaEntity) + { + var saga = new Saga(snowflakeIdGenerator, serviceProvider, eventSender, poleSagasOption, serializer, activityFinder, sagaEntity.Id); + foreach (var activity in sagaEntity.ActivityEntities) + { + saga.AddActivity(activity.Name, activity.Status, activity.ParameterData, activity.Order); + } + return saga; + } + } +} diff --git a/src/Pole.Sagas.Server/ISagasBuffer.cs b/src/Pole.Sagas.Server/ISagasBuffer.cs index e9bb538..8eac43f 100644 --- a/src/Pole.Sagas.Server/ISagasBuffer.cs +++ b/src/Pole.Sagas.Server/ISagasBuffer.cs @@ -9,7 +9,7 @@ namespace Pole.Sagas.Server { public interface ISagasBuffer { - Task> GetSagas(string serviceName, DateTime dateTime, int limit); + Task> GetSagas(string serviceName, int limit); Task AddSagas(IAsyncEnumerable sagasGroupEntities); } } diff --git a/src/Pole.Sagas.Server/PoleSagasServerOption.cs b/src/Pole.Sagas.Server/PoleSagasServerOption.cs index 20a8d1c..cb80b5b 100644 --- a/src/Pole.Sagas.Server/PoleSagasServerOption.cs +++ b/src/Pole.Sagas.Server/PoleSagasServerOption.cs @@ -6,7 +6,8 @@ namespace Pole.Sagas.Server { public class PoleSagasServerOption { - public int NotEndedSagasFetchIntervalSeconds { get; set; } = 10; + public int NotEndedSagasFetchIntervalSeconds { get; set; } = 30; + public int GetSagasGrpcStreamingResponseDelaySeconds { get; set; } = 20; public int ExpiredDataBulkDeleteIntervalSeconds { get; set; } = 10*60; public int ExpiredDataDeleteBatchCount { get; set; } = 1000; public int ExpiredDataPreBulkDeleteDelaySeconds { get; set; } = 3; diff --git a/src/Pole.Sagas.Server/Processor/NotEndedSagasFetchProcessor.cs b/src/Pole.Sagas.Server/Processor/NotEndedSagasFetchProcessor.cs index b8d02fc..32fa570 100644 --- a/src/Pole.Sagas.Server/Processor/NotEndedSagasFetchProcessor.cs +++ b/src/Pole.Sagas.Server/Processor/NotEndedSagasFetchProcessor.cs @@ -2,6 +2,7 @@ using Microsoft.Extensions.Options; using Pole.Core.Processor; using Pole.Sagas.Core; +using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; using System.Text; @@ -45,7 +46,7 @@ namespace Pole.Sagas.Server.Processor private async Task ProcessInternal() { var addTimeFilter = DateTime.UtcNow.AddMinutes(-4); - var sagas = sagaStorage.GetSagas(addTimeFilter, 500); + var sagas = sagaStorage.GetSagas(addTimeFilter, 2000); await sagasBuffer.AddSagas(sagas); } } diff --git a/src/Pole.Sagas.Server/SagasBuffer.cs b/src/Pole.Sagas.Server/SagasBuffer.cs index 0fb6854..20dc0ba 100644 --- a/src/Pole.Sagas.Server/SagasBuffer.cs +++ b/src/Pole.Sagas.Server/SagasBuffer.cs @@ -49,7 +49,7 @@ namespace Pole.Sagas.Server } } - public async Task> GetSagas(string serviceName, DateTime dateTime, int limit) + public async Task> GetSagas(string serviceName, int limit) { try { diff --git a/src/Pole.Sagas.Server/Services/SagaService.cs b/src/Pole.Sagas.Server/Services/SagaService.cs index 727fa6f..dfd3a2a 100644 --- a/src/Pole.Sagas.Server/Services/SagaService.cs +++ b/src/Pole.Sagas.Server/Services/SagaService.cs @@ -1,6 +1,8 @@ using Google.Protobuf; using Grpc.Core; +using Microsoft.Extensions.Options; using Pole.Sagas.Core; +using Pole.Sagas.Core.Abstraction; using Pole.Sagas.Server.Grpc; using System; using System.Collections.Generic; @@ -14,10 +16,12 @@ namespace Pole.Sagas.Server.Services { private readonly ISagaStorage sagaStorage; private readonly ISagasBuffer sagasBuffer; - public SagaService(ISagaStorage sagaStorage, ISagasBuffer sagasBuffer) + private readonly PoleSagasServerOption poleSagasServerOption; + public SagaService(ISagaStorage sagaStorage, ISagasBuffer sagasBuffer, IOptions poleSagasServerOption) { this.sagaStorage = sagaStorage; this.sagasBuffer = sagasBuffer; + this.poleSagasServerOption = poleSagasServerOption.Value; } public override async Task ActivityCompensateAborted(ActivityCompensateAbortedRequest request, ServerCallContext context) { @@ -159,40 +163,46 @@ namespace Pole.Sagas.Server.Services } return commonResponse; } - public override async Task GetSagas(GetSagasRequest request, ServerCallContext context) + public override async Task GetSagas(GetSagasRequest request, IServerStreamWriter responseStream, ServerCallContext context) { - GetSagasResponse getSagasResponse = new GetSagasResponse(); - try + while (!context.CancellationToken.IsCancellationRequested) { - var sagaEntities = await sagasBuffer.GetSagas(request.ServiceName, Convert.ToDateTime(request.AddTime), request.Limit); - var sagaDtoes = sagaEntities.Select(m => + await Task.Delay(poleSagasServerOption.GetSagasGrpcStreamingResponseDelaySeconds*1000); + + GetSagasResponse getSagasResponse = new GetSagasResponse(); + try { - var result = new GetSagasResponse.Types.Saga + var sagaEntities = await sagasBuffer.GetSagas(request.ServiceName, request.Limit); + var sagaDtoes = sagaEntities.Select(m => { - Id = m.Id, - }; - result.Activities.Add(m.ActivityEntities.Select(n => new GetSagasResponse.Types.Saga.Types.Activity - { - CompensateTimes = n.CompensateTimes, - ExecuteTimes = n.ExecuteTimes, - Id = n.Id, - Name = n.Id, - Order = n.Order, - ParameterData = ByteString.CopyFrom(n.ParameterData), - SagaId = n.SagaId, - Status = n.Status - })); - return result; - }); - getSagasResponse.Sagas.Add(sagaDtoes); - getSagasResponse.IsSuccess = true; - } - catch (Exception ex) - { - getSagasResponse.Errors = CombineError(ex); + var result = new GetSagasResponse.Types.Saga + { + Id = m.Id, + }; + result.Activities.Add(m.ActivityEntities.Select(n => new GetSagasResponse.Types.Saga.Types.Activity + { + CompensateTimes = n.CompensateTimes, + ExecuteTimes = n.ExecuteTimes, + Id = n.Id, + Name = n.Id, + Order = n.Order, + ParameterData = ByteString.CopyFrom(n.ParameterData), + SagaId = n.SagaId, + Status = n.Status + })); + return result; + }); + getSagasResponse.Sagas.Add(sagaDtoes); + getSagasResponse.IsSuccess = true; + } + catch (Exception ex) + { + getSagasResponse.Errors = CombineError(ex); + } + await responseStream.WriteAsync(getSagasResponse); } - return getSagasResponse; } + private string CombineError(Exception exception) { return exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace; diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs index 9609673..4db0c32 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs @@ -34,7 +34,7 @@ namespace Pole.Sagas.Storage.PostgreSql using (var tansaction = await connection.BeginTransactionAsync()) { var updateActivitySql = -$"UPDATE {activityTableName} SET \"Status\"=@Status,\"Errors\"=@Errors WHERE \"Id\" = @Id"; +$"UPDATE {activityTableName} SET \"Status\"=@Status,\"Errors\"=@Errors, \"CompensateTimes\"=\"CompensateTimes\"+1 WHERE \"Id\" = @Id"; await connection.ExecuteAsync(updateActivitySql, new { Id = activityId, @@ -230,11 +230,12 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status ,\"CompensateTimes\"=@Compen using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) { var updateActivitySql = -$"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"ExecuteTimes\",activities.\"CompensateTimes\",activities.\"Name\" from \"Activities\" as activities inner join(select \"Id\",\"ServiceName\" from \"Sagas\" where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\""; +$"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"ExecuteTimes\",activities.\"CompensateTimes\",activities.\"Name\" from \"Activities\" as activities inner join(select \"Id\",\"ServiceName\" from \"Sagas\" where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\" and activities.\"Status\" != @Status "; var activities = await connection.QueryAsync(updateActivitySql, new { AddTime = dateTime, Limit = limit, + Status = nameof(ActivityStatus.Compensated) }); var groupedByServiceNameActivities = activities.GroupBy(m => m.ServiceName); foreach (var groupedByServiceName in groupedByServiceNameActivities) @@ -271,7 +272,7 @@ $"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"I } } - public Task DeleteExpiredData(string tableName, DateTime ExpiredAt, int batchCount) + public Task DeleteExpiredData(string tableName, DateTime ExpiredAt, int batchCount) { using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) { diff --git a/src/Pole.Sagas/Core/ISagaStorage.cs b/src/Pole.Sagas/Core/Abstraction/ISagaStorage.cs similarity index 96% rename from src/Pole.Sagas/Core/ISagaStorage.cs rename to src/Pole.Sagas/Core/Abstraction/ISagaStorage.cs index 1e6e747..8a7aca0 100644 --- a/src/Pole.Sagas/Core/ISagaStorage.cs +++ b/src/Pole.Sagas/Core/Abstraction/ISagaStorage.cs @@ -6,7 +6,7 @@ using System.Collections.Generic; using System.Text; using System.Threading.Tasks; -namespace Pole.Sagas.Core +namespace Pole.Sagas.Core.Abstraction { public interface ISagaStorage { diff --git a/src/Pole.Sagas/Core/ActivityEntity.cs b/src/Pole.Sagas/Core/ActivityEntity.cs index 8c98e1f..fea4a46 100644 --- a/src/Pole.Sagas/Core/ActivityEntity.cs +++ b/src/Pole.Sagas/Core/ActivityEntity.cs @@ -7,6 +7,7 @@ namespace Pole.Sagas.Core public class ActivityEntity { public string Id { get; set; } + public string Name { get; set; } public string SagaId { get; set; } public int Order { get; set; } public string Status { get; set; } diff --git a/src/Pole.Sagas/Core/Exceptions/ActivityImplementIrregularException.cs b/src/Pole.Sagas/Core/Exceptions/ActivityImplementIrregularException.cs index abc428b..3003c90 100644 --- a/src/Pole.Sagas/Core/Exceptions/ActivityImplementIrregularException.cs +++ b/src/Pole.Sagas/Core/Exceptions/ActivityImplementIrregularException.cs @@ -4,7 +4,7 @@ using System.Text; namespace Pole.Sagas.Core.Exceptions { - class ActivityImplementIrregularException: Exception + public class ActivityImplementIrregularException : Exception { public ActivityImplementIrregularException(string name) : base($"Activity name :{name }must have and only inherit from IActivity<>") { diff --git a/src/Pole.Sagas/Core/Abstraction/ISagaInvoker.cs b/src/Pole.Sagas/Core/Exceptions/ActivityNotFoundWhenCompensateRetryException.cs similarity index 63% rename from src/Pole.Sagas/Core/Abstraction/ISagaInvoker.cs rename to src/Pole.Sagas/Core/Exceptions/ActivityNotFoundWhenCompensateRetryException.cs index 69629b0..7e7126d 100644 --- a/src/Pole.Sagas/Core/Abstraction/ISagaInvoker.cs +++ b/src/Pole.Sagas/Core/Exceptions/ActivityNotFoundWhenCompensateRetryException.cs @@ -2,9 +2,13 @@ using System.Collections.Generic; using System.Text; -namespace Pole.Sagas.Core.Abstraction +namespace Pole.Sagas.Core.Exceptions { - public interface ISagaInvoker + public class ActivityNotFoundWhenCompensateRetryException : Exception { + public ActivityNotFoundWhenCompensateRetryException(string activityName):base($"Activity:{activityName} NotFound When Compensate Retry") + { + + } } } diff --git a/src/Pole.Sagas/Core/SagaEntity.cs b/src/Pole.Sagas/Core/SagaEntity.cs index 216916d..a683231 100644 --- a/src/Pole.Sagas/Core/SagaEntity.cs +++ b/src/Pole.Sagas/Core/SagaEntity.cs @@ -8,7 +8,7 @@ namespace Pole.Sagas.Core { public string Id { get; set; } public string ServiceName { get; set; } - public List ActivityEntities { get; set; } + public List ActivityEntities { get; set; } public string Status { get; set; } public DateTime? ExpiresAt { get; set; } public DateTime AddTime { get; set; } diff --git a/src/Pole.Sagas/Protos/saga.proto b/src/Pole.Sagas/Protos/saga.proto index c91e985..2f42d05 100644 --- a/src/Pole.Sagas/Protos/saga.proto +++ b/src/Pole.Sagas/Protos/saga.proto @@ -15,7 +15,7 @@ service Saga { rpc ActivityExecuteOvertime (ActivityExecuteOvertimeRequest) returns (CommonResponse); rpc ActivityRevoked (ActivityRevokedRequest) returns (CommonResponse); rpc ActivityCompensating (ActivityCompensatingRequest) returns (CommonResponse); - rpc GetSagas (GetSagasRequest) returns (GetSagasResponse); + rpc GetSagas (GetSagasRequest) returns (stream GetSagasResponse); } message CommonResponse{ @@ -70,8 +70,7 @@ message ActivityCompensatingRequest { } message GetSagasRequest{ string serviceName = 1; - string addTime = 2; - int32 limit = 3; + int32 limit = 2; } message GetSagasResponse{ bool isSuccess = 1;