Commit 74cb469b by dingsongjie

完成 重试消息 获取 与 过时数据删除

parent 82d581dc
......@@ -12,7 +12,7 @@ using System.Threading.Tasks;
namespace Pole.Core.Processor
{
public class PendingMessageRetryProcessor : ProcessorBase
class PendingMessageRetryProcessor : ProcessorBase
{
private readonly IEventStorage eventStorage;
private readonly PoleOptions options;
......
using Pole.Sagas.Core;
using Pole.Sagas.Server.Grpc;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Sagas.Server
{
public interface ISagasBuffer
{
Task<IEnumerable<SagaEntity>> GetSagas(string serviceName, DateTime dateTime, int limit);
Task<bool> AddSagas(IAsyncEnumerable<SagasGroupEntity> sagasGroupEntities);
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Server
{
public class PoleSagasServerOption
{
public int NotEndedSagasFetchIntervalSeconds { get; set; } = 10;
public int ExpiredDataBulkDeleteIntervalSeconds { get; set; } = 10*60;
public int ExpiredDataDeleteBatchCount { get; set; } = 1000;
public int ExpiredDataPreBulkDeleteDelaySeconds { get; set; } = 3;
}
}
using Microsoft.Extensions.DependencyInjection;
using Pole.Core.Processor;
using Pole.Sagas.Server.Processor;
using System;
using System.Collections.Generic;
using System.Text;
......@@ -11,6 +13,9 @@ namespace Pole.Sagas.Server
{
services.AddGrpc();
services.AddSingleton<IProcessor, NotEndedSagasFetchProcessor>();
services.AddSingleton<IProcessor, ExpiredSagasCollectorProcessor>();
services.AddHostedService<BackgroundServiceBasedProcessorServer>();
return services;
}
}
......
using Microsoft.Extensions.Hosting;
using Pole.Core.Processor;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System.Linq;
namespace Pole.Sagas.Server.Processor
{
public class BackgroundServiceBasedProcessorServer : BackgroundService, IProcessorServer
{
private readonly IServiceProvider _serviceProvider;
private Task _compositeTask;
public BackgroundServiceBasedProcessorServer(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
public async Task Start(CancellationToken stoppingToken)
{
var eventStorageInitializer = _serviceProvider.GetService<ISagaStorageInitializer>();
await eventStorageInitializer.InitializeAsync(stoppingToken);
ProcessingContext processingContext = new ProcessingContext(stoppingToken);
List<LoopProcessor> loopProcessors = new List<LoopProcessor>();
var innerProcessors = _serviceProvider.GetServices<IProcessor>();
var loggerFactory = _serviceProvider.GetService<ILoggerFactory>();
foreach (var innerProcessor in innerProcessors)
{
LoopProcessor processor = new LoopProcessor(innerProcessor, loggerFactory);
loopProcessors.Add(processor);
}
var tasks = loopProcessors.Select(p => p.Process(processingContext));
_compositeTask = Task.WhenAll(tasks);
await _compositeTask;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
return Start(stoppingToken);
}
}
}
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Pole.Core.Processor;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Sagas.Server.Processor
{
class ExpiredSagasCollectorProcessor : ProcessorBase
{
private readonly ISagaStorage sagaStorage;
private readonly PoleSagasServerOption options;
private readonly ILogger logger;
private readonly ISagaStorageInitializer sagaStorageInitializer;
public ExpiredSagasCollectorProcessor(ISagaStorage sagaStorage, IOptions<PoleSagasServerOption> options, ILogger<ExpiredSagasCollectorProcessor> logger, ISagaStorageInitializer sagaStorageInitializer)
{
this.sagaStorage = sagaStorage;
this.options = options.Value ?? throw new Exception($"{nameof(PoleSagasServerOption)} Must be injected");
this.logger = logger;
this.sagaStorageInitializer = sagaStorageInitializer;
}
public override string Name => nameof(NotEndedSagasFetchProcessor);
public override async Task Process(ProcessingContext context)
{
try
{
await ProcessInternal();
}
catch (Exception ex)
{
logger.LogError(ex, $"{nameof(NotEndedSagasFetchProcessor)} Process Error");
}
finally
{
await Task.Delay(options.ExpiredDataBulkDeleteIntervalSeconds * 1000);
}
}
private async Task ProcessInternal()
{
var tables = new[] { sagaStorageInitializer.GetSagaTableName(), sagaStorageInitializer.GetOvertimeCompensationGuaranteeTableName() };
foreach (var table in tables)
{
logger.LogDebug($"Collecting expired data from table: {table}");
int deletedCount;
var time = DateTime.UtcNow;
do
{
deletedCount = await sagaStorage.DeleteExpiredData(table, time, options.ExpiredDataDeleteBatchCount);
if (deletedCount == options.ExpiredDataDeleteBatchCount)
{
await Task.Delay(options.ExpiredDataPreBulkDeleteDelaySeconds * 1000);
}
} while (deletedCount == options.ExpiredDataDeleteBatchCount);
}
}
}
}
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Pole.Core.Processor;
using Pole.Sagas.Core;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Sagas.Server.Processor
{
class NotEndedSagasFetchProcessor : ProcessorBase
{
private readonly ISagaStorage sagaStorage;
private readonly PoleSagasServerOption options;
private readonly ILogger logger;
private readonly ISagasBuffer sagasBuffer;
public NotEndedSagasFetchProcessor(ISagaStorage sagaStorage, IOptions<PoleSagasServerOption> options, ILogger<NotEndedSagasFetchProcessor> logger,
ISagasBuffer sagasBuffer)
{
this.sagaStorage = sagaStorage;
this.options = options.Value ?? throw new Exception($"{nameof(PoleSagasServerOption)} Must be injected");
this.logger = logger;
this.sagasBuffer = sagasBuffer;
}
public override string Name => nameof(NotEndedSagasFetchProcessor);
public override async Task Process(ProcessingContext context)
{
try
{
await ProcessInternal();
}
catch (Exception ex)
{
logger.LogError(ex, $"{nameof(NotEndedSagasFetchProcessor)} Process Error");
}
finally
{
await Task.Delay(options.NotEndedSagasFetchIntervalSeconds * 1000);
}
}
private async Task ProcessInternal()
{
var addTimeFilter = DateTime.UtcNow.AddMinutes(-4);
var sagas = sagaStorage.GetSagas(addTimeFilter, 500);
await sagasBuffer.AddSagas(sagas);
}
}
}
using Microsoft.Extensions.Logging;
using Pole.Sagas.Core;
using Pole.Sagas.Server.Grpc;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Sagas.Server
{
class SagasBuffer : ISagasBuffer
{
private readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1);
private readonly Dictionary<string, List<SagaEntity>> Sagas = new Dictionary<string, List<SagaEntity>>();
private readonly ILogger logger;
public SagasBuffer(ILogger logger)
{
this.logger = logger;
}
public async Task<bool> AddSagas(IAsyncEnumerable<SagasGroupEntity> sagasGroupEntities)
{
try
{
await semaphoreSlim.WaitAsync();
await foreach (var sagasGroupEntity in sagasGroupEntities)
{
if (!Sagas.ContainsKey(sagasGroupEntity.ServiceName))
{
Sagas.TryAdd(sagasGroupEntity.ServiceName, sagasGroupEntity.SagaEntities);
}
else
{
// 这里必然为true
Sagas.TryGetValue(sagasGroupEntity.ServiceName, out List<SagaEntity> sagaList);
sagaList.AddRange(sagasGroupEntity.SagaEntities);
}
}
return true;
}
catch (Exception ex)
{
throw ex;
}
finally
{
semaphoreSlim.Release();
}
}
public async Task<IEnumerable<SagaEntity>> GetSagas(string serviceName, DateTime dateTime, int limit)
{
try
{
await semaphoreSlim.WaitAsync();
if (Sagas.TryGetValue(serviceName, out List<SagaEntity> sagaList))
{
var result = sagaList.Take(limit);
sagaList.RemoveAll(m => result.Select(n => n.Id).Contains(m.Id));
Sagas[serviceName] = sagaList;
return result;
}
return Enumerable.Empty<SagaEntity>();
}
catch (Exception ex)
{
throw ex;
}
finally
{
semaphoreSlim.Release();
}
}
}
}
using Grpc.Core;
using Google.Protobuf;
using Grpc.Core;
using Pole.Sagas.Core;
using Pole.Sagas.Server.Grpc;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
......@@ -11,9 +13,11 @@ namespace Pole.Sagas.Server.Services
public class SagaService : Pole.Sagas.Server.Grpc.Saga.SagaBase
{
private readonly ISagaStorage sagaStorage;
public SagaService(ISagaStorage sagaStorage)
private readonly ISagasBuffer sagasBuffer;
public SagaService(ISagaStorage sagaStorage, ISagasBuffer sagasBuffer)
{
this.sagaStorage = sagaStorage;
this.sagasBuffer = sagasBuffer;
}
public override async Task<CommonResponse> ActivityCompensateAborted(ActivityCompensateAbortedRequest request, ServerCallContext context)
{
......@@ -76,7 +80,7 @@ namespace Pole.Sagas.Server.Services
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.ActivityExecuteOvertime(request.ActivityId);
await sagaStorage.ActivityExecuteOvertime(request.ActivityId, request.Name, request.ParameterData.ToByteArray(), Convert.ToDateTime(request.AddTime));
commonResponse.IsSuccess = true;
}
catch (Exception ex)
......@@ -118,7 +122,7 @@ namespace Pole.Sagas.Server.Services
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.SagaEnded(request.SagaId,Convert.ToDateTime(request.ExpiresAt));
await sagaStorage.SagaEnded(request.SagaId, Convert.ToDateTime(request.ExpiresAt));
commonResponse.IsSuccess = true;
}
catch (Exception ex)
......@@ -132,7 +136,7 @@ namespace Pole.Sagas.Server.Services
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.SagaStarted(request.SagaId,request.ServiceName,Convert.ToDateTime( request.AddTime));
await sagaStorage.SagaStarted(request.SagaId, request.ServiceName, Convert.ToDateTime(request.AddTime));
commonResponse.IsSuccess = true;
}
catch (Exception ex)
......@@ -155,6 +159,40 @@ namespace Pole.Sagas.Server.Services
}
return commonResponse;
}
public override async Task<GetSagasResponse> GetSagas(GetSagasRequest request, ServerCallContext context)
{
GetSagasResponse getSagasResponse = new GetSagasResponse();
try
{
var sagaEntities = await sagasBuffer.GetSagas(request.ServiceName, Convert.ToDateTime(request.AddTime), request.Limit);
var sagaDtoes = sagaEntities.Select(m =>
{
var result = new GetSagasResponse.Types.Saga
{
Id = m.Id,
};
result.Activities.Add(m.ActivityEntities.Select(n => new GetSagasResponse.Types.Saga.Types.Activity
{
CompensateTimes = n.CompensateTimes,
ExecuteTimes = n.ExecuteTimes,
Id = n.Id,
Name = n.Id,
Order = n.Order,
ParameterData = ByteString.CopyFrom(n.ParameterData),
SagaId = n.SagaId,
Status = n.Status
}));
return result;
});
getSagasResponse.Sagas.Add(sagaDtoes);
getSagasResponse.IsSuccess = true;
}
catch (Exception ex)
{
getSagasResponse.Errors = CombineError(ex);
}
return getSagasResponse;
}
private string CombineError(Exception exception)
{
return exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace;
......
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Storage.PostgreSql
{
public class ActivityAndSagaEntity
{
public string Id { get; set; }
public string SagaId { get; set; }
public string ServiceName { get; set; }
public int Order { get; set; }
public string Status { get; set; }
public byte[] ParameterData { get; set; }
public int ExecuteTimes { get; set; }
public int CompensateTimes { get; set; }
public int Name { get; set; }
}
}
......@@ -6,9 +6,10 @@ namespace Pole.Sagas.Storage.PostgreSql
{
public class PoleSagasStoragePostgreSqlOption
{
public string SagaTableName { get; set; }
public string SchemaName { get; set; }
public string ActivityTableName { get; set; }
public string SagaTableName { get; set; } = "Sagas";
public string SchemaName { get; set; } = "pole-sagas";
public string ActivityTableName { get; set; } = "Activities";
public string OvertimeCompensationGuaranteeTableName { get; set; } = "OCG-Activities";
public int SagasRecoveryIntervalSecond { get; set; }
public string ConnectionString { get; set; }
}
......
......@@ -3,8 +3,10 @@ using Microsoft.Extensions.Options;
using Npgsql;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using Pole.Sagas.Server.Grpc;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
......@@ -14,6 +16,7 @@ namespace Pole.Sagas.Storage.PostgreSql
{
private readonly string sagaTableName;
private readonly string activityTableName;
private readonly string overtimeCompensationGuaranteeTableName;
private readonly PoleSagasStoragePostgreSqlOption poleSagasStoragePostgreSqlOption;
private readonly ISagaStorageInitializer sagaStorageInitializer;
public PostgreSqlSagaStorage(IOptions<PoleSagasStoragePostgreSqlOption> poleSagasStoragePostgreSqlOption, ISagaStorageInitializer sagaStorageInitializer)
......@@ -22,6 +25,7 @@ namespace Pole.Sagas.Storage.PostgreSql
this.sagaStorageInitializer = sagaStorageInitializer;
sagaTableName = sagaStorageInitializer.GetSagaTableName();
activityTableName = sagaStorageInitializer.GetActivityTableName();
overtimeCompensationGuaranteeTableName = sagaStorageInitializer.GetOvertimeCompensationGuaranteeTableName();
}
public async Task ActivityCompensateAborted(string activityId, string sagaId, string errors)
{
......@@ -95,17 +99,33 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
}
}
public async Task ActivityExecuteOvertime(string activityId)
public async Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime)
{
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
var updateActivitySql =
$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new
using (var tansaction = await connection.BeginTransactionAsync())
{
Id = activityId,
Status = nameof(ActivityStatus.ExecutingOvertime)
});
var updateActivitySql =
$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new
{
Id = activityId,
Status = nameof(ActivityStatus.ExecuteAborted)
}, tansaction);
var addOCGActivity =
$"INSERT INTO {overtimeCompensationGuaranteeTableName} (\"Id\",\"Name\",\"Status\",\"ParameterData\",\"CompensateTimes\",\"AddTime\")" +
$"VALUES(@Id,@Name,@SagaId,@Status,@ParameterData,,@CompensateTimes,@AddTime);";
await connection.ExecuteAsync(updateActivitySql, new
{
Id = activityId,
Name = name,
ParameterData = parameterData,
CompensateTimes = 0,
AddTime = addTime,
Status = nameof(OvertimeCompensationGuaranteeActivityStatus.Executing)
}, tansaction);
}
}
}
......@@ -167,7 +187,7 @@ $"UPDATE {sagaTableName} SET \"Status\"=@Status ,\"ExpiresAt\"=@ExpiresAt WHERE
await connection.ExecuteAsync(updateActivitySql, new
{
Id = sagaId,
ExpiresAt= ExpiresAt,
ExpiresAt = ExpiresAt,
Status = nameof(ActivityStatus.Revoked)
});
}
......@@ -184,7 +204,7 @@ $"INSERT INTO {sagaTableName} (\"Id\",\"ServiceName\",\"Status\",\"AddTime\")" +
{
Id = sagaId,
AddTime = addTime,
ServiceName=serviceName,
ServiceName = serviceName,
Status = nameof(ActivityStatus.Revoked)
});
}
......@@ -200,7 +220,67 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status ,\"CompensateTimes\"=@Compen
{
Id = activityId,
Status = nameof(ActivityStatus.Compensating),
CompensateTimes= compensateTimes,
CompensateTimes = compensateTimes,
});
}
}
public async IAsyncEnumerable<SagasGroupEntity> GetSagas(DateTime dateTime, int limit)
{
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
var updateActivitySql =
$"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"ExecuteTimes\",activities.\"CompensateTimes\",activities.\"Name\" from \"Activities\" as activities inner join(select \"Id\",\"ServiceName\" from \"Sagas\" where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\"";
var activities = await connection.QueryAsync<ActivityAndSagaEntity>(updateActivitySql, new
{
AddTime = dateTime,
Limit = limit,
});
var groupedByServiceNameActivities = activities.GroupBy(m => m.ServiceName);
foreach (var groupedByServiceName in groupedByServiceNameActivities)
{
SagasGroupEntity sagasGroupEntity = new SagasGroupEntity
{
ServiceName = groupedByServiceName.Key,
};
var groupedBySagaIds = groupedByServiceName.GroupBy(m => m.SagaId);
foreach (var groupedBySagaId in groupedBySagaIds)
{
SagaEntity sagaEntity = new SagaEntity
{
Id = groupedBySagaId.Key
};
foreach (var activity in groupedBySagaId)
{
ActivityEntity activityEntity = new ActivityEntity
{
CompensateTimes = activity.CompensateTimes,
ExecuteTimes = activity.ExecuteTimes,
Id = activity.Id,
Order = activity.Order,
ParameterData = activity.ParameterData,
SagaId = activity.SagaId,
Status = activity.Status,
};
sagaEntity.ActivityEntities.Add(activityEntity);
}
sagasGroupEntity.SagaEntities.Add(sagaEntity);
}
yield return sagasGroupEntity;
}
}
}
public Task<int> DeleteExpiredData(string tableName, DateTime ExpiredAt, int batchCount)
{
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
var sql =
$"delete {tableName} WHERE \"ExpiresAt\" < @ExpiredAt AND \"Id\" IN (SELECT \"Id\" FROM {tableName} LIMIT @BatchCount);";
return connection.ExecuteAsync(sql, new
{
ExpiredAt = ExpiredAt,
BatchCount = batchCount,
});
}
}
......
......@@ -31,6 +31,10 @@ namespace Pole.Sagas.Storage.PostgreSql
{
return $"\"{options.SchemaName}\".\"{options.SagaTableName}\"";
}
public string GetOvertimeCompensationGuaranteeTableName()
{
return $"\"{options.SchemaName}\".\"{options.OvertimeCompensationGuaranteeTableName}\"";
}
public async Task InitializeAsync(CancellationToken cancellationToken)
{
......@@ -70,12 +74,29 @@ CREATE TABLE IF NOT EXISTS {GetActivityTableName()}(
""CompensateErrors"" varchar(1024) COLLATE ""pg_catalog"".""default"",
""CompensateTimes"" int4 NOT NULL,
""AddTime"" timestamp NOT NULL
)
;
);
CREATE INDEX ""Activities_SagaId"" ON ""{GetActivityTableName()}"" USING btree (
""SagaId"" COLLATE ""pg_catalog"".""default"" ""pg_catalog"".""text_ops"" ASC NULLS LAST
);
ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""Activities_pkey"" PRIMARY KEY (""Id"");
ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""Activities_SagaId_fkey"" FOREIGN KEY (""SagaId"") REFERENCES {GetSagaTableName()} (""Id"") ON DELETE CASCADE ON UPDATE NO ACTION;
CREATE TABLE IF NOT EXISTS {GetOvertimeCompensationGuaranteeTableName()}(
""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
""Name"" varchar(255) COLLATE ""pg_catalog"".""default"" NOT NULL,
""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL,
""CompensateTimes"" int4 NOT NULL,
""ParameterData"" bytea NOT NULL,
""CompensateErrors"" varchar(1024) COLLATE ""pg_catalog"".""default"",
""AddTime"" timestamp NOT NULL,
""ExpiresAt"" timestamp,
);
ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""OCG - Activities_pkey"" PRIMARY KEY (""Id"");
";
return batchSql;
}
......
......@@ -12,7 +12,7 @@ namespace Pole.Sagas.Core.Abstraction
Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
Task ActivityExecuted(string activityId);
Task ActivityCompensated(string activityId);
Task ActivityExecuteOvertime(string activityId);
Task ActivityExecuteOvertime(string activityId,string name,byte [] parameterData,DateTime addTime);
Task ActivityRevoked(string activityId);
Task ActivityCompensating(string activityId,int compensateTimes);
}
......
......@@ -11,5 +11,6 @@ namespace Pole.Sagas.Core.Abstraction
Task InitializeAsync(CancellationToken cancellationToken);
string GetSagaTableName();
string GetActivityTableName();
string GetOvertimeCompensationGuaranteeTableName();
}
}
......@@ -14,8 +14,8 @@ namespace Pole.Sagas.Core
public Byte[] ParameterData { get; set; }
public Byte[] ResultData { get; set; }
public string Errors { get; set; }
public int ExecuteRetries { get; set; }
public int CompensateRetries { get; set; }
public int ExecuteTimes { get; set; }
public int CompensateTimes { get; set; }
public DateTime AddTime { get; set; }
}
}
......@@ -111,7 +111,7 @@ namespace Pole.Sagas.Core
}
}
public async Task ActivityExecuteOvertime(string activityId)
public async Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime)
{
var result = await sagaClient.ActivityExecuteOvertimeAsync(new Server.Grpc.ActivityExecuteOvertimeRequest
{
......
using Pole.Sagas.Core;
using Google.Protobuf.Collections;
using Pole.Sagas.Core;
using Pole.Sagas.Server.Grpc;
using System;
using System.Collections.Generic;
using System.Text;
......@@ -15,8 +17,10 @@ namespace Pole.Sagas.Core
Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
Task ActivityExecuted(string activityId);
Task ActivityCompensated(string activityId);
Task ActivityExecuteOvertime(string activityId);
Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime);
Task ActivityRevoked(string activityId);
Task ActivityCompensating(string activityId, int compensateTimes);
IAsyncEnumerable<SagasGroupEntity> GetSagas(DateTime dateTime, int limit);
Task<int> DeleteExpiredData(string tableName,DateTime ExpiredAt, int batchCount);
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core
{
public enum OvertimeCompensationGuaranteeActivityStatus
{
Executing,
Executed,
Error
}
}
......@@ -8,6 +8,7 @@ namespace Pole.Sagas.Core
{
public string ServiceName { get; set; }
public int CompeletedSagaExpiredAfterSeconds { get; set; } = 60 * 10;
public int SagasTimeOutSeconds { get; set; } = 60;
public string SagasServerHost { get; set; }
}
}
......@@ -6,6 +6,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Sagas.Core
......@@ -84,7 +85,6 @@ namespace Pole.Sagas.Core
public async Task<SagaResult> GetResult()
{
await eventSender.SagaStarted(Id, poleSagasOption.ServiceName, DateTime.UtcNow);
var executeActivity = GetNextExecuteActivity();
if (executeActivity == null)
{
......@@ -173,7 +173,8 @@ namespace Pole.Sagas.Core
IsSuccess = false,
Errors = errors
};
await eventSender.ActivityExecuteOvertime(activityId);
var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType);
await eventSender.ActivityExecuteOvertime(activityId, activityWapper.Name, bytesContent,DateTime.UtcNow);
// 超时的时候 需要首先补偿这个超时的操作
return await CompensateActivity(result, currentExecuteOrder + 1);
}
......
......@@ -6,7 +6,7 @@ namespace Pole.Sagas.Core
{
public class SagaEntity
{
public int Id { get; set; }
public string Id { get; set; }
public string ServiceName { get; set; }
public List<ActivityEntity> ActivityEntities { get; set; }
public string Status { get; set; }
......
using Pole.Sagas.Core;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core
{
public class SagasGroupEntity
{
public string ServiceName { get; set; }
public List<SagaEntity> SagaEntities { get; set; }
}
}
......@@ -15,6 +15,7 @@ service Saga {
rpc ActivityExecuteOvertime (ActivityExecuteOvertimeRequest) returns (CommonResponse);
rpc ActivityRevoked (ActivityRevokedRequest) returns (CommonResponse);
rpc ActivityCompensating (ActivityCompensatingRequest) returns (CommonResponse);
rpc GetSagas (GetSagasRequest) returns (GetSagasResponse);
}
message CommonResponse{
......@@ -56,13 +57,39 @@ message ActivityCompensatedRequest {
}
message ActivityExecuteOvertimeRequest {
string activityId = 1;
string name = 2;
bytes parameterData = 3;
string addTime = 4;
}
message ActivityRevokedRequest {
string activityId = 1;
}
message ActivityCompensatingRequest {
string activityId = 1;
int32 CompensateTimes = 2;
int32 compensateTimes = 2;
}
message GetSagasRequest{
string serviceName = 1;
string addTime = 2;
int32 limit = 3;
}
message GetSagasResponse{
bool isSuccess = 1;
string errors = 2;
repeated Saga Sagas = 3;
message Saga{
string id = 1;
repeated Activity Activities = 2;
message Activity{
string id = 1;
string sagaId = 2;
int32 order = 3;
string status = 4;
bytes parameterData = 5;
int32 executeTimes = 6;
int32 compensateTimes = 7;
string name = 8;
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment