Commit d6a21d40 by dingsongjie

grains WriteStateAsync 添加 发送 domaindvent的 操作

parent e2141433
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Linq;
using System.Threading.Tasks;
using Backet.Api.Grains;
using Backet.Api.Infrastructure;
using Microsoft.AspNetCore.Builder;
......@@ -39,9 +34,6 @@ namespace Backet.Api
.Include(box => box.BacketItems));
options.IsRelatedData = true;
});
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
......
using Orleans;
using Pole.Core.Domain;
using Pole.Core.EventBus.Event;
using System;
using System.Collections.Generic;
using System.Text;
......
......@@ -14,6 +14,6 @@ namespace Pole.Core.UnitOfWork
{
Task CompeleteAsync(CancellationToken cancellationToken = default);
Task Rollback(CancellationToken cancellationToken = default);
IUnitOfWork Enlist(IDbTransaction dbTransaction, IBus bus);
IUnitOfWork Enlist(IDbTransactionAdapter dbTransactionAdapter, IBus bus);
}
}
......@@ -57,10 +57,9 @@ namespace Pole.Core.UnitOfWork
bus = null;
}
public IUnitOfWork Enlist(IDbTransaction dbTransaction, IBus bus)
public IUnitOfWork Enlist(IDbTransactionAdapter dbTransactionAdapter, IBus bus)
{
bus.Transaction = bus.ServiceProvider.GetService<IDbTransactionAdapter>();
bus.Transaction.DbTransaction = dbTransaction;
bus.Transaction = dbTransactionAdapter;
this.bus = bus;
return this;
}
......
......@@ -8,7 +8,10 @@ using Orleans;
using Orleans.Runtime;
using Orleans.Storage;
using Pole.Core.Domain;
using Pole.Core.EventBus;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.Transaction;
using Pole.Core.UnitOfWork;
using System;
using System.Collections.Generic;
using System.Linq;
......@@ -95,11 +98,36 @@ namespace Pole.Orleans.Provider.EntityframeworkCore
try
{
await context.SaveChangesAsync()
.ConfigureAwait(false);
if (entity.DomainEvents.Count != 0)
{
using (var unitOfWork = scope.ServiceProvider.GetRequiredService<IUnitOfWork>())
{
using (var dbTransactionAdapter = scope.ServiceProvider.GetRequiredService<IDbTransactionAdapter>())
{
var bus = scope.ServiceProvider.GetRequiredService<IBus>();
using (var transaction = await context.Database.BeginTransactionAsync())
{
dbTransactionAdapter.DbTransaction = transaction;
unitOfWork.Enlist(dbTransactionAdapter, bus);
var publishTasks = entity.DomainEvents.Select(m => bus.Publish(m));
await Task.WhenAll(publishTasks);
await context.SaveChangesAsync().ConfigureAwait(false);
if (_options.CheckForETag)
grainState.ETag = _options.GetETagFunc(entity);
await unitOfWork.CompeleteAsync();
}
}
};
}
else
{
await context.SaveChangesAsync().ConfigureAwait(false);
if (_options.CheckForETag)
grainState.ETag = _options.GetETagFunc(entity);
if (_options.CheckForETag)
grainState.ETag = _options.GetETagFunc(entity);
}
}
catch (DbUpdateConcurrencyException e)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment