Commit dc62bcb8 by dingsongjie

完成所有逻辑

parent 7ef7a1a7
using Grpc.Core; using Grpc.Core;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using Pole.Core.Serialization; using Pole.Core.Serialization;
using Pole.Core.Utils.Abstraction; using Pole.Core.Utils.Abstraction;
...@@ -21,11 +22,15 @@ namespace Pole.Sagas.Client ...@@ -21,11 +22,15 @@ namespace Pole.Sagas.Client
private readonly PoleSagasOption options; private readonly PoleSagasOption options;
private readonly SagaClient sagaClient; private readonly SagaClient sagaClient;
private readonly SagaRestorer sagaRestorer; private readonly SagaRestorer sagaRestorer;
public NotEndedSagasCompensateRetryBackgroundService(IOptions<PoleSagasOption> options, SagaClient sagaClient, IServiceProvider serviceProvider) private readonly IEventSender eventSender;
private readonly ILogger logger;
public NotEndedSagasCompensateRetryBackgroundService(IOptions<PoleSagasOption> options, SagaClient sagaClient, IServiceProvider serviceProvider, IEventSender eventSender, ILogger logger)
{ {
this.options = options.Value; this.options = options.Value;
this.sagaClient = sagaClient; this.sagaClient = sagaClient;
sagaRestorer = new SagaRestorer(serviceProvider.GetRequiredService<ISnowflakeIdGenerator>(), serviceProvider, serviceProvider.GetRequiredService<IEventSender>(), this.options, serviceProvider.GetRequiredService<ISerializer>(), serviceProvider.GetRequiredService<IActivityFinder>()); sagaRestorer = new SagaRestorer(serviceProvider.GetRequiredService<ISnowflakeIdGenerator>(), serviceProvider, serviceProvider.GetRequiredService<IEventSender>(), this.options, serviceProvider.GetRequiredService<ISerializer>(), serviceProvider.GetRequiredService<IActivityFinder>());
this.eventSender = eventSender;
this.logger = logger;
} }
public async Task StartAsync(CancellationToken cancellationToken) public async Task StartAsync(CancellationToken cancellationToken)
...@@ -36,6 +41,8 @@ namespace Pole.Sagas.Client ...@@ -36,6 +41,8 @@ namespace Pole.Sagas.Client
{ {
if (stream.ResponseStream.Current.IsSuccess) if (stream.ResponseStream.Current.IsSuccess)
{ {
try
{
var sagas = stream.ResponseStream.Current.Sagas.Select(m => var sagas = stream.ResponseStream.Current.Sagas.Select(m =>
{ {
var result = new SagaEntity var result = new SagaEntity
...@@ -58,9 +65,20 @@ namespace Pole.Sagas.Client ...@@ -58,9 +65,20 @@ namespace Pole.Sagas.Client
sagas.ForEach(async sagaEntity => sagas.ForEach(async sagaEntity =>
{ {
var saga = sagaRestorer.CreateSaga(sagaEntity); var saga = sagaRestorer.CreateSaga(sagaEntity);
await saga.CompensateWhenRetry(); var compensateResult = await saga.CompensateWhenRetry();
if (compensateResult)
{
var expiresAt = DateTime.UtcNow.AddSeconds(options.CompeletedSagaExpiredAfterSeconds);
await eventSender.SagaEnded(sagaEntity.Id, expiresAt);
}
}); });
} }
catch(Exception ex)
{
logger.LogError(ex, "Errors in NotEndedSagasCompensateRetryBackgroundService CompensateRetry");
}
}
} }
//await foreach (var getSagasResponse in stream.ResponseStream.ReadAllAsync(cancellationToken)) //await foreach (var getSagasResponse in stream.ResponseStream.ReadAllAsync(cancellationToken))
//{ //{
......
...@@ -131,6 +131,10 @@ namespace Pole.Sagas.Client ...@@ -131,6 +131,10 @@ namespace Pole.Sagas.Client
return true; return true;
} }
await RecursiveCompensateActivity(compensateActivity); await RecursiveCompensateActivity(compensateActivity);
if (activities.Any(m => m.ActivityStatus != ActivityStatus.Compensated|| m.ActivityStatus != ActivityStatus.CompensateAborted))
{
return false;
}
return true; return true;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment