PSConsumerPipeline.psm1
Function Invoke-ConsumerPipeline { [CmdletBinding()] param( [Parameter(ValueFromPipeline=$true)][psobject[]]$Item, [Parameter()][ScriptBlock]$ProcessBlock, [Parameter()][int]$MaxWorkers=8, [Parameter()][int]$OutputBufferSize=100 ) begin{ $InputQueue = [System.Collections.Concurrent.BlockingCollection[PSObject]]::new($OutputBufferSize) $OutputQueue = [System.Collections.Concurrent.BlockingCollection[PSObject]]::new() $RunspacePool = [runspacefactory]::CreateRunspacePool(1,$MaxWorkers,$Host) $RunspacePool.Open() $Runspaces = @(1..$MaxWorkers)|foreach-object { write-verbose "Staring worker $_" $Runspace = [PowerShell]::Create().AddScript([ScriptBlock]{ param( [System.Collections.Concurrent.BlockingCollection[PSObject]]$inputCollection, [System.Collections.Concurrent.BlockingCollection[PSObject]]$outputCollection, [ScriptBlock]$ProcessBlock ) $inputCollection.GetConsumingEnumerable()|foreach-object{ $x=$_ try{ $result = $ProcessBlock.Ast.GetScriptBlock().Invoke($x) $outputCollection.Add((new-object psobject -Property @{ Input=$x Result=$result Error=$null IsSuccessful=$true })) } catch{ $outputCollection.Add((new-object psobject -Property @{ Input=$x Error=$_ IsSuccessful=$false })) } } }).AddArgument($InputQueue).AddArgument($OutputQueue).AddArgument($ProcessBlock) $Runspace.RunspacePool = $RunspacePool $Handle = $Runspace.BeginInvoke() New-Object psobject -Property @{ Runspace = $Runspace Handle = $Handle } } } process{ $Item|ForEach-Object{ [PSObject]$x=$null if($OutputQueue.Count -ge $OutputBufferSize){ while($OutputQueue.TryTake([ref]$x,10)) { $x|Write-Output } } write-verbose "Adding item $_" $InputQueue.Add($_) write-verbose "Item $_ added" } } end{ write-verbose "Complete Adding: Remaining items: $($InputQueue.Count)" $InputQueue.CompleteAdding() write-verbose "Completing $($Runspaces.Count) workers." $Runspaces|foreach-object { $_.Runspace.EndInvoke($_.Handle) $_.Runspace.Dispose() } $RunspacePool.Close() $RunspacePool.Dispose() write-verbose "Remaining items: $($InputQueue.Count)" $OutputQueue.CompleteAdding() $OutputQueue.GetConsumingEnumerable()|Write-Output } } |