Commit c6d46a0b by dingsongjie

添加 更多功能

parent b9422953
......@@ -51,24 +51,15 @@ namespace Backet.Api.Controllers
{
var newId = Guid.NewGuid().ToString("N").ToLower();
backet.Id = newId;
//var entity = await backetDbContext.Backets.AsNoTracking().Include(box => box.BacketItems).SingleOrDefaultAsync(m => m.Id == "222");
////using (NpgsqlConnection conn = new NpgsqlConnection("Server=192.168.0.248;Port=5432;Username=postgres;Password=comteck2020!@#;Database=Pole-Backet;Enlist=True;Timeout=0;Command Timeout=600"))
////{
//// await conn.OpenAsync();
//// var teams = await conn.QueryAsync<Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet>("SELECT * FROM \"public\".\"Backet\" where \"Id\" =@Id", new { Id = newId });
//// //var teams = await conn.ExecuteAsync("SELECT 1");
////}
var grain = clusterClient.GetGrain<IBacketGrain>(newId);
var grain = clusterClient.GetGrain<IAddBacketGrain>(newId);
return grain.AddBacket(backet);
//return true;
}
[HttpPost("api/backet/UpdateBacket")]
public Task<bool> UpdateBacket()
{
var id = "da8a489fa7b4409294ee1b358fbbfba5";
var id = "67bbf594246441a18d7b6c74a277d06a";
var grain = clusterClient.GetGrain<IBacketGrain>(id);
return grain.UpdateBacket("88");
return grain.UpdateBacket("99");
}
[HttpPost("api/backet/AddItem")]
public Task<bool> AddItem()
......
......@@ -33,7 +33,7 @@ namespace Backet.Api.Domain.AggregatesModel.BacketAggregate
}
public string UserId { get; set; }
public List<BacketItem> BacketItems { get; private set; } = new List<BacketItem>();
public long TotalPrice { get; private set; }
public long TotalPrice { get; set; }
internal void RemoveFirstItem()
{
......
using Orleans;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Backet.Api.Grains.Abstraction
{
public interface IAddBacketGrain : IGrainWithStringKey
{
Task<bool> AddBacket(BacketDto backet);
}
}
......@@ -8,7 +8,6 @@ namespace Backet.Api.Grains.Abstraction
{
public interface IBacketGrain: IGrainWithStringKey
{
Task<bool> AddBacket(BacketDto backet);
Task<bool> UpdateBacket(string userId);
Task<bool> AddBacketItem(string productId, string productName, long price);
Task<bool> RemoveFirstItem();
......
using Backet.Api.Grains.Abstraction;
using Backet.Api.Infrastructure;
using Orleans;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Backet.Api.Domain.Event;
using Pole.Core.UnitOfWork;
using Pole.Core.EventBus.Transaction;
using Pole.Core.EventBus;
namespace Backet.Api.Grains
{
public class AddBacketGrain : Grain, IAddBacketGrain
{
public async Task<bool> AddBacket(BacketDto backetDto)
{
using (var scope = ServiceProvider.CreateScope())
{
var unitOfWork = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
var dbTransactionAdapter = scope.ServiceProvider.GetRequiredService<IDbTransactionAdapter>();
var dbContext = scope.ServiceProvider.GetRequiredService<BacketDbContext>();
var bus = scope.ServiceProvider.GetRequiredService<IBus>();
using (var transaction = await dbContext.Database.BeginTransactionAsync())
{
dbTransactionAdapter.DbTransaction = transaction;
unitOfWork.Enlist(dbTransactionAdapter, bus);
Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet backet = new Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet
{
Id = backetDto.Id,
UserId = backetDto.UserId
};
if (backetDto.BacketItems == null || backetDto.BacketItems.Count == 0) return false;
backetDto.BacketItems.ForEach(item =>
{
backet.AddBacketItem(item.ProductId, item.ProductName, item.Price);
});
dbContext.Backets.Add(backet);
await bus.Publish(new BacketCreatedEvent() { BacketId = backet.Id });
await unitOfWork.CompeleteAsync();
}
return true;
}
}
}
}
using Backet.Api.Domain.Event;
using Backet.Api.Grains.Abstraction;
using Orleans.Providers;
using Pole.Core.Grains;
using System;
using System.Collections.Generic;
......@@ -8,10 +9,12 @@ using System.Threading.Tasks;
namespace Backet.Api.Grains
{
[StorageProvider(ProviderName = "ef")]
public class BacketGrain : PoleGrain<Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet>, IBacketGrain
{
public async Task<bool> AddBacket(BacketDto backetDto)
{
if (State != null) return false;
Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet backet = new Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet
......@@ -52,6 +55,7 @@ namespace Backet.Api.Grains
{
if (State == null) return false;
State.UserId = userId;
State.TotalPrice++;
State.ModifyItemProductId(userId);
Update(State);
await WriteStateAsync();
......
......@@ -25,7 +25,7 @@ namespace Backet.Api
.UseOrleans(siloBuilder =>
{
siloBuilder.UseLocalhostClustering();
siloBuilder.AddEfGrainStorageAsDefault<BacketDbContext>();
siloBuilder.AddEfGrainStorage<BacketDbContext>("ef");
})
.ConfigureWebHostDefaults(webBuilder =>
{
......
......@@ -27,7 +27,8 @@ namespace Backet.Api
services.AddDbContextPool<BacketDbContext>(options => options.UseNpgsql(Configuration["postgres:write"]));
services.AddControllers();
services.AddPole(config => {
services.AddPole(config =>
{
config.AddRabbitMQ(option =>
{
option.Hosts = new string[1] { Configuration["RabbitmqConfig:HostAddress"] };
......@@ -40,8 +41,9 @@ namespace Backet.Api
services.ConfigureGrainStorageOptions<BacketDbContext, BacketGrain, Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet>(
options =>
{
options.UseQuery(context => context.Backets.AsNoTracking()
.Include(box => box.BacketItems));
options.UseQuery(context => context.Backets
.Include(box => box.BacketItems), context => context.Backets.AsNoTracking()
.Include(box => box.BacketItems));
options.IsRelatedData = true;
});
}
......
......@@ -109,7 +109,7 @@ namespace Pole.Core.EventBus
}
private async Task ExecuteCore(List<EventEntity> eventEntities)
{
logger.LogError($"Begin ExecuteCore Count:{eventEntities.Count} ");
logger.LogTrace($"Begin ExecuteCore Count:{eventEntities.Count} ");
var events = eventEntities.Select(entity =>
{
var eventContentBytes = Encoding.UTF8.GetBytes(entity.Content);
......@@ -125,9 +125,9 @@ namespace Pole.Core.EventBus
entity.StatusName = nameof(EventStatus.Published);
entity.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds);
});
logger.LogError($"Begin BulkPublish {DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss")} ");
logger.LogTrace($"Begin BulkPublish {DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss")} ");
await producer.BulkPublish(events);
logger.LogError($"Begin BulkPublish {DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss")} ");
logger.LogTrace($"Begin BulkPublish {DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss")} ");
if (eventEntities.Count > 10)
{
await eventStorage.BulkChangePublishStateAsync(eventEntities);
......@@ -137,7 +137,7 @@ namespace Pole.Core.EventBus
await eventStorage.ChangePublishStateAsync(eventEntities);
}
logger.LogError($"End ExecuteCore {DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss")} Count:{eventEntities.Count} ");
logger.LogTrace($"End ExecuteCore {DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss")} Count:{eventEntities.Count} ");
}
}
}
......@@ -60,6 +60,33 @@ namespace Pole.Orleans.Provider.EntityframeworkCore.Conventions
null,
dbSetPropertyInfo.GetMethod);
// create final delegate which chains dbSet getter and no tracking delegates
return dbSetDelegate;
}
public Func<TContext, IQueryable<TEntity>> CreateDefaultDbSetNoTrackingAccessorFunc<TContext, TEntity>()
where TContext : DbContext
where TEntity : class
{
Type contextType = typeof(TContext);
// Find a dbSet<TEntity> as default
PropertyInfo dbSetPropertyInfo =
contextType
.GetProperties(BindingFlags.Public | BindingFlags.Instance)
.FirstOrDefault(pInfo => pInfo.PropertyType == typeof(DbSet<TEntity>));
if (dbSetPropertyInfo == null)
throw new GrainStorageConfigurationException($"Could not find A property of type \"{typeof(DbSet<TEntity>).FullName}\" " +
$"on context with type \"{typeof(TContext).FullName}\"");
var dbSetDelegate = (Func<TContext, IQueryable<TEntity>>)Delegate.CreateDelegate(
typeof(Func<TContext, IQueryable<TEntity>>),
null,
dbSetPropertyInfo.GetMethod);
// set queries as no tracking
MethodInfo noTrackingMethodInfo = (typeof(GrainStorageConvention).GetMethod(nameof(AsNoTracking))
?? throw new Exception("Impossible"))
......@@ -176,6 +203,101 @@ namespace Pole.Orleans.Provider.EntityframeworkCore.Conventions
throw new InvalidOperationException($"Unexpected grain type \"{typeof(TGrain).FullName}\"");
}
public Func<TContext, IAddressable, Task<TEntity>> CreatePreCompiledDefaultReadStateNoTrackingFunc<TContext, TGrain, TEntity>(GrainStorageOptions<TContext, TGrain, TEntity> options)
where TContext : DbContext
where TEntity : class
{
if (typeof(IGrainWithGuidKey).IsAssignableFrom(typeof(TGrain)))
{
if (options.GuidKeySelector == null)
throw new GrainStorageConfigurationException($"GuidKeySelector is not defined for " +
$"{typeof(GrainStorageOptions<TContext, TGrain, TEntity>).FullName}");
Func<TContext, Guid, Task<TEntity>> compiledQuery
= CreateCompiledQuery<TContext, TGrain, TEntity, Guid>(options);
return (TContext context, IAddressable grainRef) =>
{
Guid key = grainRef.GetPrimaryKey();
return compiledQuery(context, key);
};
}
if (typeof(IGrainWithGuidCompoundKey).IsAssignableFrom(typeof(TGrain)))
{
if (options.GuidKeySelector == null)
throw new GrainStorageConfigurationException($"GuidKeySelector is not defined for " +
$"{typeof(GrainStorageOptions<TContext, TGrain, TEntity>).FullName}");
if (options.KeyExtSelector == null)
throw new GrainStorageConfigurationException($"KeyExtSelector is not defined for " +
$"{typeof(GrainStorageOptions<TContext, TGrain, TEntity>).FullName}");
Func<TContext, Guid, string, Task<TEntity>> compiledQuery
= CreateCompiledCompoundQuery<TContext, TGrain, TEntity, Guid>(options);
return (TContext context, IAddressable grainRef) =>
{
Guid key = grainRef.GetPrimaryKey(out string keyExt);
return compiledQuery(context, key, keyExt);
};
}
if (typeof(IGrainWithIntegerKey).IsAssignableFrom(typeof(TGrain)))
{
if (options.LongKeySelector == null)
throw new GrainStorageConfigurationException($"LongKeySelector is not defined for " +
$"{typeof(GrainStorageOptions<TContext, TGrain, TEntity>).FullName}");
Func<TContext, long, Task<TEntity>> compiledQuery
= CreateCompiledQuery<TContext, TGrain, TEntity, long>(options);
return (TContext context, IAddressable grainRef) =>
{
long key = grainRef.GetPrimaryKeyLong();
return compiledQuery(context, key);
};
}
if (typeof(IGrainWithIntegerCompoundKey).IsAssignableFrom(typeof(TGrain)))
{
if (options.LongKeySelector == null)
throw new GrainStorageConfigurationException($"LongKeySelector is not defined for " +
$"{typeof(GrainStorageOptions<TContext, TGrain, TEntity>).FullName}");
if (options.KeyExtSelector == null)
throw new GrainStorageConfigurationException($"KeyExtSelector is not defined for " +
$"{typeof(GrainStorageOptions<TContext, TGrain, TEntity>).FullName}");
Func<TContext, long, string, Task<TEntity>> compiledQuery
= CreateCompiledCompoundQuery<TContext, TGrain, TEntity, long>(options);
return (TContext context, IAddressable grainRef) =>
{
long key = grainRef.GetPrimaryKeyLong(out string keyExt);
return compiledQuery(context, key, keyExt);
};
}
if (typeof(IGrainWithStringKey).IsAssignableFrom(typeof(TGrain)))
{
if (options.KeyExtSelector == null)
throw new GrainStorageConfigurationException($"KeyExtSelector is not defined for " +
$"{typeof(GrainStorageOptions<TContext, TGrain, TEntity>).FullName}");
Func<TContext, string, Task<TEntity>> compiledQuery = null;
compiledQuery = CreateCompiledQuery<TContext, TGrain, TEntity, string>(options);
return (TContext context, IAddressable grainRef) =>
{
string keyExt = grainRef.GetPrimaryKeyString();
return compiledQuery(context, keyExt);
};
}
throw new InvalidOperationException($"Unexpected grain type \"{typeof(TGrain).FullName}\"");
}
public virtual Func<TContext, IAddressable, Task<TEntity>>
CreatePreCompiledDefaultReadStateFunc<TContext, TGrain, TEntity>(
......@@ -480,7 +602,36 @@ namespace Pole.Orleans.Provider.EntityframeworkCore.Conventions
return EF.CompileAsyncQuery(lambdaExpression);
}
private static Func<TContext, TKey, string, Task<TEntity>> CreateCompiledCompoundNoTrackingQuery<TContext, TGrain, TEntity, TKey>(
GrainStorageOptions<TContext, TGrain, TEntity> options)
where TContext : DbContext
where TEntity : class
{
var contextParameter = Expression.Parameter(typeof(TContext), "context");
var keyParameter = Expression.Parameter(typeof(TKey), "grainKey");
var keyExtParameter = Expression.Parameter(typeof(string), "grainKeyExt");
var predicate = CreateCompoundKeyPredicate<TEntity, TKey>(
options,
keyParameter,
keyExtParameter);
var queryable = Expression.Call(
options.DbSetNoTrackingAccessor.Method,
Expression.Constant(options.DbSetNoTrackingAccessor),
contextParameter);
var compiledLambdaBody = Expression.Call(
typeof(Queryable).GetMethods().Single(mi =>
mi.Name == nameof(Queryable.SingleOrDefault) && mi.GetParameters().Count() == 2)
.MakeGenericMethod(typeof(TEntity)),
queryable,
Expression.Quote(predicate));
var lambdaExpression = Expression.Lambda<Func<TContext, TKey, string, TEntity>>(
compiledLambdaBody, contextParameter, keyParameter, keyExtParameter);
return EF.CompileAsyncQuery(lambdaExpression);
}
private static Expression<Func<TEntity, bool>> CreateCompoundKeyPredicate<TEntity, TKey>(
GrainStorageOptions options,
ParameterExpression grainKeyParam,
......@@ -708,6 +859,8 @@ namespace Pole.Orleans.Provider.EntityframeworkCore.Conventions
return new string(c);
}
#endregion
}
}
......@@ -24,6 +24,18 @@ namespace Pole.Orleans.Provider.EntityframeworkCore.Conventions
where TContext : DbContext
where TEntity : class;
/// <summary>
/// Creates a method that returns an IQueryable'<typeparam name="TEntity"></typeparam>
/// against <typeparam name="TContext"></typeparam> type.
/// </summary>
/// <typeparam name="TContext"></typeparam>
/// <typeparam name="TEntity"></typeparam>
/// <returns></returns>
Func<TContext, IQueryable<TEntity>>
CreateDefaultDbSetNoTrackingAccessorFunc<TContext, TEntity>()
where TContext : DbContext
where TEntity : class;
Func<TContext, IAddressable, Task<TEntity>>
CreateDefaultReadStateFunc<TContext, TGrain, TEntity>(
GrainStorageOptions<TContext, TGrain, TEntity> options)
......@@ -31,11 +43,17 @@ namespace Pole.Orleans.Provider.EntityframeworkCore.Conventions
where TEntity : class;
Func<TContext, IAddressable, Task<TEntity>>
CreatePreCompiledDefaultReadStateFunc<TContext, TGrain, TEntity>(
CreatePreCompiledDefaultReadStateNoTrackingFunc<TContext, TGrain, TEntity>(
GrainStorageOptions<TContext, TGrain, TEntity> options)
where TContext : DbContext
where TEntity : class;
Func<TContext, IAddressable, Task<TEntity>>
CreatePreCompiledDefaultReadStateFunc<TContext, TGrain, TEntity>(
GrainStorageOptions<TContext, TGrain, TEntity> options)
where TContext : DbContext
where TEntity : class;
void SetDefaultKeySelectors<TContext, TGrain, TEntity>(
GrainStorageOptions<TContext, TGrain, TEntity> options)
where TContext : DbContext
......@@ -102,6 +120,17 @@ namespace Pole.Orleans.Provider.EntityframeworkCore.Conventions
Func<TContext, IQueryable<TEntity>>
CreateDefaultDbSetAccessorFunc();
/// <summary>
/// Creates a method that returns an IQueryable'<typeparam name="TGrainState"></typeparam>
/// against <typeparam name="TContext"></typeparam> type.
/// </summary>
/// <typeparam name="TContext"></typeparam>
/// <typeparam name="TGrainState"></typeparam>
/// <returns></returns>
Func<TContext, IQueryable<TEntity>>
CreateDefaultDbSetNoTrackingAccessorFunc();
/// <summary>
/// Creates a method that generates an expression to be used by entity framework to
/// fetch a single state.
......@@ -116,6 +145,9 @@ namespace Pole.Orleans.Provider.EntityframeworkCore.Conventions
Func<TContext, IAddressable, Task<TEntity>> CreatePreCompiledDefaultReadStateFunc(
GrainStorageOptions<TContext, TGrain, TEntity> options);
Func<TContext, IAddressable, Task<TEntity>> CreatePreCompiledDefaultReadStateNoTrackingFunc(
GrainStorageOptions<TContext, TGrain, TEntity> options);
void SetDefaultKeySelector(GrainStorageOptions<TContext, TGrain, TEntity> options);
Action<IGrainState, TEntity> GetSetterFunc();
......
......@@ -13,11 +13,12 @@ namespace Pole.Orleans.Provider.EntityframeworkCore
{
public static GrainStorageOptions<TContext, TGrain, TGrainState> UseQuery<TContext, TGrain, TGrainState>(
this GrainStorageOptions<TContext, TGrain, TGrainState> options,
Func<TContext, IQueryable<TGrainState>>queryFunc)
Func<TContext, IQueryable<TGrainState>>queryFunc, Func<TContext, IQueryable<TGrainState>> noTrackingQueryFunc)
where TContext : DbContext
where TGrainState : class
{
options.DbSetAccessor = queryFunc;
options.DbSetNoTrackingAccessor = noTrackingQueryFunc;
return options;
}
......
......@@ -52,7 +52,7 @@ namespace Pole.Orleans.Provider.EntityframeworkCore
using (IServiceScope scope = _scopeFactory.CreateScope())
using (var context = scope.ServiceProvider.GetRequiredService<TContext>())
{
TEntity entity = await _options.ReadStateAsync(context, grainReference)
TEntity entity = await _options.ReadStateNoTrackingAsync(context, grainReference)
.ConfigureAwait(false);
_options.SetEntity(grainState, entity);
......@@ -97,7 +97,7 @@ namespace Pole.Orleans.Provider.EntityframeworkCore
try
{
if (entity.DomainEvents.Count != 0)
if (entity.DomainEvents!=null&&entity.DomainEvents.Count != 0)
{
using (var unitOfWork = scope.ServiceProvider.GetRequiredService<IUnitOfWork>())
{
......
......@@ -40,6 +40,8 @@ namespace Pole.Orleans.Provider.EntityframeworkCore
{
internal Func<TContext, IQueryable<TEntity>> DbSetAccessor { get; set; }
internal Func<TContext, IQueryable<TEntity>> DbSetNoTrackingAccessor { get; set; }
internal Func<TEntity, bool> IsPersistedFunc { get; set; }
internal Func<TEntity, string> GetETagFunc { get; set; }
......@@ -53,6 +55,7 @@ namespace Pole.Orleans.Provider.EntityframeworkCore
public bool IsRelatedData { get; set; }
internal Func<TContext, IAddressable, Task<TEntity>> ReadStateAsync { get; set; }
internal Func<TContext, IAddressable, Task<TEntity>> ReadStateNoTrackingAsync { get; set; }
internal Action<IGrainState, TEntity> SetEntity { get; set; }
......
......@@ -69,6 +69,32 @@ namespace Pole.Orleans.Provider.EntityframeworkCore
.CreateDefaultReadStateFunc(options);
}
}
if (options.ReadStateNoTrackingAsync == null)
{
if (options.DbSetNoTrackingAccessor == null)
options.DbSetNoTrackingAccessor = Convention?.CreateDefaultDbSetNoTrackingAccessorFunc()
?? DefaultConvention.CreateDefaultDbSetNoTrackingAccessorFunc<TContext, TEntity>();
if (Convention != null)
Convention.SetDefaultKeySelector(options);
else
DefaultConvention.SetDefaultKeySelectors(options);
if (options.PreCompileReadQuery)
{
options.ReadStateNoTrackingAsync
= Convention?.CreatePreCompiledDefaultReadStateNoTrackingFunc(options)
?? DefaultConvention
.CreatePreCompiledDefaultReadStateNoTrackingFunc(options);
}
else
{
options.ReadStateNoTrackingAsync
= Convention?.CreateDefaultReadStateFunc()
?? DefaultConvention
.CreateDefaultReadStateFunc(options);
}
}
if (options.SetEntity == null)
options.SetEntity =
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment