PSParallelPipeline.psm1
|
using namespace System
using namespace System.Collections
using namespace System.Collections.Generic
using namespace System.Management.Automation
using namespace System.Management.Automation.Language
using namespace System.Diagnostics
using namespace System.Management.Automation.Host
using namespace System.Text
using namespace System.Threading
using namespace System.Management.Automation.Runspaces

#Region '.\private\CommandCompleter.ps1' 0
#using namespace System
#using namespace System.Collections
#using namespace System.Collections.Generic
#using namespace System.Management.Automation
#using namespace System.Management.Automation.Language

# Argument completer used by the -Functions parameter of Invoke-Parallel.
# It delegates to the built-in command completion, restricted to functions.
class CommandCompleter : IArgumentCompleter {
    [IEnumerable[CompletionResult]] CompleteArgument(
        [string] $commandName,
        [string] $parameterName,
        [string] $wordToComplete,
        [CommandAst] $commandAst,
        [IDictionary] $fakeBoundParameters) {

        return [CompletionCompleters]::CompleteCommand(
            $wordToComplete,
            [NullString]::Value,
            [CommandTypes]::Function)
    }
}
#EndRegion '.\private\CommandCompleter.ps1' 21

#Region '.\private\InvocationManager.ps1' 0
#using namespace System.Collections.Generic
#using namespace System.Diagnostics
#using namespace System.Management.Automation
#using namespace System.Management.Automation.Host
#using namespace System.Management.Automation.Language
#using namespace System.Text
#using namespace System.Threading

# Owns the pool of runspaces and the list of in-flight tasks.
# Runspaces are created lazily, at most ThrottleLimit of them, and idle ones
# are kept on a stack so they can be handed out again.
class InvocationManager : IDisposable {
    [int] $ThrottleLimit
    [PSHost] $PSHost
    [initialsessionstate] $InitialSessionState
    [Stack[runspace]] $Runspaces = [Stack[runspace]]::new()
    [List[PSParallelTask]] $Tasks = [List[PSParallelTask]]::new()
    [bool] $UseNewRunspace
    # Number of runspaces handed out through TryGet() that were newly created.
    hidden [int] $TotalMade

    InvocationManager(
        [int] $ThrottleLimit,
        [PSHost] $PSHost,
        [initialsessionstate] $InitialSessionState,
        [bool] $UseNewRunspace
    ) {
        $this.ThrottleLimit = $ThrottleLimit
        $this.PSHost = $PSHost
        $this.InitialSessionState = $InitialSessionState
        $this.UseNewRunspace = $UseNewRunspace
    }

    # Returns an idle runspace if one is pooled, otherwise creates one while
    # the throttle limit has not been reached. Returns $null when the limit is
    # reached and nothing is idle.
    [runspace] TryGet() {
        if ($this.Runspaces.Count) {
            return $this.Runspaces.Pop()
        }

        if ($this.TotalMade -ge $this.ThrottleLimit) {
            return $null
        }

        $this.TotalMade++
        return $this.CreateRunspace()
    }

    # Creates and opens a runspace bound to the caller's host and the
    # prepared initial session state.
    [runspace] CreateRunspace() {
        $runspace = [runspacefactory]::CreateRunspace($this.PSHost, $this.InitialSessionState)
        $runspace.Open()
        return $runspace
    }

    # Blocks until any in-flight task completes and returns it.
    # Returns $null when there are no tasks. The wait is done in 200 ms slices.
    [PSParallelTask] WaitAny() {
        if (-not $this.Tasks.Count) {
            return $null
        }

        do {
            $id = [WaitHandle]::WaitAny($this.Tasks.AsyncResult.AsyncWaitHandle, 200)
        }
        while ($id -eq [WaitHandle]::WaitTimeout)

        return $this.Tasks[$id]
    }

    # Same as WaitAny(), but once $Timer has run past $TimeoutSeconds the first
    # in-flight task is stopped and returned instead of waiting any longer.
    [PSParallelTask] WaitAny([int] $TimeoutSeconds, [Stopwatch] $Timer) {
        if (-not $this.Tasks.Count) {
            return $null
        }

        do {
            if ($TimeoutSeconds -lt $Timer.Elapsed.TotalSeconds) {
                $this.Tasks[0].Stop()
                return $this.Tasks[0]
            }

            $id = [WaitHandle]::WaitAny($this.Tasks.AsyncResult.AsyncWaitHandle, 200)
        }
        while ($id -eq [WaitHandle]::WaitTimeout)

        return $this.Tasks[$id]
    }

    # Removes the task from the in-flight list, returns its runspace to the
    # pool, writes its output and errors through the task's cmdlet, and
    # finally disposes the task.
    [void] GetTaskResult([PSParallelTask] $Task) {
        try {
            $this.Tasks.Remove($Task)
            $this.Release($Task.GetRunspace())
            $Task.EndInvoke()
        }
        finally {
            if ($Task -is [IDisposable]) {
                $Task.Dispose()
            }
        }
    }

    # Puts a runspace back on the idle stack. When UseNewRunspace is set the
    # used runspace is disposed and a freshly created one is pooled instead.
    [void] Release([runspace] $runspace) {
        if ($this.UseNewRunspace) {
            $runspace.Dispose()
            $runspace = $this.CreateRunspace()
        }

        $this.Runspaces.Push($runspace)
    }

    # Registers the task as in-flight and starts it.
    [void] AddTask([PSParallelTask] $Task) {
        $this.Tasks.Add($Task)
        $Task.Run()
    }

    # Disposes every idle runspace in the pool.
    # The stack is drained directly rather than through TryGet(): TryGet()
    # creates a new runspace whenever the throttle limit has not been reached,
    # which would open runspaces only to dispose them immediately.
    [void] Dispose() {
        while ($this.Runspaces.Count) {
            $this.Runspaces.Pop().Dispose()
        }
    }

    # Collects the values of every $using: expression found in $scriptblock.
    # Returns a hashtable keyed by the Base64 (UTF-16) encoding of the
    # expression text; plain variable references are lower-cased first so that
    # differently cased references share one entry.
    # Throws a terminating error through $cmdlet if a referenced variable
    # holds a script block.
    static [hashtable] GetUsingStatements([scriptblock] $scriptblock, [PSCmdlet] $cmdlet) {
        $usingParams = @{}

        foreach ($usingstatement in $scriptblock.Ast.FindAll({ $args[0] -is [UsingExpressionAst] }, $true)) {
            $variableAst = [UsingExpressionAst]::ExtractUsingVariable($usingstatement)
            $varPath = $variableAst.VariablePath.UserPath
            $varText = $usingstatement.ToString()

            if ($usingstatement.SubExpression -is [VariableExpressionAst]) {
                $varText = $varText.ToLowerInvariant()
            }

            $key = [Convert]::ToBase64String([Encoding]::Unicode.GetBytes($varText))

            if ($usingParams.ContainsKey($key)) {
                continue
            }

            $value = $cmdlet.SessionState.PSVariable.GetValue($varPath)

            if ($value -is [scriptblock]) {
                $cmdlet.ThrowTerminatingError([ErrorRecord]::new(
                    [PSArgumentException]::new('Passed-in script block variables are not supported.'),
                    'VariableCannotBeScriptBlock',
                    [ErrorCategory]::InvalidType,
                    $value))
            }

            if ($usingstatement.SubExpression -isnot [VariableExpressionAst]) {
                # The expression is something like $using:var[0].Member:
                # walk the index / member accesses and apply each one to the
                # variable's value. They are popped off a stack, i.e. applied
                # in the reverse of the order FindAll returned them.
                [Stack[Ast]] $subexpressionStack = $usingstatement.SubExpression.FindAll({
                    $args[0] -is [IndexExpressionAst] -or
                    $args[0] -is [MemberExpressionAst] }, $false)

                while ($subexpressionStack.Count) {
                    $subexpression = $subexpressionStack.Pop()
                    if ($subexpression -is [IndexExpressionAst]) {
                        $idx = $subexpression.Index.SafeGetValue()
                        $value = $value[$idx]
                        continue
                    }

                    if ($subexpression -is [MemberExpressionAst]) {
                        $member = $subexpression.Member.SafeGetValue()
                        $value = $value.$member
                    }
                }
            }

            $usingParams.Add($key, $value)
        }

        return $usingParams
    }
}
#EndRegion '.\private\InvocationManager.ps1' 167

#Region '.\private\PSParallelTask.ps1' 0
#using namespace System.Management.Automation

# One unit of work: a PowerShell instance that runs the user's script block
# against a single pipeline object, plus the cmdlet used to surface results.
class PSParallelTask : IDisposable {
    [powershell] $Instance
    [IAsyncResult] $AsyncResult
    [PSCmdlet] $Cmdlet

    PSParallelTask([scriptblock] $Action, [object] $PipelineObject, [PSCmdlet] $Cmdlet) {
        # Thanks to Patrick Meinecke for his help here.
        # https://github.com/SeeminglyScience/
        $this.Cmdlet = $Cmdlet
        # The action is rebuilt from its AST and invoked with $_ set to the
        # pipeline object.
        $this.Instance = [powershell]::Create().
            AddScript({
                param([scriptblock] $Action, [object] $Context)

                $Action.InvokeWithContext($null, [psvariable]::new('_', $Context))
            }).
            AddParameters(@{
                Action  = $Action.Ast.GetScriptBlock()
                Context = $PipelineObject
            })
    }

    # Passes the collected $using: values to the instance under the '--%'
    # parameter name. Returns $this so calls can be chained.
    [PSParallelTask] AddUsingStatements([hashtable] $UsingStatements) {
        if ($UsingStatements.Count) {
            # Credits to Jordan Borean for his help here.
            # https://github.com/jborean93
            $this.Instance.AddParameters(@{ '--%' = $UsingStatements })
        }

        return $this
    }

    # Starts the instance asynchronously and keeps the async handle.
    [void] Run() {
        $this.AsyncResult = $this.Instance.BeginInvoke()
    }

    # Writes the task's output and then its error stream through the cmdlet.
    # Any exception raised while ending the invocation (including
    # PipelineStoppedException) is reported as a non-terminating error.
    [void] EndInvoke() {
        try {
            $this.Cmdlet.WriteObject($this.Instance.EndInvoke($this.AsyncResult), $true)
            $this.GetErrors()
        }
        catch {
            $this.Cmdlet.WriteError($_)
        }
    }

    # Stops the running instance.
    [void] Stop() {
        $this.Instance.Stop()
    }

    # Re-emits every record of the instance's error stream through the cmdlet.
    [void] GetErrors() {
        if ($this.Instance.HadErrors) {
            foreach ($err in $this.Instance.Streams.Error) {
                $this.Cmdlet.WriteError($err)
            }
        }
    }

    # Binds the instance to the given runspace. Returns $this for chaining.
    [PSParallelTask] AssociateWith([runspace] $Runspace) {
        $this.Instance.Runspace = $Runspace
        return $this
    }

    # Returns the runspace the instance is bound to.
    [runspace] GetRunspace() {
        return $this.Instance.Runspace
    }

    [void] Dispose() {
        $this.Instance.Dispose()
    }
}
#EndRegion '.\private\PSParallelTask.ps1' 75

#Region '.\public\Invoke-Parallel.ps1' 0
#using namespace System.Management.Automation
#using namespace System.Management.Automation.Runspaces

# .ExternalHelp PSParallelPipeline-help.xml
function Invoke-Parallel {
    [CmdletBinding(PositionalBinding = $false)]
    [Alias('parallel')]
    param(
        [Parameter(Mandatory, ValueFromPipeline)]
        [object] $InputObject,

        [Parameter(Mandatory, Position = 0)]
        [scriptblock] $ScriptBlock,

        # NOTE(review): the upper bound of 63 presumably comes from the
        # handle-count limit of WaitHandle.WaitAny - confirm.
        [Parameter()]
        [ValidateRange(1, 63)]
        [int] $ThrottleLimit = 5,

        [Parameter()]
        [ValidateNotNullOrEmpty()]
        [hashtable] $Variables,

        [Parameter()]
        [ValidateNotNullOrEmpty()]
        [ArgumentCompleter([CommandCompleter])]
        [string[]] $Functions,

        [Parameter()]
        [switch] $UseNewRunspace,

        [Parameter()]
        [ValidateRange(1, [int]::MaxValue)]
        [int] $TimeoutSeconds
    )

    begin {
        # Resolve $using: values once, up front, in the caller's session state.
        $usingParams = [InvocationManager]::GetUsingStatements(
            $ScriptBlock,
            $PSCmdlet)

        try {
            # Build the session state shared by every runspace: the requested
            # variables and function definitions are copied into it.
            $iss = [initialsessionstate]::CreateDefault2()

            foreach ($key in $Variables.PSBase.Keys) {
                if ($Variables[$key] -is [scriptblock]) {
                    $PSCmdlet.ThrowTerminatingError([ErrorRecord]::new(
                        [PSArgumentException]::new('Passed-in script block variables are not supported.'),
                        'VariableCannotBeScriptBlock',
                        [ErrorCategory]::InvalidType,
                        $Variables[$key]))
                }

                $iss.Variables.Add(
                    [SessionStateVariableEntry]::new($key, $Variables[$key], ''))
            }

            foreach ($function in $Functions) {
                $def = $PSCmdlet.InvokeCommand.GetCommand(
                    $function,
                    [CommandTypes]::Function)

                $iss.Commands.Add(
                    [SessionStateFunctionEntry]::new($function, $def.Definition))
            }

            $im = [InvocationManager]::new($ThrottleLimit, $Host, $iss, $UseNewRunspace.IsPresent)

            # The stopwatch only exists when -TimeoutSeconds was supplied.
            if ($withTimeout = $PSBoundParameters.ContainsKey('TimeoutSeconds')) {
                $timer = [Stopwatch]::StartNew()
            }
        }
        catch {
            $PSCmdlet.ThrowTerminatingError($_)
        }
    }
    process {
        try {
            # Loop until a runspace is available or the timeout has elapsed.
            # While none is available, wait for a running task to finish and
            # collect its result, which frees its runspace.
            do {
                $runspace = $im.TryGet()

                if (-not $runspace) {
                    if ($withTimeout) {
                        $task = $im.WaitAny($TimeoutSeconds, $timer)
                    }
                    else {
                        $task = $im.WaitAny()
                    }

                    if ($task) {
                        $im.GetTaskResult($task)
                    }
                }

                # Only consult the timer when a timeout was requested;
                # otherwise $timer is $null.
                $timedOut = $withTimeout -and ($TimeoutSeconds -lt $timer.Elapsed.TotalSeconds)
            } until ($runspace -or $timedOut)

            if ($timedOut) {
                # Input arriving after the timeout is dropped. Return the
                # runspace to the pool so that it is disposed in the end
                # block instead of being leaked.
                if ($runspace) {
                    $im.Runspaces.Push($runspace)
                }

                return
            }

            $im.AddTask([PSParallelTask]::new($ScriptBlock, $InputObject, $PSCmdlet).
                AssociateWith($runspace).
                AddUsingStatements($usingParams))
        }
        catch {
            $PSCmdlet.WriteError($_)
        }
    }
    end {
        try {
            # Drain every task that is still running and emit its results.
            if ($withTimeout) {
                while ($task = $im.WaitAny($TimeoutSeconds, $timer)) {
                    $im.GetTaskResult($task)
                }

                return
            }

            while ($task = $im.WaitAny()) {
                $im.GetTaskResult($task)
            }
        }
        catch {
            $PSCmdlet.WriteError($_)
        }
        finally {
            if ($im -is [IDisposable]) {
                $im.Dispose()
            }
        }
    }
}
#EndRegion '.\public\Invoke-Parallel.ps1' 132