PSParallelPipeline.psm1
using namespace System using namespace System.Collections using namespace System.Collections.Generic using namespace System.Management.Automation using namespace System.Management.Automation.Language using namespace System.Diagnostics using namespace System.Management.Automation.Host using namespace System.Text using namespace System.Threading using namespace System.Management.Automation.Runspaces #Region '.\private\CommandCompleter.ps1' 0 #using namespace System #using namespace System.Collections #using namespace System.Collections.Generic #using namespace System.Management.Automation #using namespace System.Management.Automation.Language class CommandCompleter : IArgumentCompleter { [IEnumerable[CompletionResult]] CompleteArgument( [string] $commandName, [string] $parameterName, [string] $wordToComplete, [CommandAst] $commandAst, [IDictionary] $fakeBoundParameters) { return [CompletionCompleters]::CompleteCommand( $wordToComplete, [NullString]::Value, [CommandTypes]::Function) } } #EndRegion '.\private\CommandCompleter.ps1' 21 #Region '.\private\InvocationManager.ps1' 0 #using namespace System.Collections.Generic #using namespace System.Diagnostics #using namespace System.Management.Automation #using namespace System.Management.Automation.Host #using namespace System.Management.Automation.Language #using namespace System.Text #using namespace System.Threading class InvocationManager : IDisposable { [int] $ThrottleLimit [PSHost] $PSHost [initialsessionstate] $InitialSessionState [Stack[runspace]] $Runspaces = [Stack[runspace]]::new() [List[PSParallelTask]] $Tasks = [List[PSParallelTask]]::new() [bool] $UseNewRunspace hidden [int] $TotalMade InvocationManager( [int] $ThrottleLimit, [PSHost] $PSHost, [initialsessionstate] $InitialSessionState, [bool] $UseNewRunspace ) { $this.ThrottleLimit = $ThrottleLimit $this.PSHost = $PSHost $this.InitialSessionState = $InitialSessionState $this.UseNewRunspace = $UseNewRunspace } [runspace] TryGet() { if ($this.Runspaces.Count) { return $this.Runspaces.Pop() } if ($this.TotalMade -ge $this.ThrottleLimit) { return $null } $this.TotalMade++ return $this.CreateRunspace() } [runspace] CreateRunspace() { $runspace = [runspacefactory]::CreateRunspace($this.PSHost, $this.InitialSessionState) $runspace.Open() return $runspace } [PSParallelTask] WaitAny() { if (-not $this.Tasks.Count) { return $null } do { $id = [WaitHandle]::WaitAny($this.Tasks.AsyncResult.AsyncWaitHandle, 200) } while ($id -eq [WaitHandle]::WaitTimeout) return $this.Tasks[$id] } [PSParallelTask] WaitAny([int] $TimeoutSeconds, [Stopwatch] $Timer) { if (-not $this.Tasks.Count) { return $null } do { if ($TimeoutSeconds -lt $Timer.Elapsed.TotalSeconds) { $this.Tasks[0].Stop() return $this.Tasks[0] } $id = [WaitHandle]::WaitAny($this.Tasks.AsyncResult.AsyncWaitHandle, 200) } while ($id -eq [WaitHandle]::WaitTimeout) return $this.Tasks[$id] } [void] GetTaskResult([PSParallelTask] $Task) { try { $this.Tasks.Remove($Task) $this.Release($Task.GetRunspace()) $Task.EndInvoke() } finally { if ($Task -is [IDisposable]) { $Task.Dispose() } } } [void] Release([runspace] $runspace) { if ($this.UseNewRunspace) { $runspace.Dispose() $runspace = $this.CreateRunspace() } $this.Runspaces.Push($runspace) } [void] AddTask([PSParallelTask] $Task) { $this.Tasks.Add($Task) $Task.Run() } [void] Dispose() { while ($runspace = $this.TryGet()) { $runspace.Dispose() } } static [hashtable] GetUsingStatements([scriptblock] $scriptblock, [PSCmdlet] $cmdlet) { $usingParams = @{} foreach ($usingstatement in $scriptblock.Ast.FindAll({ $args[0] -is [UsingExpressionAst] }, $true)) { $variableAst = [UsingExpressionAst]::ExtractUsingVariable($usingstatement) $varPath = $variableAst.VariablePath.UserPath $varText = $usingstatement.ToString() if ($usingstatement.SubExpression -is [VariableExpressionAst]) { $varText = $varText.ToLowerInvariant() } $key = [Convert]::ToBase64String([Encoding]::Unicode.GetBytes($varText)) if ($usingParams.ContainsKey($key)) { continue } $value = $cmdlet.SessionState.PSVariable.GetValue($varPath) if ($value -is [scriptblock]) { $cmdlet.ThrowTerminatingError([ErrorRecord]::new( [PSArgumentException]::new('Passed-in script block variables are not supported.'), 'VariableCannotBeScriptBlock', [ErrorCategory]::InvalidType, $value)) } if ($usingstatement.SubExpression -is [IndexExpressionAst]) { $idx = $usingstatement.SubExpression.Index.SafeGetValue() $value = $value[$idx] } $usingParams.Add($key, $value) } return $usingParams } } #EndRegion '.\private\InvocationManager.ps1' 151 #Region '.\private\PSParallelTask.ps1' 0 #using namespace System.Management.Automation class PSParallelTask : IDisposable { [powershell] $Instance [IAsyncResult] $AsyncResult [PSCmdlet] $Cmdlet PSParallelTask([scriptblock] $Action, [object] $PipelineObject, [PSCmdlet] $Cmdlet) { # Thanks to Patrick Meinecke for his help here. # https://github.com/SeeminglyScience/ $this.Cmdlet = $Cmdlet $this.Instance = [powershell]::Create(). AddScript({ param([scriptblock] $Action, [object] $Context) $Action.InvokeWithContext($null, [psvariable]::new('_', $Context)) }). AddParameters(@{ Action = $Action.Ast.GetScriptBlock() Context = $PipelineObject }) } [PSParallelTask] AddUsingStatements([hashtable] $UsingStatements) { if ($UsingStatements.Count) { # Credits to Jordan Borean for his help here. # https://github.com/jborean93 $this.Instance.AddParameters(@{ '--%' = $UsingStatements }) } return $this } [void] Run() { $this.AsyncResult = $this.Instance.BeginInvoke() } [void] EndInvoke() { try { $this.Cmdlet.WriteObject($this.Instance.EndInvoke($this.AsyncResult), $true) $this.GetErrors() } catch [PipelineStoppedException] { $this.Cmdlet.WriteError($_) } catch { $this.Cmdlet.WriteError($_) } } [void] Stop() { $this.Instance.Stop() } [void] GetErrors() { if ($this.Instance.HadErrors) { foreach ($err in $this.Instance.Streams.Error) { $this.Cmdlet.WriteError($err) } } } [PSParallelTask] AssociateWith([runspace] $Runspace) { $this.Instance.Runspace = $Runspace return $this } [runspace] GetRunspace() { return $this.Instance.Runspace } [void] Dispose() { $this.Instance.Dispose() } } #EndRegion '.\private\PSParallelTask.ps1' 75 #Region '.\public\Invoke-Parallel.ps1' 0 #using namespace System.Management.Automation #using namespace System.Management.Automation.Language #using namespace System.Management.Automation.Runspaces #using namespace System.Text # .ExternalHelp PSParallelPipeline-help.xml function Invoke-Parallel { [CmdletBinding(PositionalBinding = $false)] [Alias('parallel')] param( [Parameter(Mandatory, ValueFromPipeline)] [object] $InputObject, [Parameter(Mandatory, Position = 0)] [scriptblock] $ScriptBlock, [Parameter()] [ValidateRange(1, 63)] [int] $ThrottleLimit = 5, [Parameter()] [ValidateNotNullOrEmpty()] [hashtable] $Variables, [Parameter()] [ValidateNotNullOrEmpty()] [ArgumentCompleter([CommandCompleter])] [string[]] $Functions, [Parameter()] [switch] $UseNewRunspace, [Parameter()] [ValidateRange(1, [int]::MaxValue)] [int] $TimeoutSeconds ) begin { $usingParams = [InvocationManager]::GetUsingStatements( $ScriptBlock, $PSCmdlet) try { $iss = [initialsessionstate]::CreateDefault2() foreach ($key in $Variables.PSBase.Keys) { if ($Variables[$key] -is [scriptblock]) { $PSCmdlet.ThrowTerminatingError([ErrorRecord]::new( [PSArgumentException]::new('Passed-in script block variables are not supported.'), 'VariableCannotBeScriptBlock', [ErrorCategory]::InvalidType, $Variables[$key])) } $iss.Variables.Add( [SessionStateVariableEntry]::new($key, $Variables[$key], '')) } foreach ($function in $Functions) { $def = $PSCmdlet.InvokeCommand.GetCommand( $function, [CommandTypes]::Function) $iss.Commands.Add( [SessionStateFunctionEntry]::new($function, $def.Definition)) } $im = [InvocationManager]::new($ThrottleLimit, $Host, $iss, $UseNewRunspace.IsPresent) if ($withTimeout = $PSBoundParameters.ContainsKey('TimeoutSeconds')) { $timer = [Stopwatch]::StartNew() } } catch { $PSCmdlet.ThrowTerminatingError($_) } } process { try { do { if ($runspace = $im.TryGet()) { continue } if ($withTimeout) { if ($task = $im.WaitAny($TimeoutSeconds, $timer)) { $im.GetTaskResult($task) } continue } if ($task = $im.WaitAny()) { $im.GetTaskResult($task) } } until($runspace -or $TimeoutSeconds -lt $timer.Elapsed.TotalSeconds) if ($TimeoutSeconds -lt $timer.Elapsed.TotalSeconds) { return } $im.AddTask([PSParallelTask]::new($ScriptBlock, $InputObject, $PSCmdlet). AssociateWith($runspace). AddUsingStatements($usingParams)) } catch { $PSCmdlet.WriteError($_) } } end { try { if ($withTimeout) { while ($task = $im.WaitAny($TimeoutSeconds, $timer)) { $im.GetTaskResult($task) } return } while ($task = $im.WaitAny()) { $im.GetTaskResult($task) } } catch { $PSCmdlet.WriteError($_) } finally { if ($im -is [IDisposable]) { $im.Dispose() } } } } #EndRegion '.\public\Invoke-Parallel.ps1' 134 |