SocketIOMockHandler.cs

using GenXdev.AsyncSockets.Containers;
using GenXdev.AsyncSockets.Handlers;
using GenXdev.Buffers;
using System.Net.Sockets;
using Ninject;
using GenXdev.AsyncSockets.Logging;
 
namespace GenXdev.AsyncSockets.Mocking
{
    public class SocketIOMockHandler : ISocketIOHandler
    {
        #region Fields
 
        EventHandler OnStartHandlingServer;
        volatile SocketAsyncEventArgs saea;
        volatile int bytesTransferredTx;
        object stateLock = new object();
        internal long state = (int)SocketIOHandlerState.Idle;
        internal long oldState = (int)SocketIOHandlerState.Idle;
 
        internal SocketIOMockHandler Remote;
        DynamicBuffer RxBuffer;
        DynamicBuffer TxBuffer;
        bool IsServer;
        volatile int LocalPort = 1234;
        volatile bool Linger = false;
        IServiceLogger Logger;
        volatile int LingerTime;
        volatile int ReceiveTimeout;
        volatile int SendTimeout;
        volatile int ReceiveBufferSize;
        volatile int SendBufferSize;
        volatile bool Blocking;
        volatile bool NoDelay;
 
        #endregion
 
        #region Events
 
        public event EventHandler<SocketAsyncEventArgs> Completed;
        internal void TriggerCompleted(object sender, SocketAsyncEventArgs saea)
        {
            var args = this.saea;
            this.saea = null;
 
            if (args == null)
                throw new InvalidOperationException();
 
            var handler = Completed;
 
            if (handler != null)
            {
                handler(sender, args);
            }
        }
        #endregion
 
        #region Initialization
 
        public SocketIOMockHandler()
        {
        }
 
        public void Initialize(IKernel Kernel, SocketIOMockHandler Remote, int? localPort = null, bool IsServer = true, EventHandler OnStartHandlingServer = null)
        {
            this.Logger = Kernel.Get<IServiceLogger>();
            this.Remote = Remote;
            Remote.Remote = this;
            this.TxBuffer = Remote.RxBuffer;
            TxBuffer.OnRemoved += TxBuffer_OnRemoved;
            this.RxBuffer = Remote.TxBuffer;
            this.IsServer = IsServer;
            this.LocalPort = localPort.HasValue ? (localPort.Value) : IsServer ? (new Random()).Next(50000) + 2000 : 1234;
            this.OnStartHandlingServer = OnStartHandlingServer;
        }
 
        public void Initialize(IKernel Kernel, int? localPort = null, bool IsServer = true, EventHandler OnStartHandlingServer = null)
        {
            this.Logger = Kernel.Get<IServiceLogger>();
            this.TxBuffer = Kernel.Get<DynamicBuffer>();
            TxBuffer.OnRemoved += TxBuffer_OnRemoved;
            this.RxBuffer = Kernel.Get<DynamicBuffer>();
            this.IsServer = IsServer;
            this.LocalPort = localPort.HasValue ? (localPort.Value) : IsServer ? (new Random()).Next(50000) + 2000 : 1234;
            this.OnStartHandlingServer = OnStartHandlingServer;
        }
 
        #endregion
 
        #region ISocketIOHandler
 
        public int ReceiveBlocking(System.Net.Sockets.SocketAsyncEventArgs saea, byte[] buffer, int offset, int count)
        {
            this.saea = saea;
 
            #region ---------------------------------------------------------------------------------------------------
#if (Logging)
            Logger.LogSocketFlowMessage(saea, () =>
            {
                return "=> ReceiveBlocking start, " +
                    "receiving " + count.ToString() + " bytes into buffer @ offset " + offset.ToString();
            });
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
            SetNewState(SocketIOHandlerState.Connected, SocketIOHandlerState.ReceivingBlocking);
 
            int read = RxBuffer.Remove(buffer, offset, count);
            while ((read < count) && IsOperational() && Remote.IsOperational())
            {
                System.Threading.Thread.Sleep(10);
 
                read += RxBuffer.Remove(buffer, offset + read, count - read);
            }
 
            if (!(IsOperational() && Remote.IsOperational()))
            {
                throw new ObjectDisposedException("Socket is closed");
            }
 
            #region ---------------------------------------------------------------------------------------------------
#if (Logging)
            var b = buffer;
            if (offset > 0)
            {
                b = new byte[count];
                Buffer.BlockCopy(buffer, offset, b, 0, count);
            }
 
            Logger.LogSocketFlowMessage(saea, () =>
            {
                return "=> ReceiveBlocking finished, " +
 
                    (IsServer ? "Server: " : "Client: ") +
                    "received " + count.ToString() + " bytes into buffer @ offset " + offset.ToString() + " => " +
                    GenXdev.Helpers.Hash.FormatBytesAsHexString(b);
            });
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
            SetNewState(SocketIOHandlerState.ReceivingBlocking, SocketIOHandlerState.Connected);
 
            return read;
        }
 
        public int SendBlocking(System.Net.Sockets.SocketAsyncEventArgs saea, byte[] buffer)
        {
            long currentState = Interlocked.Read(ref state);
            if (currentState != (int)SocketIOHandlerState.Connected)
            {
                throw new InvalidOperationException("SendBlocking called while in state: " +
                    Enum.GetName(typeof(SocketIOHandlerState), currentState));
            }
 
            #region ---------------------------------------------------------------------------------------------------
#if (Logging)
            Logger.LogSocketFlowMessage(saea, () =>
            {
                return "=> SendBlocking, " +
 
                    (IsServer ? "Server: " : "Client: ") +
                    "sending blocking: " + buffer.Length.ToString() + " bytes => " +
                    GenXdev.Helpers.Hash.FormatBytesAsHexString(buffer);
            });
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
            return TxBuffer.Add(buffer);
        }
 
        public bool SendAsync(System.Net.Sockets.SocketAsyncEventArgs saea)
        {
            this.saea = saea;
 
            #region ---------------------------------------------------------------------------------------------------
#if (Logging)
            Logger.LogSocketFlowMessage(saea, () =>
            {
                return "=> SendAsync";
            });
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
            SetNewState(SocketIOHandlerState.Connected, SocketIOHandlerState.Sending);
 
            ThreadPool.QueueUserWorkItem((obj) =>
            {
 
                lock (TxBuffer.SyncRoot)
                {
                    if (saea.Count == 0)
                        throw new InvalidOperationException();
 
                    bytesTransferredTx = TxBuffer.Add(saea.Buffer, saea.Offset, saea.Count);
 
                    if (Remote.TrySetUnIdle())
                    {
                        Monitor.Exit(TxBuffer.SyncRoot);
                        try
                        {
                            Remote.TriggerCompleted(Remote, Remote.saea);
                        }
                        finally
                        {
                            Monitor.Enter(TxBuffer.SyncRoot);
                        }
                    }
 
                    if ((TxBuffer.Count < SendBufferSize * 2) || !TrySetIdle(SocketIOHandlerState.Sending))
                    {
                        SetNewState(SocketIOHandlerState.Sending, SocketIOHandlerState.Connected);
 
                        ThreadPool.QueueUserWorkItem((obj2) =>
                        {
                            TriggerCompleted(this, this.saea);
                        });
                    }
                }
            });
 
            return true;
        }
 
        public bool ReceiveAsync(System.Net.Sockets.SocketAsyncEventArgs saea)
        {
            this.saea = saea;
 
            #region ---------------------------------------------------------------------------------------------------
#if (Logging)
            Logger.LogSocketFlowMessage(saea, () =>
            {
                return "=> ReceiveAsync";
            });
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
            SetNewState(SocketIOHandlerState.Connected, SocketIOHandlerState.Receiving);
 
            ThreadPool.QueueUserWorkItem((obj) =>
            {
                // no race conditions
                lock (RxBuffer.SyncRoot)
                {
                    if (RxBuffer.Count > 0)
                    {
                        SetNewState(SocketIOHandlerState.Receiving, SocketIOHandlerState.Connected);
 
                        ThreadPool.QueueUserWorkItem((obj2) =>
                        {
                            TriggerCompleted(this, this.saea);
                        });
                    }
                    else
                    {
                        SetNewState(SocketIOHandlerState.Receiving, SocketIOHandlerState.ReceivingIdle);
                    }
                }
            });
 
            return true;
        }
 
        public bool ConnectAsync(System.Net.Sockets.SocketAsyncEventArgs saea)
        {
            if (IsServer)
            {
                throw new InvalidOperationException("IsServer is true, but ConnectAsync is called");
            }
 
            this.saea = saea;
 
            #region ---------------------------------------------------------------------------------------------------
#if (Logging)
            Logger.LogSocketFlowMessage(saea, () =>
            {
                return "=> ConnectAsync";
            });
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
            SetNewState(SocketIOHandlerState.Idle, SocketIOHandlerState.Connecting);
 
            ThreadPool.QueueUserWorkItem((obj) =>
            {
 
                SetNewState(SocketIOHandlerState.Connecting, SocketIOHandlerState.Connected);
                Remote.SetNewState(SocketIOHandlerState.Idle, SocketIOHandlerState.Connected);
 
                if (OnStartHandlingServer == null)
                    throw new InvalidOperationException("Must assign OnStartHandlingServer for server iohandlers");
 
                ThreadPool.QueueUserWorkItem((obj2) =>
                {
                    TriggerCompleted(this, saea);
                });
 
                OnStartHandlingServer(this, EventArgs.Empty);
            });
 
            return true;
        }
 
        public bool DisconnectAsync(System.Net.Sockets.SocketAsyncEventArgs saea)
        {
            this.saea = saea;
 
            #region ---------------------------------------------------------------------------------------------------
#if (Logging)
            Logger.LogSocketFlowMessage(saea, () =>
            {
                return "=> DisconnectAsync";
            });
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
            SetNewState(SocketIOHandlerState.Connected, SocketIOHandlerState.Disconnecting);
 
            ThreadPool.QueueUserWorkItem((obj) =>
            {
                SetNewState(SocketIOHandlerState.Disconnecting, SocketIOHandlerState.Idle);
 
                TriggerCompleted(this, saea);
            });
 
            return true;
        }
 
        public void Close(System.Net.Sockets.SocketAsyncEventArgs saea)
        {
            Interlocked.Exchange(ref state, (int)SocketIOHandlerState.Disposed);
 
            if (Remote.TrySetNewState(SocketIOHandlerState.ReceivingIdle, SocketIOHandlerState.Disposed) ||
                Remote.TrySetNewState(SocketIOHandlerState.Connecting, SocketIOHandlerState.Disposed) ||
                Remote.TrySetNewState(SocketIOHandlerState.Disconnecting, SocketIOHandlerState.Disposed) ||
                Remote.TrySetNewState(SocketIOHandlerState.Sending, SocketIOHandlerState.Disposed) ||
                Remote.TrySetNewState(SocketIOHandlerState.SendingIdle, SocketIOHandlerState.Disposed)
            )
            {
                Remote.TriggerCompleted(Remote, Remote.saea);
            }
        }
 
        public void SetTcpKeepAlive(System.Net.Sockets.SocketAsyncEventArgs saea, uint keepaliveTime, uint keepaliveInterval)
        {
 
        }
 
        public void SetReceiveTimeout(System.Net.Sockets.SocketAsyncEventArgs saea, int timeout)
        {
            #region ---------------------------------------------------------------------------------------------------
#if (Logging)
            Logger.LogSocketFlowMessage(saea, () =>
            {
                return "=> Setting new receive timeout to " + timeout.ToString();
            });
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
            this.ReceiveTimeout = timeout;
        }
 
        public void SetSendTimeout(System.Net.Sockets.SocketAsyncEventArgs saea, int timeout)
        {
            #region ---------------------------------------------------------------------------------------------------
#if (Logging)
            Logger.LogSocketFlowMessage(saea, () =>
            {
                return "=> Setting new send timeout to " + timeout.ToString();
            });
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
            this.SendTimeout = timeout;
        }
 
        public void SetReceiveBufferSize(System.Net.Sockets.SocketAsyncEventArgs saea, int size)
        {
            #region ---------------------------------------------------------------------------------------------------
#if (Logging)
            Logger.LogSocketFlowMessage(saea, () =>
            {
                return "=> Setting new receivebuffer size to " + size.ToString();
            });
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
            this.ReceiveBufferSize = size;
        }
 
        public void SetSendBufferSize(System.Net.Sockets.SocketAsyncEventArgs saea, int size)
        {
            #region ---------------------------------------------------------------------------------------------------
#if (Logging)
            Logger.LogSocketFlowMessage(saea, () =>
            {
                return "=> Setting new sendbuffer size to " + size.ToString();
            });
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
            this.SendBufferSize = size;
        }
 
        public void SetBlocking(System.Net.Sockets.SocketAsyncEventArgs saea, bool blocking)
        {
            #region ---------------------------------------------------------------------------------------------------
#if (Logging)
            Logger.LogSocketFlowMessage(saea, () =>
            {
                return "=> set blocking: " + blocking.ToString();
            });
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
            this.Blocking = blocking;
        }
 
        public void SetNoDelay(System.Net.Sockets.SocketAsyncEventArgs saea, bool nodelay)
        {
            #region ---------------------------------------------------------------------------------------------------
#if (Logging)
            Logger.LogSocketFlowMessage(saea, () =>
            {
                return "=> set nodelay: " + nodelay.ToString();
            });
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
            this.NoDelay = nodelay;
        }
 
        public int GetReceiveTimeout(System.Net.Sockets.SocketAsyncEventArgs saea)
        {
            return ReceiveTimeout;
        }
 
        public bool GetNoDelay(System.Net.Sockets.SocketAsyncEventArgs saea)
        {
            return NoDelay;
        }
 
        public bool SocketHasDataAvailable(System.Net.Sockets.SocketAsyncEventArgs saea)
        {
            if (!IsOperational() || (!Remote.IsOperational()))
            {
                throw new ObjectDisposedException("Socket is closed");
            }
 
            return RxBuffer.Count > 0;
        }
 
        public int NrOfBytesSocketHasAvailable(SocketAsyncEventArgs saea)
        {
            return RxBuffer.Count;
        }
 
        public int GetLocalPort(System.Net.Sockets.SocketAsyncEventArgs saea)
        {
            return LocalPort;
        }
 
        public int GetRemotePort(System.Net.Sockets.SocketAsyncEventArgs saea)
        {
            return Remote.GetLocalPort(null);
        }
 
        public void SetLingerState(System.Net.Sockets.SocketAsyncEventArgs saea, bool linger)
        {
            this.Linger = linger;
        }
 
        public void Shutdown(System.Net.Sockets.SocketAsyncEventArgs saea, System.Net.Sockets.SocketShutdown how)
        {
            Close(saea);
        }
 
        public void SetLingerTime(System.Net.Sockets.SocketAsyncEventArgs saea, int lingerTime)
        {
            #region ---------------------------------------------------------------------------------------------------
#if (Logging)
            Logger.LogSocketFlowMessage(saea, () =>
            {
                return "=> Setting new lingertime to " + lingerTime.ToString();
            });
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
            this.LingerTime = lingerTime;
        }
 
        public bool IsConnectionlessSocket(System.Net.Sockets.SocketAsyncEventArgs saeaHandler)
        {
            return false;
        }
 
        public bool IsConnected(System.Net.Sockets.SocketAsyncEventArgs saeaHandler)
        {
            long currentState = Interlocked.Read(ref state);
 
            return (currentState != (int)SocketIOHandlerState.Idle) && (currentState != (int)SocketIOHandlerState.Disposed) && (currentState != (int)SocketIOHandlerState.Connecting);
        }
 
        public bool IsOperational()
        {
            lock (stateLock)
            {
                long currentState = Interlocked.Read(ref state);
                long oldState = Interlocked.Read(ref this.oldState);
 
                return ((state != (int)SocketIOHandlerState.Idle) || (oldState == (int)SocketIOHandlerState.Disconnecting)) &&
                        (state != (int)SocketIOHandlerState.Disposed);
            }
        }
 
        public System.Net.Sockets.SocketAsyncOperation GetLastOperation(System.Net.Sockets.SocketAsyncEventArgs saeaHandler)
        {
            switch ((SocketIOHandlerState)Interlocked.Read(ref oldState))
            {
                case SocketIOHandlerState.Connecting:
                    return SocketAsyncOperation.Connect;
 
                case SocketIOHandlerState.Disconnecting:
                    return SocketAsyncOperation.Disconnect;
 
                case SocketIOHandlerState.Receiving:
                case SocketIOHandlerState.ReceivingIdle:
                    return SocketAsyncOperation.Receive;
 
                case SocketIOHandlerState.Sending:
                case SocketIOHandlerState.SendingIdle:
                    return SocketAsyncOperation.Send;
 
                default:
                    return SocketAsyncOperation.None;
            }
        }
 
        public int GetBytesTransfered(System.Net.Sockets.SocketAsyncEventArgs saeaHandler)
        {
            switch (GetLastOperation(saeaHandler))
            {
                case SocketAsyncOperation.Receive:
                    return Math.Min(RxBuffer.Count, ReceiveBufferSize);
                case SocketAsyncOperation.Send:
                    return bytesTransferredTx;
            }
            return 0;
        }
 
        public System.Net.Sockets.SocketError GetSocketError(System.Net.Sockets.SocketAsyncEventArgs saeaHandler)
        {
            if (!(Remote.IsOperational() && IsOperational()))
                return SocketError.Shutdown;
 
            return SocketError.Success;
        }
 
        public int GetReceivedData(SocketAsyncEventArgs saea, DynamicBuffer rxBuffer)
        {
            var bytes = this.RxBuffer.RemoveBytes(GetBytesTransfered(saea));
 
            #region ---------------------------------------------------------------------------------------------------
#if (Logging)
            Logger.LogSocketFlowMessage(saea, () =>
            {
                return "=> " +
                    "Read " + bytes.Length.ToString() + " bytes from receivebuffer => " + GenXdev.Helpers.Hash.FormatBytesAsHexString(bytes);
            });
 
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
            return rxBuffer.Add(bytes);
 
            //return this.RxBuffer.MoveTo(rxBuffer, bytesTransferred);
        }
 
        public void PrepareSendBuffer(SocketAsyncEventArgs saea, DynamicBuffer buffer, int count)
        {
            this.saea = saea;
            var bytes = buffer.RemoveBytes(count);
 
            #region ---------------------------------------------------------------------------------------------------
#if (Logging)
            Logger.LogSocketFlowMessage(saea, () =>
            {
                return "=> PrepareSendBuffer, " +
 
                    (IsServer ? "Server: " : "Client: ") +
                    "filling send buffer: " + bytes.Length.ToString() + " bytes => " +
                    GenXdev.Helpers.Hash.FormatBytesAsHexString(bytes);
            });
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
            bytesTransferredTx = bytes.Length;
            saea.SetBuffer(bytes, 0, count);
        }
 
 
        public void Unregister(SocketAsyncEventArgs saea)
        {
            this.saea = saea == this.saea ? null : this.saea;
        }
 
        #endregion
 
        #region Private
 
        void TxBuffer_OnRemoved(object sender, DynamicBufferEmptyEventArgs e)
        {
            ThreadPool.QueueUserWorkItem((obj2) =>
            {
                bool trigger = false;
 
                lock (TxBuffer.SyncRoot)
                {
                    if (TxBuffer.Count <= SendBufferSize * 2)
                    {
                        trigger = TrySetNewState(SocketIOHandlerState.SendingIdle, SocketIOHandlerState.Connected);
                    }
                }
 
                if (trigger)
                {
                    TriggerCompleted(this, this.saea);
                }
            });
 
        }
 
        internal void SetNewState(SocketIOHandlerState requiredCurrentState, SocketIOHandlerState newState)
        {
            lock (stateLock)
            {
                var result = Interlocked.CompareExchange(
                    ref this.state,
                    (int)newState,
                    (int)requiredCurrentState
                    );
 
                Interlocked.Exchange(ref oldState, result);
 
                if (result != (int)requiredCurrentState)
                {
                    throw new InvalidOperationException(String.Format("SetNewState, " +
                        (IsServer ? "Server" : "Client") + " handler wants to switch state from \"{0}\" to \"{1}\", but IOHandler is in state \"{2}\"",
                        Enum.GetName(typeof(SocketIOHandlerState), requiredCurrentState),
                        Enum.GetName(typeof(SocketIOHandlerState), newState),
                        Enum.GetName(typeof(SocketIOHandlerState), result)
                    ));
                }
 
                #region ---------------------------------------------------------------------------------------------------
#if (Logging)
                Logger.LogSocketFlowMessage(saea, () =>
                {
                    return String.Format("=> " +
                            "Statechange from \"{0}\" to \"{1}\"",
                            Enum.GetName(typeof(SocketIOHandlerState), requiredCurrentState),
                            Enum.GetName(typeof(SocketIOHandlerState), newState)
                    );
                });
#endif
                #endregion ------------------------------------------------------------------------------------------------
            }
        }
 
        bool TrySetIdle(SocketIOHandlerState requiredCurrentState)
        {
            lock (stateLock)
            {
                if (requiredCurrentState == SocketIOHandlerState.Sending)
                {
                    return
                         (Interlocked.Read(ref Remote.state) != (int)SocketIOHandlerState.SendingIdle) &&
                        TrySetNewState(requiredCurrentState, SocketIOHandlerState.SendingIdle);
                }
                if (requiredCurrentState == SocketIOHandlerState.Receiving)
                {
                    return
                        (Interlocked.Read(ref Remote.state) != (int)SocketIOHandlerState.ReceivingIdle) &&
                        TrySetNewState(requiredCurrentState, SocketIOHandlerState.ReceivingIdle);
                }
 
                throw new InvalidOperationException();
            }
        }
 
        bool TrySetUnIdle()
        {
            lock (stateLock)
            {
                return
                    TrySetNewState(SocketIOHandlerState.ReceivingIdle, SocketIOHandlerState.Connected) ||
                    TrySetNewState(SocketIOHandlerState.SendingIdle, SocketIOHandlerState.Connected);
            }
        }
 
        bool TrySetNewState(SocketIOHandlerState requiredCurrentState, SocketIOHandlerState newState)
        {
            lock (stateLock)
            {
                var oldState = Interlocked.CompareExchange(
                    ref this.state,
                    (int)newState,
                    (int)requiredCurrentState
                );
 
                if (oldState != (int)requiredCurrentState)
                {
                    return false;
                }
 
                #region ---------------------------------------------------------------------------------------------------
#if (Logging)
                Logger.LogSocketFlowMessage(saea, () =>
                {
                    return String.Format("=> " +
                            "Successfull statechange from \"{0}\" to \"{1}\"",
                            Enum.GetName(typeof(SocketIOHandlerState), requiredCurrentState),
                            Enum.GetName(typeof(SocketIOHandlerState), newState)
                    );
                });
#endif
                #endregion ------------------------------------------------------------------------------------------------
 
                Interlocked.Exchange(ref this.oldState, oldState);
 
                return true;
            }
        }
 
        internal void SetSaea(SocketAsyncEventArgs socketAsyncEventArgs)
        {
            saea = socketAsyncEventArgs;
        }
 
 
        #endregion
    }
}