Skip to content
  • P
    Projects
  • G
    Groups
  • S
    Snippets
  • Help

丁松杰 / Pole

  • This project
    • Loading...
  • Sign in
Go to a project
  • Project
  • Repository
  • Issues 0
  • Merge Requests 0
  • Pipelines
  • Wiki
  • Snippets
  • Members
  • Activity
  • Graph
  • Charts
  • Create a new issue
  • Jobs
  • Commits
  • Issue Boards
  • Files
  • Commits
  • Branches
  • Tags
  • Contributors
  • Graph
  • Compare
  • Charts
Switch branch/tag
  • Pole
  • src
  • Pole.Sagas.Client
  • SagasCompensateRetryBackgroundService.cs
Find file
BlameHistoryPermalink
  • dingsongjie's avatar
    优化宿主服务 · 2d910fc1
    dingsongjie committed 5 years ago
    2d910fc1
SagasCompensateRetryBackgroundService.cs 4.64 KB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
using Grpc.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Pole.Core.Serialization;
using Pole.Core.Utils.Abstraction;
using Pole.Sagas.Client.Abstraction;
using Pole.Sagas.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static Pole.Sagas.Server.Grpc.Saga;

namespace Pole.Sagas.Client
{
    /// <summary>
    /// Hosted service that repeatedly opens the <c>GetSagas</c> gRPC server stream and
    /// retries compensation for every saga the server returns.
    /// </summary>
    /// <remarks>
    /// The retry loop runs as a background task so that <see cref="StartAsync"/> returns
    /// promptly; <see cref="StopAsync"/> cancels the loop and waits for it to finish.
    /// </remarks>
    public class SagasCompensateRetryBackgroundService : IHostedService, IDisposable
    {
        private readonly PoleSagasOption options;
        private readonly SagaClient sagaClient;
        private readonly SagaRestorer sagaRestorer;
        private readonly IEventSender eventSender;
        private readonly ILogger logger;

        // Cancels the background retry loop; created in StartAsync.
        private CancellationTokenSource stoppingCts;
        // The running retry loop; null until StartAsync has been called.
        private Task executingTask;

        /// <summary>
        /// Creates the service and builds the <see cref="SagaRestorer"/> from services
        /// resolved out of <paramref name="serviceProvider"/>.
        /// </summary>
        /// <param name="options">Saga client options (service name, limits, retry interval).</param>
        /// <param name="sagaClient">gRPC client used to stream sagas from the server.</param>
        /// <param name="serviceProvider">Provider used to resolve the restorer's dependencies.</param>
        /// <param name="eventSender">Sender used to report that a saga has ended.</param>
        /// <param name="logger">Logger for connection and compensation errors.</param>
        public SagasCompensateRetryBackgroundService(IOptions<PoleSagasOption> options, SagaClient sagaClient, IServiceProvider serviceProvider, IEventSender eventSender, ILogger<SagasCompensateRetryBackgroundService> logger)
        {
            this.options = options.Value;
            this.sagaClient = sagaClient;
            sagaRestorer = new SagaRestorer(serviceProvider.GetRequiredService<ISnowflakeIdGenerator>(), serviceProvider, serviceProvider.GetRequiredService<IEventSender>(), this.options, serviceProvider.GetRequiredService<ISerializer>(), serviceProvider.GetRequiredService<IActivityFinder>());
            this.eventSender = eventSender;
            this.logger = logger;
        }

        /// <summary>
        /// Starts the retry loop in the background and returns immediately.
        /// </summary>
        /// <param name="cancellationToken">Signals that the start process has been aborted.</param>
        /// <returns>A completed task; the loop keeps running until <see cref="StopAsync"/> is called.</returns>
        public Task StartAsync(CancellationToken cancellationToken)
        {
            stoppingCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            var stoppingToken = stoppingCts.Token;

            // Task.Run keeps the synchronous part of the loop (creating the gRPC call)
            // off the thread that is starting the host.
            executingTask = Task.Run(() => RunRetryLoopAsync(stoppingToken));
            return Task.CompletedTask;
        }

        /// <summary>
        /// Requests cancellation of the retry loop and waits until it has finished or
        /// <paramref name="cancellationToken"/> is signalled, whichever comes first.
        /// </summary>
        /// <param name="cancellationToken">Signals that shutdown should no longer be graceful.</param>
        public async Task StopAsync(CancellationToken cancellationToken)
        {
            if (executingTask == null)
            {
                // StartAsync was never called; nothing to stop.
                return;
            }

            try
            {
                stoppingCts.Cancel();
            }
            finally
            {
                // Wait for the loop to drain, but give up when the host's stop token fires.
                await Task.WhenAny(executingTask, Task.Delay(Timeout.Infinite, cancellationToken));
            }
        }

        /// <summary>
        /// Cancels the retry loop if it is still running.
        /// </summary>
        public void Dispose()
        {
            stoppingCts?.Cancel();
        }

        /// <summary>
        /// Reads the saga stream over and over until <paramref name="stoppingToken"/> is
        /// cancelled. After every attempt, successful or not, it waits
        /// <c>GrpcConnectFailRetryIntervalSeconds</c> seconds before reconnecting.
        /// </summary>
        private async Task RunRetryLoopAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await GrpcGetSagasCore(stoppingToken);
                }
                catch (Exception) when (stoppingToken.IsCancellationRequested)
                {
                    // Failure caused by (or racing with) shutdown: leave quietly.
                    break;
                }
                catch (Exception ex)
                {
                    logger.LogError(ex, "Errors in grpc get sagas");
                }

                try
                {
                    await Task.Delay(options.GrpcConnectFailRetryIntervalSeconds * 1000, stoppingToken);
                }
                catch (OperationCanceledException)
                {
                    break;
                }
            }
        }

        /// <summary>
        /// Opens one <c>GetSagas</c> stream and, for each successful response, maps the
        /// returned sagas to <see cref="SagaEntity"/> instances and retries their compensation.
        /// </summary>
        /// <param name="cancellationToken">Token passed to the response stream reads.</param>
        private async Task GrpcGetSagasCore(CancellationToken cancellationToken)
        {
            using (var stream = sagaClient.GetSagas(new Pole.Sagas.Server.Grpc.GetSagasRequest { Limit = options.PreSagasGrpcStreamingResponseLimitCount, ServiceName = options.ServiceName }))
            {
                while (await stream.ResponseStream.MoveNext(cancellationToken))
                {
                    var response = stream.ResponseStream.Current;
                    if (!response.IsSuccess)
                    {
                        continue;
                    }

                    try
                    {
                        var sagas = response.Sagas.Select(m =>
                        {
                            var result = new SagaEntity
                            {
                                Id = m.Id,
                            };
                            result.ActivityEntities = m.Activities.Select(n => new ActivityEntity
                            {
                                CompensateTimes = n.CompensateTimes,
                                ExecuteTimes = n.ExecuteTimes,
                                Id = n.Id,
                                // NOTE(review): Name is filled from the activity Id; this looks like
                                // it may have been meant to be the activity's name - confirm against
                                // the gRPC message definition before changing.
                                Name = n.Id,
                                Order = n.Order,
                                ParameterData = n.ParameterData.ToByteArray(),
                                SagaId = n.SagaId,
                                Status = n.Status
                            }).ToList();
                            return result;
                        }).ToList();

                        // Compensate the whole batch concurrently, but await it so that no
                        // work is left unobserved before the next response is read.
                        await Task.WhenAll(sagas.Select(sagaEntity => CompensateSagaAsync(sagaEntity)));
                    }
                    catch (Exception ex)
                    {
                        logger.LogError(ex, "Errors in NotEndedSagasCompensateRetryBackgroundService CompensateRetry");
                    }
                }
            }
        }

        /// <summary>
        /// Restores a single saga and retries its compensation; when compensation succeeds
        /// the saga is reported as ended with an expiry of
        /// <c>CompeletedSagaExpiredAfterSeconds</c> seconds from now (UTC).
        /// Failures are logged and never propagate, so one saga cannot abort the batch.
        /// </summary>
        /// <param name="sagaEntity">The saga data received from the server.</param>
        private async Task CompensateSagaAsync(SagaEntity sagaEntity)
        {
            try
            {
                var saga = sagaRestorer.CreateSaga(sagaEntity);
                var compensateResult = await saga.CompensateWhenRetry();
                if (compensateResult)
                {
                    var expiresAt = DateTime.UtcNow.AddSeconds(options.CompeletedSagaExpiredAfterSeconds);
                    await eventSender.SagaEnded(sagaEntity.Id, expiresAt);
                }
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Errors in NotEndedSagasCompensateRetryBackgroundService CompensateRetry");
            }
        }
    }
}