diff --git a/src/Pole.Core/Channels/AsyncInputEvent.cs b/src/Pole.Core/Channels/AsyncInputEvent.cs deleted file mode 100644 index a1818c3..0000000 --- a/src/Pole.Core/Channels/AsyncInputEvent.cs +++ /dev/null @@ -1,14 +0,0 @@ -using System.Threading.Tasks; - -namespace Pole.Core.Channels -{ - public class AsyncInputEvent - { - public AsyncInputEvent(Input data) - { - Value = data; - } - public TaskCompletionSource TaskSource { get; } = new TaskCompletionSource(); - public Input Value { get; set; } - } -} diff --git a/src/Pole.Core/Consts.cs b/src/Pole.Core/Consts.cs index 545e407..03081e2 100644 --- a/src/Pole.Core/Consts.cs +++ b/src/Pole.Core/Consts.cs @@ -10,7 +10,5 @@ namespace Pole.Core public static ValueTask ValueTaskDone = new ValueTask(); public const string ConsumerRetryTimesStr = "pole-consumer-retry-times"; public const string ConsumerExceptionDetailsStr = "pole-consumer-exception-details"; - public const string EventHandlerMethodName = "EventHandler"; - public const string BatchEventsHandlerMethodName = "BatchEventsHandler"; } } diff --git a/src/Pole.Core/EventBus/Bus.cs b/src/Pole.Core/EventBus/Bus.cs index ab593e5..0651532 100644 --- a/src/Pole.Core/EventBus/Bus.cs +++ b/src/Pole.Core/EventBus/Bus.cs @@ -1,4 +1,4 @@ -using Pole.Core.Abstraction; + using Pole.Core.EventBus.Event; using Pole.Core.EventBus.EventStorage; using Pole.Core.EventBus.Transaction; @@ -22,8 +22,7 @@ namespace Pole.Core.EventBus public IDbTransactionAdapter Transaction { get; set; } public IServiceProvider ServiceProvider { get; } - public BlockingCollection PrePublishEventBuffer { get; } = - new BlockingCollection(new ConcurrentQueue()); + public BlockingCollection PrePublishEventBuffer { get; } = new BlockingCollection(new ConcurrentQueue()); public Bus(IServiceProvider serviceProvider, IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator, IEventStorage eventStorage) { @@ -51,11 +50,11 @@ namespace Pole.Core.EventBus }; if (Transaction?.DbTransaction == null) { - var mediumMessage = await eventStorage.StoreMessage(eventEntity); + await eventStorage.StoreMessage(eventEntity); } else { - var mediumMessage = await eventStorage.StoreMessage(eventEntity, Transaction.DbTransaction); + await eventStorage.StoreMessage(eventEntity, Transaction.DbTransaction); } PrePublishEventBuffer.Add(eventEntity); diff --git a/src/Pole.Core/EventBus/Event/EventBase.cs b/src/Pole.Core/EventBus/Event/EventBase.cs deleted file mode 100644 index 4f5167f..0000000 --- a/src/Pole.Core/EventBus/Event/EventBase.cs +++ /dev/null @@ -1,31 +0,0 @@ -using Pole.Core.Utils; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Core.EventBus.Event -{ - public class EventBase - { - public EventBase() { } - public EventBase(long version, long timestamp) - { - Version = version; - Timestamp = timestamp; - } - public long Version { get; set; } - public long Timestamp { get; set; } - public byte[] GetBytes() - { - using var ms = new PooledMemoryStream(); - ms.Write(BitConverter.GetBytes(Version)); - ms.Write(BitConverter.GetBytes(Timestamp)); - return ms.ToArray(); - } - public static EventBase FromBytes(byte[] bytes) - { - var bytesSpan = bytes.AsSpan(); - return new EventBase(BitConverter.ToInt64(bytesSpan), BitConverter.ToInt64(bytesSpan.Slice(sizeof(long)))); - } - } -} diff --git a/src/Pole.Core/EventBus/Event/FullyEvent.cs b/src/Pole.Core/EventBus/Event/FullyEvent.cs deleted file mode 100644 index aa0acef..0000000 --- a/src/Pole.Core/EventBus/Event/FullyEvent.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Core.EventBus.Event -{ - public class FullyEvent - { - public IEvent Event { get; set; } - public EventBase Base { get; set; } - public PrimaryKey StateId { get; set; } - } -} diff --git a/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs b/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs index d59c866..73f2f35 100644 --- a/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs +++ b/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs @@ -1,6 +1,5 @@ using Microsoft.Extensions.Logging; using Orleans.Concurrency; -using Pole.Core.Abstraction; using Pole.Core.EventBus.Event; using Pole.Core.Serialization; using System; @@ -13,6 +12,7 @@ using System.Linq.Expressions; using System.Linq; using Pole.Core.Exceptions; using Orleans; +using Pole.Core.Utils.Abstraction; namespace Pole.Core.EventBus.EventHandler { @@ -52,7 +52,7 @@ namespace Pole.Core.EventBus.EventHandler if (this is IPoleEventHandler handler) { var result = handler.EventHandle((TEvent)eventObj); - logger.LogTrace($"{nameof(PoleEventHandler)} Invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(eventObj)); + logger.LogTrace($"{nameof(PoleEventHandler)} Invoke completed: {0}->{1}->{2}", grainType.FullName, nameof(handler.EventHandle), serializer.Serialize(eventObj)); return result; } else @@ -71,14 +71,14 @@ namespace Pole.Core.EventBus.EventHandler if (this is IPoleBulkEventsHandler batchHandler) { await batchHandler.BulkEventsHandle(eventObjs); - logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(eventObjs)); + logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, nameof(batchHandler.BulkEventsHandle), serializer.Serialize(eventObjs)); return; } else if (this is IPoleEventHandler handler) { var handleTasks = eventObjs.Select(m => handler.EventHandle(m)); await Task.WhenAll(handleTasks); - logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(eventObjs)); + logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, nameof(handler.EventHandle), serializer.Serialize(eventObjs)); return; } else diff --git a/src/Pole.Core/EventBus/EventStorage/IEventStorageInitializer.cs b/src/Pole.Core/EventBus/EventStorage/IEventStorageInitializer.cs index 6a28845..a0d3a4e 100644 --- a/src/Pole.Core/EventBus/EventStorage/IEventStorageInitializer.cs +++ b/src/Pole.Core/EventBus/EventStorage/IEventStorageInitializer.cs @@ -9,7 +9,6 @@ namespace Pole.Core.EventBus.EventStorage public interface IEventStorageInitializer { Task InitializeAsync(CancellationToken cancellationToken); - string GetTableName(); } } diff --git a/src/Pole.Core/EventBus/ObserverUnit.cs b/src/Pole.Core/EventBus/ObserverUnit.cs index 8077914..05d3978 100644 --- a/src/Pole.Core/EventBus/ObserverUnit.cs +++ b/src/Pole.Core/EventBus/ObserverUnit.cs @@ -6,13 +6,13 @@ using System.Collections.Generic; using System.Text; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; -using Pole.Core.Abstraction; using System.Linq; using Pole.Core.EventBus.Event; using Orleans.Concurrency; using System.Collections.Concurrent; using System.Linq.Expressions; using Pole.Core.EventBus.EventHandler; +using Pole.Core.Utils.Abstraction; namespace Pole.Core.EventBus { diff --git a/src/Pole.Core/Exceptions/BeginTxTimeoutException.cs b/src/Pole.Core/Exceptions/BeginTxTimeoutException.cs deleted file mode 100644 index f0da534..0000000 --- a/src/Pole.Core/Exceptions/BeginTxTimeoutException.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; - -namespace Pole.Core.Exceptions -{ - public class BeginTxTimeoutException : Exception - { - public BeginTxTimeoutException(string stateId, long transactionId, Type type) : - base($"Grain type {type.FullName} with grainId {stateId} and transactionId {transactionId}") - { - } - } -} diff --git a/src/Pole.Core/Exceptions/ChannelUnavailabilityException.cs b/src/Pole.Core/Exceptions/ChannelUnavailabilityException.cs deleted file mode 100644 index e271283..0000000 --- a/src/Pole.Core/Exceptions/ChannelUnavailabilityException.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; - -namespace Pole.Core.Exceptions -{ - public class ChannelUnavailabilityException : Exception - { - public ChannelUnavailabilityException(string id, Type grainType) : base($"Channel unavailability,type {grainType.FullName} with id {id}") - { - } - } -} diff --git a/src/Pole.Core/Exceptions/EventBusRepeatBindingProducerException.cs b/src/Pole.Core/Exceptions/EventBusRepeatBindingProducerException.cs deleted file mode 100644 index b0fe921..0000000 --- a/src/Pole.Core/Exceptions/EventBusRepeatBindingProducerException.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; - -namespace Pole.Core.Exceptions -{ - public class EventBusRepeatBindingProducerException : Exception - { - public EventBusRepeatBindingProducerException(string name) : base(name) - { - } - } -} diff --git a/src/Pole.Core/Exceptions/EventIsClearedException.cs b/src/Pole.Core/Exceptions/EventIsClearedException.cs deleted file mode 100644 index c3137ad..0000000 --- a/src/Pole.Core/Exceptions/EventIsClearedException.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; - -namespace Pole.Core.Exceptions -{ - public class EventIsClearedException : Exception - { - public EventIsClearedException(string eventType, string eventJsonString, long archiveIndex) : base($"eventType:{eventType},event:{eventJsonString},archive index:{archiveIndex}") - { - } - } -} diff --git a/src/Pole.Core/Exceptions/EventVersionUnorderedException.cs b/src/Pole.Core/Exceptions/EventVersionUnorderedException.cs deleted file mode 100644 index 42262dd..0000000 --- a/src/Pole.Core/Exceptions/EventVersionUnorderedException.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; - -namespace Pole.Core.Exceptions -{ - public class EventVersionUnorderedException : Exception - { - public EventVersionUnorderedException(string id, Type type, long eventVersion, long stateVersion) : - base($"Event version and state version do not match of Grain type {type.FullName} and Id {id}.There state version are {stateVersion} and event version are {eventVersion}") - { - } - } -} diff --git a/src/Pole.Core/Exceptions/ObserverNotCompletedException.cs b/src/Pole.Core/Exceptions/ObserverNotCompletedException.cs deleted file mode 100644 index e19b69e..0000000 --- a/src/Pole.Core/Exceptions/ObserverNotCompletedException.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; - -namespace Pole.Core.Exceptions -{ - public class ObserverNotCompletedException : Exception - { - public ObserverNotCompletedException(string typeName, string stateId) : base($"{typeName} with id={stateId}") - { - - } - } -} diff --git a/src/Pole.Core/Exceptions/PrimaryKeyTypeException.cs b/src/Pole.Core/Exceptions/PrimaryKeyTypeException.cs deleted file mode 100644 index 3d0182c..0000000 --- a/src/Pole.Core/Exceptions/PrimaryKeyTypeException.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; - -namespace Pole.Core.Exceptions -{ - public class PrimaryKeyTypeException : Exception - { - public PrimaryKeyTypeException(string name) : base(name) - { - - } - } -} diff --git a/src/Pole.Core/Exceptions/RepeatedTxException.cs b/src/Pole.Core/Exceptions/RepeatedTxException.cs deleted file mode 100644 index ac7b83e..0000000 --- a/src/Pole.Core/Exceptions/RepeatedTxException.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; - -namespace Pole.Core.Exceptions -{ - public class RepeatedTxException : Exception - { - public RepeatedTxException(string stateId, long transactionId, Type type) : - base($"Grain type {type.FullName} with grainId {stateId} and transactionId {transactionId}") - { - } - } -} diff --git a/src/Pole.Core/Exceptions/SnapshotNotSupportTxException.cs b/src/Pole.Core/Exceptions/SnapshotNotSupportTxException.cs deleted file mode 100644 index 463b3ba..0000000 --- a/src/Pole.Core/Exceptions/SnapshotNotSupportTxException.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; - -namespace Pole.Core.Exceptions -{ - public class SnapshotNotSupportTxException : Exception - { - public SnapshotNotSupportTxException(Type type) : base(type.FullName) - { - } - } -} diff --git a/src/Pole.Core/Exceptions/StateInsecurityException.cs b/src/Pole.Core/Exceptions/StateInsecurityException.cs deleted file mode 100644 index 5f87ede..0000000 --- a/src/Pole.Core/Exceptions/StateInsecurityException.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; - -namespace Pole.Core.Exceptions -{ - public class StateInsecurityException : Exception - { - public StateInsecurityException(string id, Type grainType, long doingVersion, long stateVersion) : - base($"State insecurity of Grain type {grainType.FullName} and Id {id},Maybe because the previous event failed to execute.There state version are {stateVersion} and doing version are {doingVersion}") - { - } - } -} diff --git a/src/Pole.Core/Exceptions/StateIsOverException.cs b/src/Pole.Core/Exceptions/StateIsOverException.cs deleted file mode 100644 index 8e3d7e7..0000000 --- a/src/Pole.Core/Exceptions/StateIsOverException.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; - -namespace Pole.Core.Exceptions -{ - public class StateIsOverException : Exception - { - public StateIsOverException(string id, Type grainType) : - base($"State Is Over of Grain type {grainType.FullName} and Id {id}") - { - } - } -} diff --git a/src/Pole.Core/Exceptions/TxCommitException.cs b/src/Pole.Core/Exceptions/TxCommitException.cs deleted file mode 100644 index a8dba18..0000000 --- a/src/Pole.Core/Exceptions/TxCommitException.cs +++ /dev/null @@ -1,8 +0,0 @@ -using System; - -namespace Pole.Core.Exceptions -{ - public class TxCommitException : Exception - { - } -} diff --git a/src/Pole.Core/Exceptions/TxIdException.cs b/src/Pole.Core/Exceptions/TxIdException.cs deleted file mode 100644 index cae8003..0000000 --- a/src/Pole.Core/Exceptions/TxIdException.cs +++ /dev/null @@ -1,8 +0,0 @@ -using System; - -namespace Pole.Core.Exceptions -{ - public class TxIdException : Exception - { - } -} diff --git a/src/Pole.Core/Exceptions/TxSnapshotException.cs b/src/Pole.Core/Exceptions/TxSnapshotException.cs deleted file mode 100644 index 68c995c..0000000 --- a/src/Pole.Core/Exceptions/TxSnapshotException.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; - -namespace Pole.Core.Exceptions -{ - public class TxSnapshotException : Exception - { - public TxSnapshotException(string stateId, long snapshotVersion, long backupSnapshotVersion) : - base($"StateId {stateId} and snapshot version {snapshotVersion} and backup snapshot version {backupSnapshotVersion}") - { - } - } -} diff --git a/src/Pole.Core/Exceptions/UnMatchObserverUnitException.cs b/src/Pole.Core/Exceptions/UnMatchObserverUnitException.cs deleted file mode 100644 index 51f2bf1..0000000 --- a/src/Pole.Core/Exceptions/UnMatchObserverUnitException.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; - -namespace Pole.Core.Exceptions -{ - public class UnmatchObserverUnitException : Exception - { - public UnmatchObserverUnitException(string unitName) : base($"{unitName} do not match") - { - } - } -} diff --git a/src/Pole.Core/Exceptions/UnfindEventHandlerException.cs b/src/Pole.Core/Exceptions/UnfindEventHandlerException.cs deleted file mode 100644 index e2b3f1c..0000000 --- a/src/Pole.Core/Exceptions/UnfindEventHandlerException.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; - -namespace Pole.Core.Exceptions -{ - public class UnfindEventHandlerException : Exception - { - public UnfindEventHandlerException(Type eventType) : base(eventType.FullName) - { - - } - } -} diff --git a/src/Pole.Core/Exceptions/UnfindSnapshotHandlerException.cs b/src/Pole.Core/Exceptions/UnfindSnapshotHandlerException.cs deleted file mode 100644 index bdcd289..0000000 --- a/src/Pole.Core/Exceptions/UnfindSnapshotHandlerException.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; - -namespace Pole.Core.Exceptions -{ - public class UnfindSnapshotHandlerException : Exception - { - public UnfindSnapshotHandlerException(Type grainType) : base(grainType.FullName) - { - - } - } -} diff --git a/src/Pole.Core/Exceptions/UnopenedTransactionException.cs b/src/Pole.Core/Exceptions/UnopenedTransactionException.cs deleted file mode 100644 index c139a0d..0000000 --- a/src/Pole.Core/Exceptions/UnopenedTransactionException.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; - -namespace Pole.Core.Exceptions -{ - public class UnopenedTransactionException : Exception - { - public UnopenedTransactionException(string id, Type grainType, string methodName) : - base($"Unopened transaction, cannot be invoke {methodName},type {grainType.FullName} with id {id}") - { - } - } -} diff --git a/src/Pole.Core/Extensions/PoleServiceCollectionExtensions.cs b/src/Pole.Core/Extensions/PoleServiceCollectionExtensions.cs index 78c601c..eb184f5 100644 --- a/src/Pole.Core/Extensions/PoleServiceCollectionExtensions.cs +++ b/src/Pole.Core/Extensions/PoleServiceCollectionExtensions.cs @@ -1,6 +1,5 @@ using Microsoft.Extensions.DependencyInjection; using Pole.Core; -using Pole.Core.Abstraction; using Pole.Core.Channels; using Pole.Core.EventBus; using Pole.Core.Processor; diff --git a/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs b/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs index 9173461..d148170 100644 --- a/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs +++ b/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs @@ -1,6 +1,5 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using Pole.Core.Abstraction; using Pole.Core.EventBus; using Pole.Core.EventBus.Event; using Pole.Core.EventBus.EventStorage; diff --git a/src/Pole.Core/Serialization/EventTypeFinder.cs b/src/Pole.Core/Serialization/EventTypeFinder.cs index beb82cd..440f1dd 100644 --- a/src/Pole.Core/Serialization/EventTypeFinder.cs +++ b/src/Pole.Core/Serialization/EventTypeFinder.cs @@ -1,8 +1,8 @@ using Microsoft.Extensions.Logging; -using Pole.Core.Abstraction; using Pole.Core.EventBus.Event; using Pole.Core.Exceptions; using Pole.Core.Utils; +using Pole.Core.Utils.Abstraction; using System; using System.Collections.Concurrent; using System.Collections.Generic; diff --git a/src/Pole.Core/Services/Abstraction/IHoldLock.cs b/src/Pole.Core/Services/Abstraction/IHoldLock.cs deleted file mode 100644 index b170201..0000000 --- a/src/Pole.Core/Services/Abstraction/IHoldLock.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System.Threading.Tasks; -using Orleans; - -namespace Pole.Core.Services -{ - public interface IHoldLock : IGrainWithStringKey - { - Task<(bool isOk, long lockId)> Lock(int holdingSeconds =30); - Task Hold(long lockId, int holdingSeconds = 30); - Task Unlock(long lockId); - } -} diff --git a/src/Pole.Core/Services/Abstraction/ILocalUID.cs b/src/Pole.Core/Services/Abstraction/ILocalUID.cs deleted file mode 100644 index 08191e7..0000000 --- a/src/Pole.Core/Services/Abstraction/ILocalUID.cs +++ /dev/null @@ -1,15 +0,0 @@ -using System.Threading.Tasks; -using Orleans; -using Orleans.Concurrency; - -namespace Pole.Core.Services -{ - public interface ILocalUID : IGrainWithStringKey - { - /// - /// 通过utc时间生成分布式唯一id - /// - [AlwaysInterleave] - Task NewID(); - } -} diff --git a/src/Pole.Core/Services/Abstraction/ILock.cs b/src/Pole.Core/Services/Abstraction/ILock.cs deleted file mode 100644 index 67264b3..0000000 --- a/src/Pole.Core/Services/Abstraction/ILock.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System.Threading.Tasks; -using Orleans; - -namespace Pole.Core.Services -{ - public interface ILock : IGrainWithStringKey - { - Task Lock(int millisecondsDelay = 0); - Task Unlock(); - } -} diff --git a/src/Pole.Core/Services/Abstraction/IUtcUID.cs b/src/Pole.Core/Services/Abstraction/IUtcUID.cs deleted file mode 100644 index 7b56d03..0000000 --- a/src/Pole.Core/Services/Abstraction/IUtcUID.cs +++ /dev/null @@ -1,15 +0,0 @@ -using Orleans; -using Orleans.Concurrency; -using System.Threading.Tasks; - -namespace Pole.Core.Services -{ - public interface IUtcUID : IGrainWithStringKey - { - /// - /// 通过utc时间生成分布式唯一id - /// - [AlwaysInterleave] - Task NewID(); - } -} diff --git a/src/Pole.Core/Services/Abstraction/IWeightHoldLock.cs b/src/Pole.Core/Services/Abstraction/IWeightHoldLock.cs deleted file mode 100644 index 8644a55..0000000 --- a/src/Pole.Core/Services/Abstraction/IWeightHoldLock.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System.Threading.Tasks; -using Orleans; - - -namespace Pole.Core.Services -{ - public interface IWeightHoldLock : IGrainWithStringKey - { - Task<(bool isOk, long lockId, int expectMillisecondDelay)> Lock(int weight, int holdingSeconds = 30); - Task Hold(long lockId, int holdingSeconds = 30); - Task Unlock(long lockId); - } -} diff --git a/src/Pole.Core/Services/HoldLockGrain.cs b/src/Pole.Core/Services/HoldLockGrain.cs deleted file mode 100644 index 4cab2d2..0000000 --- a/src/Pole.Core/Services/HoldLockGrain.cs +++ /dev/null @@ -1,48 +0,0 @@ -using System; -using System.Threading.Tasks; -using Orleans; - -namespace Pole.Core.Services -{ - public class HoldLockGrain : Grain, IHoldLock - { - long lockId = 0; - long expireTime = 0; - public Task<(bool isOk, long lockId)> Lock(int holdingSeconds = 30) - { - var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); - if (lockId == 0 || now > expireTime) - { - lockId = now; - expireTime = now + holdingSeconds * 1000; - return Task.FromResult((true, now)); - } - else - { - return Task.FromResult((false, (long)0)); - } - } - public Task Hold(long lockId, int holdingSeconds = 30) - { - if (this.lockId == lockId) - { - expireTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() + holdingSeconds * 1000; - return Task.FromResult(true); - } - else - { - return Task.FromResult(false); - } - } - - public Task Unlock(long lockId) - { - if (this.lockId == lockId) - { - this.lockId = 0; - expireTime = 0; - } - return Task.CompletedTask; - } - } -} diff --git a/src/Pole.Core/Services/LocalUIDGrain.cs b/src/Pole.Core/Services/LocalUIDGrain.cs deleted file mode 100644 index 9408031..0000000 --- a/src/Pole.Core/Services/LocalUIDGrain.cs +++ /dev/null @@ -1,59 +0,0 @@ -using Orleans; -using Orleans.Concurrency; -using System; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Core.Services -{ - [Reentrant] - public class LocalUIDGrain : Grain, ILocalUID - { - int start_id = 1; - string start_string; - long start_long; - const int length = 19; - public LocalUIDGrain() - { - start_string = DateTimeOffset.Now.ToString("yyyyMMddHHmmss"); - start_long = long.Parse(start_string); ; - } - public Task NewID() - { - return Task.FromResult(GenerateUtcId()); - string GenerateUtcId() - { - var now_string = DateTimeOffset.Now.ToString("yyyyMMddHHmmss"); - var now_Long = long.Parse(now_string); - if (now_Long > start_long) - { - Interlocked.Exchange(ref start_string, now_string); - Interlocked.Exchange(ref start_long, now_Long); - Interlocked.Exchange(ref start_id, 0); - } - var builder = new Span(new char[length]); - var newTimes = Interlocked.Increment(ref start_id); - if (newTimes <= 99999) - { - start_string.AsSpan().CopyTo(builder); - - var timesString = newTimes.ToString(); - for (int i = start_string.Length; i < length - timesString.Length; i++) - { - builder[i] = '0'; - } - var span = length - timesString.Length; - for (int i = span; i < length; i++) - { - builder[i] = timesString[i - span]; - } - return builder.ToString(); - } - else - { - return GenerateUtcId(); - } - } - } - } -} diff --git a/src/Pole.Core/Services/LockGrain.cs b/src/Pole.Core/Services/LockGrain.cs deleted file mode 100644 index 0e9206b..0000000 --- a/src/Pole.Core/Services/LockGrain.cs +++ /dev/null @@ -1,42 +0,0 @@ -using System.Threading; -using System.Threading.Tasks; -using Orleans; -using Orleans.Concurrency; - -namespace Pole.Core.Services -{ - [Reentrant] - public class LockGrain : Grain, ILock - { - int locked = 0; - TaskCompletionSource taskSource; - public async Task Lock(int millisecondsDelay = 0) - { - if (locked == 0) - { - locked = 1; - return true; - } - else - { - taskSource = new TaskCompletionSource(); - if (millisecondsDelay != 0) - { - using var tc = new CancellationTokenSource(millisecondsDelay); - tc.Token.Register(() => - { - taskSource.TrySetCanceled(); - }); - } - return await taskSource.Task; - } - } - - public Task Unlock() - { - locked = 0; - taskSource.TrySetResult(true); - return Task.CompletedTask; - } - } -} diff --git a/src/Pole.Core/Services/UtcUIDGrain.cs b/src/Pole.Core/Services/UtcUIDGrain.cs deleted file mode 100644 index 638496c..0000000 --- a/src/Pole.Core/Services/UtcUIDGrain.cs +++ /dev/null @@ -1,59 +0,0 @@ -using Orleans; -using Orleans.Concurrency; -using System; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Core.Services -{ - [Reentrant] - public class UtcUIDGrain : Grain, IUtcUID - { - int start_id = 1; - string start_string; - long start_long; - const int length = 19; - public UtcUIDGrain() - { - start_string = DateTimeOffset.UtcNow.ToString("yyyyMMddHHmmss"); - start_long = long.Parse(start_string); - } - public Task NewID() - { - return Task.FromResult(GenerateUtcId()); - string GenerateUtcId() - { - var now_string = DateTimeOffset.UtcNow.ToString("yyyyMMddHHmmss"); - var now_Long = long.Parse(now_string); - if (now_Long > start_long) - { - Interlocked.Exchange(ref start_string, now_string); - Interlocked.Exchange(ref start_long, now_Long); - Interlocked.Exchange(ref start_id, 0); - } - var builder = new Span(new char[length]); - var newTimes = Interlocked.Increment(ref start_id); - if (newTimes <= 99999) - { - start_string.AsSpan().CopyTo(builder); - - var timesString = newTimes.ToString(); - for (int i = start_string.Length; i < length - timesString.Length; i++) - { - builder[i] = '0'; - } - var span = length - timesString.Length; - for (int i = span; i < length; i++) - { - builder[i] = timesString[i - span]; - } - return builder.ToString(); - } - else - { - return GenerateUtcId(); - } - } - } - } -} diff --git a/src/Pole.Core/Services/WeightHoldLock.cs b/src/Pole.Core/Services/WeightHoldLock.cs deleted file mode 100644 index 0d68149..0000000 --- a/src/Pole.Core/Services/WeightHoldLock.cs +++ /dev/null @@ -1,57 +0,0 @@ -using System; -using System.Threading.Tasks; -using Orleans; - -namespace Pole.Core.Services -{ - public class WeightHoldLock : Grain, IWeightHoldLock - { - long lockId = 0; - long expireTime = 0; - int currentWeight = 0; - int maxWaitWeight = -1; - - public Task Hold(long lockId, int holdingSeconds = 30) - { - if (this.lockId == lockId && currentWeight >= maxWaitWeight) - { - expireTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() + holdingSeconds * 1000; - return Task.FromResult(true); - } - else - { - return Task.FromResult(false); - } - } - - public Task<(bool isOk, long lockId, int expectMillisecondDelay)> Lock(int weight, int holdingSeconds = 30) - { - var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); - if (lockId == 0 || now > expireTime) - { - lockId = now; - currentWeight = weight; - maxWaitWeight = -1; - expireTime = now + holdingSeconds * 1000; - return Task.FromResult((true, now, 0)); - } - if (weight >= maxWaitWeight && weight > currentWeight) - { - maxWaitWeight = weight; - return Task.FromResult((false, (long)0, (int)(expireTime - now))); - } - return Task.FromResult((false, (long)0, 0)); - } - - public Task Unlock(long lockId) - { - if (this.lockId == lockId) - { - this.lockId = 0; - currentWeight = 0; - expireTime = 0; - } - return Task.CompletedTask; - } - } -} diff --git a/src/Pole.Core/UnitOfWork/UnitOfWork.cs b/src/Pole.Core/UnitOfWork/UnitOfWork.cs index e50d6c8..f54101e 100644 --- a/src/Pole.Core/UnitOfWork/UnitOfWork.cs +++ b/src/Pole.Core/UnitOfWork/UnitOfWork.cs @@ -8,11 +8,11 @@ using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; -using Pole.Core.Abstraction; using Pole.Core.Serialization; using Pole.Core.EventBus.Event; using Pole.Core.EventBus.EventStorage; using Microsoft.Extensions.Options; +using Pole.Core.Utils.Abstraction; namespace Pole.Core.UnitOfWork { diff --git a/src/Pole.Core/Abstraction/IEventTypeFinder.cs b/src/Pole.Core/Utils/Abstraction/IEventTypeFinder.cs similarity index 90% rename from src/Pole.Core/Abstraction/IEventTypeFinder.cs rename to src/Pole.Core/Utils/Abstraction/IEventTypeFinder.cs index 7c8084c..9f370fa 100644 --- a/src/Pole.Core/Abstraction/IEventTypeFinder.cs +++ b/src/Pole.Core/Utils/Abstraction/IEventTypeFinder.cs @@ -2,7 +2,7 @@ using System.Collections.Generic; using System.Text; -namespace Pole.Core.Abstraction +namespace Pole.Core.Utils.Abstraction { public interface IEventTypeFinder { diff --git a/src/Pole.Core/Utils/SnowflakeIdGenerator.cs b/src/Pole.Core/Utils/SnowflakeIdGenerator.cs index ea6bfa8..bac8a2a 100644 --- a/src/Pole.Core/Utils/SnowflakeIdGenerator.cs +++ b/src/Pole.Core/Utils/SnowflakeIdGenerator.cs @@ -1,5 +1,4 @@ -using Pole.Core.Abstraction; -using System; +using System; using System.Collections.Generic; using System.Text; diff --git a/src/Pole.EventBus.Rabbitmq/Attributes/ProducerAttribute.cs b/src/Pole.EventBus.Rabbitmq/Attributes/ProducerAttribute.cs deleted file mode 100644 index ea16731..0000000 --- a/src/Pole.EventBus.Rabbitmq/Attributes/ProducerAttribute.cs +++ /dev/null @@ -1,24 +0,0 @@ -using System; - -namespace Pole.EventBus.RabbitMQ -{ - [AttributeUsage(AttributeTargets.Class, AllowMultiple = false)] - public class ProducerAttribute : Attribute - { - public ProducerAttribute(string exchange = null, string routePrefix = null, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false) - { - Exchange = exchange; - RoutePrefix = routePrefix; - LBCount = lBCount; - AutoAck = autoAck; - Reenqueue = reenqueue; - Persistent = persistent; - } - public string Exchange { get; } - public string RoutePrefix { get; } - public int LBCount { get; } - public bool AutoAck { get; set; } - public bool Reenqueue { get; set; } - public bool Persistent { get; set; } - } -} diff --git a/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs b/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs index 6b4282a..5c4f72c 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs @@ -13,7 +13,6 @@ using Pole.Core.EventBus.Event; using Pole.Core.EventBus.EventHandler; using Microsoft.Extensions.Options; using System.Linq; -using Pole.Core.Abstraction; namespace Pole.EventBus.RabbitMQ { @@ -39,7 +38,6 @@ namespace Pole.EventBus.RabbitMQ } public async Task AutoRegister() { - var observableList = new List<(Type type, ProducerAttribute config)>(); var eventList = new List<(Type type, EventAttribute config)>(); var evenHandlertList = new List<(Type type, EventHandlerAttribute config)>(); AddEventAndEventHandlerInfoList(eventList, evenHandlertList); diff --git a/src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs b/src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs index 506ad03..c0c6c84 100644 --- a/src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs +++ b/src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs @@ -1,5 +1,4 @@ using Microsoft.EntityFrameworkCore.Storage; -using Pole.Core.Abstraction; using Pole.Core.EventBus; using Pole.Core.EventBus.Event; using Pole.Core.EventBus.EventStorage;