diff --git a/Pole.sln b/Pole.sln index a7de63d..2e9f4b0 100644 --- a/Pole.sln +++ b/Pole.sln @@ -11,26 +11,12 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Core", "src\Pole.Core\ EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Application", "src\Pole.Application\Pole.Application.csproj", "{C7825E5B-4FB0-4498-B8D1-E9EC0BC1AA5C}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Domain", "src\Pole.Domain\Pole.Domain.csproj", "{6F6DBA49-4274-4C62-BBE7-91FCC5B77989}" -EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Domain.EntityframeworkCore", "src\Pole.Domain.EntityframeworkCore\Pole.Domain.EntityframeworkCore.csproj", "{1C26BE3A-CBEA-47D1-97A0-6DB4F41DFF5A}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Grpc", "src\Pole.Grpc\Pole.Grpc.csproj", "{F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{4A0FB696-EC29-4A5F-B40B-A6FC56001ADB}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "apis", "apis", "{475116FC-DEEC-4255-94E4-AE7B8C85038D}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.ReliableMessage", "src\Pole.ReliableMessage\Pole.ReliableMessage.csproj", "{699C75AB-4814-4E16-A3F3-9735C4C609FE}" -EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.ReliableMessage.Masstransit", "src\Pole.ReliableMessage.Masstransit\Pole.ReliableMessage.Masstransit.csproj", "{051BECA5-5E65-4FCB-9B7F-C9E64809E218}" -EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.ReliableMessage.Storage.Abstraction", "src\Pole.ReliableMessage.Storage.Abstraction\Pole.ReliableMessage.Storage.Abstraction.csproj", "{3D92F460-350B-4614-A6F6-C00A2D0FA9E2}" -EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.ReliableMessage.Storage.Mongodb", "src\Pole.ReliableMessage.Storage.Mongodb\Pole.ReliableMessage.Storage.Mongodb.csproj", "{793C73C6-93DE-4A56-B979-137914B247F2}" -EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.ReliableMessage.Storage.EntityframeworkCore", "src\Pole.ReliableMessage.Storage.EntityframeworkCore\Pole.ReliableMessage.Storage.EntityframeworkCore.csproj", "{805CF4F7-CCDC-4390-A92B-55E0FFA7F659}" -EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "webs", "webs", "{452B9D9E-881E-4E0E-A90B-98F2253F20F1}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Order.Api", "samples\apis\Order.Api\Order.Api.csproj", "{098DF771-6DC6-45D4-ABFA-FF84E8F7750B}" @@ -67,38 +53,10 @@ Global {C7825E5B-4FB0-4498-B8D1-E9EC0BC1AA5C}.Debug|Any CPU.Build.0 = Debug|Any CPU {C7825E5B-4FB0-4498-B8D1-E9EC0BC1AA5C}.Release|Any CPU.ActiveCfg = Release|Any CPU {C7825E5B-4FB0-4498-B8D1-E9EC0BC1AA5C}.Release|Any CPU.Build.0 = Release|Any CPU - {6F6DBA49-4274-4C62-BBE7-91FCC5B77989}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {6F6DBA49-4274-4C62-BBE7-91FCC5B77989}.Debug|Any CPU.Build.0 = Debug|Any CPU - {6F6DBA49-4274-4C62-BBE7-91FCC5B77989}.Release|Any CPU.ActiveCfg = Release|Any CPU - {6F6DBA49-4274-4C62-BBE7-91FCC5B77989}.Release|Any CPU.Build.0 = Release|Any CPU - {1C26BE3A-CBEA-47D1-97A0-6DB4F41DFF5A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {1C26BE3A-CBEA-47D1-97A0-6DB4F41DFF5A}.Debug|Any CPU.Build.0 = Debug|Any CPU - {1C26BE3A-CBEA-47D1-97A0-6DB4F41DFF5A}.Release|Any CPU.ActiveCfg = Release|Any CPU - {1C26BE3A-CBEA-47D1-97A0-6DB4F41DFF5A}.Release|Any CPU.Build.0 = Release|Any CPU {F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8}.Debug|Any CPU.Build.0 = Debug|Any CPU {F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8}.Release|Any CPU.ActiveCfg = Release|Any CPU {F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8}.Release|Any CPU.Build.0 = Release|Any CPU - {699C75AB-4814-4E16-A3F3-9735C4C609FE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {699C75AB-4814-4E16-A3F3-9735C4C609FE}.Debug|Any CPU.Build.0 = Debug|Any CPU - {699C75AB-4814-4E16-A3F3-9735C4C609FE}.Release|Any CPU.ActiveCfg = Release|Any CPU - {699C75AB-4814-4E16-A3F3-9735C4C609FE}.Release|Any CPU.Build.0 = Release|Any CPU - {051BECA5-5E65-4FCB-9B7F-C9E64809E218}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {051BECA5-5E65-4FCB-9B7F-C9E64809E218}.Debug|Any CPU.Build.0 = Debug|Any CPU - {051BECA5-5E65-4FCB-9B7F-C9E64809E218}.Release|Any CPU.ActiveCfg = Release|Any CPU - {051BECA5-5E65-4FCB-9B7F-C9E64809E218}.Release|Any CPU.Build.0 = Release|Any CPU - {3D92F460-350B-4614-A6F6-C00A2D0FA9E2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {3D92F460-350B-4614-A6F6-C00A2D0FA9E2}.Debug|Any CPU.Build.0 = Debug|Any CPU - {3D92F460-350B-4614-A6F6-C00A2D0FA9E2}.Release|Any CPU.ActiveCfg = Release|Any CPU - {3D92F460-350B-4614-A6F6-C00A2D0FA9E2}.Release|Any CPU.Build.0 = Release|Any CPU - {793C73C6-93DE-4A56-B979-137914B247F2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {793C73C6-93DE-4A56-B979-137914B247F2}.Debug|Any CPU.Build.0 = Debug|Any CPU - {793C73C6-93DE-4A56-B979-137914B247F2}.Release|Any CPU.ActiveCfg = Release|Any CPU - {793C73C6-93DE-4A56-B979-137914B247F2}.Release|Any CPU.Build.0 = Release|Any CPU - {805CF4F7-CCDC-4390-A92B-55E0FFA7F659}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {805CF4F7-CCDC-4390-A92B-55E0FFA7F659}.Debug|Any CPU.Build.0 = Debug|Any CPU - {805CF4F7-CCDC-4390-A92B-55E0FFA7F659}.Release|Any CPU.ActiveCfg = Release|Any CPU - {805CF4F7-CCDC-4390-A92B-55E0FFA7F659}.Release|Any CPU.Build.0 = Release|Any CPU {098DF771-6DC6-45D4-ABFA-FF84E8F7750B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {098DF771-6DC6-45D4-ABFA-FF84E8F7750B}.Debug|Any CPU.Build.0 = Debug|Any CPU {098DF771-6DC6-45D4-ABFA-FF84E8F7750B}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -142,15 +100,8 @@ Global GlobalSection(NestedProjects) = preSolution {CA80F6EF-95A0-4BB7-BA8B-02E167E82865} = {9932C965-8B38-4F70-9E43-86DC56860E2B} {C7825E5B-4FB0-4498-B8D1-E9EC0BC1AA5C} = {9932C965-8B38-4F70-9E43-86DC56860E2B} - {6F6DBA49-4274-4C62-BBE7-91FCC5B77989} = {9932C965-8B38-4F70-9E43-86DC56860E2B} - {1C26BE3A-CBEA-47D1-97A0-6DB4F41DFF5A} = {9932C965-8B38-4F70-9E43-86DC56860E2B} {F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8} = {9932C965-8B38-4F70-9E43-86DC56860E2B} {475116FC-DEEC-4255-94E4-AE7B8C85038D} = {4A0FB696-EC29-4A5F-B40B-A6FC56001ADB} - {699C75AB-4814-4E16-A3F3-9735C4C609FE} = {9932C965-8B38-4F70-9E43-86DC56860E2B} - {051BECA5-5E65-4FCB-9B7F-C9E64809E218} = {9932C965-8B38-4F70-9E43-86DC56860E2B} - {3D92F460-350B-4614-A6F6-C00A2D0FA9E2} = {9932C965-8B38-4F70-9E43-86DC56860E2B} - {793C73C6-93DE-4A56-B979-137914B247F2} = {9932C965-8B38-4F70-9E43-86DC56860E2B} - {805CF4F7-CCDC-4390-A92B-55E0FFA7F659} = {9932C965-8B38-4F70-9E43-86DC56860E2B} {452B9D9E-881E-4E0E-A90B-98F2253F20F1} = {4A0FB696-EC29-4A5F-B40B-A6FC56001ADB} {098DF771-6DC6-45D4-ABFA-FF84E8F7750B} = {475116FC-DEEC-4255-94E4-AE7B8C85038D} {125B1E4B-B1C1-4F85-9C6A-38815960E654} = {475116FC-DEEC-4255-94E4-AE7B8C85038D} diff --git a/src/Pole.Domain.EntityframeworkCore/EFCoreRepository.cs b/src/Pole.Domain.EntityframeworkCore/EFCoreRepository.cs deleted file mode 100644 index 6cb4183..0000000 --- a/src/Pole.Domain.EntityframeworkCore/EFCoreRepository.cs +++ /dev/null @@ -1,51 +0,0 @@ -using MediatR; -using Microsoft.EntityFrameworkCore; -using Microsoft.Extensions.DependencyInjection; -using Pole.EntityframeworkCore.MediatR; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Domain.EntityframeworkCore -{ - public class EFCoreRepository : IRepository - where TEntity : Entity, IAggregateRoot - { - protected readonly DbContext _dbContext; - private readonly IMediator _mediator; - public EFCoreRepository(IServiceProvider serviceProvider) - { - var dbContextOptions = serviceProvider.GetRequiredService(); - _dbContext = serviceProvider.GetRequiredService(dbContextOptions.ContextType) as DbContext; - _mediator = serviceProvider.GetRequiredService(); - } - public void Add(TEntity entity) - { - _dbContext.Set().Add(entity); - } - - public virtual void Delete(TEntity entity) - { - _dbContext.Set().Remove(entity); - } - - public virtual Task Get(string id) - { - return _dbContext.Set().FirstOrDefaultAsync(m => m.Id == id); - } - - public async Task SaveEntitiesAsync(CancellationToken cancellationToken = default) - { - await _mediator.DispatchDomainEventsAsync(_dbContext); - await _dbContext.SaveChangesAsync(cancellationToken); - return true; - } - - public void Update(TEntity entity) - { - throw new NotImplementedException(); - } - } -} diff --git a/src/Pole.Domain.EntityframeworkCore/MediatR/MediatorExtension.cs b/src/Pole.Domain.EntityframeworkCore/MediatR/MediatorExtension.cs deleted file mode 100644 index e6a32b2..0000000 --- a/src/Pole.Domain.EntityframeworkCore/MediatR/MediatorExtension.cs +++ /dev/null @@ -1,36 +0,0 @@ -using MediatR; -using Microsoft.EntityFrameworkCore; -using Pole.Domain; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.EntityframeworkCore.MediatR -{ - public static class MediatorExtension - { - public static async Task DispatchDomainEventsAsync(this IMediator mediator, DbContext ctx) - { - var domainEntities = ctx.ChangeTracker - .Entries() - .Where(x => x.Entity.DomainEvents != null && x.Entity.DomainEvents.Any()); - - var domainEvents = domainEntities - .SelectMany(x => x.Entity.DomainEvents) - .ToList(); - - domainEntities.ToList() - .ForEach(entity => entity.Entity.ClearDomainEvents()); - - var tasks = domainEvents - .Select(async (domainEvent) => - { - await mediator.Publish(domainEvent); - }); - - await Task.WhenAll(tasks); - } - } -} diff --git a/src/Pole.Domain.EntityframeworkCore/MediatR/NoMediator.cs b/src/Pole.Domain.EntityframeworkCore/MediatR/NoMediator.cs deleted file mode 100644 index e475944..0000000 --- a/src/Pole.Domain.EntityframeworkCore/MediatR/NoMediator.cs +++ /dev/null @@ -1,32 +0,0 @@ -using MediatR; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Domain.EntityframeworkCore.MediatR -{ - public class NoMediator : IMediator - { - public Task Publish(object notification, CancellationToken cancellationToken = default) - { - return Task.CompletedTask; - } - - public Task Publish(TNotification notification, CancellationToken cancellationToken = default) where TNotification : INotification - { - return Task.CompletedTask; - } - - public Task Send(IRequest request, CancellationToken cancellationToken = default) - { - return Task.FromResult(default(TResponse)); - } - - public Task Send(object request, CancellationToken cancellationToken = default) - { - return Task.FromResult(default(object)); - } - } -} diff --git a/src/Pole.Domain.EntityframeworkCore/Pole.Domain.EntityframeworkCore.csproj b/src/Pole.Domain.EntityframeworkCore/Pole.Domain.EntityframeworkCore.csproj deleted file mode 100644 index a5c207c..0000000 --- a/src/Pole.Domain.EntityframeworkCore/Pole.Domain.EntityframeworkCore.csproj +++ /dev/null @@ -1,17 +0,0 @@ - - - - netstandard2.0 - - - - - - - - - - - - - diff --git a/src/Pole.Domain.EntityframeworkCore/ServiceCollectionExtension.cs b/src/Pole.Domain.EntityframeworkCore/ServiceCollectionExtension.cs deleted file mode 100644 index d55aa5d..0000000 --- a/src/Pole.Domain.EntityframeworkCore/ServiceCollectionExtension.cs +++ /dev/null @@ -1,21 +0,0 @@ -using Microsoft.EntityFrameworkCore; -using Microsoft.Extensions.DependencyInjection; -using Pole.Application; -using Pole.Domain.EntityframeworkCore; -using Pole.Domain.EntityframeworkCore.UnitOfWork; -using Pole.Domain.UnitOfWork; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Microsoft.Extensions.DependencyInjection -{ - public static class PoleApplicationOptionsExtension - { - public static PoleOptions AddPoleEntityFrameworkCoreDomain(this PoleOptions options) - { - options.Services.AddScoped(); - return options; - } - } -} diff --git a/src/Pole.Domain.EntityframeworkCore/UnitOfWork/EntityFrameworkCoreTransactionWorker.cs b/src/Pole.Domain.EntityframeworkCore/UnitOfWork/EntityFrameworkCoreTransactionWorker.cs deleted file mode 100644 index 1adba60..0000000 --- a/src/Pole.Domain.EntityframeworkCore/UnitOfWork/EntityFrameworkCoreTransactionWorker.cs +++ /dev/null @@ -1,50 +0,0 @@ -using Microsoft.EntityFrameworkCore; -using Microsoft.EntityFrameworkCore.Storage; -using Microsoft.Extensions.DependencyInjection; -using Pole.Domain.UnitOfWork; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Domain.EntityframeworkCore.UnitOfWork -{ - public class EntityFrameworkCoreTransactionWorker : IWorker - { - private readonly IDbContextTransaction _dbContextTransaction; - public EntityFrameworkCoreTransactionWorker(DbContextOptions dbContextOptions, IServiceProvider serviceProvider) - { - var dbContext = serviceProvider.GetRequiredService(dbContextOptions.ContextType) as DbContext; - _dbContextTransaction = dbContext.Database.BeginTransaction(); - } - - public int Order => 100; - - public WorkerStatus WorkerStatus { get; set; } - - public async Task Commit(CancellationToken cancellationToken = default) - { - await _dbContextTransaction.CommitAsync(cancellationToken); - WorkerStatus = WorkerStatus.Commited; - } - - public void Dispose() - { - // 无需手动dispose - //_dbContextTransaction?.Dispose(); - } - - public Task PreCommit(CancellationToken cancellationToken = default) - { - WorkerStatus = WorkerStatus.PostCommited; - return Task.FromResult(1); - } - - public async Task Rollback(CancellationToken cancellationToken = default) - { - await _dbContextTransaction.RollbackAsync(); - WorkerStatus = WorkerStatus.Rollbacked; - } - } -} diff --git a/src/Pole.Domain/Entity/Entity.cs b/src/Pole.Domain/Entity/Entity.cs deleted file mode 100644 index 0107d5e..0000000 --- a/src/Pole.Domain/Entity/Entity.cs +++ /dev/null @@ -1,75 +0,0 @@ -using MediatR; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Domain -{ - public abstract class Entity - { - string _id; - public virtual string Id - { - get - { - return _id; - } - set - { - _id = value; - } - } - public List DomainEvents { get; private set; } - public bool IsTransient() - { - return string.IsNullOrEmpty(this._id); - } - public override bool Equals(object obj) - { - if (obj == null || !(obj is Entity)) - return false; - - if (Object.ReferenceEquals(this, obj)) - return true; - - if (this.GetType() != obj.GetType()) - return false; - - Entity item = (Entity)obj; - - if (item.IsTransient() || this.IsTransient()) - return false; - else - return item.Id == this.Id; - } - public override int GetHashCode() - { - return this.Id.GetHashCode(); - } - public static bool operator ==(Entity left, Entity right) - { - if (Object.Equals(left, null)) - return (Object.Equals(right, null)) ? true : false; - else - return left.Equals(right); - } - public static bool operator !=(Entity left, Entity right) - { - return !(left == right); - } - public void AddDomainEvent(IDomainEvent eventItem) - { - DomainEvents = DomainEvents ?? new List(); - DomainEvents.Add(eventItem); - } - public void RemoveDomainEvent(IDomainEvent eventItem) - { - if (DomainEvents is null) return; - DomainEvents.Remove(eventItem); - } - public void ClearDomainEvents() - { - DomainEvents?.Clear(); - } - } -} diff --git a/src/Pole.Domain/Entity/Enumeration.cs b/src/Pole.Domain/Entity/Enumeration.cs deleted file mode 100644 index 49f0865..0000000 --- a/src/Pole.Domain/Entity/Enumeration.cs +++ /dev/null @@ -1,94 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Reflection; -using System.Text; - -namespace Pole.Domain -{ - public abstract class Enumeration : IComparable - { - public string Name { get; private set; } - - public int Id { get; private set; } - - protected Enumeration() - { - } - - protected Enumeration(int id, string name) - { - Id = id; - Name = name; - } - - public override string ToString() - { - return Name; - } - - public static IEnumerable GetAll() where T : Enumeration - { - var fields = typeof(T).GetFields(BindingFlags.Public | BindingFlags.Static | BindingFlags.DeclaredOnly); - - return fields.Select(f => f.GetValue(null)).Cast(); - } - - public override bool Equals(object obj) - { - var otherValue = obj as Enumeration; - - if (otherValue == null) - { - return false; - } - - var typeMatches = GetType().Equals(obj.GetType()); - var valueMatches = Id.Equals(otherValue.Id); - - return typeMatches && valueMatches; - } - - public override int GetHashCode() - { - return Id.GetHashCode(); - } - - public static int AbsoluteDifference(Enumeration firstValue, Enumeration secondValue) - { - var absoluteDifference = Math.Abs(firstValue.Id - secondValue.Id); - return absoluteDifference; - } - - public static T FromValue(int value) where T : Enumeration - { - var matchingItem = Parse(value, "value", item => item.Id == value); - return matchingItem; - } - - public static T FromDisplayName(string displayName) where T : Enumeration - { - var matchingItem = Parse(displayName, "display name", item => item.Name == displayName); - return matchingItem; - } - - private static T Parse(K value, string description, Func predicate) where T : Enumeration - { - var matchingItem = GetAll().FirstOrDefault(predicate); - - if (matchingItem == null) - { - var message = string.Format("'{0}' is not a valid {1} in {2}", value, description, typeof(T)); - - throw new InvalidOperationException(message); - } - - return matchingItem; - } - - public int CompareTo(object other) - { - return Id.CompareTo(((Enumeration)other).Id); - } - } -} diff --git a/src/Pole.Domain/Entity/IAggregateRoot.cs b/src/Pole.Domain/Entity/IAggregateRoot.cs deleted file mode 100644 index 50449a6..0000000 --- a/src/Pole.Domain/Entity/IAggregateRoot.cs +++ /dev/null @@ -1,9 +0,0 @@ -using MediatR; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Domain -{ - public interface IAggregateRoot { } -} diff --git a/src/Pole.Domain/Entity/ISoftDeleteable.cs b/src/Pole.Domain/Entity/ISoftDeleteable.cs deleted file mode 100644 index 249f357..0000000 --- a/src/Pole.Domain/Entity/ISoftDeleteable.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Domain -{ - public interface ISoftDeleteable - { - bool IsDelete { get; set; } - } -} diff --git a/src/Pole.Domain/Entity/ValueObject.cs b/src/Pole.Domain/Entity/ValueObject.cs deleted file mode 100644 index a338fe3..0000000 --- a/src/Pole.Domain/Entity/ValueObject.cs +++ /dev/null @@ -1,66 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; - -namespace Pole.Domain -{ - public abstract class ValueObject - { - protected static bool EqualOperator(ValueObject left, ValueObject right) - { - if (left is null ^ right is null) - { - return false; - } - return left is null || left.Equals(right); - } - - - protected static bool NotEqualOperator(ValueObject left, ValueObject right) - { - return !(EqualOperator(left, right)); - } - - - protected abstract IEnumerable GetAtomicValues(); - - - public override bool Equals(object obj) - { - if (obj == null || obj.GetType() != GetType()) - { - return false; - } - ValueObject other = (ValueObject)obj; - IEnumerator thisValues = GetAtomicValues().GetEnumerator(); - IEnumerator otherValues = other.GetAtomicValues().GetEnumerator(); - while (thisValues.MoveNext() && otherValues.MoveNext()) - { - if (thisValues.Current is null ^ otherValues.Current is null) - { - return false; - } - if (thisValues.Current != null && !thisValues.Current.Equals(otherValues.Current)) - { - return false; - } - } - return !thisValues.MoveNext() && !otherValues.MoveNext(); - } - - - public override int GetHashCode() - { - return GetAtomicValues() - .Select(x => x != null ? x.GetHashCode() : 0) - .Aggregate((x, y) => x ^ y); - } - - public ValueObject GetCopy() - { - return this.MemberwiseClone() as ValueObject; - } - } - -} diff --git a/src/Pole.Domain/IDomainEvent.cs b/src/Pole.Domain/IDomainEvent.cs deleted file mode 100644 index b337c54..0000000 --- a/src/Pole.Domain/IDomainEvent.cs +++ /dev/null @@ -1,12 +0,0 @@ -using MediatR; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Domain -{ - public interface IDomainEvent : INotification - { - - } -} diff --git a/src/Pole.Domain/IDomainEventHandler.cs b/src/Pole.Domain/IDomainEventHandler.cs deleted file mode 100644 index f2ff36c..0000000 --- a/src/Pole.Domain/IDomainEventHandler.cs +++ /dev/null @@ -1,14 +0,0 @@ -using MediatR; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Domain -{ - public interface IDomainEventHandler : INotificationHandler where TCommand : IDomainEvent - { - - } -} diff --git a/src/Pole.Domain/IRepository.cs b/src/Pole.Domain/IRepository.cs deleted file mode 100644 index 24eceaf..0000000 --- a/src/Pole.Domain/IRepository.cs +++ /dev/null @@ -1,23 +0,0 @@ -using Pole.Core.DependencyInjection; -using Pole.Domain.UnitOfWork; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Domain -{ - public interface IRepository : IRepository where T : IAggregateRoot - { - void Update(T entity); - void Delete(T entity); - void Add(T entity); - Task Get(string id); - Task SaveEntitiesAsync(CancellationToken cancellationToken = default); - } - public interface IRepository: IScopedDenpendency - { - - } -} diff --git a/src/Pole.Domain/Pole.Domain.csproj b/src/Pole.Domain/Pole.Domain.csproj deleted file mode 100644 index 794f936..0000000 --- a/src/Pole.Domain/Pole.Domain.csproj +++ /dev/null @@ -1,17 +0,0 @@ - - - - netstandard2.0 - - - - - - - - - - - - - diff --git a/src/Pole.Domain/UnitOfWork/DefaultUnitOfWork.cs b/src/Pole.Domain/UnitOfWork/DefaultUnitOfWork.cs deleted file mode 100644 index 6b3d89f..0000000 --- a/src/Pole.Domain/UnitOfWork/DefaultUnitOfWork.cs +++ /dev/null @@ -1,56 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Domain.UnitOfWork -{ - public class DefaultUnitOfWork : IUnitOfWork - { - private readonly List _workers; - public DefaultUnitOfWork(IServiceProvider serviceProvider) - { - _workers = serviceProvider.GetServices().ToList(); - } - public async Task CompeleteAsync(CancellationToken cancellationToken = default) - { - var preCommitTasks = _workers.OrderBy(worker => worker.Order).Select(async worker => - { - await worker.PreCommit(); - }); - await Task.WhenAll(preCommitTasks); - try - { - var commitTasks = _workers.OrderBy(worker => worker.Order).Select(async worker => - { - await worker.Commit(); - }); - await Task.WhenAll(commitTasks); - } - catch (Exception ex) - { - var rollbackTasks = _workers.OrderBy(worker => worker.Order).Where(worker => worker.WorkerStatus == WorkerStatus.Commited).Select(async worker => - { - await worker.Rollback(); - }); - await Task.WhenAll(rollbackTasks); - throw ex; - } - } - - public void Dispose() - { - // Workers 都是 scoped 的 每次请求结束后 会自动 dispose 所以这里不需要 调用 Workers 的 dispose - //_workers.OrderBy(worker => worker.Order).ToList().ForEach(m => m.Dispose()); - } - - public Task Rollback(CancellationToken cancellationToken = default) - { - _workers.OrderBy(worker => worker.Order).ToList().ForEach(m => m.Rollback()); - return Task.FromResult(1); - } - } -} diff --git a/src/Pole.Domain/UnitOfWork/IUnitOfWork.cs b/src/Pole.Domain/UnitOfWork/IUnitOfWork.cs deleted file mode 100644 index 9e0fb19..0000000 --- a/src/Pole.Domain/UnitOfWork/IUnitOfWork.cs +++ /dev/null @@ -1,15 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Domain.UnitOfWork -{ - public interface IUnitOfWork : IDisposable - { - Task CompeleteAsync(CancellationToken cancellationToken = default); - Task Rollback(CancellationToken cancellationToken = default); - } - -} diff --git a/src/Pole.Domain/UnitOfWork/IWorker.cs b/src/Pole.Domain/UnitOfWork/IWorker.cs deleted file mode 100644 index 8de416f..0000000 --- a/src/Pole.Domain/UnitOfWork/IWorker.cs +++ /dev/null @@ -1,17 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Domain.UnitOfWork -{ - public interface IWorker : IDisposable - { - int Order { get; } - WorkerStatus WorkerStatus { get; } - Task PreCommit(CancellationToken cancellationToken = default); - Task Commit(CancellationToken cancellationToken = default); - Task Rollback(CancellationToken cancellationToken = default); - } -} diff --git a/src/Pole.Domain/UnitOfWork/UnitOfWorkResult.cs b/src/Pole.Domain/UnitOfWork/UnitOfWorkResult.cs deleted file mode 100644 index 7c701da..0000000 --- a/src/Pole.Domain/UnitOfWork/UnitOfWorkResult.cs +++ /dev/null @@ -1,23 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Domain.UnitOfWork -{ - public class UnitOfWorkResult - { - public static UnitOfWorkResult SuccessResult = new UnitOfWorkResult(1, "保存成功"); - public static UnitOfWorkResult FaildResult = new UnitOfWorkResult(1, "保存失败"); - public UnitOfWorkResult(int status, string message) - { - Status = status; - Message = message; - } - - /// - /// 1 Success 2 Faild ... - /// - public int Status { get;private set; } - public string Message { get;private set; } - } -} diff --git a/src/Pole.Domain/UnitOfWork/WorkerStatus.cs b/src/Pole.Domain/UnitOfWork/WorkerStatus.cs deleted file mode 100644 index 8c34e69..0000000 --- a/src/Pole.Domain/UnitOfWork/WorkerStatus.cs +++ /dev/null @@ -1,15 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Domain.UnitOfWork -{ - public enum WorkerStatus - { - Init = 0, - PreCommited = 1, - Commited = 2, - PostCommited = 3, - Rollbacked = 4 - } -} diff --git a/src/Pole.ReliableMessage.Dashboard/Pole.ReliableMessage.Dashboard.csproj b/src/Pole.ReliableMessage.Dashboard/Pole.ReliableMessage.Dashboard.csproj deleted file mode 100644 index 9f5c4f4..0000000 --- a/src/Pole.ReliableMessage.Dashboard/Pole.ReliableMessage.Dashboard.csproj +++ /dev/null @@ -1,7 +0,0 @@ - - - - netstandard2.0 - - - diff --git a/src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEvent.cs b/src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEvent.cs deleted file mode 100644 index 5f80e76..0000000 --- a/src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEvent.cs +++ /dev/null @@ -1,10 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Masstransit.Abstraction -{ - public interface IReliableEvent - { - } -} diff --git a/src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEventHandlerRegistrarFactory.cs b/src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEventHandlerRegistrarFactory.cs deleted file mode 100644 index eb09889..0000000 --- a/src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEventHandlerRegistrarFactory.cs +++ /dev/null @@ -1,12 +0,0 @@ -using Pole.ReliableMessage.EventBus; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Masstransit.Abstraction -{ - public interface IReliableEventHandlerRegistrarFactory - { - MasstransitEventHandlerRegistrar Create(Type eventHandlerType); - } -} diff --git a/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerContext.cs b/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerContext.cs deleted file mode 100644 index fbb2ec5..0000000 --- a/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerContext.cs +++ /dev/null @@ -1,21 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using MassTransit; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Masstransit -{ - public class DefaultReliableEventHandlerContext : IReliableEventHandlerContext - where TEvent : class - { - private readonly ConsumeContext _executeContext; - public DefaultReliableEventHandlerContext(ConsumeContext executeContext) - { - _executeContext = executeContext; - this.Event = executeContext.Message; - } - public TEvent Event { get; private set; } - } -} diff --git a/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs b/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs deleted file mode 100644 index 439e0b1..0000000 --- a/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs +++ /dev/null @@ -1,76 +0,0 @@ -using Pole.ReliableMessage.EventBus; -using Pole.ReliableMessage.Masstransit.Abstraction; -using Microsoft.Extensions.Options; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using Pole.Domain; - -namespace Pole.ReliableMessage.Masstransit -{ - class DefaultReliableEventHandlerRegistrarFactory : IReliableEventHandlerRegistrarFactory - { - private readonly MasstransitRabbitmqOption _masstransitOptions; - public DefaultReliableEventHandlerRegistrarFactory(IOptions masstransitOptions) - { - _masstransitOptions = masstransitOptions.Value ?? throw new ArgumentNullException(nameof(masstransitOptions)); - } - public MasstransitEventHandlerRegistrar Create(Type eventHnadler) - { - - if (!eventHnadler.Name.EndsWith(_masstransitOptions.EventHandlerNameSuffix)) - { - throw new Exception($"EventHandler Name Must EndWith {_masstransitOptions.EventHandlerNameSuffix}"); - } - var reliableEventHandlerParemeterAttribute = eventHnadler.GetCustomAttributes(typeof(ReliableEventHandlerParemeterAttribute), true).FirstOrDefault(); - - var eventHandlerName = GetQueueName(reliableEventHandlerParemeterAttribute, eventHnadler, _masstransitOptions.QueueNamePrefix, _masstransitOptions.EventHandlerNameSuffix); - - var parentEventHandler = eventHnadler.BaseType; - var eventType = parentEventHandler.GetGenericArguments().ToList().FirstOrDefault(); - - ushort prefetchCount = GetPrefetchCount(eventHnadler, reliableEventHandlerParemeterAttribute); - - MasstransitEventHandlerRegistrar eventHandlerRegisterInvoker = new MasstransitEventHandlerRegistrar(eventHandlerName, eventHnadler, eventType, _masstransitOptions.RetryConfigure, prefetchCount); - return eventHandlerRegisterInvoker; - } - - private string GetQueueName(object reliableEventHandlerParemeterAttribute, Type eventHnadler, string queueNamePrefix,string eventHandlerNameSuffix) - { - var eventHandlerDefaultName = $"eventHandler-{ eventHnadler.Name.Replace(eventHandlerNameSuffix, "").ToLowerInvariant()}"; - var eventHandlerName = string.IsNullOrEmpty(queueNamePrefix) ? eventHandlerDefaultName : $"{queueNamePrefix}-{eventHandlerDefaultName}"; - - if (reliableEventHandlerParemeterAttribute != null) - { - var reliableEventHandlerParemeterAttributeType = reliableEventHandlerParemeterAttribute.GetType(); - var prefetchCountPropertyInfo = reliableEventHandlerParemeterAttributeType.GetProperty(nameof(ReliableEventHandlerParemeterAttribute.QueueHaType)); - var queueHaTypeValue = Convert.ToInt32(prefetchCountPropertyInfo.GetValue(reliableEventHandlerParemeterAttribute)); - if (queueHaTypeValue != 0) - { - var currentQueueType = Enumeration.FromValue(queueHaTypeValue); - eventHandlerName = currentQueueType.GenerateQueueName(eventHandlerName); - } - } - - return eventHandlerName; - } - - private ushort GetPrefetchCount(Type eventHnadler, object reliableEventHandlerParemeterAttribute) - { - var prefetchCount = _masstransitOptions.PrefetchCount; - if (reliableEventHandlerParemeterAttribute != null) - { - var reliableEventHandlerParemeterAttributeType = reliableEventHandlerParemeterAttribute.GetType(); - var prefetchCountPropertyInfo = reliableEventHandlerParemeterAttributeType.GetProperty(nameof(ReliableEventHandlerParemeterAttribute.PrefetchCount)); - var prefetchCountValue = Convert.ToUInt16(prefetchCountPropertyInfo.GetValue(reliableEventHandlerParemeterAttribute)); - if (prefetchCountValue != 0) - { - prefetchCount = prefetchCountValue; - } - } - - return prefetchCount; - } - } -} diff --git a/src/Pole.ReliableMessage.Masstransit/MassTransitHostedService.cs b/src/Pole.ReliableMessage.Masstransit/MassTransitHostedService.cs deleted file mode 100644 index d819701..0000000 --- a/src/Pole.ReliableMessage.Masstransit/MassTransitHostedService.cs +++ /dev/null @@ -1,33 +0,0 @@ -using MassTransit; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Masstransit -{ - public class MassTransitHostedService : IHostedService - { - readonly IBusControl _bus; - readonly ILogger _logger; - public MassTransitHostedService(IBusControl bus, ILogger logger) - { - _bus = bus; - _logger = logger; - } - public async Task StartAsync(CancellationToken cancellationToken) - { - await _bus.StartAsync(); - _logger.LogInformation("MassTransit Bus Start Successful"); - } - - public async Task StopAsync(CancellationToken cancellationToken) - { - await _bus.StopAsync(); - _logger.LogInformation("MassTransit Bus Stop Successful"); - } - } -} diff --git a/src/Pole.ReliableMessage.Masstransit/MasstransitBasedMessageBus.cs b/src/Pole.ReliableMessage.Masstransit/MasstransitBasedMessageBus.cs deleted file mode 100644 index c1a8e12..0000000 --- a/src/Pole.ReliableMessage.Masstransit/MasstransitBasedMessageBus.cs +++ /dev/null @@ -1,24 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using Pole.ReliableMessage.Masstransit.Pipe; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Masstransit -{ - class MasstransitBasedMessageBus : IMessageBus - { - public MasstransitBasedMessageBus(MassTransit.IBus bus) - { - _bus = bus; - } - private readonly MassTransit.IBus _bus; - public Task Publish(object @event, string reliableMessageId, CancellationToken cancellationToken = default) - { - var pipe = new AddReliableMessageIdPipe(reliableMessageId); - return _bus.Publish(@event, pipe, cancellationToken); - } - } -} diff --git a/src/Pole.ReliableMessage.Masstransit/MasstransitEventHandlerRegistrar.cs b/src/Pole.ReliableMessage.Masstransit/MasstransitEventHandlerRegistrar.cs deleted file mode 100644 index a1ea419..0000000 --- a/src/Pole.ReliableMessage.Masstransit/MasstransitEventHandlerRegistrar.cs +++ /dev/null @@ -1,46 +0,0 @@ -using GreenPipes; -using GreenPipes.Configurators; -using MassTransit; -using MassTransit.ExtensionsDependencyInjectionIntegration; -using MassTransit.RabbitMqTransport; -using Microsoft.Extensions.DependencyInjection; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.EventBus -{ - public class MasstransitEventHandlerRegistrar - { - private readonly string _queueName; - private readonly Type _eventHandlerType; - private readonly Type _eventHandlerEventType; - private readonly Action _retryConfigure; - public readonly ushort _prefetchCount; - public MasstransitEventHandlerRegistrar(string eventHandlerName, Type eventHandlerType, Type eventHandlerEventType, Action retryConfigure, ushort prefetchCount) - { - _queueName = eventHandlerName; - _eventHandlerType = eventHandlerType; - _eventHandlerEventType = eventHandlerEventType; - _retryConfigure = retryConfigure; - _prefetchCount = prefetchCount; - } - public void RegisterEventHandler(IServiceCollectionConfigurator serviceCollectionConfigurator, IServiceCollection services) - { - serviceCollectionConfigurator.AddConsumer(_eventHandlerType); - } - public void RegisterQueue(IServiceCollectionConfigurator serviceCollectionConfigurator, IRabbitMqBusFactoryConfigurator rabbitMqBusFactoryConfigurator, IRabbitMqHost rabbitMqHost, IServiceProvider serviceProvider) - { - - //serviceCollectionConfigurator.AddConsumer(_eventHandlerType); - - rabbitMqBusFactoryConfigurator.ReceiveEndpoint(_queueName, conf => - { - //conf.Consumer() - conf.ConfigureConsumer(serviceProvider, _eventHandlerType); - conf.PrefetchCount = _prefetchCount; - conf.UseMessageRetry(_retryConfigure); - }); - } - } -} diff --git a/src/Pole.ReliableMessage.Masstransit/MasstransitMessageBusConfigurator.cs b/src/Pole.ReliableMessage.Masstransit/MasstransitMessageBusConfigurator.cs deleted file mode 100644 index b2f1742..0000000 --- a/src/Pole.ReliableMessage.Masstransit/MasstransitMessageBusConfigurator.cs +++ /dev/null @@ -1,59 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using Pole.ReliableMessage.EventBus; -using Pole.ReliableMessage.Masstransit.Abstraction; -using MassTransit; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Options; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Reflection; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Masstransit -{ - class MasstransitMessageBusConfigurator : IMessageBusConfigurator - { - private readonly IReliableEventHandlerRegistrarFactory _reliableEventHandlerRegistrarFactory; - private readonly MasstransitRabbitmqOption _options; - public MasstransitMessageBusConfigurator(IReliableEventHandlerRegistrarFactory reliableEventHandlerRegistrarFactory, IOptions options) - { - _reliableEventHandlerRegistrarFactory = reliableEventHandlerRegistrarFactory; - _options = options.Value; - } - public async Task Configure(IServiceCollection services,IEnumerable eventHandlerTypes) - { - await Task.CompletedTask; - var eventHandlerRegistrars = GetEventHandlerRegistrars(eventHandlerTypes).ToList(); - services.AddMassTransit(x => - { - foreach (var eventHandlerRegistrar in eventHandlerRegistrars) - { - eventHandlerRegistrar.RegisterEventHandler(x, services); - } - x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg => - { - var host = cfg.Host(new Uri(_options.RabbitMqHostAddress), h => - { - h.Username(_options.RabbitMqHostUserName); - h.Password(_options.RabbitMqHostPassword); - - }); - foreach (var eventHandlerRegistrar in eventHandlerRegistrars) - { - eventHandlerRegistrar.RegisterQueue(x, cfg, host, provider); - } - })); - }); - } - private IEnumerable GetEventHandlerRegistrars(IEnumerable eventHandlerTypes) - { - foreach (var eventHandler in eventHandlerTypes) - { - var model = _reliableEventHandlerRegistrarFactory.Create(eventHandler); - yield return model; - } - } - } -} diff --git a/src/Pole.ReliableMessage.Masstransit/MasstransitRabbitmqOption.cs b/src/Pole.ReliableMessage.Masstransit/MasstransitRabbitmqOption.cs deleted file mode 100644 index 739c965..0000000 --- a/src/Pole.ReliableMessage.Masstransit/MasstransitRabbitmqOption.cs +++ /dev/null @@ -1,31 +0,0 @@ -using GreenPipes; -using GreenPipes.Configurators; -using Microsoft.Extensions.DependencyInjection; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Masstransit -{ - public class MasstransitRabbitmqOption - { - public string RabbitMqHostAddress { get; set; } - public string RabbitMqHostUserName { get; set; } - public string RabbitMqHostPassword { get; set; } - public string QueueNamePrefix { get; set; } = string.Empty; - - public string EventHandlerNameSuffix = "EventHandler"; - /// - /// 2 个并发 - /// - public ushort PrefetchCount { get; set; } = 2; - - public Action RetryConfigure { get; set; } = - r => r.Intervals(TimeSpan.FromSeconds(0.1) - , TimeSpan.FromSeconds(1) - , TimeSpan.FromSeconds(4) - , TimeSpan.FromSeconds(16) - , TimeSpan.FromSeconds(64) - ); - } -} diff --git a/src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs b/src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs deleted file mode 100644 index cb8c236..0000000 --- a/src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs +++ /dev/null @@ -1,43 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using Pole.ReliableMessage.Masstransit.Pipe; -using Pole.ReliableMessage.Messaging; -using MassTransit; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; -using Pole.ReliableMessage.Storage.Abstraction; - -namespace Pole.ReliableMessage.Masstransit -{ - - public abstract class ReliableEventHandler : IReliableEventHandler, IConsumer - where TEvent : class - { - private readonly ILogger> _logger; - public ReliableEventHandler(IServiceProvider serviceProvider) - { - var loggerFactory = serviceProvider.GetRequiredService(); - _logger = loggerFactory.CreateLogger>(); - } - - public abstract Task Handle(IReliableEventHandlerContext context); - public async Task Consume(ConsumeContext context) - { - var messageId = GetReliableMessageId(context); - - _logger.LogDebug($"Message Begin Handle,messageId:{messageId}"); - - await Handle(new DefaultReliableEventHandlerContext(context)); - - _logger.LogDebug($"Message handled successfully ,messageId:{messageId}"); - } - - private string GetReliableMessageId(ConsumeContext context) - { - return context.Headers.Get(AddReliableMessageIdPipe.RELIABLE_MESSAGE_ID, string.Empty); - } - } -} diff --git a/src/Pole.ReliableMessage.Masstransit/Messaging/DefaultMessageIdGenerator.cs b/src/Pole.ReliableMessage.Masstransit/Messaging/DefaultMessageIdGenerator.cs deleted file mode 100644 index 20b50e1..0000000 --- a/src/Pole.ReliableMessage.Masstransit/Messaging/DefaultMessageIdGenerator.cs +++ /dev/null @@ -1,16 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using MassTransit; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Masstransit.Messaging -{ - class DefaultMessageIdGenerator : IMessageIdGenerator - { - public string Generate() - { - return NewId.Next().ToString("N").ToLowerInvariant(); - } - } -} diff --git a/src/Pole.ReliableMessage.Masstransit/Pipe/AddReliableMessageIdPipe.cs b/src/Pole.ReliableMessage.Masstransit/Pipe/AddReliableMessageIdPipe.cs deleted file mode 100644 index 594bd58..0000000 --- a/src/Pole.ReliableMessage.Masstransit/Pipe/AddReliableMessageIdPipe.cs +++ /dev/null @@ -1,29 +0,0 @@ -using GreenPipes; -using MassTransit; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Masstransit.Pipe -{ - class AddReliableMessageIdPipe : IPipe - { - public const string RELIABLE_MESSAGE_ID = "ReliableMessageId"; - private readonly string _reliableMessageId; - public AddReliableMessageIdPipe(string reliableMessageId) - { - _reliableMessageId = reliableMessageId; - } - public void Probe(ProbeContext context) - { - - } - - public async Task Send(PublishContext context) - { - context.Headers.Set(RELIABLE_MESSAGE_ID, _reliableMessageId); - await Task.CompletedTask; - } - } -} diff --git a/src/Pole.ReliableMessage.Masstransit/Pole.ReliableMessage.Masstransit.csproj b/src/Pole.ReliableMessage.Masstransit/Pole.ReliableMessage.Masstransit.csproj deleted file mode 100644 index e4c922f..0000000 --- a/src/Pole.ReliableMessage.Masstransit/Pole.ReliableMessage.Masstransit.csproj +++ /dev/null @@ -1,17 +0,0 @@ - - - - netstandard2.0 - - - - - - - - - - - - - diff --git a/src/Pole.ReliableMessage.Masstransit/QueueHaType.cs b/src/Pole.ReliableMessage.Masstransit/QueueHaType.cs deleted file mode 100644 index 6fc0f3e..0000000 --- a/src/Pole.ReliableMessage.Masstransit/QueueHaType.cs +++ /dev/null @@ -1,32 +0,0 @@ -using Pole.Domain; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Masstransit -{ - public class QueueHaType : Enumeration - { - public static QueueHaType None = new QueueHaType(1, "无",string.Empty); - public static QueueHaType Default = new QueueHaType(2, "默认高可用","Rmd."); - public static QueueHaType Backlog = new QueueHaType(3, "消息可积压","Rmb."); - private readonly string _queuePrefix; - public QueueHaType(int id, string name) : base(id, name) - { - } - public QueueHaType(int id, string name,string prefix) : this(id, name) - { - _queuePrefix = prefix; - } - public string GenerateQueueName(string rawQueueName) - { - return string.Concat(_queuePrefix, rawQueueName); - } - } - public enum QueueHaTypeEnum:int - { - None = 1, - Default = 2, - Backlog = 3 - } -} diff --git a/src/Pole.ReliableMessage.Masstransit/ReliableEventHandlerParemeterAttribute.cs b/src/Pole.ReliableMessage.Masstransit/ReliableEventHandlerParemeterAttribute.cs deleted file mode 100644 index e1e6d8e..0000000 --- a/src/Pole.ReliableMessage.Masstransit/ReliableEventHandlerParemeterAttribute.cs +++ /dev/null @@ -1,15 +0,0 @@ -using GreenPipes.Configurators; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Masstransit -{ - [AttributeUsage(AttributeTargets.Class)] - public class ReliableEventHandlerParemeterAttribute : Attribute - { - public ushort PrefetchCount { get; set; } - public QueueHaTypeEnum QueueHaType { get; set; } = QueueHaTypeEnum.Default; - - } -} diff --git a/src/Pole.ReliableMessage.Masstransit/ReliableMessageOptionExtension.cs b/src/Pole.ReliableMessage.Masstransit/ReliableMessageOptionExtension.cs deleted file mode 100644 index 5b05c2e..0000000 --- a/src/Pole.ReliableMessage.Masstransit/ReliableMessageOptionExtension.cs +++ /dev/null @@ -1,38 +0,0 @@ -using Pole.ReliableMessage; -using Pole.ReliableMessage.Abstraction; -using Pole.ReliableMessage.Masstransit; -using Pole.ReliableMessage.Masstransit.Abstraction; -using Microsoft.Extensions.DependencyInjection; -using System; -using System.Collections.Generic; -using System.Text; -using Pole.ReliableMessage.Masstransit.Messaging; - -namespace Microsoft.Extensions.DependencyInjection -{ - public static class ReliableMessageOptionExtension - { - public static ReliableMessageOption AddMasstransitRabbitmq(this ReliableMessageOption option, Action optionConfig) - { - option.ReliableMessageOptionExtensions.Add(new MasstransitRabbitmqExtension(optionConfig)); - return option; - } - } - public class MasstransitRabbitmqExtension : IReliableMessageOptionExtension - { - private readonly Action _masstransitRabbitmqOption; - public MasstransitRabbitmqExtension(Action masstransitRabbitmqOption) - { - _masstransitRabbitmqOption = masstransitRabbitmqOption; - } - public void AddServices(IServiceCollection services) - { - services.Configure(_masstransitRabbitmqOption); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddHostedService(); - } - } -} diff --git a/src/Pole.ReliableMessage.Storage.Abstraction/IMemberShipTableManager.cs b/src/Pole.ReliableMessage.Storage.Abstraction/IMemberShipTableManager.cs deleted file mode 100644 index 46ef040..0000000 --- a/src/Pole.ReliableMessage.Storage.Abstraction/IMemberShipTableManager.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Net; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Storage.Abstraction -{ - public interface IMemberShipTableManager - { - Task IsPendingMessageCheckerServiceInstance(string ipAddress); - Task UpdateIAmAlive(string ipAddress, DateTime dateTime); - /// - /// 如果当前 超时时间内 没有可用 实例 返回 空 - /// - /// - /// - Task GetPendingMessageCheckerServiceInstanceIp(DateTime iamAliveEndTime); - - Task AddCheckerServiceInstanceAndDeleteOthers(string ipAddress, DateTime aliveUTCTime); - } -} diff --git a/src/Pole.ReliableMessage.Storage.Abstraction/IMessageStorage.cs b/src/Pole.ReliableMessage.Storage.Abstraction/IMessageStorage.cs deleted file mode 100644 index 10c4e70..0000000 --- a/src/Pole.ReliableMessage.Storage.Abstraction/IMessageStorage.cs +++ /dev/null @@ -1,54 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq.Expressions; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Storage.Abstraction -{ - public interface IMessageStorage - { - /// - /// - /// - /// - /// - Task Add(Message message); - - Task Delete(Expression> filter); - - /// - /// - /// - /// - /// - /// - Task> GetMany(Expression> filter, int count); - - /// - /// - /// - /// - /// - /// - Task GetOne(Expression> filter); - - /// - /// 批量更新 - /// 更新这几个值 MessageStatusId , RetryTimes LastRetryUTCTime, NextRetryUTCTime - /// - /// - /// - Task Save(IEnumerable messages); - - /// - /// 检查 消息的状态,如果不是指定状态则返回true,并且更新状态到指定状态 ,如果已经是指定状态返回false - /// - /// - /// - /// - Task CheckAndUpdateStatus(Expression> filter, MessageStatus messageStatus); - - Task UpdateStatus(Expression> filter, MessageStatus messageStatus); - } -} diff --git a/src/Pole.ReliableMessage.Storage.Abstraction/MemberShipTable.cs b/src/Pole.ReliableMessage.Storage.Abstraction/MemberShipTable.cs deleted file mode 100644 index ba28c92..0000000 --- a/src/Pole.ReliableMessage.Storage.Abstraction/MemberShipTable.cs +++ /dev/null @@ -1,21 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Storage.Abstraction -{ - public class MemberShipTable - { - public string Id { get;private set; } - public MemberShipTable(string serviceName,string pendingMessageCheckerIp,DateTime iAmAliveUTCTime) - { - ServiceName = serviceName; - PendingMessageCheckerIp = pendingMessageCheckerIp; - IAmAliveUTCTime = iAmAliveUTCTime; - } - - public string ServiceName { get;private set; } - public string PendingMessageCheckerIp { get; private set; } - public DateTime IAmAliveUTCTime { get; private set; } - } -} diff --git a/src/Pole.ReliableMessage.Storage.Abstraction/Message.cs b/src/Pole.ReliableMessage.Storage.Abstraction/Message.cs deleted file mode 100644 index da785a8..0000000 --- a/src/Pole.ReliableMessage.Storage.Abstraction/Message.cs +++ /dev/null @@ -1,79 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Storage.Abstraction -{ - public class Message : IComparable - { - /// - /// 这里id 永远为 string - /// - public string Id { get; set; } - - /// - /// 消息状态 - /// - public MessageStatus MessageStatus { get; set; } - - /// - /// 消息状态Id - /// - - public int MessageStatusId { get; set; } - - /// - /// 预发送的时间 - /// - public DateTime AddedUTCTime { get; set; } - - /// - /// 用来存放 消息内容 目前没有大小限制 这个需要根据 实际情况 , mongodb 和 rabiitmq 的 综合指标来定 ,开发人员 在使用超大对象时需谨慎 - /// - public string Content { get; set; } - - /// - /// 消息的名称 用来鉴别不同的消息 - /// - public string MessageTypeId { get; set; } - - /// - /// 当前消息 回调者所需参数值 - /// - public string RePushCallBackParameterValue { get; set; } - - ///// - ///// 最后一次的重试时间 - ///// - //public DateTime LastRetryUTCTime { get; set; } - - - /// - /// 下一次的重试时间 - /// - public DateTime NextRetryUTCTime { get; set; } - - /// - /// 重试次数 - /// - public int RetryTimes { get; set; } = 0; - - public int CompareTo(Message other) - { - return Id.CompareTo(other.Id); - } - } - public class MessageIEqualityComparer : IEqualityComparer - { - public static MessageIEqualityComparer Default = new MessageIEqualityComparer(); - public bool Equals(Message x, Message y) - { - return x.CompareTo(y) == 0; - } - - public int GetHashCode(Message obj) - { - return obj.Id.GetHashCode(); - } - } -} diff --git a/src/Pole.ReliableMessage.Storage.Abstraction/MessageStatus.cs b/src/Pole.ReliableMessage.Storage.Abstraction/MessageStatus.cs deleted file mode 100644 index 049dd6b..0000000 --- a/src/Pole.ReliableMessage.Storage.Abstraction/MessageStatus.cs +++ /dev/null @@ -1,20 +0,0 @@ -using Pole.Domain; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Storage.Abstraction -{ - public class MessageStatus : Enumeration - { - public static MessageStatus Pending = new MessageStatus(3,"待发送"); - public static MessageStatus Pushed = new MessageStatus(6,"已发送"); - public static MessageStatus Canced = new MessageStatus(9,"已取消"); - //public static MessageStatus Handed = new MessageStatus(12, "已处理"); - - public MessageStatus(int id,string name ):base(id,name) - { - - } - } -} diff --git a/src/Pole.ReliableMessage.Storage.Abstraction/Pole.ReliableMessage.Storage.Abstraction.csproj b/src/Pole.ReliableMessage.Storage.Abstraction/Pole.ReliableMessage.Storage.Abstraction.csproj deleted file mode 100644 index c48c146..0000000 --- a/src/Pole.ReliableMessage.Storage.Abstraction/Pole.ReliableMessage.Storage.Abstraction.csproj +++ /dev/null @@ -1,11 +0,0 @@ - - - - netstandard2.0 - - - - - - - diff --git a/src/Pole.ReliableMessage.Storage.EntityframeworkCore/Pole.ReliableMessage.Storage.EntityframeworkCore.csproj b/src/Pole.ReliableMessage.Storage.EntityframeworkCore/Pole.ReliableMessage.Storage.EntityframeworkCore.csproj deleted file mode 100644 index 9ef9d54..0000000 --- a/src/Pole.ReliableMessage.Storage.EntityframeworkCore/Pole.ReliableMessage.Storage.EntityframeworkCore.csproj +++ /dev/null @@ -1,11 +0,0 @@ - - - - netstandard2.0 - - - - - - - diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/MongoHost.cs b/src/Pole.ReliableMessage.Storage.Mongodb/MongoHost.cs deleted file mode 100644 index 33f4d32..0000000 --- a/src/Pole.ReliableMessage.Storage.Mongodb/MongoHost.cs +++ /dev/null @@ -1,19 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Storage.Mongodb -{ - public sealed class MongoHost - { - /// - /// 主机或者IP地址 - /// - public string Host { get; set; } - - /// - /// 端口号 - /// - public int Port { get; set; } - } -} diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbMemberShipTableManager.cs b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbMemberShipTableManager.cs deleted file mode 100644 index 6350b55..0000000 --- a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbMemberShipTableManager.cs +++ /dev/null @@ -1,94 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using MongoDB.Driver; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Pole.ReliableMessage.Storage.Abstraction; - -namespace Pole.ReliableMessage.Storage.Mongodb -{ - class MongodbMemberShipTableManager : IMemberShipTableManager - { - private readonly MongoClient _mongoClient; - private readonly MongodbOption _mongodbOption; - private readonly ILogger _logger; - public MongodbMemberShipTableManager(IConfiguration configuration, MongoClient mongoClient, IOptions mongodbOption, ILogger logger) - { - _mongoClient = mongoClient; - _mongodbOption = mongodbOption.Value; - _logger = logger; - } - private IMongoDatabase GetActiveMessageDatabase(string activeMessageDatabase) - { - return _mongoClient.GetDatabase(activeMessageDatabase); - } - private IMongoCollection GetCollection() - { - var database = GetActiveMessageDatabase(_mongodbOption.MessageDatabaseName); - var messageCollectionName = _mongodbOption.MembershipCollectionName; - var collection = database.GetCollection(messageCollectionName); - return collection; - } - public async Task AddCheckerServiceInstanceAndDeleteOthers(string ipAddress, DateTime aliveUTCTime) - { - var collection = GetCollection(); - var deleteResult = await collection.DeleteManyAsync(m => m.ServiceName == _mongodbOption.ServiceCollectionName); - MemberShipTable memberShipTable = new MemberShipTable(_mongodbOption.ServiceCollectionName, ipAddress, aliveUTCTime); - await collection.InsertOneAsync(memberShipTable); - return true; - } - - public async Task GetPendingMessageCheckerServiceInstanceIp(DateTime iamAliveEndTime) - { - var collection = GetCollection(); - - var instances = (await collection.FindAsync(m => m.ServiceName == _mongodbOption.ServiceCollectionName && m.IAmAliveUTCTime >= iamAliveEndTime)).ToList(); - if (instances.Count > 1) - { - _logger.LogInformation($"Current time have {instances.Count} PendingMessageChecker in {_mongodbOption.ServiceCollectionName} service , I will delete the extra instances"); - var currentInstance = instances.FirstOrDefault(); - var extraInstances = instances.Remove(currentInstance); - instances.ForEach(async n => - { - await collection.DeleteOneAsync(m => m.Id == n.Id); - }); - _logger.LogInformation($"Extra PendingMessageChecker instances in {_mongodbOption.ServiceCollectionName} service deleted successfully"); - return currentInstance.PendingMessageCheckerIp; - } - else if (instances.Count == 1) - { - return instances.FirstOrDefault().PendingMessageCheckerIp; - } - else - { - return null; - } - } - - public async Task IsPendingMessageCheckerServiceInstance(string ipAddress) - { - var collection = GetCollection(); - - var instances = (await collection.FindAsync(m => m.ServiceName == _mongodbOption.ServiceCollectionName && m.PendingMessageCheckerIp== ipAddress)).FirstOrDefault(); - if (instances != null) - { - return true; - } - return false; - } - - public async Task UpdateIAmAlive(string ipAddress,DateTime dateTime) - { - var collection = GetCollection(); - var filter = Builders.Filter.Where(m => m.ServiceName == _mongodbOption.ServiceCollectionName && m.PendingMessageCheckerIp == ipAddress); - var update = Builders.Update.Set(m=>m.IAmAliveUTCTime,dateTime); - var result = await collection.UpdateOneAsync(filter, update); - return true; - } - } -} diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbOption.cs b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbOption.cs deleted file mode 100644 index 3990f98..0000000 --- a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbOption.cs +++ /dev/null @@ -1,15 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Storage.Mongodb -{ - public class MongodbOption - { - public string MessageDatabaseName { get; set; } = "ReliableMessage"; - public string MembershipCollectionName { get; set; } = "Membership"; - public string ServiceCollectionName { get; set; } - public MongoHost[] Servers { get; set; } - } - -} diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs deleted file mode 100644 index eb40f33..0000000 --- a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs +++ /dev/null @@ -1,129 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using Pole.ReliableMessage.Messaging; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using MongoDB.Driver; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Linq.Expressions; -using System.Text; -using System.Threading.Tasks; -using Pole.ReliableMessage.Storage.Abstraction; -using Pole.Domain; - -namespace Pole.ReliableMessage.Storage.Mongodb -{ - class MongodbMessageStorage : IMessageStorage - { - private readonly MongoClient _mongoClient; - private readonly MongodbOption _mongodbOption; - private readonly ILogger _logger; - public MongodbMessageStorage(MongoClient mongoClient, IOptions mongodbOption, ILogger logger) - { - _mongoClient = mongoClient; - _mongodbOption = mongodbOption.Value; - _logger = logger; - } - private IMongoDatabase GetActiveMessageDatabase(string messageDatabase) - { - return _mongoClient.GetDatabase(messageDatabase); - } - private IMongoCollection GetCollection() - { - var database = GetActiveMessageDatabase(_mongodbOption.MessageDatabaseName); - var messageCollectionName = _mongodbOption.ServiceCollectionName; - var collection = database.GetCollection(messageCollectionName); - return collection; - } - public async Task Add(Message message) - { - IMongoCollection collection = GetCollection(); - - await collection.InsertOneAsync(message); - return true; - } - - public async Task CheckAndUpdateStatus(Expression> filter, MessageStatus messageStatus) - { - IMongoCollection collection = GetCollection(); - - var update = Builders.Update.Set(m => m.MessageStatusId, messageStatus.Id); - var beforeDoc = await collection.FindOneAndUpdateAsync(filter, update, new FindOneAndUpdateOptions() { ReturnDocument = ReturnDocument.Before }); - if (beforeDoc == null) - { - throw new Exception("IMessageStorage.CheckAndUpdateStatus Error ,Message not found in Storage"); - } - if (beforeDoc.MessageStatusId == messageStatus.Id) - { - return false; - } - return true; - } - - public async Task> GetMany(Expression> filter, int count) - { - IMongoCollection collection = GetCollection(); - - var list = await collection.Find(filter).Limit(count).ToListAsync(); - list.ForEach(m => - { - m.MessageStatus = Enumeration.FromValue(m.MessageStatusId); - }); - return list; - } - - public async Task Save(IEnumerable messages) - { - var count = messages.Count(); - _logger.LogDebug($"MongodbMessageStorage Save begin, Messages count: {messages.Count()}"); - if (count == 0) - { - _logger.LogDebug($"MongodbMessageStorage Save successfully, saved count: 0"); - return true; - } - IMongoCollection collection = GetCollection(); - - var models = new List>(); - foreach (var message in messages) - { - FilterDefinition filter = Builders.Filter.Where(m => m.Id == message.Id); - UpdateDefinition update = Builders.Update - .Set(m => m.MessageStatusId, message.MessageStatus.Id) - .Set(m => m.RetryTimes, message.RetryTimes) - .Set(m => m.NextRetryUTCTime, message.NextRetryUTCTime); - - var model = new UpdateOneModel(filter, update); - models.Add(model); - } - var result = await collection.BulkWriteAsync(models, new BulkWriteOptions { IsOrdered = false }); - - _logger.LogDebug($"MongodbMessageStorage Save successfully, saved count: {result.ModifiedCount}"); - - return result.IsAcknowledged; - } - - public async Task UpdateStatus(Expression> filter, MessageStatus messageStatus) - { - IMongoCollection collection = GetCollection(); - - var update = Builders.Update.Set(m => m.MessageStatusId, messageStatus.Id); - var result = await collection.UpdateOneAsync(filter, update); - return result.IsAcknowledged; - } - - public async Task Delete(Expression> filter) - { - IMongoCollection collection = GetCollection(); - - var result = await collection.DeleteManyAsync(filter); - return result.DeletedCount; - } - - public Task GetOne(Expression> filter) - { - throw new NotImplementedException(); - } - } -} diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/Pole.ReliableMessage.Storage.Mongodb.csproj b/src/Pole.ReliableMessage.Storage.Mongodb/Pole.ReliableMessage.Storage.Mongodb.csproj deleted file mode 100644 index 8ded00e..0000000 --- a/src/Pole.ReliableMessage.Storage.Mongodb/Pole.ReliableMessage.Storage.Mongodb.csproj +++ /dev/null @@ -1,19 +0,0 @@ - - - - netstandard2.0 - - - - - - - - - - - - - - - diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs b/src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs deleted file mode 100644 index e33c8c7..0000000 --- a/src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs +++ /dev/null @@ -1,121 +0,0 @@ -using Pole.ReliableMessage; -using Pole.ReliableMessage.Abstraction; -using Pole.ReliableMessage.Messaging; -using Pole.ReliableMessage.Storage.Mongodb; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Options; -using MongoDB.Bson; -using MongoDB.Bson.Serialization; -using MongoDB.Bson.Serialization.IdGenerators; -using MongoDB.Driver; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using Pole.ReliableMessage.Storage.Abstraction; - -namespace Microsoft.Extensions.DependencyInjection -{ - public static class ReliableMessageOptionExtension - { - public static ReliableMessageOption AddMongodb(this ReliableMessageOption option, Action mongodbOptionConfig) - { - option.ReliableMessageOptionExtensions.Add(new MongodbStorageExtension(mongodbOptionConfig)); - return option; - } - } - public class MongodbStorageExtension : IReliableMessageOptionExtension - { - private readonly Action _mongodbOption; - public MongodbStorageExtension(Action masstransitRabbitmqOption) - { - _mongodbOption = masstransitRabbitmqOption; - } - public void AddServices(IServiceCollection services) - { - services.Configure(_mongodbOption); - services.AddSingleton(); - services.AddSingleton(); - - var mongodbOption = services.BuildServiceProvider().GetRequiredService>().Value; - - var servers = mongodbOption.Servers.Select(x => new MongoServerAddress(x.Host, x.Port)).ToList(); - var settings = new MongoClientSettings() - { - Servers = servers - }; - var client = new MongoClient(settings); - var database = client.GetDatabase(mongodbOption.MessageDatabaseName); - - AddMapper(); - - InitCollection(mongodbOption, database); - - services.AddSingleton(client); - } - - private static void InitCollection(MongodbOption mongodbOption, IMongoDatabase database) - { - var collectionNames = database.ListCollectionNames().ToList(); - - if (!collectionNames.Contains(mongodbOption.ServiceCollectionName)) - { - database.CreateCollection(mongodbOption.ServiceCollectionName); - var messageCollection = database.GetCollection(mongodbOption.ServiceCollectionName); - AddMessageCollectionIndex(messageCollection); - } - - if (!collectionNames.Contains(mongodbOption.MembershipCollectionName)) - { - database.CreateCollection(mongodbOption.MembershipCollectionName); - var membershipCollection = database.GetCollection(mongodbOption.MembershipCollectionName); - AddMemberShipTableCollectionIndex(membershipCollection); - } - } - - private static void AddMessageCollectionIndex(IMongoCollection collection) - { - List> createIndexModels = new List>(); - - //var nextRetryUTCTimeIndex = Builders.IndexKeys.Ascending(m => m.NextRetryUTCTime); - //CreateIndexModel nextRetryUTCTimeIndexModel = new CreateIndexModel(nextRetryUTCTimeIndex, new CreateIndexOptions() { Background = true }); - //createIndexModels.Add(nextRetryUTCTimeIndexModel); - - var AddedUTCTimeUTCTimeIndex = Builders.IndexKeys.Ascending(m => m.AddedUTCTime); - CreateIndexModel AddedUTCTimeIndexModel = new CreateIndexModel(AddedUTCTimeUTCTimeIndex, new CreateIndexOptions() { Background = true }); - createIndexModels.Add(AddedUTCTimeIndexModel); - - //var messageTypeIdIndex = Builders.IndexKeys.Ascending(m => m.MessageTypeId); - //CreateIndexModel messageTypeIdIndexModel = new CreateIndexModel(messageTypeIdIndex, new CreateIndexOptions() { Background = true }); - //createIndexModels.Add(messageTypeIdIndexModel); - - collection.Indexes.CreateMany(createIndexModels); - } - private static void AddMemberShipTableCollectionIndex(IMongoCollection collection) - { - List> createIndexMembershipModels = new List>(); - - var serviceNameIndex = Builders.IndexKeys.Ascending(m => m.ServiceName); - CreateIndexModel serviceNameIndexModel = new CreateIndexModel(serviceNameIndex, new CreateIndexOptions() { Background = true, Unique = true }); - createIndexMembershipModels.Add(serviceNameIndexModel); - - collection.Indexes.CreateMany(createIndexMembershipModels); - } - - private static void AddMapper() - { - BsonClassMap.RegisterClassMap(cm => - { - cm.AutoMap(); - cm.UnmapMember(m => m.MessageStatus); - cm.MapIdField(m => m.Id); - cm.MapMember(m => m.NextRetryUTCTime).SetIsRequired(true); - }); - BsonClassMap.RegisterClassMap(cm => - { - cm.AutoMap(); - cm.MapIdField(m => m.Id).SetIdGenerator(StringObjectIdGenerator.Instance); - }); - } - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IApplicationBuilderConfigurator.cs b/src/Pole.ReliableMessage/Abstraction/IApplicationBuilderConfigurator.cs deleted file mode 100644 index 5b9c8a8..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IApplicationBuilderConfigurator.cs +++ /dev/null @@ -1,13 +0,0 @@ -using Microsoft.AspNetCore.Builder; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IApplicationBuilderConfigurator - { - void Config(IApplicationBuilder applicationBuilder); - void Add(Action config); - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IComteckReliableMessageBootstrap.cs b/src/Pole.ReliableMessage/Abstraction/IComteckReliableMessageBootstrap.cs deleted file mode 100644 index da17eb8..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IComteckReliableMessageBootstrap.cs +++ /dev/null @@ -1,14 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using System; -using System.Collections.Generic; -using System.Reflection; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IComteckReliableMessageBootstrap - { - Task Initialize(IServiceCollection services, List eventHandlerAssemblies, List eventAssemblies); - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IJsonConverter.cs b/src/Pole.ReliableMessage/Abstraction/IJsonConverter.cs deleted file mode 100644 index 88dd982..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IJsonConverter.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IJsonConverter - { - string Serialize(object obj); - T Deserialize(string json); - object Deserialize(string json,Type type); - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageBus.cs b/src/Pole.ReliableMessage/Abstraction/IMessageBus.cs deleted file mode 100644 index 9f10177..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IMessageBus.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IMessageBus - { - Task Publish(object @event,string reliableMessageId, CancellationToken cancellationToken = default); - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageBusConfigurator.cs b/src/Pole.ReliableMessage/Abstraction/IMessageBusConfigurator.cs deleted file mode 100644 index 0261d1a..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IMessageBusConfigurator.cs +++ /dev/null @@ -1,13 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IMessageBusConfigurator - { - Task Configure(IServiceCollection services, IEnumerable eventHandlerTypes); - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageCallBackGenerator.cs b/src/Pole.ReliableMessage/Abstraction/IMessageCallBackGenerator.cs deleted file mode 100644 index 3a7e5c6..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IMessageCallBackGenerator.cs +++ /dev/null @@ -1,12 +0,0 @@ -using Pole.ReliableMessage.Messaging.CallBack; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IMessageCallBackInfoGenerator - { - MessageCallBackInfo Generate(Type eventHandlerType); - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageCallBackInfoStore.cs b/src/Pole.ReliableMessage/Abstraction/IMessageCallBackInfoStore.cs deleted file mode 100644 index c82ab66..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IMessageCallBackInfoStore.cs +++ /dev/null @@ -1,14 +0,0 @@ -using Pole.ReliableMessage.Messaging.CallBack; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IMessageCallBackInfoStore - { - Task Add(MessageCallBackInfo messageCallBackInfo); - Task Get(string messageTypeId); - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageCallBackRegister.cs b/src/Pole.ReliableMessage/Abstraction/IMessageCallBackRegister.cs deleted file mode 100644 index c39a628..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IMessageCallBackRegister.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IMessageCallBackRegister - { - Task Register(IEnumerable eventHandlerTypes); - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageCheckRetryer.cs b/src/Pole.ReliableMessage/Abstraction/IMessageCheckRetryer.cs deleted file mode 100644 index a16c5b9..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IMessageCheckRetryer.cs +++ /dev/null @@ -1,13 +0,0 @@ -using Pole.ReliableMessage.Storage.Abstraction; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IMessageCheckRetryer - { - Task Execute(IEnumerable messages, DateTime dateTime); - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageChecker.cs b/src/Pole.ReliableMessage/Abstraction/IMessageChecker.cs deleted file mode 100644 index 5092971..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IMessageChecker.cs +++ /dev/null @@ -1,15 +0,0 @@ -using Pole.ReliableMessage.Messaging; -using Pole.ReliableMessage.Storage.Abstraction; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IMessageChecker - { - Task GetResult(Message message); - - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageIdGenerator.cs b/src/Pole.ReliableMessage/Abstraction/IMessageIdGenerator.cs deleted file mode 100644 index aa888c7..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IMessageIdGenerator.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IMessageIdGenerator - { - string Generate(); - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageTypeIdGenerator.cs b/src/Pole.ReliableMessage/Abstraction/IMessageTypeIdGenerator.cs deleted file mode 100644 index 58d5206..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IMessageTypeIdGenerator.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IMessageTypeIdGenerator - { - string Generate(Type messageType); - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IProcessor.cs b/src/Pole.ReliableMessage/Abstraction/IProcessor.cs deleted file mode 100644 index 8037f05..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IProcessor.cs +++ /dev/null @@ -1,14 +0,0 @@ -using Pole.ReliableMessage.Processor; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IProcessor - { - string Name { get; } - Task Process(ProcessingContext context); - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IProcessorServer.cs b/src/Pole.ReliableMessage/Abstraction/IProcessorServer.cs deleted file mode 100644 index 61f1988..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IProcessorServer.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IProcessorServer - { - Task Start(CancellationToken stoppingToken); - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IReliableBus.cs b/src/Pole.ReliableMessage/Abstraction/IReliableBus.cs deleted file mode 100644 index bb86e56..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IReliableBus.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IReliableBus - { - Task PrePublish(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default) where TReliableEvent : class; - Task PrePublish(object @event, Type eventType, object callbackParemeter, CancellationToken cancellationToken = default); - Task Publish(object @event, string prePublishMessageId, CancellationToken cancellationToken = default); - Task Cancel(string prePublishMessageId, CancellationToken cancellationToken = default); - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IReliableEventCallBackFinder.cs b/src/Pole.ReliableMessage/Abstraction/IReliableEventCallBackFinder.cs deleted file mode 100644 index 8cfded7..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IReliableEventCallBackFinder.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Reflection; -using System.Text; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IReliableEventCallBackFinder - { - List FindAll(IEnumerable assemblies); - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IReliableEventCallback.cs b/src/Pole.ReliableMessage/Abstraction/IReliableEventCallback.cs deleted file mode 100644 index 22e7b5f..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IReliableEventCallback.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IReliableEventCallback : IReliableEventCallback - { - Task Callback(TCallbackParemeter callbackParemeter); - } - public interface IReliableEventCallback - { - - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IReliableEventHandler.cs b/src/Pole.ReliableMessage/Abstraction/IReliableEventHandler.cs deleted file mode 100644 index 03df883..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IReliableEventHandler.cs +++ /dev/null @@ -1,18 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IReliableEventHandler : IReliableEventHandler - where TEvent : class - { - Task Handle(IReliableEventHandlerContext context); - - } - public interface IReliableEventHandler - { - - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerContext.cs b/src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerContext.cs deleted file mode 100644 index 9b36ea8..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerContext.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IReliableEventHandlerContext where TEvent : class - { - TEvent Event { get; } - //Task Publish(object @event); - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerFinder.cs b/src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerFinder.cs deleted file mode 100644 index bd17aa1..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerFinder.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Reflection; -using System.Text; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IReliableEventHandlerFinder - { - List FindAll(IEnumerable assemblies); - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IRetryTimeCalculator.cs b/src/Pole.ReliableMessage/Abstraction/IRetryTimeCalculator.cs deleted file mode 100644 index 5302364..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IRetryTimeCalculator.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IRetryTimeDelayCalculator - { - int Get(int retryTimes, int maxPendingMessageRetryDelay); - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IServiceIPv4AddressProvider.cs b/src/Pole.ReliableMessage/Abstraction/IServiceIPv4AddressProvider.cs deleted file mode 100644 index cdc42f7..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IServiceIPv4AddressProvider.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IServiceIPv4AddressProvider - { - string Get(); - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/ITimeHelper.cs b/src/Pole.ReliableMessage/Abstraction/ITimeHelper.cs deleted file mode 100644 index a844fc3..0000000 --- a/src/Pole.ReliableMessage/Abstraction/ITimeHelper.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface ITimeHelper - { - DateTime GetUTCNow(); - /// - /// "UTC :{_timeHelper.GetNow().ToString("yyyy-MM-dd HH:mm:ss.fff")}" - /// - /// - string GetAppropriateFormatedDateString(); - } -} diff --git a/src/Pole.ReliableMessage/BackgroundServiceBasedProcessorServer.cs b/src/Pole.ReliableMessage/BackgroundServiceBasedProcessorServer.cs deleted file mode 100644 index d6d8c73..0000000 --- a/src/Pole.ReliableMessage/BackgroundServiceBasedProcessorServer.cs +++ /dev/null @@ -1,47 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using Pole.ReliableMessage.Processor; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage -{ - public class BackgroundServiceBasedProcessorServer : BackgroundService, IProcessorServer - { - private readonly IServiceProvider _serviceProvider; - private Task _compositeTask; - - public BackgroundServiceBasedProcessorServer(IServiceProvider serviceProvider) - { - _serviceProvider = serviceProvider; - } - public async Task Start(CancellationToken stoppingToken) - { - - ProcessingContext processingContext = new ProcessingContext(stoppingToken); - List loopProcessors = new List(); - var innerProcessors = _serviceProvider.GetServices(); - var loggerFactory = _serviceProvider.GetService(); - var timeHelper = _serviceProvider.GetService(); - foreach (var innerProcessor in innerProcessors) - { - LoopProcessor processor = new LoopProcessor(innerProcessor, loggerFactory, timeHelper); - loopProcessors.Add(processor); - } - var tasks = loopProcessors.Select(p => p.Process(processingContext)); - - _compositeTask = Task.WhenAll(tasks); - await _compositeTask; - } - protected override Task ExecuteAsync(CancellationToken stoppingToken) - { - return Start(stoppingToken); - } - } -} diff --git a/src/Pole.ReliableMessage/ComteckReliableMessageIApplicationBuilderExtensions.cs b/src/Pole.ReliableMessage/ComteckReliableMessageIApplicationBuilderExtensions.cs deleted file mode 100644 index a8f9913..0000000 --- a/src/Pole.ReliableMessage/ComteckReliableMessageIApplicationBuilderExtensions.cs +++ /dev/null @@ -1,27 +0,0 @@ -using Pole.ReliableMessage; -using Pole.ReliableMessage.Abstraction; -using Microsoft.AspNetCore.Builder; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Options; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; - -namespace Microsoft.AspNetCore.Builder -{ - public static class ComteckReliableMessageIApplicationBuilderExtensions - { - public static IApplicationBuilder UsePoleReliableMessage(this IApplicationBuilder applicationBuilder) - { - var option = applicationBuilder.ApplicationServices.GetRequiredService(typeof(IOptions)) as IOptions; - var messageCallBackRegister = applicationBuilder.ApplicationServices.GetRequiredService(typeof(IMessageCallBackRegister)) as IMessageCallBackRegister; - var reliableEventCallBackFinder = applicationBuilder.ApplicationServices.GetRequiredService(typeof(IReliableEventCallBackFinder)) as IReliableEventCallBackFinder; - - var eventCallbacks = reliableEventCallBackFinder.FindAll(option.Value.EventCallbackAssemblies); - messageCallBackRegister.Register(eventCallbacks).GetAwaiter().GetResult(); - - return applicationBuilder; - } - } -} diff --git a/src/Pole.ReliableMessage/DefaultApplicationBuilderConfigurator.cs b/src/Pole.ReliableMessage/DefaultApplicationBuilderConfigurator.cs deleted file mode 100644 index 805204d..0000000 --- a/src/Pole.ReliableMessage/DefaultApplicationBuilderConfigurator.cs +++ /dev/null @@ -1,24 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using Microsoft.AspNetCore.Builder; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage -{ - class DefaultApplicationBuilderConfigurator : IApplicationBuilderConfigurator - { - private readonly List> _configs = new List>(); - public void Add(Action config) - { - _configs.Add(config); - } - - public void Config(IApplicationBuilder applicationBuilder) - { - _configs.ForEach(m => { - m(applicationBuilder); - }); - } - } -} diff --git a/src/Pole.ReliableMessage/DefaultComteckReliableMessageBootstrap.cs b/src/Pole.ReliableMessage/DefaultComteckReliableMessageBootstrap.cs deleted file mode 100644 index f0987b6..0000000 --- a/src/Pole.ReliableMessage/DefaultComteckReliableMessageBootstrap.cs +++ /dev/null @@ -1,42 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using Microsoft.Extensions.DependencyInjection; -using System; -using System.Collections.Generic; -using System.Reflection; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage -{ - class DefaultComteckReliableMessageBootstrap : IComteckReliableMessageBootstrap - { - private readonly IReliableEventHandlerFinder _reliableEventHandlerFinder; - private readonly IMessageBusConfigurator _messageBusConfigurator; - private readonly IMessageCallBackRegister _messageCallBackRegister; - private readonly IReliableEventCallBackFinder _reliableEventCallBackFinder; - public DefaultComteckReliableMessageBootstrap(IReliableEventHandlerFinder reliableEventHandlerFinder, IMessageBusConfigurator messageBusConfigurator, IMessageCallBackRegister messageCallBackRegister, IReliableEventCallBackFinder reliableEventCallBackFinder) - { - _reliableEventHandlerFinder = reliableEventHandlerFinder; - _messageBusConfigurator = messageBusConfigurator; - _messageCallBackRegister = messageCallBackRegister; - _reliableEventCallBackFinder = reliableEventCallBackFinder; - } - public async Task Initialize(IServiceCollection services, List eventHandlerAssemblies, List eventAssemblies) - { - var eventHandlers = _reliableEventHandlerFinder.FindAll(eventHandlerAssemblies); - await _messageBusConfigurator.Configure(services, eventHandlers); - - var eventCallbacks = _reliableEventCallBackFinder.FindAll(eventAssemblies); - await _messageCallBackRegister.Register(eventCallbacks); - RegisterEventCallbacks(services, eventCallbacks); - } - - private void RegisterEventCallbacks(IServiceCollection services, List eventCallbacks) - { - eventCallbacks.ForEach(m => - { - services.AddScoped(m); - }); - } - } -} diff --git a/src/Pole.ReliableMessage/DefaultMessageCheckRetryer.cs b/src/Pole.ReliableMessage/DefaultMessageCheckRetryer.cs deleted file mode 100644 index 3c9c692..0000000 --- a/src/Pole.ReliableMessage/DefaultMessageCheckRetryer.cs +++ /dev/null @@ -1,103 +0,0 @@ -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Pole.ReliableMessage.Abstraction; -using Pole.ReliableMessage.Storage.Abstraction; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage -{ - class DefaultMessageCheckRetryer : IMessageCheckRetryer - { - private readonly ILogger _logger; - private readonly IRetryTimeDelayCalculator _retryTimeDelayCalculator; - private readonly ReliableMessageOption _options; - private readonly IMessageStorage _storage; - private readonly IMessageChecker _messageChecker; - private readonly IMessageBus _messageBus; - private readonly List _changedMessage = new List(); - public DefaultMessageCheckRetryer(ILogger logger, IRetryTimeDelayCalculator retryTimeDelayCalculator, IOptions options, IMessageStorage storage, IMessageChecker messageChecker, IMessageBus messageBus) - { - _logger = logger; - _retryTimeDelayCalculator = retryTimeDelayCalculator; - _options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected"); - _storage = storage; - _messageChecker = messageChecker; - _messageBus = messageBus; - } - public async Task Execute(IEnumerable messages, DateTime dateTime) - { - try - { - messages.AsParallel().ForAll(async m => await Retry(m, dateTime)); - if (_changedMessage.Count != 0) - { - await _storage.Save(_changedMessage); - } - } - catch (Exception ex) - { - _logger.LogError(ex, $"DefaultMessageCheckRetryer.Execute ,Execute with errors"); - } - finally - { - if (_changedMessage.Count != 0) - { - _changedMessage.Clear(); - } - } - } - private async Task Retry(Message message, DateTime retryTime) - { - try - { - if (_logger.IsEnabled(LogLevel.Debug)) - { - _logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} begin Retry"); - } - var nextRetryDelay = _retryTimeDelayCalculator.Get(message.RetryTimes, _options.MaxPendingMessageRetryDelay); - message.NextRetryUTCTime = retryTime.AddSeconds(nextRetryDelay); - - if (retryTime > message.AddedUTCTime.AddSeconds(_options.PendingMessageTimeOut)) - { - if (_logger.IsEnabled(LogLevel.Debug)) - { - _logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} would be Canced ,PendingMessageTimeOut:{_options.PendingMessageTimeOut}"); - } - - message.NextRetryUTCTime = DateTime.MinValue; - message.MessageStatus = MessageStatus.Canced; - _changedMessage.Add(message); - return; - } - message.RetryTimes++; - - var messageCheckerResult = await _messageChecker.GetResult(message); - if (messageCheckerResult.IsFinished) - { - if (_logger.IsEnabled(LogLevel.Debug)) - { - _logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} would be Pushed"); - } - message.MessageStatus = MessageStatus.Pushed; - await _messageBus.Publish(messageCheckerResult.Event, message.Id); - } - else - { - if (_logger.IsEnabled(LogLevel.Debug)) - { - _logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} would be Retry next time"); - } - } - _changedMessage.Add(message); - } - catch (Exception ex) - { - _logger.LogError(ex, $"DefaultMessageCheckRetryer.Retry ,Message:{message.Id} retry with errors"); - } - } - } -} diff --git a/src/Pole.ReliableMessage/DefaultRetryTimeCalculator.cs b/src/Pole.ReliableMessage/DefaultRetryTimeCalculator.cs deleted file mode 100644 index 3d8d327..0000000 --- a/src/Pole.ReliableMessage/DefaultRetryTimeCalculator.cs +++ /dev/null @@ -1,20 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage -{ - class DefaultRetryTimeDelayCalculator : IRetryTimeDelayCalculator - { - public int Get(int retryTimes, int maxPendingMessageRetryDelay) - { - var retryTimeDelay = (int)Math.Pow(2, retryTimes + 1); - if (retryTimeDelay >= maxPendingMessageRetryDelay) - { - return maxPendingMessageRetryDelay; - } - return retryTimeDelay; - } - } -} diff --git a/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs b/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs deleted file mode 100644 index 751965d..0000000 --- a/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs +++ /dev/null @@ -1,124 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using Microsoft.Extensions.Logging; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using ILogger = Microsoft.Extensions.Logging.ILogger; -using Pole.ReliableMessage.Storage.Abstraction; - -namespace Pole.Pole.ReliableMessage.EventBus -{ - public class DefaultReliableBus : IReliableBus - { - private readonly IMessageBus _messageBus; - private readonly IMessageStorage _messageStorage; - private readonly IMessageIdGenerator _messageIdGenerator; - private readonly ITimeHelper _timeHelper; - //private readonly IMessageBuffer _messageBuffer; - private readonly ILogger _logger; - private readonly IJsonConverter _jsonConverter; - private readonly IMessageCallBackInfoStore _messageCallBackInfoStore; - private readonly IMessageTypeIdGenerator _messageTypeIdGenerator; - public DefaultReliableBus(IMessageBus messageBus, IMessageStorage messageStorage, IMessageIdGenerator messageIdGenerator, ITimeHelper timeHelper, ILogger logger, IJsonConverter jsonConverter, IMessageCallBackInfoStore messageCallBackInfoStore, IMessageTypeIdGenerator messageTypeIdGenerator) - { - _messageBus = messageBus; - _messageStorage = messageStorage; - _messageIdGenerator = messageIdGenerator; - _timeHelper = timeHelper; - _logger = logger; - _jsonConverter = jsonConverter; - _messageCallBackInfoStore = messageCallBackInfoStore; - _messageTypeIdGenerator = messageTypeIdGenerator; - } - - public Task Cancel(string prePublishMessageId, CancellationToken cancellationToken = default) - { - try - { - return _messageStorage.UpdateStatus(m => m.Id == prePublishMessageId, MessageStatus.Canced); - } - catch (Exception ex) - { - var errorInfo = $"Cancel PrePublish errors in defaultReliableBus;{ex.Message}"; - _logger.LogError(ex, errorInfo); - throw new Exception(errorInfo, ex); - } - } - - #region PrePublish - public async Task PrePublish(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default) where TReliableEvent : class - { - - var messageTypeId = _messageTypeIdGenerator.Generate(typeof(TReliableEvent)); - var content = _jsonConverter.Serialize(@event); - - return await PrePublishCore(callbackParemeter, messageTypeId, content); - } - - public async Task PrePublish(object @event, Type eventType, object callbackParemeter, CancellationToken cancellationToken = default) - { - var messageTypeId = _messageTypeIdGenerator.Generate(eventType); - var content = _jsonConverter.Serialize(@event); - return await PrePublishCore(callbackParemeter, messageTypeId, content); - } - private async Task PrePublishCore(object callbackParemeter, string messageTypeId, string content) - { - var currentMessageCallbackInfo = _messageCallBackInfoStore.Get(messageTypeId); - if (currentMessageCallbackInfo == null) - { - throw new Exception($"Current message type Callback not registered ,messageTypeId:{messageTypeId}"); - } - try - { - var messageId = _messageIdGenerator.Generate(); - - _logger.LogDebug($"PrePublish message begin ,messageId:{messageId}"); - - var now = _timeHelper.GetUTCNow(); - - var callBackParem = _jsonConverter.Serialize(callbackParemeter); - Message newMessage = new Message() - { - AddedUTCTime = now, - Content = content, - Id = messageId, - MessageStatusId = MessageStatus.Pending.Id, - MessageTypeId = messageTypeId, - RePushCallBackParameterValue = callBackParem, - NextRetryUTCTime = DateTime.MinValue - }; - await _messageStorage.Add(newMessage); - - _logger.LogDebug($"PrePublish message successful ,messageId:{messageId}"); - - return messageId; - } - catch (Exception ex) - { - var errorInfo = $"PrePublish errors in DefaultReliableBus;{ex.Message}"; - _logger.LogError(ex, errorInfo); - throw new Exception(errorInfo, ex); - } - } - #endregion - - public async Task Publish(object @event, string prePublishMessageId, CancellationToken cancellationToken = default) - { - try - { - await _messageBus.Publish(@event, prePublishMessageId, cancellationToken); - - var messageBufferResult = await _messageStorage.UpdateStatus(m => m.Id == prePublishMessageId && m.MessageStatusId == MessageStatus.Pending.Id, MessageStatus.Pushed); - return messageBufferResult; - } - catch (Exception ex) - { - var errorInfo = $"Publish errors in DefaultReliableBus;{ex.Message}"; - _logger.LogError(ex, errorInfo); - throw new Exception(errorInfo, ex); - } - } - } -} diff --git a/src/Pole.ReliableMessage/EventBus/DefaultReliableEventFinder.cs b/src/Pole.ReliableMessage/EventBus/DefaultReliableEventFinder.cs deleted file mode 100644 index 2665937..0000000 --- a/src/Pole.ReliableMessage/EventBus/DefaultReliableEventFinder.cs +++ /dev/null @@ -1,20 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Reflection; -using System.Text; - -namespace Pole.ReliableMessage.EventBus -{ - class DefaultReliableEventCallBackFinder : IReliableEventCallBackFinder - { - public List FindAll(IEnumerable assemblies) - { - var callbackType = typeof(IReliableEventCallback); - - var callbackTypes = assemblies.SelectMany(m => m.GetTypes().Where(type => callbackType.IsAssignableFrom(type))); - return callbackTypes.ToList(); ; - } - } -} diff --git a/src/Pole.ReliableMessage/EventBus/DefaultReliableEventHandlerFinder.cs b/src/Pole.ReliableMessage/EventBus/DefaultReliableEventHandlerFinder.cs deleted file mode 100644 index ebe3328..0000000 --- a/src/Pole.ReliableMessage/EventBus/DefaultReliableEventHandlerFinder.cs +++ /dev/null @@ -1,20 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Reflection; -using System.Text; - -namespace Pole.ReliableMessage.EventBus -{ - public class DefaultReliableEventHandlerFinder : IReliableEventHandlerFinder - { - public List FindAll(IEnumerable assemblies) - { - var eventHandlerType = typeof(IReliableEventHandler); - - var eventHandlerTypes = assemblies.SelectMany(m => m.GetTypes().Where(type => eventHandlerType.IsAssignableFrom(type))); - return eventHandlerTypes.ToList(); - } - } -} diff --git a/src/Pole.ReliableMessage/IReliableMessageOptionExtension.cs b/src/Pole.ReliableMessage/IReliableMessageOptionExtension.cs deleted file mode 100644 index 5032fa9..0000000 --- a/src/Pole.ReliableMessage/IReliableMessageOptionExtension.cs +++ /dev/null @@ -1,12 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage -{ - public interface IReliableMessageOptionExtension - { - void AddServices(IServiceCollection services); - } -} diff --git a/src/Pole.ReliableMessage/Messaging/CallBack/DefaultMessageCallBackInfoGenerator.cs b/src/Pole.ReliableMessage/Messaging/CallBack/DefaultMessageCallBackInfoGenerator.cs deleted file mode 100644 index 6e3c93e..0000000 --- a/src/Pole.ReliableMessage/Messaging/CallBack/DefaultMessageCallBackInfoGenerator.cs +++ /dev/null @@ -1,51 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using Pole.ReliableMessage.EventBus; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Linq.Expressions; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Messaging.CallBack -{ - class DefaultMessageCallBackInfoGenerator : IMessageCallBackInfoGenerator - { - private readonly IMessageTypeIdGenerator _messageTypeIdGenerator; - public DefaultMessageCallBackInfoGenerator(IMessageTypeIdGenerator messageTypeIdGenerator) - { - _messageTypeIdGenerator = messageTypeIdGenerator; - } - public MessageCallBackInfo Generate(Type eventCallbackType) - { - var @interface = eventCallbackType.GetInterfaces().FirstOrDefault(); - Func> deleg = MakeCallBackFunc(eventCallbackType, @interface); - - var eventType = @interface.GetGenericArguments()[0]; - var eventCallbackArguemntType = @interface.GetGenericArguments()[1]; - var enentName = _messageTypeIdGenerator.Generate(eventType); - - MessageCallBackInfo messageCallBackInfo = new MessageCallBackInfo(enentName, deleg, eventCallbackType, eventCallbackArguemntType, eventType); - return messageCallBackInfo; - } - - private static Func> MakeCallBackFunc(Type eventType, Type @interface) - { - var callbackParemeterType = @interface.GetGenericArguments()[1]; - var argument = Expression.Parameter(typeof(object)); - var paremeter = Expression.Parameter(typeof(object)); - // var typedParemeter = Expression.Parameter(eventType); - var typedcallbackParemeter = Expression.Convert(argument, callbackParemeterType); - - var typedParemeter = Expression.Convert(paremeter, eventType); - - var callBackMethod = eventType.GetMethod("Callback"); - var call = Expression.Call(typedParemeter, callBackMethod, typedcallbackParemeter); - - //var innerParemeter = eventType.GetInterfaces().FirstOrDefault(); - var lambda = Expression.Lambda>>(call, true, argument, paremeter); - var deleg = lambda.Compile(); - return deleg; - } - } -} diff --git a/src/Pole.ReliableMessage/Messaging/CallBack/DefaultMessageCallBackRegister.cs b/src/Pole.ReliableMessage/Messaging/CallBack/DefaultMessageCallBackRegister.cs deleted file mode 100644 index fbe8035..0000000 --- a/src/Pole.ReliableMessage/Messaging/CallBack/DefaultMessageCallBackRegister.cs +++ /dev/null @@ -1,27 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Messaging.CallBack -{ - class DefaultMessageCallBackRegister : IMessageCallBackRegister - { - private readonly IMessageCallBackInfoGenerator _messageCallBackInfoGenerator; - private readonly IMessageCallBackInfoStore _messageCallBackInfoStore; - public DefaultMessageCallBackRegister(IMessageCallBackInfoGenerator messageCallBackInfoGenerator, IMessageCallBackInfoStore messageCallBackInfoStore) - { - _messageCallBackInfoGenerator = messageCallBackInfoGenerator; - _messageCallBackInfoStore = messageCallBackInfoStore; - } - public async Task Register(IEnumerable eventCallbackTypes) - { - foreach(var eventCallbackType in eventCallbackTypes) - { - var messageCallBackInfo = _messageCallBackInfoGenerator.Generate(eventCallbackType); - await _messageCallBackInfoStore.Add(messageCallBackInfo); - } - } - } -} diff --git a/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackInfo.cs b/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackInfo.cs deleted file mode 100644 index e111c97..0000000 --- a/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackInfo.cs +++ /dev/null @@ -1,30 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Messaging.CallBack -{ - public class MessageCallBackInfo - { - private Func> _callBack; - public MessageCallBackInfo(string messageTypeId, Func> callBack, Type eventCallbackType, Type eventCallbackArguemntType,Type eventType) - { - MessageTypeId = messageTypeId; - _callBack = callBack; - EventCallbackType = eventCallbackType; - EventCallbackArguemntType = eventCallbackArguemntType; - EventType = eventType; - } - public string MessageTypeId { get;private set; } - public Type EventType { get; private set; } - public Type EventCallbackType { get; private set; } - public Type EventCallbackArguemntType { get; private set; } - - public Task Invoke(object parameter, object reliableEvent) - { - return _callBack(parameter, reliableEvent); - } - } -} diff --git a/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackInfoInMemoryStore.cs b/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackInfoInMemoryStore.cs deleted file mode 100644 index 7cee68e..0000000 --- a/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackInfoInMemoryStore.cs +++ /dev/null @@ -1,38 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using Microsoft.Extensions.Logging; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Messaging.CallBack -{ - public class MessageCallBackInfoInMemoryStore : Dictionary, IMessageCallBackInfoStore - { - private readonly ILogger _logger; - public MessageCallBackInfoInMemoryStore(ILogger logger) - { - _logger = logger; - } - public async Task Add(MessageCallBackInfo messageCallBackInfo) - { - await Task.CompletedTask; - if (TryGetValue(messageCallBackInfo.MessageTypeId, out MessageCallBackInfo info)) - { - throw new Exception($"Add MessageCallBackInfo Faild , MessageCallBackInfo Already Added ,MessageTypeId:{messageCallBackInfo.MessageTypeId}"); - } - Add(messageCallBackInfo.MessageTypeId, messageCallBackInfo); - _logger.LogDebug($"Add MessageCallBackInfo Success ,MessageTypeId:{messageCallBackInfo.MessageTypeId}"); - } - - public async Task Get(string messageTypeId) - { - await Task.CompletedTask; - if (TryGetValue(messageTypeId, out MessageCallBackInfo info)) - { - return info; - } - return null; - } - } -} diff --git a/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackResponseStatus.cs b/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackResponseStatus.cs deleted file mode 100644 index 331ff05..0000000 --- a/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackResponseStatus.cs +++ /dev/null @@ -1,18 +0,0 @@ -using Pole.Domain; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Messaging.CallBack -{ - public class MessageCallBackResponseStatus : Enumeration - { - public static MessageCallBackResponseStatus Finised = new MessageCallBackResponseStatus(3, "已完成"); - public static MessageCallBackResponseStatus Unfinished = new MessageCallBackResponseStatus(6, "未完成"); - public static MessageCallBackResponseStatus Unusual = new MessageCallBackResponseStatus(9, "异常"); - - public MessageCallBackResponseStatus(int id, string name) : base(id, name) - { - } - } -} diff --git a/src/Pole.ReliableMessage/Messaging/DefaultJsonConverter.cs b/src/Pole.ReliableMessage/Messaging/DefaultJsonConverter.cs deleted file mode 100644 index 0159576..0000000 --- a/src/Pole.ReliableMessage/Messaging/DefaultJsonConverter.cs +++ /dev/null @@ -1,25 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Messaging -{ - class DefaultJsonConverter : IJsonConverter - { - public T Deserialize(string json) - { - return System.Text.Json.JsonSerializer.Deserialize(json); - } - - public object Deserialize(string json, Type type) - { - return System.Text.Json.JsonSerializer.Deserialize(json, type); - } - - public string Serialize(object obj) - { - return System.Text.Json.JsonSerializer.Serialize(obj); - } - } -} diff --git a/src/Pole.ReliableMessage/Messaging/DefaultMessageChecker.cs b/src/Pole.ReliableMessage/Messaging/DefaultMessageChecker.cs deleted file mode 100644 index fc44b21..0000000 --- a/src/Pole.ReliableMessage/Messaging/DefaultMessageChecker.cs +++ /dev/null @@ -1,63 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using Pole.ReliableMessage.Messaging; -using Pole.ReliableMessage.Messaging.CallBack; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; -using Pole.ReliableMessage.Storage.Abstraction; - -namespace Pole.ReliableMessage.Messaging -{ - class DefaultMessageChecker : IMessageChecker - { - private readonly IMessageCallBackInfoStore _messageCallBackInfoStore; - private readonly ILogger _logger; - private readonly IJsonConverter _jsonConverter; - private readonly IServiceProvider _serviceProvider; - public DefaultMessageChecker(IMessageCallBackInfoStore messageCallBackInfoStore, ILogger logger, IJsonConverter jsonConverter, IServiceProvider serviceProvider) - { - _messageCallBackInfoStore = messageCallBackInfoStore; - _logger = logger; - _jsonConverter = jsonConverter; - _serviceProvider = serviceProvider; - } - public async Task GetResult(Message message) - { - try - { - var callBackInfo = await _messageCallBackInfoStore.Get(message.MessageTypeId); - if (callBackInfo == null) - { - _logger.LogError($"Can't find the callBackInfo, MessageTypeId:{message.MessageTypeId}"); - return MessageCheckerResult.NotFinished; - } - using (var scope = _serviceProvider.CreateScope()) - { - var callback = scope.ServiceProvider.GetRequiredService(callBackInfo.EventCallbackType); - var argument = _jsonConverter.Deserialize(message.RePushCallBackParameterValue, callBackInfo.EventCallbackArguemntType); - var result = await callBackInfo.Invoke(argument, callback); - if (_logger.IsEnabled(LogLevel.Debug)) - { - var messageInfoDetail = _jsonConverter.Serialize(message); - _logger.LogDebug($"DefaultMessageChecker IsFinished Result:{result.ToString()},MessageTypeId:{message.MessageTypeId},MessageDetail:{messageInfoDetail}"); - } - if (result) - { - var @event = _jsonConverter.Deserialize(message.Content, callBackInfo.EventType); - return new MessageCheckerResult(true, @event); - } - return MessageCheckerResult.NotFinished; - } - } - catch (Exception ex) - { - var messageInfoDetail = _jsonConverter.Serialize(message); - _logger.LogError(ex, $"DefaultMessageChecker.IsFinished Error, MessageTypeId:{message.MessageTypeId},MessageDetail:{messageInfoDetail}"); - return MessageCheckerResult.NotFinished; - } - } - } -} diff --git a/src/Pole.ReliableMessage/Messaging/DefaultMessageTypeIdGenerator.cs b/src/Pole.ReliableMessage/Messaging/DefaultMessageTypeIdGenerator.cs deleted file mode 100644 index 893433a..0000000 --- a/src/Pole.ReliableMessage/Messaging/DefaultMessageTypeIdGenerator.cs +++ /dev/null @@ -1,15 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Messaging -{ - class DefaultMessageTypeIdGenerator : IMessageTypeIdGenerator - { - public string Generate(Type messageType) - { - return messageType.FullName; - } - } -} diff --git a/src/Pole.ReliableMessage/Messaging/MessageCheckerResult.cs b/src/Pole.ReliableMessage/Messaging/MessageCheckerResult.cs deleted file mode 100644 index a3ab5cb..0000000 --- a/src/Pole.ReliableMessage/Messaging/MessageCheckerResult.cs +++ /dev/null @@ -1,21 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Messaging -{ - public class MessageCheckerResult - { - public static MessageCheckerResult NotFinished = new MessageCheckerResult(false); - public MessageCheckerResult(bool isFinished,object @event):this(isFinished) - { - Event = @event; - } - public MessageCheckerResult(bool isFinished) - { - IsFinished = isFinished; - } - public bool IsFinished { get; set; } - public object Event { get; set; } - } -} diff --git a/src/Pole.ReliableMessage/Pole.ReliableMessage.csproj b/src/Pole.ReliableMessage/Pole.ReliableMessage.csproj deleted file mode 100644 index a71d49c..0000000 --- a/src/Pole.ReliableMessage/Pole.ReliableMessage.csproj +++ /dev/null @@ -1,24 +0,0 @@ - - - - netstandard2.0 - - - - - - - - - - - - - - - - - - - - diff --git a/src/Pole.ReliableMessage/PoleReliableMessageServiceCollectionExtensions.cs b/src/Pole.ReliableMessage/PoleReliableMessageServiceCollectionExtensions.cs deleted file mode 100644 index 3dcc225..0000000 --- a/src/Pole.ReliableMessage/PoleReliableMessageServiceCollectionExtensions.cs +++ /dev/null @@ -1,67 +0,0 @@ -using Pole.Pole.ReliableMessage.EventBus; -using Pole.ReliableMessage; -using Pole.ReliableMessage.Abstraction; -using Pole.ReliableMessage.EventBus; -using Pole.ReliableMessage.Messaging; -using Pole.ReliableMessage.Messaging.CallBack; -using Pole.ReliableMessage.Processor; -using Pole.ReliableMessage.Utils; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Microsoft.Extensions.DependencyInjection -{ - public static class PoleReliableMessageServiceCollectionExtensions - { - public static IServiceCollection AddPoleReliableMessage(this IServiceCollection services, Action optionConfig) - { - ReliableMessageOption reliableMessageOption = new ReliableMessageOption(); - optionConfig(reliableMessageOption); - - foreach(var extension in reliableMessageOption.ReliableMessageOptionExtensions) - { - extension.AddServices(services); - } - services.Configure(optionConfig); - - services.AddSingleton(); - //services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - - services.AddHostedService(); - - services.AddHttpClient(); - - - - - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - - var provider = services.BuildServiceProvider(); - - IComteckReliableMessageBootstrap applicationBuilderConfigurator = provider.GetService(typeof(IComteckReliableMessageBootstrap)) as IComteckReliableMessageBootstrap; - - applicationBuilderConfigurator.Initialize(services, reliableMessageOption.EventHandlerAssemblies, reliableMessageOption.EventCallbackAssemblies); - return services; - } - } -} diff --git a/src/Pole.ReliableMessage/Processor/LoopProcessor.cs b/src/Pole.ReliableMessage/Processor/LoopProcessor.cs deleted file mode 100644 index e52ffa7..0000000 --- a/src/Pole.ReliableMessage/Processor/LoopProcessor.cs +++ /dev/null @@ -1,51 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using Microsoft.Extensions.Logging; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Processor -{ - public class LoopProcessor : ProcessorBase - { - private IProcessor _processor; - private readonly ILoggerFactory _loggerFactory; - private readonly ITimeHelper _timeHelper; - - public LoopProcessor(IProcessor processor, ILoggerFactory loggerFactory, ITimeHelper timeHelper) - { - _processor = processor; - _loggerFactory = loggerFactory; - _timeHelper = timeHelper; - } - public override string Name => "LoopProcessor"; - public override async Task Process(ProcessingContext context) - { - var logger = _loggerFactory.CreateLogger(); - - while (!context.IsStopping) - { - try - { - logger.LogDebug($"{_timeHelper.GetAppropriateFormatedDateString()}...{ this.ToString() } process start"); - - await _processor.Process(context); - - logger.LogDebug($"{_timeHelper.GetAppropriateFormatedDateString()}...{ this.ToString() } process compelete"); - } - catch (Exception ex) - { - logger.LogError(ex, $"{_timeHelper.GetAppropriateFormatedDateString()}...{ this.ToString() } process error"); - } - } - } - public override string ToString() - { - var strArray = new string[2]; - strArray[0] = Name; - strArray[1] = _processor.Name; - return string.Join("_", strArray); - } - } -} diff --git a/src/Pole.ReliableMessage/Processor/MessageCleanProcessor.cs b/src/Pole.ReliableMessage/Processor/MessageCleanProcessor.cs deleted file mode 100644 index fa2cff3..0000000 --- a/src/Pole.ReliableMessage/Processor/MessageCleanProcessor.cs +++ /dev/null @@ -1,58 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using Pole.ReliableMessage.Messaging; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; -using Pole.ReliableMessage.Storage.Abstraction; - -namespace Pole.ReliableMessage.Processor -{ - public class MessageCleanProcessor : ProcessorBase - { - private readonly ReliableMessageOption _options; - private readonly ILogger _logger; - private readonly IMessageStorage _messageStorage; - private readonly IMemberShipTableManager _memberShipTable; - private readonly IServiceIPv4AddressProvider _serviceIPv4AddressProvider; - public MessageCleanProcessor(IOptions options, ILogger logger, IMessageStorage messageStorage, IMemberShipTableManager memberShipTable, IServiceIPv4AddressProvider serviceIPv4AddressProvider) - { - _options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected"); - _logger = logger; - _messageStorage = messageStorage; - _memberShipTable = memberShipTable; - _serviceIPv4AddressProvider = serviceIPv4AddressProvider; - } - public override string Name => nameof(PendingMessageCheckProcessor); - - - public override async Task Process(ProcessingContext context) - { - try - { - var iPStr = _serviceIPv4AddressProvider.Get(); - var isPendingChecker = await _memberShipTable.IsPendingMessageCheckerServiceInstance(iPStr);// 这里可以把时间加上去 - if (!isPendingChecker) - { - _logger.LogDebug("I an not the pendingChecker ,Ignore clean up messages"); - return; - } - _logger.LogInformation($"Begin clean message"); - - var deletedCount = await _messageStorage.Delete(m => m.MessageStatusId == MessageStatus.Canced.Id || m.MessageStatusId == MessageStatus.Pushed.Id); - - _logger.LogInformation($"End clean message ,delete message count : {deletedCount} , successfully"); - } - catch (Exception ex) - { - _logger.LogError(ex, $"Clean message error"); - } - finally - { - await Task.Delay(_options.MessageCleanInterval * 1000); - } - } - } -} diff --git a/src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs b/src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs deleted file mode 100644 index c72b3e1..0000000 --- a/src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs +++ /dev/null @@ -1,82 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using Pole.ReliableMessage.Messaging; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Net.NetworkInformation; -using System.Net.Sockets; -using System.Text; -using System.Threading.Tasks; -using Pole.ReliableMessage.Storage.Abstraction; - -namespace Pole.ReliableMessage.Processor -{ - class PendingMessageCheckProcessor : ProcessorBase - { - private readonly IMessageStorage _storage; - private readonly ReliableMessageOption _options; - //private readonly IMessageBuffer _messageBuffer; - private readonly ITimeHelper _timeHelper; - private readonly IMemberShipTableManager _memberShipTable; - private readonly ILogger _logger; - private readonly IServiceIPv4AddressProvider _serviceIPv4AddressProvider; - private readonly IMessageCheckRetryer _messageCheckRetryer; - public PendingMessageCheckProcessor(IMessageStorage storage, IOptions options, ITimeHelper timeHelper, IMemberShipTableManager memberShipTable, ILogger logger, IServiceIPv4AddressProvider serviceIPv4AddressProvider, IMessageCheckRetryer messageCheckRetryer) - { - _storage = storage; - _options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected"); - //_messageBuffer = messageBuffer; - _timeHelper = timeHelper; - _memberShipTable = memberShipTable; - _logger = logger; - _serviceIPv4AddressProvider = serviceIPv4AddressProvider; - _messageCheckRetryer = messageCheckRetryer; - } - public override string Name => nameof(PendingMessageCheckProcessor); - - - public override async Task Process(ProcessingContext context) - { - try - { - var iPStr = _serviceIPv4AddressProvider.Get(); - - var isPendingChecker = await _memberShipTable.IsPendingMessageCheckerServiceInstance(iPStr);// 这里可以把时间加上去 - if (!isPendingChecker) - { - _logger.LogDebug("I an not the PendingChecker ,Ignore check"); - return; - } - - await ProcessInternal(); - } - catch (Exception ex) - { - _logger.LogError(ex, $"PendingMessageCheckProcessor Process Error"); - } - finally - { - await Task.Delay(_options.PendingMessageRetryInterval * 1000); - } - } - public async Task ProcessInternal() - { - var now = _timeHelper.GetUTCNow(); - var pendingMessages = await _storage.GetMany(m => m.MessageStatusId == MessageStatus.Pending.Id && m.NextRetryUTCTime <= now && m.AddedUTCTime <= now.AddSeconds(-1 * _options.PendingMessageFirstProcessingWaitTime) && m.AddedUTCTime >= now.AddSeconds(-1 * _options.PendingMessageCheckingTimeOutSeconds), _options.PendingMessageCheckBatchCount); - - if (_logger.IsEnabled(LogLevel.Debug)) - { - _logger.LogDebug($"PendingMessageCheckProcessor pendingMessages count:{pendingMessages.Count}"); - } - - await _messageCheckRetryer.Execute(pendingMessages, now); - // 说明此时 消息数量超过 批量获取数 - if (pendingMessages.Count == _options.PendingMessageCheckBatchCount) - { - await ProcessInternal(); - } - } - } -} diff --git a/src/Pole.ReliableMessage/Processor/PendingMessageServiceInstanceCheckProcessor.cs b/src/Pole.ReliableMessage/Processor/PendingMessageServiceInstanceCheckProcessor.cs deleted file mode 100644 index 07696b9..0000000 --- a/src/Pole.ReliableMessage/Processor/PendingMessageServiceInstanceCheckProcessor.cs +++ /dev/null @@ -1,68 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Pole.ReliableMessage.Storage.Abstraction; - -namespace Pole.ReliableMessage.Processor -{ - class PendingMessageServiceInstanceCheckProcessor : ProcessorBase - { - private readonly ReliableMessageOption _options; - private readonly ITimeHelper _timeHelper; - private readonly IMemberShipTableManager _memberShipTable; - private readonly ILogger _logger; - private readonly IServiceIPv4AddressProvider _serviceIPv4AddressProvider; - public PendingMessageServiceInstanceCheckProcessor(IOptions options, ITimeHelper timeHelper, IMemberShipTableManager memberShipTable, ILogger logger, IServiceIPv4AddressProvider serviceIPv4AddressProvider) - { - _options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected"); - _timeHelper = timeHelper; - _memberShipTable = memberShipTable; - _logger = logger; - _serviceIPv4AddressProvider = serviceIPv4AddressProvider; - } - public override string Name => nameof(PendingMessageCheckProcessor); - - - public override async Task Process(ProcessingContext context) - { - try - { - var now = _timeHelper.GetUTCNow(); - var iPStr = _serviceIPv4AddressProvider.Get(); - _logger.LogDebug($"Current instance ip is {iPStr}"); - - var currentCheckIp = await _memberShipTable.GetPendingMessageCheckerServiceInstanceIp(now.AddSeconds(-1 * _options.PendingMessageCheckerInstanceIAnAliveTimeout)); - if (string.IsNullOrEmpty(currentCheckIp)) - { - var addInstanceResult = await _memberShipTable.AddCheckerServiceInstanceAndDeleteOthers(iPStr, now); - if (addInstanceResult) - { - _logger.LogInformation($"I am the PendingMessageCheck now, ip:{iPStr}"); - return; - } - _logger.LogError($"There is no PendingMessageChecker ,I want to be the PendingMessageCheck ,but faild ,memberShipTable.AddCheckerServiceInstance faild , ip:{iPStr}"); - return; - } - if (currentCheckIp == iPStr) - { - _logger.LogDebug($"Begin update pendingMessageChecker iAmAliveUTCTime"); - await _memberShipTable.UpdateIAmAlive(currentCheckIp, now); - _logger.LogDebug($"Update pendingMessageChecker iAmAliveUTCTime successfully"); - } - } - catch (Exception ex) - { - _logger.LogError(ex, $"PendingMessageServiceInstanceCheckProcessor Process Error"); - } - finally - { - await Task.Delay(_options.PendingMessageCheckerInstanceIAnAliveTimeUpdateDelay * 1000); - } - } - } -} diff --git a/src/Pole.ReliableMessage/Processor/ProcessingContext.cs b/src/Pole.ReliableMessage/Processor/ProcessingContext.cs deleted file mode 100644 index 0d51926..0000000 --- a/src/Pole.ReliableMessage/Processor/ProcessingContext.cs +++ /dev/null @@ -1,17 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; - -namespace Pole.ReliableMessage.Processor -{ - public class ProcessingContext - { - public ProcessingContext(CancellationToken cancellationToken) - { - CancellationToken = cancellationToken; - } - public CancellationToken CancellationToken { get; } - public bool IsStopping => CancellationToken.IsCancellationRequested; - } -} diff --git a/src/Pole.ReliableMessage/Processor/ProcessorBase.cs b/src/Pole.ReliableMessage/Processor/ProcessorBase.cs deleted file mode 100644 index e6cd01a..0000000 --- a/src/Pole.ReliableMessage/Processor/ProcessorBase.cs +++ /dev/null @@ -1,20 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Processor -{ - public abstract class ProcessorBase : IProcessor - { - public abstract string Name { get; } - - public abstract Task Process(ProcessingContext context); - - public override string ToString() - { - return Name; - } - } -} diff --git a/src/Pole.ReliableMessage/ReliableMessageOption.cs b/src/Pole.ReliableMessage/ReliableMessageOption.cs deleted file mode 100644 index fea3d98..0000000 --- a/src/Pole.ReliableMessage/ReliableMessageOption.cs +++ /dev/null @@ -1,97 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Reflection; -using System.Text; - -namespace Pole.ReliableMessage -{ - public class ReliableMessageOption - { - public List ReliableMessageOptionExtensions { get; private set; } = new List(); - public List EventCallbackAssemblies { get; private set; } - public List EventHandlerAssemblies { get; private set; } - /// - /// 待发送消息重试间隔 单位 秒 - /// - public int PendingMessageRetryInterval { get; set; } = 10; - - /// - /// 待发送消息最大重试次数 , 0 为 无上限 - /// - public int PendingMessageRetryTimes { get; set; } = 0; - - /// - /// 预发送消息超时时间 单位 秒 - /// - public int PendingMessageTimeOut { get; set; } = 10 * 60; - - /// - /// 预发送消息检查时每一次获取的消息数量 - /// - public int PendingMessageCheckBatchCount { get; set; } = 1000; - - /// - /// 预发送消息状态检查最后时间 单位 秒 - /// - public int PendingMessageCheckingTimeOutSeconds { get; set; } = 13 * 60; - - /// - /// 已发送的消息缓冲区 flush to storage 的时间间隔 单位 秒 - /// - public int PushedMessageFlushInterval { get; set; } = 2; - - - /// - /// PendingMessage 第一次处理等待时间 单位 秒 - /// - public int PendingMessageFirstProcessingWaitTime { get; set; } = 2 + 10; - - /// - /// 每次重试之间最大间隔 单位 秒 - /// - public int MaxPendingMessageRetryDelay { get; set; } = 2 * 60; - - /// - /// PendingMessageCheck 实例更新 存活时间的时间间隔 单位 秒 - /// - public int PendingMessageCheckerInstanceIAnAliveTimeUpdateDelay { get; set; } = 10; - - /// - /// PendingMessageCheck 实例存活超时时间 单位 秒 - /// - public int PendingMessageCheckerInstanceIAnAliveTimeout { get; set; } = 3 * 10; - - /// - /// Message 定期清理时间间隔 单位 秒 - /// - public int MessageCleanInterval { get; set; } = 30 * 60; - - /// - /// 当主机有多个网络时通过指定网关地址找到合适的服务ip地址 - /// - public string NetworkInterfaceGatewayAddress { get; set; } = string.Empty; - - public ReliableMessageOption AddEventAssemblies(params Assembly[] assemblies) - { - EventCallbackAssemblies = assemblies.ToList(); - return this; - } - public ReliableMessageOption AddEventAssemblies(IEnumerable assemblies) - { - EventCallbackAssemblies = assemblies.ToList(); - return this; - } - public ReliableMessageOption AddEventHandlerAssemblies(params Assembly[] assemblies) - { - EventHandlerAssemblies = assemblies.ToList(); - return this; - } - public ReliableMessageOption AddEventHandlerAssemblies(IEnumerable assemblies) - { - EventHandlerAssemblies = assemblies.ToList(); - return this; - } - } -} diff --git a/src/Pole.ReliableMessage/Utils/DefaulTimeHelper.cs b/src/Pole.ReliableMessage/Utils/DefaulTimeHelper.cs deleted file mode 100644 index a9945b8..0000000 --- a/src/Pole.ReliableMessage/Utils/DefaulTimeHelper.cs +++ /dev/null @@ -1,20 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Utils -{ - class DefaulTimeHelper : ITimeHelper - { - public string GetAppropriateFormatedDateString() - { - return DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff"); - } - - public DateTime GetUTCNow() - { - return DateTime.UtcNow; - } - } -} diff --git a/src/Pole.ReliableMessage/Utils/DefaultServiceIPv4AddressProvider.cs b/src/Pole.ReliableMessage/Utils/DefaultServiceIPv4AddressProvider.cs deleted file mode 100644 index cd2a250..0000000 --- a/src/Pole.ReliableMessage/Utils/DefaultServiceIPv4AddressProvider.cs +++ /dev/null @@ -1,57 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using Microsoft.Extensions.Options; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Net.NetworkInformation; -using System.Net.Sockets; -using System.Text; - -namespace Pole.ReliableMessage.Utils -{ - class DefaultServiceIPv4AddressProvider : IServiceIPv4AddressProvider - { - private readonly ReliableMessageOption _options; - private string _ipAddress; - public DefaultServiceIPv4AddressProvider(IOptions options) - { - _options = options.Value; - Init(); - } - - private void Init() - { - var gatewayAddress = _options.NetworkInterfaceGatewayAddress; - NetworkInterface networkInterface = null; - if (string.IsNullOrEmpty(_options.NetworkInterfaceGatewayAddress)) - { - networkInterface = NetworkInterface.GetAllNetworkInterfaces() - .OrderByDescending(c => c.Speed) - .Where(m => m.NetworkInterfaceType != NetworkInterfaceType.Loopback && m.OperationalStatus == OperationalStatus.Up) - .FirstOrDefault(); - } - else - { - networkInterface = NetworkInterface.GetAllNetworkInterfaces() -.OrderByDescending(c => c.Speed).Where(m => m.NetworkInterfaceType != NetworkInterfaceType.Loopback && m.OperationalStatus == OperationalStatus.Up).Where(m => m.GetIPProperties().GatewayAddresses.FirstOrDefault(c => c.Address.AddressFamily == AddressFamily.InterNetwork)?.Address.ToString() == gatewayAddress) -.FirstOrDefault(); - } - if (networkInterface == null) - { - throw new Exception($"Not found correct NetworkInterface, option.NetworkInterfaceGatewayAddress:{gatewayAddress}"); - } - var props = networkInterface.GetIPProperties(); - // get first IPV4 address assigned to this interface - var firstIpV4Address = props.UnicastAddresses - .Where(c => c.Address.AddressFamily == AddressFamily.InterNetwork) - .Select(c => c.Address) - .FirstOrDefault(); - _ipAddress = firstIpV4Address.ToString(); - } - - public string Get() - { - return _ipAddress; - } - } -}