Commit fb922525 by dingsongjie

fix bug

parent 3806f429
......@@ -17,7 +17,7 @@ namespace Microsoft.Extensions.DependencyInjection
{
public static class PoleServiceCollectionExtensions
{
public static IServiceCollection AddPole(this IServiceCollection services,Action<StartupConfig> config)
public static IServiceCollection AddPole(this IServiceCollection services, Action<StartupConfig> config)
{
StartupConfig startupOption = new StartupConfig(services);
if (startupOption.PoleOptionsConfig == null)
......@@ -30,7 +30,12 @@ namespace Microsoft.Extensions.DependencyInjection
services.AddScoped<IUnitOfWork, Pole.Core.UnitOfWork.UnitOfWork>();
services.AddSingleton<ISerializer, DefaultJsonSerializer>();
services.AddSingleton<IGeneratorIdSolver, InstanceIPV4_16IdGeneratorIdSolver>();
services.AddSingleton<ISnowflakeIdGenerator, SnowflakeIdGenerator>();
services.AddSingleton<IObserverUnitContainer, ObserverUnitContainer>();
using (var serviceProvider = services.BuildServiceProvider())
{
var generatorIdSolver = serviceProvider.GetService<IGeneratorIdSolver>();
services.AddSingleton(typeof(ISnowflakeIdGenerator), factory => new SnowflakeIdGenerator(new DateTime(2020, 1, 1), 16, generatorIdSolver.GetGeneratorId()));
}
services.AddSingleton<IProcessor, PendingMessageRetryProcessor>();
services.AddSingleton<IProcessor, ExpiredEventsCollectorProcessor>();
......
......@@ -17,17 +17,18 @@ namespace Pole.Core.Processor
{
private readonly IEventStorage eventStorage;
private readonly PoleOptions options;
private readonly IProducer producer;
private readonly IProducerContainer producerContainer;
private readonly IEventTypeFinder eventTypeFinder;
private readonly ISerializer serializer;
private readonly ILogger<PendingMessageRetryProcessor> logger;
private readonly ProducerOptions producerOptions;
public PendingMessageRetryProcessor(IEventStorage eventStorage, IOptions<PoleOptions> options, ILogger<PendingMessageRetryProcessor> logger, IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, IOptions<ProducerOptions> producerOptions)
public PendingMessageRetryProcessor(IEventStorage eventStorage, IOptions<PoleOptions> options, ILogger<PendingMessageRetryProcessor> logger,
IProducerContainer producerContainer, IEventTypeFinder eventTypeFinder, ISerializer serializer, IOptions<ProducerOptions> producerOptions)
{
this.eventStorage = eventStorage;
this.options = options.Value ?? throw new Exception($"{nameof(PoleOptions)} Must be injected");
this.logger = logger;
this.producer = producer;
this.producerContainer = producerContainer;
this.eventTypeFinder = eventTypeFinder;
this.serializer = serializer;
this.producerOptions = producerOptions.Value ?? throw new Exception($"{nameof(ProducerOptions)} Must be injected");
......@@ -70,6 +71,7 @@ namespace Pole.Core.Processor
pendingMessage.ExpiresAt = DateTime.UtcNow;
}
pendingMessage.Retries++;
var producer = await producerContainer.GetProducer(eventType);
await producer.Publish(bytes);
pendingMessage.StatusName = nameof(EventStatus.Published);
pendingMessage.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds);
......
......@@ -18,15 +18,15 @@ namespace Pole.Core.UnitOfWork
{
class UnitOfWork : IUnitOfWork
{
private readonly IProducer producer;
private readonly IProducerContainer producerContainer;
private readonly IEventTypeFinder eventTypeFinder;
private readonly ISerializer serializer;
private readonly IEventStorage eventStorage;
private readonly PoleOptions options;
private IBus bus;
public UnitOfWork(IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, IEventStorage eventStorage, IOptions<PoleOptions> options)
public UnitOfWork(IProducerContainer producerContainer, IEventTypeFinder eventTypeFinder, ISerializer serializer, IEventStorage eventStorage, IOptions<PoleOptions> options)
{
this.producer = producer;
this.producerContainer = producerContainer;
this.eventTypeFinder = eventTypeFinder;
this.serializer = serializer;
this.eventStorage = eventStorage;
......@@ -45,6 +45,7 @@ namespace Pole.Core.UnitOfWork
var eventContentBytes = serializer.SerializeToUtf8Bytes(@event, eventType);
var bytesTransport = new EventBytesTransport(@event.Name, @event.Id, eventContentBytes);
var bytes = bytesTransport.GetBytes();
var producer = await producerContainer.GetProducer(eventType);
await producer.Publish(bytes);
@event.StatusName = nameof(EventStatus.Published);
@event.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds);
......
......@@ -24,8 +24,10 @@ namespace Pole.Core.Utils
.Select(c => c.Address)
.FirstOrDefault();
var bytes = firstIpV4Address.GetAddressBytes();
generatorId = BitConverter.ToInt32(bytes, 2);
var bytes = firstIpV4Address.GetAddressBytes().TakeLast(2).Reverse().ToList();
bytes.Add(0);
bytes.Add(0);
generatorId = BitConverter.ToInt32(bytes.ToArray());
}
public int GetGeneratorId()
{
......
......@@ -15,7 +15,7 @@ namespace Pole.EventStorage.PostgreSql
{
private readonly PostgreSqlOptions options;
private readonly ILogger logger;
public PostgreSqlEventStorageInitializer(IOptions<PostgreSqlOptions> options,Logger<PostgreSqlEventStorageInitializer> logger)
public PostgreSqlEventStorageInitializer(IOptions<PostgreSqlOptions> options,ILogger<PostgreSqlEventStorageInitializer> logger)
{
this.options = options.Value;
this.logger = logger;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment