diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2f5fbf7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,263 @@ +## Ignore Visual Studio temporary files, build results, and +## files generated by popular Visual Studio add-ons. + +# User-specific files +*.suo +*.user +*.userosscache +*.sln.docstates + +# User-specific files (MonoDevelop/Xamarin Studio) +*.userprefs + +# Build results +[Dd]ebug/ +[Dd]ebugPublic/ +[Rr]elease/ +[Rr]eleases/ +x64/ +x86/ +bld/ +[Bb]in/ +[Oo]bj/ +[Ll]og/ + +# Visual Studio 2015 cache/options directory +.vs/ +# Uncomment if you have tasks that create the project's static files in wwwroot +#wwwroot/ + +# MSTest test Results +[Tt]est[Rr]esult*/ +[Bb]uild[Ll]og.* + +# NUNIT +*.VisualState.xml +TestResult.xml + +# Build Results of an ATL Project +[Dd]ebugPS/ +[Rr]eleasePS/ +dlldata.c + +# DNX +project.lock.json +project.fragment.lock.json + +*_i.c +*_p.c +*_i.h +*.ilk +*.meta +*.obj +*.pch +*.pdb +*.pgc +*.pgd +*.rsp +*.sbr +*.tlb +*.tli +*.tlh +*.tmp +*.tmp_proj +*.log +*.vspscc +*.vssscc +.builds +*.pidb +*.svclog +*.scc + +# Chutzpah Test files +_Chutzpah* + +# Visual C++ cache files +ipch/ +*.aps +*.ncb +*.opendb +*.opensdf +*.sdf +*.cachefile +*.VC.db +*.VC.VC.opendb + +# Visual Studio profiler +*.psess +*.vsp +*.vspx +*.sap + +# TFS 2012 Local Workspace +$tf/ + +# Guidance Automation Toolkit +*.gpState + +# ReSharper is a .NET coding add-in +_ReSharper*/ +*.[Rr]e[Ss]harper +*.DotSettings.user + +# JustCode is a .NET coding add-in +.JustCode + +# TeamCity is a build add-in +_TeamCity* + +# DotCover is a Code Coverage Tool +*.dotCover + +# NCrunch +_NCrunch_* +.*crunch*.local.xml +nCrunchTemp_* + +# MightyMoose +*.mm.* +AutoTest.Net/ + +# Web workbench (sass) +.sass-cache/ + +# Installshield output folder +[Ee]xpress/ + +# DocProject is a documentation generator add-in +DocProject/buildhelp/ +DocProject/Help/*.HxT +DocProject/Help/*.HxC +DocProject/Help/*.hhc +DocProject/Help/*.hhk +DocProject/Help/*.hhp +DocProject/Help/Html2 +DocProject/Help/html + +# Click-Once directory +publish/ + +# Publish Web Output +*.[Pp]ublish.xml +*.azurePubxml +# TODO: Comment the next line if you want to checkin your web deploy settings +# but database connection strings (with potential passwords) will be unencrypted +#*.pubxml +*.publishproj + +# Microsoft Azure Web App publish settings. Comment the next line if you want to +# checkin your Azure Web App publish settings, but sensitive information contained +# in these scripts will be unencrypted +PublishScripts/ + +# NuGet Packages +*.nupkg +# The packages folder can be ignored because of Package Restore +**/packages/* +# except build/, which is used as an MSBuild target. +!**/packages/build/ +# Uncomment if necessary however generally it will be regenerated when needed +#!**/packages/repositories.config +# NuGet v3's project.json files produces more ignoreable files +*.nuget.props +*.nuget.targets + +# Microsoft Azure Build Output +csx/ +*.build.csdef + +# Microsoft Azure Emulator +ecf/ +rcf/ + +# Windows Store app package directories and files +AppPackages/ +BundleArtifacts/ +Package.StoreAssociation.xml +_pkginfo.txt + +# Visual Studio cache files +# files ending in .cache can be ignored +*.[Cc]ache +# but keep track of directories ending in .cache +!*.[Cc]ache/ + +# Others +ClientBin/ +~$* +*~ +*.dbmdl +*.dbproj.schemaview +*.jfm +*.pfx +*.publishsettings +node_modules/ +orleans.codegen.cs + +# Since there are multiple workflows, uncomment next line to ignore bower_components +# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622) +#bower_components/ + +# RIA/Silverlight projects +Generated_Code/ + +# Backup & report files from converting an old project file +# to a newer Visual Studio version. Backup files are not needed, +# because we have git ;-) +_UpgradeReport_Files/ +Backup*/ +UpgradeLog*.XML +UpgradeLog*.htm + +# SQL Server files +*.mdf +*.ldf + +# Business Intelligence projects +*.rdl.data +*.bim.layout +*.bim_*.settings + +# Microsoft Fakes +FakesAssemblies/ + +# GhostDoc plugin setting file +*.GhostDoc.xml + +# Node.js Tools for Visual Studio +.ntvs_analysis.dat + +# Visual Studio 6 build log +*.plg + +# Visual Studio 6 workspace options file +*.opt + +# Visual Studio LightSwitch build output +**/*.HTMLClient/GeneratedArtifacts +**/*.DesktopClient/GeneratedArtifacts +**/*.DesktopClient/ModelManifest.xml +**/*.Server/GeneratedArtifacts +**/*.Server/ModelManifest.xml +_Pvt_Extensions + +# Paket dependency manager +.paket/paket.exe +paket-files/ + +# FAKE - F# Make +.fake/ + +# JetBrains Rider +.idea/ +*.sln.iml + +# CodeRush +.cr/ + +# Python Tools for Visual Studio (PTVS) +__pycache__/ +*.pyc + +#Mac Store files +.DS_Store diff --git a/Logo_Symbol_Black_Outline.png b/Logo_Symbol_Black_Outline.png new file mode 100644 index 0000000..d8ecce0 Binary files /dev/null and b/Logo_Symbol_Black_Outline.png differ diff --git a/Parallel.Net.Tests/Parallel.Net.Tests.csproj b/Parallel.Net.Tests/Parallel.Net.Tests.csproj new file mode 100644 index 0000000..303973d --- /dev/null +++ b/Parallel.Net.Tests/Parallel.Net.Tests.csproj @@ -0,0 +1,32 @@ + + + + net6.0 + enable + enable + + false + true + MontoyaTech.Parallel.Net.Tests + MontoyaTech.Parallel.Net.Tests + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + diff --git a/Parallel.Net.Tests/ParallelTests.cs b/Parallel.Net.Tests/ParallelTests.cs new file mode 100644 index 0000000..89e0f1a --- /dev/null +++ b/Parallel.Net.Tests/ParallelTests.cs @@ -0,0 +1,136 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Xunit; +using FluentAssertions; +using MontoyaTech.Parallel.Net; + +namespace MontoyaTech.Parallel.Net.Tests +{ + public class ParallelTests + { + [Fact] + public void Parallel_NoMaxTasks_Should_Work() + { + var inputs = new List() { 0, 1, 2, 3, 4 }; + + var results = inputs.Parallel(input => input * 5); + + results.Count.Should().Be(inputs.Count); + + results.Should().Contain(0); + results.Should().Contain(5); + results.Should().Contain(10); + results.Should().Contain(15); + results.Should().Contain(20); + } + + [Fact] + public void Parallel_MaxTasks_Should_Work() + { + var inputs = new List() { 0, 1, 2, 3, 4 }; + + var results = inputs.Parallel(input => input * 5, maxTasks: 2); + + results.Count.Should().Be(inputs.Count); + + results.Should().Contain(0); + results.Should().Contain(5); + results.Should().Contain(10); + results.Should().Contain(15); + results.Should().Contain(20); + } + + [Fact] + public void Parallel_MaxTasks_Zero_Ignore_Should_Work() + { + var inputs = new List() { 0, 1, 2, 3, 4 }; + + var results = inputs.Parallel(input => input * 5, maxTasks: 0); + + results.Count.Should().Be(inputs.Count); + + results.Should().Contain(0); + results.Should().Contain(5); + results.Should().Contain(10); + results.Should().Contain(15); + results.Should().Contain(20); + } + + [Fact] + public void Parallel_Timeout_Should_Work() + { + var inputs = new List() { 0, 1, 2, 3, 4 }; + + var action = new Action(() => inputs.Parallel(input => input * 5, timeout: TimeSpan.FromSeconds(0))); + + action.Should().Throw(); + } + + [Fact] + public void Parallel_Timeout_NoException_Should_Work() + { + var inputs = new List() { 0, 1, 2, 3, 4 }; + + var action = new Action(() => inputs.Parallel(input => input * 5, timeout: TimeSpan.FromSeconds(0), throwOnTimeout: false)); + + action.Should().NotThrow(); + } + + [Fact] + public void Parallel_NoTimeout_Exception_Should_Work() + { + var inputs = new List() { 0, 1, 2, 3, 4 }; + + var action = new Action(() => inputs.Parallel(input => input * 5)); + + action.Should().NotThrow(); + } + + [Fact] + public void Parallel_Exception_Should_Bubble() + { + var inputs = new List() { 0, 1, 2, 3, 4 }; + + var results = new List(); + + var action = new Action(() => results = inputs.Parallel(input => { + if (input == 0) + throw new Exception("Testing"); + + return input * 5; + })); + + action.Should().Throw(); + + results.Count.Should().Be(0); + + inputs.RemoveAt(0); + + action.Should().NotThrow(); + + results.Count.Should().BeGreaterThan(0); + } + + [Fact] + public void Parallel_Exception_NoBubble_Should_Work() + { + var inputs = new List() { 0, 1, 2, 3, 4 }; + + var results = new List(); + + var action = new Action(() => results = inputs.Parallel(input => { + if (input == 0) + throw new Exception("Testing"); + + return input * 5; + }, bubbleExceptions: false)); + + action.Should().NotThrow(); + + results.Count.Should().BeGreaterThan(0); + } + } +} diff --git a/Parallel.Net.sln b/Parallel.Net.sln new file mode 100644 index 0000000..e2d19b0 --- /dev/null +++ b/Parallel.Net.sln @@ -0,0 +1,31 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.6.33815.320 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Parallel.Net", "Parallel.Net\Parallel.Net.csproj", "{985B6E54-9C5F-4560-BF0A-8EAF63ACEF44}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Parallel.Net.Tests", "Parallel.Net.Tests\Parallel.Net.Tests.csproj", "{5F1D1944-9FBC-40F6-8861-083839CEC00A}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {985B6E54-9C5F-4560-BF0A-8EAF63ACEF44}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {985B6E54-9C5F-4560-BF0A-8EAF63ACEF44}.Debug|Any CPU.Build.0 = Debug|Any CPU + {985B6E54-9C5F-4560-BF0A-8EAF63ACEF44}.Release|Any CPU.ActiveCfg = Release|Any CPU + {985B6E54-9C5F-4560-BF0A-8EAF63ACEF44}.Release|Any CPU.Build.0 = Release|Any CPU + {5F1D1944-9FBC-40F6-8861-083839CEC00A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {5F1D1944-9FBC-40F6-8861-083839CEC00A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {5F1D1944-9FBC-40F6-8861-083839CEC00A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {5F1D1944-9FBC-40F6-8861-083839CEC00A}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {A1921B0D-A0B8-4094-8B29-6B23CEAE18C1} + EndGlobalSection +EndGlobal diff --git a/Parallel.Net/Parallel.Net.csproj b/Parallel.Net/Parallel.Net.csproj new file mode 100644 index 0000000..d098137 --- /dev/null +++ b/Parallel.Net/Parallel.Net.csproj @@ -0,0 +1,30 @@ + + + + net6.0 + enable + enable + MontoyaTech.Parallel.Net + MontoyaTech.Parallel.Net + True + MontoyaTech.Parallel.Net + A simple library with a set of extensions for helping do work on a set of items in parallel. + MontoyaTech + https://code.montoyatech.com/MontoyaTech/Parallel.Net + Logo_Symbol_Black_Outline.png + https://code.montoyatech.com/MontoyaTech/Parallel.Net + True + + + + + True + \ + + + + + + + + diff --git a/Parallel.Net/ParallelTask.cs b/Parallel.Net/ParallelTask.cs new file mode 100644 index 0000000..df39276 --- /dev/null +++ b/Parallel.Net/ParallelTask.cs @@ -0,0 +1,124 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace MontoyaTech.Parallel.Net +{ + /// + /// A set of helper functions for working with ParallelTasks. + /// + public class ParallelTask + { + /// + /// Takes a set of inputes and creates a set of parallel tasks and runs them returning the results. The results will be out of order. + /// + /// The input type + /// The result type + /// The list of inputs to process + /// The function to run against each input and produce a result + /// The max number of tasks to run in parallel. If null the number of inputs becomes the number of tasks. Default is null. + /// If set this function will return early if the timeout is reached even if the tasks are not done. Default is null. + /// If true an exception will be thrown if a timeout is reached if one was set. Default is true. + /// If true, rethrows any exceptions from the running tasks. Default is true. + /// The list of results from the inputs processed. + public static List ForEach(IList inputs, Func func, int? maxTasks = null, TimeSpan? timeout = null, bool throwOnTimeout = true, bool bubbleExceptions = true) + { + if (inputs == null || inputs.Count == 0) + return new List(); + + var tasks = new List, List>>(); + + if (!maxTasks.HasValue) + maxTasks = inputs.Count; + else if (maxTasks <= 0) + maxTasks = 1; + + int itemsPerChunk = (inputs.Count / maxTasks.Value) + 1; + + int itemIndex = 0; + + for (int i = 0; i < maxTasks.Value; i++) + { + var chunk = new List(); + + for (int p = 0; p < itemsPerChunk; p++) + { + chunk.Add(inputs[itemIndex++]); + + if (itemIndex >= inputs.Count) + break; + } + + if (chunk.Count > 0) + { + tasks.Add(new ParallelTask, List>(inputs => + { + var results = new List(); + + if (inputs != null) + for (int i = 0; i < inputs.Count; i++) + results.Add(func(inputs[i])); + + return results; + }, chunk)); + } + + if (itemIndex >= inputs.Count) + break; + } + + if (tasks.Count == 0) + return new List(); + + var chunkResults = WhenAll(tasks, timeout, throwOnTimeout); + + if (bubbleExceptions) + for (int i = 0; i < tasks.Count; i++) + if (tasks[i].Failed && tasks[i].ThrownException != null) + throw tasks[i].ThrownException; + + var results = new List(); + + foreach (var chunk in chunkResults) + if (chunk != null) + results.AddRange(chunk); + + return results; + } + + /// + /// Runs a list of tasks and waits until they have been completed or failed and returns the results. + /// + /// The input type + /// The result type + /// The list of tasks to run + /// If set this function will leave early if the timeout is met. Default is null. + /// If set an exception will be thrown if a timeout is reached. Default is true. + /// The list of results from the tasks. + /// A timeout exception is thrown if throwOnTimeout is true and a timeout is reached. + public static List WhenAll(IList> tasks, TimeSpan? timeout = null, bool throwOnTimeout = true) + { + if (tasks == null || tasks.Count == 0) + return new List(); + + var countDown = new CountdownEvent(tasks.Count); + + foreach (var task in tasks) + task.Run(timeout, false, throwOnTimeout, countDown); + + if (!timeout.HasValue) + countDown.Wait(); + else if (timeout.HasValue && !countDown.Wait(timeout.Value) && throwOnTimeout) + throw new Exception($"ParallelTasks did not complete within {timeout.Value.TotalMilliseconds} miliseconds"); + + var results = new List(); + + foreach (var task in tasks) + results.Add(task.Result); + + return results; + } + } +} diff --git a/Parallel.Net/ParallelTaskExtensions.cs b/Parallel.Net/ParallelTaskExtensions.cs new file mode 100644 index 0000000..4b87967 --- /dev/null +++ b/Parallel.Net/ParallelTaskExtensions.cs @@ -0,0 +1,28 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace MontoyaTech.Parallel.Net +{ + public static class ParallelTaskExtensions + { + /// + /// Process a list of items in parallel and returns the results. The results will be out of order. + /// + /// Input type + /// Result type + /// The list of items to process in parallel + /// The function to take an input and return a result + /// The max tasks to run in parallel if specified. Default is null. If null the number of items will be the number of tasks + /// If set a timeout will be thrown if this doesnt complete in time. Default is null + /// Whether or not to throw an exception on timeout. Default is true + /// If true, rethrows any exceptions from the running tasks. Default is true. + /// A list of results from the items processed. + public static List Parallel(this IList list, Func func, int? maxTasks = null, TimeSpan? timeout = null, bool throwOnTimeout = true, bool bubbleExceptions = true) + { + return ParallelTask.ForEach(list, func, maxTasks, timeout, throwOnTimeout, bubbleExceptions); + } + } +} diff --git a/Parallel.Net/ParallelTaskT.cs b/Parallel.Net/ParallelTaskT.cs new file mode 100644 index 0000000..82b0fb4 --- /dev/null +++ b/Parallel.Net/ParallelTaskT.cs @@ -0,0 +1,149 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace MontoyaTech.Parallel.Net +{ + /// + /// The outline of a ParallelTask that takes in an input, and produces and output when ran. + /// + /// + /// + public class ParallelTask + { + /// + /// Whether or not this parallel task is currently running. + /// + public bool Running + { + get + { + return this.running && !this.completed && !this.failed; + } + } + + /// + /// Whether or not this parallel task completed successfully. + /// + public bool Completed + { + get + { + return this.completed; + } + } + + /// + /// Whether or not this parallel task failed due to an exception. + /// + public bool Failed + { + get + { + return this.failed; + } + } + + /// + /// The result of this parallel task if it completed successfully. + /// + public ResultT Result + { + get + { + return this.result; + } + } + + /// + /// The exception that was thrown while running this parallel task if it failed. + /// + public Exception ThrownException + { + get + { + return this.thrownException; + } + } + + protected bool running = false; + + protected bool completed = false; + + protected bool failed = false; + + protected Func work = null; + + protected InputT input = default(InputT); + + protected ResultT result = default(ResultT); + + protected bool exceptionStopped = false; + + protected ManualResetEvent handle; + + protected Exception thrownException = null; + + /// + /// Creates a new parallel task with the worker function and input. + /// + /// + /// + public ParallelTask(Func work, InputT input) + { + this.work = work; + + this.input = input; + } + + /// + /// Runs this parallel task and returns the result when it completes. + /// + /// Default is null, if set, this function will return if the timeout is reached regardless if the task is complete. + /// Default is true, whether or not to block the caller function and not return until this task is complete. + /// Whether or not to throw an exception if a timeout is reached, if one was specified. Default is true. + /// An optional count down that if specified is signaled when the task completes. + /// The result from the worker function ran in this task. + /// Throws a timeout exception if the task doesn't complete in time. + public ResultT Run(TimeSpan? timeout = null, bool blocking = true, bool throwOnTimeout = true, CountdownEvent countDown = null) + { + this.running = true; + + this.handle = new ManualResetEvent(false); + + ThreadPool.QueueUserWorkItem((object state) => + { + try + { + this.result = this.work((InputT)state); + + this.completed = true; + } + catch (Exception ex) + { + this.failed = true; + + this.thrownException = ex; + } + + this.handle.Set(); + + if (countDown != null) + countDown.Signal(); + + }, this.input); + + if (blocking) + { + if (!timeout.HasValue) + this.handle.WaitOne(); + else if (!this.handle.WaitOne(timeout.Value) && throwOnTimeout) + throw new Exception($"ParallelTask did not complete within {timeout.Value.TotalMilliseconds} miliseconds"); + } + + return this.result; + } + } +}