Skip to content
  • P
    Projects
  • G
    Groups
  • S
    Snippets
  • Help

丁松杰 / Pole

  • This project
    • Loading...
  • Sign in
Go to a project
  • Project
  • Repository
  • Issues 0
  • Merge Requests 0
  • Pipelines
  • Wiki
  • Snippets
  • Members
  • Activity
  • Graph
  • Charts
  • Create a new issue
  • Jobs
  • Commits
  • Issue Boards
  • Files
  • Commits
  • Branches
  • Tags
  • Contributors
  • Graph
  • Compare
  • Charts
Switch branch/tag
  • Pole
  • src
  • Pole.Core
  • Channels
  • MpscChannel.cs
Find file
BlameHistoryPermalink
  • 丁松杰's avatar
    完成 发送者 发送确认 以及部分 消费者 重试及错误队列的功能 · 8fc078fb
    丁松杰 committed 5 years ago
    8fc078fb
MpscChannel.cs 5.61 KB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Pole.Core.Channels
{
    /// <summary>
    /// multi producter single consumer channel
    /// </summary>
    /// <typeparam name="T">data type produced by producer</typeparam>
    public class MpscChannel<T> : IMpscChannel<T>
    {
        readonly BufferBlock<T> buffer = new BufferBlock<T>();
        private Func<List<T>, Task> consumer;
        readonly List<IBaseMpscChannel> consumerSequence = new List<IBaseMpscChannel>();
        private Task<bool> waitToReadTask;
        readonly ILogger logger;
        /// <summary>
        /// 是否在自动消费中
        /// </summary>
        private int _autoConsuming = 0;
        public MpscChannel(ILogger<MpscChannel<T>> logger, IOptions<ChannelOptions> options)
        {
            this.logger = logger;
            MaxBatchSize = options.Value.MaxBatchSize;
            MaxMillisecondsDelay = options.Value.MaxMillisecondsDelay;
        }
        /// <summary>
        /// 批量数据处理每次处理的最大数据量
        /// </summary>
        public int MaxBatchSize { get; set; }
        /// <summary>
        /// 批量数据接收的最大延时
        /// </summary>
        public int MaxMillisecondsDelay { get; set; }
        public bool IsComplete { get; private set; }
        public bool IsChildren { get; set; }

        public void BindConsumer(Func<List<T>, Task> consumer)
        {
            if (this.consumer is null)
                this.consumer = consumer;
            else
                throw new RebindConsumerException(GetType().Name);
        }
        public void BindConsumer(Func<List<T>, Task> consumer, int maxBatchSize, int maxMillisecondsDelay)
        {
            if (this.consumer is null)
            {
                this.consumer = consumer;
                MaxBatchSize = maxBatchSize;
                MaxMillisecondsDelay = maxMillisecondsDelay;
            }
            else
                throw new RebindConsumerException(GetType().Name);
        }
        public void Config(int maxBatchSize, int maxMillisecondsDelay)
        {
            MaxBatchSize = maxBatchSize;
            MaxMillisecondsDelay = maxMillisecondsDelay;
        }
        public async ValueTask<bool> WriteAsync(T data)
        {
            if (consumer is null)
                throw new NoBindConsumerException(GetType().Name);

            if (!buffer.Post(data))
                return await buffer.SendAsync(data);

            if (!IsChildren && _autoConsuming == 0)
                ActiveAutoConsumer();

            return true;
        }
        private void ActiveAutoConsumer()
        {
            if (!IsChildren && _autoConsuming == 0)
                ThreadPool.QueueUserWorkItem(ActiveConsumer);
            async void ActiveConsumer(object state)
            {
                if (Interlocked.CompareExchange(ref _autoConsuming, 1, 0) == 0)
                {
                    try
                    {
                        while (await WaitToReadAsync())
                        {
                            try
                            {
                                await ManualConsume();
                            }
                            catch (Exception ex)
                            {
                                logger.LogError(ex, ex.Message);
                            }
                        }
                    }
                    finally
                    {
                        Interlocked.Exchange(ref _autoConsuming, 0);
                    }
                }
            }
        }
        public void JoinConsumerSequence(IBaseMpscChannel channel)
        {
            if (consumerSequence.IndexOf(channel) == -1)
            {
                channel.IsChildren = true;
                consumerSequence.Add(channel);
            }
        }
        public async Task ManualConsume()
        {
            if (waitToReadTask.IsCompletedSuccessfully && waitToReadTask.Result)
            {
                var dataList = new List<T>();
                var startTime = DateTimeOffset.UtcNow;
                while (buffer.TryReceive(out var value))
                {
                    dataList.Add(value);
                    if (dataList.Count > MaxBatchSize)
                    {
                        break;
                    }
                    else if ((DateTimeOffset.UtcNow - startTime).TotalMilliseconds > MaxMillisecondsDelay)
                    {
                        break;
                    }
                }
                if (dataList.Count > 0)
                    await consumer(dataList);
            }
            foreach (var joinConsumer in consumerSequence)
            {
                await joinConsumer.ManualConsume();
            }
        }
        public async Task<bool> WaitToReadAsync()
        {
            waitToReadTask = buffer.OutputAvailableAsync();
            if (consumerSequence.Count == 0)
            {
                return await waitToReadTask;
            }
            else
            {
                var taskList = consumerSequence.Select(c => c.WaitToReadAsync()).ToList();
                taskList.Add(waitToReadTask);
                return await await Task.WhenAny(taskList);
            }
        }
        public void Complete()
        {
            IsComplete = true;
            foreach (var joinConsumer in consumerSequence)
            {
                joinConsumer.Complete();
            }
            buffer.Complete();
        }
    }
}