Skip to content
  • P
    Projects
  • G
    Groups
  • S
    Snippets
  • Help

丁松杰 / Pole

  • This project
    • Loading...
  • Sign in
Go to a project
  • Project
  • Repository
  • Issues 0
  • Merge Requests 0
  • Pipelines
  • Wiki
  • Snippets
  • Members
  • Activity
  • Graph
  • Charts
  • Create a new issue
  • Jobs
  • Commits
  • Issue Boards
  • Files
  • Commits
  • Branches
  • Tags
  • Contributors
  • Graph
  • Compare
  • Charts
Switch branch/tag
  • Pole
  • src
  • Pole.Sagas.Server
  • SagasBuffer.cs
Find file
BlameHistoryPermalink
  • dingsongjie's avatar
    完善功能 · 92eeb084
    dingsongjie committed 5 years ago
    92eeb084
SagasBuffer.cs 2.4 KB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
using Microsoft.Extensions.Logging;
using Pole.Sagas.Core;
using Pole.Sagas.Server.Grpc;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Pole.Sagas.Server
{
    /// <summary>
    /// In-memory buffer of saga entities grouped by service name.
    /// Every read and write of the underlying dictionary is serialized through a
    /// single <see cref="SemaphoreSlim"/>.
    /// </summary>
    class SagasBuffer : ISagasBuffer
    {
        // maxCount is capped at 1 so an unbalanced Release() fails loudly instead of
        // silently letting two callers in at once.
        private readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1, 1);
        private readonly Dictionary<string, List<SagaEntity>> Sagas = new Dictionary<string, List<SagaEntity>>();
        private readonly ILogger logger;

        /// <summary>
        /// Creates an empty buffer.
        /// </summary>
        /// <param name="logger">Logger stored for this instance.</param>
        public SagasBuffer(ILogger logger)
        {
            this.logger = logger;
        }

        /// <summary>
        /// Appends every saga of every streamed group to the list buffered for the
        /// group's service name, creating that list on first use.
        /// </summary>
        /// <param name="sagasGroupEntities">Stream of saga groups to buffer.</param>
        /// <returns><c>true</c> once the whole stream has been consumed.</returns>
        /// <remarks>
        /// The semaphore is held for the whole enumeration, so <see cref="GetSagas"/>
        /// waits until the stream has been fully consumed. Exceptions raised while
        /// enumerating propagate to the caller with their original stack trace.
        /// </remarks>
        public async Task<bool> AddSagas(IAsyncEnumerable<SagasGroupEntity> sagasGroupEntities)
        {
            // Acquire outside the try block: if the wait itself fails, the semaphore
            // was never taken and must not be released.
            await semaphoreSlim.WaitAsync();
            try
            {
                await foreach (var sagasGroupEntity in sagasGroupEntities)
                {
                    if (!Sagas.TryGetValue(sagasGroupEntity.ServiceName, out List<SagaEntity> sagaList))
                    {
                        // Buffer into a list owned by this class rather than storing the
                        // caller's list, which later AddRange/RemoveAll calls would mutate.
                        sagaList = new List<SagaEntity>();
                        Sagas.Add(sagasGroupEntity.ServiceName, sagaList);
                    }

                    // A group without a saga list contributes nothing.
                    if (sagasGroupEntity.SagaEntities != null)
                    {
                        sagaList.AddRange(sagasGroupEntity.SagaEntities);
                    }
                }
                return true;
            }
            finally
            {
                semaphoreSlim.Release();
            }
        }

        /// <summary>
        /// Removes up to <paramref name="limit"/> sagas from the front of the list
        /// buffered for <paramref name="serviceName"/> and returns them.
        /// </summary>
        /// <param name="serviceName">Service whose buffered sagas are requested.</param>
        /// <param name="limit">Maximum number of sagas to return.</param>
        /// <returns>
        /// The removed sagas, or an empty sequence when nothing is buffered for
        /// <paramref name="serviceName"/>.
        /// </returns>
        /// <remarks>
        /// Any other buffered saga sharing an Id with a returned saga is removed as well.
        /// </remarks>
        public async Task<IEnumerable<SagaEntity>> GetSagas(string serviceName, int limit)
        {
            await semaphoreSlim.WaitAsync();
            try
            {
                if (!Sagas.TryGetValue(serviceName, out List<SagaEntity> sagaList))
                {
                    return Enumerable.Empty<SagaEntity>();
                }

                // Materialize the batch before mutating the list. A lazy Take() would be
                // evaluated by the caller after the removal (and outside the semaphore),
                // yielding the entries that are still buffered instead of the removed ones.
                var result = sagaList.Take(limit).ToList();

                // Build the id set once so the removal is a single pass over the list.
                var takenIds = result.Select(n => n.Id).ToHashSet();
                sagaList.RemoveAll(m => takenIds.Contains(m.Id));

                return result;
            }
            finally
            {
                semaphoreSlim.Release();
            }
        }
    }
}