Commands/Get-WebSocket.ps1

function Get-WebSocket {
    <#
    .SYNOPSIS
        WebSockets in PowerShell.
    .DESCRIPTION
        Get-WebSocket allows you to connect to a websocket and handle the output.
    .EXAMPLE
        # Create a WebSocket job that connects to a WebSocket and outputs the results.
        Get-WebSocket -WebSocketUri "wss://localhost:9669"
    .EXAMPLE
        # Get is the default verb, so we can just say WebSocket.
        websocket wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post
    .EXAMPLE
        websocket jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post -Tail |
            Foreach-Object {
                $in = $_
                if ($in.commit.record.text -match '[\p{IsHighSurrogates}\p{IsLowSurrogates}]+') {
                    Write-Host $matches.0 -NoNewline
                }
            }
    .EXAMPLE
        $emojiPattern = '[\p{IsHighSurrogates}\p{IsLowSurrogates}\p{IsVariationSelectors}\p{IsCombiningHalfMarks}]+)'
        websocket jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post -Tail |
            Foreach-Object {
                $in = $_
                if ($in.commit.record.text -match "(?>(?:$emojiPattern|\#\w+)") {
                    Write-Host $matches.0 -NoNewline
                }
            }
    #>

    [CmdletBinding(PositionalBinding=$false)]
    param(
    # The Uri of the WebSocket to connect to.
    [Parameter(Position=0,ValueFromPipelineByPropertyName)]
    [Alias('Url','Uri')]
    [uri]$WebSocketUri,

    # A ScriptBlock that will handle the output of the WebSocket.
    [ScriptBlock]
    $Handler,

    # Any variables to declare in the WebSocket job.
    [Collections.IDictionary]
    $Variable = @{},

    # The name of the WebSocket job.
    [string]
    $Name,

    # The script to run when the WebSocket job starts.
    [ScriptBlock]
    $InitializationScript = {},

    # The buffer size. Defaults to 16kb.
    [int]
    $BufferSize = 16kb,

    # The ScriptBlock to run after connection to a websocket.
    # This can be useful for making any initial requests.
    [ScriptBlock]
    $OnConnect,

    # The ScriptBlock to run when an error occurs.
    [ScriptBlock]
    $OnError,

    # The ScriptBlock to run when the WebSocket job outputs an object.
    [ScriptBlock]
    $OnOutput,

    # The Scriptblock to run when the WebSocket job produces a warning.
    [ScriptBlock]
    $OnWarning,

    # If set, will watch the output of the WebSocket job, outputting results continuously instead of outputting a websocket job.
    [Alias('Tail')]
    [switch]
    $Watch,

    # The maximum time to wait for a connection to be established.
    # By default, this is 7 seconds.
    [TimeSpan]
    $ConnectionTimeout = '00:00:07',

    # The Runspace where the handler should run.
    # Runspaces allow you to limit the scope of the handler.
    [Runspace]
    $Runspace,

    # The RunspacePool where the handler should run.
    # RunspacePools allow you to limit the scope of the handler to a pool of runspaces.
    [Management.Automation.Runspaces.RunspacePool]
    [Alias('Pool')]
    $RunspacePool
    )

    begin {
        $SocketJob = {
            param([Collections.IDictionary]$Variable)
            
            foreach ($keyValue in $variable.GetEnumerator()) {
                $ExecutionContext.SessionState.PSVariable.Set($keyValue.Key, $keyValue.Value)
            }

            if ((-not $WebSocketUri) -or $webSocket) {
                throw "No WebSocketUri"
            }

            if (-not $WebSocketUri.Scheme) {
                $WebSocketUri = [uri]"wss://$WebSocketUri"
            }

            if (-not $BufferSize) {
                $BufferSize = 16kb
            }

            $CT = [Threading.CancellationToken]::None
            
            if (-not $webSocket) {
                $ws = [Net.WebSockets.ClientWebSocket]::new()
                $null = $ws.ConnectAsync($WebSocketUri, $CT).Wait()
            } else {
                $ws = $WebSocket
            }

            $Variable.WebSocket = $ws
                                    
            
            while ($true) {
                if ($ws.State -ne 'Open') {break }
                $Buf = [byte[]]::new($BufferSize)
                $Seg = [ArraySegment[byte]]::new($Buf)
                $null = $ws.ReceiveAsync($Seg, $CT).Wait()
                $JS = $OutputEncoding.GetString($Buf, 0, $Buf.Count)
                if ([string]::IsNullOrWhitespace($JS)) { continue }
                try { 
                    $webSocketMessage = ConvertFrom-Json $JS
                    if ($handler) {
                        $psCmd =  
                            if ($runspace.LanguageMode -eq 'NoLanguage' -or 
                                $runspacePool.InitialSessionState.LanguageMode -eq 'NoLanguage') {
                                $handler.GetPowerShell()
                            } elseif ($Runspace -or $RunspacePool) {
                                [PowerShell]::Create().AddScript($handler)
                            }
                        if ($psCmd) {
                            if ($Runspace) {
                                $psCmd.Runspace = $Runspace
                            } elseif ($RunspacePool) {
                                $psCmd.RunspacePool = $RunspacePool
                            }
                        } else {
                            $webSocketMessage | . $handler
                        }
                        
                    } else {
                        $webSocketMessage
                    }                    
                } catch { 
                    Write-Error $_
                }
            }
        }                        
    }

    process {
        foreach ($keyValuePair in $PSBoundParameters.GetEnumerator()) {
            $Variable[$keyValuePair.Key] = $keyValuePair.Value
        }
        $webSocketJob = 
            if ($WebSocketUri) {
                if (-not $name) {
                    $Name = $WebSocketUri
                }
                
                Start-ThreadJob -ScriptBlock $SocketJob -Name $Name -InitializationScript $InitializationScript -ArgumentList $Variable
            } elseif ($WebSocket) {
                if (-not $name) {
                    $name = "websocket"
                }
                Start-ThreadJob -ScriptBlock $SocketJob -Name $Name -InitializationScript $InitializationScript -ArgumentList $Variable
            }

        $subscriptionSplat = @{
            EventName = 'DataAdded'
            MessageData = $webSocketJob
            SupportEvent = $true
        }
        $eventSubscriptions = @(
            if ($OnOutput) {
                Register-ObjectEvent @subscriptionSplat -InputObject $webSocketJob.Output -Action $OnOutput
            }
            if ($OnError) {
                Register-ObjectEvent @subscriptionSplat -InputObject $webSocketJob.Error -Action $OnError
            }
            if ($OnWarning) {
                Register-ObjectEvent @subscriptionSplat -InputObject $webSocketJob.Warning -Action $OnWarning
            }
        )
        if ($eventSubscriptions) {
            $variable['EventSubscriptions'] = $eventSubscriptions
        }

        $webSocketConnectTimeout = [DateTime]::Now + $ConnectionTimeout
        while (-not $variable['WebSocket'] -and 
            ([DateTime]::Now -lt $webSocketConnectTimeout)) {
            Start-Sleep -Milliseconds 0
        }
        
        foreach ($keyValuePair in $Variable.GetEnumerator()) {
            $webSocketJob.psobject.properties.add(
                [psnoteproperty]::new($keyValuePair.Key, $keyValuePair.Value), $true
            )
        }
        $webSocketJob.pstypenames.insert(0, 'WebSocketJob')
        if ($Watch) {
            do {                
                $webSocketJob | Receive-Job
                Start-Sleep -Milliseconds (
                    7, 11, 13, 17, 19, 23 | Get-Random
                ) 
            } while ($webSocketJob.State -in 'Running','NotStarted')
        } else {
            $webSocketJob
        }        
    }
}