diff --git a/samples/apis/SagasServer/Properties/launchSettings.json b/samples/apis/SagasServer/Properties/launchSettings.json index 895db6d..7f09999 100644 --- a/samples/apis/SagasServer/Properties/launchSettings.json +++ b/samples/apis/SagasServer/Properties/launchSettings.json @@ -17,7 +17,7 @@ }, "SagasServer": { "commandName": "Project", - "launchBrowser": true, + "launchBrowser": false, "applicationUrl": "http://localhost:5000", "environmentVariables": { "ASPNETCORE_ENVIRONMENT": "Development" diff --git a/samples/apis/SagasTest.Api/Activities/Transaction1CompensateErrorActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction1CompensateErrorActivity.cs index d5f11fa..b6079c9 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction1CompensateErrorActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction1CompensateErrorActivity.cs @@ -17,7 +17,7 @@ namespace SagasTest.Api.Activities { this.httpClientFactory = httpClientFactory; } - public Task Compensate(Transaction1Dto data,CancellationToken cancellationToken) + public Task Compensate(Transaction1Dto data) { throw new NotImplementedException(); } diff --git a/samples/apis/SagasTest.Api/Activities/Transaction1ExceptionActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction1ExceptionActivity.cs index 8defe62..a206e37 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction1ExceptionActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction1ExceptionActivity.cs @@ -18,7 +18,7 @@ namespace SagasTest.Api.Activities this.httpClientFactory = httpClientFactory; } - public async Task Compensate(Transaction1Dto data, CancellationToken cancellationToken) + public async Task Compensate(Transaction1Dto data) { var httpclient = httpClientFactory.CreateClient(); httpclient.BaseAddress = new Uri("http://localhost:5000"); diff --git a/samples/apis/SagasTest.Api/Activities/Transaction1OkActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction1OkActivity.cs index 10f1c3d..2b11690 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction1OkActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction1OkActivity.cs @@ -17,7 +17,7 @@ namespace SagasTest.Api.Activities { this.httpClientFactory = httpClientFactory; } - public async Task Compensate(Transaction1Dto data, CancellationToken cancellationToken) + public async Task Compensate(Transaction1Dto data) { var httpclient = httpClientFactory.CreateClient(); httpclient.BaseAddress = new Uri("http://localhost:5000"); diff --git a/samples/apis/SagasTest.Api/Activities/Transaction1ReturnFalseActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction1ReturnFalseActivity.cs index 66dfd26..cf5f8f1 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction1ReturnFalseActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction1ReturnFalseActivity.cs @@ -18,7 +18,7 @@ namespace SagasTest.Api.Activities this.httpClientFactory = httpClientFactory; } - public async Task Compensate(Transaction1Dto data, CancellationToken cancellationToken) + public async Task Compensate(Transaction1Dto data) { var httpclient = httpClientFactory.CreateClient(); httpclient.BaseAddress = new Uri("http://localhost:5000"); diff --git a/samples/apis/SagasTest.Api/Activities/Transaction2CompensateErrorActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction2CompensateErrorActivity.cs index 5786bc3..af5fc56 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction2CompensateErrorActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction2CompensateErrorActivity.cs @@ -17,7 +17,7 @@ namespace SagasTest.Api.Activities { this.httpClientFactory = httpClientFactory; } - public Task Compensate(Transaction2Dto data, CancellationToken cancellationToken) + public Task Compensate(Transaction2Dto data) { throw new NotImplementedException(); } diff --git a/samples/apis/SagasTest.Api/Activities/Transaction2ExceptionActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction2ExceptionActivity.cs index 385d74d..2cc40b3 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction2ExceptionActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction2ExceptionActivity.cs @@ -18,7 +18,7 @@ namespace SagasTest.Api.Activities this.httpClientFactory = httpClientFactory; } - public async Task Compensate(Transaction2Dto data, CancellationToken cancellationToken) + public async Task Compensate(Transaction2Dto data) { var httpclient = httpClientFactory.CreateClient(); httpclient.BaseAddress = new Uri("http://localhost:5000"); diff --git a/samples/apis/SagasTest.Api/Activities/Transaction2OkActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction2OkActivity.cs index baad6cd..4f573ab 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction2OkActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction2OkActivity.cs @@ -17,7 +17,7 @@ namespace SagasTest.Api.Activities { this.httpClientFactory = httpClientFactory; } - public async Task Compensate(Transaction2Dto data, CancellationToken cancellationToken) + public async Task Compensate(Transaction2Dto data) { var httpclient = httpClientFactory.CreateClient(); httpclient.BaseAddress = new Uri("http://localhost:5000"); diff --git a/samples/apis/SagasTest.Api/Activities/Transaction2ReturnFalseActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction2ReturnFalseActivity.cs index 8b1a86f..9b8f6f3 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction2ReturnFalseActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction2ReturnFalseActivity.cs @@ -18,7 +18,7 @@ namespace SagasTest.Api.Activities this.httpClientFactory = httpClientFactory; } - public async Task Compensate(Transaction2Dto data, CancellationToken cancellationToken) + public async Task Compensate(Transaction2Dto data) { var httpclient = httpClientFactory.CreateClient(); httpclient.BaseAddress = new Uri("http://localhost:5000"); diff --git a/samples/apis/SagasTest.Api/Activities/Transaction3ExceptionActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction3ExceptionActivity.cs index 50e2a59..619e5a8 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction3ExceptionActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction3ExceptionActivity.cs @@ -11,7 +11,7 @@ namespace SagasTest.Api.Activities { public class Transaction3ExceptionActivity : IActivity { - public Task Compensate(Transaction3Dto data, CancellationToken cancellationToken) + public Task Compensate(Transaction3Dto data) { Console.WriteLine("Transaction3 Rollback"); return Task.CompletedTask; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction3HasResultActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction3HasResultActivity.cs index 2bbc783..4911ac1 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction3HasResultActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction3HasResultActivity.cs @@ -11,7 +11,7 @@ namespace SagasTest.Api.Activities { public class Transaction3HasResultActivity : IActivity { - public Task Compensate(Transaction3Dto data, CancellationToken cancellationToken) + public Task Compensate(Transaction3Dto data) { Console.WriteLine("Transaction3 Rollback"); return Task.CompletedTask; diff --git a/samples/apis/SagasTest.Api/Activities/Transaction3ReturnFalseActivity.cs b/samples/apis/SagasTest.Api/Activities/Transaction3ReturnFalseActivity.cs index 84bed97..5b4756b 100644 --- a/samples/apis/SagasTest.Api/Activities/Transaction3ReturnFalseActivity.cs +++ b/samples/apis/SagasTest.Api/Activities/Transaction3ReturnFalseActivity.cs @@ -11,7 +11,7 @@ namespace SagasTest.Api.Activities { public class Transaction3ReturnFalseActivity : IActivity { - public Task Compensate(Transaction3Dto data, CancellationToken cancellationToken) + public Task Compensate(Transaction3Dto data) { Console.WriteLine("Transaction3 Rollback"); return Task.CompletedTask; diff --git a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs index d1dcb27..6abf9d8 100644 --- a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs +++ b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs @@ -85,6 +85,7 @@ $"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusN var result = new List(); using (var connection = new NpgsqlConnection(options.ConnectionString)) { + await connection.OpenAsync(); using (var transaction = await connection.BeginTransactionAsync()) { var reader = await connection.ExecuteReaderAsync(sql); diff --git a/src/Pole.Sagas.Client/Abstraction/IActivity.cs b/src/Pole.Sagas.Client/Abstraction/IActivity.cs index c5995e8..0d771cb 100644 --- a/src/Pole.Sagas.Client/Abstraction/IActivity.cs +++ b/src/Pole.Sagas.Client/Abstraction/IActivity.cs @@ -10,6 +10,6 @@ namespace Pole.Sagas.Client.Abstraction public interface IActivity { Task Execute(TData data ,CancellationToken cancellationToken); - Task Compensate(TData data, CancellationToken cancellationToken); + Task Compensate(TData data); } } diff --git a/src/Pole.Sagas.Client/Abstraction/IEventSender.cs b/src/Pole.Sagas.Client/Abstraction/IEventSender.cs index b077b65..90bd750 100644 --- a/src/Pole.Sagas.Client/Abstraction/IEventSender.cs +++ b/src/Pole.Sagas.Client/Abstraction/IEventSender.cs @@ -7,7 +7,7 @@ namespace Pole.Sagas.Client.Abstraction { Task SagaStarted(string sagaId, string serviceName, DateTime addTime); Task SagaEnded(string sagaId, DateTime ExpiresAt); - Task ActivityExecuting(string activityId,string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime); + Task ActivityExecuting(string activityId,string activityName, string sagaId, string parameterData, int order, DateTime addTime); Task ActivityExecuteAborted(string activityId); /// /// @@ -19,7 +19,7 @@ namespace Pole.Sagas.Client.Abstraction Task ActivityCompensateAborted(string activityId, string sagaId, string errors); Task ActivityCompensated(string activityId); Task ActivityOvertimeCompensated(string activityId,bool compensated); - Task ActivityExecuteOvertime(string activityId,string name,byte [] parameterData,DateTime addTime); + Task ActivityExecuteOvertime(string activityId); Task ActivityRevoked(string activityId); } } diff --git a/src/Pole.Sagas.Client/ActivityWapper.cs b/src/Pole.Sagas.Client/ActivityWapper.cs index 21a1d63..85b2597 100644 --- a/src/Pole.Sagas.Client/ActivityWapper.cs +++ b/src/Pole.Sagas.Client/ActivityWapper.cs @@ -47,15 +47,14 @@ namespace Pole.Sagas.Client var activityParams = Expression.Convert(activityObjParams, ActivityType); var dataObjParams = Expression.Parameter(typeof(object), "data"); var dataParams = Expression.Convert(dataObjParams, ActivityDataType); - var cancellationTokenParams = Expression.Parameter(typeof(CancellationToken), "ct"); - var method = ActivityType.GetMethod("Compensate", new Type[] { ActivityDataType, typeof(CancellationToken) }); - var body = Expression.Call(activityParams, method, dataParams, cancellationTokenParams); - var func = Expression.Lambda>(body, activityObjParams, dataObjParams, cancellationTokenParams).Compile(); + var method = ActivityType.GetMethod("Compensate", new Type[] { ActivityDataType}); + var body = Expression.Call(activityParams, method, dataParams); + var func = Expression.Lambda>(body, activityObjParams, dataObjParams).Compile(); using (var scope = ServiceProvider.CreateScope()) { var activity = scope.ServiceProvider.GetRequiredService(ActivityType); - return func(activity, DataObj, CancellationTokenSource.Token); + return func(activity, DataObj); } } } diff --git a/src/Pole.Sagas.Client/EventSender.cs b/src/Pole.Sagas.Client/EventSender.cs index 035f134..e7290b0 100644 --- a/src/Pole.Sagas.Client/EventSender.cs +++ b/src/Pole.Sagas.Client/EventSender.cs @@ -56,7 +56,7 @@ namespace Pole.Sagas.Client } } - public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime) + public async Task ActivityExecuting(string activityId, string activityName, string sagaId, string parameterData, int order, DateTime addTime) { var result = await sagaClient.ActivityExecutingAsync(new Server.Grpc.ActivityExecutingRequest { @@ -64,7 +64,7 @@ namespace Pole.Sagas.Client ActivityName = activityName, AddTime = addTime.ToString("yyyy-MM-dd HH:mm:ss.fff"), Order = order, - ParameterData = Google.Protobuf.ByteString.CopyFrom(parameterData), + ParameterData = parameterData, SagaId = sagaId, }); if (!result.IsSuccess) @@ -100,7 +100,7 @@ namespace Pole.Sagas.Client } } - public async Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime) + public async Task ActivityExecuteOvertime(string activityId) { var result = await sagaClient.ActivityExecuteOvertimeAsync(new Server.Grpc.ActivityExecuteOvertimeRequest { diff --git a/src/Pole.Sagas.Client/PoleSagasOption.cs b/src/Pole.Sagas.Client/PoleSagasOption.cs index 416ad7d..f7b6fb6 100644 --- a/src/Pole.Sagas.Client/PoleSagasOption.cs +++ b/src/Pole.Sagas.Client/PoleSagasOption.cs @@ -9,7 +9,7 @@ namespace Pole.Sagas.Client public string ServiceName { get; set; } public int PreSagasGrpcStreamingResponseLimitCount { get; set; } = 20; public int MaxCompensateTimes { get; set; } = 10; - public int MaxOvertimeCompensateTimes { get; set; } = 10; + public int MaxOvertimeCompensateTimes { get; set; } = 3; public int CompeletedSagaExpiredAfterSeconds { get; set; } = 60 * 10; public int SagasTimeOutSeconds { get; set; } = 60; public string SagasServerHost { get; set; } diff --git a/src/Pole.Sagas.Client/Saga.cs b/src/Pole.Sagas.Client/Saga.cs index 7bbebe7..14d8ede 100644 --- a/src/Pole.Sagas.Client/Saga.cs +++ b/src/Pole.Sagas.Client/Saga.cs @@ -25,6 +25,10 @@ namespace Pole.Sagas.Client { get { return activities.Count; } } + private bool IsCompensated + { + get { return this.activities.All(m => m.ActivityStatus == ActivityStatus.Compensated); } + } /// /// 如果 等于 -1 说明已经在执行补偿操作,此时这个值已经没有意义 /// @@ -75,13 +79,13 @@ namespace Pole.Sagas.Client ActivityStatus = ActivityStatus.NotStarted, ActivityType = targetActivityType, DataObj = data, - Order = CurrentMaxOrder, + Order = CurrentMaxOrder + 1, ServiceProvider = serviceProvider, - TimeOutSeconds = 2, + TimeOutSeconds = timeOutSeconds, }; activities.Add(activityWapper); } - internal void AddActivity(string activityName, string activityStatus, object data, int order, int timeOutSeconds = 2) + internal void AddActivity(string id, string activityName, string activityStatus, string data, int order, int compensateTimes,int overtimeCompensateTimes) { var targetActivityType = activityFinder.FindType(activityName); @@ -91,16 +95,19 @@ namespace Pole.Sagas.Client throw new ActivityNotFoundWhenCompensateRetryException(activityName); } var dataType = activityInterface.GetGenericArguments()[0]; + var dataParameter = serializer.Deserialize(data, dataType); ActivityWapper activityWapper = new ActivityWapper { Name = activityName, ActivityDataType = dataType, ActivityStatus = (ActivityStatus)Enum.Parse(typeof(ActivityStatus), activityStatus), ActivityType = targetActivityType, - DataObj = data, + DataObj = dataParameter, Order = order, ServiceProvider = serviceProvider, - TimeOutSeconds = 2, + Id = id, + CompensateTimes = compensateTimes, + OvertimeCompensateTimes= overtimeCompensateTimes }; activities.Add(activityWapper); } @@ -131,11 +138,12 @@ namespace Pole.Sagas.Client return true; } await RecursiveCompensateActivity(compensateActivity); - if (activities.Any(m => m.ActivityStatus != ActivityStatus.Compensated|| m.ActivityStatus != ActivityStatus.CompensateAborted)) + // 如果补偿成功 这里返回 true + if (activities.All(m => m.ActivityStatus == ActivityStatus.Compensated)) { - return false; + return true; } - return true; + return false; } private ActivityWapper GetNextExecuteActivity() @@ -170,15 +178,24 @@ namespace Pole.Sagas.Client } try { + if (activityWapper.ActivityStatus != ActivityStatus.ExecutingOvertime) + { + activityWapper.ActivityStatus = ActivityStatus.Compensating; + } await activityWapper.InvokeCompensate(); if (activityWapper.ActivityStatus == ActivityStatus.ExecutingOvertime) { // 超时 补偿次数已到 - var isCompensated = activityWapper.CompensateTimes == poleSagasOption.MaxOvertimeCompensateTimes; + var isCompensated = activityWapper.OvertimeCompensateTimes >= poleSagasOption.MaxOvertimeCompensateTimes; + if (isCompensated) + { + activityWapper.ActivityStatus = ActivityStatus.Compensated; + } await eventSender.ActivityOvertimeCompensated(activityId, isCompensated); } else { + activityWapper.ActivityStatus = ActivityStatus.Compensated; await eventSender.ActivityCompensated(activityId); } var compensateActivity = GetNextCompensateActivity(); @@ -190,8 +207,9 @@ namespace Pole.Sagas.Client } catch (Exception exception) { + activityWapper.ActivityStatus = ActivityStatus.CompensateAborted; // todo: 超时操作 如果出错可能 减少 补偿次数 这里 先不做处理 - if (activityWapper.CompensateTimes == poleSagasOption.MaxCompensateTimes) + if (activityWapper.CompensateTimes >= poleSagasOption.MaxCompensateTimes) { // 此时 结束 saga 并且设置状态 为 Error await eventSender.ActivityCompensateAborted(activityId, Id, exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace); @@ -206,21 +224,28 @@ namespace Pole.Sagas.Client { var activityId = snowflakeIdGenerator.NextId(); activityWapper.Id = activityId; - activityWapper.CancellationTokenSource = new System.Threading.CancellationTokenSource(2 * 1000); + activityWapper.CancellationTokenSource = new System.Threading.CancellationTokenSource(activityWapper.TimeOutSeconds * 1000); try { - var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType); - await eventSender.ActivityExecuting(activityId, activityWapper.Name, Id, bytesContent, activityWapper.Order, DateTime.UtcNow); + var content = serializer.Serialize(activityWapper.DataObj, activityWapper.ActivityDataType); + activityWapper.ActivityStatus = ActivityStatus.Executing; + await eventSender.ActivityExecuting(activityId, activityWapper.Name, Id, content, activityWapper.Order, DateTime.UtcNow); var result = await activityWapper.InvokeExecute(); if (!result.IsSuccess) { + activityWapper.ActivityStatus = ActivityStatus.Revoked; await eventSender.ActivityRevoked(activityId); await CompensateActivity(result, currentExecuteOrder); + var expiresAt = DateTime.UtcNow.AddSeconds(poleSagasOption.CompeletedSagaExpiredAfterSeconds); + await eventSender.SagaEnded(Id, expiresAt); return result; } + activityWapper.ActivityStatus = ActivityStatus.Executed; var executeActivity = GetNextExecuteActivity(); if (executeActivity == null) { + var expiresAt = DateTime.UtcNow.AddSeconds(poleSagasOption.CompeletedSagaExpiredAfterSeconds); + await eventSender.SagaEnded(Id, expiresAt); return result; } else @@ -239,7 +264,9 @@ namespace Pole.Sagas.Client Errors = errors }; var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType); - await eventSender.ActivityExecuteOvertime(activityId, activityWapper.Name, bytesContent, DateTime.UtcNow); + + activityWapper.ActivityStatus = ActivityStatus.ExecutingOvertime; + await eventSender.ActivityExecuteOvertime(activityId); // 超时的时候 需要首先补偿这个超时的操作 return await CompensateActivity(result, currentExecuteOrder + 1); } @@ -251,9 +278,17 @@ namespace Pole.Sagas.Client IsSuccess = false, Errors = errors }; + activityWapper.ActivityStatus = ActivityStatus.ExecuteAborted; await eventSender.ActivityExecuteAborted(activityId); // 出错的时候 需要首先补偿这个出错的操作 - return await CompensateActivity(result, currentExecuteOrder + 1); + var executeResult = await CompensateActivity(result, currentExecuteOrder + 1); + + var expiresAt = DateTime.UtcNow.AddSeconds(poleSagasOption.CompeletedSagaExpiredAfterSeconds); + if (IsCompensated) + { + await eventSender.SagaEnded(Id, expiresAt); + } + return executeResult; } } } diff --git a/src/Pole.Sagas.Client/SagaRestorer.cs b/src/Pole.Sagas.Client/SagaRestorer.cs index 2e46c4a..8e72259 100644 --- a/src/Pole.Sagas.Client/SagaRestorer.cs +++ b/src/Pole.Sagas.Client/SagaRestorer.cs @@ -5,6 +5,7 @@ using Pole.Sagas.Client.Abstraction; using Pole.Sagas.Core; using System; using System.Collections.Generic; +using System.Linq; using System.Text; namespace Pole.Sagas.Client @@ -29,9 +30,9 @@ namespace Pole.Sagas.Client internal Saga CreateSaga(SagaEntity sagaEntity) { var saga = new Saga(snowflakeIdGenerator, serviceProvider, eventSender, poleSagasOption, serializer, activityFinder, sagaEntity.Id); - foreach (var activity in sagaEntity.ActivityEntities) + foreach (var activity in sagaEntity.ActivityEntities.OrderBy(m=>m.Order)) { - saga.AddActivity(activity.Name, activity.Status, activity.ParameterData, activity.Order); + saga.AddActivity(activity.Id,activity.Name, activity.Status, activity.ParameterData, activity.Order, activity.CompensateTimes,activity.OvertimeCompensateTimes); } return saga; } diff --git a/src/Pole.Sagas.Client/SagasCompensateRetryBackgroundService.cs b/src/Pole.Sagas.Client/SagasCompensateRetryBackgroundService.cs index d5e521f..8d588c4 100644 --- a/src/Pole.Sagas.Client/SagasCompensateRetryBackgroundService.cs +++ b/src/Pole.Sagas.Client/SagasCompensateRetryBackgroundService.cs @@ -17,7 +17,7 @@ using static Pole.Sagas.Server.Grpc.Saga; namespace Pole.Sagas.Client { - public class SagasCompensateRetryBackgroundService : IHostedService + public class SagasCompensateRetryBackgroundService : BackgroundService { private readonly PoleSagasOption options; private readonly SagaClient sagaClient; @@ -33,25 +33,6 @@ namespace Pole.Sagas.Client this.logger = logger; } - public async Task StartAsync(CancellationToken cancellationToken) - { - while (true) - { - try - { - await GrpcGetSagasCore(cancellationToken); - } - catch (Exception ex) - { - logger.LogError(ex, "Errors in grpc get sagas"); - } - finally - { - await Task.Delay(options.GrpcConnectFailRetryIntervalSeconds * 1000); - } - } - } - private async Task GrpcGetSagasCore(CancellationToken cancellationToken) { using (var stream = sagaClient.GetSagas(new Pole.Sagas.Server.Grpc.GetSagasRequest { Limit = options.PreSagasGrpcStreamingResponseLimitCount, ServiceName = options.ServiceName })) @@ -73,9 +54,9 @@ namespace Pole.Sagas.Client CompensateTimes = n.CompensateTimes, OvertimeCompensateTimes = n.ExecuteTimes, Id = n.Id, - Name = n.Id, + Name = n.Name, Order = n.Order, - ParameterData = n.ParameterData.ToByteArray(), + ParameterData = n.ParameterData, SagaId = n.SagaId, Status = n.Status }).ToList(); @@ -101,9 +82,24 @@ namespace Pole.Sagas.Client } } - public Task StopAsync(CancellationToken cancellationToken) + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - return Task.CompletedTask; + while (true) + { + try + { + await GrpcGetSagasCore(stoppingToken); + } + catch (Exception ex) + { + logger.LogError(ex, "Errors in grpc get sagas"); + } + finally + { + await Task.Delay(options.GrpcConnectFailRetryIntervalSeconds * 1000); + } + } } } } diff --git a/src/Pole.Sagas.Server/SagasBuffer.cs b/src/Pole.Sagas.Server/SagasBuffer.cs index 3995137..c733542 100644 --- a/src/Pole.Sagas.Server/SagasBuffer.cs +++ b/src/Pole.Sagas.Server/SagasBuffer.cs @@ -56,7 +56,7 @@ namespace Pole.Sagas.Server await semaphoreSlim.WaitAsync(); if (Sagas.TryGetValue(serviceName, out List sagaList)) { - var result = sagaList.Take(limit); + var result = sagaList.Take(limit).ToList(); sagaList.RemoveAll(m => result.Select(n => n.Id).Contains(m.Id)); Sagas[serviceName] = sagaList; return result; diff --git a/src/Pole.Sagas.Server/Services/SagaService.cs b/src/Pole.Sagas.Server/Services/SagaService.cs index a00b4a3..b54ff4d 100644 --- a/src/Pole.Sagas.Server/Services/SagaService.cs +++ b/src/Pole.Sagas.Server/Services/SagaService.cs @@ -70,7 +70,7 @@ namespace Pole.Sagas.Server.Services CommonResponse commonResponse = new CommonResponse(); try { - await sagaStorage.ActivityExecuteOvertime(request.ActivityId, request.Name, request.ParameterData.ToByteArray(), Convert.ToDateTime(request.AddTime)); + await sagaStorage.ActivityExecuteOvertime(request.ActivityId); commonResponse.IsSuccess = true; } catch (Exception ex) @@ -84,7 +84,7 @@ namespace Pole.Sagas.Server.Services CommonResponse commonResponse = new CommonResponse(); try { - await sagaStorage.ActivityExecuting(request.ActivityId, request.ActivityName, request.SagaId, request.ParameterData.ToByteArray(), request.Order, Convert.ToDateTime(request.AddTime)); + await sagaStorage.ActivityExecuting(request.ActivityId, request.ActivityName, request.SagaId, request.ParameterData, request.Order, Convert.ToDateTime(request.AddTime)); commonResponse.IsSuccess = true; } catch (Exception ex) @@ -139,12 +139,16 @@ namespace Pole.Sagas.Server.Services { while (!context.CancellationToken.IsCancellationRequested) { - await Task.Delay(poleSagasServerOption.GetSagasGrpcStreamingResponseDelaySeconds*1000); + await Task.Delay(poleSagasServerOption.GetSagasGrpcStreamingResponseDelaySeconds * 1000); GetSagasResponse getSagasResponse = new GetSagasResponse(); try { var sagaEntities = await sagasBuffer.GetSagas(request.ServiceName, request.Limit); + if (sagaEntities.Count() == 0) + { + continue; + } var sagaDtoes = sagaEntities.Select(m => { var result = new GetSagasResponse.Types.Saga @@ -156,9 +160,9 @@ namespace Pole.Sagas.Server.Services CompensateTimes = n.CompensateTimes, ExecuteTimes = n.OvertimeCompensateTimes, Id = n.Id, - Name = n.Id, + Name = n.Name, Order = n.Order, - ParameterData = ByteString.CopyFrom(n.ParameterData), + ParameterData = n.ParameterData, SagaId = n.SagaId, Status = n.Status })); diff --git a/src/Pole.Sagas.Storage.PostgreSql/ActivityAndSagaEntity.cs b/src/Pole.Sagas.Storage.PostgreSql/ActivityAndSagaEntity.cs index e1e04db..8c8d68a 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/ActivityAndSagaEntity.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/ActivityAndSagaEntity.cs @@ -6,14 +6,14 @@ namespace Pole.Sagas.Storage.PostgreSql { public class ActivityAndSagaEntity { - public string Id { get; set; } + public string ActivityId { get; set; } public string SagaId { get; set; } public string ServiceName { get; set; } public int Order { get; set; } public string Status { get; set; } - public byte[] ParameterData { get; set; } + public string ParameterData { get; set; } public int OvertimeCompensateTimes { get; set; } public int CompensateTimes { get; set; } - public int Name { get; set; } + public string ActivityName { get; set; } } } diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs index cec72c9..6e1959c 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs @@ -29,10 +29,11 @@ namespace Pole.Sagas.Storage.PostgreSql { using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) { + await connection.OpenAsync(); using (var tansaction = await connection.BeginTransactionAsync()) { var updateActivitySql = -$"UPDATE {activityTableName} SET \"Status\"=@Status,\"Errors\"=@Errors, \"CompensateTimes\"=\"CompensateTimes\"+1 WHERE \"Id\" = @Id"; +$"UPDATE {activityTableName} SET \"Status\"=@Status,\"CompensateErrors\"=@Errors, \"CompensateTimes\"=\"CompensateTimes\"+1 WHERE \"Id\" = @Id"; await connection.ExecuteAsync(updateActivitySql, new { Id = activityId, @@ -42,7 +43,7 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status,\"Errors\"=@Errors, \"Compen if (!string.IsNullOrEmpty(sagaId)) { var updateSagaSql = -$"UPDATE {sagaTableName} SET \"Status\"=@Status,\"Errors\"=@Errors WHERE \"Id\" = @Id"; +$"UPDATE {sagaTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; await connection.ExecuteAsync(updateSagaSql, new { Id = sagaId, @@ -83,7 +84,7 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; } } - public async Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime) + public async Task ActivityExecuteOvertime(string activityId) { using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) { @@ -97,19 +98,20 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; } } - public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] ParameterData, int order, DateTime addTime) + public async Task ActivityExecuting(string activityId, string activityName, string sagaId, string ParameterData, int order, DateTime addTime) { using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) { string sql = - $"INSERT INTO {activityTableName} (\"Id\",\"Name\",\"SagaId\",\"Status\",\"ParameterData\",\"OvertimeCompensateTimes\",\"CompensateTimes\",\"AddTime\")" + - $"VALUES(@Id,@Name,@SagaId,@Status,@ParameterData,@OvertimeCompensateTimes,@CompensateTimes,@AddTime);"; + $"INSERT INTO {activityTableName} (\"Id\",\"Name\",\"SagaId\",\"Status\",\"Order\",\"ParameterData\",\"OvertimeCompensateTimes\",\"CompensateTimes\",\"AddTime\")" + + $"VALUES(@Id,@Name,@SagaId,@Status,@Order,@ParameterData,@OvertimeCompensateTimes,@CompensateTimes,@AddTime);"; _ = await connection.ExecuteAsync(sql, new { Id = activityId, Name = activityName, SagaId = sagaId, Status = nameof(ActivityStatus.Executing), + Order = order, ExecutingOvertimeRetries = 0, ParameterData = ParameterData, OvertimeCompensateTimes = 0, @@ -143,7 +145,7 @@ $"UPDATE {sagaTableName} SET \"Status\"=@Status ,\"ExpiresAt\"=@ExpiresAt WHERE { Id = sagaId, ExpiresAt = ExpiresAt, - Status = nameof(ActivityStatus.Revoked) + Status = nameof(SagaStatus.Ended) }); } } @@ -160,7 +162,7 @@ $"INSERT INTO {sagaTableName} (\"Id\",\"ServiceName\",\"Status\",\"AddTime\")" + Id = sagaId, AddTime = addTime, ServiceName = serviceName, - Status = nameof(ActivityStatus.Revoked) + Status = nameof(SagaStatus.Started) }); } } @@ -170,7 +172,7 @@ $"INSERT INTO {sagaTableName} (\"Id\",\"ServiceName\",\"Status\",\"AddTime\")" + using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) { var sql = -$"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"OvertimeCompensateTimes\",activities.\"CompensateTimes\",activities.\"Name\" from {activityTableName} as activities inner join(select \"Id\",\"ServiceName\" from {sagaTableName} where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\" and activities.\"Status\" != @Status1 and activities.\"Status\" != @Status2"; +$"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"OvertimeCompensateTimes\",activities.\"CompensateTimes\",activities.\"Name\" as ActivityName from {activityTableName} as activities inner join(select \"Id\",\"ServiceName\" from {sagaTableName} where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\" and activities.\"Status\" != @Status1 and activities.\"Status\" != @Status2"; var activities = await connection.QueryAsync(sql, new { AddTime = dateTime, @@ -198,11 +200,12 @@ $"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"I { CompensateTimes = activity.CompensateTimes, OvertimeCompensateTimes = activity.OvertimeCompensateTimes, - Id = activity.Id, + Id = activity.ActivityId, Order = activity.Order, ParameterData = activity.ParameterData, SagaId = activity.SagaId, Status = activity.Status, + Name= activity.ActivityName }; sagaEntity.ActivityEntities.Add(activityEntity); } diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs index 1241899..a2963fc 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs @@ -63,9 +63,9 @@ CREATE TABLE IF NOT EXISTS {GetActivityTableName()}( ""Name"" varchar(255) COLLATE ""pg_catalog"".""default"" NOT NULL, ""SagaId"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL, ""Order"" int4 NOT NULL, - ""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL, + ""Status"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL, ""OvertimeCompensateTimes"" int4 NOT NULL, - ""ParameterData"" bytea NOT NULL, + ""ParameterData"" text NOT NULL, ""CompensateErrors"" varchar(1024) COLLATE ""pg_catalog"".""default"", ""CompensateTimes"" int4 NOT NULL, ""AddTime"" timestamp NOT NULL, diff --git a/src/Pole.Sagas/Core/Abstraction/ISagaStorage.cs b/src/Pole.Sagas/Core/Abstraction/ISagaStorage.cs index 771846e..81f6c83 100644 --- a/src/Pole.Sagas/Core/Abstraction/ISagaStorage.cs +++ b/src/Pole.Sagas/Core/Abstraction/ISagaStorage.cs @@ -12,11 +12,11 @@ namespace Pole.Sagas.Core.Abstraction { Task SagaStarted(string sagaId, string serviceName,DateTime addTime); Task SagaEnded(string sagaId, DateTime ExpiresAt); - Task ActivityExecuting(string activityId, string activityName,string sagaId, byte[] ParameterData, int order,DateTime addTime); + Task ActivityExecuting(string activityId, string activityName,string sagaId, string ParameterData, int order,DateTime addTime); Task ActivityExecuteAborted(string activityId); Task ActivityCompensateAborted(string activityId, string sagaId, string errors); Task ActivityCompensated(string activityId); - Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime); + Task ActivityExecuteOvertime(string activityId); Task ActivityRevoked(string activityId); IAsyncEnumerable GetSagas(DateTime dateTime, int limit); Task DeleteExpiredData(string tableName,DateTime ExpiredAt, int batchCount); diff --git a/src/Pole.Sagas/Core/ActivityEntity.cs b/src/Pole.Sagas/Core/ActivityEntity.cs index 9121c6a..73d7abd 100644 --- a/src/Pole.Sagas/Core/ActivityEntity.cs +++ b/src/Pole.Sagas/Core/ActivityEntity.cs @@ -11,12 +11,8 @@ namespace Pole.Sagas.Core public string SagaId { get; set; } public int Order { get; set; } public string Status { get; set; } - public int TimeOutSeconds { get; set; } - public Byte[] ParameterData { get; set; } - public Byte[] ResultData { get; set; } - public string Errors { get; set; } + public string ParameterData { get; set; } public int OvertimeCompensateTimes { get; set; } public int CompensateTimes { get; set; } - public DateTime AddTime { get; set; } } } diff --git a/src/Pole.Sagas/Core/Exceptions/CompensateActivityException.cs b/src/Pole.Sagas/Core/Exceptions/CompensateActivityException.cs new file mode 100644 index 0000000..99ae932 --- /dev/null +++ b/src/Pole.Sagas/Core/Exceptions/CompensateActivityException.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Core.Exceptions +{ + public class CompensateActivityException : Exception + { + public CompensateActivityException(string activityName,Exception innerException):base($"activity: {activityName} compensate error", innerException) + { + + } + } +} diff --git a/src/Pole.Sagas/Core/SagaEntity.cs b/src/Pole.Sagas/Core/SagaEntity.cs index a683231..732891d 100644 --- a/src/Pole.Sagas/Core/SagaEntity.cs +++ b/src/Pole.Sagas/Core/SagaEntity.cs @@ -8,7 +8,7 @@ namespace Pole.Sagas.Core { public string Id { get; set; } public string ServiceName { get; set; } - public List ActivityEntities { get; set; } + public List ActivityEntities { get; set; } = new List(); public string Status { get; set; } public DateTime? ExpiresAt { get; set; } public DateTime AddTime { get; set; } diff --git a/src/Pole.Sagas/Core/SagasGroupEntity.cs b/src/Pole.Sagas/Core/SagasGroupEntity.cs index 5392af6..6c3b903 100644 --- a/src/Pole.Sagas/Core/SagasGroupEntity.cs +++ b/src/Pole.Sagas/Core/SagasGroupEntity.cs @@ -8,6 +8,6 @@ namespace Pole.Sagas.Core public class SagasGroupEntity { public string ServiceName { get; set; } - public List SagaEntities { get; set; } + public List SagaEntities { get; set; } = new List(); } } diff --git a/src/Pole.Sagas/Protos/saga.proto b/src/Pole.Sagas/Protos/saga.proto index 2ebde92..2492dc3 100644 --- a/src/Pole.Sagas/Protos/saga.proto +++ b/src/Pole.Sagas/Protos/saga.proto @@ -34,7 +34,7 @@ message SagaEndedRequest { message ActivityExecutingRequest { string activityId = 1; string sagaId = 2; - bytes parameterData = 3; + string parameterData = 3; int32 order = 4; string addTime = 5; string activityName = 6; @@ -55,9 +55,6 @@ message ActivityCompensatedRequest { } message ActivityExecuteOvertimeRequest { string activityId = 1; - string name = 2; - bytes parameterData = 3; - string addTime = 4; } message ActivityRevokedRequest { string activityId = 1; @@ -82,7 +79,7 @@ message GetSagasResponse{ string sagaId = 2; int32 order = 3; string status = 4; - bytes parameterData = 5; + string parameterData = 5; int32 executeTimes = 6; int32 compensateTimes = 7; string name = 8;