Commit 51b2b230 by dingsongjie

fix bug

parent 559380c1
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
namespace Pole.Core.Utils
{
public class ConsistentHash
{
readonly SortedDictionary<int, string> circle = new SortedDictionary<int, string>();
int _replicate = 200; //default _replicate count
int[] ayKeys = null; //cache the ordered keys for better performance
//it's better you override the GetHashCode() of T.
//we will use GetHashCode() to identify different node.
public ConsistentHash(IEnumerable<string> nodes)
{
Init(nodes, _replicate);
}
public ConsistentHash(IEnumerable<string> nodes, int replicate)
{
Init(nodes, replicate);
}
private void Init(IEnumerable<string> nodes, int replicate)
{
_replicate = replicate;
foreach (string node in nodes)
{
Add(node, false);
}
ayKeys = circle.Keys.ToArray();
}
public void Add(string node)
{
Add(node, true);
}
public void Add(string node, bool updateKeyArray)
{
for (int i = 0; i < _replicate; i++)
{
int hash = BetterHash(node + i);
circle[hash] = node;
}
if (updateKeyArray)
{
ayKeys = circle.Keys.ToArray();
}
}
public void Remove(string node)
{
for (int i = 0; i < _replicate; i++)
{
int hash = BetterHash(node + i);
if (!circle.Remove(hash))
{
throw new Exception("can not remove a node that not added");
}
}
ayKeys = circle.Keys.ToArray();
}
public string GetNode(string key)
{
int first = First_ge(ayKeys, BetterHash(key));
return circle[ayKeys[first]];
}
//return the index of first item that >= val.
//if not exist, return 0;
//ay should be ordered arPole.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int First_ge(int[] ay, int val)
{
int begin = 0;
int end = ay.Length - 1;
if (ay[end] < val || ay[0] > val)
{
return 0;
}
while (end - begin > 1)
{
int mid = (end + begin) / 2;
if (ay[mid] >= val)
{
end = mid;
}
else
{
begin = mid;
}
}
if (ay[begin] > val || ay[end] < val)
{
throw new Exception("should not happen");
}
return end;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static int BetterHash(string key)
{
return (int)MurmurHash2.Hash(Encoding.UTF8.GetBytes(key));
}
}
}
......@@ -19,6 +19,7 @@ namespace Pole.EventBus.RabbitMQ
readonly IMpscChannel<BasicDeliverEventArgs> mpscChannel;
readonly ISerializer serializer;
readonly RabbitOptions rabbitOptions;
List<ulong> errorMessageDeliveryTags = new List<ulong>();
public ConsumerRunner(
IRabbitMQClient client,
IServiceProvider provider,
......@@ -93,7 +94,17 @@ namespace Pole.EventBus.RabbitMQ
}
if (!Consumer.Config.AutoAck)
{
Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true);
if (errorMessageDeliveryTags.Count == 0)
{
Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true);
}
else
{
list.ForEach(m =>
{
Model.Model.BasicAck(m.DeliveryTag, false);
});
}
}
}
}
......@@ -121,8 +132,11 @@ namespace Pole.EventBus.RabbitMQ
private async Task ProcessComsumerErrors(BasicDeliverEventArgs ea, Exception exception)
{
// todo 这里需要添加断路器 防止超量的 Task.Delay
if (ea.BasicProperties.Headers.TryGetValue(Consts.ConsumerRetryTimesStr, out object retryTimesObj))
{
errorMessageDeliveryTags.Add(ea.DeliveryTag);
var retryTimesStr = Encoding.UTF8.GetString((byte[])retryTimesObj);
var retryTimes = Convert.ToInt32(retryTimesStr);
if (retryTimes < Consumer.Config.MaxReenqueueTimes)
......@@ -135,6 +149,8 @@ namespace Pole.EventBus.RabbitMQ
using var channel = Client.PullChannel();
channel.Publish(ea.Body, ea.BasicProperties.Headers, Queue.Queue, string.Empty, true);
Model.Model.BasicAck(ea.DeliveryTag, false);
errorMessageDeliveryTags.Remove(ea.DeliveryTag);
});
}
else
......
......@@ -13,7 +13,6 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Pole.Application\Pole.Application.csproj" />
<ProjectReference Include="..\Pole.Domain\Pole.Domain.csproj" />
</ItemGroup>
</Project>
......@@ -6,6 +6,7 @@ using Pole.Samples.Backet.Api.Benchmarks;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Samples.Backet.Api
......@@ -18,13 +19,31 @@ namespace Pole.Samples.Backet.Api
//await grainWithEntityframeworkCoreAndPgTest.SingleOrDefaultAsync();
//Summary summary = BenchmarkRunner.Run<GrainWithEntityframeworkCoreAndPgTest>();
//Console.ReadLine();
using ( var connection = new NpgsqlConnection("Server=192.168.0.248;Port=5432;Username=postgres;Password=comteck2020!@#;Database=Pole-Backet;Enlist=True;Timeout=0;Command Timeout=600;MinPoolSize=20;MaxPoolSize=500;"))
//using ( var connection = new NpgsqlConnection("Server=192.168.0.248;Port=5432;Username=postgres;Password=comteck2020!@#;Database=Pole-Backet;Enlist=True;Timeout=0;Command Timeout=600;MinPoolSize=20;MaxPoolSize=500;"))
//{
// var uploader = new Pole.EventStorage.PostgreSql.PoleNpgsqlBulkUploader(connection);
// var events = new List<EventEntity>();
// events.Add(new EventEntity { Id = "111", Retries = 20, ExpiresAt = DateTime.Now, StatusName = "333" });
// await uploader.UpdateAsync("\"pole\".\"Events\"", events);
//}
// Queue the task.
for (var i = 0; i < 100; i++)
{
var uploader = new Pole.EventStorage.PostgreSql.PoleNpgsqlBulkUploader(connection);
var events = new List<EventEntity>();
events.Add(new EventEntity { Id = "111", Retries = 20, ExpiresAt = DateTime.Now, StatusName = "333" });
await uploader.UpdateAsync("\"pole\".\"Events\"", events);
ThreadPool.QueueUserWorkItem(ThreadProc, i);
}
Console.WriteLine("Main thread does some work, then sleeps.");
Thread.Sleep(1000);
Console.WriteLine("Main thread exits.");
}
// This thread procedure performs the task.
static void ThreadProc(Object stateInfo)
{
var i = Convert.ToInt32(stateInfo);
// No state object was passed to QueueUserWorkItem, so stateInfo is null.
Console.WriteLine($"Hello from the thread pool.{i}");
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment