From 7ef7a1a7cb91cb4170d65bbbc8153f88c362b9cd Mon Sep 17 00:00:00 2001
From: dingsongjie <dingsongjie@DESKTOP-0AS088R>
Date: Wed, 11 Mar 2020 18:02:57 +0800
Subject: [PATCH] 完成 90%

---
 src/Pole.Sagas.Client/Abstraction/IEventSender.cs                      |  12 +++++++++---
 src/Pole.Sagas.Client/ActivityWapper.cs                                |   2 +-
 src/Pole.Sagas.Client/EventSender.cs                                   |  19 ++++---------------
 src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs |   2 +-
 src/Pole.Sagas.Client/PoleSagasOption.cs                               |   2 ++
 src/Pole.Sagas.Client/Saga.cs                                          |  46 +++++++++++++++++++++++++++++++++++++---------
 src/Pole.Sagas.Server/Processor/ExpiredSagasCollectorProcessor.cs      |   2 +-
 src/Pole.Sagas.Server/Services/SagaService.cs                          |  47 +++++++++++++++++------------------------------
 src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs             | 130 +++++++++++++++++++++++++++++++++++++++++++++++++---------------------------------------------------------------------------------
 src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs  |  19 +------------------
 src/Pole.Sagas/Core/Abstraction/ISagaStorage.cs                        |   5 ++---
 src/Pole.Sagas/Core/Abstraction/ISagaStorageInitializer.cs             |   1 -
 src/Pole.Sagas/Core/OvertimeCompensationGuaranteeActivityStatus.cs     |  13 -------------
 src/Pole.Sagas/Protos/saga.proto                                       |   8 +++++---
 14 files changed, 129 insertions(+), 179 deletions(-)
 delete mode 100644 src/Pole.Sagas/Core/OvertimeCompensationGuaranteeActivityStatus.cs

diff --git a/src/Pole.Sagas.Client/Abstraction/IEventSender.cs b/src/Pole.Sagas.Client/Abstraction/IEventSender.cs
index 31befd7..b077b65 100644
--- a/src/Pole.Sagas.Client/Abstraction/IEventSender.cs
+++ b/src/Pole.Sagas.Client/Abstraction/IEventSender.cs
@@ -7,13 +7,19 @@ namespace Pole.Sagas.Client.Abstraction
     {
         Task SagaStarted(string sagaId, string serviceName, DateTime addTime);
         Task SagaEnded(string sagaId, DateTime ExpiresAt);
-        Task ActivityExecuting(string activityId,string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime,int executeTimes);
+        Task ActivityExecuting(string activityId,string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime);
         Task ActivityExecuteAborted(string activityId);
+        /// <summary>
+        /// 
+        /// </summary>
+        /// <param name="activityId"></param>
+        /// <param name="sagaId">sagaId 不为空 服务端会set saga.status=ended</param>
+        /// <param name="errors"></param>
+        /// <returns></returns>
         Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
-        Task ActivityExecuted(string activityId);
         Task ActivityCompensated(string activityId);
+        Task ActivityOvertimeCompensated(string activityId,bool compensated);
         Task ActivityExecuteOvertime(string activityId,string name,byte [] parameterData,DateTime addTime);
         Task ActivityRevoked(string activityId);
-        Task ActivityCompensating(string activityId,int compensateTimes);
     }
 }
diff --git a/src/Pole.Sagas.Client/ActivityWapper.cs b/src/Pole.Sagas.Client/ActivityWapper.cs
index aaf6d1b..21a1d63 100644
--- a/src/Pole.Sagas.Client/ActivityWapper.cs
+++ b/src/Pole.Sagas.Client/ActivityWapper.cs
@@ -17,7 +17,7 @@ namespace Pole.Sagas.Client
         public Type ActivityDataType { get; set; }
         public object DataObj { get; set; }
         public int Order { get; set; }
-        public int ExecuteTimes { get; set; }
+        public int OvertimeCompensateTimes { get; set; }
         public int CompensateTimes { get; set; }
         public ActivityStatus ActivityStatus { get; set; }
         public IServiceProvider ServiceProvider { get; set; }
diff --git a/src/Pole.Sagas.Client/EventSender.cs b/src/Pole.Sagas.Client/EventSender.cs
index 95bd85b..035f134 100644
--- a/src/Pole.Sagas.Client/EventSender.cs
+++ b/src/Pole.Sagas.Client/EventSender.cs
@@ -44,18 +44,6 @@ namespace Pole.Sagas.Client
             }
         }
 
-        public async Task ActivityExecuted(string activityId)
-        {
-            var result = await sagaClient.ActivityExecutedAsync(new Server.Grpc.ActivityExecutedRequest
-            {
-                ActivityId = activityId,
-            });
-            if (!result.IsSuccess)
-            {
-                throw new SagasServerException(result.Errors);
-            }
-        }
-
         public async Task ActivityExecuteAborted(string activityId)
         {
             var result = await sagaClient.ActivityExecuteAbortedAsync(new Server.Grpc.ActivityExecuteAbortedRequest
@@ -68,7 +56,7 @@ namespace Pole.Sagas.Client
             }
         }
 
-        public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime, int executeTimes)
+        public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime)
         {
             var result = await sagaClient.ActivityExecutingAsync(new Server.Grpc.ActivityExecutingRequest
             {
@@ -136,11 +124,12 @@ namespace Pole.Sagas.Client
             }
         }
 
-        public async Task ActivityCompensating(string activityId, int compensateTimes)
+        public async Task ActivityOvertimeCompensated(string activityId, bool compensated)
         {
-            var result = await sagaClient.ActivityCompensatingAsync(new Server.Grpc.ActivityCompensatingRequest
+            var result = await sagaClient.ActivityOvertimeCompensatedAsync(new Server.Grpc.ActivityOvertimeCompensatedRequest
             {
                 ActivityId = activityId,
+                Compensated= compensated
             });
             if (!result.IsSuccess)
             {
diff --git a/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs b/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs
index 24a795b..edb93c4 100644
--- a/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs
+++ b/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs
@@ -58,7 +58,7 @@ namespace Pole.Sagas.Client
                         sagas.ForEach(async sagaEntity =>
                        {
                            var saga = sagaRestorer.CreateSaga(sagaEntity);
-                           await saga.Compensate();
+                           await saga.CompensateWhenRetry();
                        });
                     }
                 }
diff --git a/src/Pole.Sagas.Client/PoleSagasOption.cs b/src/Pole.Sagas.Client/PoleSagasOption.cs
index 77a25d5..6f3ff31 100644
--- a/src/Pole.Sagas.Client/PoleSagasOption.cs
+++ b/src/Pole.Sagas.Client/PoleSagasOption.cs
@@ -8,6 +8,8 @@ namespace Pole.Sagas.Client
     {
         public string ServiceName { get; set; }
         public int PreSagasGrpcStreamingResponseLimitCount { get; set; } = 20;
+        public int MaxCompensateTimes { get; set; } = 10;
+        public int MaxOvertimeCompensateTimes { get; set; } = 10;
         public int CompeletedSagaExpiredAfterSeconds { get; set; } = 60 * 10;
         public int SagasTimeOutSeconds { get; set; } = 60;
         public string SagasServerHost { get; set; }
diff --git a/src/Pole.Sagas.Client/Saga.cs b/src/Pole.Sagas.Client/Saga.cs
index 9110042..a79e9b4 100644
--- a/src/Pole.Sagas.Client/Saga.cs
+++ b/src/Pole.Sagas.Client/Saga.cs
@@ -118,15 +118,20 @@ namespace Pole.Sagas.Client
             var result = await RecursiveExecuteActivity(executeActivity);
             return result;
         }
-        internal async Task Compensate()
+        /// <summary>
+        /// if true should ended this sagas ,if false do nothing continue retry
+        /// </summary>
+        /// <returns></returns>
+        internal async Task<bool> CompensateWhenRetry()
         {
-            this.currentCompensateOrder = CurrentMaxOrder+1;
+            this.currentCompensateOrder = CurrentMaxOrder + 1;
             var compensateActivity = GetNextCompensateActivity();
             if (compensateActivity == null)
             {
-                return ;
+                return true;
             }
             await RecursiveCompensateActivity(compensateActivity);
+            return true;
         }
 
         private ActivityWapper GetNextExecuteActivity()
@@ -151,11 +156,27 @@ namespace Pole.Sagas.Client
         private async Task RecursiveCompensateActivity(ActivityWapper activityWapper)
         {
             var activityId = activityWapper.Id;
+            if (activityWapper.ActivityStatus == ActivityStatus.ExecutingOvertime)
+            {
+                activityWapper.OvertimeCompensateTimes++;
+            }
+            else
+            {
+                activityWapper.CompensateTimes++;
+            }
             try
             {
-                await eventSender.ActivityCompensating(activityId, activityWapper.CompensateTimes);
                 await activityWapper.InvokeCompensate();
-                await eventSender.ActivityCompensated(activityId);
+                if (activityWapper.ActivityStatus == ActivityStatus.ExecutingOvertime)
+                {
+                    // 超时 补偿次数已到
+                    var isCompensated = activityWapper.CompensateTimes == poleSagasOption.MaxOvertimeCompensateTimes;
+                    await eventSender.ActivityOvertimeCompensated(activityId, isCompensated);
+                }
+                else
+                {
+                    await eventSender.ActivityCompensated(activityId);
+                }
                 var compensateActivity = GetNextCompensateActivity();
                 if (compensateActivity == null)
                 {
@@ -165,19 +186,27 @@ namespace Pole.Sagas.Client
             }
             catch (Exception exception)
             {
-                await eventSender.ActivityCompensateAborted(activityId, Id, exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace);
+                // todo: 超时操作 如果出错可能 减少 补偿次数 这里 先不做处理
+                if (activityWapper.CompensateTimes == poleSagasOption.MaxCompensateTimes)
+                {
+                    // 此时 结束 saga 并且设置状态 为 Error
+                    await eventSender.ActivityCompensateAborted(activityId, Id, exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace);
+                }
+                else
+                {
+                    await eventSender.ActivityCompensateAborted(activityId, string.Empty, exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace);
+                }
             }
         }
         private async Task<ActivityExecuteResult> RecursiveExecuteActivity(ActivityWapper activityWapper)
         {
             var activityId = snowflakeIdGenerator.NextId();
             activityWapper.Id = activityId;
-            activityWapper.ExecuteTimes++;
             activityWapper.CancellationTokenSource = new System.Threading.CancellationTokenSource(2 * 1000);
             try
             {
                 var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType);
-                await eventSender.ActivityExecuting(activityId, activityWapper.Name, Id, bytesContent, activityWapper.Order, DateTime.UtcNow, activityWapper.ExecuteTimes);
+                await eventSender.ActivityExecuting(activityId, activityWapper.Name, Id, bytesContent, activityWapper.Order, DateTime.UtcNow);
                 var result = await activityWapper.InvokeExecute();
                 if (!result.IsSuccess)
                 {
@@ -185,7 +214,6 @@ namespace Pole.Sagas.Client
                     await CompensateActivity(result, currentExecuteOrder);
                     return result;
                 }
-                await eventSender.ActivityExecuted(activityId);
                 var executeActivity = GetNextExecuteActivity();
                 if (executeActivity == null)
                 {
diff --git a/src/Pole.Sagas.Server/Processor/ExpiredSagasCollectorProcessor.cs b/src/Pole.Sagas.Server/Processor/ExpiredSagasCollectorProcessor.cs
index 384b82b..4ee084c 100644
--- a/src/Pole.Sagas.Server/Processor/ExpiredSagasCollectorProcessor.cs
+++ b/src/Pole.Sagas.Server/Processor/ExpiredSagasCollectorProcessor.cs
@@ -44,7 +44,7 @@ namespace Pole.Sagas.Server.Processor
 
         private async Task ProcessInternal()
         {
-            var tables = new[] { sagaStorageInitializer.GetSagaTableName(), sagaStorageInitializer.GetOvertimeCompensationGuaranteeTableName() };
+            var tables = new[] { sagaStorageInitializer.GetSagaTableName() };
 
             foreach (var table in tables)
             {
diff --git a/src/Pole.Sagas.Server/Services/SagaService.cs b/src/Pole.Sagas.Server/Services/SagaService.cs
index dfd3a2a..9979d3f 100644
--- a/src/Pole.Sagas.Server/Services/SagaService.cs
+++ b/src/Pole.Sagas.Server/Services/SagaService.cs
@@ -51,20 +51,6 @@ namespace Pole.Sagas.Server.Services
             }
             return commonResponse;
         }
-        public override async Task<CommonResponse> ActivityExecuted(ActivityExecutedRequest request, ServerCallContext context)
-        {
-            CommonResponse commonResponse = new CommonResponse();
-            try
-            {
-                await sagaStorage.ActivityExecuted(request.ActivityId);
-                commonResponse.IsSuccess = true;
-            }
-            catch (Exception ex)
-            {
-                commonResponse.Errors = CombineError(ex);
-            }
-            return commonResponse;
-        }
         public override async Task<CommonResponse> ActivityExecuteAborted(ActivityExecuteAbortedRequest request, ServerCallContext context)
         {
             CommonResponse commonResponse = new CommonResponse();
@@ -98,7 +84,7 @@ namespace Pole.Sagas.Server.Services
             CommonResponse commonResponse = new CommonResponse();
             try
             {
-                await sagaStorage.ActivityExecuting(request.ActivityId, request.ActivityName, request.SagaId, request.ParameterData.ToByteArray(), request.Order, Convert.ToDateTime(request.AddTime), request.ExecuteTimes);
+                await sagaStorage.ActivityExecuting(request.ActivityId, request.ActivityName, request.SagaId, request.ParameterData.ToByteArray(), request.Order, Convert.ToDateTime(request.AddTime));
                 commonResponse.IsSuccess = true;
             }
             catch (Exception ex)
@@ -149,20 +135,6 @@ namespace Pole.Sagas.Server.Services
             }
             return commonResponse;
         }
-        public override async Task<CommonResponse> ActivityCompensating(ActivityCompensatingRequest request, ServerCallContext context)
-        {
-            CommonResponse commonResponse = new CommonResponse();
-            try
-            {
-                await sagaStorage.ActivityCompensating(request.ActivityId, request.CompensateTimes);
-                commonResponse.IsSuccess = true;
-            }
-            catch (Exception ex)
-            {
-                commonResponse.Errors = CombineError(ex);
-            }
-            return commonResponse;
-        }
         public override async Task GetSagas(GetSagasRequest request, IServerStreamWriter<GetSagasResponse> responseStream, ServerCallContext context)
         {
             while (!context.CancellationToken.IsCancellationRequested)
@@ -202,7 +174,22 @@ namespace Pole.Sagas.Server.Services
                 await responseStream.WriteAsync(getSagasResponse);
             }
         }
-       
+
+        public override async Task<CommonResponse> ActivityOvertimeCompensated(ActivityOvertimeCompensatedRequest request, ServerCallContext context)
+        {
+            CommonResponse commonResponse = new CommonResponse();
+            try
+            {
+                await sagaStorage.ActivityOvertimeCompensated(request.ActivityId, request.Compensated);
+                commonResponse.IsSuccess = true;
+            }
+            catch (Exception ex)
+            {
+                commonResponse.Errors = CombineError(ex);
+            }
+            return commonResponse;
+        }
+
         private string CombineError(Exception exception)
         {
             return exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace;
diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs
index 4db0c32..39afdb5 100644
--- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs
+++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs
@@ -16,7 +16,6 @@ namespace Pole.Sagas.Storage.PostgreSql
     {
         private readonly string sagaTableName;
         private readonly string activityTableName;
-        private readonly string overtimeCompensationGuaranteeTableName;
         private readonly PoleSagasStoragePostgreSqlOption poleSagasStoragePostgreSqlOption;
         private readonly ISagaStorageInitializer sagaStorageInitializer;
         public PostgreSqlSagaStorage(IOptions<PoleSagasStoragePostgreSqlOption> poleSagasStoragePostgreSqlOption, ISagaStorageInitializer sagaStorageInitializer)
@@ -25,7 +24,6 @@ namespace Pole.Sagas.Storage.PostgreSql
             this.sagaStorageInitializer = sagaStorageInitializer;
             sagaTableName = sagaStorageInitializer.GetSagaTableName();
             activityTableName = sagaStorageInitializer.GetActivityTableName();
-            overtimeCompensationGuaranteeTableName = sagaStorageInitializer.GetOvertimeCompensationGuaranteeTableName();
         }
         public async Task ActivityCompensateAborted(string activityId, string sagaId, string errors)
         {
@@ -71,7 +69,7 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
             }
         }
 
-        public async Task ActivityExecuted(string activityId)
+        public async Task ActivityExecuteAborted(string activityId)
         {
             using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
             {
@@ -80,12 +78,12 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status  WHERE \"Id\" = @Id";
                 await connection.ExecuteAsync(updateActivitySql, new
                 {
                     Id = activityId,
-                    Status = nameof(ActivityStatus.Executed)
+                    Status = nameof(ActivityStatus.ExecuteAborted)
                 });
             }
         }
 
-        public async Task ActivityExecuteAborted(string activityId)
+        public async Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime)
         {
             using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
             {
@@ -94,73 +92,30 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status  WHERE \"Id\" = @Id";
                 await connection.ExecuteAsync(updateActivitySql, new
                 {
                     Id = activityId,
-                    Status = nameof(ActivityStatus.ExecuteAborted)
+                    Status = nameof(ActivityStatus.ExecutingOvertime)
                 });
             }
         }
 
-        public async Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime)
-        {
-            using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
-            {
-                using (var tansaction = await connection.BeginTransactionAsync())
-                {
-                    var updateActivitySql =
-$"UPDATE {activityTableName} SET \"Status\"=@Status  WHERE \"Id\" = @Id";
-                    await connection.ExecuteAsync(updateActivitySql, new
-                    {
-                        Id = activityId,
-                        Status = nameof(ActivityStatus.ExecuteAborted)
-                    }, tansaction);
-
-                    var addOCGActivity =
-$"INSERT INTO {overtimeCompensationGuaranteeTableName} (\"Id\",\"Name\",\"Status\",\"ParameterData\",\"CompensateTimes\",\"AddTime\")" +
-               $"VALUES(@Id,@Name,@SagaId,@Status,@ParameterData,,@CompensateTimes,@AddTime);";
-                    await connection.ExecuteAsync(updateActivitySql, new
-                    {
-                        Id = activityId,
-                        Name = name,
-                        ParameterData = parameterData,
-                        CompensateTimes = 0,
-                        AddTime = addTime,
-                        Status = nameof(OvertimeCompensationGuaranteeActivityStatus.Executing)
-                    }, tansaction);
-                }
-            }
-        }
-
-        public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] ParameterData, int order, DateTime addTime, int executeTimes)
+        public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] ParameterData, int order, DateTime addTime)
         {
             using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
             {
-                string sql = string.Empty;
-                if (executeTimes == 1)
+                string sql =
+           $"INSERT INTO {activityTableName} (\"Id\",\"Name\",\"SagaId\",\"Status\",\"ParameterData\",\"OvertimeCompensateTimes\",\"CompensateTimes\",\"AddTime\")" +
+           $"VALUES(@Id,@Name,@SagaId,@Status,@ParameterData,@OvertimeCompensateTimes,@CompensateTimes,@AddTime);";
+                _ = await connection.ExecuteAsync(sql, new
                 {
-                    sql =
-               $"INSERT INTO {activityTableName} (\"Id\",\"Name\",\"SagaId\",\"Status\",\"ParameterData\",\"ExecuteTimes\",\"CompensateTimes\",\"AddTime\")" +
-               $"VALUES(@Id,@Name,@SagaId,@Status,@ParameterData,@ExecuteTimes,@CompensateTimes,@AddTime);";
-                    _ = await connection.ExecuteAsync(sql, new
-                    {
-                        Id = activityId,
-                        Name = activityName,
-                        SagaId = sagaId,
-                        Status = nameof(ActivityStatus.Executing),
-                        ExecutingOvertimeRetries = 0,
-                        ParameterData = ParameterData,
-                        ExecuteTimes = executeTimes,
-                        CompensateTimes = 0,
-                        AddTime = addTime
-                    });
-                }
-                else
-                {
-                    sql = $"UPDATE {activityTableName} SET \"ExecuteTimes\"=@ExecuteTimes  WHERE \"Id\" = @Id";
-                    await connection.ExecuteAsync(sql, new
-                    {
-                        Id = activityId,
-                        ExecuteTimes = executeTimes
-                    });
-                }
+                    Id = activityId,
+                    Name = activityName,
+                    SagaId = sagaId,
+                    Status = nameof(ActivityStatus.Executing),
+                    ExecutingOvertimeRetries = 0,
+                    ParameterData = ParameterData,
+                    OvertimeCompensateTimes = 0,
+                    CompensateTimes = 0,
+                    AddTime = addTime
+                });
             }
         }
 
@@ -210,32 +165,18 @@ $"INSERT INTO {sagaTableName} (\"Id\",\"ServiceName\",\"Status\",\"AddTime\")" +
             }
         }
 
-        public async Task ActivityCompensating(string activityId, int compensateTimes)
-        {
-            using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
-            {
-                var updateActivitySql =
-$"UPDATE {activityTableName} SET \"Status\"=@Status ,\"CompensateTimes\"=@CompensateTimes WHERE \"Id\" = @Id";
-                await connection.ExecuteAsync(updateActivitySql, new
-                {
-                    Id = activityId,
-                    Status = nameof(ActivityStatus.Compensating),
-                    CompensateTimes = compensateTimes,
-                });
-            }
-        }
-
         public async IAsyncEnumerable<SagasGroupEntity> GetSagas(DateTime dateTime, int limit)
         {
             using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
             {
                 var updateActivitySql =
-$"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"ExecuteTimes\",activities.\"CompensateTimes\",activities.\"Name\" from \"Activities\" as activities  inner join(select \"Id\",\"ServiceName\" from \"Sagas\" where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\" and activities.\"Status\" != @Status ";
+$"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"ExecuteTimes\",activities.\"CompensateTimes\",activities.\"Name\" from \"Activities\" as activities  inner join(select \"Id\",\"ServiceName\" from \"Sagas\" where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\" and activities.\"Status\" != @Status1 and activities.\"Status\" != @Status2";
                 var activities = await connection.QueryAsync<ActivityAndSagaEntity>(updateActivitySql, new
                 {
                     AddTime = dateTime,
                     Limit = limit,
-                    Status = nameof(ActivityStatus.Compensated)
+                    Status1 = nameof(ActivityStatus.Compensated),
+                    Status2 = nameof(ActivityStatus.Revoked)
                 });
                 var groupedByServiceNameActivities = activities.GroupBy(m => m.ServiceName);
                 foreach (var groupedByServiceName in groupedByServiceNameActivities)
@@ -285,5 +226,32 @@ $"delete {tableName}   WHERE \"ExpiresAt\" < @ExpiredAt AND \"Id\" IN (SELECT \"
                 });
             }
         }
+
+        public async Task ActivityOvertimeCompensated(string activityId, bool Compensated)
+        {
+            using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
+            {
+                string updateActivitySql = string.Empty;
+                if (!Compensated)
+                {
+                    updateActivitySql =
+$"UPDATE {activityTableName} SET \"OvertimeCompensateTimes\"=\"OvertimeCompensateTimes\"+1 WHERE \"Id\" = @Id";
+                    await connection.ExecuteAsync(updateActivitySql, new
+                    {
+                        Id = activityId
+                    });
+                }
+                else
+                {
+                    updateActivitySql =
+$"UPDATE {activityTableName} SET \"OvertimeCompensateTimes\"=\"OvertimeCompensateTimes\"+1 ,\"Status\"=@Status WHERE \"Id\" = @Id";
+                    await connection.ExecuteAsync(updateActivitySql, new
+                    {
+                        Id = activityId,
+                        Status = nameof(ActivityStatus.Compensated)
+                    });
+                }
+            }
+        }
     }
 }
diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs
index 5baddf7..d9e6f55 100644
--- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs
+++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs
@@ -31,10 +31,6 @@ namespace Pole.Sagas.Storage.PostgreSql
         {
             return $"\"{options.SchemaName}\".\"{options.SagaTableName}\"";
         }
-        public string GetOvertimeCompensationGuaranteeTableName()
-        {
-            return $"\"{options.SchemaName}\".\"{options.OvertimeCompensationGuaranteeTableName}\"";
-        }
 
         public async Task InitializeAsync(CancellationToken cancellationToken)
         {
@@ -69,7 +65,7 @@ CREATE TABLE IF NOT EXISTS {GetActivityTableName()}(
   ""SagaId"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
   ""Order"" int4 NOT NULL,
   ""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL,
-  ""ExecuteTimes"" int4 NOT NULL,
+  ""OvertimeCompensateTimes"" int4 NOT NULL,
   ""ParameterData"" bytea NOT NULL,
   ""CompensateErrors"" varchar(1024) COLLATE ""pg_catalog"".""default"",
   ""CompensateTimes"" int4 NOT NULL,
@@ -84,19 +80,6 @@ ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""Activities_pkey"" PRIM
 
 
 ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""Activities_SagaId_fkey"" FOREIGN KEY (""SagaId"") REFERENCES {GetSagaTableName()} (""Id"") ON DELETE CASCADE ON UPDATE NO ACTION;
-
-CREATE TABLE IF NOT EXISTS {GetOvertimeCompensationGuaranteeTableName()}(
-  ""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
-  ""Name"" varchar(255) COLLATE ""pg_catalog"".""default"" NOT NULL,
-  ""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL,
-  ""CompensateTimes"" int4 NOT NULL,
-  ""ParameterData"" bytea NOT NULL,
-  ""CompensateErrors"" varchar(1024) COLLATE ""pg_catalog"".""default"",
-  ""AddTime"" timestamp NOT NULL,
-  ""ExpiresAt"" timestamp,
-);
-
-ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""OCG - Activities_pkey"" PRIMARY KEY (""Id"");
             ";
             return batchSql;
         }
diff --git a/src/Pole.Sagas/Core/Abstraction/ISagaStorage.cs b/src/Pole.Sagas/Core/Abstraction/ISagaStorage.cs
index 8a7aca0..771846e 100644
--- a/src/Pole.Sagas/Core/Abstraction/ISagaStorage.cs
+++ b/src/Pole.Sagas/Core/Abstraction/ISagaStorage.cs
@@ -12,15 +12,14 @@ namespace Pole.Sagas.Core.Abstraction
     {
         Task SagaStarted(string sagaId, string serviceName,DateTime addTime);
         Task SagaEnded(string sagaId, DateTime ExpiresAt);
-        Task ActivityExecuting(string activityId, string activityName,string sagaId, byte[] ParameterData, int order,DateTime addTime,int executeTimes);
+        Task ActivityExecuting(string activityId, string activityName,string sagaId, byte[] ParameterData, int order,DateTime addTime);
         Task ActivityExecuteAborted(string activityId);
         Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
-        Task ActivityExecuted(string activityId);
         Task ActivityCompensated(string activityId);
         Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime);
         Task ActivityRevoked(string activityId);
-        Task ActivityCompensating(string activityId, int compensateTimes);
         IAsyncEnumerable<SagasGroupEntity> GetSagas(DateTime dateTime, int limit);
         Task<int> DeleteExpiredData(string tableName,DateTime ExpiredAt, int batchCount);
+        Task ActivityOvertimeCompensated(string activityId, bool compensated);
     }
 }
diff --git a/src/Pole.Sagas/Core/Abstraction/ISagaStorageInitializer.cs b/src/Pole.Sagas/Core/Abstraction/ISagaStorageInitializer.cs
index 920d67e..1d47d3a 100644
--- a/src/Pole.Sagas/Core/Abstraction/ISagaStorageInitializer.cs
+++ b/src/Pole.Sagas/Core/Abstraction/ISagaStorageInitializer.cs
@@ -11,6 +11,5 @@ namespace Pole.Sagas.Core.Abstraction
         Task InitializeAsync(CancellationToken cancellationToken);
         string GetSagaTableName();
         string GetActivityTableName();
-        string GetOvertimeCompensationGuaranteeTableName();
     }
 }
diff --git a/src/Pole.Sagas/Core/OvertimeCompensationGuaranteeActivityStatus.cs b/src/Pole.Sagas/Core/OvertimeCompensationGuaranteeActivityStatus.cs
deleted file mode 100644
index 4d0eba5..0000000
--- a/src/Pole.Sagas/Core/OvertimeCompensationGuaranteeActivityStatus.cs
+++ /dev/null
@@ -1,13 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Text;
-
-namespace Pole.Sagas.Core
-{
-    public enum OvertimeCompensationGuaranteeActivityStatus
-    {
-        Executing,
-        Executed,
-        Error
-    }
-}
diff --git a/src/Pole.Sagas/Protos/saga.proto b/src/Pole.Sagas/Protos/saga.proto
index 2f42d05..2ebde92 100644
--- a/src/Pole.Sagas/Protos/saga.proto
+++ b/src/Pole.Sagas/Protos/saga.proto
@@ -10,12 +10,11 @@ service Saga {
   rpc ActivityExecuting (ActivityExecutingRequest) returns (CommonResponse);
   rpc ActivityExecuteAborted (ActivityExecuteAbortedRequest) returns (CommonResponse);
   rpc ActivityCompensateAborted (ActivityCompensateAbortedRequest) returns (CommonResponse);
-  rpc ActivityExecuted (ActivityExecutedRequest) returns (CommonResponse);
   rpc ActivityCompensated (ActivityCompensatedRequest) returns (CommonResponse);
   rpc ActivityExecuteOvertime (ActivityExecuteOvertimeRequest) returns (CommonResponse);
   rpc ActivityRevoked (ActivityRevokedRequest) returns (CommonResponse);
-  rpc ActivityCompensating (ActivityCompensatingRequest) returns (CommonResponse);
   rpc GetSagas (GetSagasRequest) returns (stream GetSagasResponse);
+  rpc ActivityOvertimeCompensated (ActivityOvertimeCompensatedRequest) returns (CommonResponse);
 }
 
 message CommonResponse{
@@ -39,7 +38,6 @@ message ActivityExecutingRequest {
   int32 order = 4;
   string addTime = 5;
   string activityName = 6;
-  int32 executeTimes = 7;
 }
 message ActivityExecuteAbortedRequest {
   string activityId = 1;
@@ -91,4 +89,8 @@ message GetSagasResponse{
 	}
   }
 }
+message ActivityOvertimeCompensatedRequest {
+  string activityId = 1;
+  bool compensated = 2;
+}
 
--
libgit2 0.25.0