diff --git a/src/Pole.Core/Extensions/PoleServiceCollectionExtensions.cs b/src/Pole.Core/Extensions/PoleServiceCollectionExtensions.cs index 0ebe8ef..78c601c 100644 --- a/src/Pole.Core/Extensions/PoleServiceCollectionExtensions.cs +++ b/src/Pole.Core/Extensions/PoleServiceCollectionExtensions.cs @@ -17,7 +17,7 @@ namespace Microsoft.Extensions.DependencyInjection { public static class PoleServiceCollectionExtensions { - public static IServiceCollection AddPole(this IServiceCollection services,Action config) + public static IServiceCollection AddPole(this IServiceCollection services, Action config) { StartupConfig startupOption = new StartupConfig(services); if (startupOption.PoleOptionsConfig == null) @@ -30,7 +30,12 @@ namespace Microsoft.Extensions.DependencyInjection services.AddScoped(); services.AddSingleton(); services.AddSingleton(); - services.AddSingleton(); + services.AddSingleton(); + using (var serviceProvider = services.BuildServiceProvider()) + { + var generatorIdSolver = serviceProvider.GetService(); + services.AddSingleton(typeof(ISnowflakeIdGenerator), factory => new SnowflakeIdGenerator(new DateTime(2020, 1, 1), 16, generatorIdSolver.GetGeneratorId())); + } services.AddSingleton(); services.AddSingleton(); diff --git a/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs b/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs index c2f10a9..2fefae9 100644 --- a/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs +++ b/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs @@ -17,17 +17,18 @@ namespace Pole.Core.Processor { private readonly IEventStorage eventStorage; private readonly PoleOptions options; - private readonly IProducer producer; + private readonly IProducerContainer producerContainer; private readonly IEventTypeFinder eventTypeFinder; private readonly ISerializer serializer; private readonly ILogger logger; private readonly ProducerOptions producerOptions; - public PendingMessageRetryProcessor(IEventStorage eventStorage, IOptions options, ILogger logger, IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, IOptions producerOptions) + public PendingMessageRetryProcessor(IEventStorage eventStorage, IOptions options, ILogger logger, + IProducerContainer producerContainer, IEventTypeFinder eventTypeFinder, ISerializer serializer, IOptions producerOptions) { this.eventStorage = eventStorage; this.options = options.Value ?? throw new Exception($"{nameof(PoleOptions)} Must be injected"); this.logger = logger; - this.producer = producer; + this.producerContainer = producerContainer; this.eventTypeFinder = eventTypeFinder; this.serializer = serializer; this.producerOptions = producerOptions.Value ?? throw new Exception($"{nameof(ProducerOptions)} Must be injected"); @@ -70,6 +71,7 @@ namespace Pole.Core.Processor pendingMessage.ExpiresAt = DateTime.UtcNow; } pendingMessage.Retries++; + var producer = await producerContainer.GetProducer(eventType); await producer.Publish(bytes); pendingMessage.StatusName = nameof(EventStatus.Published); pendingMessage.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds); diff --git a/src/Pole.Core/UnitOfWork/UnitOfWork.cs b/src/Pole.Core/UnitOfWork/UnitOfWork.cs index 5b2d049..ca19828 100644 --- a/src/Pole.Core/UnitOfWork/UnitOfWork.cs +++ b/src/Pole.Core/UnitOfWork/UnitOfWork.cs @@ -18,15 +18,15 @@ namespace Pole.Core.UnitOfWork { class UnitOfWork : IUnitOfWork { - private readonly IProducer producer; + private readonly IProducerContainer producerContainer; private readonly IEventTypeFinder eventTypeFinder; private readonly ISerializer serializer; private readonly IEventStorage eventStorage; private readonly PoleOptions options; private IBus bus; - public UnitOfWork(IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, IEventStorage eventStorage, IOptions options) + public UnitOfWork(IProducerContainer producerContainer, IEventTypeFinder eventTypeFinder, ISerializer serializer, IEventStorage eventStorage, IOptions options) { - this.producer = producer; + this.producerContainer = producerContainer; this.eventTypeFinder = eventTypeFinder; this.serializer = serializer; this.eventStorage = eventStorage; @@ -45,6 +45,7 @@ namespace Pole.Core.UnitOfWork var eventContentBytes = serializer.SerializeToUtf8Bytes(@event, eventType); var bytesTransport = new EventBytesTransport(@event.Name, @event.Id, eventContentBytes); var bytes = bytesTransport.GetBytes(); + var producer = await producerContainer.GetProducer(eventType); await producer.Publish(bytes); @event.StatusName = nameof(EventStatus.Published); @event.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds); diff --git a/src/Pole.Core/Utils/InstanceIPV4_16IdGeneratorIdSolver.cs b/src/Pole.Core/Utils/InstanceIPV4_16IdGeneratorIdSolver.cs index abadccf..5a892de 100644 --- a/src/Pole.Core/Utils/InstanceIPV4_16IdGeneratorIdSolver.cs +++ b/src/Pole.Core/Utils/InstanceIPV4_16IdGeneratorIdSolver.cs @@ -24,8 +24,10 @@ namespace Pole.Core.Utils .Select(c => c.Address) .FirstOrDefault(); - var bytes = firstIpV4Address.GetAddressBytes(); - generatorId = BitConverter.ToInt32(bytes, 2); + var bytes = firstIpV4Address.GetAddressBytes().TakeLast(2).Reverse().ToList(); + bytes.Add(0); + bytes.Add(0); + generatorId = BitConverter.ToInt32(bytes.ToArray()); } public int GetGeneratorId() { diff --git a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorageInitializer.cs b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorageInitializer.cs index 9a4fbd4..0d6f797 100644 --- a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorageInitializer.cs +++ b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorageInitializer.cs @@ -15,7 +15,7 @@ namespace Pole.EventStorage.PostgreSql { private readonly PostgreSqlOptions options; private readonly ILogger logger; - public PostgreSqlEventStorageInitializer(IOptions options,Logger logger) + public PostgreSqlEventStorageInitializer(IOptions options,ILogger logger) { this.options = options.Value; this.logger = logger;