diff --git a/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs b/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs index edb93c4..b1f2e9b 100644 --- a/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs +++ b/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs @@ -1,6 +1,7 @@ using Grpc.Core; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Pole.Core.Serialization; using Pole.Core.Utils.Abstraction; @@ -21,11 +22,15 @@ namespace Pole.Sagas.Client private readonly PoleSagasOption options; private readonly SagaClient sagaClient; private readonly SagaRestorer sagaRestorer; - public NotEndedSagasCompensateRetryBackgroundService(IOptions options, SagaClient sagaClient, IServiceProvider serviceProvider) + private readonly IEventSender eventSender; + private readonly ILogger logger; + public NotEndedSagasCompensateRetryBackgroundService(IOptions options, SagaClient sagaClient, IServiceProvider serviceProvider, IEventSender eventSender, ILogger logger) { this.options = options.Value; this.sagaClient = sagaClient; sagaRestorer = new SagaRestorer(serviceProvider.GetRequiredService(), serviceProvider, serviceProvider.GetRequiredService(), this.options, serviceProvider.GetRequiredService(), serviceProvider.GetRequiredService()); + this.eventSender = eventSender; + this.logger = logger; } public async Task StartAsync(CancellationToken cancellationToken) @@ -36,30 +41,43 @@ namespace Pole.Sagas.Client { if (stream.ResponseStream.Current.IsSuccess) { - var sagas = stream.ResponseStream.Current.Sagas.Select(m => + try { - var result = new SagaEntity + var sagas = stream.ResponseStream.Current.Sagas.Select(m => { - Id = m.Id, - }; - result.ActivityEntities = m.Activities.Select(n => new ActivityEntity - { - CompensateTimes = n.CompensateTimes, - ExecuteTimes = n.ExecuteTimes, - Id = n.Id, - Name = n.Id, - Order = n.Order, - ParameterData = n.ParameterData.ToByteArray(), - SagaId = n.SagaId, - Status = n.Status + var result = new SagaEntity + { + Id = m.Id, + }; + result.ActivityEntities = m.Activities.Select(n => new ActivityEntity + { + CompensateTimes = n.CompensateTimes, + ExecuteTimes = n.ExecuteTimes, + Id = n.Id, + Name = n.Id, + Order = n.Order, + ParameterData = n.ParameterData.ToByteArray(), + SagaId = n.SagaId, + Status = n.Status + }).ToList(); + return result; }).ToList(); - return result; - }).ToList(); - sagas.ForEach(async sagaEntity => - { - var saga = sagaRestorer.CreateSaga(sagaEntity); - await saga.CompensateWhenRetry(); - }); + sagas.ForEach(async sagaEntity => + { + var saga = sagaRestorer.CreateSaga(sagaEntity); + var compensateResult = await saga.CompensateWhenRetry(); + if (compensateResult) + { + var expiresAt = DateTime.UtcNow.AddSeconds(options.CompeletedSagaExpiredAfterSeconds); + await eventSender.SagaEnded(sagaEntity.Id, expiresAt); + } + }); + } + catch(Exception ex) + { + logger.LogError(ex, "Errors in NotEndedSagasCompensateRetryBackgroundService CompensateRetry"); + } + } } //await foreach (var getSagasResponse in stream.ResponseStream.ReadAllAsync(cancellationToken)) diff --git a/src/Pole.Sagas.Client/Saga.cs b/src/Pole.Sagas.Client/Saga.cs index a79e9b4..7bbebe7 100644 --- a/src/Pole.Sagas.Client/Saga.cs +++ b/src/Pole.Sagas.Client/Saga.cs @@ -131,6 +131,10 @@ namespace Pole.Sagas.Client return true; } await RecursiveCompensateActivity(compensateActivity); + if (activities.Any(m => m.ActivityStatus != ActivityStatus.Compensated|| m.ActivityStatus != ActivityStatus.CompensateAborted)) + { + return false; + } return true; }