diff --git a/samples/apis/Backet.Api/Backet.Api.csproj b/samples/apis/Backet.Api/Backet.Api.csproj
index 7b52d42..27d5320 100644
--- a/samples/apis/Backet.Api/Backet.Api.csproj
+++ b/samples/apis/Backet.Api/Backet.Api.csproj
@@ -17,14 +17,18 @@
all
runtime; build; native; contentfiles; analyzers; buildtransitive
-
-
+
+
+
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
-
-
-
+
+
diff --git a/samples/apis/Backet.Api/Controllers/BacketController.cs b/samples/apis/Backet.Api/Controllers/BacketController.cs
index 650312c..9f5a74e 100644
--- a/samples/apis/Backet.Api/Controllers/BacketController.cs
+++ b/samples/apis/Backet.Api/Controllers/BacketController.cs
@@ -1,11 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Linq.Expressions;
using System.Threading.Tasks;
+using Backet.Api.EventHandlers.Abstraction;
using Backet.Api.Grains.Abstraction;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Orleans;
+using Pole.Core.EventBus;
+using Pole.Core.EventBus.EventHandler;
namespace Backet.Api.Controllers
{
@@ -21,10 +25,20 @@ namespace Backet.Api.Controllers
[HttpPost("api/backet/AddBacket")]
public Task AddBacket([FromBody]Backet.Api.Grains.Abstraction.BacketDto backet)
{
- var newId = "da8a489fa7b44092eeeeeee";
+ var newId = Guid.NewGuid().ToString("N").ToLower();
backet.Id = newId;
var grain = clusterClient.GetGrain(newId);
return grain.AddBacket(backet);
+ //var clientType = typeof(IClusterClient);
+ //var clientParams = Expression.Parameter(clientType, "client");
+ //var primaryKeyParams = Expression.Parameter(typeof(string), "primaryKey");
+ //var grainClassNamePrefixParams = Expression.Parameter(typeof(string), "grainClassNamePrefix");
+ //var method = typeof(ClusterClientExtensions).GetMethod("GetGrain", new Type[] { clientType, typeof(string), typeof(string) });
+ //var body = Expression.Call(method.MakeGenericMethod(typeof(IToNoticeBacketCreatedEventHandler)), clientParams, primaryKeyParams, grainClassNamePrefixParams);
+ //var func = Expression.Lambda>(body, clientParams, primaryKeyParams, grainClassNamePrefixParams).Compile();
+ //var handler = func(clusterClient, newId, null);
+ //await handler.Invoke(null);
+ //return true;
}
[HttpPost("api/backet/UpdateBacket")]
public Task UpdateBacket()
@@ -38,13 +52,13 @@ namespace Backet.Api.Controllers
{
var id = "da8a489fa7b4409294ee1b358fbbfba5";
var grain = clusterClient.GetGrain(id);
- return grain.AddBacketItem("55","测试3",1000);
+ return grain.AddBacketItem("55", "测试3", 1000);
}
[HttpPost("api/backet/RemoveFirstItem")]
public Task RemoveFirstItem()
{
var id = "da8a489fa7b4409294ee1b358fbbfba5";
- var grain = clusterClient.GetGrain(id);
+ var grain = clusterClient.GetGrain(id);
return grain.RemoveFirstItem();
}
}
diff --git a/samples/apis/Backet.Api/EventHandlers/Abstraction/IToNoticeBacketCreatedEventHandler.cs b/samples/apis/Backet.Api/EventHandlers/Abstraction/IToNoticeBacketCreatedEventHandler.cs
new file mode 100644
index 0000000..23f75ec
--- /dev/null
+++ b/samples/apis/Backet.Api/EventHandlers/Abstraction/IToNoticeBacketCreatedEventHandler.cs
@@ -0,0 +1,13 @@
+using Backet.Api.Domain.Event;
+using Pole.Core.EventBus.EventHandler;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+
+namespace Backet.Api.EventHandlers.Abstraction
+{
+ public interface IToNoticeBacketCreatedEventHandler : IPoleBulkEventsHandler, IPoleEventHandler
+ {
+ }
+}
diff --git a/samples/apis/Backet.Api/EventHandlers/ToNoticeBacketCreatedEventHandler.cs b/samples/apis/Backet.Api/EventHandlers/ToNoticeBacketCreatedEventHandler.cs
index 8e20550..649044d 100644
--- a/samples/apis/Backet.Api/EventHandlers/ToNoticeBacketCreatedEventHandler.cs
+++ b/samples/apis/Backet.Api/EventHandlers/ToNoticeBacketCreatedEventHandler.cs
@@ -1,4 +1,5 @@
using Backet.Api.Domain.Event;
+using Backet.Api.EventHandlers.Abstraction;
using Pole.Core.EventBus.EventHandler;
using System;
using System.Collections.Generic;
@@ -7,8 +8,14 @@ using System.Threading.Tasks;
namespace Backet.Api.EventHandlers
{
- public class ToNoticeBacketCreatedEventHandler : PoleEventHandler
+ [EventHandler(EventName = "Backet.Api.Domain.Event.BacketCreatedEvent")]
+ public class ToNoticeBacketCreatedEventHandler : PoleEventHandler, IToNoticeBacketCreatedEventHandler
{
+ public Task BulkEventsHandle(List @event)
+ {
+ return Task.CompletedTask;
+ }
+
public Task EventHandle(BacketCreatedEvent @event)
{
return Task.CompletedTask;
diff --git a/samples/apis/Backet.Api/Program.cs b/samples/apis/Backet.Api/Program.cs
index 2d6843d..a8b0b0b 100644
--- a/samples/apis/Backet.Api/Program.cs
+++ b/samples/apis/Backet.Api/Program.cs
@@ -20,14 +20,14 @@ namespace Backet.Api
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
- .ConfigureWebHostDefaults(webBuilder =>
- {
- webBuilder.UseStartup();
- })
.UseOrleans(siloBuilder =>
{
siloBuilder.UseLocalhostClustering();
siloBuilder.AddEfGrainStorageAsDefault();
- });
+ })
+ .ConfigureWebHostDefaults(webBuilder =>
+ {
+ webBuilder.UseStartup();
+ });
}
}
diff --git a/samples/apis/Backet.Api/appsettings.json b/samples/apis/Backet.Api/appsettings.json
index aefd67c..13020d9 100644
--- a/samples/apis/Backet.Api/appsettings.json
+++ b/samples/apis/Backet.Api/appsettings.json
@@ -12,7 +12,7 @@
},
"ServiceName": "Backet",
"RabbitmqConfig": {
- "HostAddress": "rabbitmq://192.168.0.41/",
+ "HostAddress": "192.168.0.41",
"HostUserName": "test",
"HostPassword": "test"
}
diff --git a/src/Pole.Core/Consts.cs b/src/Pole.Core/Consts.cs
index 94d54df..545e407 100644
--- a/src/Pole.Core/Consts.cs
+++ b/src/Pole.Core/Consts.cs
@@ -10,7 +10,7 @@ namespace Pole.Core
public static ValueTask ValueTaskDone = new ValueTask();
public const string ConsumerRetryTimesStr = "pole-consumer-retry-times";
public const string ConsumerExceptionDetailsStr = "pole-consumer-exception-details";
- public const string EventHandlerMethodName = "EventHandle";
+ public const string EventHandlerMethodName = "EventHandler";
public const string BatchEventsHandlerMethodName = "BatchEventsHandler";
}
}
diff --git a/src/Pole.Core/EventBus/EventHandler/EventHandlerAttribute.cs b/src/Pole.Core/EventBus/EventHandler/EventHandlerAttribute.cs
index 0be1e5d..949a37f 100644
--- a/src/Pole.Core/EventBus/EventHandler/EventHandlerAttribute.cs
+++ b/src/Pole.Core/EventBus/EventHandler/EventHandlerAttribute.cs
@@ -8,6 +8,5 @@ namespace Pole.Core.EventBus.EventHandler
public class EventHandlerAttribute: Attribute
{
public string EventName { get; set; }
- public string EventHandlerName { get; set; }
}
}
diff --git a/src/Pole.Core/EventBus/EventHandler/PoleEventHandlerBase.cs b/src/Pole.Core/EventBus/EventHandler/IPoleEventHandler.cs
similarity index 59%
rename from src/Pole.Core/EventBus/EventHandler/PoleEventHandlerBase.cs
rename to src/Pole.Core/EventBus/EventHandler/IPoleEventHandler.cs
index a45f448..38b77f4 100644
--- a/src/Pole.Core/EventBus/EventHandler/PoleEventHandlerBase.cs
+++ b/src/Pole.Core/EventBus/EventHandler/IPoleEventHandler.cs
@@ -1,5 +1,4 @@
using Orleans;
-using Orleans.Concurrency;
using Pole.Core.EventBus.Event;
using System;
using System.Collections.Generic;
@@ -8,9 +7,17 @@ using System.Threading.Tasks;
namespace Pole.Core.EventBus.EventHandler
{
- public abstract class PoleEventHandlerBase : Grain
+ public interface IPoleEventHandler : IPoleEventHandler
{
- public abstract Task Invoke(EventBytesTransport transport);
- public abstract Task Invoke(List transports);
+ Task EventHandle(TEvent @event);
+ }
+ public interface IPoleBulkEventsHandler : IPoleEventHandler
+ {
+ Task BulkEventsHandle(List events);
+ }
+ public interface IPoleEventHandler : IGrainWithStringKey
+ {
+ public Task Invoke(EventBytesTransport transport);
+ public Task Invoke(List transports);
}
}
diff --git a/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs b/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs
index bb39137..d59c866 100644
--- a/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs
+++ b/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs
@@ -12,13 +12,14 @@ using System.Reflection.Emit;
using System.Linq.Expressions;
using System.Linq;
using Pole.Core.Exceptions;
+using Orleans;
namespace Pole.Core.EventBus.EventHandler
{
///
///
///
- public class PoleEventHandler : PoleEventHandlerBase
+ public abstract class PoleEventHandler : Grain
{
private IEventTypeFinder eventTypeFinder;
private ISerializer serializer;
@@ -28,12 +29,11 @@ namespace Pole.Core.EventBus.EventHandler
public PoleEventHandler()
{
grainType = GetType();
- DependencyInjection();
}
public override async Task OnActivateAsync()
{
- await DependencyInjection();
await base.OnActivateAsync();
+ await DependencyInjection();
}
protected virtual Task DependencyInjection()
{
@@ -44,59 +44,52 @@ namespace Pole.Core.EventBus.EventHandler
return Task.CompletedTask;
}
- public override Task Invoke(EventBytesTransport transport)
+ public Task Invoke(EventBytesTransport transport)
{
var eventType = eventTypeFinder.FindType(transport.EventTypeCode);
- var method = typeof(ClusterClientExtensions).GetMethod(Consts.EventHandlerMethodName, new Type[] { eventType });
- if (method == null)
+
+ var eventObj = serializer.Deserialize(transport.EventBytes, eventType);
+ if (this is IPoleEventHandler handler)
{
- throw new EventHandlerTargetMethodNotFoundException(Consts.EventHandlerMethodName, eventType.Name);
+ var result = handler.EventHandle((TEvent)eventObj);
+ logger.LogTrace($"{nameof(PoleEventHandler)} Invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(eventObj));
+ return result;
+ }
+ else
+ {
+ throw new EventHandlerImplementedNotRightException(nameof(handler.EventHandle), eventType.Name, this.GetType().FullName);
}
- var data = serializer.Deserialize(transport.EventBytes, eventType);
- var eventHandlerType = this.GetType();
- var eventHandlerObjectParams = Expression.Parameter(typeof(object), "eventHandler");
- var eventHandlerParams = Expression.Convert(eventHandlerObjectParams, eventHandlerType);
- var eventObjectParams = Expression.Parameter(typeof(object), "event");
- var eventParams = Expression.Convert(eventObjectParams, eventType);
-
- var body = Expression.Call(method, eventHandlerParams, eventParams);
- var func = Expression.Lambda>(body, true, eventHandlerObjectParams, eventObjectParams).Compile();
- var result = func(this, data);
- logger.LogTrace("Invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(data));
- return result;
}
- public override Task Invoke(List transports)
+ public async Task Invoke(List transports)
{
if (transports.Count() != 0)
{
var firstTransport = transports.First();
var eventType = eventTypeFinder.FindType(firstTransport.EventTypeCode);
- var method = typeof(ClusterClientExtensions).GetMethod(Consts.BatchEventsHandlerMethodName, new Type[] { eventType });
- if (method == null)
+ var eventObjs = transports.Select(transport => serializer.Deserialize(firstTransport.EventBytes, eventType)).Select(@event => (TEvent)@event).ToList();
+ if (this is IPoleBulkEventsHandler batchHandler)
{
- var tasks = transports.Select(transport => Invoke(transport));
- return Task.WhenAll(tasks);
+ await batchHandler.BulkEventsHandle(eventObjs);
+ logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(eventObjs));
+ return;
+ }
+ else if (this is IPoleEventHandler handler)
+ {
+ var handleTasks = eventObjs.Select(m => handler.EventHandle(m));
+ await Task.WhenAll(handleTasks);
+ logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(eventObjs));
+ return;
+ }
+ else
+ {
+ throw new EventHandlerImplementedNotRightException(nameof(handler.EventHandle), eventType.Name, this.GetType().FullName);
}
- var datas = transports.Select(transport => serializer.Deserialize(firstTransport.EventBytes, eventType)).ToList();
- var eventHandlerType = this.GetType();
- var eventHandlerObjectParams = Expression.Parameter(typeof(object), "eventHandler");
- var eventHandlerParams = Expression.Convert(eventHandlerObjectParams, eventHandlerType);
- var eventObjectParams = Expression.Parameter(typeof(object), "events");
- var eventsType = typeof(List<>).MakeGenericType(eventType);
- var eventsParams = Expression.Convert(eventObjectParams, eventsType);
-
- var body = Expression.Call(method, eventHandlerParams, eventsParams);
- var func = Expression.Lambda>(body, true, eventHandlerObjectParams, eventObjectParams).Compile();
- var result = func(this, datas);
- logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(datas));
- return result;
}
else
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation($"{nameof(EventBytesTransport.FromBytes)} failed");
- return Task.CompletedTask;
}
}
}
diff --git a/src/Pole.Core/EventBus/IProducerContainer.cs b/src/Pole.Core/EventBus/IProducerContainer.cs
index 9a79460..2006625 100644
--- a/src/Pole.Core/EventBus/IProducerContainer.cs
+++ b/src/Pole.Core/EventBus/IProducerContainer.cs
@@ -6,6 +6,6 @@ namespace Pole.Core.EventBus
public interface IProducerContainer
{
ValueTask GetProducer();
- ValueTask GetProducer(Type type);
+ ValueTask GetProducer(string typeName);
}
}
diff --git a/src/Pole.Core/EventBus/ObserverUnit.cs b/src/Pole.Core/EventBus/ObserverUnit.cs
index d89eecf..8077914 100644
--- a/src/Pole.Core/EventBus/ObserverUnit.cs
+++ b/src/Pole.Core/EventBus/ObserverUnit.cs
@@ -53,7 +53,7 @@ namespace Pole.Core.EventBus
public void Observer()
{
- if (!typeof(PoleEventHandlerBase).IsAssignableFrom(EventHandlerType))
+ if (!typeof(IPoleEventHandler).IsAssignableFrom(EventHandlerType))
throw new NotSupportedException($"{EventHandlerType.FullName} must inheritance from PoleEventHandler");
eventHandler = EventHandler;
batchEventHandler = BatchEventHandler;
@@ -90,8 +90,8 @@ namespace Pole.Core.EventBus
return GetObserver(EventHandlerType, transports.First().EventId).Invoke(transports);
}
}
- static readonly ConcurrentDictionary> _observerGeneratorDict = new ConcurrentDictionary>();
- private PoleEventHandlerBase GetObserver(Type ObserverType, string primaryKey)
+ static readonly ConcurrentDictionary> _observerGeneratorDict = new ConcurrentDictionary>();
+ private IPoleEventHandler GetObserver(Type ObserverType, string primaryKey)
{
var func = _observerGeneratorDict.GetOrAdd(ObserverType, key =>
{
@@ -101,7 +101,7 @@ namespace Pole.Core.EventBus
var grainClassNamePrefixParams = Expression.Parameter(typeof(string), "grainClassNamePrefix");
var method = typeof(ClusterClientExtensions).GetMethod("GetGrain", new Type[] { clientType, typeof(string), typeof(string) });
var body = Expression.Call(method.MakeGenericMethod(ObserverType), clientParams, primaryKeyParams, grainClassNamePrefixParams);
- return Expression.Lambda>(body, clientParams, primaryKeyParams, grainClassNamePrefixParams).Compile();
+ return Expression.Lambda>(body, clientParams, primaryKeyParams, grainClassNamePrefixParams).Compile();
});
return func(clusterClient, primaryKey, null);
}
diff --git a/src/Pole.Core/EventBus/ObserverUnitContainer.cs b/src/Pole.Core/EventBus/ObserverUnitContainer.cs
index e832ec4..b78771f 100644
--- a/src/Pole.Core/EventBus/ObserverUnitContainer.cs
+++ b/src/Pole.Core/EventBus/ObserverUnitContainer.cs
@@ -19,15 +19,18 @@ namespace Pole.Core.EventBus
var eventHandlerList = new List<(Type, EventHandlerAttribute)>();
foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService>()))
{
- foreach (var type in assembly.GetTypes())
+ foreach (var type in assembly.GetTypes().Where(m => typeof(IPoleEventHandler).IsAssignableFrom(m) && m.IsClass && !m.IsAbstract && !typeof(Orleans.Runtime.GrainReference).IsAssignableFrom(m)))
{
- foreach (var attribute in type.GetCustomAttributes(false))
+ var attribute = type.GetCustomAttributes(typeof(EventHandlerAttribute), false).FirstOrDefault();
+ var eventHandlerInterface = type.GetInterfaces().FirstOrDefault(type => typeof(IPoleEventHandler).IsAssignableFrom(type) && !type.IsGenericType);
+
+ if (attribute != null)
+ {
+ eventHandlerList.Add((eventHandlerInterface, (EventHandlerAttribute)attribute));
+ }
+ else
{
- if (attribute is EventHandlerAttribute eventHandlerAttribute)
- {
- eventHandlerList.Add((type, eventHandlerAttribute));
- break;
- }
+ throw new PoleEventHandlerImplementException("Can not found EventHandlerAttribute in PoleEventHandler");
}
}
}
@@ -42,14 +45,7 @@ namespace Pole.Core.EventBus
public List> GetUnits(string observerName)
{
if (unitDict.TryGetValue(observerName, out var units))
- {
- if (units is List> result)
- {
- return result;
- }
- else
- throw new UnmatchObserverUnitException(observerName);
- }
+ return units.Select(m => (IObserverUnit)m).ToList();
else
throw new UnfindObserverUnitException(observerName);
}
diff --git a/src/Pole.Core/Exceptions/EventHandlerTargetMethodNotFoundException.cs b/src/Pole.Core/Exceptions/EventHandlerTargetMethodNotFoundException.cs
index 2cc81fe..f208ce0 100644
--- a/src/Pole.Core/Exceptions/EventHandlerTargetMethodNotFoundException.cs
+++ b/src/Pole.Core/Exceptions/EventHandlerTargetMethodNotFoundException.cs
@@ -4,9 +4,9 @@ using System.Text;
namespace Pole.Core.Exceptions
{
- public class EventHandlerTargetMethodNotFoundException: Exception
+ public class EventHandlerImplementedNotRightException: Exception
{
- public EventHandlerTargetMethodNotFoundException(string methodName,string eventTypeName):base($"EventHandler method:{methodName} not found when eventHandler invoke , eventType:{eventTypeName}")
+ public EventHandlerImplementedNotRightException(string methodName,string eventTypeName,string eventHandlerName):base($"EventHandler method:{methodName} errors, when eventHandler: {eventHandlerName} invoke , eventType:{eventTypeName}")
{
}
diff --git a/src/Pole.Core/Exceptions/PoleEventHandlerImplementException.cs b/src/Pole.Core/Exceptions/PoleEventHandlerImplementException.cs
new file mode 100644
index 0000000..dfa8bb6
--- /dev/null
+++ b/src/Pole.Core/Exceptions/PoleEventHandlerImplementException.cs
@@ -0,0 +1,14 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.Core.Exceptions
+{
+ public class PoleEventHandlerImplementException : Exception
+ {
+ public PoleEventHandlerImplementException(string message) : base(message)
+ {
+
+ }
+ }
+}
diff --git a/src/Pole.Core/Extensions/PoleApplicationBuilderExtensions.cs b/src/Pole.Core/Extensions/PoleApplicationBuilderExtensions.cs
index 45e5ec4..2c9c371 100644
--- a/src/Pole.Core/Extensions/PoleApplicationBuilderExtensions.cs
+++ b/src/Pole.Core/Extensions/PoleApplicationBuilderExtensions.cs
@@ -10,7 +10,7 @@ namespace Microsoft.AspNetCore.Builder
{
public static IApplicationBuilder UsePole(this IApplicationBuilder applicationBuilder)
{
- Startup.StartRay(applicationBuilder.ApplicationServices);
+ Startup.StartPole(applicationBuilder.ApplicationServices).GetAwaiter().GetResult();
return applicationBuilder;
}
}
diff --git a/src/Pole.Core/Pole.Core.csproj b/src/Pole.Core/Pole.Core.csproj
index 3a39e45..ddd92fc 100644
--- a/src/Pole.Core/Pole.Core.csproj
+++ b/src/Pole.Core/Pole.Core.csproj
@@ -10,6 +10,7 @@
+
diff --git a/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs b/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs
index c71c71c..9173461 100644
--- a/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs
+++ b/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs
@@ -18,18 +18,16 @@ namespace Pole.Core.Processor
private readonly IEventStorage eventStorage;
private readonly PoleOptions options;
private readonly IProducerContainer producerContainer;
- private readonly IEventTypeFinder eventTypeFinder;
private readonly ISerializer serializer;
private readonly ILogger logger;
private readonly ProducerOptions producerOptions;
public PendingMessageRetryProcessor(IEventStorage eventStorage, IOptions options, ILogger logger,
- IProducerContainer producerContainer, IEventTypeFinder eventTypeFinder, ISerializer serializer, IOptions producerOptions)
+ IProducerContainer producerContainer, ISerializer serializer, IOptions producerOptions)
{
this.eventStorage = eventStorage;
this.options = options.Value ?? throw new Exception($"{nameof(PoleOptions)} Must be injected");
this.logger = logger;
this.producerContainer = producerContainer;
- this.eventTypeFinder = eventTypeFinder;
this.serializer = serializer;
this.producerOptions = producerOptions.Value ?? throw new Exception($"{nameof(ProducerOptions)} Must be injected");
}
@@ -62,7 +60,6 @@ namespace Pole.Core.Processor
}
foreach (var pendingMessage in pendingMessages)
{
- var eventType = eventTypeFinder.FindType(pendingMessage.Name);
var eventContentBytes = Encoding.UTF8.GetBytes(pendingMessage.Content);
var bytesTransport = new EventBytesTransport(pendingMessage.Name, pendingMessage.Id, eventContentBytes);
var bytes = bytesTransport.GetBytes();
@@ -71,12 +68,15 @@ namespace Pole.Core.Processor
pendingMessage.ExpiresAt = DateTime.UtcNow;
}
pendingMessage.Retries++;
- var producer = await producerContainer.GetProducer(eventType);
+ var producer = await producerContainer.GetProducer(pendingMessage.Name);
await producer.Publish(bytes);
pendingMessage.StatusName = nameof(EventStatus.Published);
pendingMessage.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds);
}
- await eventStorage.BulkChangePublishStateAsync(pendingMessages);
+ if (pendingMessages.Count() > 0)
+ {
+ await eventStorage.BulkChangePublishStateAsync(pendingMessages);
+ }
}
}
}
diff --git a/src/Pole.Core/Serialization/DefaultJsonSerializer.cs b/src/Pole.Core/Serialization/DefaultJsonSerializer.cs
index c8664fc..02e3807 100644
--- a/src/Pole.Core/Serialization/DefaultJsonSerializer.cs
+++ b/src/Pole.Core/Serialization/DefaultJsonSerializer.cs
@@ -3,13 +3,14 @@ using System.Collections.Generic;
using System.Text;
using System.Text.Encodings.Web;
using System.Text.Json;
+using System.Text.Json.Serialization;
using System.Text.Unicode;
namespace Pole.Core.Serialization
{
public class DefaultJsonSerializer : ISerializer
{
- static readonly JsonSerializerOptions options = new JsonSerializerOptions() { Encoder = JavaScriptEncoder.Create(UnicodeRanges.All) };
+ static readonly JsonSerializerOptions options = new JsonSerializerOptions() { Encoder = JavaScriptEncoder.Create(UnicodeRanges.All), MaxDepth = 5, ReferenceHandling = ReferenceHandling.Preserve, WriteIndented =true};
public T Deserialize(string json) where T : class, new()
{
return JsonSerializer.Deserialize(json);
diff --git a/src/Pole.Core/Serialization/EventTypeFinder.cs b/src/Pole.Core/Serialization/EventTypeFinder.cs
index 30abaea..beb82cd 100644
--- a/src/Pole.Core/Serialization/EventTypeFinder.cs
+++ b/src/Pole.Core/Serialization/EventTypeFinder.cs
@@ -6,6 +6,7 @@ using Pole.Core.Utils;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
+using System.Linq;
using System.Text;
namespace Pole.Core.Serialization
@@ -21,16 +22,13 @@ namespace Pole.Core.Serialization
var baseEventType = typeof(IEvent);
foreach (var assembly in AssemblyHelper.GetAssemblies(this.logger))
{
- foreach (var type in assembly.GetTypes())
+ foreach (var type in assembly.GetTypes().Where(m => baseEventType.IsAssignableFrom(m)&&!m.IsAbstract))
{
- if (baseEventType.IsAssignableFrom(type))
- {
- typeDict.TryAdd(type, type.FullName);
+ typeDict.TryAdd(type, type.FullName);
- if (!codeDict.TryAdd(type.FullName, type))
- {
- throw new TypeCodeRepeatedException(type.FullName, type.FullName);
- }
+ if (!codeDict.TryAdd(type.FullName, type))
+ {
+ throw new TypeCodeRepeatedException(type.FullName, type.FullName);
}
}
}
diff --git a/src/Pole.Core/Startup.cs b/src/Pole.Core/Startup.cs
index e77a66a..eec1451 100644
--- a/src/Pole.Core/Startup.cs
+++ b/src/Pole.Core/Startup.cs
@@ -13,7 +13,7 @@ namespace Pole.Core
{
tasks.Add(new StartupTask(sortIndex, method));
}
- internal static Task StartRay(IServiceProvider serviceProvider)
+ internal static Task StartPole(IServiceProvider serviceProvider)
{
tasks = tasks.OrderBy(func => func.SortIndex).ToList();
return Task.WhenAll(tasks.Select(value => value.Func(serviceProvider)));
diff --git a/src/Pole.Core/UnitOfWork/UnitOfWork.cs b/src/Pole.Core/UnitOfWork/UnitOfWork.cs
index 07a6db2..e50d6c8 100644
--- a/src/Pole.Core/UnitOfWork/UnitOfWork.cs
+++ b/src/Pole.Core/UnitOfWork/UnitOfWork.cs
@@ -45,7 +45,7 @@ namespace Pole.Core.UnitOfWork
var eventContentBytes = Encoding.UTF8.GetBytes(@event.Content);
var bytesTransport = new EventBytesTransport(@event.Name, @event.Id, eventContentBytes);
var bytes = bytesTransport.GetBytes();
- var producer = await producerContainer.GetProducer(eventType);
+ var producer = await producerContainer.GetProducer(@event.Name);
await producer.Publish(bytes);
@event.StatusName = nameof(EventStatus.Published);
@event.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds);
diff --git a/src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs b/src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs
index 7e3db14..b1511aa 100644
--- a/src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs
+++ b/src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs
@@ -1,4 +1,5 @@
using RabbitMQ.Client;
+using System;
using System.Collections.Generic;
using System.Threading;
diff --git a/src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs b/src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs
index 129a4af..6539051 100644
--- a/src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs
+++ b/src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs
@@ -3,6 +3,7 @@ using Pole.Core;
using Pole.Core.Exceptions;
using RabbitMQ.Client;
using System;
+using System.Collections.Generic;
namespace Pole.EventBus.RabbitMQ
{
@@ -23,11 +24,38 @@ namespace Pole.EventBus.RabbitMQ
var consumeRetryTimesStr = consumeRetryTimes.ToString();
persistentProperties = Model.CreateBasicProperties();
persistentProperties.Persistent = true;
+ persistentProperties.Headers = new Dictionary();
persistentProperties.Headers.Add(Consts.ConsumerRetryTimesStr, consumeRetryTimesStr);
noPersistentProperties = Model.CreateBasicProperties();
noPersistentProperties.Persistent = false;
+ noPersistentProperties.Headers = new Dictionary();
noPersistentProperties.Headers.Add(Consts.ConsumerRetryTimesStr, consumeRetryTimesStr);
}
+ public void Publish(byte[] msg, IDictionary headers, string exchange, string routingKey, bool persistent = true)
+ {
+ if (persistent)
+ {
+ persistentProperties.Headers = headers;
+ }
+ else
+ {
+ noPersistentProperties.Headers = headers;
+ }
+ Model.ConfirmSelect();
+ Model.BasicPublish(exchange, routingKey, persistent ? persistentProperties : noPersistentProperties, msg);
+ if (!Model.WaitForConfirms(TimeSpan.FromSeconds(Connection.Options.ProducerConfirmWaitTimeoutSeconds), out bool isTimeout))
+ {
+ if (isTimeout)
+ {
+ throw new ProducerConfirmTimeOutException(Connection.Options.ProducerConfirmWaitTimeoutSeconds);
+ }
+ else
+ {
+ throw new ProducerReceivedNAckException();
+ }
+ }
+
+ }
public void Publish(byte[] msg, string exchange, string routingKey, bool persistent = true)
{
Model.ConfirmSelect();
diff --git a/src/Pole.EventBus.Rabbitmq/Client/RabbitMQClient.cs b/src/Pole.EventBus.Rabbitmq/Client/RabbitMQClient.cs
index 1b2be55..ba6375c 100644
--- a/src/Pole.EventBus.Rabbitmq/Client/RabbitMQClient.cs
+++ b/src/Pole.EventBus.Rabbitmq/Client/RabbitMQClient.cs
@@ -14,6 +14,7 @@ namespace Pole.EventBus.RabbitMQ
options = config.Value;
connectionFactory = new ConnectionFactory
{
+ Port=options.Port,
UserName = options.UserName,
Password = options.Password,
VirtualHost = options.VirtualHost,
diff --git a/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs b/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs
index 0a679a8..7a6b707 100644
--- a/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs
+++ b/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs
@@ -7,7 +7,8 @@ namespace Pole.EventBus.RabbitMQ
{
public string UserName { get; set; }
public string Password { get; set; }
- public string VirtualHost { get; set; }
+ public string VirtualHost { get; set; } = "/";
+ public int Port { get; set; } = 5672;
public int MasChannelsPerConnection { get; set; } = 200;
///
/// 目前为一个连接 当消息数量非常大时,单个TCP连接的运输能力有限,可以修改这个最大连接数提高运输能力
diff --git a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerManager.cs b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerManager.cs
deleted file mode 100644
index ecea2bf..0000000
--- a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerManager.cs
+++ /dev/null
@@ -1,140 +0,0 @@
-using Microsoft.Extensions.Hosting;
-using Microsoft.Extensions.Logging;
-using Orleans;
-using Pole.Core.Services;
-using System;
-using System.Collections.Concurrent;
-using System.Linq;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace Pole.EventBus.RabbitMQ
-{
- public class ConsumerManager : IHostedService, IDisposable
- {
- readonly ILogger logger;
- readonly IRabbitMQClient client;
- readonly IRabbitEventBusContainer rabbitEventBusContainer;
- readonly IServiceProvider provider;
- readonly IGrainFactory grainFactory;
- const int _HoldTime = 20 * 1000;
- const int _MonitTime = 60 * 2 * 1000;
- const int _checkTime = 10 * 1000;
-
- public ConsumerManager(
- ILogger logger,
- IRabbitMQClient client,
- IGrainFactory grainFactory,
- IServiceProvider provider,
- IRabbitEventBusContainer rabbitEventBusContainer)
- {
- this.provider = provider;
- this.client = client;
- this.logger = logger;
- this.rabbitEventBusContainer = rabbitEventBusContainer;
- this.grainFactory = grainFactory;
- }
- private readonly ConcurrentDictionary ConsumerRunners = new ConcurrentDictionary();
- private ConcurrentDictionary Runners { get; } = new ConcurrentDictionary();
- private Timer HeathCheckTimer { get; set; }
- private Timer DistributedMonitorTime { get; set; }
- private Timer DistributedHoldTimer { get; set; }
- const int lockHoldingSeconds = 60;
- int distributedHoldTimerLock = 0;
- int heathCheckTimerLock = 0;
- private async Task Start()
- {
- var consumers = rabbitEventBusContainer.GetConsumers();
- foreach (var consumer in consumers)
- {
- if (consumer is RabbitConsumer value)
- {
- var queue = value.QueueInfo;
- var key = queue.Queue;
-
- var runner = new ConsumerRunner(client, provider, value, queue);
- ConsumerRunners.TryAdd(key, runner);
- await runner.Run();
- }
- }
- }
- private async Task DistributedHold()
- {
- try
- {
- if (logger.IsEnabled(LogLevel.Information))
- logger.LogInformation("EventBus Background Service is holding.");
- if (Interlocked.CompareExchange(ref distributedHoldTimerLock, 1, 0) == 0)
- {
- foreach (var lockKV in Runners)
- {
- if (Runners.TryGetValue(lockKV.Key, out var lockId))
- {
- var holdResult = await grainFactory.GetGrain(lockKV.Key).Hold(lockId, lockHoldingSeconds);
- if (!holdResult)
- {
- if (ConsumerRunners.TryRemove(lockKV.Key, out var runner))
- {
- runner.Close();
- }
- Runners.TryRemove(lockKV.Key, out var _);
- }
- }
- }
- Interlocked.Exchange(ref distributedHoldTimerLock, 0);
- }
- }
- catch (Exception exception)
- {
- logger.LogError(exception.InnerException ?? exception, nameof(DistributedHold));
- Interlocked.Exchange(ref distributedHoldTimerLock, 0);
- }
- }
- private async Task HeathCheck()
- {
- try
- {
- if (logger.IsEnabled(LogLevel.Information))
- logger.LogInformation("EventBus Background Service is checking.");
- if (Interlocked.CompareExchange(ref heathCheckTimerLock, 1, 0) == 0)
- {
- await Task.WhenAll(ConsumerRunners.Values.Select(runner => runner.HeathCheck()));
- Interlocked.Exchange(ref heathCheckTimerLock, 0);
- }
- }
- catch (Exception exception)
- {
- logger.LogError(exception.InnerException ?? exception, nameof(HeathCheck));
- Interlocked.Exchange(ref heathCheckTimerLock, 0);
- }
- }
- public Task StartAsync(CancellationToken cancellationToken)
- {
- if (logger.IsEnabled(LogLevel.Information))
- logger.LogInformation("EventBus Background Service is starting.");
- DistributedMonitorTime = new Timer(state => Start().Wait(), null, 1000, _MonitTime);
- DistributedHoldTimer = new Timer(state => DistributedHold().Wait(), null, _HoldTime, _HoldTime);
- HeathCheckTimer = new Timer(state => { HeathCheck().Wait(); }, null, _checkTime, _checkTime);
- return Task.CompletedTask;
- }
- public Task StopAsync(CancellationToken cancellationToken)
- {
- if (logger.IsEnabled(LogLevel.Information))
- logger.LogInformation("EventBus Background Service is stopping.");
- Dispose();
- return Task.CompletedTask;
- }
- public void Dispose()
- {
- if (logger.IsEnabled(LogLevel.Information))
- logger.LogInformation("EventBus Background Service is disposing.");
- foreach (var runner in ConsumerRunners.Values)
- {
- runner.Close();
- }
- DistributedMonitorTime?.Dispose();
- DistributedHoldTimer?.Dispose();
- HeathCheckTimer?.Dispose();
- }
- }
-}
diff --git a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs
index a757002..854cc5c 100644
--- a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs
+++ b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs
@@ -9,6 +9,8 @@ using System.Linq;
using System.Threading.Tasks;
using Pole.Core;
using Pole.Core.Serialization;
+using Microsoft.Extensions.Options;
+using System.Text;
namespace Pole.EventBus.RabbitMQ
{
@@ -16,11 +18,13 @@ namespace Pole.EventBus.RabbitMQ
{
readonly IMpscChannel mpscChannel;
readonly ISerializer serializer;
+ readonly RabbitOptions rabbitOptions;
public ConsumerRunner(
IRabbitMQClient client,
IServiceProvider provider,
RabbitConsumer consumer,
- QueueInfo queue)
+ QueueInfo queue,
+ RabbitOptions rabbitOptions)
{
Client = client;
Logger = provider.GetService>();
@@ -29,6 +33,9 @@ namespace Pole.EventBus.RabbitMQ
mpscChannel.BindConsumer(BatchExecuter);
Consumer = consumer;
Queue = queue;
+ this.rabbitOptions = rabbitOptions;
+
+
}
public ILogger Logger { get; }
public IRabbitMQClient Client { get; }
@@ -45,28 +52,22 @@ namespace Pole.EventBus.RabbitMQ
if (isFirst)
{
isFirst = false;
- Model.Model.ExchangeDeclare(Consumer.EventBus.Exchange, "direct", true);
+ Model.Model.ExchangeDeclare($"{rabbitOptions.Prefix}{Consumer.EventBus.Exchange}", "direct", true);
Model.Model.ExchangeDeclare(Queue.Queue, "direct", true);
- Model.Model.ExchangeBind(Consumer.EventBus.Exchange, Queue.Queue, string.Empty);
+ Model.Model.ExchangeBind(Queue.Queue, $"{rabbitOptions.Prefix}{Consumer.EventBus.Exchange}", string.Empty);
Model.Model.QueueDeclare(Queue.Queue, true, false, false, null);
Model.Model.QueueBind(Queue.Queue, Queue.Queue, string.Empty);
}
Model.Model.BasicQos(0, Model.Connection.Options.CunsumerMaxBatchSize, false);
BasicConsumer = new EventingBasicConsumer(Model.Model);
- BasicConsumer.Received += async (ch, ea) => await mpscChannel.WriteAsync(ea);
+ BasicConsumer.Received += async (ch, ea) =>
+ {
+
+ await mpscChannel.WriteAsync(ea);
+ };
BasicConsumer.ConsumerTag = Model.Model.BasicConsume(Queue.Queue, Consumer.Config.AutoAck, BasicConsumer);
return Task.CompletedTask;
}
- public Task HeathCheck()
- {
- if (IsUnAvailable)
- {
- Close();
- return Run();
- }
- else
- return Task.CompletedTask;
- }
private async Task BatchExecuter(List list)
{
if (list.Count == 1)
@@ -88,6 +89,7 @@ namespace Pole.EventBus.RabbitMQ
{
await ProcessComsumerErrors(item, exception);
}
+ return;
}
}
if (!Consumer.Config.AutoAck)
@@ -108,6 +110,7 @@ namespace Pole.EventBus.RabbitMQ
if (Consumer.Config.Reenqueue)
{
await ProcessComsumerErrors(ea, exception);
+ return;
}
}
if (!Consumer.Config.AutoAck)
@@ -120,15 +123,18 @@ namespace Pole.EventBus.RabbitMQ
{
if (ea.BasicProperties.Headers.TryGetValue(Consts.ConsumerRetryTimesStr, out object retryTimesObj))
{
- var retryTimes = Convert.ToInt32(retryTimesObj);
- if (retryTimes <= Consumer.Config.MaxReenqueueTimes)
+ var retryTimesStr = Encoding.UTF8.GetString((byte[])retryTimesObj);
+ var retryTimes = Convert.ToInt32(retryTimesStr);
+ if (retryTimes < Consumer.Config.MaxReenqueueTimes)
{
retryTimes++;
- ea.BasicProperties.Headers[Consts.ConsumerRetryTimesStr] = retryTimes;
- ea.BasicProperties.Headers[Consts.ConsumerExceptionDetailsStr] = serializer.Serialize(exception, typeof(Exception));
+ ea.BasicProperties.Headers[Consts.ConsumerRetryTimesStr] = retryTimes.ToString();
+ ea.BasicProperties.Headers[Consts.ConsumerExceptionDetailsStr] = exception.InnerException?.Message + exception.StackTrace ?? exception.Message + exception.StackTrace;
await Task.Delay((int)Math.Pow(2, retryTimes) * 1000).ContinueWith((task) =>
{
- Model.Model.BasicReject(ea.DeliveryTag, true);
+ using var channel = Client.PullChannel();
+ channel.Publish(ea.Body, ea.BasicProperties.Headers, Queue.Queue, string.Empty, true);
+ Model.Model.BasicAck(ea.DeliveryTag, false);
});
}
else
@@ -138,6 +144,8 @@ namespace Pole.EventBus.RabbitMQ
Model.Model.ExchangeDeclare(errorExchangeName, "direct", true);
Model.Model.QueueDeclare(errorQueueName, true, false, false, null);
Model.Model.QueueBind(errorQueueName, errorExchangeName, string.Empty);
+ using var channel = Client.PullChannel();
+ channel.Publish(ea.Body, ea.BasicProperties.Headers, errorExchangeName, string.Empty, true);
if (!Consumer.Config.AutoAck)
{
Model.Model.BasicAck(ea.DeliveryTag, false);
diff --git a/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs b/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs
index 81bfd3b..6b4282a 100644
--- a/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs
+++ b/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs
@@ -12,17 +12,20 @@ using System.Threading.Tasks;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventHandler;
using Microsoft.Extensions.Options;
+using System.Linq;
+using Pole.Core.Abstraction;
namespace Pole.EventBus.RabbitMQ
{
public class EventBusContainer : IRabbitEventBusContainer, IProducerContainer
{
- private readonly ConcurrentDictionary eventBusDictionary = new ConcurrentDictionary();
+ private readonly ConcurrentDictionary eventBusDictionary = new ConcurrentDictionary();
private readonly List eventBusList = new List();
readonly IRabbitMQClient rabbitMQClient;
readonly IServiceProvider serviceProvider;
private readonly IObserverUnitContainer observerUnitContainer;
private readonly RabbitOptions rabbitOptions;
+ public bool IsAutoRegisterFinished { get; private set; }
public EventBusContainer(
IServiceProvider serviceProvider,
IObserverUnitContainer observerUnitContainer,
@@ -42,16 +45,21 @@ namespace Pole.EventBus.RabbitMQ
AddEventAndEventHandlerInfoList(eventList, evenHandlertList);
foreach (var (type, config) in eventList)
{
- var eventName = string.IsNullOrEmpty(config.EventName) ? type.Name.ToLower() : config.EventName;
+ var eventName = config.EventName;
var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, false, true, true).BindEvent(type, eventName);
await eventBus.AddGrainConsumer();
}
foreach (var (type, config) in evenHandlertList)
{
- var eventName = string.IsNullOrEmpty(config.EventName) ? type.Name.ToLower() : config.EventName;
- var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, false, true, true).BindEvent(type, eventName);
- await eventBus.AddGrainConsumer();
+ var eventName = config.EventName;
+
+ if (!eventBusDictionary.TryGetValue(eventName, out RabbitEventBus rabbitEventBus))
+ {
+ var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, false, true, true).BindEvent(type, eventName);
+ await eventBus.AddGrainConsumer();
+ }
}
+ IsAutoRegisterFinished = true;
}
public RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = true, bool persistent = true)
@@ -60,35 +68,37 @@ namespace Pole.EventBus.RabbitMQ
}
public Task Work(RabbitEventBus bus)
{
- if (eventBusDictionary.TryAdd(bus.Event, bus))
+ if (eventBusDictionary.TryAdd(bus.EventName, bus))
{
eventBusList.Add(bus);
using var channel = rabbitMQClient.PullChannel();
- channel.Model.ExchangeDeclare(bus.Exchange, "direct", true);
+ channel.Model.ExchangeDeclare($"{rabbitOptions.Prefix}{bus.Exchange}", "direct", true);
return Task.CompletedTask;
}
else
throw new EventBusRepeatException(bus.Event.FullName);
}
- readonly ConcurrentDictionary producerDict = new ConcurrentDictionary();
- public ValueTask GetProducer(Type type)
+ readonly ConcurrentDictionary producerDict = new ConcurrentDictionary();
+
+
+ public ValueTask GetProducer(string typeName)
{
- if (eventBusDictionary.TryGetValue(type, out var eventBus))
+ if (eventBusDictionary.TryGetValue(typeName, out var eventBus))
{
- return new ValueTask(producerDict.GetOrAdd(type, key =>
+ return new ValueTask(producerDict.GetOrAdd(typeName, key =>
{
- return new RabbitProducer(rabbitMQClient, eventBus);
+ return new RabbitProducer(rabbitMQClient, eventBus, rabbitOptions);
}));
}
else
{
- throw new NotImplementedException($"{nameof(IProducer)} of {type.FullName}");
+ throw new NotImplementedException($"{nameof(IProducer)} of {typeName}");
}
}
public ValueTask GetProducer()
{
- return GetProducer(typeof(T));
+ return GetProducer(typeof(T).FullName);
}
public List GetConsumers()
{
@@ -102,37 +112,44 @@ namespace Pole.EventBus.RabbitMQ
#region helpers
- private void AddEventAndEventHandlerInfoList(List<(Type type, EventAttribute config)> eventList, List<(Type type, EventHandlerAttribute config)> evenHandlertList)
+ private void AddEventAndEventHandlerInfoList(List<(Type type, EventAttribute config)> eventList, List<(Type type, EventHandlerAttribute config)> eventHandlertList)
{
foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService>()))
{
- foreach (var type in assembly.GetTypes())
+ foreach (var type in assembly.GetTypes().Where(m => typeof(IEvent).IsAssignableFrom(m) && m.IsClass))
{
- foreach (var attribute in type.GetCustomAttributes(false))
+ var attribute = type.GetCustomAttributes(typeof(EventAttribute), false).FirstOrDefault();
+
+ if (attribute != null)
{
- if (attribute is EventAttribute config)
- {
- eventList.Add((type, config));
- break;
- }
+ eventList.Add((type, (EventAttribute)attribute));
+ }
+ else
+ {
+ eventList.Add((type, new EventAttribute() { EventName = type.FullName }));
}
}
}
+
foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService>()))
{
- foreach (var type in assembly.GetTypes())
+
+ foreach (var type in assembly.GetTypes().Where(m => typeof(IPoleEventHandler).IsAssignableFrom(m) && m.IsClass && !m.IsAbstract&&!typeof(Orleans.Runtime.GrainReference).IsAssignableFrom(m)))
{
- foreach (var attribute in type.GetCustomAttributes(false))
+ var attribute = type.GetCustomAttributes(typeof(EventHandlerAttribute), false).FirstOrDefault();
+
+ if (attribute != null)
{
- if (attribute is EventHandlerAttribute config)
- {
- evenHandlertList.Add((type, config));
- break;
- }
+ eventHandlertList.Add((type, (EventHandlerAttribute)attribute));
+ }
+ else
+ {
+ throw new PoleEventHandlerImplementException("Can not found EventHandlerAttribute in PoleEventHandler");
}
}
+
}
- }
+ }
#endregion
}
}
diff --git a/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs b/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs
index d90fce0..879630d 100644
--- a/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs
+++ b/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs
@@ -5,6 +5,7 @@ namespace Pole.EventBus.RabbitMQ
{
public interface IRabbitEventBusContainer : IConsumerContainer
{
+ bool IsAutoRegisterFinished { get; }
Task AutoRegister();
RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false);
Task Work(RabbitEventBus bus);
diff --git a/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs b/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs
index 5735443..8a69213 100644
--- a/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs
+++ b/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs
@@ -1,4 +1,5 @@
using Pole.Core.EventBus;
+using Pole.Core.EventBus.EventHandler;
using Pole.Core.Exceptions;
using Pole.Core.Utils;
using System;
@@ -61,12 +62,13 @@ namespace Pole.EventBus.RabbitMQ
var observerUnits = observerUnitContainer.GetUnits(EventName);
foreach (var observerUnit in observerUnits)
{
+ string queueNameSuffix = observerUnit.EventHandlerType.FullName;
var consumer = new RabbitConsumer(
observerUnit.GetEventHandler(),
observerUnit.GetBatchEventHandler())
{
EventBus = this,
- QueueInfo = new QueueInfo { RoutingKey = RoutePrefix, Queue = $"{RoutePrefix}_{observerUnit}" },
+ QueueInfo = new QueueInfo { RoutingKey = RoutePrefix, Queue = $"{RoutePrefix}_{queueNameSuffix}" },
Config = ConsumerConfig
};
Consumers.Add(consumer);
diff --git a/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs b/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs
index 837b8a7..755fd33 100644
--- a/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs
+++ b/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs
@@ -1,6 +1,9 @@
using System;
+using System.Collections.Concurrent;
+using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
using Pole.Core;
using Pole.Core.EventBus;
using Pole.EventBus.RabbitMQ;
@@ -9,6 +12,7 @@ namespace Microsoft.Extensions.DependencyInjection
{
public static class PoleRabbitmqStartupConfigExtensions
{
+ private static ConcurrentDictionary ConsumerRunners = new ConcurrentDictionary();
public static void AddRabbitMQ(
this StartupConfig startupOption,
Action rabbitConfigAction,
@@ -16,16 +20,32 @@ namespace Microsoft.Extensions.DependencyInjection
{
startupOption.Services.Configure(config => rabbitConfigAction(config));
startupOption.Services.AddSingleton();
- startupOption.Services.AddHostedService();
+ //startupOption.Services.AddHostedService();
startupOption.Services.AddSingleton();
startupOption.Services.AddSingleton(serviceProvider => serviceProvider.GetService() as IProducerContainer);
Startup.Register(async serviceProvider =>
{
var container = serviceProvider.GetService();
+ var client = serviceProvider.GetService();
+ var rabbitOptions = serviceProvider.GetService>().Value;
if (eventBusConfig != default)
await eventBusConfig(container);
else
await container.AutoRegister();
+
+ var consumers = container.GetConsumers();
+ foreach (var consumer in consumers)
+ {
+ if (consumer is RabbitConsumer value)
+ {
+ var queue = value.QueueInfo;
+ var key = queue.Queue;
+
+ var runner = new ConsumerRunner(client, serviceProvider, value, queue, rabbitOptions);
+ ConsumerRunners.TryAdd(key, runner);
+ await runner.Run();
+ }
+ }
});
}
}
diff --git a/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs b/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs
index 54cbb57..a3f2c06 100644
--- a/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs
+++ b/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs
@@ -1,5 +1,6 @@
using Pole.Core;
using Pole.Core.EventBus;
+using System.Collections.Generic;
using System.Threading.Tasks;
namespace Pole.EventBus.RabbitMQ
@@ -8,17 +9,20 @@ namespace Pole.EventBus.RabbitMQ
{
readonly RabbitEventBus publisher;
readonly IRabbitMQClient rabbitMQClient;
+ readonly RabbitOptions rabbitOptions;
public RabbitProducer(
IRabbitMQClient rabbitMQClient,
- RabbitEventBus publisher)
+ RabbitEventBus publisher,
+ RabbitOptions rabbitOptions)
{
this.publisher = publisher;
this.rabbitMQClient = rabbitMQClient;
+ this.rabbitOptions = rabbitOptions;
}
public ValueTask Publish(byte[] bytes)
{
using var channel = rabbitMQClient.PullChannel();
- channel.Publish(bytes, publisher.Exchange, string.Empty, publisher.Persistent);
+ channel.Publish(bytes, $"{rabbitOptions.Prefix}{publisher.Exchange}", string.Empty, publisher.Persistent);
return Consts.ValueTaskDone;
}
}
diff --git a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs
index 00222c0..4bd6d32 100644
--- a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs
+++ b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs
@@ -31,11 +31,11 @@ namespace Pole.EventStorage.PostgreSql
public async Task BulkChangePublishStateAsync(IEnumerable events)
{
var sql =
-$"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"= any @Ids";
+$"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\" IN (@Ids)";
using var connection = new NpgsqlConnection(options.ConnectionString);
await connection.ExecuteAsync(sql, events.Select(@event=> new
{
- Ids = events.Select(@event=>@event.Id).ToArray(),
+ Ids =string.Join(',',events.Select(@event=>@event.Id).ToArray()),
@event.Retries,
@event.ExpiresAt,
@event.StatusName
@@ -79,6 +79,8 @@ $"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusN
result.Add(new EventEntity
{
Id = reader.GetString(0),
+ Name=reader.GetString(2),
+ Content=reader.GetString(3),
Retries = reader.GetInt32(4),
Added = reader.GetDateTime(5)
});