PSParallelPipeline.psm1

using namespace System.Text
using namespace System.Threading
using namespace System.Diagnostics
using namespace System.Collections.Generic
using namespace System.Management.Automation
using namespace System.Management.Automation.Host
using namespace System.Management.Automation.Language
using namespace System.Management.Automation.Runspaces

class PSParallelTask : IDisposable {
    [powershell] $Instance
    [IAsyncResult] $AsyncResult
    [PSCmdlet] $Cmdlet

    PSParallelTask([scriptblock] $Action, [object] $PipelineObject, [PSCmdlet] $Cmdlet) {
        # Thanks to Patrick Meinecke for his help here.
        # https://github.com/SeeminglyScience/
        $this.Cmdlet   = $Cmdlet
        $this.Instance = [powershell]::Create().AddScript({
            param([scriptblock] $Action, [object] $Context)

            $Action.InvokeWithContext($null, [psvariable]::new('_', $Context))
        }).AddParameters(@{
            Action  = $Action.Ast.GetScriptBlock()
            Context = $PipelineObject
        })
    }

    [PSParallelTask] AddUsingStatements([hashtable] $UsingStatements) {
        if($UsingStatements.Count) {
            # Credits to Jordan Borean for his help here.
            # https://github.com/jborean93
            $this.Instance.AddParameters(@{ '--%' = $UsingStatements })
        }
        return $this
    }

    [void] Run() {
        $this.AsyncResult = $this.Instance.BeginInvoke()
    }

    [void] EndInvoke() {
        try {
            $this.Cmdlet.WriteObject($this.Instance.EndInvoke($this.AsyncResult), $true)
            $this.GetErrors()
        }
        catch {
            $this.Cmdlet.WriteError($_)
        }
    }

    [void] Stop() {
        $this.Instance.Stop()
    }

    [void] GetErrors() {
        if($this.Instance.HadErrors) {
            foreach($err in $this.Instance.Streams.Error) {
                $this.Cmdlet.WriteError($err)
            }
        }
    }

    [PSParallelTask] AssociateWith([runspace] $Runspace) {
        $this.Instance.Runspace = $Runspace
        return $this
    }

    [runspace] GetRunspace() {
        return $this.Instance.Runspace
    }

    [void] Dispose() {
        $this.Instance.Dispose()
    }
}

class InvocationManager : IDisposable {
    [int] $ThrottleLimit
    [PSHost] $PSHost
    [initialsessionstate] $InitialSessionState
    [Stack[runspace]] $Runspaces = [Stack[runspace]]::new()
    [List[PSParallelTask]] $Tasks = [List[PSParallelTask]]::new()
    [bool] $UseNewRunspace
    hidden [int] $TotalMade

    InvocationManager(
        [int] $ThrottleLimit,
        [PSHost] $PSHost,
        [initialsessionstate] $InitialSessionState,
        [bool] $UseNewRunspace
    ) {
        $this.ThrottleLimit       = $ThrottleLimit
        $this.PSHost              = $PSHost
        $this.InitialSessionState = $InitialSessionState
        $this.UseNewRunspace      = $UseNewRunspace
    }

    [runspace] TryGet() {
        if($this.Runspaces.Count) {
            return $this.Runspaces.Pop()
        }

        if($this.TotalMade -ge $this.ThrottleLimit) {
            return $null
        }

        $this.TotalMade++
        return $this.CreateRunspace()
    }

    [runspace] CreateRunspace() {
        $runspace = [runspacefactory]::CreateRunspace($this.PSHost, $this.InitialSessionState)
        $runspace.Open()
        return $runspace
    }

    [PSParallelTask] WaitAny() {
        if(-not $this.Tasks.Count) {
            return $null
        }

        do {
            $id = [WaitHandle]::WaitAny($this.Tasks.AsyncResult.AsyncWaitHandle, 200)
        }
        while($id -eq [WaitHandle]::WaitTimeout)

        return $this.Tasks[$id]
    }

    [PSParallelTask] WaitAny([int] $TimeoutSeconds, [Stopwatch] $Timer) {
        if(-not $this.Tasks.Count) {
            return $null
        }

        do {
            if($TimeoutSeconds -lt $Timer.Elapsed.TotalSeconds) {
                $this.Tasks[0].Stop()
                return $this.Tasks[0]
            }

            $id = [WaitHandle]::WaitAny($this.Tasks.AsyncResult.AsyncWaitHandle, 200)
        }
        while($id -eq [WaitHandle]::WaitTimeout)

        return $this.Tasks[$id]
    }

    [void] GetTaskResult([PSParallelTask] $Task) {
        $this.Tasks.Remove($Task)
        $this.Release($Task.GetRunspace())
        $Task.EndInvoke()

        if($Task -is [IDisposable]) {
            $Task.Dispose()
        }
    }

    [void] Release([runspace] $runspace) {
        if($this.UseNewRunspace) {
            $runspace.Dispose()
            $runspace = $this.CreateRunspace()
        }

        $this.Runspaces.Push($runspace)
    }

    [void] AddTask([PSParallelTask] $Task) {
        $this.Tasks.Add($Task)
        $Task.Run()
    }

    [void] Dispose() {
        while($runspace = $this.TryGet()) {
            $runspace.Dispose()
        }
    }
}

<#
    .SYNOPSIS
    Enables parallel processing of pipeline input objects.
 
    .DESCRIPTION
    PowerShell function that intends to emulate `ForEach-Object -Parallel` for those stuck with Windows PowerShell.
    This function shares similar usage and capabilities than the ones available in the built-in cmdlet.
    This project is greatly inspired by RamblingCookieMonster's `Invoke-Parallel` and Boe Prox's `PoshRSJob`.
 
    .PARAMETER InputObject
    Specifies the input objects to be processed in the ScriptBlock.
    Note: This parameter is intended to be bound from pipeline.
 
    .PARAMETER ScriptBlock
    Specifies the operation that is performed on each input object.
    This script block is run for every object in the pipeline.
 
    .PARAMETER ThrottleLimit
    Specifies the number of script blocks that are invoked in parallel.
    Input objects are blocked until the running script block count falls below the ThrottleLimit.
    The default value is `5`.
 
    .PARAMETER Variables
    Specifies a hash table of variables to have available in the Script Block (Runspaces).
    The hash table `Keys` become the Variable Name inside the Script Block.
 
    .PARAMETER Functions
    Existing functions in the Local Session to have available in the Script Block (Runspaces).
 
    .PARAMETER TimeoutSeconds
    Specifies the number of seconds to wait for all input to be processed in parallel.
    After the specified timeout time, all running scripts are stopped and any remaining input objects to be processed are ignored.
 
    .PARAMETER UseNewRunspace
    Uses a new runspace for each parallel invocation instead of reusing them.
 
    .EXAMPLE
    $message = 'Hello world from {0}'
 
    0..10 | Invoke-Parallel {
        $using:message -f [runspace]::DefaultRunspace.InstanceId
        Start-Sleep 3
    } -ThrottleLimit 3
 
    Run slow script in parallel batches.
 
    .EXAMPLE
    $message = 'Hello world from {0}'
 
    0..10 | Invoke-Parallel {
        $message -f [runspace]::DefaultRunspace.InstanceId
        Start-Sleep 3
    } -Variables @{ message = $message } -ThrottleLimit 3
 
    Same as Example 1 but with `-Variables` parameter.
 
    .EXAMPLE
    $threadSafeDictionary = [System.Collections.Concurrent.ConcurrentDictionary[string,object]]::new()
 
    Get-Process | Invoke-Parallel {
        $dict = $using:threadSafeDictionary
        $dict.TryAdd($_.ProcessName, $_)
    }
 
    $threadSafeDictionary["pwsh"]
 
    Adding to a single thread safe instance.
 
    .EXAMPLE
    $threadSafeDictionary = [System.Collections.Concurrent.ConcurrentDictionary[string,object]]::new()
 
    Get-Process | Invoke-Parallel {
        $dict.TryAdd($_.ProcessName, $_)
    } -Variables @{ dict = $threadSafeDictionary }
 
    $threadSafeDictionary["pwsh"]
 
    Same as Example 3, but using `-Variables` to pass the reference instance to the runspaces.
 
    .EXAMPLE
    function Greet { param($s) "$s hey there!" }
 
    0..10 | Invoke-Parallel { Greet $_ } -Functions Greet
 
    This example demonstrates how to pass a locally defined Function to the Runspaces scope.
 
    .LINK
    https://github.com/santisq/PSParallelPipeline
#>


function Invoke-Parallel {
    [CmdletBinding(PositionalBinding = $false)]
    [Alias('parallel')]
    param(
        [Parameter(Mandatory, ValueFromPipeline)]
        [object] $InputObject,

        [Parameter(Mandatory, Position = 0)]
        [scriptblock] $ScriptBlock,

        [Parameter()]
        [ValidateRange(1, 63)]
        [int] $ThrottleLimit = 5,

        [Parameter()]
        [ValidateNotNullOrEmpty()]
        [hashtable] $Variables,

        [Parameter()]
        [ValidateNotNullOrEmpty()]
        [ArgumentCompleter({
            param(
                [string] $commandName,
                [string] $parameterName,
                [string] $wordToComplete
            )

            (Get-Command -CommandType Filter, Function).Name -like "$wordToComplete*"
        })]
        [string[]] $Functions,

        [Parameter()]
        [switch] $UseNewRunspace,

        [Parameter()]
        [ValidateRange(1, [int]::MaxValue)]
        [int] $TimeoutSeconds
    )

    begin {
        try {
            $iss = [initialsessionstate]::CreateDefault2()

            foreach($key in $Variables.PSBase.Keys) {
                $iss.Variables.Add([SessionStateVariableEntry]::new($key, $Variables[$key], ''))
            }

            foreach($function in $Functions) {
                $def = (Get-Command $function).Definition
                $iss.Commands.Add([SessionStateFunctionEntry]::new($function, $def))
            }

            $usingParams = @{}

            # Thanks to mklement0 for catching up a bug here.
            # https://github.com/mklement0
            foreach($usingstatement in $ScriptBlock.Ast.FindAll({ $args[0] -is [UsingExpressionAst] }, $true)) {
                $varText = $usingstatement.Extent.Text
                $varPath = $usingstatement.SubExpression.VariablePath.UserPath

                $key = [Convert]::ToBase64String([Encoding]::Unicode.GetBytes($varText.ToLowerInvariant()))
                if(-not $usingParams.ContainsKey($key)) {
                    $usingParams.Add($key, $PSCmdlet.SessionState.PSVariable.GetValue($varPath))
                }
            }

            $im = [InvocationManager]::new($ThrottleLimit, $Host, $iss, $UseNewRunspace.IsPresent)

            if($withTimeout = $PSBoundParameters.ContainsKey('TimeoutSeconds')) {
                $timer = [Stopwatch]::StartNew()
            }
        }
        catch {
            $PSCmdlet.ThrowTerminatingError($_)
        }
    }

    process {
        try {
            do {
                if($runspace = $im.TryGet()) {
                    continue
                }

                if($withTimeout) {
                    if($task = $im.WaitAny($TimeoutSeconds, $timer)) {
                        $im.GetTaskResult($task)
                    }
                    continue
                }

                if($task = $im.WaitAny()) {
                    $im.GetTaskResult($task)
                }
            }
            until($runspace -or $TimeoutSeconds -lt $timer.Elapsed.TotalSeconds)

            if($TimeoutSeconds -lt $timer.Elapsed.TotalSeconds) {
                return
            }

            $im.AddTask(
                [PSParallelTask]::new($ScriptBlock, $InputObject, $PSCmdlet).
                AssociateWith($runspace).
                AddUsingStatements($usingParams))
        }
        catch {
            $PSCmdlet.WriteError($_)
        }
    }

    end {
        try {
            if($withTimeout) {
                while($task = $im.WaitAny($TimeoutSeconds, $timer)) {
                    $im.GetTaskResult($task)
                }
                return
            }

            while($task = $im.WaitAny()) {
                $im.GetTaskResult($task)
            }
        }
        catch {
            $PSCmdlet.WriteError($_)
        }
        finally {
            if($im -is [IDisposable]) {
                $im.Dispose()
            }
        }
    }
}