diff --git a/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs b/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs index d12faec..6e928b2 100644 --- a/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs +++ b/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs @@ -12,7 +12,7 @@ using System.Threading.Tasks; namespace Pole.Core.Processor { - public class PendingMessageRetryProcessor : ProcessorBase + class PendingMessageRetryProcessor : ProcessorBase { private readonly IEventStorage eventStorage; private readonly PoleOptions options; diff --git a/src/Pole.Sagas.Server/ISagasBuffer.cs b/src/Pole.Sagas.Server/ISagasBuffer.cs new file mode 100644 index 0000000..e9bb538 --- /dev/null +++ b/src/Pole.Sagas.Server/ISagasBuffer.cs @@ -0,0 +1,15 @@ +using Pole.Sagas.Core; +using Pole.Sagas.Server.Grpc; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.Sagas.Server +{ + public interface ISagasBuffer + { + Task> GetSagas(string serviceName, DateTime dateTime, int limit); + Task AddSagas(IAsyncEnumerable sagasGroupEntities); + } +} diff --git a/src/Pole.Sagas.Server/PoleSagasServerOption.cs b/src/Pole.Sagas.Server/PoleSagasServerOption.cs new file mode 100644 index 0000000..20a8d1c --- /dev/null +++ b/src/Pole.Sagas.Server/PoleSagasServerOption.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Server +{ + public class PoleSagasServerOption + { + public int NotEndedSagasFetchIntervalSeconds { get; set; } = 10; + public int ExpiredDataBulkDeleteIntervalSeconds { get; set; } = 10*60; + public int ExpiredDataDeleteBatchCount { get; set; } = 1000; + public int ExpiredDataPreBulkDeleteDelaySeconds { get; set; } = 3; + } +} diff --git a/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs b/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs index 126f699..c31dc47 100644 --- a/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs +++ b/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs @@ -1,4 +1,6 @@ using Microsoft.Extensions.DependencyInjection; +using Pole.Core.Processor; +using Pole.Sagas.Server.Processor; using System; using System.Collections.Generic; using System.Text; @@ -11,6 +13,9 @@ namespace Pole.Sagas.Server { services.AddGrpc(); + services.AddSingleton(); + services.AddSingleton(); + services.AddHostedService(); return services; } } diff --git a/src/Pole.Sagas.Server/Processor/BackgroundServiceBasedProcessorServer.cs b/src/Pole.Sagas.Server/Processor/BackgroundServiceBasedProcessorServer.cs new file mode 100644 index 0000000..7664971 --- /dev/null +++ b/src/Pole.Sagas.Server/Processor/BackgroundServiceBasedProcessorServer.cs @@ -0,0 +1,48 @@ +using Microsoft.Extensions.Hosting; +using Pole.Core.Processor; +using Pole.Sagas.Core.Abstraction; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using System.Linq; + +namespace Pole.Sagas.Server.Processor +{ + public class BackgroundServiceBasedProcessorServer : BackgroundService, IProcessorServer + { + private readonly IServiceProvider _serviceProvider; + private Task _compositeTask; + + public BackgroundServiceBasedProcessorServer(IServiceProvider serviceProvider) + { + _serviceProvider = serviceProvider; + } + public async Task Start(CancellationToken stoppingToken) + { + var eventStorageInitializer = _serviceProvider.GetService(); + await eventStorageInitializer.InitializeAsync(stoppingToken); + + ProcessingContext processingContext = new ProcessingContext(stoppingToken); + List loopProcessors = new List(); + var innerProcessors = _serviceProvider.GetServices(); + var loggerFactory = _serviceProvider.GetService(); + foreach (var innerProcessor in innerProcessors) + { + LoopProcessor processor = new LoopProcessor(innerProcessor, loggerFactory); + loopProcessors.Add(processor); + } + var tasks = loopProcessors.Select(p => p.Process(processingContext)); + + _compositeTask = Task.WhenAll(tasks); + await _compositeTask; + } + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + return Start(stoppingToken); + } + } +} diff --git a/src/Pole.Sagas.Server/Processor/ExpiredSagasCollectorProcessor.cs b/src/Pole.Sagas.Server/Processor/ExpiredSagasCollectorProcessor.cs new file mode 100644 index 0000000..384b82b --- /dev/null +++ b/src/Pole.Sagas.Server/Processor/ExpiredSagasCollectorProcessor.cs @@ -0,0 +1,67 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Pole.Core.Processor; +using Pole.Sagas.Core; +using Pole.Sagas.Core.Abstraction; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.Sagas.Server.Processor +{ + class ExpiredSagasCollectorProcessor : ProcessorBase + { + private readonly ISagaStorage sagaStorage; + private readonly PoleSagasServerOption options; + private readonly ILogger logger; + private readonly ISagaStorageInitializer sagaStorageInitializer; + public ExpiredSagasCollectorProcessor(ISagaStorage sagaStorage, IOptions options, ILogger logger, ISagaStorageInitializer sagaStorageInitializer) + { + this.sagaStorage = sagaStorage; + this.options = options.Value ?? throw new Exception($"{nameof(PoleSagasServerOption)} Must be injected"); + this.logger = logger; + this.sagaStorageInitializer = sagaStorageInitializer; + } + public override string Name => nameof(NotEndedSagasFetchProcessor); + + + public override async Task Process(ProcessingContext context) + { + try + { + await ProcessInternal(); + } + catch (Exception ex) + { + logger.LogError(ex, $"{nameof(NotEndedSagasFetchProcessor)} Process Error"); + } + finally + { + await Task.Delay(options.ExpiredDataBulkDeleteIntervalSeconds * 1000); + } + } + + private async Task ProcessInternal() + { + var tables = new[] { sagaStorageInitializer.GetSagaTableName(), sagaStorageInitializer.GetOvertimeCompensationGuaranteeTableName() }; + + foreach (var table in tables) + { + logger.LogDebug($"Collecting expired data from table: {table}"); + + int deletedCount; + var time = DateTime.UtcNow; + do + { + deletedCount = await sagaStorage.DeleteExpiredData(table, time, options.ExpiredDataDeleteBatchCount); + + if (deletedCount == options.ExpiredDataDeleteBatchCount) + { + await Task.Delay(options.ExpiredDataPreBulkDeleteDelaySeconds * 1000); + } + } while (deletedCount == options.ExpiredDataDeleteBatchCount); + } + } + } +} diff --git a/src/Pole.Sagas.Server/Processor/NotEndedSagasFetchProcessor.cs b/src/Pole.Sagas.Server/Processor/NotEndedSagasFetchProcessor.cs new file mode 100644 index 0000000..b8d02fc --- /dev/null +++ b/src/Pole.Sagas.Server/Processor/NotEndedSagasFetchProcessor.cs @@ -0,0 +1,52 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Pole.Core.Processor; +using Pole.Sagas.Core; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.Sagas.Server.Processor +{ + class NotEndedSagasFetchProcessor : ProcessorBase + { + private readonly ISagaStorage sagaStorage; + private readonly PoleSagasServerOption options; + private readonly ILogger logger; + private readonly ISagasBuffer sagasBuffer; + public NotEndedSagasFetchProcessor(ISagaStorage sagaStorage, IOptions options, ILogger logger, + ISagasBuffer sagasBuffer) + { + this.sagaStorage = sagaStorage; + this.options = options.Value ?? throw new Exception($"{nameof(PoleSagasServerOption)} Must be injected"); + this.logger = logger; + this.sagasBuffer = sagasBuffer; + } + public override string Name => nameof(NotEndedSagasFetchProcessor); + + + public override async Task Process(ProcessingContext context) + { + try + { + await ProcessInternal(); + } + catch (Exception ex) + { + logger.LogError(ex, $"{nameof(NotEndedSagasFetchProcessor)} Process Error"); + } + finally + { + await Task.Delay(options.NotEndedSagasFetchIntervalSeconds * 1000); + } + } + + private async Task ProcessInternal() + { + var addTimeFilter = DateTime.UtcNow.AddMinutes(-4); + var sagas = sagaStorage.GetSagas(addTimeFilter, 500); + await sagasBuffer.AddSagas(sagas); + } + } +} diff --git a/src/Pole.Sagas.Server/SagasBuffer.cs b/src/Pole.Sagas.Server/SagasBuffer.cs new file mode 100644 index 0000000..0fb6854 --- /dev/null +++ b/src/Pole.Sagas.Server/SagasBuffer.cs @@ -0,0 +1,76 @@ +using Microsoft.Extensions.Logging; +using Pole.Sagas.Core; +using Pole.Sagas.Server.Grpc; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.Sagas.Server +{ + class SagasBuffer : ISagasBuffer + { + private readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1); + private readonly Dictionary> Sagas = new Dictionary>(); + private readonly ILogger logger; + public SagasBuffer(ILogger logger) + { + this.logger = logger; + } + public async Task AddSagas(IAsyncEnumerable sagasGroupEntities) + { + try + { + await semaphoreSlim.WaitAsync(); + await foreach (var sagasGroupEntity in sagasGroupEntities) + { + if (!Sagas.ContainsKey(sagasGroupEntity.ServiceName)) + { + Sagas.TryAdd(sagasGroupEntity.ServiceName, sagasGroupEntity.SagaEntities); + } + else + { + // 这里必然为true + Sagas.TryGetValue(sagasGroupEntity.ServiceName, out List sagaList); + sagaList.AddRange(sagasGroupEntity.SagaEntities); + } + } + return true; + } + catch (Exception ex) + { + throw ex; + } + finally + { + semaphoreSlim.Release(); + } + } + + public async Task> GetSagas(string serviceName, DateTime dateTime, int limit) + { + try + { + await semaphoreSlim.WaitAsync(); + if (Sagas.TryGetValue(serviceName, out List sagaList)) + { + var result = sagaList.Take(limit); + sagaList.RemoveAll(m => result.Select(n => n.Id).Contains(m.Id)); + Sagas[serviceName] = sagaList; + return result; + } + return Enumerable.Empty(); + } + catch (Exception ex) + { + throw ex; + } + finally + { + semaphoreSlim.Release(); + } + } + } +} diff --git a/src/Pole.Sagas.Server/Services/SagaService.cs b/src/Pole.Sagas.Server/Services/SagaService.cs index f14fb85..727fa6f 100644 --- a/src/Pole.Sagas.Server/Services/SagaService.cs +++ b/src/Pole.Sagas.Server/Services/SagaService.cs @@ -1,8 +1,10 @@ -using Grpc.Core; +using Google.Protobuf; +using Grpc.Core; using Pole.Sagas.Core; using Pole.Sagas.Server.Grpc; using System; using System.Collections.Generic; +using System.Linq; using System.Text; using System.Threading.Tasks; @@ -11,9 +13,11 @@ namespace Pole.Sagas.Server.Services public class SagaService : Pole.Sagas.Server.Grpc.Saga.SagaBase { private readonly ISagaStorage sagaStorage; - public SagaService(ISagaStorage sagaStorage) + private readonly ISagasBuffer sagasBuffer; + public SagaService(ISagaStorage sagaStorage, ISagasBuffer sagasBuffer) { this.sagaStorage = sagaStorage; + this.sagasBuffer = sagasBuffer; } public override async Task ActivityCompensateAborted(ActivityCompensateAbortedRequest request, ServerCallContext context) { @@ -76,7 +80,7 @@ namespace Pole.Sagas.Server.Services CommonResponse commonResponse = new CommonResponse(); try { - await sagaStorage.ActivityExecuteOvertime(request.ActivityId); + await sagaStorage.ActivityExecuteOvertime(request.ActivityId, request.Name, request.ParameterData.ToByteArray(), Convert.ToDateTime(request.AddTime)); commonResponse.IsSuccess = true; } catch (Exception ex) @@ -118,7 +122,7 @@ namespace Pole.Sagas.Server.Services CommonResponse commonResponse = new CommonResponse(); try { - await sagaStorage.SagaEnded(request.SagaId,Convert.ToDateTime(request.ExpiresAt)); + await sagaStorage.SagaEnded(request.SagaId, Convert.ToDateTime(request.ExpiresAt)); commonResponse.IsSuccess = true; } catch (Exception ex) @@ -132,7 +136,7 @@ namespace Pole.Sagas.Server.Services CommonResponse commonResponse = new CommonResponse(); try { - await sagaStorage.SagaStarted(request.SagaId,request.ServiceName,Convert.ToDateTime( request.AddTime)); + await sagaStorage.SagaStarted(request.SagaId, request.ServiceName, Convert.ToDateTime(request.AddTime)); commonResponse.IsSuccess = true; } catch (Exception ex) @@ -155,6 +159,40 @@ namespace Pole.Sagas.Server.Services } return commonResponse; } + public override async Task GetSagas(GetSagasRequest request, ServerCallContext context) + { + GetSagasResponse getSagasResponse = new GetSagasResponse(); + try + { + var sagaEntities = await sagasBuffer.GetSagas(request.ServiceName, Convert.ToDateTime(request.AddTime), request.Limit); + var sagaDtoes = sagaEntities.Select(m => + { + var result = new GetSagasResponse.Types.Saga + { + Id = m.Id, + }; + result.Activities.Add(m.ActivityEntities.Select(n => new GetSagasResponse.Types.Saga.Types.Activity + { + CompensateTimes = n.CompensateTimes, + ExecuteTimes = n.ExecuteTimes, + Id = n.Id, + Name = n.Id, + Order = n.Order, + ParameterData = ByteString.CopyFrom(n.ParameterData), + SagaId = n.SagaId, + Status = n.Status + })); + return result; + }); + getSagasResponse.Sagas.Add(sagaDtoes); + getSagasResponse.IsSuccess = true; + } + catch (Exception ex) + { + getSagasResponse.Errors = CombineError(ex); + } + return getSagasResponse; + } private string CombineError(Exception exception) { return exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace; diff --git a/src/Pole.Sagas.Storage.PostgreSql/ActivityAndSagaEntity.cs b/src/Pole.Sagas.Storage.PostgreSql/ActivityAndSagaEntity.cs new file mode 100644 index 0000000..a96eeea --- /dev/null +++ b/src/Pole.Sagas.Storage.PostgreSql/ActivityAndSagaEntity.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Storage.PostgreSql +{ + public class ActivityAndSagaEntity + { + public string Id { get; set; } + public string SagaId { get; set; } + public string ServiceName { get; set; } + public int Order { get; set; } + public string Status { get; set; } + public byte[] ParameterData { get; set; } + public int ExecuteTimes { get; set; } + public int CompensateTimes { get; set; } + public int Name { get; set; } + } +} diff --git a/src/Pole.Sagas.Storage.PostgreSql/PoleSagasStoragePostgreSqlOption.cs b/src/Pole.Sagas.Storage.PostgreSql/PoleSagasStoragePostgreSqlOption.cs index 0ae681e..eb0aa2a 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PoleSagasStoragePostgreSqlOption.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PoleSagasStoragePostgreSqlOption.cs @@ -6,9 +6,10 @@ namespace Pole.Sagas.Storage.PostgreSql { public class PoleSagasStoragePostgreSqlOption { - public string SagaTableName { get; set; } - public string SchemaName { get; set; } - public string ActivityTableName { get; set; } + public string SagaTableName { get; set; } = "Sagas"; + public string SchemaName { get; set; } = "pole-sagas"; + public string ActivityTableName { get; set; } = "Activities"; + public string OvertimeCompensationGuaranteeTableName { get; set; } = "OCG-Activities"; public int SagasRecoveryIntervalSecond { get; set; } public string ConnectionString { get; set; } } diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs index 5e83ed7..9609673 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs @@ -3,8 +3,10 @@ using Microsoft.Extensions.Options; using Npgsql; using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; +using Pole.Sagas.Server.Grpc; using System; using System.Collections.Generic; +using System.Linq; using System.Text; using System.Threading.Tasks; @@ -14,6 +16,7 @@ namespace Pole.Sagas.Storage.PostgreSql { private readonly string sagaTableName; private readonly string activityTableName; + private readonly string overtimeCompensationGuaranteeTableName; private readonly PoleSagasStoragePostgreSqlOption poleSagasStoragePostgreSqlOption; private readonly ISagaStorageInitializer sagaStorageInitializer; public PostgreSqlSagaStorage(IOptions poleSagasStoragePostgreSqlOption, ISagaStorageInitializer sagaStorageInitializer) @@ -22,6 +25,7 @@ namespace Pole.Sagas.Storage.PostgreSql this.sagaStorageInitializer = sagaStorageInitializer; sagaTableName = sagaStorageInitializer.GetSagaTableName(); activityTableName = sagaStorageInitializer.GetActivityTableName(); + overtimeCompensationGuaranteeTableName = sagaStorageInitializer.GetOvertimeCompensationGuaranteeTableName(); } public async Task ActivityCompensateAborted(string activityId, string sagaId, string errors) { @@ -95,17 +99,33 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; } } - public async Task ActivityExecuteOvertime(string activityId) + public async Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime) { using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) { - var updateActivitySql = -$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; - await connection.ExecuteAsync(updateActivitySql, new + using (var tansaction = await connection.BeginTransactionAsync()) { - Id = activityId, - Status = nameof(ActivityStatus.ExecutingOvertime) - }); + var updateActivitySql = +$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; + await connection.ExecuteAsync(updateActivitySql, new + { + Id = activityId, + Status = nameof(ActivityStatus.ExecuteAborted) + }, tansaction); + + var addOCGActivity = +$"INSERT INTO {overtimeCompensationGuaranteeTableName} (\"Id\",\"Name\",\"Status\",\"ParameterData\",\"CompensateTimes\",\"AddTime\")" + + $"VALUES(@Id,@Name,@SagaId,@Status,@ParameterData,,@CompensateTimes,@AddTime);"; + await connection.ExecuteAsync(updateActivitySql, new + { + Id = activityId, + Name = name, + ParameterData = parameterData, + CompensateTimes = 0, + AddTime = addTime, + Status = nameof(OvertimeCompensationGuaranteeActivityStatus.Executing) + }, tansaction); + } } } @@ -167,7 +187,7 @@ $"UPDATE {sagaTableName} SET \"Status\"=@Status ,\"ExpiresAt\"=@ExpiresAt WHERE await connection.ExecuteAsync(updateActivitySql, new { Id = sagaId, - ExpiresAt= ExpiresAt, + ExpiresAt = ExpiresAt, Status = nameof(ActivityStatus.Revoked) }); } @@ -184,7 +204,7 @@ $"INSERT INTO {sagaTableName} (\"Id\",\"ServiceName\",\"Status\",\"AddTime\")" + { Id = sagaId, AddTime = addTime, - ServiceName=serviceName, + ServiceName = serviceName, Status = nameof(ActivityStatus.Revoked) }); } @@ -200,7 +220,67 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status ,\"CompensateTimes\"=@Compen { Id = activityId, Status = nameof(ActivityStatus.Compensating), - CompensateTimes= compensateTimes, + CompensateTimes = compensateTimes, + }); + } + } + + public async IAsyncEnumerable GetSagas(DateTime dateTime, int limit) + { + using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) + { + var updateActivitySql = +$"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"ExecuteTimes\",activities.\"CompensateTimes\",activities.\"Name\" from \"Activities\" as activities inner join(select \"Id\",\"ServiceName\" from \"Sagas\" where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\""; + var activities = await connection.QueryAsync(updateActivitySql, new + { + AddTime = dateTime, + Limit = limit, + }); + var groupedByServiceNameActivities = activities.GroupBy(m => m.ServiceName); + foreach (var groupedByServiceName in groupedByServiceNameActivities) + { + SagasGroupEntity sagasGroupEntity = new SagasGroupEntity + { + ServiceName = groupedByServiceName.Key, + }; + var groupedBySagaIds = groupedByServiceName.GroupBy(m => m.SagaId); + foreach (var groupedBySagaId in groupedBySagaIds) + { + SagaEntity sagaEntity = new SagaEntity + { + Id = groupedBySagaId.Key + }; + foreach (var activity in groupedBySagaId) + { + ActivityEntity activityEntity = new ActivityEntity + { + CompensateTimes = activity.CompensateTimes, + ExecuteTimes = activity.ExecuteTimes, + Id = activity.Id, + Order = activity.Order, + ParameterData = activity.ParameterData, + SagaId = activity.SagaId, + Status = activity.Status, + }; + sagaEntity.ActivityEntities.Add(activityEntity); + } + sagasGroupEntity.SagaEntities.Add(sagaEntity); + } + yield return sagasGroupEntity; + } + } + } + + public Task DeleteExpiredData(string tableName, DateTime ExpiredAt, int batchCount) + { + using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) + { + var sql = +$"delete {tableName} WHERE \"ExpiresAt\" < @ExpiredAt AND \"Id\" IN (SELECT \"Id\" FROM {tableName} LIMIT @BatchCount);"; + return connection.ExecuteAsync(sql, new + { + ExpiredAt = ExpiredAt, + BatchCount = batchCount, }); } } diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs index 7236b40..5baddf7 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs @@ -31,6 +31,10 @@ namespace Pole.Sagas.Storage.PostgreSql { return $"\"{options.SchemaName}\".\"{options.SagaTableName}\""; } + public string GetOvertimeCompensationGuaranteeTableName() + { + return $"\"{options.SchemaName}\".\"{options.OvertimeCompensationGuaranteeTableName}\""; + } public async Task InitializeAsync(CancellationToken cancellationToken) { @@ -70,12 +74,29 @@ CREATE TABLE IF NOT EXISTS {GetActivityTableName()}( ""CompensateErrors"" varchar(1024) COLLATE ""pg_catalog"".""default"", ""CompensateTimes"" int4 NOT NULL, ""AddTime"" timestamp NOT NULL -) -; +); + CREATE INDEX ""Activities_SagaId"" ON ""{GetActivityTableName()}"" USING btree ( ""SagaId"" COLLATE ""pg_catalog"".""default"" ""pg_catalog"".""text_ops"" ASC NULLS LAST ); + ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""Activities_pkey"" PRIMARY KEY (""Id""); + + +ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""Activities_SagaId_fkey"" FOREIGN KEY (""SagaId"") REFERENCES {GetSagaTableName()} (""Id"") ON DELETE CASCADE ON UPDATE NO ACTION; + +CREATE TABLE IF NOT EXISTS {GetOvertimeCompensationGuaranteeTableName()}( + ""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL, + ""Name"" varchar(255) COLLATE ""pg_catalog"".""default"" NOT NULL, + ""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL, + ""CompensateTimes"" int4 NOT NULL, + ""ParameterData"" bytea NOT NULL, + ""CompensateErrors"" varchar(1024) COLLATE ""pg_catalog"".""default"", + ""AddTime"" timestamp NOT NULL, + ""ExpiresAt"" timestamp, +); + +ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""OCG - Activities_pkey"" PRIMARY KEY (""Id""); "; return batchSql; } diff --git a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs index 4b059fc..d4cb597 100644 --- a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs +++ b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs @@ -12,7 +12,7 @@ namespace Pole.Sagas.Core.Abstraction Task ActivityCompensateAborted(string activityId, string sagaId, string errors); Task ActivityExecuted(string activityId); Task ActivityCompensated(string activityId); - Task ActivityExecuteOvertime(string activityId); + Task ActivityExecuteOvertime(string activityId,string name,byte [] parameterData,DateTime addTime); Task ActivityRevoked(string activityId); Task ActivityCompensating(string activityId,int compensateTimes); } diff --git a/src/Pole.Sagas/Core/Abstraction/ISagaStorageInitializer.cs b/src/Pole.Sagas/Core/Abstraction/ISagaStorageInitializer.cs index 1d47d3a..920d67e 100644 --- a/src/Pole.Sagas/Core/Abstraction/ISagaStorageInitializer.cs +++ b/src/Pole.Sagas/Core/Abstraction/ISagaStorageInitializer.cs @@ -11,5 +11,6 @@ namespace Pole.Sagas.Core.Abstraction Task InitializeAsync(CancellationToken cancellationToken); string GetSagaTableName(); string GetActivityTableName(); + string GetOvertimeCompensationGuaranteeTableName(); } } diff --git a/src/Pole.Sagas/Core/ActivityEntity.cs b/src/Pole.Sagas/Core/ActivityEntity.cs index b1e6ab9..8c98e1f 100644 --- a/src/Pole.Sagas/Core/ActivityEntity.cs +++ b/src/Pole.Sagas/Core/ActivityEntity.cs @@ -14,8 +14,8 @@ namespace Pole.Sagas.Core public Byte[] ParameterData { get; set; } public Byte[] ResultData { get; set; } public string Errors { get; set; } - public int ExecuteRetries { get; set; } - public int CompensateRetries { get; set; } + public int ExecuteTimes { get; set; } + public int CompensateTimes { get; set; } public DateTime AddTime { get; set; } } } diff --git a/src/Pole.Sagas/Core/EventSender.cs b/src/Pole.Sagas/Core/EventSender.cs index bdc8a84..0ddbe6c 100644 --- a/src/Pole.Sagas/Core/EventSender.cs +++ b/src/Pole.Sagas/Core/EventSender.cs @@ -111,7 +111,7 @@ namespace Pole.Sagas.Core } } - public async Task ActivityExecuteOvertime(string activityId) + public async Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime) { var result = await sagaClient.ActivityExecuteOvertimeAsync(new Server.Grpc.ActivityExecuteOvertimeRequest { diff --git a/src/Pole.Sagas/Core/ISagaStorage.cs b/src/Pole.Sagas/Core/ISagaStorage.cs index 1c090ca..1e6e747 100644 --- a/src/Pole.Sagas/Core/ISagaStorage.cs +++ b/src/Pole.Sagas/Core/ISagaStorage.cs @@ -1,4 +1,6 @@ -using Pole.Sagas.Core; +using Google.Protobuf.Collections; +using Pole.Sagas.Core; +using Pole.Sagas.Server.Grpc; using System; using System.Collections.Generic; using System.Text; @@ -15,8 +17,10 @@ namespace Pole.Sagas.Core Task ActivityCompensateAborted(string activityId, string sagaId, string errors); Task ActivityExecuted(string activityId); Task ActivityCompensated(string activityId); - Task ActivityExecuteOvertime(string activityId); + Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime); Task ActivityRevoked(string activityId); Task ActivityCompensating(string activityId, int compensateTimes); + IAsyncEnumerable GetSagas(DateTime dateTime, int limit); + Task DeleteExpiredData(string tableName,DateTime ExpiredAt, int batchCount); } } diff --git a/src/Pole.Sagas/Core/OvertimeCompensationGuaranteeActivityStatus.cs b/src/Pole.Sagas/Core/OvertimeCompensationGuaranteeActivityStatus.cs new file mode 100644 index 0000000..4d0eba5 --- /dev/null +++ b/src/Pole.Sagas/Core/OvertimeCompensationGuaranteeActivityStatus.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Core +{ + public enum OvertimeCompensationGuaranteeActivityStatus + { + Executing, + Executed, + Error + } +} diff --git a/src/Pole.Sagas/Core/PoleSagasOption.cs b/src/Pole.Sagas/Core/PoleSagasOption.cs index 7b81db1..adb205e 100644 --- a/src/Pole.Sagas/Core/PoleSagasOption.cs +++ b/src/Pole.Sagas/Core/PoleSagasOption.cs @@ -8,6 +8,7 @@ namespace Pole.Sagas.Core { public string ServiceName { get; set; } public int CompeletedSagaExpiredAfterSeconds { get; set; } = 60 * 10; + public int SagasTimeOutSeconds { get; set; } = 60; public string SagasServerHost { get; set; } } } diff --git a/src/Pole.Sagas/Core/Saga.cs b/src/Pole.Sagas/Core/Saga.cs index 9815dbf..9ef4ae7 100644 --- a/src/Pole.Sagas/Core/Saga.cs +++ b/src/Pole.Sagas/Core/Saga.cs @@ -6,6 +6,7 @@ using System; using System.Collections.Generic; using System.Linq; using System.Text; +using System.Threading; using System.Threading.Tasks; namespace Pole.Sagas.Core @@ -84,7 +85,6 @@ namespace Pole.Sagas.Core public async Task GetResult() { await eventSender.SagaStarted(Id, poleSagasOption.ServiceName, DateTime.UtcNow); - var executeActivity = GetNextExecuteActivity(); if (executeActivity == null) { @@ -173,7 +173,8 @@ namespace Pole.Sagas.Core IsSuccess = false, Errors = errors }; - await eventSender.ActivityExecuteOvertime(activityId); + var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType); + await eventSender.ActivityExecuteOvertime(activityId, activityWapper.Name, bytesContent,DateTime.UtcNow); // 超时的时候 需要首先补偿这个超时的操作 return await CompensateActivity(result, currentExecuteOrder + 1); } diff --git a/src/Pole.Sagas/Core/SagaEntity.cs b/src/Pole.Sagas/Core/SagaEntity.cs index 3b5b9e9..216916d 100644 --- a/src/Pole.Sagas/Core/SagaEntity.cs +++ b/src/Pole.Sagas/Core/SagaEntity.cs @@ -6,7 +6,7 @@ namespace Pole.Sagas.Core { public class SagaEntity { - public int Id { get; set; } + public string Id { get; set; } public string ServiceName { get; set; } public List ActivityEntities { get; set; } public string Status { get; set; } diff --git a/src/Pole.Sagas/Core/SagasGroupEntity.cs b/src/Pole.Sagas/Core/SagasGroupEntity.cs new file mode 100644 index 0000000..5392af6 --- /dev/null +++ b/src/Pole.Sagas/Core/SagasGroupEntity.cs @@ -0,0 +1,13 @@ +using Pole.Sagas.Core; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Core +{ + public class SagasGroupEntity + { + public string ServiceName { get; set; } + public List SagaEntities { get; set; } + } +} diff --git a/src/Pole.Sagas/Protos/saga.proto b/src/Pole.Sagas/Protos/saga.proto index 1d9077a..c91e985 100644 --- a/src/Pole.Sagas/Protos/saga.proto +++ b/src/Pole.Sagas/Protos/saga.proto @@ -15,6 +15,7 @@ service Saga { rpc ActivityExecuteOvertime (ActivityExecuteOvertimeRequest) returns (CommonResponse); rpc ActivityRevoked (ActivityRevokedRequest) returns (CommonResponse); rpc ActivityCompensating (ActivityCompensatingRequest) returns (CommonResponse); + rpc GetSagas (GetSagasRequest) returns (GetSagasResponse); } message CommonResponse{ @@ -56,13 +57,39 @@ message ActivityCompensatedRequest { } message ActivityExecuteOvertimeRequest { string activityId = 1; + string name = 2; + bytes parameterData = 3; + string addTime = 4; } message ActivityRevokedRequest { string activityId = 1; } message ActivityCompensatingRequest { string activityId = 1; - int32 CompensateTimes = 2; + int32 compensateTimes = 2; +} +message GetSagasRequest{ + string serviceName = 1; + string addTime = 2; + int32 limit = 3; +} +message GetSagasResponse{ + bool isSuccess = 1; + string errors = 2; + repeated Saga Sagas = 3; + message Saga{ + string id = 1; + repeated Activity Activities = 2; + message Activity{ + string id = 1; + string sagaId = 2; + int32 order = 3; + string status = 4; + bytes parameterData = 5; + int32 executeTimes = 6; + int32 compensateTimes = 7; + string name = 8; + } + } } -