From ffaee93aa6a69227c5169cfc9dcd6e2cf8ebe635 Mon Sep 17 00:00:00 2001 From: MattMo Date: Mon, 24 Jul 2023 11:42:33 -0700 Subject: [PATCH] Bumped package version to 1.0.1, added more extensions and helper functions to process a list of items without getting a result back. Cleaned up the code, improved documentation. --- Parallel.Net.Tests/ParallelTests.cs | 31 ++++-- Parallel.Net/Parallel.Net.csproj | 2 +- Parallel.Net/ParallelTask.cs | 145 ++++++++++++++++++++++++- Parallel.Net/ParallelTaskExtensions.cs | 17 ++- Parallel.Net/ParallelTaskT.cs | 138 ++++++++++++----------- 5 files changed, 259 insertions(+), 74 deletions(-) diff --git a/Parallel.Net.Tests/ParallelTests.cs b/Parallel.Net.Tests/ParallelTests.cs index 89e0f1a..65a34e3 100644 --- a/Parallel.Net.Tests/ParallelTests.cs +++ b/Parallel.Net.Tests/ParallelTests.cs @@ -16,7 +16,7 @@ namespace MontoyaTech.Parallel.Net.Tests { var inputs = new List() { 0, 1, 2, 3, 4 }; - var results = inputs.Parallel(input => input * 5); + var results = inputs.ForEachParallel(input => input * 5); results.Count.Should().Be(inputs.Count); @@ -32,7 +32,7 @@ namespace MontoyaTech.Parallel.Net.Tests { var inputs = new List() { 0, 1, 2, 3, 4 }; - var results = inputs.Parallel(input => input * 5, maxTasks: 2); + var results = inputs.ForEachParallel(input => input * 5, maxTasks: 2); results.Count.Should().Be(inputs.Count); @@ -48,7 +48,7 @@ namespace MontoyaTech.Parallel.Net.Tests { var inputs = new List() { 0, 1, 2, 3, 4 }; - var results = inputs.Parallel(input => input * 5, maxTasks: 0); + var results = inputs.ForEachParallel(input => input * 5, maxTasks: 0); results.Count.Should().Be(inputs.Count); @@ -64,7 +64,7 @@ namespace MontoyaTech.Parallel.Net.Tests { var inputs = new List() { 0, 1, 2, 3, 4 }; - var action = new Action(() => inputs.Parallel(input => input * 5, timeout: TimeSpan.FromSeconds(0))); + var action = new Action(() => inputs.ForEachParallel(input => input * 5, timeout: TimeSpan.FromSeconds(0))); action.Should().Throw(); } @@ -74,7 +74,7 @@ namespace MontoyaTech.Parallel.Net.Tests { var inputs = new List() { 0, 1, 2, 3, 4 }; - var action = new Action(() => inputs.Parallel(input => input * 5, timeout: TimeSpan.FromSeconds(0), throwOnTimeout: false)); + var action = new Action(() => inputs.ForEachParallel(input => input * 5, timeout: TimeSpan.FromSeconds(0), throwOnTimeout: false)); action.Should().NotThrow(); } @@ -84,7 +84,7 @@ namespace MontoyaTech.Parallel.Net.Tests { var inputs = new List() { 0, 1, 2, 3, 4 }; - var action = new Action(() => inputs.Parallel(input => input * 5)); + var action = new Action(() => inputs.ForEachParallel(input => input * 5)); action.Should().NotThrow(); } @@ -96,7 +96,7 @@ namespace MontoyaTech.Parallel.Net.Tests var results = new List(); - var action = new Action(() => results = inputs.Parallel(input => { + var action = new Action(() => results = inputs.ForEachParallel(input => { if (input == 0) throw new Exception("Testing"); @@ -121,7 +121,7 @@ namespace MontoyaTech.Parallel.Net.Tests var results = new List(); - var action = new Action(() => results = inputs.Parallel(input => { + var action = new Action(() => results = inputs.ForEachParallel(input => { if (input == 0) throw new Exception("Testing"); @@ -132,5 +132,20 @@ namespace MontoyaTech.Parallel.Net.Tests results.Count.Should().BeGreaterThan(0); } + + [Fact] + public void Parallel_Inputs_Only_Should_Work() + { + var inputs = new List() { 0, 1, 2, 3, 4 }; + + int results = 0; + + inputs.ForEachParallel(input => + { + results++; + }); + + results.Should().Be(5); + } } } diff --git a/Parallel.Net/Parallel.Net.csproj b/Parallel.Net/Parallel.Net.csproj index ae87881..945d1c3 100644 --- a/Parallel.Net/Parallel.Net.csproj +++ b/Parallel.Net/Parallel.Net.csproj @@ -14,7 +14,7 @@ Logo_Symbol_Black_Outline.png https://code.montoyatech.com/MontoyaTech/Parallel.Net True - 1.0.0 + 1.0.1 diff --git a/Parallel.Net/ParallelTask.cs b/Parallel.Net/ParallelTask.cs index df39276..17df7ae 100644 --- a/Parallel.Net/ParallelTask.cs +++ b/Parallel.Net/ParallelTask.cs @@ -12,7 +12,63 @@ namespace MontoyaTech.Parallel.Net public class ParallelTask { /// - /// Takes a set of inputes and creates a set of parallel tasks and runs them returning the results. The results will be out of order. + /// Whether or not this parallel task is currently running. + /// + public bool Running + { + get + { + return this.running && !this.completed && !this.failed; + } + } + + /// + /// Whether or not this parallel task completed successfully. + /// + public bool Completed + { + get + { + return this.completed; + } + } + + /// + /// Whether or not this parallel task failed due to an exception. + /// + public bool Failed + { + get + { + return this.failed; + } + } + + /// + /// The exception that was thrown while running this parallel task if it failed. + /// + public Exception ThrownException + { + get + { + return this.thrownException; + } + } + + protected bool running = false; + + protected bool completed = false; + + protected bool failed = false; + + protected bool exceptionStopped = false; + + protected ManualResetEvent handle; + + protected Exception thrownException = null; + + /// + /// Takes a set of inputs and creates a set of parallel tasks and runs them returning the results. The results will be out of order. /// /// The input type /// The result type @@ -88,6 +144,69 @@ namespace MontoyaTech.Parallel.Net return results; } + /// + /// Takes a set of inputs and creates a set of parallel tasks and runs them. + /// + /// The input type + /// The list of inputs to process + /// The action to run against each input + /// The max number of tasks to run in parallel. If null the number of inputs becomes the number of tasks. Default is null. + /// If set this function will return early if the timeout is reached even if the tasks are not done. Default is null. + /// If true an exception will be thrown if a timeout is reached if one was set. Default is true. + /// If true, rethrows any exceptions from the running tasks. Default is true. + public static void ForEach(IList inputs, Action action, int? maxTasks = null, TimeSpan? timeout = null, bool throwOnTimeout = true, bool bubbleExceptions = true) + { + if (inputs == null || inputs.Count == 0) + return; + + var tasks = new List>>(); + + if (!maxTasks.HasValue) + maxTasks = inputs.Count; + else if (maxTasks <= 0) + maxTasks = 1; + + int itemsPerChunk = (inputs.Count / maxTasks.Value) + 1; + + int itemIndex = 0; + + for (int i = 0; i < maxTasks.Value; i++) + { + var chunk = new List(); + + for (int p = 0; p < itemsPerChunk; p++) + { + chunk.Add(inputs[itemIndex++]); + + if (itemIndex >= inputs.Count) + break; + } + + if (chunk.Count > 0) + { + tasks.Add(new ParallelTask>(inputs => + { + if (inputs != null) + for (int i = 0; i < inputs.Count; i++) + action(inputs[i]); + }, chunk)); + } + + if (itemIndex >= inputs.Count) + break; + } + + if (tasks.Count == 0) + return; + + WhenAll(tasks, timeout, throwOnTimeout); + + if (bubbleExceptions) + for (int i = 0; i < tasks.Count; i++) + if (tasks[i].Failed && tasks[i].ThrownException != null) + throw tasks[i].ThrownException; + } + /// /// Runs a list of tasks and waits until they have been completed or failed and returns the results. /// @@ -120,5 +239,29 @@ namespace MontoyaTech.Parallel.Net return results; } + + /// + /// Runs a list of tasks and waits until they have been completed or failed. + /// + /// The input type + /// The list of tasks to run + /// If set this function will leave early if the timeout is met. Default is null. + /// If set an exception will be thrown if a timeout is reached. Default is true. + /// A timeout exception is thrown if throwOnTimeout is true and a timeout is reached. + public static void WhenAll(IList> tasks, TimeSpan? timeout = null, bool throwOnTimeout = true) + { + if (tasks == null || tasks.Count == 0) + return; + + var countDown = new CountdownEvent(tasks.Count); + + foreach (var task in tasks) + task.Run(timeout, false, throwOnTimeout, countDown); + + if (!timeout.HasValue) + countDown.Wait(); + else if (timeout.HasValue && !countDown.Wait(timeout.Value) && throwOnTimeout) + throw new Exception($"ParallelTasks did not complete within {timeout.Value.TotalMilliseconds} miliseconds"); + } } } diff --git a/Parallel.Net/ParallelTaskExtensions.cs b/Parallel.Net/ParallelTaskExtensions.cs index 4b87967..cc2e3dd 100644 --- a/Parallel.Net/ParallelTaskExtensions.cs +++ b/Parallel.Net/ParallelTaskExtensions.cs @@ -20,9 +20,24 @@ namespace MontoyaTech.Parallel.Net /// Whether or not to throw an exception on timeout. Default is true /// If true, rethrows any exceptions from the running tasks. Default is true. /// A list of results from the items processed. - public static List Parallel(this IList list, Func func, int? maxTasks = null, TimeSpan? timeout = null, bool throwOnTimeout = true, bool bubbleExceptions = true) + public static List ForEachParallel(this IList list, Func func, int? maxTasks = null, TimeSpan? timeout = null, bool throwOnTimeout = true, bool bubbleExceptions = true) { return ParallelTask.ForEach(list, func, maxTasks, timeout, throwOnTimeout, bubbleExceptions); } + + /// + /// Processes a list of items in parallel and runs an action against them. + /// + /// Input type + /// The list of items to process in parallel + /// The action to take an input and perform an operation on it + /// The max tasks to run in parallel if specified. Default is null. If null the number of items will be the number of tasks + /// If set a timeout will be thrown if this doesnt complete in time. Default is null + /// Whether or not to throw an exception on timeout. Default is true + /// If true, rethrows any exceptions from the running tasks. Default is true. + public static void ForEachParallel(this IList list, Action action, int? maxTasks = null, TimeSpan? timeout = null, bool throwOnTimeout = true, bool bubbleExceptions = true) + { + ParallelTask.ForEach(list, action, maxTasks, timeout, throwOnTimeout, bubbleExceptions); + } } } diff --git a/Parallel.Net/ParallelTaskT.cs b/Parallel.Net/ParallelTaskT.cs index 82b0fb4..296dfda 100644 --- a/Parallel.Net/ParallelTaskT.cs +++ b/Parallel.Net/ParallelTaskT.cs @@ -11,41 +11,8 @@ namespace MontoyaTech.Parallel.Net /// /// /// - public class ParallelTask + public class ParallelTask : ParallelTask { - /// - /// Whether or not this parallel task is currently running. - /// - public bool Running - { - get - { - return this.running && !this.completed && !this.failed; - } - } - - /// - /// Whether or not this parallel task completed successfully. - /// - public bool Completed - { - get - { - return this.completed; - } - } - - /// - /// Whether or not this parallel task failed due to an exception. - /// - public bool Failed - { - get - { - return this.failed; - } - } - /// /// The result of this parallel task if it completed successfully. /// @@ -57,43 +24,20 @@ namespace MontoyaTech.Parallel.Net } } - /// - /// The exception that was thrown while running this parallel task if it failed. - /// - public Exception ThrownException - { - get - { - return this.thrownException; - } - } - - protected bool running = false; - - protected bool completed = false; - - protected bool failed = false; - - protected Func work = null; - - protected InputT input = default(InputT); - protected ResultT result = default(ResultT); - protected bool exceptionStopped = false; + protected Func func = null; - protected ManualResetEvent handle; - - protected Exception thrownException = null; + protected InputT input = default(InputT); /// /// Creates a new parallel task with the worker function and input. /// - /// + /// /// - public ParallelTask(Func work, InputT input) + public ParallelTask(Func func, InputT input) { - this.work = work; + this.func = func; this.input = input; } @@ -117,7 +61,7 @@ namespace MontoyaTech.Parallel.Net { try { - this.result = this.work((InputT)state); + this.result = this.func((InputT)state); this.completed = true; } @@ -146,4 +90,72 @@ namespace MontoyaTech.Parallel.Net return this.result; } } + + /// + /// The outline of a ParallelTask that takes an input and processes it when ran. + /// + /// + public class ParallelTask : ParallelTask + { + protected Action action = null; + + protected InputT input = default(InputT); + + /// + /// Creates a new parallel task with the worker function and input. + /// + /// + /// + public ParallelTask(Action action, InputT input) + { + this.action = action; + + this.input = input; + } + + /// + /// Runs this parallel task and returns when it completes. + /// + /// Default is null, if set, this function will return if the timeout is reached regardless if the task is complete. + /// Default is true, whether or not to block the caller function and not return until this task is complete. + /// Whether or not to throw an exception if a timeout is reached, if one was specified. Default is true. + /// An optional count down that if specified is signaled when the task completes. + /// Throws a timeout exception if the task doesn't complete in time. + public void Run(TimeSpan? timeout = null, bool blocking = true, bool throwOnTimeout = true, CountdownEvent countDown = null) + { + this.running = true; + + this.handle = new ManualResetEvent(false); + + ThreadPool.QueueUserWorkItem((object state) => + { + try + { + this.action((InputT)state); + + this.completed = true; + } + catch (Exception ex) + { + this.failed = true; + + this.thrownException = ex; + } + + this.handle.Set(); + + if (countDown != null) + countDown.Signal(); + + }, this.input); + + if (blocking) + { + if (!timeout.HasValue) + this.handle.WaitOne(); + else if (!this.handle.WaitOne(timeout.Value) && throwOnTimeout) + throw new Exception($"ParallelTask did not complete within {timeout.Value.TotalMilliseconds} miliseconds"); + } + } + } }