Commit 592237d8 by 丁松杰

修复bugs

parent a391ba15
...@@ -5,6 +5,7 @@ using Pole.Core.EventBus.Transaction; ...@@ -5,6 +5,7 @@ using Pole.Core.EventBus.Transaction;
using Pole.Core.Serialization; using Pole.Core.Serialization;
using Pole.Core.Utils.Abstraction; using Pole.Core.Utils.Abstraction;
using System; using System;
using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
...@@ -14,7 +15,6 @@ namespace Pole.Core.EventBus ...@@ -14,7 +15,6 @@ namespace Pole.Core.EventBus
{ {
class Bus : IBus class Bus : IBus
{ {
private readonly IProducer producer;
private readonly IEventTypeFinder eventTypeFinder; private readonly IEventTypeFinder eventTypeFinder;
private readonly ISerializer serializer; private readonly ISerializer serializer;
private readonly ISnowflakeIdGenerator snowflakeIdGenerator; private readonly ISnowflakeIdGenerator snowflakeIdGenerator;
...@@ -22,11 +22,12 @@ namespace Pole.Core.EventBus ...@@ -22,11 +22,12 @@ namespace Pole.Core.EventBus
public IDbTransactionAdapter Transaction { get; set; } public IDbTransactionAdapter Transaction { get; set; }
public IServiceProvider ServiceProvider { get; } public IServiceProvider ServiceProvider { get; }
public BlockingCollection<EventEntity> PrePublishEventBuffer { get; } =
new BlockingCollection<EventEntity>(new ConcurrentQueue<EventEntity>());
public Bus(IServiceProvider serviceProvider, IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator, IEventStorage eventStorage) public Bus(IServiceProvider serviceProvider, IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator, IEventStorage eventStorage)
{ {
ServiceProvider = serviceProvider; ServiceProvider = serviceProvider;
this.producer = producer;
this.eventTypeFinder = eventTypeFinder; this.eventTypeFinder = eventTypeFinder;
this.serializer = serializer; this.serializer = serializer;
this.snowflakeIdGenerator = snowflakeIdGenerator; this.snowflakeIdGenerator = snowflakeIdGenerator;
...@@ -37,10 +38,10 @@ namespace Pole.Core.EventBus ...@@ -37,10 +38,10 @@ namespace Pole.Core.EventBus
var eventType = @event.GetType(); var eventType = @event.GetType();
var eventTypeCode = eventTypeFinder.GetCode(eventType); var eventTypeCode = eventTypeFinder.GetCode(eventType);
var eventId = snowflakeIdGenerator.NextId(); var eventId = snowflakeIdGenerator.NextId();
var eventContentBytes = serializer.SerializeToUtf8Bytes(@event, eventType); //var eventContentBytes = serializer.SerializeToUtf8Bytes(@event, eventType);
var eventContent = serializer.Serialize(@event, eventType); var eventContent = serializer.Serialize(@event, eventType);
var bytesTransport = new EventBytesTransport(eventTypeCode, eventId, eventContentBytes); //var bytesTransport = new EventBytesTransport(eventTypeCode, eventId, eventContentBytes);
var bytes = bytesTransport.GetBytes(); //var bytes = bytesTransport.GetBytes();
var eventEntity = new EventEntity var eventEntity = new EventEntity
{ {
Added = DateTime.UtcNow, Added = DateTime.UtcNow,
...@@ -59,15 +60,10 @@ namespace Pole.Core.EventBus ...@@ -59,15 +60,10 @@ namespace Pole.Core.EventBus
{ {
var mediumMessage = await eventStorage.StoreMessage(eventEntity, Transaction.DbTransaction); var mediumMessage = await eventStorage.StoreMessage(eventEntity, Transaction.DbTransaction);
if (Transaction.AutoCommit)
{
await Transaction.CommitAsync();
}
} }
PrePublishEventBuffer.Add(eventEntity);
await producer.Publish(bytes); //await producer.Publish(bytes);
//await eventStorage.ChangePublishStateAsync(eventEntity,EventStatus.Published);
await eventStorage.ChangePublishStateAsync(eventEntity,EventStatus.Published);
return true; return true;
} }
......
using Pole.Core.EventBus.Transaction; using Pole.Core.EventBus.EventStorage;
using Pole.Core.EventBus.Transaction;
using System; using System;
using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
...@@ -11,6 +13,7 @@ namespace Pole.Core.EventBus ...@@ -11,6 +13,7 @@ namespace Pole.Core.EventBus
{ {
IServiceProvider ServiceProvider { get; } IServiceProvider ServiceProvider { get; }
IDbTransactionAdapter Transaction { get; set; } IDbTransactionAdapter Transaction { get; set; }
BlockingCollection<EventEntity> PrePublishEventBuffer { get; }
Task<bool> Publish(object @event, CancellationToken cancellationToken = default); Task<bool> Publish(object @event, CancellationToken cancellationToken = default);
} }
} }
using System; using Pole.Core.EventBus.EventStorage;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
...@@ -10,7 +12,6 @@ namespace Pole.Core.EventBus.Transaction ...@@ -10,7 +12,6 @@ namespace Pole.Core.EventBus.Transaction
{ {
Task CommitAsync(CancellationToken cancellationToken = default); Task CommitAsync(CancellationToken cancellationToken = default);
Task RollbackAsync(CancellationToken cancellationToken = default); Task RollbackAsync(CancellationToken cancellationToken = default);
bool AutoCommit { get; set; }
object DbTransaction { get; set; } object DbTransaction { get; set; }
} }
} }
using Pole.Core.EventBus;
using Pole.Core.EventBus.Transaction;
using System;
using System.Collections.Generic;
using System.Data;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
namespace Pole.Core.UnitOfWork
{
public interface IUnitOfWork : IDisposable
{
Task CompeleteAsync(CancellationToken cancellationToken = default);
Task Rollback(CancellationToken cancellationToken = default);
IUnitOfWork Enlist(IDbTransaction dbTransaction, IBus bus);
}
}
using Pole.Core.EventBus;
using Pole.Core.EventBus.Transaction;
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Pole.Core.Abstraction;
using Pole.Core.Serialization;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventStorage;
namespace Pole.Core.UnitOfWork
{
class UnitOfWork : IUnitOfWork
{
private readonly IProducer producer;
private readonly IEventTypeFinder eventTypeFinder;
private readonly ISerializer serializer;
private readonly IEventStorage eventStorage;
private IBus bus;
public UnitOfWork(IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, IEventStorage eventStorage)
{
this.producer = producer;
this.eventTypeFinder = eventTypeFinder;
this.serializer = serializer;
this.eventStorage = eventStorage;
}
public async Task CompeleteAsync(CancellationToken cancellationToken = default)
{
await bus.Transaction.CommitAsync();
var bufferedEvents = bus.PrePublishEventBuffer.ToList();
bufferedEvents.ForEach(async @event =>
{
var eventType = eventTypeFinder.FindType(@event.Name);
var eventContentBytes = serializer.SerializeToUtf8Bytes(@event, eventType);
var bytesTransport = new EventBytesTransport(@event.Name, @event.Id, eventContentBytes);
var bytes = bytesTransport.GetBytes();
await producer.Publish(bytes);
await eventStorage.ChangePublishStateAsync(@event, EventStatus.Published);
});
}
public void Dispose()
{
bus = null;
}
public IUnitOfWork Enlist(IDbTransaction dbTransaction, IBus bus)
{
bus.Transaction = bus.ServiceProvider.GetService<IDbTransactionAdapter>();
bus.Transaction.DbTransaction = dbTransaction;
this.bus = bus;
return this;
}
public Task Rollback(CancellationToken cancellationToken = default)
{
return bus.Transaction.RollbackAsync();
}
}
}
using Pole.Core.EventBus;
using Pole.Core.EventBus.Transaction;
using System;
using System.Collections.Generic;
using System.Data;
using System.Text;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.EntityFrameworkCore.Storage;
namespace Pole.EventStorage.PostgreSql
{
public static class EnlistBusExtensions
{
public static IDbTransaction EnlistBus(this IDbTransaction dbTransaction, IBus bus, bool autoCommit = false)
{
bus.Transaction = bus.ServiceProvider.GetService<IDbTransactionAdapter>();
bus.Transaction.DbTransaction = dbTransaction;
bus.Transaction.AutoCommit = autoCommit;
return dbTransaction;
}
public static IDbContextTransaction EnlistBus(this IDbContextTransaction dbContextTransaction, IBus bus, bool autoCommit = false)
{
bus.Transaction = bus.ServiceProvider.GetService<IDbTransactionAdapter>();
bus.Transaction.DbTransaction = dbContextTransaction;
bus.Transaction.AutoCommit = autoCommit;
return dbContextTransaction;
}
}
}
using Microsoft.EntityFrameworkCore.Storage; using Microsoft.EntityFrameworkCore.Storage;
using Pole.Core.Abstraction;
using Pole.Core.EventBus;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventStorage;
using Pole.Core.EventBus.Transaction; using Pole.Core.EventBus.Transaction;
using Pole.Core.Serialization;
using System; using System;
using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Data; using System.Data;
using System.Linq;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
...@@ -11,8 +18,7 @@ namespace Pole.EventStorage.PostgreSql ...@@ -11,8 +18,7 @@ namespace Pole.EventStorage.PostgreSql
{ {
class PostgreSqlDbTransactionAdapter : IDbTransactionAdapter class PostgreSqlDbTransactionAdapter : IDbTransactionAdapter
{ {
public bool AutoCommit { get => throw new NotImplementedException(); set => throw new NotImplementedException(); } public object DbTransaction { get; set; }
public object DbTransaction { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
public async Task CommitAsync(CancellationToken cancellationToken = default) public async Task CommitAsync(CancellationToken cancellationToken = default)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment