Private/ConvertTo-ConsumerCommand.ps1

function ConvertTo-ConsumerCommand
{
    <#
    .SYNOPSIS
        Builds the executable path and argument string for consuming a Kafka topic.

    .DESCRIPTION
        Looks for a 'kafkacat' executable ('kafkacat.exe' on Windows) directly under the
        directory returned by Get-KafkaHome. When that file exists, a kafkacat command
        line is produced; otherwise the command line targets the kafka-console-consumer
        script under 'bin' ('bin\windows' on Windows).

        The function only builds the command; it does not start the process.

    .PARAMETER BrokerList
        One or more broker addresses. They are joined with ',' into a single value.

    .PARAMETER TopicName
        Name of the topic to consume.

    .PARAMETER Arguments
        Extra raw arguments appended verbatim (after a single space) to the generated ones.

    .PARAMETER ConsumerGroup
        Optional consumer group. Selects '-G <group> <topic>' for kafkacat or
        '--group <group>' for kafka-console-consumer.

    .PARAMETER FromBeginning
        Request consumption from the earliest offset.

    .PARAMETER MessageCount
        Maximum number of messages to consume. 0 (the default) means no limit is emitted.

    .PARAMETER Persist
        Omit the exit-at-end / timeout arguments that are otherwise added when no
        MessageCount is given. Mutually exclusive with a non-zero MessageCount.

    .OUTPUTS
        PSCustomObject with two properties: 'path' (the executable or script path) and
        'args' (the argument string).

    .NOTES
        Throws System.InvalidOperationException when both -MessageCount (> 0) and
        -Persist are supplied.
        Writes a non-terminating error (System.IO.FileNotFoundException) when neither
        kafkacat nor the console-consumer script can be found; the command object is
        still returned in that case unless the caller's error action stops execution.
    #>
    [cmdletbinding()]
    param(
        [Parameter(Mandatory=$true)]
        [string[]]$BrokerList,
        [Parameter(Mandatory=$true)]
        [string]$TopicName,
        [string]$Arguments,
        [string]$ConsumerGroup,
        [switch]$FromBeginning,
        [uint64]$MessageCount = 0,
        [switch]$Persist
    )

    # A bounded read and an open-ended read contradict each other.
    if ($MessageCount -gt 0 -and $Persist) {
        throw $([System.InvalidOperationException]::new('Cannot specify both `-MessageCount` and `-Persist`'))
    }

    [pscustomobject]$kafka = [pscustomobject]@{'path'=$null;'args'=$null}

    $kafka.path = Get-KafkaHome

    # Windows PowerShell (major version < 6) has no $IsWindows variable but only runs on Windows.
    [bool]$is_win = $($PSVersionTable.PSVersion.Major -lt 6 -or $IsWindows)

    [string]$kafkacat = [System.IO.Path]::Combine($kafka.path, 'kafkacat')

    if ($is_win) {
        $kafkacat += '.exe'
    }

    [string]$brokers = $BrokerList -join ','

    # -LiteralPath: the Kafka home may contain wildcard characters such as '[' or ']'.
    # -PathType Leaf: a directory that happens to be named 'kafkacat' is not an executable.
    if (Test-Path -LiteralPath $kafkacat -PathType Leaf) {
        $kafka.path = $kafkacat
        $kafka.args = "-b $brokers -q -u"

        if ($FromBeginning) {
            # With a consumer group the start position is passed as a configuration
            # property; without one it is passed as an explicit offset.
            if ($ConsumerGroup) {
                $kafka.args += ' -X auto.offset.reset=earliest'
            }
            else {
                $kafka.args += ' -o beginning'
            }
        }

        if ($MessageCount -gt 0) {
            $kafka.args += " -c $MessageCount -e"
        }
        elseif (-not $Persist) {
            $kafka.args += ' -e'
        }

        if ($ConsumerGroup) {
            $kafka.args += " -G $ConsumerGroup $TopicName"
        }
        else {
            $kafka.args += " -C -t $TopicName"
        }
    }
    else {
        # Fall back to the console consumer script under the Kafka home.
        if ($is_win) {
            $kafka.path = [System.IO.Path]::Combine($kafka.path, 'bin', 'windows', 'kafka-console-consumer.bat')
        }
        else {
            $kafka.path = [System.IO.Path]::Combine($kafka.path, 'bin', 'kafka-console-consumer.sh')
        }

        if (-not (Test-Path -LiteralPath $kafka.path -PathType Leaf)) {
            # Non-terminating by default: the command is still assembled and returned below.
            Write-Error -Exception $([System.IO.FileNotFoundException]::new($kafka.path))
        }

        $kafka.args = "--bootstrap-server $brokers --topic $TopicName"

        if ($ConsumerGroup) {
            $kafka.args += " --group $ConsumerGroup"
        }

        if ($FromBeginning) {
            $kafka.args += ' --from-beginning'
        }

        if ($MessageCount -gt 0) {
            $kafka.args += " --max-messages $MessageCount"
        }
        elseif (-not $Persist) {
            $kafka.args += " --timeout-ms 10000"
        }
    }

    # Caller-supplied arguments always go last.
    if ($Arguments) {
        $kafka.args += ' ' + $Arguments
    }

    Write-Verbose $("{0} {1}" -f $kafka.path, $kafka.args)

    return $kafka
}