Bus.cs
2.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
using Pole.Core.Abstraction;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventStorage;
using Pole.Core.EventBus.Transaction;
using Pole.Core.Serialization;
using Pole.Core.Utils.Abstraction;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Core.EventBus
{
class Bus : IBus
{
private readonly IEventTypeFinder eventTypeFinder;
private readonly ISerializer serializer;
private readonly ISnowflakeIdGenerator snowflakeIdGenerator;
private readonly IEventStorage eventStorage;
public IDbTransactionAdapter Transaction { get; set; }
public IServiceProvider ServiceProvider { get; }
public BlockingCollection<EventEntity> PrePublishEventBuffer { get; } =
new BlockingCollection<EventEntity>(new ConcurrentQueue<EventEntity>());
public Bus(IServiceProvider serviceProvider, IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator, IEventStorage eventStorage)
{
ServiceProvider = serviceProvider;
this.eventTypeFinder = eventTypeFinder;
this.serializer = serializer;
this.snowflakeIdGenerator = snowflakeIdGenerator;
this.eventStorage = eventStorage;
}
public async Task<bool> Publish(object @event, CancellationToken cancellationToken = default)
{
var eventType = @event.GetType();
var eventTypeCode = eventTypeFinder.GetCode(eventType);
var eventId = snowflakeIdGenerator.NextId();
var eventContent = serializer.Serialize(@event, eventType);
var eventEntity = new EventEntity
{
Added = DateTime.UtcNow,
Content = eventContent,
ExpiresAt = null,
Id = eventId,
Name = eventTypeCode,
Retries = 0,
StatusName = nameof(EventStatus.Pending)
};
if (Transaction?.DbTransaction == null)
{
var mediumMessage = await eventStorage.StoreMessage(eventEntity);
}
else
{
var mediumMessage = await eventStorage.StoreMessage(eventEntity, Transaction.DbTransaction);
}
PrePublishEventBuffer.Add(eventEntity);
return true;
}
}
}