Commit 7ef7a1a7 by dingsongjie

完成 90%

parent 92eeb084
...@@ -7,13 +7,19 @@ namespace Pole.Sagas.Client.Abstraction ...@@ -7,13 +7,19 @@ namespace Pole.Sagas.Client.Abstraction
{ {
Task SagaStarted(string sagaId, string serviceName, DateTime addTime); Task SagaStarted(string sagaId, string serviceName, DateTime addTime);
Task SagaEnded(string sagaId, DateTime ExpiresAt); Task SagaEnded(string sagaId, DateTime ExpiresAt);
Task ActivityExecuting(string activityId,string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime,int executeTimes); Task ActivityExecuting(string activityId,string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime);
Task ActivityExecuteAborted(string activityId); Task ActivityExecuteAborted(string activityId);
/// <summary>
///
/// </summary>
/// <param name="activityId"></param>
/// <param name="sagaId">sagaId 不为空 服务端会set saga.status=ended</param>
/// <param name="errors"></param>
/// <returns></returns>
Task ActivityCompensateAborted(string activityId, string sagaId, string errors); Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
Task ActivityExecuted(string activityId);
Task ActivityCompensated(string activityId); Task ActivityCompensated(string activityId);
Task ActivityOvertimeCompensated(string activityId,bool compensated);
Task ActivityExecuteOvertime(string activityId,string name,byte [] parameterData,DateTime addTime); Task ActivityExecuteOvertime(string activityId,string name,byte [] parameterData,DateTime addTime);
Task ActivityRevoked(string activityId); Task ActivityRevoked(string activityId);
Task ActivityCompensating(string activityId,int compensateTimes);
} }
} }
...@@ -17,7 +17,7 @@ namespace Pole.Sagas.Client ...@@ -17,7 +17,7 @@ namespace Pole.Sagas.Client
public Type ActivityDataType { get; set; } public Type ActivityDataType { get; set; }
public object DataObj { get; set; } public object DataObj { get; set; }
public int Order { get; set; } public int Order { get; set; }
public int ExecuteTimes { get; set; } public int OvertimeCompensateTimes { get; set; }
public int CompensateTimes { get; set; } public int CompensateTimes { get; set; }
public ActivityStatus ActivityStatus { get; set; } public ActivityStatus ActivityStatus { get; set; }
public IServiceProvider ServiceProvider { get; set; } public IServiceProvider ServiceProvider { get; set; }
......
...@@ -44,18 +44,6 @@ namespace Pole.Sagas.Client ...@@ -44,18 +44,6 @@ namespace Pole.Sagas.Client
} }
} }
public async Task ActivityExecuted(string activityId)
{
var result = await sagaClient.ActivityExecutedAsync(new Server.Grpc.ActivityExecutedRequest
{
ActivityId = activityId,
});
if (!result.IsSuccess)
{
throw new SagasServerException(result.Errors);
}
}
public async Task ActivityExecuteAborted(string activityId) public async Task ActivityExecuteAborted(string activityId)
{ {
var result = await sagaClient.ActivityExecuteAbortedAsync(new Server.Grpc.ActivityExecuteAbortedRequest var result = await sagaClient.ActivityExecuteAbortedAsync(new Server.Grpc.ActivityExecuteAbortedRequest
...@@ -68,7 +56,7 @@ namespace Pole.Sagas.Client ...@@ -68,7 +56,7 @@ namespace Pole.Sagas.Client
} }
} }
public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime, int executeTimes) public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime)
{ {
var result = await sagaClient.ActivityExecutingAsync(new Server.Grpc.ActivityExecutingRequest var result = await sagaClient.ActivityExecutingAsync(new Server.Grpc.ActivityExecutingRequest
{ {
...@@ -136,11 +124,12 @@ namespace Pole.Sagas.Client ...@@ -136,11 +124,12 @@ namespace Pole.Sagas.Client
} }
} }
public async Task ActivityCompensating(string activityId, int compensateTimes) public async Task ActivityOvertimeCompensated(string activityId, bool compensated)
{ {
var result = await sagaClient.ActivityCompensatingAsync(new Server.Grpc.ActivityCompensatingRequest var result = await sagaClient.ActivityOvertimeCompensatedAsync(new Server.Grpc.ActivityOvertimeCompensatedRequest
{ {
ActivityId = activityId, ActivityId = activityId,
Compensated= compensated
}); });
if (!result.IsSuccess) if (!result.IsSuccess)
{ {
......
...@@ -58,7 +58,7 @@ namespace Pole.Sagas.Client ...@@ -58,7 +58,7 @@ namespace Pole.Sagas.Client
sagas.ForEach(async sagaEntity => sagas.ForEach(async sagaEntity =>
{ {
var saga = sagaRestorer.CreateSaga(sagaEntity); var saga = sagaRestorer.CreateSaga(sagaEntity);
await saga.Compensate(); await saga.CompensateWhenRetry();
}); });
} }
} }
......
...@@ -8,6 +8,8 @@ namespace Pole.Sagas.Client ...@@ -8,6 +8,8 @@ namespace Pole.Sagas.Client
{ {
public string ServiceName { get; set; } public string ServiceName { get; set; }
public int PreSagasGrpcStreamingResponseLimitCount { get; set; } = 20; public int PreSagasGrpcStreamingResponseLimitCount { get; set; } = 20;
public int MaxCompensateTimes { get; set; } = 10;
public int MaxOvertimeCompensateTimes { get; set; } = 10;
public int CompeletedSagaExpiredAfterSeconds { get; set; } = 60 * 10; public int CompeletedSagaExpiredAfterSeconds { get; set; } = 60 * 10;
public int SagasTimeOutSeconds { get; set; } = 60; public int SagasTimeOutSeconds { get; set; } = 60;
public string SagasServerHost { get; set; } public string SagasServerHost { get; set; }
......
...@@ -118,15 +118,20 @@ namespace Pole.Sagas.Client ...@@ -118,15 +118,20 @@ namespace Pole.Sagas.Client
var result = await RecursiveExecuteActivity(executeActivity); var result = await RecursiveExecuteActivity(executeActivity);
return result; return result;
} }
internal async Task Compensate() /// <summary>
/// if true should ended this sagas ,if false do nothing continue retry
/// </summary>
/// <returns></returns>
internal async Task<bool> CompensateWhenRetry()
{ {
this.currentCompensateOrder = CurrentMaxOrder+1; this.currentCompensateOrder = CurrentMaxOrder + 1;
var compensateActivity = GetNextCompensateActivity(); var compensateActivity = GetNextCompensateActivity();
if (compensateActivity == null) if (compensateActivity == null)
{ {
return ; return true;
} }
await RecursiveCompensateActivity(compensateActivity); await RecursiveCompensateActivity(compensateActivity);
return true;
} }
private ActivityWapper GetNextExecuteActivity() private ActivityWapper GetNextExecuteActivity()
...@@ -151,11 +156,27 @@ namespace Pole.Sagas.Client ...@@ -151,11 +156,27 @@ namespace Pole.Sagas.Client
private async Task RecursiveCompensateActivity(ActivityWapper activityWapper) private async Task RecursiveCompensateActivity(ActivityWapper activityWapper)
{ {
var activityId = activityWapper.Id; var activityId = activityWapper.Id;
if (activityWapper.ActivityStatus == ActivityStatus.ExecutingOvertime)
{
activityWapper.OvertimeCompensateTimes++;
}
else
{
activityWapper.CompensateTimes++;
}
try try
{ {
await eventSender.ActivityCompensating(activityId, activityWapper.CompensateTimes);
await activityWapper.InvokeCompensate(); await activityWapper.InvokeCompensate();
if (activityWapper.ActivityStatus == ActivityStatus.ExecutingOvertime)
{
// 超时 补偿次数已到
var isCompensated = activityWapper.CompensateTimes == poleSagasOption.MaxOvertimeCompensateTimes;
await eventSender.ActivityOvertimeCompensated(activityId, isCompensated);
}
else
{
await eventSender.ActivityCompensated(activityId); await eventSender.ActivityCompensated(activityId);
}
var compensateActivity = GetNextCompensateActivity(); var compensateActivity = GetNextCompensateActivity();
if (compensateActivity == null) if (compensateActivity == null)
{ {
...@@ -165,19 +186,27 @@ namespace Pole.Sagas.Client ...@@ -165,19 +186,27 @@ namespace Pole.Sagas.Client
} }
catch (Exception exception) catch (Exception exception)
{ {
// todo: 超时操作 如果出错可能 减少 补偿次数 这里 先不做处理
if (activityWapper.CompensateTimes == poleSagasOption.MaxCompensateTimes)
{
// 此时 结束 saga 并且设置状态 为 Error
await eventSender.ActivityCompensateAborted(activityId, Id, exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace); await eventSender.ActivityCompensateAborted(activityId, Id, exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace);
} }
else
{
await eventSender.ActivityCompensateAborted(activityId, string.Empty, exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace);
}
}
} }
private async Task<ActivityExecuteResult> RecursiveExecuteActivity(ActivityWapper activityWapper) private async Task<ActivityExecuteResult> RecursiveExecuteActivity(ActivityWapper activityWapper)
{ {
var activityId = snowflakeIdGenerator.NextId(); var activityId = snowflakeIdGenerator.NextId();
activityWapper.Id = activityId; activityWapper.Id = activityId;
activityWapper.ExecuteTimes++;
activityWapper.CancellationTokenSource = new System.Threading.CancellationTokenSource(2 * 1000); activityWapper.CancellationTokenSource = new System.Threading.CancellationTokenSource(2 * 1000);
try try
{ {
var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType); var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType);
await eventSender.ActivityExecuting(activityId, activityWapper.Name, Id, bytesContent, activityWapper.Order, DateTime.UtcNow, activityWapper.ExecuteTimes); await eventSender.ActivityExecuting(activityId, activityWapper.Name, Id, bytesContent, activityWapper.Order, DateTime.UtcNow);
var result = await activityWapper.InvokeExecute(); var result = await activityWapper.InvokeExecute();
if (!result.IsSuccess) if (!result.IsSuccess)
{ {
...@@ -185,7 +214,6 @@ namespace Pole.Sagas.Client ...@@ -185,7 +214,6 @@ namespace Pole.Sagas.Client
await CompensateActivity(result, currentExecuteOrder); await CompensateActivity(result, currentExecuteOrder);
return result; return result;
} }
await eventSender.ActivityExecuted(activityId);
var executeActivity = GetNextExecuteActivity(); var executeActivity = GetNextExecuteActivity();
if (executeActivity == null) if (executeActivity == null)
{ {
......
...@@ -44,7 +44,7 @@ namespace Pole.Sagas.Server.Processor ...@@ -44,7 +44,7 @@ namespace Pole.Sagas.Server.Processor
private async Task ProcessInternal() private async Task ProcessInternal()
{ {
var tables = new[] { sagaStorageInitializer.GetSagaTableName(), sagaStorageInitializer.GetOvertimeCompensationGuaranteeTableName() }; var tables = new[] { sagaStorageInitializer.GetSagaTableName() };
foreach (var table in tables) foreach (var table in tables)
{ {
......
...@@ -51,20 +51,6 @@ namespace Pole.Sagas.Server.Services ...@@ -51,20 +51,6 @@ namespace Pole.Sagas.Server.Services
} }
return commonResponse; return commonResponse;
} }
public override async Task<CommonResponse> ActivityExecuted(ActivityExecutedRequest request, ServerCallContext context)
{
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.ActivityExecuted(request.ActivityId);
commonResponse.IsSuccess = true;
}
catch (Exception ex)
{
commonResponse.Errors = CombineError(ex);
}
return commonResponse;
}
public override async Task<CommonResponse> ActivityExecuteAborted(ActivityExecuteAbortedRequest request, ServerCallContext context) public override async Task<CommonResponse> ActivityExecuteAborted(ActivityExecuteAbortedRequest request, ServerCallContext context)
{ {
CommonResponse commonResponse = new CommonResponse(); CommonResponse commonResponse = new CommonResponse();
...@@ -98,7 +84,7 @@ namespace Pole.Sagas.Server.Services ...@@ -98,7 +84,7 @@ namespace Pole.Sagas.Server.Services
CommonResponse commonResponse = new CommonResponse(); CommonResponse commonResponse = new CommonResponse();
try try
{ {
await sagaStorage.ActivityExecuting(request.ActivityId, request.ActivityName, request.SagaId, request.ParameterData.ToByteArray(), request.Order, Convert.ToDateTime(request.AddTime), request.ExecuteTimes); await sagaStorage.ActivityExecuting(request.ActivityId, request.ActivityName, request.SagaId, request.ParameterData.ToByteArray(), request.Order, Convert.ToDateTime(request.AddTime));
commonResponse.IsSuccess = true; commonResponse.IsSuccess = true;
} }
catch (Exception ex) catch (Exception ex)
...@@ -149,20 +135,6 @@ namespace Pole.Sagas.Server.Services ...@@ -149,20 +135,6 @@ namespace Pole.Sagas.Server.Services
} }
return commonResponse; return commonResponse;
} }
public override async Task<CommonResponse> ActivityCompensating(ActivityCompensatingRequest request, ServerCallContext context)
{
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.ActivityCompensating(request.ActivityId, request.CompensateTimes);
commonResponse.IsSuccess = true;
}
catch (Exception ex)
{
commonResponse.Errors = CombineError(ex);
}
return commonResponse;
}
public override async Task GetSagas(GetSagasRequest request, IServerStreamWriter<GetSagasResponse> responseStream, ServerCallContext context) public override async Task GetSagas(GetSagasRequest request, IServerStreamWriter<GetSagasResponse> responseStream, ServerCallContext context)
{ {
while (!context.CancellationToken.IsCancellationRequested) while (!context.CancellationToken.IsCancellationRequested)
...@@ -203,6 +175,21 @@ namespace Pole.Sagas.Server.Services ...@@ -203,6 +175,21 @@ namespace Pole.Sagas.Server.Services
} }
} }
public override async Task<CommonResponse> ActivityOvertimeCompensated(ActivityOvertimeCompensatedRequest request, ServerCallContext context)
{
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.ActivityOvertimeCompensated(request.ActivityId, request.Compensated);
commonResponse.IsSuccess = true;
}
catch (Exception ex)
{
commonResponse.Errors = CombineError(ex);
}
return commonResponse;
}
private string CombineError(Exception exception) private string CombineError(Exception exception)
{ {
return exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace; return exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace;
......
...@@ -16,7 +16,6 @@ namespace Pole.Sagas.Storage.PostgreSql ...@@ -16,7 +16,6 @@ namespace Pole.Sagas.Storage.PostgreSql
{ {
private readonly string sagaTableName; private readonly string sagaTableName;
private readonly string activityTableName; private readonly string activityTableName;
private readonly string overtimeCompensationGuaranteeTableName;
private readonly PoleSagasStoragePostgreSqlOption poleSagasStoragePostgreSqlOption; private readonly PoleSagasStoragePostgreSqlOption poleSagasStoragePostgreSqlOption;
private readonly ISagaStorageInitializer sagaStorageInitializer; private readonly ISagaStorageInitializer sagaStorageInitializer;
public PostgreSqlSagaStorage(IOptions<PoleSagasStoragePostgreSqlOption> poleSagasStoragePostgreSqlOption, ISagaStorageInitializer sagaStorageInitializer) public PostgreSqlSagaStorage(IOptions<PoleSagasStoragePostgreSqlOption> poleSagasStoragePostgreSqlOption, ISagaStorageInitializer sagaStorageInitializer)
...@@ -25,7 +24,6 @@ namespace Pole.Sagas.Storage.PostgreSql ...@@ -25,7 +24,6 @@ namespace Pole.Sagas.Storage.PostgreSql
this.sagaStorageInitializer = sagaStorageInitializer; this.sagaStorageInitializer = sagaStorageInitializer;
sagaTableName = sagaStorageInitializer.GetSagaTableName(); sagaTableName = sagaStorageInitializer.GetSagaTableName();
activityTableName = sagaStorageInitializer.GetActivityTableName(); activityTableName = sagaStorageInitializer.GetActivityTableName();
overtimeCompensationGuaranteeTableName = sagaStorageInitializer.GetOvertimeCompensationGuaranteeTableName();
} }
public async Task ActivityCompensateAborted(string activityId, string sagaId, string errors) public async Task ActivityCompensateAborted(string activityId, string sagaId, string errors)
{ {
...@@ -71,20 +69,6 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; ...@@ -71,20 +69,6 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
} }
} }
public async Task ActivityExecuted(string activityId)
{
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
var updateActivitySql =
$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new
{
Id = activityId,
Status = nameof(ActivityStatus.Executed)
});
}
}
public async Task ActivityExecuteAborted(string activityId) public async Task ActivityExecuteAborted(string activityId)
{ {
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
...@@ -103,42 +87,23 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; ...@@ -103,42 +87,23 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
{ {
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{ {
using (var tansaction = await connection.BeginTransactionAsync())
{
var updateActivitySql = var updateActivitySql =
$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new await connection.ExecuteAsync(updateActivitySql, new
{ {
Id = activityId, Id = activityId,
Status = nameof(ActivityStatus.ExecuteAborted) Status = nameof(ActivityStatus.ExecutingOvertime)
}, tansaction); });
var addOCGActivity =
$"INSERT INTO {overtimeCompensationGuaranteeTableName} (\"Id\",\"Name\",\"Status\",\"ParameterData\",\"CompensateTimes\",\"AddTime\")" +
$"VALUES(@Id,@Name,@SagaId,@Status,@ParameterData,,@CompensateTimes,@AddTime);";
await connection.ExecuteAsync(updateActivitySql, new
{
Id = activityId,
Name = name,
ParameterData = parameterData,
CompensateTimes = 0,
AddTime = addTime,
Status = nameof(OvertimeCompensationGuaranteeActivityStatus.Executing)
}, tansaction);
}
} }
} }
public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] ParameterData, int order, DateTime addTime, int executeTimes) public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] ParameterData, int order, DateTime addTime)
{ {
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{ {
string sql = string.Empty; string sql =
if (executeTimes == 1) $"INSERT INTO {activityTableName} (\"Id\",\"Name\",\"SagaId\",\"Status\",\"ParameterData\",\"OvertimeCompensateTimes\",\"CompensateTimes\",\"AddTime\")" +
{ $"VALUES(@Id,@Name,@SagaId,@Status,@ParameterData,@OvertimeCompensateTimes,@CompensateTimes,@AddTime);";
sql =
$"INSERT INTO {activityTableName} (\"Id\",\"Name\",\"SagaId\",\"Status\",\"ParameterData\",\"ExecuteTimes\",\"CompensateTimes\",\"AddTime\")" +
$"VALUES(@Id,@Name,@SagaId,@Status,@ParameterData,@ExecuteTimes,@CompensateTimes,@AddTime);";
_ = await connection.ExecuteAsync(sql, new _ = await connection.ExecuteAsync(sql, new
{ {
Id = activityId, Id = activityId,
...@@ -147,21 +112,11 @@ $"INSERT INTO {overtimeCompensationGuaranteeTableName} (\"Id\",\"Name\",\"Status ...@@ -147,21 +112,11 @@ $"INSERT INTO {overtimeCompensationGuaranteeTableName} (\"Id\",\"Name\",\"Status
Status = nameof(ActivityStatus.Executing), Status = nameof(ActivityStatus.Executing),
ExecutingOvertimeRetries = 0, ExecutingOvertimeRetries = 0,
ParameterData = ParameterData, ParameterData = ParameterData,
ExecuteTimes = executeTimes, OvertimeCompensateTimes = 0,
CompensateTimes = 0, CompensateTimes = 0,
AddTime = addTime AddTime = addTime
}); });
} }
else
{
sql = $"UPDATE {activityTableName} SET \"ExecuteTimes\"=@ExecuteTimes WHERE \"Id\" = @Id";
await connection.ExecuteAsync(sql, new
{
Id = activityId,
ExecuteTimes = executeTimes
});
}
}
} }
public async Task ActivityRevoked(string activityId) public async Task ActivityRevoked(string activityId)
...@@ -210,32 +165,18 @@ $"INSERT INTO {sagaTableName} (\"Id\",\"ServiceName\",\"Status\",\"AddTime\")" + ...@@ -210,32 +165,18 @@ $"INSERT INTO {sagaTableName} (\"Id\",\"ServiceName\",\"Status\",\"AddTime\")" +
} }
} }
public async Task ActivityCompensating(string activityId, int compensateTimes)
{
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
var updateActivitySql =
$"UPDATE {activityTableName} SET \"Status\"=@Status ,\"CompensateTimes\"=@CompensateTimes WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new
{
Id = activityId,
Status = nameof(ActivityStatus.Compensating),
CompensateTimes = compensateTimes,
});
}
}
public async IAsyncEnumerable<SagasGroupEntity> GetSagas(DateTime dateTime, int limit) public async IAsyncEnumerable<SagasGroupEntity> GetSagas(DateTime dateTime, int limit)
{ {
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{ {
var updateActivitySql = var updateActivitySql =
$"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"ExecuteTimes\",activities.\"CompensateTimes\",activities.\"Name\" from \"Activities\" as activities inner join(select \"Id\",\"ServiceName\" from \"Sagas\" where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\" and activities.\"Status\" != @Status "; $"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"ExecuteTimes\",activities.\"CompensateTimes\",activities.\"Name\" from \"Activities\" as activities inner join(select \"Id\",\"ServiceName\" from \"Sagas\" where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\" and activities.\"Status\" != @Status1 and activities.\"Status\" != @Status2";
var activities = await connection.QueryAsync<ActivityAndSagaEntity>(updateActivitySql, new var activities = await connection.QueryAsync<ActivityAndSagaEntity>(updateActivitySql, new
{ {
AddTime = dateTime, AddTime = dateTime,
Limit = limit, Limit = limit,
Status = nameof(ActivityStatus.Compensated) Status1 = nameof(ActivityStatus.Compensated),
Status2 = nameof(ActivityStatus.Revoked)
}); });
var groupedByServiceNameActivities = activities.GroupBy(m => m.ServiceName); var groupedByServiceNameActivities = activities.GroupBy(m => m.ServiceName);
foreach (var groupedByServiceName in groupedByServiceNameActivities) foreach (var groupedByServiceName in groupedByServiceNameActivities)
...@@ -285,5 +226,32 @@ $"delete {tableName} WHERE \"ExpiresAt\" < @ExpiredAt AND \"Id\" IN (SELECT \" ...@@ -285,5 +226,32 @@ $"delete {tableName} WHERE \"ExpiresAt\" < @ExpiredAt AND \"Id\" IN (SELECT \"
}); });
} }
} }
public async Task ActivityOvertimeCompensated(string activityId, bool Compensated)
{
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
string updateActivitySql = string.Empty;
if (!Compensated)
{
updateActivitySql =
$"UPDATE {activityTableName} SET \"OvertimeCompensateTimes\"=\"OvertimeCompensateTimes\"+1 WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new
{
Id = activityId
});
}
else
{
updateActivitySql =
$"UPDATE {activityTableName} SET \"OvertimeCompensateTimes\"=\"OvertimeCompensateTimes\"+1 ,\"Status\"=@Status WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new
{
Id = activityId,
Status = nameof(ActivityStatus.Compensated)
});
}
}
}
} }
} }
...@@ -31,10 +31,6 @@ namespace Pole.Sagas.Storage.PostgreSql ...@@ -31,10 +31,6 @@ namespace Pole.Sagas.Storage.PostgreSql
{ {
return $"\"{options.SchemaName}\".\"{options.SagaTableName}\""; return $"\"{options.SchemaName}\".\"{options.SagaTableName}\"";
} }
public string GetOvertimeCompensationGuaranteeTableName()
{
return $"\"{options.SchemaName}\".\"{options.OvertimeCompensationGuaranteeTableName}\"";
}
public async Task InitializeAsync(CancellationToken cancellationToken) public async Task InitializeAsync(CancellationToken cancellationToken)
{ {
...@@ -69,7 +65,7 @@ CREATE TABLE IF NOT EXISTS {GetActivityTableName()}( ...@@ -69,7 +65,7 @@ CREATE TABLE IF NOT EXISTS {GetActivityTableName()}(
""SagaId"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL, ""SagaId"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
""Order"" int4 NOT NULL, ""Order"" int4 NOT NULL,
""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL, ""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL,
""ExecuteTimes"" int4 NOT NULL, ""OvertimeCompensateTimes"" int4 NOT NULL,
""ParameterData"" bytea NOT NULL, ""ParameterData"" bytea NOT NULL,
""CompensateErrors"" varchar(1024) COLLATE ""pg_catalog"".""default"", ""CompensateErrors"" varchar(1024) COLLATE ""pg_catalog"".""default"",
""CompensateTimes"" int4 NOT NULL, ""CompensateTimes"" int4 NOT NULL,
...@@ -84,19 +80,6 @@ ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""Activities_pkey"" PRIM ...@@ -84,19 +80,6 @@ ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""Activities_pkey"" PRIM
ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""Activities_SagaId_fkey"" FOREIGN KEY (""SagaId"") REFERENCES {GetSagaTableName()} (""Id"") ON DELETE CASCADE ON UPDATE NO ACTION; ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""Activities_SagaId_fkey"" FOREIGN KEY (""SagaId"") REFERENCES {GetSagaTableName()} (""Id"") ON DELETE CASCADE ON UPDATE NO ACTION;
CREATE TABLE IF NOT EXISTS {GetOvertimeCompensationGuaranteeTableName()}(
""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
""Name"" varchar(255) COLLATE ""pg_catalog"".""default"" NOT NULL,
""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL,
""CompensateTimes"" int4 NOT NULL,
""ParameterData"" bytea NOT NULL,
""CompensateErrors"" varchar(1024) COLLATE ""pg_catalog"".""default"",
""AddTime"" timestamp NOT NULL,
""ExpiresAt"" timestamp,
);
ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""OCG - Activities_pkey"" PRIMARY KEY (""Id"");
"; ";
return batchSql; return batchSql;
} }
......
...@@ -12,15 +12,14 @@ namespace Pole.Sagas.Core.Abstraction ...@@ -12,15 +12,14 @@ namespace Pole.Sagas.Core.Abstraction
{ {
Task SagaStarted(string sagaId, string serviceName,DateTime addTime); Task SagaStarted(string sagaId, string serviceName,DateTime addTime);
Task SagaEnded(string sagaId, DateTime ExpiresAt); Task SagaEnded(string sagaId, DateTime ExpiresAt);
Task ActivityExecuting(string activityId, string activityName,string sagaId, byte[] ParameterData, int order,DateTime addTime,int executeTimes); Task ActivityExecuting(string activityId, string activityName,string sagaId, byte[] ParameterData, int order,DateTime addTime);
Task ActivityExecuteAborted(string activityId); Task ActivityExecuteAborted(string activityId);
Task ActivityCompensateAborted(string activityId, string sagaId, string errors); Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
Task ActivityExecuted(string activityId);
Task ActivityCompensated(string activityId); Task ActivityCompensated(string activityId);
Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime); Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime);
Task ActivityRevoked(string activityId); Task ActivityRevoked(string activityId);
Task ActivityCompensating(string activityId, int compensateTimes);
IAsyncEnumerable<SagasGroupEntity> GetSagas(DateTime dateTime, int limit); IAsyncEnumerable<SagasGroupEntity> GetSagas(DateTime dateTime, int limit);
Task<int> DeleteExpiredData(string tableName,DateTime ExpiredAt, int batchCount); Task<int> DeleteExpiredData(string tableName,DateTime ExpiredAt, int batchCount);
Task ActivityOvertimeCompensated(string activityId, bool compensated);
} }
} }
...@@ -11,6 +11,5 @@ namespace Pole.Sagas.Core.Abstraction ...@@ -11,6 +11,5 @@ namespace Pole.Sagas.Core.Abstraction
Task InitializeAsync(CancellationToken cancellationToken); Task InitializeAsync(CancellationToken cancellationToken);
string GetSagaTableName(); string GetSagaTableName();
string GetActivityTableName(); string GetActivityTableName();
string GetOvertimeCompensationGuaranteeTableName();
} }
} }
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core
{
public enum OvertimeCompensationGuaranteeActivityStatus
{
Executing,
Executed,
Error
}
}
...@@ -10,12 +10,11 @@ service Saga { ...@@ -10,12 +10,11 @@ service Saga {
rpc ActivityExecuting (ActivityExecutingRequest) returns (CommonResponse); rpc ActivityExecuting (ActivityExecutingRequest) returns (CommonResponse);
rpc ActivityExecuteAborted (ActivityExecuteAbortedRequest) returns (CommonResponse); rpc ActivityExecuteAborted (ActivityExecuteAbortedRequest) returns (CommonResponse);
rpc ActivityCompensateAborted (ActivityCompensateAbortedRequest) returns (CommonResponse); rpc ActivityCompensateAborted (ActivityCompensateAbortedRequest) returns (CommonResponse);
rpc ActivityExecuted (ActivityExecutedRequest) returns (CommonResponse);
rpc ActivityCompensated (ActivityCompensatedRequest) returns (CommonResponse); rpc ActivityCompensated (ActivityCompensatedRequest) returns (CommonResponse);
rpc ActivityExecuteOvertime (ActivityExecuteOvertimeRequest) returns (CommonResponse); rpc ActivityExecuteOvertime (ActivityExecuteOvertimeRequest) returns (CommonResponse);
rpc ActivityRevoked (ActivityRevokedRequest) returns (CommonResponse); rpc ActivityRevoked (ActivityRevokedRequest) returns (CommonResponse);
rpc ActivityCompensating (ActivityCompensatingRequest) returns (CommonResponse);
rpc GetSagas (GetSagasRequest) returns (stream GetSagasResponse); rpc GetSagas (GetSagasRequest) returns (stream GetSagasResponse);
rpc ActivityOvertimeCompensated (ActivityOvertimeCompensatedRequest) returns (CommonResponse);
} }
message CommonResponse{ message CommonResponse{
...@@ -39,7 +38,6 @@ message ActivityExecutingRequest { ...@@ -39,7 +38,6 @@ message ActivityExecutingRequest {
int32 order = 4; int32 order = 4;
string addTime = 5; string addTime = 5;
string activityName = 6; string activityName = 6;
int32 executeTimes = 7;
} }
message ActivityExecuteAbortedRequest { message ActivityExecuteAbortedRequest {
string activityId = 1; string activityId = 1;
...@@ -91,4 +89,8 @@ message GetSagasResponse{ ...@@ -91,4 +89,8 @@ message GetSagasResponse{
} }
} }
} }
message ActivityOvertimeCompensatedRequest {
string activityId = 1;
bool compensated = 2;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment