Commit 3ac920dd by dingsongjie

完成 基本测试

parent cedd3b9e
Showing with 287 additions and 295 deletions
...@@ -17,14 +17,18 @@ ...@@ -17,14 +17,18 @@
<PrivateAssets>all</PrivateAssets> <PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference> </PackageReference>
<PackageReference Include="Microsoft.Orleans.Client" Version="3.0.2" /> <PackageReference Include="Microsoft.Orleans.Client" Version="3.1.0" />
<PackageReference Include="Microsoft.Orleans.CodeGenerator.MSBuild" Version="3.0.2"> <!--<PackageReference Include="Microsoft.Orleans.CodeGenerator.MSBuild" Version="3.1.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>-->
<PackageReference Include="Microsoft.Orleans.Core" Version="3.1.0" />
<PackageReference Include="Microsoft.Orleans.OrleansCodeGenerator.Build" Version="3.1.0">
<PrivateAssets>all</PrivateAssets> <PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference> </PackageReference>
<PackageReference Include="Microsoft.Orleans.Core" Version="3.0.2" /> <PackageReference Include="Microsoft.Orleans.OrleansRuntime" Version="3.1.0" />
<PackageReference Include="Microsoft.Orleans.OrleansRuntime" Version="3.0.2" /> <PackageReference Include="Microsoft.Orleans.Server" Version="3.1.0" />
<PackageReference Include="Microsoft.Orleans.Server" Version="3.0.2" />
<PackageReference Include="Microsoft.VisualStudio.Web.CodeGeneration.Design" Version="3.1.1" /> <PackageReference Include="Microsoft.VisualStudio.Web.CodeGeneration.Design" Version="3.1.1" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
......
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Linq.Expressions;
using System.Threading.Tasks; using System.Threading.Tasks;
using Backet.Api.EventHandlers.Abstraction;
using Backet.Api.Grains.Abstraction; using Backet.Api.Grains.Abstraction;
using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using Orleans; using Orleans;
using Pole.Core.EventBus;
using Pole.Core.EventBus.EventHandler;
namespace Backet.Api.Controllers namespace Backet.Api.Controllers
{ {
...@@ -21,10 +25,20 @@ namespace Backet.Api.Controllers ...@@ -21,10 +25,20 @@ namespace Backet.Api.Controllers
[HttpPost("api/backet/AddBacket")] [HttpPost("api/backet/AddBacket")]
public Task<bool> AddBacket([FromBody]Backet.Api.Grains.Abstraction.BacketDto backet) public Task<bool> AddBacket([FromBody]Backet.Api.Grains.Abstraction.BacketDto backet)
{ {
var newId = "da8a489fa7b44092eeeeeee"; var newId = Guid.NewGuid().ToString("N").ToLower();
backet.Id = newId; backet.Id = newId;
var grain = clusterClient.GetGrain<IBacketGrain>(newId); var grain = clusterClient.GetGrain<IBacketGrain>(newId);
return grain.AddBacket(backet); return grain.AddBacket(backet);
//var clientType = typeof(IClusterClient);
//var clientParams = Expression.Parameter(clientType, "client");
//var primaryKeyParams = Expression.Parameter(typeof(string), "primaryKey");
//var grainClassNamePrefixParams = Expression.Parameter(typeof(string), "grainClassNamePrefix");
//var method = typeof(ClusterClientExtensions).GetMethod("GetGrain", new Type[] { clientType, typeof(string), typeof(string) });
//var body = Expression.Call(method.MakeGenericMethod(typeof(IToNoticeBacketCreatedEventHandler)), clientParams, primaryKeyParams, grainClassNamePrefixParams);
//var func = Expression.Lambda<Func<IClusterClient, string, string, IPoleEventHandler>>(body, clientParams, primaryKeyParams, grainClassNamePrefixParams).Compile();
//var handler = func(clusterClient, newId, null);
//await handler.Invoke(null);
//return true;
} }
[HttpPost("api/backet/UpdateBacket")] [HttpPost("api/backet/UpdateBacket")]
public Task<bool> UpdateBacket() public Task<bool> UpdateBacket()
...@@ -38,13 +52,13 @@ namespace Backet.Api.Controllers ...@@ -38,13 +52,13 @@ namespace Backet.Api.Controllers
{ {
var id = "da8a489fa7b4409294ee1b358fbbfba5"; var id = "da8a489fa7b4409294ee1b358fbbfba5";
var grain = clusterClient.GetGrain<IBacketGrain>(id); var grain = clusterClient.GetGrain<IBacketGrain>(id);
return grain.AddBacketItem("55","测试3",1000); return grain.AddBacketItem("55", "测试3", 1000);
} }
[HttpPost("api/backet/RemoveFirstItem")] [HttpPost("api/backet/RemoveFirstItem")]
public Task<bool> RemoveFirstItem() public Task<bool> RemoveFirstItem()
{ {
var id = "da8a489fa7b4409294ee1b358fbbfba5"; var id = "da8a489fa7b4409294ee1b358fbbfba5";
var grain = clusterClient.GetGrain<IBacketGrain>(id); var grain = clusterClient.GetGrain<IBacketGrain>(id);
return grain.RemoveFirstItem(); return grain.RemoveFirstItem();
} }
} }
......
using Backet.Api.Domain.Event;
using Pole.Core.EventBus.EventHandler;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Backet.Api.EventHandlers.Abstraction
{
public interface IToNoticeBacketCreatedEventHandler : IPoleBulkEventsHandler<BacketCreatedEvent>, IPoleEventHandler<BacketCreatedEvent>
{
}
}
using Backet.Api.Domain.Event; using Backet.Api.Domain.Event;
using Backet.Api.EventHandlers.Abstraction;
using Pole.Core.EventBus.EventHandler; using Pole.Core.EventBus.EventHandler;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
...@@ -7,8 +8,14 @@ using System.Threading.Tasks; ...@@ -7,8 +8,14 @@ using System.Threading.Tasks;
namespace Backet.Api.EventHandlers namespace Backet.Api.EventHandlers
{ {
public class ToNoticeBacketCreatedEventHandler : PoleEventHandler [EventHandler(EventName = "Backet.Api.Domain.Event.BacketCreatedEvent")]
public class ToNoticeBacketCreatedEventHandler : PoleEventHandler<BacketCreatedEvent>, IToNoticeBacketCreatedEventHandler
{ {
public Task BulkEventsHandle(List<BacketCreatedEvent> @event)
{
return Task.CompletedTask;
}
public Task EventHandle(BacketCreatedEvent @event) public Task EventHandle(BacketCreatedEvent @event)
{ {
return Task.CompletedTask; return Task.CompletedTask;
......
...@@ -20,14 +20,14 @@ namespace Backet.Api ...@@ -20,14 +20,14 @@ namespace Backet.Api
public static IHostBuilder CreateHostBuilder(string[] args) => public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args) Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup<Startup>();
})
.UseOrleans(siloBuilder => .UseOrleans(siloBuilder =>
{ {
siloBuilder.UseLocalhostClustering(); siloBuilder.UseLocalhostClustering();
siloBuilder.AddEfGrainStorageAsDefault<BacketDbContext>(); siloBuilder.AddEfGrainStorageAsDefault<BacketDbContext>();
}); })
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup<Startup>();
});
} }
} }
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
}, },
"ServiceName": "Backet", "ServiceName": "Backet",
"RabbitmqConfig": { "RabbitmqConfig": {
"HostAddress": "rabbitmq://192.168.0.41/", "HostAddress": "192.168.0.41",
"HostUserName": "test", "HostUserName": "test",
"HostPassword": "test" "HostPassword": "test"
} }
......
...@@ -10,7 +10,7 @@ namespace Pole.Core ...@@ -10,7 +10,7 @@ namespace Pole.Core
public static ValueTask ValueTaskDone = new ValueTask(); public static ValueTask ValueTaskDone = new ValueTask();
public const string ConsumerRetryTimesStr = "pole-consumer-retry-times"; public const string ConsumerRetryTimesStr = "pole-consumer-retry-times";
public const string ConsumerExceptionDetailsStr = "pole-consumer-exception-details"; public const string ConsumerExceptionDetailsStr = "pole-consumer-exception-details";
public const string EventHandlerMethodName = "EventHandle"; public const string EventHandlerMethodName = "EventHandler";
public const string BatchEventsHandlerMethodName = "BatchEventsHandler"; public const string BatchEventsHandlerMethodName = "BatchEventsHandler";
} }
} }
...@@ -8,6 +8,5 @@ namespace Pole.Core.EventBus.EventHandler ...@@ -8,6 +8,5 @@ namespace Pole.Core.EventBus.EventHandler
public class EventHandlerAttribute: Attribute public class EventHandlerAttribute: Attribute
{ {
public string EventName { get; set; } public string EventName { get; set; }
public string EventHandlerName { get; set; }
} }
} }
using Orleans; using Orleans;
using Orleans.Concurrency;
using Pole.Core.EventBus.Event; using Pole.Core.EventBus.Event;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
...@@ -8,9 +7,17 @@ using System.Threading.Tasks; ...@@ -8,9 +7,17 @@ using System.Threading.Tasks;
namespace Pole.Core.EventBus.EventHandler namespace Pole.Core.EventBus.EventHandler
{ {
public abstract class PoleEventHandlerBase : Grain public interface IPoleEventHandler<TEvent> : IPoleEventHandler
{ {
public abstract Task Invoke(EventBytesTransport transport); Task EventHandle(TEvent @event);
public abstract Task Invoke(List<EventBytesTransport> transports); }
public interface IPoleBulkEventsHandler<TEvent> : IPoleEventHandler
{
Task BulkEventsHandle(List<TEvent> events);
}
public interface IPoleEventHandler : IGrainWithStringKey
{
public Task Invoke(EventBytesTransport transport);
public Task Invoke(List<EventBytesTransport> transports);
} }
} }
...@@ -12,13 +12,14 @@ using System.Reflection.Emit; ...@@ -12,13 +12,14 @@ using System.Reflection.Emit;
using System.Linq.Expressions; using System.Linq.Expressions;
using System.Linq; using System.Linq;
using Pole.Core.Exceptions; using Pole.Core.Exceptions;
using Orleans;
namespace Pole.Core.EventBus.EventHandler namespace Pole.Core.EventBus.EventHandler
{ {
/// <summary> /// <summary>
/// ///
/// </summary> /// </summary>
public class PoleEventHandler : PoleEventHandlerBase public abstract class PoleEventHandler<TEvent> : Grain
{ {
private IEventTypeFinder eventTypeFinder; private IEventTypeFinder eventTypeFinder;
private ISerializer serializer; private ISerializer serializer;
...@@ -28,12 +29,11 @@ namespace Pole.Core.EventBus.EventHandler ...@@ -28,12 +29,11 @@ namespace Pole.Core.EventBus.EventHandler
public PoleEventHandler() public PoleEventHandler()
{ {
grainType = GetType(); grainType = GetType();
DependencyInjection();
} }
public override async Task OnActivateAsync() public override async Task OnActivateAsync()
{ {
await DependencyInjection();
await base.OnActivateAsync(); await base.OnActivateAsync();
await DependencyInjection();
} }
protected virtual Task DependencyInjection() protected virtual Task DependencyInjection()
{ {
...@@ -44,59 +44,52 @@ namespace Pole.Core.EventBus.EventHandler ...@@ -44,59 +44,52 @@ namespace Pole.Core.EventBus.EventHandler
return Task.CompletedTask; return Task.CompletedTask;
} }
public override Task Invoke(EventBytesTransport transport) public Task Invoke(EventBytesTransport transport)
{ {
var eventType = eventTypeFinder.FindType(transport.EventTypeCode); var eventType = eventTypeFinder.FindType(transport.EventTypeCode);
var method = typeof(ClusterClientExtensions).GetMethod(Consts.EventHandlerMethodName, new Type[] { eventType });
if (method == null) var eventObj = serializer.Deserialize(transport.EventBytes, eventType);
if (this is IPoleEventHandler<TEvent> handler)
{ {
throw new EventHandlerTargetMethodNotFoundException(Consts.EventHandlerMethodName, eventType.Name); var result = handler.EventHandle((TEvent)eventObj);
logger.LogTrace($"{nameof(PoleEventHandler<TEvent>)} Invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(eventObj));
return result;
}
else
{
throw new EventHandlerImplementedNotRightException(nameof(handler.EventHandle), eventType.Name, this.GetType().FullName);
} }
var data = serializer.Deserialize(transport.EventBytes, eventType);
var eventHandlerType = this.GetType();
var eventHandlerObjectParams = Expression.Parameter(typeof(object), "eventHandler");
var eventHandlerParams = Expression.Convert(eventHandlerObjectParams, eventHandlerType);
var eventObjectParams = Expression.Parameter(typeof(object), "event");
var eventParams = Expression.Convert(eventObjectParams, eventType);
var body = Expression.Call(method, eventHandlerParams, eventParams);
var func = Expression.Lambda<Func<object, object, Task>>(body, true, eventHandlerObjectParams, eventObjectParams).Compile();
var result = func(this, data);
logger.LogTrace("Invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(data));
return result;
} }
public override Task Invoke(List<EventBytesTransport> transports) public async Task Invoke(List<EventBytesTransport> transports)
{ {
if (transports.Count() != 0) if (transports.Count() != 0)
{ {
var firstTransport = transports.First(); var firstTransport = transports.First();
var eventType = eventTypeFinder.FindType(firstTransport.EventTypeCode); var eventType = eventTypeFinder.FindType(firstTransport.EventTypeCode);
var method = typeof(ClusterClientExtensions).GetMethod(Consts.BatchEventsHandlerMethodName, new Type[] { eventType }); var eventObjs = transports.Select(transport => serializer.Deserialize(firstTransport.EventBytes, eventType)).Select(@event => (TEvent)@event).ToList();
if (method == null) if (this is IPoleBulkEventsHandler<TEvent> batchHandler)
{ {
var tasks = transports.Select(transport => Invoke(transport)); await batchHandler.BulkEventsHandle(eventObjs);
return Task.WhenAll(tasks); logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(eventObjs));
return;
}
else if (this is IPoleEventHandler<TEvent> handler)
{
var handleTasks = eventObjs.Select(m => handler.EventHandle(m));
await Task.WhenAll(handleTasks);
logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(eventObjs));
return;
}
else
{
throw new EventHandlerImplementedNotRightException(nameof(handler.EventHandle), eventType.Name, this.GetType().FullName);
} }
var datas = transports.Select(transport => serializer.Deserialize(firstTransport.EventBytes, eventType)).ToList();
var eventHandlerType = this.GetType();
var eventHandlerObjectParams = Expression.Parameter(typeof(object), "eventHandler");
var eventHandlerParams = Expression.Convert(eventHandlerObjectParams, eventHandlerType);
var eventObjectParams = Expression.Parameter(typeof(object), "events");
var eventsType = typeof(List<>).MakeGenericType(eventType);
var eventsParams = Expression.Convert(eventObjectParams, eventsType);
var body = Expression.Call(method, eventHandlerParams, eventsParams);
var func = Expression.Lambda<Func<object, object, Task>>(body, true, eventHandlerObjectParams, eventObjectParams).Compile();
var result = func(this, datas);
logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(datas));
return result;
} }
else else
{ {
if (logger.IsEnabled(LogLevel.Information)) if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation($"{nameof(EventBytesTransport.FromBytes)} failed"); logger.LogInformation($"{nameof(EventBytesTransport.FromBytes)} failed");
return Task.CompletedTask;
} }
} }
} }
......
...@@ -6,6 +6,6 @@ namespace Pole.Core.EventBus ...@@ -6,6 +6,6 @@ namespace Pole.Core.EventBus
public interface IProducerContainer public interface IProducerContainer
{ {
ValueTask<IProducer> GetProducer<T>(); ValueTask<IProducer> GetProducer<T>();
ValueTask<IProducer> GetProducer(Type type); ValueTask<IProducer> GetProducer(string typeName);
} }
} }
...@@ -53,7 +53,7 @@ namespace Pole.Core.EventBus ...@@ -53,7 +53,7 @@ namespace Pole.Core.EventBus
public void Observer() public void Observer()
{ {
if (!typeof(PoleEventHandlerBase).IsAssignableFrom(EventHandlerType)) if (!typeof(IPoleEventHandler).IsAssignableFrom(EventHandlerType))
throw new NotSupportedException($"{EventHandlerType.FullName} must inheritance from PoleEventHandler"); throw new NotSupportedException($"{EventHandlerType.FullName} must inheritance from PoleEventHandler");
eventHandler = EventHandler; eventHandler = EventHandler;
batchEventHandler = BatchEventHandler; batchEventHandler = BatchEventHandler;
...@@ -90,8 +90,8 @@ namespace Pole.Core.EventBus ...@@ -90,8 +90,8 @@ namespace Pole.Core.EventBus
return GetObserver(EventHandlerType, transports.First().EventId).Invoke(transports); return GetObserver(EventHandlerType, transports.First().EventId).Invoke(transports);
} }
} }
static readonly ConcurrentDictionary<Type, Func<IClusterClient, string, string, PoleEventHandlerBase>> _observerGeneratorDict = new ConcurrentDictionary<Type, Func<IClusterClient, string, string, PoleEventHandlerBase>>(); static readonly ConcurrentDictionary<Type, Func<IClusterClient, string, string, IPoleEventHandler>> _observerGeneratorDict = new ConcurrentDictionary<Type, Func<IClusterClient, string, string, IPoleEventHandler>>();
private PoleEventHandlerBase GetObserver(Type ObserverType, string primaryKey) private IPoleEventHandler GetObserver(Type ObserverType, string primaryKey)
{ {
var func = _observerGeneratorDict.GetOrAdd(ObserverType, key => var func = _observerGeneratorDict.GetOrAdd(ObserverType, key =>
{ {
...@@ -101,7 +101,7 @@ namespace Pole.Core.EventBus ...@@ -101,7 +101,7 @@ namespace Pole.Core.EventBus
var grainClassNamePrefixParams = Expression.Parameter(typeof(string), "grainClassNamePrefix"); var grainClassNamePrefixParams = Expression.Parameter(typeof(string), "grainClassNamePrefix");
var method = typeof(ClusterClientExtensions).GetMethod("GetGrain", new Type[] { clientType, typeof(string), typeof(string) }); var method = typeof(ClusterClientExtensions).GetMethod("GetGrain", new Type[] { clientType, typeof(string), typeof(string) });
var body = Expression.Call(method.MakeGenericMethod(ObserverType), clientParams, primaryKeyParams, grainClassNamePrefixParams); var body = Expression.Call(method.MakeGenericMethod(ObserverType), clientParams, primaryKeyParams, grainClassNamePrefixParams);
return Expression.Lambda<Func<IClusterClient, string, string, PoleEventHandlerBase>>(body, clientParams, primaryKeyParams, grainClassNamePrefixParams).Compile(); return Expression.Lambda<Func<IClusterClient, string, string, IPoleEventHandler>>(body, clientParams, primaryKeyParams, grainClassNamePrefixParams).Compile();
}); });
return func(clusterClient, primaryKey, null); return func(clusterClient, primaryKey, null);
} }
......
...@@ -19,15 +19,18 @@ namespace Pole.Core.EventBus ...@@ -19,15 +19,18 @@ namespace Pole.Core.EventBus
var eventHandlerList = new List<(Type, EventHandlerAttribute)>(); var eventHandlerList = new List<(Type, EventHandlerAttribute)>();
foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService<ILogger<ObserverUnitContainer>>())) foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService<ILogger<ObserverUnitContainer>>()))
{ {
foreach (var type in assembly.GetTypes()) foreach (var type in assembly.GetTypes().Where(m => typeof(IPoleEventHandler).IsAssignableFrom(m) && m.IsClass && !m.IsAbstract && !typeof(Orleans.Runtime.GrainReference).IsAssignableFrom(m)))
{ {
foreach (var attribute in type.GetCustomAttributes(false)) var attribute = type.GetCustomAttributes(typeof(EventHandlerAttribute), false).FirstOrDefault();
var eventHandlerInterface = type.GetInterfaces().FirstOrDefault(type => typeof(IPoleEventHandler).IsAssignableFrom(type) && !type.IsGenericType);
if (attribute != null)
{
eventHandlerList.Add((eventHandlerInterface, (EventHandlerAttribute)attribute));
}
else
{ {
if (attribute is EventHandlerAttribute eventHandlerAttribute) throw new PoleEventHandlerImplementException("Can not found EventHandlerAttribute in PoleEventHandler");
{
eventHandlerList.Add((type, eventHandlerAttribute));
break;
}
} }
} }
} }
...@@ -42,14 +45,7 @@ namespace Pole.Core.EventBus ...@@ -42,14 +45,7 @@ namespace Pole.Core.EventBus
public List<IObserverUnit<PrimaryKey>> GetUnits<PrimaryKey>(string observerName) public List<IObserverUnit<PrimaryKey>> GetUnits<PrimaryKey>(string observerName)
{ {
if (unitDict.TryGetValue(observerName, out var units)) if (unitDict.TryGetValue(observerName, out var units))
{ return units.Select(m => (IObserverUnit<PrimaryKey>)m).ToList();
if (units is List<IObserverUnit<PrimaryKey>> result)
{
return result;
}
else
throw new UnmatchObserverUnitException(observerName);
}
else else
throw new UnfindObserverUnitException(observerName); throw new UnfindObserverUnitException(observerName);
} }
......
...@@ -4,9 +4,9 @@ using System.Text; ...@@ -4,9 +4,9 @@ using System.Text;
namespace Pole.Core.Exceptions namespace Pole.Core.Exceptions
{ {
public class EventHandlerTargetMethodNotFoundException: Exception public class EventHandlerImplementedNotRightException: Exception
{ {
public EventHandlerTargetMethodNotFoundException(string methodName,string eventTypeName):base($"EventHandler method:{methodName} not found when eventHandler invoke , eventType:{eventTypeName}") public EventHandlerImplementedNotRightException(string methodName,string eventTypeName,string eventHandlerName):base($"EventHandler method:{methodName} errors, when eventHandler: {eventHandlerName} invoke , eventType:{eventTypeName}")
{ {
} }
......
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.Exceptions
{
public class PoleEventHandlerImplementException : Exception
{
public PoleEventHandlerImplementException(string message) : base(message)
{
}
}
}
...@@ -10,7 +10,7 @@ namespace Microsoft.AspNetCore.Builder ...@@ -10,7 +10,7 @@ namespace Microsoft.AspNetCore.Builder
{ {
public static IApplicationBuilder UsePole(this IApplicationBuilder applicationBuilder) public static IApplicationBuilder UsePole(this IApplicationBuilder applicationBuilder)
{ {
Startup.StartRay(applicationBuilder.ApplicationServices); Startup.StartPole(applicationBuilder.ApplicationServices).GetAwaiter().GetResult();
return applicationBuilder; return applicationBuilder;
} }
} }
......
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
<PackageReference Include="Microsoft.Orleans.Core.Abstractions" Version="3.0.2" /> <PackageReference Include="Microsoft.Orleans.Core.Abstractions" Version="3.0.2" />
<PackageReference Include="Microsoft.Orleans.Runtime.Abstractions" Version="3.0.2" /> <PackageReference Include="Microsoft.Orleans.Runtime.Abstractions" Version="3.0.2" />
<PackageReference Include="System.Runtime.Loader" Version="4.3.0" /> <PackageReference Include="System.Runtime.Loader" Version="4.3.0" />
<PackageReference Include="System.Text.Json" Version="5.0.0-preview.2.20120.8" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.11.0" /> <PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.11.0" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.3" /> <PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.3" />
</ItemGroup> </ItemGroup>
......
...@@ -18,18 +18,16 @@ namespace Pole.Core.Processor ...@@ -18,18 +18,16 @@ namespace Pole.Core.Processor
private readonly IEventStorage eventStorage; private readonly IEventStorage eventStorage;
private readonly PoleOptions options; private readonly PoleOptions options;
private readonly IProducerContainer producerContainer; private readonly IProducerContainer producerContainer;
private readonly IEventTypeFinder eventTypeFinder;
private readonly ISerializer serializer; private readonly ISerializer serializer;
private readonly ILogger<PendingMessageRetryProcessor> logger; private readonly ILogger<PendingMessageRetryProcessor> logger;
private readonly ProducerOptions producerOptions; private readonly ProducerOptions producerOptions;
public PendingMessageRetryProcessor(IEventStorage eventStorage, IOptions<PoleOptions> options, ILogger<PendingMessageRetryProcessor> logger, public PendingMessageRetryProcessor(IEventStorage eventStorage, IOptions<PoleOptions> options, ILogger<PendingMessageRetryProcessor> logger,
IProducerContainer producerContainer, IEventTypeFinder eventTypeFinder, ISerializer serializer, IOptions<ProducerOptions> producerOptions) IProducerContainer producerContainer, ISerializer serializer, IOptions<ProducerOptions> producerOptions)
{ {
this.eventStorage = eventStorage; this.eventStorage = eventStorage;
this.options = options.Value ?? throw new Exception($"{nameof(PoleOptions)} Must be injected"); this.options = options.Value ?? throw new Exception($"{nameof(PoleOptions)} Must be injected");
this.logger = logger; this.logger = logger;
this.producerContainer = producerContainer; this.producerContainer = producerContainer;
this.eventTypeFinder = eventTypeFinder;
this.serializer = serializer; this.serializer = serializer;
this.producerOptions = producerOptions.Value ?? throw new Exception($"{nameof(ProducerOptions)} Must be injected"); this.producerOptions = producerOptions.Value ?? throw new Exception($"{nameof(ProducerOptions)} Must be injected");
} }
...@@ -62,7 +60,6 @@ namespace Pole.Core.Processor ...@@ -62,7 +60,6 @@ namespace Pole.Core.Processor
} }
foreach (var pendingMessage in pendingMessages) foreach (var pendingMessage in pendingMessages)
{ {
var eventType = eventTypeFinder.FindType(pendingMessage.Name);
var eventContentBytes = Encoding.UTF8.GetBytes(pendingMessage.Content); var eventContentBytes = Encoding.UTF8.GetBytes(pendingMessage.Content);
var bytesTransport = new EventBytesTransport(pendingMessage.Name, pendingMessage.Id, eventContentBytes); var bytesTransport = new EventBytesTransport(pendingMessage.Name, pendingMessage.Id, eventContentBytes);
var bytes = bytesTransport.GetBytes(); var bytes = bytesTransport.GetBytes();
...@@ -71,12 +68,15 @@ namespace Pole.Core.Processor ...@@ -71,12 +68,15 @@ namespace Pole.Core.Processor
pendingMessage.ExpiresAt = DateTime.UtcNow; pendingMessage.ExpiresAt = DateTime.UtcNow;
} }
pendingMessage.Retries++; pendingMessage.Retries++;
var producer = await producerContainer.GetProducer(eventType); var producer = await producerContainer.GetProducer(pendingMessage.Name);
await producer.Publish(bytes); await producer.Publish(bytes);
pendingMessage.StatusName = nameof(EventStatus.Published); pendingMessage.StatusName = nameof(EventStatus.Published);
pendingMessage.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds); pendingMessage.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds);
} }
await eventStorage.BulkChangePublishStateAsync(pendingMessages); if (pendingMessages.Count() > 0)
{
await eventStorage.BulkChangePublishStateAsync(pendingMessages);
}
} }
} }
} }
...@@ -3,13 +3,14 @@ using System.Collections.Generic; ...@@ -3,13 +3,14 @@ using System.Collections.Generic;
using System.Text; using System.Text;
using System.Text.Encodings.Web; using System.Text.Encodings.Web;
using System.Text.Json; using System.Text.Json;
using System.Text.Json.Serialization;
using System.Text.Unicode; using System.Text.Unicode;
namespace Pole.Core.Serialization namespace Pole.Core.Serialization
{ {
public class DefaultJsonSerializer : ISerializer public class DefaultJsonSerializer : ISerializer
{ {
static readonly JsonSerializerOptions options = new JsonSerializerOptions() { Encoder = JavaScriptEncoder.Create(UnicodeRanges.All) }; static readonly JsonSerializerOptions options = new JsonSerializerOptions() { Encoder = JavaScriptEncoder.Create(UnicodeRanges.All), MaxDepth = 5, ReferenceHandling = ReferenceHandling.Preserve, WriteIndented =true};
public T Deserialize<T>(string json) where T : class, new() public T Deserialize<T>(string json) where T : class, new()
{ {
return JsonSerializer.Deserialize<T>(json); return JsonSerializer.Deserialize<T>(json);
......
...@@ -6,6 +6,7 @@ using Pole.Core.Utils; ...@@ -6,6 +6,7 @@ using Pole.Core.Utils;
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using System.Text; using System.Text;
namespace Pole.Core.Serialization namespace Pole.Core.Serialization
...@@ -21,16 +22,13 @@ namespace Pole.Core.Serialization ...@@ -21,16 +22,13 @@ namespace Pole.Core.Serialization
var baseEventType = typeof(IEvent); var baseEventType = typeof(IEvent);
foreach (var assembly in AssemblyHelper.GetAssemblies(this.logger)) foreach (var assembly in AssemblyHelper.GetAssemblies(this.logger))
{ {
foreach (var type in assembly.GetTypes()) foreach (var type in assembly.GetTypes().Where(m => baseEventType.IsAssignableFrom(m)&&!m.IsAbstract))
{ {
if (baseEventType.IsAssignableFrom(type)) typeDict.TryAdd(type, type.FullName);
{
typeDict.TryAdd(type, type.FullName);
if (!codeDict.TryAdd(type.FullName, type)) if (!codeDict.TryAdd(type.FullName, type))
{ {
throw new TypeCodeRepeatedException(type.FullName, type.FullName); throw new TypeCodeRepeatedException(type.FullName, type.FullName);
}
} }
} }
} }
......
...@@ -13,7 +13,7 @@ namespace Pole.Core ...@@ -13,7 +13,7 @@ namespace Pole.Core
{ {
tasks.Add(new StartupTask(sortIndex, method)); tasks.Add(new StartupTask(sortIndex, method));
} }
internal static Task StartRay(IServiceProvider serviceProvider) internal static Task StartPole(IServiceProvider serviceProvider)
{ {
tasks = tasks.OrderBy(func => func.SortIndex).ToList(); tasks = tasks.OrderBy(func => func.SortIndex).ToList();
return Task.WhenAll(tasks.Select(value => value.Func(serviceProvider))); return Task.WhenAll(tasks.Select(value => value.Func(serviceProvider)));
......
...@@ -45,7 +45,7 @@ namespace Pole.Core.UnitOfWork ...@@ -45,7 +45,7 @@ namespace Pole.Core.UnitOfWork
var eventContentBytes = Encoding.UTF8.GetBytes(@event.Content); var eventContentBytes = Encoding.UTF8.GetBytes(@event.Content);
var bytesTransport = new EventBytesTransport(@event.Name, @event.Id, eventContentBytes); var bytesTransport = new EventBytesTransport(@event.Name, @event.Id, eventContentBytes);
var bytes = bytesTransport.GetBytes(); var bytes = bytesTransport.GetBytes();
var producer = await producerContainer.GetProducer(eventType); var producer = await producerContainer.GetProducer(@event.Name);
await producer.Publish(bytes); await producer.Publish(bytes);
@event.StatusName = nameof(EventStatus.Published); @event.StatusName = nameof(EventStatus.Published);
@event.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds); @event.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds);
......
using RabbitMQ.Client; using RabbitMQ.Client;
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading; using System.Threading;
......
...@@ -3,6 +3,7 @@ using Pole.Core; ...@@ -3,6 +3,7 @@ using Pole.Core;
using Pole.Core.Exceptions; using Pole.Core.Exceptions;
using RabbitMQ.Client; using RabbitMQ.Client;
using System; using System;
using System.Collections.Generic;
namespace Pole.EventBus.RabbitMQ namespace Pole.EventBus.RabbitMQ
{ {
...@@ -23,11 +24,38 @@ namespace Pole.EventBus.RabbitMQ ...@@ -23,11 +24,38 @@ namespace Pole.EventBus.RabbitMQ
var consumeRetryTimesStr = consumeRetryTimes.ToString(); var consumeRetryTimesStr = consumeRetryTimes.ToString();
persistentProperties = Model.CreateBasicProperties(); persistentProperties = Model.CreateBasicProperties();
persistentProperties.Persistent = true; persistentProperties.Persistent = true;
persistentProperties.Headers = new Dictionary<string, object>();
persistentProperties.Headers.Add(Consts.ConsumerRetryTimesStr, consumeRetryTimesStr); persistentProperties.Headers.Add(Consts.ConsumerRetryTimesStr, consumeRetryTimesStr);
noPersistentProperties = Model.CreateBasicProperties(); noPersistentProperties = Model.CreateBasicProperties();
noPersistentProperties.Persistent = false; noPersistentProperties.Persistent = false;
noPersistentProperties.Headers = new Dictionary<string, object>();
noPersistentProperties.Headers.Add(Consts.ConsumerRetryTimesStr, consumeRetryTimesStr); noPersistentProperties.Headers.Add(Consts.ConsumerRetryTimesStr, consumeRetryTimesStr);
} }
public void Publish(byte[] msg, IDictionary<string, object> headers, string exchange, string routingKey, bool persistent = true)
{
if (persistent)
{
persistentProperties.Headers = headers;
}
else
{
noPersistentProperties.Headers = headers;
}
Model.ConfirmSelect();
Model.BasicPublish(exchange, routingKey, persistent ? persistentProperties : noPersistentProperties, msg);
if (!Model.WaitForConfirms(TimeSpan.FromSeconds(Connection.Options.ProducerConfirmWaitTimeoutSeconds), out bool isTimeout))
{
if (isTimeout)
{
throw new ProducerConfirmTimeOutException(Connection.Options.ProducerConfirmWaitTimeoutSeconds);
}
else
{
throw new ProducerReceivedNAckException();
}
}
}
public void Publish(byte[] msg, string exchange, string routingKey, bool persistent = true) public void Publish(byte[] msg, string exchange, string routingKey, bool persistent = true)
{ {
Model.ConfirmSelect(); Model.ConfirmSelect();
......
...@@ -14,6 +14,7 @@ namespace Pole.EventBus.RabbitMQ ...@@ -14,6 +14,7 @@ namespace Pole.EventBus.RabbitMQ
options = config.Value; options = config.Value;
connectionFactory = new ConnectionFactory connectionFactory = new ConnectionFactory
{ {
Port=options.Port,
UserName = options.UserName, UserName = options.UserName,
Password = options.Password, Password = options.Password,
VirtualHost = options.VirtualHost, VirtualHost = options.VirtualHost,
......
...@@ -7,7 +7,8 @@ namespace Pole.EventBus.RabbitMQ ...@@ -7,7 +7,8 @@ namespace Pole.EventBus.RabbitMQ
{ {
public string UserName { get; set; } public string UserName { get; set; }
public string Password { get; set; } public string Password { get; set; }
public string VirtualHost { get; set; } public string VirtualHost { get; set; } = "/";
public int Port { get; set; } = 5672;
public int MasChannelsPerConnection { get; set; } = 200; public int MasChannelsPerConnection { get; set; } = 200;
/// <summary> /// <summary>
/// 目前为一个连接 当消息数量非常大时,单个TCP连接的运输能力有限,可以修改这个最大连接数提高运输能力 /// 目前为一个连接 当消息数量非常大时,单个TCP连接的运输能力有限,可以修改这个最大连接数提高运输能力
......
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Orleans;
using Pole.Core.Services;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.EventBus.RabbitMQ
{
public class ConsumerManager : IHostedService, IDisposable
{
readonly ILogger<ConsumerManager> logger;
readonly IRabbitMQClient client;
readonly IRabbitEventBusContainer rabbitEventBusContainer;
readonly IServiceProvider provider;
readonly IGrainFactory grainFactory;
const int _HoldTime = 20 * 1000;
const int _MonitTime = 60 * 2 * 1000;
const int _checkTime = 10 * 1000;
public ConsumerManager(
ILogger<ConsumerManager> logger,
IRabbitMQClient client,
IGrainFactory grainFactory,
IServiceProvider provider,
IRabbitEventBusContainer rabbitEventBusContainer)
{
this.provider = provider;
this.client = client;
this.logger = logger;
this.rabbitEventBusContainer = rabbitEventBusContainer;
this.grainFactory = grainFactory;
}
private readonly ConcurrentDictionary<string, ConsumerRunner> ConsumerRunners = new ConcurrentDictionary<string, ConsumerRunner>();
private ConcurrentDictionary<string, long> Runners { get; } = new ConcurrentDictionary<string, long>();
private Timer HeathCheckTimer { get; set; }
private Timer DistributedMonitorTime { get; set; }
private Timer DistributedHoldTimer { get; set; }
const int lockHoldingSeconds = 60;
int distributedHoldTimerLock = 0;
int heathCheckTimerLock = 0;
private async Task Start()
{
var consumers = rabbitEventBusContainer.GetConsumers();
foreach (var consumer in consumers)
{
if (consumer is RabbitConsumer value)
{
var queue = value.QueueInfo;
var key = queue.Queue;
var runner = new ConsumerRunner(client, provider, value, queue);
ConsumerRunners.TryAdd(key, runner);
await runner.Run();
}
}
}
private async Task DistributedHold()
{
try
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is holding.");
if (Interlocked.CompareExchange(ref distributedHoldTimerLock, 1, 0) == 0)
{
foreach (var lockKV in Runners)
{
if (Runners.TryGetValue(lockKV.Key, out var lockId))
{
var holdResult = await grainFactory.GetGrain<IWeightHoldLock>(lockKV.Key).Hold(lockId, lockHoldingSeconds);
if (!holdResult)
{
if (ConsumerRunners.TryRemove(lockKV.Key, out var runner))
{
runner.Close();
}
Runners.TryRemove(lockKV.Key, out var _);
}
}
}
Interlocked.Exchange(ref distributedHoldTimerLock, 0);
}
}
catch (Exception exception)
{
logger.LogError(exception.InnerException ?? exception, nameof(DistributedHold));
Interlocked.Exchange(ref distributedHoldTimerLock, 0);
}
}
private async Task HeathCheck()
{
try
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is checking.");
if (Interlocked.CompareExchange(ref heathCheckTimerLock, 1, 0) == 0)
{
await Task.WhenAll(ConsumerRunners.Values.Select(runner => runner.HeathCheck()));
Interlocked.Exchange(ref heathCheckTimerLock, 0);
}
}
catch (Exception exception)
{
logger.LogError(exception.InnerException ?? exception, nameof(HeathCheck));
Interlocked.Exchange(ref heathCheckTimerLock, 0);
}
}
public Task StartAsync(CancellationToken cancellationToken)
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is starting.");
DistributedMonitorTime = new Timer(state => Start().Wait(), null, 1000, _MonitTime);
DistributedHoldTimer = new Timer(state => DistributedHold().Wait(), null, _HoldTime, _HoldTime);
HeathCheckTimer = new Timer(state => { HeathCheck().Wait(); }, null, _checkTime, _checkTime);
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is stopping.");
Dispose();
return Task.CompletedTask;
}
public void Dispose()
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is disposing.");
foreach (var runner in ConsumerRunners.Values)
{
runner.Close();
}
DistributedMonitorTime?.Dispose();
DistributedHoldTimer?.Dispose();
HeathCheckTimer?.Dispose();
}
}
}
...@@ -9,6 +9,8 @@ using System.Linq; ...@@ -9,6 +9,8 @@ using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using Pole.Core; using Pole.Core;
using Pole.Core.Serialization; using Pole.Core.Serialization;
using Microsoft.Extensions.Options;
using System.Text;
namespace Pole.EventBus.RabbitMQ namespace Pole.EventBus.RabbitMQ
{ {
...@@ -16,11 +18,13 @@ namespace Pole.EventBus.RabbitMQ ...@@ -16,11 +18,13 @@ namespace Pole.EventBus.RabbitMQ
{ {
readonly IMpscChannel<BasicDeliverEventArgs> mpscChannel; readonly IMpscChannel<BasicDeliverEventArgs> mpscChannel;
readonly ISerializer serializer; readonly ISerializer serializer;
readonly RabbitOptions rabbitOptions;
public ConsumerRunner( public ConsumerRunner(
IRabbitMQClient client, IRabbitMQClient client,
IServiceProvider provider, IServiceProvider provider,
RabbitConsumer consumer, RabbitConsumer consumer,
QueueInfo queue) QueueInfo queue,
RabbitOptions rabbitOptions)
{ {
Client = client; Client = client;
Logger = provider.GetService<ILogger<ConsumerRunner>>(); Logger = provider.GetService<ILogger<ConsumerRunner>>();
...@@ -29,6 +33,9 @@ namespace Pole.EventBus.RabbitMQ ...@@ -29,6 +33,9 @@ namespace Pole.EventBus.RabbitMQ
mpscChannel.BindConsumer(BatchExecuter); mpscChannel.BindConsumer(BatchExecuter);
Consumer = consumer; Consumer = consumer;
Queue = queue; Queue = queue;
this.rabbitOptions = rabbitOptions;
} }
public ILogger<ConsumerRunner> Logger { get; } public ILogger<ConsumerRunner> Logger { get; }
public IRabbitMQClient Client { get; } public IRabbitMQClient Client { get; }
...@@ -45,28 +52,22 @@ namespace Pole.EventBus.RabbitMQ ...@@ -45,28 +52,22 @@ namespace Pole.EventBus.RabbitMQ
if (isFirst) if (isFirst)
{ {
isFirst = false; isFirst = false;
Model.Model.ExchangeDeclare(Consumer.EventBus.Exchange, "direct", true); Model.Model.ExchangeDeclare($"{rabbitOptions.Prefix}{Consumer.EventBus.Exchange}", "direct", true);
Model.Model.ExchangeDeclare(Queue.Queue, "direct", true); Model.Model.ExchangeDeclare(Queue.Queue, "direct", true);
Model.Model.ExchangeBind(Consumer.EventBus.Exchange, Queue.Queue, string.Empty); Model.Model.ExchangeBind(Queue.Queue, $"{rabbitOptions.Prefix}{Consumer.EventBus.Exchange}", string.Empty);
Model.Model.QueueDeclare(Queue.Queue, true, false, false, null); Model.Model.QueueDeclare(Queue.Queue, true, false, false, null);
Model.Model.QueueBind(Queue.Queue, Queue.Queue, string.Empty); Model.Model.QueueBind(Queue.Queue, Queue.Queue, string.Empty);
} }
Model.Model.BasicQos(0, Model.Connection.Options.CunsumerMaxBatchSize, false); Model.Model.BasicQos(0, Model.Connection.Options.CunsumerMaxBatchSize, false);
BasicConsumer = new EventingBasicConsumer(Model.Model); BasicConsumer = new EventingBasicConsumer(Model.Model);
BasicConsumer.Received += async (ch, ea) => await mpscChannel.WriteAsync(ea); BasicConsumer.Received += async (ch, ea) =>
{
await mpscChannel.WriteAsync(ea);
};
BasicConsumer.ConsumerTag = Model.Model.BasicConsume(Queue.Queue, Consumer.Config.AutoAck, BasicConsumer); BasicConsumer.ConsumerTag = Model.Model.BasicConsume(Queue.Queue, Consumer.Config.AutoAck, BasicConsumer);
return Task.CompletedTask; return Task.CompletedTask;
} }
public Task HeathCheck()
{
if (IsUnAvailable)
{
Close();
return Run();
}
else
return Task.CompletedTask;
}
private async Task BatchExecuter(List<BasicDeliverEventArgs> list) private async Task BatchExecuter(List<BasicDeliverEventArgs> list)
{ {
if (list.Count == 1) if (list.Count == 1)
...@@ -88,6 +89,7 @@ namespace Pole.EventBus.RabbitMQ ...@@ -88,6 +89,7 @@ namespace Pole.EventBus.RabbitMQ
{ {
await ProcessComsumerErrors(item, exception); await ProcessComsumerErrors(item, exception);
} }
return;
} }
} }
if (!Consumer.Config.AutoAck) if (!Consumer.Config.AutoAck)
...@@ -108,6 +110,7 @@ namespace Pole.EventBus.RabbitMQ ...@@ -108,6 +110,7 @@ namespace Pole.EventBus.RabbitMQ
if (Consumer.Config.Reenqueue) if (Consumer.Config.Reenqueue)
{ {
await ProcessComsumerErrors(ea, exception); await ProcessComsumerErrors(ea, exception);
return;
} }
} }
if (!Consumer.Config.AutoAck) if (!Consumer.Config.AutoAck)
...@@ -120,15 +123,18 @@ namespace Pole.EventBus.RabbitMQ ...@@ -120,15 +123,18 @@ namespace Pole.EventBus.RabbitMQ
{ {
if (ea.BasicProperties.Headers.TryGetValue(Consts.ConsumerRetryTimesStr, out object retryTimesObj)) if (ea.BasicProperties.Headers.TryGetValue(Consts.ConsumerRetryTimesStr, out object retryTimesObj))
{ {
var retryTimes = Convert.ToInt32(retryTimesObj); var retryTimesStr = Encoding.UTF8.GetString((byte[])retryTimesObj);
if (retryTimes <= Consumer.Config.MaxReenqueueTimes) var retryTimes = Convert.ToInt32(retryTimesStr);
if (retryTimes < Consumer.Config.MaxReenqueueTimes)
{ {
retryTimes++; retryTimes++;
ea.BasicProperties.Headers[Consts.ConsumerRetryTimesStr] = retryTimes; ea.BasicProperties.Headers[Consts.ConsumerRetryTimesStr] = retryTimes.ToString();
ea.BasicProperties.Headers[Consts.ConsumerExceptionDetailsStr] = serializer.Serialize(exception, typeof(Exception)); ea.BasicProperties.Headers[Consts.ConsumerExceptionDetailsStr] = exception.InnerException?.Message + exception.StackTrace ?? exception.Message + exception.StackTrace;
await Task.Delay((int)Math.Pow(2, retryTimes) * 1000).ContinueWith((task) => await Task.Delay((int)Math.Pow(2, retryTimes) * 1000).ContinueWith((task) =>
{ {
Model.Model.BasicReject(ea.DeliveryTag, true); using var channel = Client.PullChannel();
channel.Publish(ea.Body, ea.BasicProperties.Headers, Queue.Queue, string.Empty, true);
Model.Model.BasicAck(ea.DeliveryTag, false);
}); });
} }
else else
...@@ -138,6 +144,8 @@ namespace Pole.EventBus.RabbitMQ ...@@ -138,6 +144,8 @@ namespace Pole.EventBus.RabbitMQ
Model.Model.ExchangeDeclare(errorExchangeName, "direct", true); Model.Model.ExchangeDeclare(errorExchangeName, "direct", true);
Model.Model.QueueDeclare(errorQueueName, true, false, false, null); Model.Model.QueueDeclare(errorQueueName, true, false, false, null);
Model.Model.QueueBind(errorQueueName, errorExchangeName, string.Empty); Model.Model.QueueBind(errorQueueName, errorExchangeName, string.Empty);
using var channel = Client.PullChannel();
channel.Publish(ea.Body, ea.BasicProperties.Headers, errorExchangeName, string.Empty, true);
if (!Consumer.Config.AutoAck) if (!Consumer.Config.AutoAck)
{ {
Model.Model.BasicAck(ea.DeliveryTag, false); Model.Model.BasicAck(ea.DeliveryTag, false);
......
...@@ -12,17 +12,20 @@ using System.Threading.Tasks; ...@@ -12,17 +12,20 @@ using System.Threading.Tasks;
using Pole.Core.EventBus.Event; using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventHandler; using Pole.Core.EventBus.EventHandler;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using System.Linq;
using Pole.Core.Abstraction;
namespace Pole.EventBus.RabbitMQ namespace Pole.EventBus.RabbitMQ
{ {
public class EventBusContainer : IRabbitEventBusContainer, IProducerContainer public class EventBusContainer : IRabbitEventBusContainer, IProducerContainer
{ {
private readonly ConcurrentDictionary<Type, RabbitEventBus> eventBusDictionary = new ConcurrentDictionary<Type, RabbitEventBus>(); private readonly ConcurrentDictionary<string, RabbitEventBus> eventBusDictionary = new ConcurrentDictionary<string, RabbitEventBus>();
private readonly List<RabbitEventBus> eventBusList = new List<RabbitEventBus>(); private readonly List<RabbitEventBus> eventBusList = new List<RabbitEventBus>();
readonly IRabbitMQClient rabbitMQClient; readonly IRabbitMQClient rabbitMQClient;
readonly IServiceProvider serviceProvider; readonly IServiceProvider serviceProvider;
private readonly IObserverUnitContainer observerUnitContainer; private readonly IObserverUnitContainer observerUnitContainer;
private readonly RabbitOptions rabbitOptions; private readonly RabbitOptions rabbitOptions;
public bool IsAutoRegisterFinished { get; private set; }
public EventBusContainer( public EventBusContainer(
IServiceProvider serviceProvider, IServiceProvider serviceProvider,
IObserverUnitContainer observerUnitContainer, IObserverUnitContainer observerUnitContainer,
...@@ -42,16 +45,21 @@ namespace Pole.EventBus.RabbitMQ ...@@ -42,16 +45,21 @@ namespace Pole.EventBus.RabbitMQ
AddEventAndEventHandlerInfoList(eventList, evenHandlertList); AddEventAndEventHandlerInfoList(eventList, evenHandlertList);
foreach (var (type, config) in eventList) foreach (var (type, config) in eventList)
{ {
var eventName = string.IsNullOrEmpty(config.EventName) ? type.Name.ToLower() : config.EventName; var eventName = config.EventName;
var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, false, true, true).BindEvent(type, eventName); var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, false, true, true).BindEvent(type, eventName);
await eventBus.AddGrainConsumer<string>(); await eventBus.AddGrainConsumer<string>();
} }
foreach (var (type, config) in evenHandlertList) foreach (var (type, config) in evenHandlertList)
{ {
var eventName = string.IsNullOrEmpty(config.EventName) ? type.Name.ToLower() : config.EventName; var eventName = config.EventName;
var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, false, true, true).BindEvent(type, eventName);
await eventBus.AddGrainConsumer<string>(); if (!eventBusDictionary.TryGetValue(eventName, out RabbitEventBus rabbitEventBus))
{
var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, false, true, true).BindEvent(type, eventName);
await eventBus.AddGrainConsumer<string>();
}
} }
IsAutoRegisterFinished = true;
} }
public RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = true, bool persistent = true) public RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = true, bool persistent = true)
...@@ -60,35 +68,37 @@ namespace Pole.EventBus.RabbitMQ ...@@ -60,35 +68,37 @@ namespace Pole.EventBus.RabbitMQ
} }
public Task Work(RabbitEventBus bus) public Task Work(RabbitEventBus bus)
{ {
if (eventBusDictionary.TryAdd(bus.Event, bus)) if (eventBusDictionary.TryAdd(bus.EventName, bus))
{ {
eventBusList.Add(bus); eventBusList.Add(bus);
using var channel = rabbitMQClient.PullChannel(); using var channel = rabbitMQClient.PullChannel();
channel.Model.ExchangeDeclare(bus.Exchange, "direct", true); channel.Model.ExchangeDeclare($"{rabbitOptions.Prefix}{bus.Exchange}", "direct", true);
return Task.CompletedTask; return Task.CompletedTask;
} }
else else
throw new EventBusRepeatException(bus.Event.FullName); throw new EventBusRepeatException(bus.Event.FullName);
} }
readonly ConcurrentDictionary<Type, IProducer> producerDict = new ConcurrentDictionary<Type, IProducer>(); readonly ConcurrentDictionary<string, IProducer> producerDict = new ConcurrentDictionary<string, IProducer>();
public ValueTask<IProducer> GetProducer(Type type)
public ValueTask<IProducer> GetProducer(string typeName)
{ {
if (eventBusDictionary.TryGetValue(type, out var eventBus)) if (eventBusDictionary.TryGetValue(typeName, out var eventBus))
{ {
return new ValueTask<IProducer>(producerDict.GetOrAdd(type, key => return new ValueTask<IProducer>(producerDict.GetOrAdd(typeName, key =>
{ {
return new RabbitProducer(rabbitMQClient, eventBus); return new RabbitProducer(rabbitMQClient, eventBus, rabbitOptions);
})); }));
} }
else else
{ {
throw new NotImplementedException($"{nameof(IProducer)} of {type.FullName}"); throw new NotImplementedException($"{nameof(IProducer)} of {typeName}");
} }
} }
public ValueTask<IProducer> GetProducer<T>() public ValueTask<IProducer> GetProducer<T>()
{ {
return GetProducer(typeof(T)); return GetProducer(typeof(T).FullName);
} }
public List<IConsumer> GetConsumers() public List<IConsumer> GetConsumers()
{ {
...@@ -102,37 +112,44 @@ namespace Pole.EventBus.RabbitMQ ...@@ -102,37 +112,44 @@ namespace Pole.EventBus.RabbitMQ
#region helpers #region helpers
private void AddEventAndEventHandlerInfoList(List<(Type type, EventAttribute config)> eventList, List<(Type type, EventHandlerAttribute config)> evenHandlertList) private void AddEventAndEventHandlerInfoList(List<(Type type, EventAttribute config)> eventList, List<(Type type, EventHandlerAttribute config)> eventHandlertList)
{ {
foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService<ILogger<EventBusContainer>>())) foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService<ILogger<EventBusContainer>>()))
{ {
foreach (var type in assembly.GetTypes()) foreach (var type in assembly.GetTypes().Where(m => typeof(IEvent).IsAssignableFrom(m) && m.IsClass))
{ {
foreach (var attribute in type.GetCustomAttributes(false)) var attribute = type.GetCustomAttributes(typeof(EventAttribute), false).FirstOrDefault();
if (attribute != null)
{ {
if (attribute is EventAttribute config) eventList.Add((type, (EventAttribute)attribute));
{ }
eventList.Add((type, config)); else
break; {
} eventList.Add((type, new EventAttribute() { EventName = type.FullName }));
} }
} }
} }
foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService<ILogger<EventBusContainer>>())) foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService<ILogger<EventBusContainer>>()))
{ {
foreach (var type in assembly.GetTypes())
foreach (var type in assembly.GetTypes().Where(m => typeof(IPoleEventHandler).IsAssignableFrom(m) && m.IsClass && !m.IsAbstract&&!typeof(Orleans.Runtime.GrainReference).IsAssignableFrom(m)))
{ {
foreach (var attribute in type.GetCustomAttributes(false)) var attribute = type.GetCustomAttributes(typeof(EventHandlerAttribute), false).FirstOrDefault();
if (attribute != null)
{ {
if (attribute is EventHandlerAttribute config) eventHandlertList.Add((type, (EventHandlerAttribute)attribute));
{ }
evenHandlertList.Add((type, config)); else
break; {
} throw new PoleEventHandlerImplementException("Can not found EventHandlerAttribute in PoleEventHandler");
} }
} }
} }
} }
#endregion #endregion
} }
} }
...@@ -5,6 +5,7 @@ namespace Pole.EventBus.RabbitMQ ...@@ -5,6 +5,7 @@ namespace Pole.EventBus.RabbitMQ
{ {
public interface IRabbitEventBusContainer : IConsumerContainer public interface IRabbitEventBusContainer : IConsumerContainer
{ {
bool IsAutoRegisterFinished { get; }
Task AutoRegister(); Task AutoRegister();
RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false); RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false);
Task Work(RabbitEventBus bus); Task Work(RabbitEventBus bus);
......
using Pole.Core.EventBus; using Pole.Core.EventBus;
using Pole.Core.EventBus.EventHandler;
using Pole.Core.Exceptions; using Pole.Core.Exceptions;
using Pole.Core.Utils; using Pole.Core.Utils;
using System; using System;
...@@ -61,12 +62,13 @@ namespace Pole.EventBus.RabbitMQ ...@@ -61,12 +62,13 @@ namespace Pole.EventBus.RabbitMQ
var observerUnits = observerUnitContainer.GetUnits<PrimaryKey>(EventName); var observerUnits = observerUnitContainer.GetUnits<PrimaryKey>(EventName);
foreach (var observerUnit in observerUnits) foreach (var observerUnit in observerUnits)
{ {
string queueNameSuffix = observerUnit.EventHandlerType.FullName;
var consumer = new RabbitConsumer( var consumer = new RabbitConsumer(
observerUnit.GetEventHandler(), observerUnit.GetEventHandler(),
observerUnit.GetBatchEventHandler()) observerUnit.GetBatchEventHandler())
{ {
EventBus = this, EventBus = this,
QueueInfo = new QueueInfo { RoutingKey = RoutePrefix, Queue = $"{RoutePrefix}_{observerUnit}" }, QueueInfo = new QueueInfo { RoutingKey = RoutePrefix, Queue = $"{RoutePrefix}_{queueNameSuffix}" },
Config = ConsumerConfig Config = ConsumerConfig
}; };
Consumers.Add(consumer); Consumers.Add(consumer);
......
using System; using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Pole.Core; using Pole.Core;
using Pole.Core.EventBus; using Pole.Core.EventBus;
using Pole.EventBus.RabbitMQ; using Pole.EventBus.RabbitMQ;
...@@ -9,6 +12,7 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -9,6 +12,7 @@ namespace Microsoft.Extensions.DependencyInjection
{ {
public static class PoleRabbitmqStartupConfigExtensions public static class PoleRabbitmqStartupConfigExtensions
{ {
private static ConcurrentDictionary<string, ConsumerRunner> ConsumerRunners = new ConcurrentDictionary<string, ConsumerRunner>();
public static void AddRabbitMQ( public static void AddRabbitMQ(
this StartupConfig startupOption, this StartupConfig startupOption,
Action<RabbitOptions> rabbitConfigAction, Action<RabbitOptions> rabbitConfigAction,
...@@ -16,16 +20,32 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -16,16 +20,32 @@ namespace Microsoft.Extensions.DependencyInjection
{ {
startupOption.Services.Configure<RabbitOptions>(config => rabbitConfigAction(config)); startupOption.Services.Configure<RabbitOptions>(config => rabbitConfigAction(config));
startupOption.Services.AddSingleton<IRabbitMQClient, RabbitMQClient>(); startupOption.Services.AddSingleton<IRabbitMQClient, RabbitMQClient>();
startupOption.Services.AddHostedService<ConsumerManager>(); //startupOption.Services.AddHostedService<ConsumerManager>();
startupOption.Services.AddSingleton<IRabbitEventBusContainer, EventBusContainer>(); startupOption.Services.AddSingleton<IRabbitEventBusContainer, EventBusContainer>();
startupOption.Services.AddSingleton(serviceProvider => serviceProvider.GetService<IRabbitEventBusContainer>() as IProducerContainer); startupOption.Services.AddSingleton(serviceProvider => serviceProvider.GetService<IRabbitEventBusContainer>() as IProducerContainer);
Startup.Register(async serviceProvider => Startup.Register(async serviceProvider =>
{ {
var container = serviceProvider.GetService<IRabbitEventBusContainer>(); var container = serviceProvider.GetService<IRabbitEventBusContainer>();
var client = serviceProvider.GetService<IRabbitMQClient>();
var rabbitOptions = serviceProvider.GetService<IOptions<RabbitOptions>>().Value;
if (eventBusConfig != default) if (eventBusConfig != default)
await eventBusConfig(container); await eventBusConfig(container);
else else
await container.AutoRegister(); await container.AutoRegister();
var consumers = container.GetConsumers();
foreach (var consumer in consumers)
{
if (consumer is RabbitConsumer value)
{
var queue = value.QueueInfo;
var key = queue.Queue;
var runner = new ConsumerRunner(client, serviceProvider, value, queue, rabbitOptions);
ConsumerRunners.TryAdd(key, runner);
await runner.Run();
}
}
}); });
} }
} }
......
using Pole.Core; using Pole.Core;
using Pole.Core.EventBus; using Pole.Core.EventBus;
using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Pole.EventBus.RabbitMQ namespace Pole.EventBus.RabbitMQ
...@@ -8,17 +9,20 @@ namespace Pole.EventBus.RabbitMQ ...@@ -8,17 +9,20 @@ namespace Pole.EventBus.RabbitMQ
{ {
readonly RabbitEventBus publisher; readonly RabbitEventBus publisher;
readonly IRabbitMQClient rabbitMQClient; readonly IRabbitMQClient rabbitMQClient;
readonly RabbitOptions rabbitOptions;
public RabbitProducer( public RabbitProducer(
IRabbitMQClient rabbitMQClient, IRabbitMQClient rabbitMQClient,
RabbitEventBus publisher) RabbitEventBus publisher,
RabbitOptions rabbitOptions)
{ {
this.publisher = publisher; this.publisher = publisher;
this.rabbitMQClient = rabbitMQClient; this.rabbitMQClient = rabbitMQClient;
this.rabbitOptions = rabbitOptions;
} }
public ValueTask Publish(byte[] bytes) public ValueTask Publish(byte[] bytes)
{ {
using var channel = rabbitMQClient.PullChannel(); using var channel = rabbitMQClient.PullChannel();
channel.Publish(bytes, publisher.Exchange, string.Empty, publisher.Persistent); channel.Publish(bytes, $"{rabbitOptions.Prefix}{publisher.Exchange}", string.Empty, publisher.Persistent);
return Consts.ValueTaskDone; return Consts.ValueTaskDone;
} }
} }
......
...@@ -31,11 +31,11 @@ namespace Pole.EventStorage.PostgreSql ...@@ -31,11 +31,11 @@ namespace Pole.EventStorage.PostgreSql
public async Task BulkChangePublishStateAsync(IEnumerable<EventEntity> events) public async Task BulkChangePublishStateAsync(IEnumerable<EventEntity> events)
{ {
var sql = var sql =
$"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"= any @Ids"; $"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\" IN (@Ids)";
using var connection = new NpgsqlConnection(options.ConnectionString); using var connection = new NpgsqlConnection(options.ConnectionString);
await connection.ExecuteAsync(sql, events.Select(@event=> new await connection.ExecuteAsync(sql, events.Select(@event=> new
{ {
Ids = events.Select(@event=>@event.Id).ToArray(), Ids =string.Join(',',events.Select(@event=>@event.Id).ToArray()),
@event.Retries, @event.Retries,
@event.ExpiresAt, @event.ExpiresAt,
@event.StatusName @event.StatusName
...@@ -79,6 +79,8 @@ $"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusN ...@@ -79,6 +79,8 @@ $"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusN
result.Add(new EventEntity result.Add(new EventEntity
{ {
Id = reader.GetString(0), Id = reader.GetString(0),
Name=reader.GetString(2),
Content=reader.GetString(3),
Retries = reader.GetInt32(4), Retries = reader.GetInt32(4),
Added = reader.GetDateTime(5) Added = reader.GetDateTime(5)
}); });
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment