diff --git a/samples/apis/Backet.Api/Startup.cs b/samples/apis/Backet.Api/Startup.cs index 98c92e5..9436c30 100644 --- a/samples/apis/Backet.Api/Startup.cs +++ b/samples/apis/Backet.Api/Startup.cs @@ -1,8 +1,3 @@ -using System; -using System.Collections.Generic; -using System.Data.SqlClient; -using System.Linq; -using System.Threading.Tasks; using Backet.Api.Grains; using Backet.Api.Infrastructure; using Microsoft.AspNetCore.Builder; @@ -39,9 +34,6 @@ namespace Backet.Api .Include(box => box.BacketItems)); options.IsRelatedData = true; }); - - - } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. diff --git a/src/Pole.Core/Grains/PoleGrain.cs b/src/Pole.Core/Grains/PoleGrain.cs index f8a3030..1dadcbe 100644 --- a/src/Pole.Core/Grains/PoleGrain.cs +++ b/src/Pole.Core/Grains/PoleGrain.cs @@ -1,5 +1,6 @@ using Orleans; using Pole.Core.Domain; +using Pole.Core.EventBus.Event; using System; using System.Collections.Generic; using System.Text; diff --git a/src/Pole.Core/UnitOfWork/IUnitOfWork.cs b/src/Pole.Core/UnitOfWork/IUnitOfWork.cs index 24f5b7a..7664276 100644 --- a/src/Pole.Core/UnitOfWork/IUnitOfWork.cs +++ b/src/Pole.Core/UnitOfWork/IUnitOfWork.cs @@ -14,6 +14,6 @@ namespace Pole.Core.UnitOfWork { Task CompeleteAsync(CancellationToken cancellationToken = default); Task Rollback(CancellationToken cancellationToken = default); - IUnitOfWork Enlist(IDbTransaction dbTransaction, IBus bus); + IUnitOfWork Enlist(IDbTransactionAdapter dbTransactionAdapter, IBus bus); } } diff --git a/src/Pole.Core/UnitOfWork/UnitOfWork.cs b/src/Pole.Core/UnitOfWork/UnitOfWork.cs index 6fe9604..5b2d049 100644 --- a/src/Pole.Core/UnitOfWork/UnitOfWork.cs +++ b/src/Pole.Core/UnitOfWork/UnitOfWork.cs @@ -57,10 +57,9 @@ namespace Pole.Core.UnitOfWork bus = null; } - public IUnitOfWork Enlist(IDbTransaction dbTransaction, IBus bus) + public IUnitOfWork Enlist(IDbTransactionAdapter dbTransactionAdapter, IBus bus) { - bus.Transaction = bus.ServiceProvider.GetService(); - bus.Transaction.DbTransaction = dbTransaction; + bus.Transaction = dbTransactionAdapter; this.bus = bus; return this; } diff --git a/src/Pole.Orleans.Provider.EntityframeworkCore/GrainStorage.cs b/src/Pole.Orleans.Provider.EntityframeworkCore/GrainStorage.cs index 8f64cac..682ddea 100644 --- a/src/Pole.Orleans.Provider.EntityframeworkCore/GrainStorage.cs +++ b/src/Pole.Orleans.Provider.EntityframeworkCore/GrainStorage.cs @@ -8,7 +8,10 @@ using Orleans; using Orleans.Runtime; using Orleans.Storage; using Pole.Core.Domain; +using Pole.Core.EventBus; using Pole.Core.EventBus.Event; +using Pole.Core.EventBus.Transaction; +using Pole.Core.UnitOfWork; using System; using System.Collections.Generic; using System.Linq; @@ -95,11 +98,36 @@ namespace Pole.Orleans.Provider.EntityframeworkCore try { - await context.SaveChangesAsync() - .ConfigureAwait(false); + if (entity.DomainEvents.Count != 0) + { + using (var unitOfWork = scope.ServiceProvider.GetRequiredService()) + { + using (var dbTransactionAdapter = scope.ServiceProvider.GetRequiredService()) + { + var bus = scope.ServiceProvider.GetRequiredService(); + using (var transaction = await context.Database.BeginTransactionAsync()) + { + dbTransactionAdapter.DbTransaction = transaction; + unitOfWork.Enlist(dbTransactionAdapter, bus); + var publishTasks = entity.DomainEvents.Select(m => bus.Publish(m)); + await Task.WhenAll(publishTasks); + await context.SaveChangesAsync().ConfigureAwait(false); + + if (_options.CheckForETag) + grainState.ETag = _options.GetETagFunc(entity); + + await unitOfWork.CompeleteAsync(); + } + } + }; + } + else + { + await context.SaveChangesAsync().ConfigureAwait(false); - if (_options.CheckForETag) - grainState.ETag = _options.GetETagFunc(entity); + if (_options.CheckForETag) + grainState.ETag = _options.GetETagFunc(entity); + } } catch (DbUpdateConcurrencyException e) {