PSParallelPipeline.psm1
using namespace System
using namespace System.Collections
using namespace System.Collections.Generic
using namespace System.Management.Automation
using namespace System.Management.Automation.Language
using namespace System.Diagnostics
using namespace System.Management.Automation.Host
using namespace System.Threading
using namespace System.Management.Automation.Runspaces
using namespace System.Text

#Region '.\private\CommandCompleter.ps1' 0
#using namespace System
#using namespace System.Collections
#using namespace System.Collections.Generic
#using namespace System.Management.Automation
#using namespace System.Management.Automation.Language

# Argument completer attached to Invoke-Parallel's -Functions parameter.
# It delegates to the built-in command completion, restricted to functions.
class CommandCompleter : IArgumentCompleter {
    [IEnumerable[CompletionResult]] CompleteArgument(
        [string] $commandName,
        [string] $parameterName,
        [string] $wordToComplete,
        [CommandAst] $commandAst,
        [IDictionary] $fakeBoundParameters) {

        # [NullString]::Value passes a real null as the module name
        # instead of the empty string PowerShell would substitute for $null.
        return [CompletionCompleters]::CompleteCommand(
            $wordToComplete,
            [NullString]::Value,
            [CommandTypes]::Function)
    }
}
#EndRegion '.\private\CommandCompleter.ps1' 21

#Region '.\private\InvocationManager.ps1' 0
#using namespace System.Collections.Generic
#using namespace System.Diagnostics
#using namespace System.Management.Automation
#using namespace System.Management.Automation.Host
#using namespace System.Threading

# Owns the pool of runspaces and the list of running tasks for one
# Invoke-Parallel call. At most ThrottleLimit runspaces are handed out.
class InvocationManager : IDisposable {
    [int] $ThrottleLimit
    [PSHost] $PSHost
    [initialsessionstate] $InitialSessionState
    # Idle runspaces ready to be reused.
    [Stack[runspace]] $Runspaces = [Stack[runspace]]::new()
    # Tasks that were started and whose results were not collected yet.
    [List[PSParallelTask]] $Tasks = [List[PSParallelTask]]::new()
    [bool] $UseNewRunspace
    # Number of runspaces handed out by TryGet() through creation.
    hidden [int] $TotalMade

    InvocationManager(
        [int] $ThrottleLimit,
        [PSHost] $PSHost,
        [initialsessionstate] $InitialSessionState,
        [bool] $UseNewRunspace
    ) {
        $this.ThrottleLimit = $ThrottleLimit
        $this.PSHost = $PSHost
        $this.InitialSessionState = $InitialSessionState
        $this.UseNewRunspace = $UseNewRunspace
    }

    # Returns an idle runspace, creating one while the throttle limit allows
    # it, or $null when every runspace is currently in use.
    [runspace] TryGet() {
        if ($this.Runspaces.Count) {
            return $this.Runspaces.Pop()
        }

        if ($this.TotalMade -ge $this.ThrottleLimit) {
            return $null
        }

        $this.TotalMade++
        return $this.CreateRunspace()
    }

    # Creates and opens a runspace bound to the host and initial session state.
    [runspace] CreateRunspace() {
        $runspace = [runspacefactory]::CreateRunspace($this.PSHost, $this.InitialSessionState)
        $runspace.Open()
        return $runspace
    }

    # Blocks until any running task completes and returns it.
    # Returns $null when there is nothing to wait for.
    [PSParallelTask] WaitAny() {
        if (-not $this.Tasks.Count) {
            return $null
        }

        # Poll in 200ms slices rather than waiting indefinitely in one call.
        do {
            $id = [WaitHandle]::WaitAny($this.Tasks.AsyncResult.AsyncWaitHandle, 200)
        }
        while ($id -eq [WaitHandle]::WaitTimeout)

        return $this.Tasks[$id]
    }

    # Same as WaitAny(), but once $Timer exceeds $TimeoutSeconds the first
    # pending task is stopped and returned so the caller can collect it.
    [PSParallelTask] WaitAny([int] $TimeoutSeconds, [Stopwatch] $Timer) {
        if (-not $this.Tasks.Count) {
            return $null
        }

        do {
            if ($TimeoutSeconds -lt $Timer.Elapsed.TotalSeconds) {
                $this.Tasks[0].Stop()
                return $this.Tasks[0]
            }

            $id = [WaitHandle]::WaitAny($this.Tasks.AsyncResult.AsyncWaitHandle, 200)
        }
        while ($id -eq [WaitHandle]::WaitTimeout)

        return $this.Tasks[$id]
    }

    # Removes the task from the pending list, returns its runspace to the
    # pool, emits its output and errors, and finally disposes the task.
    [void] GetTaskResult([PSParallelTask] $Task) {
        try {
            $this.Tasks.Remove($Task)
            $this.Release($Task.GetRunspace())
            $Task.EndInvoke()
        }
        finally {
            if ($Task -is [IDisposable]) {
                $Task.Dispose()
            }
        }
    }

    # Puts a runspace back in the pool. With UseNewRunspace the used runspace
    # is disposed and a freshly created one is pooled in its place.
    [void] Release([runspace] $runspace) {
        if ($this.UseNewRunspace) {
            $runspace.Dispose()
            $runspace = $this.CreateRunspace()
        }

        $this.Runspaces.Push($runspace)
    }

    # Registers the task as pending and starts it.
    [void] AddTask([PSParallelTask] $Task) {
        $this.Tasks.Add($Task)
        $Task.Run()
    }

    # Stops and disposes any task that is still pending, then disposes every
    # pooled runspace. The pool is drained directly: going through TryGet()
    # would create new runspaces up to ThrottleLimit only to dispose them.
    [void] Dispose() {
        try {
            foreach ($task in $this.Tasks.ToArray()) {
                # Grab the runspace first; the task does not dispose it.
                $runspace = $task.GetRunspace()

                try {
                    $task.Stop()
                }
                finally {
                    $task.Dispose()

                    if ($runspace) {
                        $runspace.Dispose()
                    }
                }
            }
        }
        finally {
            $this.Tasks.Clear()

            while ($this.Runspaces.Count) {
                $this.Runspaces.Pop().Dispose()
            }
        }
    }
}
#EndRegion '.\private\InvocationManager.ps1' 111

#Region '.\private\PSParallelTask.ps1' 0
#using namespace System.Management.Automation

# One unit of work: the user script block bound to a single pipeline object,
# executed asynchronously through a [powershell] instance.
class PSParallelTask : IDisposable {
    [powershell] $Instance
    [IAsyncResult] $AsyncResult
    # Cmdlet whose output and error streams receive the task results.
    [PSCmdlet] $Cmdlet

    PSParallelTask([scriptblock] $Action, [object] $PipelineObject, [PSCmdlet] $Cmdlet) {
        # Thanks to Patrick Meinecke for his help here.
        # https://github.com/SeeminglyScience/
        $this.Cmdlet = $Cmdlet
        $this.Instance = [powershell]::Create().
            AddScript({
                param([scriptblock] $Action, [object] $Context)

                $Action.InvokeWithContext($null, [psvariable]::new('_', $Context))
            }).
            AddParameters(@{
                # A new script block is built from the AST of the one supplied.
                Action  = $Action.Ast.GetScriptBlock()
                Context = $PipelineObject
            })
    }

    # Passes the collected $using: values, when there are any, as the
    # parameter named '--%'. Returns this task to allow call chaining.
    [PSParallelTask] AddUsingStatements([hashtable] $UsingStatements) {
        if ($UsingStatements.Count) {
            # Credits to Jordan Borean for his help here.
            # https://github.com/jborean93
            $this.Instance.AddParameters(@{ '--%' = $UsingStatements })
        }

        return $this
    }

    # Starts the asynchronous invocation.
    [void] Run() {
        $this.AsyncResult = $this.Instance.BeginInvoke()
    }

    # Writes the task output, enumerated, followed by its error records.
    # Any exception raised while collecting is written as an error record
    # through the owning cmdlet.
    [void] EndInvoke() {
        try {
            $this.Cmdlet.WriteObject($this.Instance.EndInvoke($this.AsyncResult), $true)
            $this.GetErrors()
        }
        catch {
            $this.Cmdlet.WriteError($_)
        }
    }

    # Stops the running invocation.
    [void] Stop() {
        $this.Instance.Stop()
    }

    # Forwards the records in the task's error stream to the owning cmdlet.
    [void] GetErrors() {
        if ($this.Instance.HadErrors) {
            foreach ($err in $this.Instance.Streams.Error) {
                $this.Cmdlet.WriteError($err)
            }
        }
    }

    # Binds the task to the runspace it will run in. Returns this task.
    [PSParallelTask] AssociateWith([runspace] $Runspace) {
        $this.Instance.Runspace = $Runspace
        return $this
    }

    [runspace] GetRunspace() {
        return $this.Instance.Runspace
    }

    [void] Dispose() {
        $this.Instance.Dispose()
    }
}
#EndRegion '.\private\PSParallelTask.ps1' 75

#Region '.\public\Invoke-Parallel.ps1' 0
#using namespace System.Management.Automation.Language
#using namespace System.Management.Automation.Runspaces
#using namespace System.Text

# .ExternalHelp PSParallelPipeline-help.xml
function Invoke-Parallel {
    [CmdletBinding(PositionalBinding = $false)]
    [Alias('parallel')]
    param(
        [Parameter(Mandatory, ValueFromPipeline)]
        [object] $InputObject,

        [Parameter(Mandatory, Position = 0)]
        [scriptblock] $ScriptBlock,

        [Parameter()]
        [ValidateRange(1, 63)]
        [int] $ThrottleLimit = 5,

        [Parameter()]
        [ValidateNotNullOrEmpty()]
        [hashtable] $Variables,

        [Parameter()]
        [ValidateNotNullOrEmpty()]
        [ArgumentCompleter([CommandCompleter])]
        [string[]] $Functions,

        [Parameter()]
        [switch] $UseNewRunspace,

        [Parameter()]
        [ValidateRange(1, [int]::MaxValue)]
        [int] $TimeoutSeconds
    )

    begin {
        try {
            $iss = [initialsessionstate]::CreateDefault2()

            # Guarded so an unbound -Variables is never dereferenced.
            if ($null -ne $Variables) {
                foreach ($key in $Variables.PSBase.Keys) {
                    $iss.Variables.Add(
                        [SessionStateVariableEntry]::new($key, $Variables[$key], ''))
                }
            }

            # NOTE(review): when a name in -Functions does not resolve, $def is
            # presumably $null and an entry with an empty definition seems to be
            # registered silently. Confirm and consider reporting it.
            foreach ($function in $Functions) {
                $def = $PSCmdlet.InvokeCommand.GetCommand(
                    $function,
                    [System.Management.Automation.CommandTypes]::Function)

                $iss.Commands.Add(
                    [SessionStateFunctionEntry]::new($function, $def.Definition))
            }

            $usingParams = @{}

            # Thanks to mklement0 for catching up a bug here.
            # https://github.com/mklement0
            $usingAsts = $ScriptBlock.Ast.FindAll({ $args[0] -is [UsingExpressionAst] }, $true)

            foreach ($usingstatement in $usingAsts) {
                $varText = $usingstatement.Extent.Text
                $varPath = $usingstatement.SubExpression.VariablePath.UserPath

                # Key: Base64 of the UTF-16 bytes of the lower-cased expression text.
                $key = [Convert]::ToBase64String(
                    [Encoding]::Unicode.GetBytes($varText.ToLowerInvariant()))

                if (-not $usingParams.ContainsKey($key)) {
                    $usingParams.Add($key, $PSCmdlet.SessionState.PSVariable.GetValue($varPath))
                }
            }

            $im = [InvocationManager]::new($ThrottleLimit, $Host, $iss, $UseNewRunspace.IsPresent)

            # The stopwatch only exists when -TimeoutSeconds was supplied.
            if ($withTimeout = $PSBoundParameters.ContainsKey('TimeoutSeconds')) {
                $timer = [Stopwatch]::StartNew()
            }
        }
        catch {
            $PSCmdlet.ThrowTerminatingError($_)
        }
    }

    process {
        try {
            # Loop until a runspace is available or the time budget is spent,
            # collecting the results of finished tasks in the meantime.
            do {
                if ($runspace = $im.TryGet()) {
                    continue
                }

                if ($withTimeout) {
                    if ($task = $im.WaitAny($TimeoutSeconds, $timer)) {
                        $im.GetTaskResult($task)
                    }

                    continue
                }

                if ($task = $im.WaitAny()) {
                    $im.GetTaskResult($task)
                }
            }
            until ($runspace -or ($withTimeout -and $TimeoutSeconds -lt $timer.Elapsed.TotalSeconds))

            if ($withTimeout -and $TimeoutSeconds -lt $timer.Elapsed.TotalSeconds) {
                # Timed out: this input is skipped. Hand back a runspace that
                # was already acquired so it is disposed with the pool.
                if ($runspace) {
                    $im.Runspaces.Push($runspace)
                }

                return
            }

            $im.AddTask([PSParallelTask]::new($ScriptBlock, $InputObject, $PSCmdlet).
                AssociateWith($runspace).
                AddUsingStatements($usingParams))
        }
        catch {
            $PSCmdlet.WriteError($_)
        }
    }

    end {
        try {
            if ($withTimeout) {
                while ($task = $im.WaitAny($TimeoutSeconds, $timer)) {
                    $im.GetTaskResult($task)
                }

                return
            }

            while ($task = $im.WaitAny()) {
                $im.GetTaskResult($task)
            }
        }
        catch {
            $PSCmdlet.WriteError($_)
        }
        finally {
            if ($im -is [IDisposable]) {
                $im.Dispose()
            }
        }
    }
}
#EndRegion '.\public\Invoke-Parallel.ps1' 136