Commit 2c98d157 by dingsongjie

精简代码

parent ef9a3f0e
Showing with 13 additions and 639 deletions
using System.Threading.Tasks;
namespace Pole.Core.Channels
{
public class AsyncInputEvent<Input, Output>
{
public AsyncInputEvent(Input data)
{
Value = data;
}
public TaskCompletionSource<Output> TaskSource { get; } = new TaskCompletionSource<Output>();
public Input Value { get; set; }
}
}
......@@ -10,7 +10,5 @@ namespace Pole.Core
public static ValueTask ValueTaskDone = new ValueTask();
public const string ConsumerRetryTimesStr = "pole-consumer-retry-times";
public const string ConsumerExceptionDetailsStr = "pole-consumer-exception-details";
public const string EventHandlerMethodName = "EventHandler";
public const string BatchEventsHandlerMethodName = "BatchEventsHandler";
}
}
using Pole.Core.Abstraction;

using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventStorage;
using Pole.Core.EventBus.Transaction;
......@@ -22,8 +22,7 @@ namespace Pole.Core.EventBus
public IDbTransactionAdapter Transaction { get; set; }
public IServiceProvider ServiceProvider { get; }
public BlockingCollection<EventEntity> PrePublishEventBuffer { get; } =
new BlockingCollection<EventEntity>(new ConcurrentQueue<EventEntity>());
public BlockingCollection<EventEntity> PrePublishEventBuffer { get; } = new BlockingCollection<EventEntity>(new ConcurrentQueue<EventEntity>());
public Bus(IServiceProvider serviceProvider, IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator, IEventStorage eventStorage)
{
......@@ -51,11 +50,11 @@ namespace Pole.Core.EventBus
};
if (Transaction?.DbTransaction == null)
{
var mediumMessage = await eventStorage.StoreMessage(eventEntity);
await eventStorage.StoreMessage(eventEntity);
}
else
{
var mediumMessage = await eventStorage.StoreMessage(eventEntity, Transaction.DbTransaction);
await eventStorage.StoreMessage(eventEntity, Transaction.DbTransaction);
}
PrePublishEventBuffer.Add(eventEntity);
......
using Pole.Core.Utils;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.EventBus.Event
{
public class EventBase
{
public EventBase() { }
public EventBase(long version, long timestamp)
{
Version = version;
Timestamp = timestamp;
}
public long Version { get; set; }
public long Timestamp { get; set; }
public byte[] GetBytes()
{
using var ms = new PooledMemoryStream();
ms.Write(BitConverter.GetBytes(Version));
ms.Write(BitConverter.GetBytes(Timestamp));
return ms.ToArray();
}
public static EventBase FromBytes(byte[] bytes)
{
var bytesSpan = bytes.AsSpan();
return new EventBase(BitConverter.ToInt64(bytesSpan), BitConverter.ToInt64(bytesSpan.Slice(sizeof(long))));
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.EventBus.Event
{
public class FullyEvent<PrimaryKey>
{
public IEvent Event { get; set; }
public EventBase Base { get; set; }
public PrimaryKey StateId { get; set; }
}
}
using Microsoft.Extensions.Logging;
using Orleans.Concurrency;
using Pole.Core.Abstraction;
using Pole.Core.EventBus.Event;
using Pole.Core.Serialization;
using System;
......@@ -13,6 +12,7 @@ using System.Linq.Expressions;
using System.Linq;
using Pole.Core.Exceptions;
using Orleans;
using Pole.Core.Utils.Abstraction;
namespace Pole.Core.EventBus.EventHandler
{
......@@ -52,7 +52,7 @@ namespace Pole.Core.EventBus.EventHandler
if (this is IPoleEventHandler<TEvent> handler)
{
var result = handler.EventHandle((TEvent)eventObj);
logger.LogTrace($"{nameof(PoleEventHandler<TEvent>)} Invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(eventObj));
logger.LogTrace($"{nameof(PoleEventHandler<TEvent>)} Invoke completed: {0}->{1}->{2}", grainType.FullName, nameof(handler.EventHandle), serializer.Serialize(eventObj));
return result;
}
else
......@@ -71,14 +71,14 @@ namespace Pole.Core.EventBus.EventHandler
if (this is IPoleBulkEventsHandler<TEvent> batchHandler)
{
await batchHandler.BulkEventsHandle(eventObjs);
logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(eventObjs));
logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, nameof(batchHandler.BulkEventsHandle), serializer.Serialize(eventObjs));
return;
}
else if (this is IPoleEventHandler<TEvent> handler)
{
var handleTasks = eventObjs.Select(m => handler.EventHandle(m));
await Task.WhenAll(handleTasks);
logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(eventObjs));
logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, nameof(handler.EventHandle), serializer.Serialize(eventObjs));
return;
}
else
......
......@@ -9,7 +9,6 @@ namespace Pole.Core.EventBus.EventStorage
public interface IEventStorageInitializer
{
Task InitializeAsync(CancellationToken cancellationToken);
string GetTableName();
}
}
......@@ -6,13 +6,13 @@ using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Pole.Core.Abstraction;
using System.Linq;
using Pole.Core.EventBus.Event;
using Orleans.Concurrency;
using System.Collections.Concurrent;
using System.Linq.Expressions;
using Pole.Core.EventBus.EventHandler;
using Pole.Core.Utils.Abstraction;
namespace Pole.Core.EventBus
{
......
using System;
namespace Pole.Core.Exceptions
{
public class BeginTxTimeoutException : Exception
{
public BeginTxTimeoutException(string stateId, long transactionId, Type type) :
base($"Grain type {type.FullName} with grainId {stateId} and transactionId {transactionId}")
{
}
}
}
using System;
namespace Pole.Core.Exceptions
{
public class ChannelUnavailabilityException : Exception
{
public ChannelUnavailabilityException(string id, Type grainType) : base($"Channel unavailability,type {grainType.FullName} with id {id}")
{
}
}
}
using System;
namespace Pole.Core.Exceptions
{
public class EventBusRepeatBindingProducerException : Exception
{
public EventBusRepeatBindingProducerException(string name) : base(name)
{
}
}
}
using System;
namespace Pole.Core.Exceptions
{
public class EventIsClearedException : Exception
{
public EventIsClearedException(string eventType, string eventJsonString, long archiveIndex) : base($"eventType:{eventType},event:{eventJsonString},archive index:{archiveIndex}")
{
}
}
}
using System;
namespace Pole.Core.Exceptions
{
public class EventVersionUnorderedException : Exception
{
public EventVersionUnorderedException(string id, Type type, long eventVersion, long stateVersion) :
base($"Event version and state version do not match of Grain type {type.FullName} and Id {id}.There state version are {stateVersion} and event version are {eventVersion}")
{
}
}
}
using System;
namespace Pole.Core.Exceptions
{
public class ObserverNotCompletedException : Exception
{
public ObserverNotCompletedException(string typeName, string stateId) : base($"{typeName} with id={stateId}")
{
}
}
}
using System;
namespace Pole.Core.Exceptions
{
public class PrimaryKeyTypeException : Exception
{
public PrimaryKeyTypeException(string name) : base(name)
{
}
}
}
using System;
namespace Pole.Core.Exceptions
{
public class RepeatedTxException : Exception
{
public RepeatedTxException(string stateId, long transactionId, Type type) :
base($"Grain type {type.FullName} with grainId {stateId} and transactionId {transactionId}")
{
}
}
}
using System;
namespace Pole.Core.Exceptions
{
public class SnapshotNotSupportTxException : Exception
{
public SnapshotNotSupportTxException(Type type) : base(type.FullName)
{
}
}
}
using System;
namespace Pole.Core.Exceptions
{
public class StateInsecurityException : Exception
{
public StateInsecurityException(string id, Type grainType, long doingVersion, long stateVersion) :
base($"State insecurity of Grain type {grainType.FullName} and Id {id},Maybe because the previous event failed to execute.There state version are {stateVersion} and doing version are {doingVersion}")
{
}
}
}
using System;
namespace Pole.Core.Exceptions
{
public class StateIsOverException : Exception
{
public StateIsOverException(string id, Type grainType) :
base($"State Is Over of Grain type {grainType.FullName} and Id {id}")
{
}
}
}
using System;
namespace Pole.Core.Exceptions
{
public class TxCommitException : Exception
{
}
}
using System;
namespace Pole.Core.Exceptions
{
public class TxIdException : Exception
{
}
}
using System;
namespace Pole.Core.Exceptions
{
public class TxSnapshotException : Exception
{
public TxSnapshotException(string stateId, long snapshotVersion, long backupSnapshotVersion) :
base($"StateId {stateId} and snapshot version {snapshotVersion} and backup snapshot version {backupSnapshotVersion}")
{
}
}
}
using System;
namespace Pole.Core.Exceptions
{
public class UnmatchObserverUnitException : Exception
{
public UnmatchObserverUnitException(string unitName) : base($"{unitName} do not match")
{
}
}
}
using System;
namespace Pole.Core.Exceptions
{
public class UnfindEventHandlerException : Exception
{
public UnfindEventHandlerException(Type eventType) : base(eventType.FullName)
{
}
}
}
using System;
namespace Pole.Core.Exceptions
{
public class UnfindSnapshotHandlerException : Exception
{
public UnfindSnapshotHandlerException(Type grainType) : base(grainType.FullName)
{
}
}
}
using System;
namespace Pole.Core.Exceptions
{
public class UnopenedTransactionException : Exception
{
public UnopenedTransactionException(string id, Type grainType, string methodName) :
base($"Unopened transaction, cannot be invoke {methodName},type {grainType.FullName} with id {id}")
{
}
}
}
using Microsoft.Extensions.DependencyInjection;
using Pole.Core;
using Pole.Core.Abstraction;
using Pole.Core.Channels;
using Pole.Core.EventBus;
using Pole.Core.Processor;
......
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Pole.Core.Abstraction;
using Pole.Core.EventBus;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventStorage;
......
using Microsoft.Extensions.Logging;
using Pole.Core.Abstraction;
using Pole.Core.EventBus.Event;
using Pole.Core.Exceptions;
using Pole.Core.Utils;
using Pole.Core.Utils.Abstraction;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
......
using System.Threading.Tasks;
using Orleans;
namespace Pole.Core.Services
{
public interface IHoldLock : IGrainWithStringKey
{
Task<(bool isOk, long lockId)> Lock(int holdingSeconds =30);
Task<bool> Hold(long lockId, int holdingSeconds = 30);
Task Unlock(long lockId);
}
}
using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;
namespace Pole.Core.Services
{
public interface ILocalUID : IGrainWithStringKey
{
/// <summary>
/// 通过utc时间生成分布式唯一id
/// </summary>
[AlwaysInterleave]
Task<string> NewID();
}
}
using System.Threading.Tasks;
using Orleans;
namespace Pole.Core.Services
{
public interface ILock : IGrainWithStringKey
{
Task<bool> Lock(int millisecondsDelay = 0);
Task Unlock();
}
}
using Orleans;
using Orleans.Concurrency;
using System.Threading.Tasks;
namespace Pole.Core.Services
{
public interface IUtcUID : IGrainWithStringKey
{
/// <summary>
/// 通过utc时间生成分布式唯一id
/// </summary>
[AlwaysInterleave]
Task<string> NewID();
}
}
using System.Threading.Tasks;
using Orleans;
namespace Pole.Core.Services
{
public interface IWeightHoldLock : IGrainWithStringKey
{
Task<(bool isOk, long lockId, int expectMillisecondDelay)> Lock(int weight, int holdingSeconds = 30);
Task<bool> Hold(long lockId, int holdingSeconds = 30);
Task Unlock(long lockId);
}
}
using System;
using System.Threading.Tasks;
using Orleans;
namespace Pole.Core.Services
{
public class HoldLockGrain : Grain, IHoldLock
{
long lockId = 0;
long expireTime = 0;
public Task<(bool isOk, long lockId)> Lock(int holdingSeconds = 30)
{
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
if (lockId == 0 || now > expireTime)
{
lockId = now;
expireTime = now + holdingSeconds * 1000;
return Task.FromResult((true, now));
}
else
{
return Task.FromResult((false, (long)0));
}
}
public Task<bool> Hold(long lockId, int holdingSeconds = 30)
{
if (this.lockId == lockId)
{
expireTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() + holdingSeconds * 1000;
return Task.FromResult(true);
}
else
{
return Task.FromResult(false);
}
}
public Task Unlock(long lockId)
{
if (this.lockId == lockId)
{
this.lockId = 0;
expireTime = 0;
}
return Task.CompletedTask;
}
}
}
using Orleans;
using Orleans.Concurrency;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Core.Services
{
[Reentrant]
public class LocalUIDGrain : Grain, ILocalUID
{
int start_id = 1;
string start_string;
long start_long;
const int length = 19;
public LocalUIDGrain()
{
start_string = DateTimeOffset.Now.ToString("yyyyMMddHHmmss");
start_long = long.Parse(start_string); ;
}
public Task<string> NewID()
{
return Task.FromResult(GenerateUtcId());
string GenerateUtcId()
{
var now_string = DateTimeOffset.Now.ToString("yyyyMMddHHmmss");
var now_Long = long.Parse(now_string);
if (now_Long > start_long)
{
Interlocked.Exchange(ref start_string, now_string);
Interlocked.Exchange(ref start_long, now_Long);
Interlocked.Exchange(ref start_id, 0);
}
var builder = new Span<char>(new char[length]);
var newTimes = Interlocked.Increment(ref start_id);
if (newTimes <= 99999)
{
start_string.AsSpan().CopyTo(builder);
var timesString = newTimes.ToString();
for (int i = start_string.Length; i < length - timesString.Length; i++)
{
builder[i] = '0';
}
var span = length - timesString.Length;
for (int i = span; i < length; i++)
{
builder[i] = timesString[i - span];
}
return builder.ToString();
}
else
{
return GenerateUtcId();
}
}
}
}
}
using System.Threading;
using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;
namespace Pole.Core.Services
{
[Reentrant]
public class LockGrain : Grain, ILock
{
int locked = 0;
TaskCompletionSource<bool> taskSource;
public async Task<bool> Lock(int millisecondsDelay = 0)
{
if (locked == 0)
{
locked = 1;
return true;
}
else
{
taskSource = new TaskCompletionSource<bool>();
if (millisecondsDelay != 0)
{
using var tc = new CancellationTokenSource(millisecondsDelay);
tc.Token.Register(() =>
{
taskSource.TrySetCanceled();
});
}
return await taskSource.Task;
}
}
public Task Unlock()
{
locked = 0;
taskSource.TrySetResult(true);
return Task.CompletedTask;
}
}
}
using Orleans;
using Orleans.Concurrency;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Core.Services
{
[Reentrant]
public class UtcUIDGrain : Grain, IUtcUID
{
int start_id = 1;
string start_string;
long start_long;
const int length = 19;
public UtcUIDGrain()
{
start_string = DateTimeOffset.UtcNow.ToString("yyyyMMddHHmmss");
start_long = long.Parse(start_string);
}
public Task<string> NewID()
{
return Task.FromResult(GenerateUtcId());
string GenerateUtcId()
{
var now_string = DateTimeOffset.UtcNow.ToString("yyyyMMddHHmmss");
var now_Long = long.Parse(now_string);
if (now_Long > start_long)
{
Interlocked.Exchange(ref start_string, now_string);
Interlocked.Exchange(ref start_long, now_Long);
Interlocked.Exchange(ref start_id, 0);
}
var builder = new Span<char>(new char[length]);
var newTimes = Interlocked.Increment(ref start_id);
if (newTimes <= 99999)
{
start_string.AsSpan().CopyTo(builder);
var timesString = newTimes.ToString();
for (int i = start_string.Length; i < length - timesString.Length; i++)
{
builder[i] = '0';
}
var span = length - timesString.Length;
for (int i = span; i < length; i++)
{
builder[i] = timesString[i - span];
}
return builder.ToString();
}
else
{
return GenerateUtcId();
}
}
}
}
}
using System;
using System.Threading.Tasks;
using Orleans;
namespace Pole.Core.Services
{
public class WeightHoldLock : Grain, IWeightHoldLock
{
long lockId = 0;
long expireTime = 0;
int currentWeight = 0;
int maxWaitWeight = -1;
public Task<bool> Hold(long lockId, int holdingSeconds = 30)
{
if (this.lockId == lockId && currentWeight >= maxWaitWeight)
{
expireTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() + holdingSeconds * 1000;
return Task.FromResult(true);
}
else
{
return Task.FromResult(false);
}
}
public Task<(bool isOk, long lockId, int expectMillisecondDelay)> Lock(int weight, int holdingSeconds = 30)
{
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
if (lockId == 0 || now > expireTime)
{
lockId = now;
currentWeight = weight;
maxWaitWeight = -1;
expireTime = now + holdingSeconds * 1000;
return Task.FromResult((true, now, 0));
}
if (weight >= maxWaitWeight && weight > currentWeight)
{
maxWaitWeight = weight;
return Task.FromResult((false, (long)0, (int)(expireTime - now)));
}
return Task.FromResult((false, (long)0, 0));
}
public Task Unlock(long lockId)
{
if (this.lockId == lockId)
{
this.lockId = 0;
currentWeight = 0;
expireTime = 0;
}
return Task.CompletedTask;
}
}
}
......@@ -8,11 +8,11 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Pole.Core.Abstraction;
using Pole.Core.Serialization;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventStorage;
using Microsoft.Extensions.Options;
using Pole.Core.Utils.Abstraction;
namespace Pole.Core.UnitOfWork
{
......
......@@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.Abstraction
namespace Pole.Core.Utils.Abstraction
{
public interface IEventTypeFinder
{
......
using Pole.Core.Abstraction;
using System;
using System;
using System.Collections.Generic;
using System.Text;
......
using System;
namespace Pole.EventBus.RabbitMQ
{
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public class ProducerAttribute : Attribute
{
public ProducerAttribute(string exchange = null, string routePrefix = null, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false)
{
Exchange = exchange;
RoutePrefix = routePrefix;
LBCount = lBCount;
AutoAck = autoAck;
Reenqueue = reenqueue;
Persistent = persistent;
}
public string Exchange { get; }
public string RoutePrefix { get; }
public int LBCount { get; }
public bool AutoAck { get; set; }
public bool Reenqueue { get; set; }
public bool Persistent { get; set; }
}
}
......@@ -13,7 +13,6 @@ using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventHandler;
using Microsoft.Extensions.Options;
using System.Linq;
using Pole.Core.Abstraction;
namespace Pole.EventBus.RabbitMQ
{
......@@ -39,7 +38,6 @@ namespace Pole.EventBus.RabbitMQ
}
public async Task AutoRegister()
{
var observableList = new List<(Type type, ProducerAttribute config)>();
var eventList = new List<(Type type, EventAttribute config)>();
var evenHandlertList = new List<(Type type, EventHandlerAttribute config)>();
AddEventAndEventHandlerInfoList(eventList, evenHandlertList);
......
using Microsoft.EntityFrameworkCore.Storage;
using Pole.Core.Abstraction;
using Pole.Core.EventBus;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventStorage;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment