diff --git a/src/Pole.Sagas.Server/Services/SagaService.cs b/src/Pole.Sagas.Server/Services/SagaService.cs index 9a71d2e..75a1a94 100644 --- a/src/Pole.Sagas.Server/Services/SagaService.cs +++ b/src/Pole.Sagas.Server/Services/SagaService.cs @@ -90,22 +90,7 @@ namespace Pole.Sagas.Server.Services CommonResponse commonResponse = new CommonResponse(); try { - await sagaStorage.ActivityExecuting(request.ActivityId, request.SagaId, request.ParameterData.ToByteArray(), request.Order, Convert.ToDateTime(request.AddTime)); - commonResponse.IsSuccess = true; - } - catch (Exception ex) - { - commonResponse.Errors = CombineError(ex); - } - return commonResponse; - } - public override async Task ActivityRetried(ActivityRetriedRequest request, ServerCallContext context) - { - CommonResponse commonResponse = new CommonResponse(); - try - { - var targetActivityRetryType = request.ActivityRetryType == Pole.Sagas.Server.Grpc.ActivityRetriedRequest.Types.ActivityRetryType.Compensate ? ActivityRetryType.Compensate : ActivityRetryType.Execute; - await sagaStorage.ActivityRetried(request.ActivityId, request.Status, request.Retries, targetActivityRetryType); + await sagaStorage.ActivityExecuting(request.ActivityId, request.ActivityName, request.SagaId, request.ParameterData.ToByteArray(), request.Order, Convert.ToDateTime(request.AddTime), request.ExecuteTimes); commonResponse.IsSuccess = true; } catch (Exception ex) diff --git a/src/Pole.Sagas.Storage.PostgreSql/PoleSagasPostgreSqlExtensions.cs b/src/Pole.Sagas.Storage.PostgreSql/PoleSagasPostgreSqlExtensions.cs index 3c236e9..ee31358 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PoleSagasPostgreSqlExtensions.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PoleSagasPostgreSqlExtensions.cs @@ -12,7 +12,7 @@ namespace Pole.Sagas.Storage.PostgreSql public static IServiceCollection AddPostgreSqlStorage(IServiceCollection services,Action config) { services.Configure(config); - services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(); return services; } diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs index 40666f4..90d3fbe 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs @@ -109,34 +109,100 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; } } - public Task ActivityExecuting(string activityId, string sagaId, byte[] ParameterData, int order, DateTime addTime) + public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] ParameterData, int order, DateTime addTime, int executeTimes) { - throw new NotImplementedException(); - } + using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) + { + string sql = string.Empty; + if (executeTimes == 1) + { + sql = + $"INSERT INTO {activityTableName} (\"Id\",\"Name\",\"SagaId\",\"Status\",\"ParameterData\",\"ExecuteTimes\",\"CompensateTimes\",\"AddTime\")" + + $"VALUES(@Id,@Name,@SagaId,@Status,@ParameterData,@ExecuteTimes,@CompensateTimes,@AddTime);"; + _ = await connection.ExecuteAsync(sql, new + { + Id = activityId, + Name = activityName, + SagaId = sagaId, + Status = nameof(ActivityStatus.Executing), + ExecutingOvertimeRetries = 0, + ParameterData = ParameterData, + ExecuteTimes = executeTimes, + CompensateTimes = 0, + AddTime = addTime + }); + } + else + { + sql = $"UPDATE {activityTableName} SET \"ExecuteTimes\"=@ExecuteTimes WHERE \"Id\" = @Id"; + await connection.ExecuteAsync(sql, new + { + Id = activityId, + ExecuteTimes = executeTimes + }); + } - public Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType) - { - throw new NotImplementedException(); + } } - public Task ActivityRevoked(string activityId) + public async Task ActivityRevoked(string activityId) { - throw new NotImplementedException(); + using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) + { + var updateActivitySql = +$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; + await connection.ExecuteAsync(updateActivitySql, new + { + Id = activityId, + Status = nameof(ActivityStatus.Revoked) + }); + } } - public Task SagaEnded(string sagaId, DateTime ExpiresAt) + public async Task SagaEnded(string sagaId, DateTime ExpiresAt) { - throw new NotImplementedException(); + using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) + { + var updateActivitySql = +$"UPDATE {sagaTableName} SET \"Status\"=@Status ,\"ExpiresAt\"=@ExpiresAt WHERE \"Id\" = @Id"; + await connection.ExecuteAsync(updateActivitySql, new + { + Id = sagaId, + ExpiresAt= ExpiresAt, + Status = nameof(ActivityStatus.Revoked) + }); + } } - public Task SagaStarted(string sagaId, string serviceName, DateTime addTime) + public async Task SagaStarted(string sagaId, string serviceName, DateTime addTime) { - throw new NotImplementedException(); + using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) + { + var sql = +$"INSERT INTO {sagaTableName} (\"Id\",\"ServiceName\",\"Status\",\"AddTime\")" + + $"VALUES(@Id,@ServiceName,@Status,@AddTime);"; + await connection.ExecuteAsync(sql, new + { + Id = sagaId, + AddTime = addTime, + ServiceName=serviceName, + Status = nameof(ActivityStatus.Revoked) + }); + } } - public Task ActivityCompensating(string activityId) + public async Task ActivityCompensating(string activityId) { - throw new NotImplementedException(); + using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) + { + var updateActivitySql = +$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; + await connection.ExecuteAsync(updateActivitySql, new + { + Id = activityId, + Status = nameof(ActivityStatus.Compensating) + }); + } } } } diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlEventStorageInitializer.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs similarity index 91% rename from src/Pole.Sagas.Storage.PostgreSql/PostgreSqlEventStorageInitializer.cs rename to src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs index 5505281..c608c0f 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlEventStorageInitializer.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs @@ -13,11 +13,11 @@ using System.Threading.Tasks; namespace Pole.Sagas.Storage.PostgreSql { - class PostgreSqlEventStorageInitializer : ISagaStorageInitializer + class PostgreSqlSagaStorageInitializer : ISagaStorageInitializer { private PoleSagasStoragePostgreSqlOption options; private readonly ILogger logger; - public PostgreSqlEventStorageInitializer(IOptions poleSagaServerOption, ILogger logger) + public PostgreSqlSagaStorageInitializer(IOptions poleSagaServerOption, ILogger logger) { this.options = poleSagaServerOption.Value; this.logger = logger; @@ -57,19 +57,19 @@ CREATE TABLE IF NOT EXISTS {GetSagaTableName()}( ""ExpiresAt"" timestamp, ""AddTime"" timestamp NOT NULL ); -ALTER TABLE ""{options.SchemaName}"" ADD CONSTRAINT ""Sagas_pkey"" PRIMARY KEY (""Id""); +ALTER TABLE ""{GetSagaTableName()}"" ADD CONSTRAINT ""Sagas_pkey"" PRIMARY KEY (""Id""); CREATE TABLE IF NOT EXISTS {GetActivityTableName()}( ""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL, + ""Name"" varchar(255) COLLATE ""pg_catalog"".""default"" NOT NULL, ""SagaId"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL, ""Order"" int4 NOT NULL, ""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL, - ""TimeOutSeconds"" int4 NOT NULL, + ""ExecuteTimes"" int4 NOT NULL, ""ParameterData"" bytea NOT NULL, ""ResultData"" bytea, ""Errors"" varchar(1024) COLLATE ""pg_catalog"".""default"", - ""ExecuteRetries"" int4 NOT NULL, - ""CompensateRetries"" int4 NOT NULL, + ""CompensateTimes"" int4 NOT NULL, ""AddTime"" timestamp NOT NULL ) ; diff --git a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs index 3ac7806..4d29c96 100644 --- a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs +++ b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs @@ -7,8 +7,7 @@ namespace Pole.Sagas.Core.Abstraction { Task SagaStarted(string sagaId, string serviceName, DateTime addTime); Task SagaEnded(string sagaId, DateTime ExpiresAt); - Task ActivityExecuting(string activityId, string sagaId, byte[] parameterData, int order, DateTime addTime); - Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType); + Task ActivityExecuting(string activityId,string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime,int executeTimes); Task ActivityExecuteAborted(string activityId); Task ActivityCompensateAborted(string activityId, string sagaId, string errors); Task ActivityExecuted(string activityId,byte[] resultData); diff --git a/src/Pole.Sagas/Core/ActivityWapper.cs b/src/Pole.Sagas/Core/ActivityWapper.cs index c3ecd75..fae7597 100644 --- a/src/Pole.Sagas/Core/ActivityWapper.cs +++ b/src/Pole.Sagas/Core/ActivityWapper.cs @@ -11,12 +11,16 @@ namespace Pole.Sagas.Core public class ActivityWapper { public string Id { get; set; } + public string Name { get; set; } public Type ActivityType { get; set; } public Type ActivityDataType { get; set; } public object DataObj { get; set; } public int Order { get; set; } + public int ExecuteTimes { get; set; } + public int CompensateTimes { get; set; } public ActivityStatus ActivityStatus { get; set; } public IServiceProvider ServiceProvider { get; set; } + public int TimeOutSeconds { get; set; } public CancellationTokenSource CancellationTokenSource { get; set; } public Task InvokeExecute() diff --git a/src/Pole.Sagas/Core/EventSender.cs b/src/Pole.Sagas/Core/EventSender.cs index 003c29e..4de898a 100644 --- a/src/Pole.Sagas/Core/EventSender.cs +++ b/src/Pole.Sagas/Core/EventSender.cs @@ -68,26 +68,12 @@ namespace Pole.Sagas.Core } } - public async Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType) - { - Pole.Sagas.Server.Grpc.ActivityRetriedRequest.Types.ActivityRetryType activityRetryType = retryType == ActivityRetryType.Compensate ? Pole.Sagas.Server.Grpc.ActivityRetriedRequest.Types.ActivityRetryType.Compensate : Pole.Sagas.Server.Grpc.ActivityRetriedRequest.Types.ActivityRetryType.Execute; - - var result = await sagaClient.ActivityRetriedAsync(new Server.Grpc.ActivityRetriedRequest - { - ActivityId = activityId, - ActivityRetryType = activityRetryType - }); - if (!result.IsSuccess) - { - throw new SagasServerException(result.Errors); - } - } - - public async Task ActivityExecuting(string activityId, string sagaId, byte[] parameterData, int order, DateTime addTime) + public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime, int executeTimes) { var result = await sagaClient.ActivityExecutingAsync(new Server.Grpc.ActivityExecutingRequest { ActivityId = activityId, + ActivityName = activityName, AddTime = addTime.ToString("yyyy-MM-dd HH:mm:ss.fff"), Order = order, ParameterData = Google.Protobuf.ByteString.CopyFrom(parameterData), diff --git a/src/Pole.Sagas/Core/ISagaStorage.cs b/src/Pole.Sagas/Core/ISagaStorage.cs index 799a6e1..f2730cb 100644 --- a/src/Pole.Sagas/Core/ISagaStorage.cs +++ b/src/Pole.Sagas/Core/ISagaStorage.cs @@ -10,8 +10,7 @@ namespace Pole.Sagas.Core { Task SagaStarted(string sagaId, string serviceName,DateTime addTime); Task SagaEnded(string sagaId, DateTime ExpiresAt); - Task ActivityExecuting(string activityId, string sagaId, byte[] ParameterData, int order,DateTime addTime); - Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType); + Task ActivityExecuting(string activityId, string activityName,string sagaId, byte[] ParameterData, int order,DateTime addTime,int executeTimes); Task ActivityExecuteAborted(string activityId); Task ActivityCompensateAborted(string activityId, string sagaId, string errors); Task ActivityExecuted(string activityId, byte[] resultData); diff --git a/src/Pole.Sagas/Core/Saga.cs b/src/Pole.Sagas/Core/Saga.cs index 4af2fa4..fe41b55 100644 --- a/src/Pole.Sagas/Core/Saga.cs +++ b/src/Pole.Sagas/Core/Saga.cs @@ -25,7 +25,7 @@ namespace Pole.Sagas.Core /// /// 如果 等于 -1 说明已经在执行补偿操作,此时这个值已经没有意义 /// - private int currentExecuteOrder = 0; + private int currentExecuteOrder = 0; /// /// 如果 等于 -1 说明已经还未执行补偿操作,此时这个值没有意义 /// @@ -43,7 +43,7 @@ namespace Pole.Sagas.Core this.activityFinder = activityFinder; Id = snowflakeIdGenerator.NextId(); } - internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder,int currentExecuteOrder,int currentCompensateOrder, List activities) + internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder, int currentExecuteOrder, int currentCompensateOrder, List activities) { this.snowflakeIdGenerator = snowflakeIdGenerator; this.serviceProvider = serviceProvider; @@ -69,6 +69,7 @@ namespace Pole.Sagas.Core var dataType = activityInterface.GetGenericArguments()[0]; ActivityWapper activityWapper = new ActivityWapper { + Name = activityName, ActivityDataType = dataType, ActivityStatus = ActivityStatus.NotStarted, ActivityType = targetActivityType, @@ -82,7 +83,7 @@ namespace Pole.Sagas.Core public async Task GetResult() { - await eventSender.SagaStarted(Id, poleSagasOption.ServiceName,DateTime.UtcNow); + await eventSender.SagaStarted(Id, poleSagasOption.ServiceName, DateTime.UtcNow); var executeActivity = GetNextExecuteActivity(); if (executeActivity == null) @@ -138,16 +139,17 @@ namespace Pole.Sagas.Core { var activityId = snowflakeIdGenerator.NextId(); activityWapper.Id = activityId; + activityWapper.ExecuteTimes++; activityWapper.CancellationTokenSource = new System.Threading.CancellationTokenSource(2 * 1000); try { var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType); - await eventSender.ActivityExecuting(activityId, Id, bytesContent, activityWapper.Order,DateTime.UtcNow); + await eventSender.ActivityExecuting(activityId, activityWapper.Name, Id, bytesContent, activityWapper.Order, DateTime.UtcNow, activityWapper.ExecuteTimes); var result = await activityWapper.InvokeExecute(); if (!result.IsSuccess) { await eventSender.ActivityRevoked(activityId); - await CompensateActivity(result,currentExecuteOrder); + await CompensateActivity(result, currentExecuteOrder); return result; } await eventSender.ActivityExecuted(activityId, Encoding.UTF8.GetBytes(string.Empty)); @@ -173,7 +175,7 @@ namespace Pole.Sagas.Core }; await eventSender.ActivityExecuteOvertime(activityId); // 超时的时候 需要首先补偿这个超时的操作 - return await CompensateActivity(result,currentExecuteOrder+1); + return await CompensateActivity(result, currentExecuteOrder + 1); } else { @@ -190,7 +192,7 @@ namespace Pole.Sagas.Core } } - private async Task CompensateActivity(ActivityExecuteResult result,int currentCompensateOrder) + private async Task CompensateActivity(ActivityExecuteResult result, int currentCompensateOrder) { this.currentCompensateOrder = currentCompensateOrder; currentExecuteOrder = -1; diff --git a/src/Pole.Sagas/Protos/saga.proto b/src/Pole.Sagas/Protos/saga.proto index b4eb80c..46350e1 100644 --- a/src/Pole.Sagas/Protos/saga.proto +++ b/src/Pole.Sagas/Protos/saga.proto @@ -8,7 +8,6 @@ service Saga { rpc SagaStarted (SagaStartedRequest) returns (CommonResponse); rpc SagaEnded (SagaEndedRequest) returns (CommonResponse); rpc ActivityExecuting (ActivityExecutingRequest) returns (CommonResponse); - rpc ActivityRetried (ActivityRetriedRequest) returns (CommonResponse); rpc ActivityExecuteAborted (ActivityExecuteAbortedRequest) returns (CommonResponse); rpc ActivityCompensateAborted (ActivityCompensateAbortedRequest) returns (CommonResponse); rpc ActivityExecuted (ActivityExecutedRequest) returns (CommonResponse); @@ -38,16 +37,8 @@ message ActivityExecutingRequest { bytes parameterData = 3; int32 order = 4; string addTime = 5; -} -message ActivityRetriedRequest { - string activityId = 1; - string status = 2; - int32 retries = 3; - ActivityRetryType activityRetryType = 4; - enum ActivityRetryType{ - Execute = 0; - Compensate = 1; - } + string activityName = 6; + int32 executeTimes = 7; } message ActivityExecuteAbortedRequest { string activityId = 1;