SagasBuffer.cs
2.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
using Microsoft.Extensions.Logging;
using Pole.Sagas.Core;
using Pole.Sagas.Server.Grpc;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Sagas.Server
{
/// <summary>
/// In-memory buffer of sagas grouped by service name.
/// All reads and writes of the underlying dictionary are serialized through a single semaphore.
/// </summary>
class SagasBuffer : ISagasBuffer
{
    // Binary semaphore guarding Sagas. A SemaphoreSlim is used because it can be awaited;
    // maxCount = 1 makes an unbalanced Release() fail loudly instead of silently widening the gate.
    private readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1, 1);

    // Buffered sagas keyed by service name. Only touched while semaphoreSlim is held.
    private readonly Dictionary<string, List<SagaEntity>> Sagas = new Dictionary<string, List<SagaEntity>>();

    private readonly ILogger logger;

    /// <summary>Creates an empty buffer.</summary>
    /// <param name="logger">Logger kept for use by this buffer.</param>
    public SagasBuffer(ILogger logger)
    {
        this.logger = logger;
    }

    /// <summary>
    /// Appends every saga of every incoming group to the list buffered for that group's service name,
    /// creating the list when the service name has not been seen before.
    /// </summary>
    /// <param name="sagasGroupEntities">Asynchronous stream of saga groups to buffer.</param>
    /// <returns><c>true</c> once the whole stream has been consumed.</returns>
    /// <remarks>
    /// The semaphore is held for the duration of the stream enumeration. If the stream throws,
    /// the exception propagates to the caller and groups consumed before the failure stay buffered.
    /// </remarks>
    public async Task<bool> AddSagas(IAsyncEnumerable<SagasGroupEntity> sagasGroupEntities)
    {
        // Acquire outside the try block: if WaitAsync itself fails, the finally block
        // must not release a semaphore that was never taken.
        await semaphoreSlim.WaitAsync();
        try
        {
            await foreach (var sagasGroupEntity in sagasGroupEntities)
            {
                var incoming = sagasGroupEntity.SagaEntities;
                if (incoming == null)
                {
                    // Nothing to buffer; never store a null list, it would fail later in GetSagas.
                    continue;
                }

                if (Sagas.TryGetValue(sagasGroupEntity.ServiceName, out List<SagaEntity> sagaList))
                {
                    sagaList.AddRange(incoming);
                }
                else
                {
                    // Copy so that later AddRange/RemoveAll calls never mutate the caller's list.
                    Sagas.Add(sagasGroupEntity.ServiceName, new List<SagaEntity>(incoming));
                }
            }

            return true;
        }
        finally
        {
            semaphoreSlim.Release();
        }
    }

    /// <summary>
    /// Takes up to <paramref name="limit"/> sagas from the front of the list buffered for
    /// <paramref name="serviceName"/> and removes them from the buffer.
    /// </summary>
    /// <param name="serviceName">Service whose buffered sagas are requested.</param>
    /// <param name="limit">Maximum number of sagas to return; zero or negative yields an empty result.</param>
    /// <returns>
    /// A materialized snapshot of the taken sagas (empty when the service is unknown or has nothing buffered).
    /// The snapshot is independent of the buffer, so it is safe to enumerate after this method returns.
    /// </returns>
    public async Task<IEnumerable<SagaEntity>> GetSagas(string serviceName, int limit)
    {
        // Acquire outside the try block so a failed wait is not followed by a Release().
        await semaphoreSlim.WaitAsync();
        try
        {
            if (limit <= 0
                || !Sagas.TryGetValue(serviceName, out List<SagaEntity> sagaList)
                || sagaList.Count == 0)
            {
                return Enumerable.Empty<SagaEntity>();
            }

            // Materialize the batch BEFORE mutating the list. A deferred Take() would be re-evaluated
            // after the removal and hand the caller the next sagas instead of the removed ones.
            List<SagaEntity> batch = sagaList.GetRange(0, Math.Min(limit, sagaList.Count));

            // Remove every buffered entry whose Id was handed out (including duplicates further down
            // the list), using a set so the removal is a single pass over the list.
            var takenIds = batch.Select(saga => saga.Id).ToHashSet();
            sagaList.RemoveAll(saga => takenIds.Contains(saga.Id));

            return batch;
        }
        finally
        {
            semaphoreSlim.Release();
        }
    }
}
}