diff --git a/Sharp7.Rx/Basics/LimitedConcurrencyLevelTaskScheduler.cs b/Sharp7.Rx/Basics/LimitedConcurrencyLevelTaskScheduler.cs new file mode 100644 index 0000000..76ab8e1 --- /dev/null +++ b/Sharp7.Rx/Basics/LimitedConcurrencyLevelTaskScheduler.cs @@ -0,0 +1,146 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace Sharp7.Rx.Basics +{ + /// + /// Provides a task scheduler that ensures a maximum concurrency level while + /// running on top of the ThreadPool. + /// from http://msdn.microsoft.com/en-us/library/ee789351.aspx + /// + public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler + { + /// Whether the current thread is processing work items. + [ThreadStatic] private static bool currentThreadIsProcessingItems; + + /// The maximum concurrency level allowed by this scheduler. + private readonly int maxDegreeOfParallelism; + + /// The list of tasks to be executed. + private readonly LinkedList tasks = new LinkedList(); // protected by lock(_tasks) + + /// Whether the scheduler is currently processing work items. + private int delegatesQueuedOrRunning; // protected by lock(_tasks) + + /// + /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the + /// specified degree of parallelism. + /// + /// The maximum degree of parallelism provided by this scheduler. + public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism) + { + if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism"); + this.maxDegreeOfParallelism = maxDegreeOfParallelism; + } + + /// Gets the maximum concurrency level supported by this scheduler. + public sealed override int MaximumConcurrencyLevel => maxDegreeOfParallelism; + + /// Gets an enumerable of the tasks currently scheduled on this scheduler. + /// An enumerable of the tasks currently scheduled. + protected sealed override IEnumerable GetScheduledTasks() + { + var lockTaken = false; + try + { + Monitor.TryEnter(tasks, ref lockTaken); + if (lockTaken) return tasks.ToArray(); + else throw new NotSupportedException(); + } + finally + { + if (lockTaken) Monitor.Exit(tasks); + } + } + + /// Queues a task to the scheduler. + /// The task to be queued. + protected sealed override void QueueTask(Task task) + { + // Add the task to the list of tasks to be processed. If there aren't enough + // delegates currently queued or running to process tasks, schedule another. + lock (tasks) + { + tasks.AddLast(task); + if (delegatesQueuedOrRunning < maxDegreeOfParallelism) + { + ++delegatesQueuedOrRunning; + NotifyThreadPoolOfPendingWork(); + } + } + } + + /// Attempts to remove a previously scheduled task from the scheduler. + /// The task to be removed. + /// Whether the task could be found and removed. + protected sealed override bool TryDequeue(Task task) + { + lock (tasks) + { + return tasks.Remove(task); + } + } + + /// Attempts to execute the specified task on the current thread. + /// The task to be executed. + /// + /// Whether the task could be executed on the current thread. + protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) + { + // If this thread isn't already processing a task, we don't support inlining + if (!currentThreadIsProcessingItems) return false; + + // If the task was previously queued, remove it from the queue + if (taskWasPreviouslyQueued) TryDequeue(task); + + // Try to run the task. + return TryExecuteTask(task); + } + + /// + /// Informs the ThreadPool that there's work to be executed for this scheduler. + /// + private void NotifyThreadPoolOfPendingWork() + { + ThreadPool.UnsafeQueueUserWorkItem(_ => + { + // Note that the current thread is now processing work items. + // This is necessary to enable inlining of tasks into this thread. + currentThreadIsProcessingItems = true; + try + { + // Process all available items in the queue. + while (true) + { + Task item; + lock (tasks) + { + // When there are no more items to be processed, + // note that we're done processing, and get out. + if (tasks.Count == 0) + { + --delegatesQueuedOrRunning; + break; + } + + // Get the next item from the queue + item = tasks.First.Value; + tasks.RemoveFirst(); + } + + // Execute the task we pulled out of the queue + TryExecuteTask(item); + } + } + // We're done processing items on the current thread + finally + { + currentThreadIsProcessingItems = false; + } + }, null); + } + } +} \ No newline at end of file diff --git a/Sharp7.Rx/Sharp7Connector.cs b/Sharp7.Rx/Sharp7Connector.cs index ea46cff..89e7921 100644 --- a/Sharp7.Rx/Sharp7Connector.cs +++ b/Sharp7.Rx/Sharp7Connector.cs @@ -8,6 +8,7 @@ using System.Reactive.Subjects; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; +using Sharp7.Rx.Basics; using Sharp7.Rx.Enums; using Sharp7.Rx.Extensions; using Sharp7.Rx.Interfaces; @@ -23,7 +24,7 @@ namespace Sharp7.Rx private ConcurrentDictionary s7VariableAddresses = new ConcurrentDictionary(); private readonly CompositeDisposable disposables = new CompositeDisposable(); - private readonly TaskScheduler scheduler = TaskScheduler.Current; + private readonly LimitedConcurrencyLevelTaskScheduler scheduler = new LimitedConcurrencyLevelTaskScheduler(maxDegreeOfParallelism:1); private readonly string ipAddress; private readonly int rackNr; private readonly int cpuSlotNr;