SocketHandlerListener.cs

/*
 * All intellectual rights of this framework, including this source file belong to Appicacy, René Vaessen.
 * Customers of Appicacy, may copy and change it, as long as this header remains.
 *
 */
using Ninject;
using System.Net.Sockets;
using GenXdev.AsyncSockets.Handlers;
using System.Net;
using GenXdev.MemoryManagement;
using GenXdev.AsyncSockets.Containers;
using GenXdev.Configuration;
using GenXdev.AsyncSockets.Configuration;
 
namespace GenXdev.AsyncSockets.Listener
{
    public class SocketHandlerListener<HandlerType> : ISocketHandlerListener where HandlerType : SocketHandlerBase
    {
        #region Initialization
 
        IKernel Kernel;
        IServiceMemoryManager MemoryManager;
        ISocketListenerConfiguration ListenerConfiguration;
        IAsyncSocketListenerPool ListenerPool;
 
#if (Logging)
        IServiceLogger Logger;
#endif
 
 
        [Inject]
        public SocketHandlerListener(
            IKernel Kernel,
            IServiceMemoryManager MemoryManager,
            ISocketListenerConfiguration ListenerConfiguration,
            IAsyncSocketListenerPool ListenerPool,
            IAsyncSocketHandlerPool HandlerPool
#if (Logging)
, IServiceLogger Logger
#endif
 
    )
        {
 
            this.Kernel = Kernel;
            this.MemoryManager = MemoryManager;
            this.ListenerConfiguration = ListenerConfiguration;
            this.ListenerPool = ListenerPool;
            this.HandlerPool = HandlerPool;
#if (Logging)
            this.Logger = Logger;
#endif
            this.ListenerTimer = new System.Threading.Timer(new TimerCallback(ListenerTimerCallback), null, System.Threading.Timeout.Infinite, System.Threading.Timeout.Infinite);
 
            this.ListenerConfiguration.OnPropertyChanged += ListenerConfiguration_OnPropertyChanged;
 
            Initialize();
        }
 
        void ListenerConfiguration_OnPropertyChanged(object sender, OnPropertyChangedEventArgs e)
        {
            this.ActivateNewConfiguration(this.ListenerConfiguration);
        }
 
        void Initialize()
        {
            #region ---------------------------------------------------------------------------------------------[LOG]-
#if (Logging)
            Logger.LogProgramFlowInfoMessage(() =>
            {
                return "Initializing socket listener for ports: " + String.Join(", ", (from q in ListenerConfiguration.ListeningPorts orderby q ascending select q.ToString()).ToList<string>());
            });
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
            #region Sockets
 
            // Pool of reusable SocketAsyncEventArgs objects for accept operations
            ListenerPool.OnListenerSocketIOCompleted = OnListenerSocketIOCompleted;
 
            // Pool of reusable SocketAsyncEventArgs objects for socket operations
            HandlerPool.RemoveProtocolHandlerCallback = OnRemoveSocket;
 
            #endregion
 
            #region Locks
 
            // Total nr of incoming connections enforcer
            MaxIncomingSocketsEnforcer = new Semaphore(ListenerConfiguration.MaxConnections, ListenerConfiguration.MaxConnections);
 
            #endregion
        }
 
        protected virtual IPEndPoint CreateEndPoint(int Port)
        {
            try
            {
                return new IPEndPoint(IPAddress.Any, Port);
            }
            catch (InvalidOperationException)
            {
                return new IPEndPoint(IPAddress.Any, Port);
            }
        }
 
        #endregion
 
        #region Fields
 
        #region Statistics
 
        #region Connections count
 
        // Total number of incoming connections
        long TotalNrOfIncomingSockets;
 
 
        #endregion
 
        #region Performance Logging
 
#if (Logging)
        // Last performance measurement time stamp
        double LastPerformanceMeasurementTimestamp;
 
        // Total number of incoming connections accepted since last measurement
        long TotalNrOfIncomingSocketsAccepted;
 
        // Total number of incoming connections closed since last measurement
        long TotalNrOfIncomingSocketsClosed;
 
#endif
 
        #endregion
 
        #endregion
 
        #region Misc
 
        // Timer for high precision time measurement
        GenXdev.Additional.HiPerfTimer.NativeMethods Timer = new GenXdev.Additional.HiPerfTimer.NativeMethods(true);
 
        #endregion
 
        #region Locks
 
        // Total nr of incoming connections enforcer
        Semaphore MaxIncomingSocketsEnforcer;
 
        #endregion
 
        #region Sockets
 
        // listener socket
        List<Socket> SocketListeners = new List<Socket>();
        private System.Threading.Timer ListenerTimer;
 
        #endregion
 
        #endregion
 
        #region Public
        public bool Started { get; private set; }
        public bool ListenerStarted { get; private set; }
        public IAsyncSocketHandlerPool HandlerPool { get; private set; }
 
        // starts the proxy server
        public void Start()
        {
            if (ListenerStarted)
                return;
 
            #region ---------------------------------------------------------------------------------------------[LOG]-
#if (Logging)
            Logger.LogProgramFlowInfoMessage(() =>
            {
                return "Starting socket listener for ports: " + String.Join(", ", (from q in ListenerConfiguration.ListeningPorts orderby q ascending select q.ToString()).ToList<string>());
            });
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
            Started = true;
 
            // Create listener
            SocketListeners = new List<Socket>();
 
            ActivateNewConfiguration(this.ListenerConfiguration);
 
            ListenerTimer.Change(1000, 1000);
        }
 
        // stops the proxy server
        public void Stop()
        {
            if (!Started)
                return;
 
            #region ---------------------------------------------------------------------------------------------[LOG]-
#if (Logging)
            Logger.LogProgramFlowInfoMessage(() =>
            {
                return "Stopping socket listener for ports: " + String.Join(", ", (from q in ListenerConfiguration.ListeningPorts orderby q ascending select q.ToString()).ToList<string>());
            });
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
            Started = false;
            ListenerTimer.Change(Timeout.Infinite, Timeout.Infinite);
 
            // Stop listener
            foreach (var socketListener in SocketListeners)
            {
                try
                {
                    socketListener.Close();
                }
                catch { }
            }
            SocketListeners.Clear();
 
            HandlerPool.Stop();
            HandlerPool.StopAllActiveConnections();
 
            ListenerStarted = false;
            DateTime start = DateTime.UtcNow;
 
            // wait for tasks to complete
            while ((Interlocked.Read(ref TotalNrOfIncomingSockets) > 0) && (DateTime.UtcNow - start < TimeSpan.FromSeconds(2)))
            {
                // yield
                Thread.Sleep(10);
            }
 
            #region ---------------------------------------------------------------------------------------------[LOG]-
#if (Logging)
            Logger.LogProgramFlowInfoMessage(() =>
            {
                return "Listener stopped";
            });
#endif
            #endregion ----------------------
        }
        public void ActivateNewConfiguration(ISocketListenerConfiguration NewConfiguration)
        {
            lock (SocketListeners)
            {
                var newPorts = NewConfiguration.ListeningPorts.ToList<int>();
 
                var currentPorts = (
                    from q in SocketListeners
                    select ((IPEndPoint)q.LocalEndPoint).Port
                ).ToList<int>();
 
                var addedPorts = (
                    from q in newPorts
                    where currentPorts.IndexOf(q) < 0
                    select q
                ).ToList<int>();
 
                var removedPorts = (
                    from q in currentPorts
                    where newPorts.IndexOf(q) < 0
                    select q
                ).ToList<int>();
 
                var removedSockets = (
                    from q in SocketListeners
                    where removedPorts.IndexOf(((IPEndPoint)q.LocalEndPoint).Port) >= 0
                    select q
                ).ToArray<Socket>();
 
                foreach (var removedSocket in removedSockets)
                {
                    #region ---------------------------------------------------------------------------------------------[LOG]-
#if (Logging)
                    Logger.LogProgramFlowInfoMessage(() =>
                    {
                        return String.Format("New listening configuration removed listening port {0}, listener stopped",
                            ((IPEndPoint)removedSocket.LocalEndPoint).Port);
                    });
#endif
                    #endregion ------------------------------------------------------------------------------------------------
 
                    try
                    {
                        removedSocket.Dispose();
                    }
                    catch { }
 
                    SocketListeners.Remove(removedSocket);
                }
 
                foreach (var newPort in addedPorts)
                {
                    #region ---------------------------------------------------------------------------------------------[LOG]-
#if (Logging)
                    Logger.LogProgramFlowInfoMessage(() =>
                    {
                        return String.Format("New listening configuration added listening port {0}, listener starting..",
                            newPort);
                    });
#endif
                    #endregion ------------------------------------------------------------------------------------------------
 
                    try
                    {
                        var newSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                        newSocket.ExclusiveAddressUse = false;
                        newSocket.Bind(CreateEndPoint(newPort));
 
                        // Start listening
                        newSocket.Listen(newPort);
 
                        SocketListeners.Add(newSocket);
 
                        StartAcceptSockets(newSocket);
 
                        #region ---------------------------------------------------------------------------------------------[LOG]-
#if (Logging)
                        Logger.LogProgramFlowInfoMessage(() =>
                        {
                            return "..listener started successfully";
                        });
#endif
                        #endregion ------------------------------------------------------------------------------------------------
                    }
#if (Logging)
                    catch (Exception e)
#else
                    catch
#endif
                    {
                        #region ---------------------------------------------------------------------------------------------[LOG]-
#if (Logging)
                        Logger.LogException(e, String.Format("..could not bind to port {0}", newPort));
#endif
                        #endregion ------------------------------------------------------------------------------------------------
                    }
                }
 
                ListenerStarted = true;
                Started = true;
            }
        }
 
        // disposes the proxy server
        public void Dispose()
        {
            Dispose(true);
 
            GC.SuppressFinalize(this);
        }
 
        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
            {
                if (Started)
                    Stop();
 
                #region ---------------------------------------------------------------------------------------------[LOG]-
#if (Logging)
                Logger.LogProgramFlowInfoMessage(() =>
                {
                    return "Starting server disposal";
                });
#endif
                #endregion ------------------------------------------------------------------------------------------------
 
                #region Sockets
 
                if (SocketListeners != null)
                {
                    // listener
                    foreach (var socket in SocketListeners)
                    {
                        socket.Dispose();
                    }
 
                    SocketListeners.Clear();
                    SocketListeners = null;
                }
 
                if (ListenerPool != null)
                {
                    // Pool of reusable SocketAsyncEventArgs objects for accept operations
                    ListenerPool.Dispose();
                    ListenerPool = null;
                }
 
                if (HandlerPool != null)
                {
                    // Pool of reusable SocketAsyncEventArgs objects for socket operations
                    HandlerPool.Dispose();
                    HandlerPool = null;
                }
 
                #endregion
 
                #region Memory
 
                if (MemoryManager != null)
                {
                    // Reserve large chunk of memory and reuse this without reallocations during runtime
                    MemoryManager.Dispose();
                    MemoryManager = null;
                }
                #endregion
            }
        }
 
        #endregion
 
        #region Listening
 
        void StartAcceptSockets(Socket socket)
        {
            lock (SocketListeners)
            {
                if (!Started || (SocketListeners.IndexOf(socket) < 0))
                    return;
            }
 
            // protect against to many connections
            MaxIncomingSocketsEnforcer.WaitOne();
 
            if (!Started)
            {
                MaxIncomingSocketsEnforcer.Release();
                return;
            }
 
            // get saea
 
            if (ListenerPool.Pop(out SocketAsyncEventArgs acceptEventArg))
            {
                // start accept
                if (!socket.AcceptAsync(acceptEventArg))
                {
                    HandleAccept(socket, acceptEventArg);
                }
            }
            else
            {
                // should stop the appdomain
                throw new Exception("Enforcer failure");
            }
        }
 
 
        void HandleBadAccept(SocketAsyncEventArgs AcceptEventArgs)
        {
            #region ---------------------------------------------------------------------------------------------[LOG]-
#if (Logging)
            Logger.LogSocketFlowMessage(AcceptEventArgs, () =>
            {
                return "Handling bad accept operation";
            });
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
            //This method closes the socket and releases all resources, both
            //managed and unmanaged. It internally calls Dispose
            try
            {
                if (AcceptEventArgs.AcceptSocket != null)
                {
                    AcceptEventArgs.AcceptSocket.LingerState.Enabled = false;
                    AcceptEventArgs.AcceptSocket.Close();
                }
            }
            catch { }
 
            // dereference
            AcceptEventArgs.AcceptSocket = null;
 
            //Put the saea back in the pool
            ListenerPool.Push(AcceptEventArgs);
 
            //Release Semaphore so that its connection counter will be decremented.
            //This must be done AFTER putting the SocketAsyncEventArg back into the pool,
            //or you can run into problems.
            MaxIncomingSocketsEnforcer.Release();
        }
 
        #endregion
 
        #region Handling
 
        void OnListenerSocketIOCompleted(object sender, SocketAsyncEventArgs saea)
        {
            switch (saea.LastOperation)
            {
                case SocketAsyncOperation.Accept:
 
                    HandleAccept((Socket)sender, saea);
                    break;
 
                default:
                    throw new InvalidOperationException();
            }
        }
 
        void HandleAccept(Socket socket, SocketAsyncEventArgs AcceptEventArgs)
        {
            if (!Started) return;
 
            lock (SocketListeners)
            {
                if (SocketListeners.IndexOf(socket) < 0)
                    return;
            }
 
            // Not successfull?
            if (AcceptEventArgs.SocketError != SocketError.Success)
            {
                //Let's destroy this socket, since it could be bad.
                HandleBadAccept(AcceptEventArgs);
 
                // Start another accept operation
                StartAcceptSockets(socket);
                return;
            }
 
            #region ---------------------------------------------------------------------------------------------[LOG]-
#if (Logging)
            Logger.LogProgramFlowInfoMessage(() =>
            {
                return String.Format(
                    "New connection on listing port {0} from remote address {1}:{2}",
                    ((IPEndPoint)socket.LocalEndPoint).Port,
                    ((IPEndPoint)AcceptEventArgs.AcceptSocket.RemoteEndPoint).Address.ToString(),
                    ((IPEndPoint)AcceptEventArgs.AcceptSocket.RemoteEndPoint).Port
                    );
            });
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
 
            // Get an operation saea
 
            if (HandlerPool.Pop(out SocketHandlerBase operationHandler))
            {
                // Update statistics
                Interlocked.Increment(ref TotalNrOfIncomingSockets);
#if (Logging)
                Interlocked.Increment(ref TotalNrOfIncomingSocketsAccepted);
#endif
 
                // Assign socket to it
                operationHandler.CreateSaea();
                operationHandler.saeaHandler.AcceptSocket = AcceptEventArgs.AcceptSocket;
 
                // Reset accept socket
                AcceptEventArgs.AcceptSocket = null;
 
                // Put the accept socket back in the pool
                ListenerPool.Push(AcceptEventArgs);
 
                #region ---------------------------------------------------------------------------------------------[LOG]-
#if (Logging)
                try
                {
                    Logger.LogSocketFlowMessage(AcceptEventArgs, () =>
                    {
                        return "New connection on port " +
                            ((IPEndPoint)operationHandler.saeaHandler.AcceptSocket.LocalEndPoint).Port.ToString() + " from " +
                            ((IPEndPoint)operationHandler.saeaHandler.AcceptSocket.RemoteEndPoint).Address.ToString() + ":" +
                            ((IPEndPoint)operationHandler.saeaHandler.AcceptSocket.RemoteEndPoint).Port.ToString();
                    });
                }
                catch { }
#endif
                #endregion ------------------------------------------------------------------------------------------------
 
                // Start handling this socket
                ThreadPool.QueueUserWorkItem(operationHandler.StartHandlingSocket);
 
                // Start another accept operation
                StartAcceptSockets(socket);
            }
            else
            {
                // should stop the appdomain
                throw new Exception("Enforcer failure");
            }
        }
 
        void OnRemoveSocket(object sender, HandlerClosedEventArgs e)
        {
            SocketHandlerBase handler = e.handler;
 
            // Update statistics
            Interlocked.Decrement(ref TotalNrOfIncomingSockets);
#if (Logging)
            Interlocked.Increment(ref TotalNrOfIncomingSocketsClosed);
#endif
 
            #region ---------------------------------------------------------------------------------------------[LOG]-
#if (Logging)
            Logger.LogSocketFlowMessage(handler.saeaHandler, () =>
            {
                return "Socket closed";
            });
#endif
            #endregion ------------------------------------------------------------------------------------------------
 
            // Put the SocketAsyncEventArg back into the pool,
            // to be used by another client.
            HandlerPool.Push(handler);
 
            //Release Semaphore so that its connection counter will be decremented.
            //This must be done AFTER putting the SocketAsyncEventArg back into the pool,
            //or you can run into problems.
            MaxIncomingSocketsEnforcer.Release();
        }
 
        #endregion
 
        #region Performance logging
 
        void ListenerTimerCallback(object state)
        {
            this.ActivateNewConfiguration(this.ListenerConfiguration);
 
#if (Logging)
            Logger.LogPerformanceStatusMessage(() =>
            {
                return String.Format("Pool of {0} In {1} | ps InA {2} | InC {3} | M {4}",
 
                    typeof(HandlerType).Name.Replace("SocketHandler", "").Replace("ServerHandler", "").PadRight(20, ' '),
                    Interlocked.Read(ref TotalNrOfIncomingSockets).ToString().PadLeft(4, ' '),
                    (Convert.ToDouble(Interlocked.Read(ref TotalNrOfIncomingSocketsAccepted)) / (Timer.Duration - LastPerformanceMeasurementTimestamp)).ToString("0").PadLeft(4, ' '),
                    (Convert.ToDouble(Interlocked.Read(ref TotalNrOfIncomingSocketsClosed)) / (Timer.Duration - LastPerformanceMeasurementTimestamp)).ToString("0").PadLeft(4, ' '),
                    Interlocked.Read(ref ((ServiceMemoryManager)MemoryManager).mem).ToString()
                    );
            });
 
            this.LastPerformanceMeasurementTimestamp = Timer.Duration;
            Interlocked.Exchange(ref this.TotalNrOfIncomingSocketsAccepted, 0);
            Interlocked.Exchange(ref this.TotalNrOfIncomingSocketsClosed, 0);
#endif
        }
 
        #endregion
    }
}