diff --git a/src/Pole.Sagas.Client/Abstraction/IEventSender.cs b/src/Pole.Sagas.Client/Abstraction/IEventSender.cs
index 31befd7..b077b65 100644
--- a/src/Pole.Sagas.Client/Abstraction/IEventSender.cs
+++ b/src/Pole.Sagas.Client/Abstraction/IEventSender.cs
@@ -7,13 +7,19 @@ namespace Pole.Sagas.Client.Abstraction
{
Task SagaStarted(string sagaId, string serviceName, DateTime addTime);
Task SagaEnded(string sagaId, DateTime ExpiresAt);
- Task ActivityExecuting(string activityId,string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime,int executeTimes);
+ Task ActivityExecuting(string activityId,string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime);
Task ActivityExecuteAborted(string activityId);
+ ///
+ ///
+ ///
+ ///
+ /// sagaId 不为空 服务端会set saga.status=ended
+ ///
+ ///
Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
- Task ActivityExecuted(string activityId);
Task ActivityCompensated(string activityId);
+ Task ActivityOvertimeCompensated(string activityId,bool compensated);
Task ActivityExecuteOvertime(string activityId,string name,byte [] parameterData,DateTime addTime);
Task ActivityRevoked(string activityId);
- Task ActivityCompensating(string activityId,int compensateTimes);
}
}
diff --git a/src/Pole.Sagas.Client/ActivityWapper.cs b/src/Pole.Sagas.Client/ActivityWapper.cs
index aaf6d1b..21a1d63 100644
--- a/src/Pole.Sagas.Client/ActivityWapper.cs
+++ b/src/Pole.Sagas.Client/ActivityWapper.cs
@@ -17,7 +17,7 @@ namespace Pole.Sagas.Client
public Type ActivityDataType { get; set; }
public object DataObj { get; set; }
public int Order { get; set; }
- public int ExecuteTimes { get; set; }
+ public int OvertimeCompensateTimes { get; set; }
public int CompensateTimes { get; set; }
public ActivityStatus ActivityStatus { get; set; }
public IServiceProvider ServiceProvider { get; set; }
diff --git a/src/Pole.Sagas.Client/EventSender.cs b/src/Pole.Sagas.Client/EventSender.cs
index 95bd85b..035f134 100644
--- a/src/Pole.Sagas.Client/EventSender.cs
+++ b/src/Pole.Sagas.Client/EventSender.cs
@@ -44,18 +44,6 @@ namespace Pole.Sagas.Client
}
}
- public async Task ActivityExecuted(string activityId)
- {
- var result = await sagaClient.ActivityExecutedAsync(new Server.Grpc.ActivityExecutedRequest
- {
- ActivityId = activityId,
- });
- if (!result.IsSuccess)
- {
- throw new SagasServerException(result.Errors);
- }
- }
-
public async Task ActivityExecuteAborted(string activityId)
{
var result = await sagaClient.ActivityExecuteAbortedAsync(new Server.Grpc.ActivityExecuteAbortedRequest
@@ -68,7 +56,7 @@ namespace Pole.Sagas.Client
}
}
- public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime, int executeTimes)
+ public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime)
{
var result = await sagaClient.ActivityExecutingAsync(new Server.Grpc.ActivityExecutingRequest
{
@@ -136,11 +124,12 @@ namespace Pole.Sagas.Client
}
}
- public async Task ActivityCompensating(string activityId, int compensateTimes)
+ public async Task ActivityOvertimeCompensated(string activityId, bool compensated)
{
- var result = await sagaClient.ActivityCompensatingAsync(new Server.Grpc.ActivityCompensatingRequest
+ var result = await sagaClient.ActivityOvertimeCompensatedAsync(new Server.Grpc.ActivityOvertimeCompensatedRequest
{
ActivityId = activityId,
+ Compensated= compensated
});
if (!result.IsSuccess)
{
diff --git a/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs b/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs
index 24a795b..edb93c4 100644
--- a/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs
+++ b/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs
@@ -58,7 +58,7 @@ namespace Pole.Sagas.Client
sagas.ForEach(async sagaEntity =>
{
var saga = sagaRestorer.CreateSaga(sagaEntity);
- await saga.Compensate();
+ await saga.CompensateWhenRetry();
});
}
}
diff --git a/src/Pole.Sagas.Client/PoleSagasOption.cs b/src/Pole.Sagas.Client/PoleSagasOption.cs
index 77a25d5..6f3ff31 100644
--- a/src/Pole.Sagas.Client/PoleSagasOption.cs
+++ b/src/Pole.Sagas.Client/PoleSagasOption.cs
@@ -8,6 +8,8 @@ namespace Pole.Sagas.Client
{
public string ServiceName { get; set; }
public int PreSagasGrpcStreamingResponseLimitCount { get; set; } = 20;
+ public int MaxCompensateTimes { get; set; } = 10;
+ public int MaxOvertimeCompensateTimes { get; set; } = 10;
public int CompeletedSagaExpiredAfterSeconds { get; set; } = 60 * 10;
public int SagasTimeOutSeconds { get; set; } = 60;
public string SagasServerHost { get; set; }
diff --git a/src/Pole.Sagas.Client/Saga.cs b/src/Pole.Sagas.Client/Saga.cs
index 9110042..a79e9b4 100644
--- a/src/Pole.Sagas.Client/Saga.cs
+++ b/src/Pole.Sagas.Client/Saga.cs
@@ -118,15 +118,20 @@ namespace Pole.Sagas.Client
var result = await RecursiveExecuteActivity(executeActivity);
return result;
}
- internal async Task Compensate()
+ ///
+ /// if true should ended this sagas ,if false do nothing continue retry
+ ///
+ ///
+ internal async Task CompensateWhenRetry()
{
- this.currentCompensateOrder = CurrentMaxOrder+1;
+ this.currentCompensateOrder = CurrentMaxOrder + 1;
var compensateActivity = GetNextCompensateActivity();
if (compensateActivity == null)
{
- return ;
+ return true;
}
await RecursiveCompensateActivity(compensateActivity);
+ return true;
}
private ActivityWapper GetNextExecuteActivity()
@@ -151,11 +156,27 @@ namespace Pole.Sagas.Client
private async Task RecursiveCompensateActivity(ActivityWapper activityWapper)
{
var activityId = activityWapper.Id;
+ if (activityWapper.ActivityStatus == ActivityStatus.ExecutingOvertime)
+ {
+ activityWapper.OvertimeCompensateTimes++;
+ }
+ else
+ {
+ activityWapper.CompensateTimes++;
+ }
try
{
- await eventSender.ActivityCompensating(activityId, activityWapper.CompensateTimes);
await activityWapper.InvokeCompensate();
- await eventSender.ActivityCompensated(activityId);
+ if (activityWapper.ActivityStatus == ActivityStatus.ExecutingOvertime)
+ {
+ // 超时 补偿次数已到
+ var isCompensated = activityWapper.CompensateTimes == poleSagasOption.MaxOvertimeCompensateTimes;
+ await eventSender.ActivityOvertimeCompensated(activityId, isCompensated);
+ }
+ else
+ {
+ await eventSender.ActivityCompensated(activityId);
+ }
var compensateActivity = GetNextCompensateActivity();
if (compensateActivity == null)
{
@@ -165,19 +186,27 @@ namespace Pole.Sagas.Client
}
catch (Exception exception)
{
- await eventSender.ActivityCompensateAborted(activityId, Id, exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace);
+ // todo: 超时操作 如果出错可能 减少 补偿次数 这里 先不做处理
+ if (activityWapper.CompensateTimes == poleSagasOption.MaxCompensateTimes)
+ {
+ // 此时 结束 saga 并且设置状态 为 Error
+ await eventSender.ActivityCompensateAborted(activityId, Id, exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace);
+ }
+ else
+ {
+ await eventSender.ActivityCompensateAborted(activityId, string.Empty, exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace);
+ }
}
}
private async Task RecursiveExecuteActivity(ActivityWapper activityWapper)
{
var activityId = snowflakeIdGenerator.NextId();
activityWapper.Id = activityId;
- activityWapper.ExecuteTimes++;
activityWapper.CancellationTokenSource = new System.Threading.CancellationTokenSource(2 * 1000);
try
{
var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType);
- await eventSender.ActivityExecuting(activityId, activityWapper.Name, Id, bytesContent, activityWapper.Order, DateTime.UtcNow, activityWapper.ExecuteTimes);
+ await eventSender.ActivityExecuting(activityId, activityWapper.Name, Id, bytesContent, activityWapper.Order, DateTime.UtcNow);
var result = await activityWapper.InvokeExecute();
if (!result.IsSuccess)
{
@@ -185,7 +214,6 @@ namespace Pole.Sagas.Client
await CompensateActivity(result, currentExecuteOrder);
return result;
}
- await eventSender.ActivityExecuted(activityId);
var executeActivity = GetNextExecuteActivity();
if (executeActivity == null)
{
diff --git a/src/Pole.Sagas.Server/Processor/ExpiredSagasCollectorProcessor.cs b/src/Pole.Sagas.Server/Processor/ExpiredSagasCollectorProcessor.cs
index 384b82b..4ee084c 100644
--- a/src/Pole.Sagas.Server/Processor/ExpiredSagasCollectorProcessor.cs
+++ b/src/Pole.Sagas.Server/Processor/ExpiredSagasCollectorProcessor.cs
@@ -44,7 +44,7 @@ namespace Pole.Sagas.Server.Processor
private async Task ProcessInternal()
{
- var tables = new[] { sagaStorageInitializer.GetSagaTableName(), sagaStorageInitializer.GetOvertimeCompensationGuaranteeTableName() };
+ var tables = new[] { sagaStorageInitializer.GetSagaTableName() };
foreach (var table in tables)
{
diff --git a/src/Pole.Sagas.Server/Services/SagaService.cs b/src/Pole.Sagas.Server/Services/SagaService.cs
index dfd3a2a..9979d3f 100644
--- a/src/Pole.Sagas.Server/Services/SagaService.cs
+++ b/src/Pole.Sagas.Server/Services/SagaService.cs
@@ -51,20 +51,6 @@ namespace Pole.Sagas.Server.Services
}
return commonResponse;
}
- public override async Task ActivityExecuted(ActivityExecutedRequest request, ServerCallContext context)
- {
- CommonResponse commonResponse = new CommonResponse();
- try
- {
- await sagaStorage.ActivityExecuted(request.ActivityId);
- commonResponse.IsSuccess = true;
- }
- catch (Exception ex)
- {
- commonResponse.Errors = CombineError(ex);
- }
- return commonResponse;
- }
public override async Task ActivityExecuteAborted(ActivityExecuteAbortedRequest request, ServerCallContext context)
{
CommonResponse commonResponse = new CommonResponse();
@@ -98,7 +84,7 @@ namespace Pole.Sagas.Server.Services
CommonResponse commonResponse = new CommonResponse();
try
{
- await sagaStorage.ActivityExecuting(request.ActivityId, request.ActivityName, request.SagaId, request.ParameterData.ToByteArray(), request.Order, Convert.ToDateTime(request.AddTime), request.ExecuteTimes);
+ await sagaStorage.ActivityExecuting(request.ActivityId, request.ActivityName, request.SagaId, request.ParameterData.ToByteArray(), request.Order, Convert.ToDateTime(request.AddTime));
commonResponse.IsSuccess = true;
}
catch (Exception ex)
@@ -149,20 +135,6 @@ namespace Pole.Sagas.Server.Services
}
return commonResponse;
}
- public override async Task ActivityCompensating(ActivityCompensatingRequest request, ServerCallContext context)
- {
- CommonResponse commonResponse = new CommonResponse();
- try
- {
- await sagaStorage.ActivityCompensating(request.ActivityId, request.CompensateTimes);
- commonResponse.IsSuccess = true;
- }
- catch (Exception ex)
- {
- commonResponse.Errors = CombineError(ex);
- }
- return commonResponse;
- }
public override async Task GetSagas(GetSagasRequest request, IServerStreamWriter responseStream, ServerCallContext context)
{
while (!context.CancellationToken.IsCancellationRequested)
@@ -202,7 +174,22 @@ namespace Pole.Sagas.Server.Services
await responseStream.WriteAsync(getSagasResponse);
}
}
-
+
+ public override async Task ActivityOvertimeCompensated(ActivityOvertimeCompensatedRequest request, ServerCallContext context)
+ {
+ CommonResponse commonResponse = new CommonResponse();
+ try
+ {
+ await sagaStorage.ActivityOvertimeCompensated(request.ActivityId, request.Compensated);
+ commonResponse.IsSuccess = true;
+ }
+ catch (Exception ex)
+ {
+ commonResponse.Errors = CombineError(ex);
+ }
+ return commonResponse;
+ }
+
private string CombineError(Exception exception)
{
return exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace;
diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs
index 4db0c32..39afdb5 100644
--- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs
+++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs
@@ -16,7 +16,6 @@ namespace Pole.Sagas.Storage.PostgreSql
{
private readonly string sagaTableName;
private readonly string activityTableName;
- private readonly string overtimeCompensationGuaranteeTableName;
private readonly PoleSagasStoragePostgreSqlOption poleSagasStoragePostgreSqlOption;
private readonly ISagaStorageInitializer sagaStorageInitializer;
public PostgreSqlSagaStorage(IOptions poleSagasStoragePostgreSqlOption, ISagaStorageInitializer sagaStorageInitializer)
@@ -25,7 +24,6 @@ namespace Pole.Sagas.Storage.PostgreSql
this.sagaStorageInitializer = sagaStorageInitializer;
sagaTableName = sagaStorageInitializer.GetSagaTableName();
activityTableName = sagaStorageInitializer.GetActivityTableName();
- overtimeCompensationGuaranteeTableName = sagaStorageInitializer.GetOvertimeCompensationGuaranteeTableName();
}
public async Task ActivityCompensateAborted(string activityId, string sagaId, string errors)
{
@@ -71,7 +69,7 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
}
}
- public async Task ActivityExecuted(string activityId)
+ public async Task ActivityExecuteAborted(string activityId)
{
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
@@ -80,12 +78,12 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new
{
Id = activityId,
- Status = nameof(ActivityStatus.Executed)
+ Status = nameof(ActivityStatus.ExecuteAborted)
});
}
}
- public async Task ActivityExecuteAborted(string activityId)
+ public async Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime)
{
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
@@ -94,73 +92,30 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new
{
Id = activityId,
- Status = nameof(ActivityStatus.ExecuteAborted)
+ Status = nameof(ActivityStatus.ExecutingOvertime)
});
}
}
- public async Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime)
- {
- using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
- {
- using (var tansaction = await connection.BeginTransactionAsync())
- {
- var updateActivitySql =
-$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
- await connection.ExecuteAsync(updateActivitySql, new
- {
- Id = activityId,
- Status = nameof(ActivityStatus.ExecuteAborted)
- }, tansaction);
-
- var addOCGActivity =
-$"INSERT INTO {overtimeCompensationGuaranteeTableName} (\"Id\",\"Name\",\"Status\",\"ParameterData\",\"CompensateTimes\",\"AddTime\")" +
- $"VALUES(@Id,@Name,@SagaId,@Status,@ParameterData,,@CompensateTimes,@AddTime);";
- await connection.ExecuteAsync(updateActivitySql, new
- {
- Id = activityId,
- Name = name,
- ParameterData = parameterData,
- CompensateTimes = 0,
- AddTime = addTime,
- Status = nameof(OvertimeCompensationGuaranteeActivityStatus.Executing)
- }, tansaction);
- }
- }
- }
-
- public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] ParameterData, int order, DateTime addTime, int executeTimes)
+ public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] ParameterData, int order, DateTime addTime)
{
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
- string sql = string.Empty;
- if (executeTimes == 1)
+ string sql =
+ $"INSERT INTO {activityTableName} (\"Id\",\"Name\",\"SagaId\",\"Status\",\"ParameterData\",\"OvertimeCompensateTimes\",\"CompensateTimes\",\"AddTime\")" +
+ $"VALUES(@Id,@Name,@SagaId,@Status,@ParameterData,@OvertimeCompensateTimes,@CompensateTimes,@AddTime);";
+ _ = await connection.ExecuteAsync(sql, new
{
- sql =
- $"INSERT INTO {activityTableName} (\"Id\",\"Name\",\"SagaId\",\"Status\",\"ParameterData\",\"ExecuteTimes\",\"CompensateTimes\",\"AddTime\")" +
- $"VALUES(@Id,@Name,@SagaId,@Status,@ParameterData,@ExecuteTimes,@CompensateTimes,@AddTime);";
- _ = await connection.ExecuteAsync(sql, new
- {
- Id = activityId,
- Name = activityName,
- SagaId = sagaId,
- Status = nameof(ActivityStatus.Executing),
- ExecutingOvertimeRetries = 0,
- ParameterData = ParameterData,
- ExecuteTimes = executeTimes,
- CompensateTimes = 0,
- AddTime = addTime
- });
- }
- else
- {
- sql = $"UPDATE {activityTableName} SET \"ExecuteTimes\"=@ExecuteTimes WHERE \"Id\" = @Id";
- await connection.ExecuteAsync(sql, new
- {
- Id = activityId,
- ExecuteTimes = executeTimes
- });
- }
+ Id = activityId,
+ Name = activityName,
+ SagaId = sagaId,
+ Status = nameof(ActivityStatus.Executing),
+ ExecutingOvertimeRetries = 0,
+ ParameterData = ParameterData,
+ OvertimeCompensateTimes = 0,
+ CompensateTimes = 0,
+ AddTime = addTime
+ });
}
}
@@ -210,32 +165,18 @@ $"INSERT INTO {sagaTableName} (\"Id\",\"ServiceName\",\"Status\",\"AddTime\")" +
}
}
- public async Task ActivityCompensating(string activityId, int compensateTimes)
- {
- using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
- {
- var updateActivitySql =
-$"UPDATE {activityTableName} SET \"Status\"=@Status ,\"CompensateTimes\"=@CompensateTimes WHERE \"Id\" = @Id";
- await connection.ExecuteAsync(updateActivitySql, new
- {
- Id = activityId,
- Status = nameof(ActivityStatus.Compensating),
- CompensateTimes = compensateTimes,
- });
- }
- }
-
public async IAsyncEnumerable GetSagas(DateTime dateTime, int limit)
{
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
var updateActivitySql =
-$"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"ExecuteTimes\",activities.\"CompensateTimes\",activities.\"Name\" from \"Activities\" as activities inner join(select \"Id\",\"ServiceName\" from \"Sagas\" where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\" and activities.\"Status\" != @Status ";
+$"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"ExecuteTimes\",activities.\"CompensateTimes\",activities.\"Name\" from \"Activities\" as activities inner join(select \"Id\",\"ServiceName\" from \"Sagas\" where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\" and activities.\"Status\" != @Status1 and activities.\"Status\" != @Status2";
var activities = await connection.QueryAsync(updateActivitySql, new
{
AddTime = dateTime,
Limit = limit,
- Status = nameof(ActivityStatus.Compensated)
+ Status1 = nameof(ActivityStatus.Compensated),
+ Status2 = nameof(ActivityStatus.Revoked)
});
var groupedByServiceNameActivities = activities.GroupBy(m => m.ServiceName);
foreach (var groupedByServiceName in groupedByServiceNameActivities)
@@ -285,5 +226,32 @@ $"delete {tableName} WHERE \"ExpiresAt\" < @ExpiredAt AND \"Id\" IN (SELECT \"
});
}
}
+
+ public async Task ActivityOvertimeCompensated(string activityId, bool Compensated)
+ {
+ using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
+ {
+ string updateActivitySql = string.Empty;
+ if (!Compensated)
+ {
+ updateActivitySql =
+$"UPDATE {activityTableName} SET \"OvertimeCompensateTimes\"=\"OvertimeCompensateTimes\"+1 WHERE \"Id\" = @Id";
+ await connection.ExecuteAsync(updateActivitySql, new
+ {
+ Id = activityId
+ });
+ }
+ else
+ {
+ updateActivitySql =
+$"UPDATE {activityTableName} SET \"OvertimeCompensateTimes\"=\"OvertimeCompensateTimes\"+1 ,\"Status\"=@Status WHERE \"Id\" = @Id";
+ await connection.ExecuteAsync(updateActivitySql, new
+ {
+ Id = activityId,
+ Status = nameof(ActivityStatus.Compensated)
+ });
+ }
+ }
+ }
}
}
diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs
index 5baddf7..d9e6f55 100644
--- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs
+++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs
@@ -31,10 +31,6 @@ namespace Pole.Sagas.Storage.PostgreSql
{
return $"\"{options.SchemaName}\".\"{options.SagaTableName}\"";
}
- public string GetOvertimeCompensationGuaranteeTableName()
- {
- return $"\"{options.SchemaName}\".\"{options.OvertimeCompensationGuaranteeTableName}\"";
- }
public async Task InitializeAsync(CancellationToken cancellationToken)
{
@@ -69,7 +65,7 @@ CREATE TABLE IF NOT EXISTS {GetActivityTableName()}(
""SagaId"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
""Order"" int4 NOT NULL,
""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL,
- ""ExecuteTimes"" int4 NOT NULL,
+ ""OvertimeCompensateTimes"" int4 NOT NULL,
""ParameterData"" bytea NOT NULL,
""CompensateErrors"" varchar(1024) COLLATE ""pg_catalog"".""default"",
""CompensateTimes"" int4 NOT NULL,
@@ -84,19 +80,6 @@ ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""Activities_pkey"" PRIM
ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""Activities_SagaId_fkey"" FOREIGN KEY (""SagaId"") REFERENCES {GetSagaTableName()} (""Id"") ON DELETE CASCADE ON UPDATE NO ACTION;
-
-CREATE TABLE IF NOT EXISTS {GetOvertimeCompensationGuaranteeTableName()}(
- ""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
- ""Name"" varchar(255) COLLATE ""pg_catalog"".""default"" NOT NULL,
- ""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL,
- ""CompensateTimes"" int4 NOT NULL,
- ""ParameterData"" bytea NOT NULL,
- ""CompensateErrors"" varchar(1024) COLLATE ""pg_catalog"".""default"",
- ""AddTime"" timestamp NOT NULL,
- ""ExpiresAt"" timestamp,
-);
-
-ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""OCG - Activities_pkey"" PRIMARY KEY (""Id"");
";
return batchSql;
}
diff --git a/src/Pole.Sagas/Core/Abstraction/ISagaStorage.cs b/src/Pole.Sagas/Core/Abstraction/ISagaStorage.cs
index 8a7aca0..771846e 100644
--- a/src/Pole.Sagas/Core/Abstraction/ISagaStorage.cs
+++ b/src/Pole.Sagas/Core/Abstraction/ISagaStorage.cs
@@ -12,15 +12,14 @@ namespace Pole.Sagas.Core.Abstraction
{
Task SagaStarted(string sagaId, string serviceName,DateTime addTime);
Task SagaEnded(string sagaId, DateTime ExpiresAt);
- Task ActivityExecuting(string activityId, string activityName,string sagaId, byte[] ParameterData, int order,DateTime addTime,int executeTimes);
+ Task ActivityExecuting(string activityId, string activityName,string sagaId, byte[] ParameterData, int order,DateTime addTime);
Task ActivityExecuteAborted(string activityId);
Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
- Task ActivityExecuted(string activityId);
Task ActivityCompensated(string activityId);
Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime);
Task ActivityRevoked(string activityId);
- Task ActivityCompensating(string activityId, int compensateTimes);
IAsyncEnumerable GetSagas(DateTime dateTime, int limit);
Task DeleteExpiredData(string tableName,DateTime ExpiredAt, int batchCount);
+ Task ActivityOvertimeCompensated(string activityId, bool compensated);
}
}
diff --git a/src/Pole.Sagas/Core/Abstraction/ISagaStorageInitializer.cs b/src/Pole.Sagas/Core/Abstraction/ISagaStorageInitializer.cs
index 920d67e..1d47d3a 100644
--- a/src/Pole.Sagas/Core/Abstraction/ISagaStorageInitializer.cs
+++ b/src/Pole.Sagas/Core/Abstraction/ISagaStorageInitializer.cs
@@ -11,6 +11,5 @@ namespace Pole.Sagas.Core.Abstraction
Task InitializeAsync(CancellationToken cancellationToken);
string GetSagaTableName();
string GetActivityTableName();
- string GetOvertimeCompensationGuaranteeTableName();
}
}
diff --git a/src/Pole.Sagas/Core/OvertimeCompensationGuaranteeActivityStatus.cs b/src/Pole.Sagas/Core/OvertimeCompensationGuaranteeActivityStatus.cs
deleted file mode 100644
index 4d0eba5..0000000
--- a/src/Pole.Sagas/Core/OvertimeCompensationGuaranteeActivityStatus.cs
+++ /dev/null
@@ -1,13 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Text;
-
-namespace Pole.Sagas.Core
-{
- public enum OvertimeCompensationGuaranteeActivityStatus
- {
- Executing,
- Executed,
- Error
- }
-}
diff --git a/src/Pole.Sagas/Protos/saga.proto b/src/Pole.Sagas/Protos/saga.proto
index 2f42d05..2ebde92 100644
--- a/src/Pole.Sagas/Protos/saga.proto
+++ b/src/Pole.Sagas/Protos/saga.proto
@@ -10,12 +10,11 @@ service Saga {
rpc ActivityExecuting (ActivityExecutingRequest) returns (CommonResponse);
rpc ActivityExecuteAborted (ActivityExecuteAbortedRequest) returns (CommonResponse);
rpc ActivityCompensateAborted (ActivityCompensateAbortedRequest) returns (CommonResponse);
- rpc ActivityExecuted (ActivityExecutedRequest) returns (CommonResponse);
rpc ActivityCompensated (ActivityCompensatedRequest) returns (CommonResponse);
rpc ActivityExecuteOvertime (ActivityExecuteOvertimeRequest) returns (CommonResponse);
rpc ActivityRevoked (ActivityRevokedRequest) returns (CommonResponse);
- rpc ActivityCompensating (ActivityCompensatingRequest) returns (CommonResponse);
rpc GetSagas (GetSagasRequest) returns (stream GetSagasResponse);
+ rpc ActivityOvertimeCompensated (ActivityOvertimeCompensatedRequest) returns (CommonResponse);
}
message CommonResponse{
@@ -39,7 +38,6 @@ message ActivityExecutingRequest {
int32 order = 4;
string addTime = 5;
string activityName = 6;
- int32 executeTimes = 7;
}
message ActivityExecuteAbortedRequest {
string activityId = 1;
@@ -91,4 +89,8 @@ message GetSagasResponse{
}
}
}
+message ActivityOvertimeCompensatedRequest {
+ string activityId = 1;
+ bool compensated = 2;
+}