# PSParallelPipeline.psm1

using namespace System
using namespace System.Collections
using namespace System.Collections.Generic
using namespace System.Management.Automation
using namespace System.Management.Automation.Language
using namespace System.Diagnostics
using namespace System.Management.Automation.Host
using namespace System.Threading
using namespace System.Management.Automation.Runspaces
using namespace System.Text
#Region '.\private\CommandCompleter.ps1' 0
#using namespace System
#using namespace System.Collections
#using namespace System.Collections.Generic
#using namespace System.Management.Automation
#using namespace System.Management.Automation.Language

class CommandCompleter : IArgumentCompleter {
    [IEnumerable[CompletionResult]] CompleteArgument(
        [string] $commandName,
        [string] $parameterName,
        [string] $wordToComplete,
        [CommandAst] $commandAst,
        [IDictionary] $fakeBoundParameters) {

        return [CompletionCompleters]::CompleteCommand(
            $wordToComplete,
            [NullString]::Value,
            [CommandTypes]::Function)
    }
}
#EndRegion '.\private\CommandCompleter.ps1' 21
#Region '.\private\InvocationManager.ps1' 0
#using namespace System.Collections.Generic
#using namespace System.Diagnostics
#using namespace System.Management.Automation
#using namespace System.Management.Automation.Host
#using namespace System.Threading

class InvocationManager : IDisposable {
    [int] $ThrottleLimit
    [PSHost] $PSHost
    [initialsessionstate] $InitialSessionState
    [Stack[runspace]] $Runspaces = [Stack[runspace]]::new()
    [List[PSParallelTask]] $Tasks = [List[PSParallelTask]]::new()
    [bool] $UseNewRunspace
    hidden [int] $TotalMade

    InvocationManager(
        [int] $ThrottleLimit,
        [PSHost] $PSHost,
        [initialsessionstate] $InitialSessionState,
        [bool] $UseNewRunspace
    ) {
        $this.ThrottleLimit = $ThrottleLimit
        $this.PSHost = $PSHost
        $this.InitialSessionState = $InitialSessionState
        $this.UseNewRunspace = $UseNewRunspace
    }

    [runspace] TryGet() {
        if ($this.Runspaces.Count) {
            return $this.Runspaces.Pop()
        }

        if ($this.TotalMade -ge $this.ThrottleLimit) {
            return $null
        }

        $this.TotalMade++
        return $this.CreateRunspace()
    }

    [runspace] CreateRunspace() {
        $runspace = [runspacefactory]::CreateRunspace($this.PSHost, $this.InitialSessionState)
        $runspace.Open()
        return $runspace
    }

    [PSParallelTask] WaitAny() {
        if (-not $this.Tasks.Count) {
            return $null
        }

        do {
            $id = [WaitHandle]::WaitAny($this.Tasks.AsyncResult.AsyncWaitHandle, 200)
        }
        while ($id -eq [WaitHandle]::WaitTimeout)

        return $this.Tasks[$id]
    }

    [PSParallelTask] WaitAny([int] $TimeoutSeconds, [Stopwatch] $Timer) {
        if (-not $this.Tasks.Count) {
            return $null
        }

        do {
            if ($TimeoutSeconds -lt $Timer.Elapsed.TotalSeconds) {
                $this.Tasks[0].Stop()
                return $this.Tasks[0]
            }

            $id = [WaitHandle]::WaitAny($this.Tasks.AsyncResult.AsyncWaitHandle, 200)
        }
        while ($id -eq [WaitHandle]::WaitTimeout)

        return $this.Tasks[$id]
    }

    [void] GetTaskResult([PSParallelTask] $Task) {
        try {
            $this.Tasks.Remove($Task)
            $this.Release($Task.GetRunspace())
            $Task.EndInvoke()
        }
        finally {
            if ($Task -is [IDisposable]) {
                $Task.Dispose()
            }
        }
    }

    [void] Release([runspace] $runspace) {
        if ($this.UseNewRunspace) {
            $runspace.Dispose()
            $runspace = $this.CreateRunspace()
        }

        $this.Runspaces.Push($runspace)
    }

    [void] AddTask([PSParallelTask] $Task) {
        $this.Tasks.Add($Task)
        $Task.Run()
    }

    [void] Dispose() {
        while ($runspace = $this.TryGet()) {
            $runspace.Dispose()
        }
    }
}
#EndRegion '.\private\InvocationManager.ps1' 111
#Region '.\private\PSParallelTask.ps1' 0
#using namespace System.Management.Automation

class PSParallelTask : IDisposable {
    [powershell] $Instance
    [IAsyncResult] $AsyncResult
    [PSCmdlet] $Cmdlet

    PSParallelTask([scriptblock] $Action, [object] $PipelineObject, [PSCmdlet] $Cmdlet) {
        # Thanks to Patrick Meinecke for his help here.
        # https://github.com/SeeminglyScience/
        $this.Cmdlet = $Cmdlet
        $this.Instance = [powershell]::Create().
            AddScript({
                param([scriptblock] $Action, [object] $Context)

                $Action.InvokeWithContext($null, [psvariable]::new('_', $Context))
            }).
            AddParameters(@{
                Action  = $Action.Ast.GetScriptBlock()
                Context = $PipelineObject
            })
    }

    [PSParallelTask] AddUsingStatements([hashtable] $UsingStatements) {
        if ($UsingStatements.Count) {
            # Credits to Jordan Borean for his help here.
            # https://github.com/jborean93
            $this.Instance.AddParameters(@{ '--%' = $UsingStatements })
        }
        return $this
    }

    [void] Run() {
        $this.AsyncResult = $this.Instance.BeginInvoke()
    }

    [void] EndInvoke() {
        try {
            $this.Cmdlet.WriteObject($this.Instance.EndInvoke($this.AsyncResult), $true)
            $this.GetErrors()
        }
        catch [PipelineStoppedException] {
            $this.Cmdlet.WriteError($_)
        }
        catch {
            $this.Cmdlet.WriteError($_)
        }
    }

    [void] Stop() {
        $this.Instance.Stop()
    }

    [void] GetErrors() {
        if ($this.Instance.HadErrors) {
            foreach ($err in $this.Instance.Streams.Error) {
                $this.Cmdlet.WriteError($err)
            }
        }
    }

    [PSParallelTask] AssociateWith([runspace] $Runspace) {
        $this.Instance.Runspace = $Runspace
        return $this
    }

    [runspace] GetRunspace() {
        return $this.Instance.Runspace
    }

    [void] Dispose() {
        $this.Instance.Dispose()
    }
}
#EndRegion '.\private\PSParallelTask.ps1' 75
#Region '.\public\Invoke-Parallel.ps1' 0
#using namespace System.Management.Automation
#using namespace System.Management.Automation.Language
#using namespace System.Management.Automation.Runspaces
#using namespace System.Text

# .ExternalHelp PSParallelPipeline-help.xml
function Invoke-Parallel {
    [CmdletBinding(PositionalBinding = $false)]
    [Alias('parallel')]
    param(
        [Parameter(Mandatory, ValueFromPipeline)]
        [object] $InputObject,

        [Parameter(Mandatory, Position = 0)]
        [scriptblock] $ScriptBlock,

        [Parameter()]
        [ValidateRange(1, 63)]
        [int] $ThrottleLimit = 5,

        [Parameter()]
        [ValidateNotNullOrEmpty()]
        [hashtable] $Variables,

        [Parameter()]
        [ValidateNotNullOrEmpty()]
        [ArgumentCompleter([CommandCompleter])]
        [string[]] $Functions,

        [Parameter()]
        [switch] $UseNewRunspace,

        [Parameter()]
        [ValidateRange(1, [int]::MaxValue)]
        [int] $TimeoutSeconds
    )

    begin {
        try {
            $iss = [initialsessionstate]::CreateDefault2()

            foreach ($key in $Variables.PSBase.Keys) {
                if ($Variables[$key] -is [scriptblock]) {
                    $PSCmdlet.ThrowTerminatingError([ErrorRecord]::new(
                        [PSArgumentException]::new('Passed-in script block variables are not supported.'),
                        'VariableCannotBeScriptBlock',
                        [ErrorCategory]::InvalidType,
                        $Variables[$key]))
                }

                $iss.Variables.Add(
                    [SessionStateVariableEntry]::new($key, $Variables[$key], ''))
            }

            foreach ($function in $Functions) {
                $def = $PSCmdlet.InvokeCommand.GetCommand(
                    $function, [CommandTypes]::Function)

                $iss.Commands.Add(
                    [SessionStateFunctionEntry]::new($function, $def.Definition))
            }

            $usingParams = @{}

            # Thanks to mklement0 for catching up a bug here.
            # https://github.com/mklement0
            foreach ($usingstatement in $ScriptBlock.Ast.FindAll({ $args[0] -is [UsingExpressionAst] }, $true)) {
                $varText = $usingstatement.Extent.Text
                $varPath = $usingstatement.SubExpression.VariablePath.UserPath

                $key = [Convert]::ToBase64String([Encoding]::Unicode.GetBytes($varText.ToLowerInvariant()))
                if (-not $usingParams.ContainsKey($key)) {
                    $value = $PSCmdlet.SessionState.PSVariable.GetValue($varPath)

                    if ($value -is [scriptblock]) {
                        $PSCmdlet.ThrowTerminatingError([ErrorRecord]::new(
                            [PSArgumentException]::new('Passed-in script block variables are not supported.'),
                            'VariableCannotBeScriptBlock',
                            [ErrorCategory]::InvalidType,
                            $value))
                    }

                    $usingParams.Add($key, $value)
                }
            }

            $im = [InvocationManager]::new($ThrottleLimit, $Host, $iss, $UseNewRunspace.IsPresent)

            if ($withTimeout = $PSBoundParameters.ContainsKey('TimeoutSeconds')) {
                $timer = [Stopwatch]::StartNew()
            }
        }
        catch {
            $PSCmdlet.ThrowTerminatingError($_)
        }
    }

    process {
        try {
            do {
                if ($runspace = $im.TryGet()) {
                    continue
                }

                if ($withTimeout) {
                    if ($task = $im.WaitAny($TimeoutSeconds, $timer)) {
                        $im.GetTaskResult($task)
                    }
                    continue
                }

                if ($task = $im.WaitAny()) {
                    $im.GetTaskResult($task)
                }
            }
            until($runspace -or $TimeoutSeconds -lt $timer.Elapsed.TotalSeconds)

            if ($TimeoutSeconds -lt $timer.Elapsed.TotalSeconds) {
                return
            }

            $im.AddTask([PSParallelTask]::new($ScriptBlock, $InputObject, $PSCmdlet).
                AssociateWith($runspace).
                AddUsingStatements($usingParams))
        }
        catch {
            $PSCmdlet.WriteError($_)
        }
    }

    end {
        try {
            if ($withTimeout) {
                while ($task = $im.WaitAny($TimeoutSeconds, $timer)) {
                    $im.GetTaskResult($task)
                }
                return
            }

            while ($task = $im.WaitAny()) {
                $im.GetTaskResult($task)
            }
        }
        catch {
            $PSCmdlet.WriteError($_)
        }
        finally {
            if ($im -is [IDisposable]) {
                $im.Dispose()
            }
        }
    }
}
#EndRegion '.\public\Invoke-Parallel.ps1' 154