From 824f076bfdd8617111aad237db0c44d5ee24b041 Mon Sep 17 00:00:00 2001 From: dingsongjie Date: Tue, 10 Mar 2020 15:13:00 +0800 Subject: [PATCH] 完成部分存储功能 --- src/Pole.Sagas.Server/Services/SagaService.cs | 17 +---------------- src/Pole.Sagas.Storage.PostgreSql/PoleSagasPostgreSqlExtensions.cs | 2 +- src/Pole.Sagas.Storage.PostgreSql/PostgreSqlEventStorageInitializer.cs | 84 ------------------------------------------------------------------------------------ src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs | 94 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------- src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Sagas/Core/Abstraction/IEventSender.cs | 3 +-- src/Pole.Sagas/Core/ActivityWapper.cs | 4 ++++ src/Pole.Sagas/Core/EventSender.cs | 18 ++---------------- src/Pole.Sagas/Core/ISagaStorage.cs | 3 +-- src/Pole.Sagas/Core/Saga.cs | 16 +++++++++------- src/Pole.Sagas/Protos/saga.proto | 13 ++----------- 11 files changed, 185 insertions(+), 153 deletions(-) delete mode 100644 src/Pole.Sagas.Storage.PostgreSql/PostgreSqlEventStorageInitializer.cs create mode 100644 src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs diff --git a/src/Pole.Sagas.Server/Services/SagaService.cs b/src/Pole.Sagas.Server/Services/SagaService.cs index 9a71d2e..75a1a94 100644 --- a/src/Pole.Sagas.Server/Services/SagaService.cs +++ b/src/Pole.Sagas.Server/Services/SagaService.cs @@ -90,22 +90,7 @@ namespace Pole.Sagas.Server.Services CommonResponse commonResponse = new CommonResponse(); try { - await sagaStorage.ActivityExecuting(request.ActivityId, request.SagaId, request.ParameterData.ToByteArray(), request.Order, Convert.ToDateTime(request.AddTime)); - commonResponse.IsSuccess = true; - } - catch (Exception ex) - { - commonResponse.Errors = CombineError(ex); - } - return commonResponse; - } - public override async Task ActivityRetried(ActivityRetriedRequest request, ServerCallContext context) - { - CommonResponse commonResponse = new CommonResponse(); - try - { - var targetActivityRetryType = request.ActivityRetryType == Pole.Sagas.Server.Grpc.ActivityRetriedRequest.Types.ActivityRetryType.Compensate ? ActivityRetryType.Compensate : ActivityRetryType.Execute; - await sagaStorage.ActivityRetried(request.ActivityId, request.Status, request.Retries, targetActivityRetryType); + await sagaStorage.ActivityExecuting(request.ActivityId, request.ActivityName, request.SagaId, request.ParameterData.ToByteArray(), request.Order, Convert.ToDateTime(request.AddTime), request.ExecuteTimes); commonResponse.IsSuccess = true; } catch (Exception ex) diff --git a/src/Pole.Sagas.Storage.PostgreSql/PoleSagasPostgreSqlExtensions.cs b/src/Pole.Sagas.Storage.PostgreSql/PoleSagasPostgreSqlExtensions.cs index 3c236e9..ee31358 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PoleSagasPostgreSqlExtensions.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PoleSagasPostgreSqlExtensions.cs @@ -12,7 +12,7 @@ namespace Pole.Sagas.Storage.PostgreSql public static IServiceCollection AddPostgreSqlStorage(IServiceCollection services,Action config) { services.Configure(config); - services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(); return services; } diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlEventStorageInitializer.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlEventStorageInitializer.cs deleted file mode 100644 index 5505281..0000000 --- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlEventStorageInitializer.cs +++ /dev/null @@ -1,84 +0,0 @@ -using Dapper; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Npgsql; -using Pole.Core.EventBus.EventStorage; -using Pole.Sagas.Core; -using Pole.Sagas.Core.Abstraction; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Sagas.Storage.PostgreSql -{ - class PostgreSqlEventStorageInitializer : ISagaStorageInitializer - { - private PoleSagasStoragePostgreSqlOption options; - private readonly ILogger logger; - public PostgreSqlEventStorageInitializer(IOptions poleSagaServerOption, ILogger logger) - { - this.options = poleSagaServerOption.Value; - this.logger = logger; - } - public string GetActivityTableName() - { - return $"\"{options.SchemaName}\".\"{options.ActivityTableName}\""; - } - - public string GetSagaTableName() - { - return $"\"{options.SchemaName}\".\"{options.SagaTableName}\""; - } - - public async Task InitializeAsync(CancellationToken cancellationToken) - { - if (cancellationToken.IsCancellationRequested) return; - - var sql = CreateDbTablesScript(options.SchemaName); - using (var connection = new NpgsqlConnection(options.ConnectionString)) - { - await connection.ExecuteAsync(sql); - } - - logger.LogDebug("Ensuring all create database tables script are applied."); - } - - private string CreateDbTablesScript(string schemaName) - { - var batchSql = $@" -CREATE SCHEMA IF NOT EXISTS ""{options.SchemaName}""; - -CREATE TABLE IF NOT EXISTS {GetSagaTableName()}( - ""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL, - ""ServiceName"" varchar(64) COLLATE ""pg_catalog"".""default"" NOT NULL, - ""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL, - ""ExpiresAt"" timestamp, - ""AddTime"" timestamp NOT NULL -); -ALTER TABLE ""{options.SchemaName}"" ADD CONSTRAINT ""Sagas_pkey"" PRIMARY KEY (""Id""); - -CREATE TABLE IF NOT EXISTS {GetActivityTableName()}( - ""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL, - ""SagaId"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL, - ""Order"" int4 NOT NULL, - ""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL, - ""TimeOutSeconds"" int4 NOT NULL, - ""ParameterData"" bytea NOT NULL, - ""ResultData"" bytea, - ""Errors"" varchar(1024) COLLATE ""pg_catalog"".""default"", - ""ExecuteRetries"" int4 NOT NULL, - ""CompensateRetries"" int4 NOT NULL, - ""AddTime"" timestamp NOT NULL -) -; -CREATE INDEX ""Activities_SagaId"" ON ""{GetActivityTableName()}"" USING btree ( - ""SagaId"" COLLATE ""pg_catalog"".""default"" ""pg_catalog"".""text_ops"" ASC NULLS LAST -); -ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""Activities_pkey"" PRIMARY KEY (""Id""); - "; - return batchSql; - } - } -} diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs index 40666f4..90d3fbe 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs @@ -109,34 +109,100 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; } } - public Task ActivityExecuting(string activityId, string sagaId, byte[] ParameterData, int order, DateTime addTime) + public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] ParameterData, int order, DateTime addTime, int executeTimes) { - throw new NotImplementedException(); - } + using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) + { + string sql = string.Empty; + if (executeTimes == 1) + { + sql = + $"INSERT INTO {activityTableName} (\"Id\",\"Name\",\"SagaId\",\"Status\",\"ParameterData\",\"ExecuteTimes\",\"CompensateTimes\",\"AddTime\")" + + $"VALUES(@Id,@Name,@SagaId,@Status,@ParameterData,@ExecuteTimes,@CompensateTimes,@AddTime);"; + _ = await connection.ExecuteAsync(sql, new + { + Id = activityId, + Name = activityName, + SagaId = sagaId, + Status = nameof(ActivityStatus.Executing), + ExecutingOvertimeRetries = 0, + ParameterData = ParameterData, + ExecuteTimes = executeTimes, + CompensateTimes = 0, + AddTime = addTime + }); + } + else + { + sql = $"UPDATE {activityTableName} SET \"ExecuteTimes\"=@ExecuteTimes WHERE \"Id\" = @Id"; + await connection.ExecuteAsync(sql, new + { + Id = activityId, + ExecuteTimes = executeTimes + }); + } - public Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType) - { - throw new NotImplementedException(); + } } - public Task ActivityRevoked(string activityId) + public async Task ActivityRevoked(string activityId) { - throw new NotImplementedException(); + using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) + { + var updateActivitySql = +$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; + await connection.ExecuteAsync(updateActivitySql, new + { + Id = activityId, + Status = nameof(ActivityStatus.Revoked) + }); + } } - public Task SagaEnded(string sagaId, DateTime ExpiresAt) + public async Task SagaEnded(string sagaId, DateTime ExpiresAt) { - throw new NotImplementedException(); + using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) + { + var updateActivitySql = +$"UPDATE {sagaTableName} SET \"Status\"=@Status ,\"ExpiresAt\"=@ExpiresAt WHERE \"Id\" = @Id"; + await connection.ExecuteAsync(updateActivitySql, new + { + Id = sagaId, + ExpiresAt= ExpiresAt, + Status = nameof(ActivityStatus.Revoked) + }); + } } - public Task SagaStarted(string sagaId, string serviceName, DateTime addTime) + public async Task SagaStarted(string sagaId, string serviceName, DateTime addTime) { - throw new NotImplementedException(); + using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) + { + var sql = +$"INSERT INTO {sagaTableName} (\"Id\",\"ServiceName\",\"Status\",\"AddTime\")" + + $"VALUES(@Id,@ServiceName,@Status,@AddTime);"; + await connection.ExecuteAsync(sql, new + { + Id = sagaId, + AddTime = addTime, + ServiceName=serviceName, + Status = nameof(ActivityStatus.Revoked) + }); + } } - public Task ActivityCompensating(string activityId) + public async Task ActivityCompensating(string activityId) { - throw new NotImplementedException(); + using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) + { + var updateActivitySql = +$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; + await connection.ExecuteAsync(updateActivitySql, new + { + Id = activityId, + Status = nameof(ActivityStatus.Compensating) + }); + } } } } diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs new file mode 100644 index 0000000..c608c0f --- /dev/null +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs @@ -0,0 +1,84 @@ +using Dapper; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Npgsql; +using Pole.Core.EventBus.EventStorage; +using Pole.Sagas.Core; +using Pole.Sagas.Core.Abstraction; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.Sagas.Storage.PostgreSql +{ + class PostgreSqlSagaStorageInitializer : ISagaStorageInitializer + { + private PoleSagasStoragePostgreSqlOption options; + private readonly ILogger logger; + public PostgreSqlSagaStorageInitializer(IOptions poleSagaServerOption, ILogger logger) + { + this.options = poleSagaServerOption.Value; + this.logger = logger; + } + public string GetActivityTableName() + { + return $"\"{options.SchemaName}\".\"{options.ActivityTableName}\""; + } + + public string GetSagaTableName() + { + return $"\"{options.SchemaName}\".\"{options.SagaTableName}\""; + } + + public async Task InitializeAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) return; + + var sql = CreateDbTablesScript(options.SchemaName); + using (var connection = new NpgsqlConnection(options.ConnectionString)) + { + await connection.ExecuteAsync(sql); + } + + logger.LogDebug("Ensuring all create database tables script are applied."); + } + + private string CreateDbTablesScript(string schemaName) + { + var batchSql = $@" +CREATE SCHEMA IF NOT EXISTS ""{options.SchemaName}""; + +CREATE TABLE IF NOT EXISTS {GetSagaTableName()}( + ""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL, + ""ServiceName"" varchar(64) COLLATE ""pg_catalog"".""default"" NOT NULL, + ""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL, + ""ExpiresAt"" timestamp, + ""AddTime"" timestamp NOT NULL +); +ALTER TABLE ""{GetSagaTableName()}"" ADD CONSTRAINT ""Sagas_pkey"" PRIMARY KEY (""Id""); + +CREATE TABLE IF NOT EXISTS {GetActivityTableName()}( + ""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL, + ""Name"" varchar(255) COLLATE ""pg_catalog"".""default"" NOT NULL, + ""SagaId"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL, + ""Order"" int4 NOT NULL, + ""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL, + ""ExecuteTimes"" int4 NOT NULL, + ""ParameterData"" bytea NOT NULL, + ""ResultData"" bytea, + ""Errors"" varchar(1024) COLLATE ""pg_catalog"".""default"", + ""CompensateTimes"" int4 NOT NULL, + ""AddTime"" timestamp NOT NULL +) +; +CREATE INDEX ""Activities_SagaId"" ON ""{GetActivityTableName()}"" USING btree ( + ""SagaId"" COLLATE ""pg_catalog"".""default"" ""pg_catalog"".""text_ops"" ASC NULLS LAST +); +ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""Activities_pkey"" PRIMARY KEY (""Id""); + "; + return batchSql; + } + } +} diff --git a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs index 3ac7806..4d29c96 100644 --- a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs +++ b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs @@ -7,8 +7,7 @@ namespace Pole.Sagas.Core.Abstraction { Task SagaStarted(string sagaId, string serviceName, DateTime addTime); Task SagaEnded(string sagaId, DateTime ExpiresAt); - Task ActivityExecuting(string activityId, string sagaId, byte[] parameterData, int order, DateTime addTime); - Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType); + Task ActivityExecuting(string activityId,string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime,int executeTimes); Task ActivityExecuteAborted(string activityId); Task ActivityCompensateAborted(string activityId, string sagaId, string errors); Task ActivityExecuted(string activityId,byte[] resultData); diff --git a/src/Pole.Sagas/Core/ActivityWapper.cs b/src/Pole.Sagas/Core/ActivityWapper.cs index c3ecd75..fae7597 100644 --- a/src/Pole.Sagas/Core/ActivityWapper.cs +++ b/src/Pole.Sagas/Core/ActivityWapper.cs @@ -11,12 +11,16 @@ namespace Pole.Sagas.Core public class ActivityWapper { public string Id { get; set; } + public string Name { get; set; } public Type ActivityType { get; set; } public Type ActivityDataType { get; set; } public object DataObj { get; set; } public int Order { get; set; } + public int ExecuteTimes { get; set; } + public int CompensateTimes { get; set; } public ActivityStatus ActivityStatus { get; set; } public IServiceProvider ServiceProvider { get; set; } + public int TimeOutSeconds { get; set; } public CancellationTokenSource CancellationTokenSource { get; set; } public Task InvokeExecute() diff --git a/src/Pole.Sagas/Core/EventSender.cs b/src/Pole.Sagas/Core/EventSender.cs index 003c29e..4de898a 100644 --- a/src/Pole.Sagas/Core/EventSender.cs +++ b/src/Pole.Sagas/Core/EventSender.cs @@ -68,26 +68,12 @@ namespace Pole.Sagas.Core } } - public async Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType) - { - Pole.Sagas.Server.Grpc.ActivityRetriedRequest.Types.ActivityRetryType activityRetryType = retryType == ActivityRetryType.Compensate ? Pole.Sagas.Server.Grpc.ActivityRetriedRequest.Types.ActivityRetryType.Compensate : Pole.Sagas.Server.Grpc.ActivityRetriedRequest.Types.ActivityRetryType.Execute; - - var result = await sagaClient.ActivityRetriedAsync(new Server.Grpc.ActivityRetriedRequest - { - ActivityId = activityId, - ActivityRetryType = activityRetryType - }); - if (!result.IsSuccess) - { - throw new SagasServerException(result.Errors); - } - } - - public async Task ActivityExecuting(string activityId, string sagaId, byte[] parameterData, int order, DateTime addTime) + public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime, int executeTimes) { var result = await sagaClient.ActivityExecutingAsync(new Server.Grpc.ActivityExecutingRequest { ActivityId = activityId, + ActivityName = activityName, AddTime = addTime.ToString("yyyy-MM-dd HH:mm:ss.fff"), Order = order, ParameterData = Google.Protobuf.ByteString.CopyFrom(parameterData), diff --git a/src/Pole.Sagas/Core/ISagaStorage.cs b/src/Pole.Sagas/Core/ISagaStorage.cs index 799a6e1..f2730cb 100644 --- a/src/Pole.Sagas/Core/ISagaStorage.cs +++ b/src/Pole.Sagas/Core/ISagaStorage.cs @@ -10,8 +10,7 @@ namespace Pole.Sagas.Core { Task SagaStarted(string sagaId, string serviceName,DateTime addTime); Task SagaEnded(string sagaId, DateTime ExpiresAt); - Task ActivityExecuting(string activityId, string sagaId, byte[] ParameterData, int order,DateTime addTime); - Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType); + Task ActivityExecuting(string activityId, string activityName,string sagaId, byte[] ParameterData, int order,DateTime addTime,int executeTimes); Task ActivityExecuteAborted(string activityId); Task ActivityCompensateAborted(string activityId, string sagaId, string errors); Task ActivityExecuted(string activityId, byte[] resultData); diff --git a/src/Pole.Sagas/Core/Saga.cs b/src/Pole.Sagas/Core/Saga.cs index 4af2fa4..fe41b55 100644 --- a/src/Pole.Sagas/Core/Saga.cs +++ b/src/Pole.Sagas/Core/Saga.cs @@ -25,7 +25,7 @@ namespace Pole.Sagas.Core /// /// 如果 等于 -1 说明已经在执行补偿操作,此时这个值已经没有意义 /// - private int currentExecuteOrder = 0; + private int currentExecuteOrder = 0; /// /// 如果 等于 -1 说明已经还未执行补偿操作,此时这个值没有意义 /// @@ -43,7 +43,7 @@ namespace Pole.Sagas.Core this.activityFinder = activityFinder; Id = snowflakeIdGenerator.NextId(); } - internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder,int currentExecuteOrder,int currentCompensateOrder, List activities) + internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder, int currentExecuteOrder, int currentCompensateOrder, List activities) { this.snowflakeIdGenerator = snowflakeIdGenerator; this.serviceProvider = serviceProvider; @@ -69,6 +69,7 @@ namespace Pole.Sagas.Core var dataType = activityInterface.GetGenericArguments()[0]; ActivityWapper activityWapper = new ActivityWapper { + Name = activityName, ActivityDataType = dataType, ActivityStatus = ActivityStatus.NotStarted, ActivityType = targetActivityType, @@ -82,7 +83,7 @@ namespace Pole.Sagas.Core public async Task GetResult() { - await eventSender.SagaStarted(Id, poleSagasOption.ServiceName,DateTime.UtcNow); + await eventSender.SagaStarted(Id, poleSagasOption.ServiceName, DateTime.UtcNow); var executeActivity = GetNextExecuteActivity(); if (executeActivity == null) @@ -138,16 +139,17 @@ namespace Pole.Sagas.Core { var activityId = snowflakeIdGenerator.NextId(); activityWapper.Id = activityId; + activityWapper.ExecuteTimes++; activityWapper.CancellationTokenSource = new System.Threading.CancellationTokenSource(2 * 1000); try { var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType); - await eventSender.ActivityExecuting(activityId, Id, bytesContent, activityWapper.Order,DateTime.UtcNow); + await eventSender.ActivityExecuting(activityId, activityWapper.Name, Id, bytesContent, activityWapper.Order, DateTime.UtcNow, activityWapper.ExecuteTimes); var result = await activityWapper.InvokeExecute(); if (!result.IsSuccess) { await eventSender.ActivityRevoked(activityId); - await CompensateActivity(result,currentExecuteOrder); + await CompensateActivity(result, currentExecuteOrder); return result; } await eventSender.ActivityExecuted(activityId, Encoding.UTF8.GetBytes(string.Empty)); @@ -173,7 +175,7 @@ namespace Pole.Sagas.Core }; await eventSender.ActivityExecuteOvertime(activityId); // 超时的时候 需要首先补偿这个超时的操作 - return await CompensateActivity(result,currentExecuteOrder+1); + return await CompensateActivity(result, currentExecuteOrder + 1); } else { @@ -190,7 +192,7 @@ namespace Pole.Sagas.Core } } - private async Task CompensateActivity(ActivityExecuteResult result,int currentCompensateOrder) + private async Task CompensateActivity(ActivityExecuteResult result, int currentCompensateOrder) { this.currentCompensateOrder = currentCompensateOrder; currentExecuteOrder = -1; diff --git a/src/Pole.Sagas/Protos/saga.proto b/src/Pole.Sagas/Protos/saga.proto index b4eb80c..46350e1 100644 --- a/src/Pole.Sagas/Protos/saga.proto +++ b/src/Pole.Sagas/Protos/saga.proto @@ -8,7 +8,6 @@ service Saga { rpc SagaStarted (SagaStartedRequest) returns (CommonResponse); rpc SagaEnded (SagaEndedRequest) returns (CommonResponse); rpc ActivityExecuting (ActivityExecutingRequest) returns (CommonResponse); - rpc ActivityRetried (ActivityRetriedRequest) returns (CommonResponse); rpc ActivityExecuteAborted (ActivityExecuteAbortedRequest) returns (CommonResponse); rpc ActivityCompensateAborted (ActivityCompensateAbortedRequest) returns (CommonResponse); rpc ActivityExecuted (ActivityExecutedRequest) returns (CommonResponse); @@ -38,16 +37,8 @@ message ActivityExecutingRequest { bytes parameterData = 3; int32 order = 4; string addTime = 5; -} -message ActivityRetriedRequest { - string activityId = 1; - string status = 2; - int32 retries = 3; - ActivityRetryType activityRetryType = 4; - enum ActivityRetryType{ - Execute = 0; - Compensate = 1; - } + string activityName = 6; + int32 executeTimes = 7; } message ActivityExecuteAbortedRequest { string activityId = 1; -- libgit2 0.25.0