// AsyncSocketHandlerPool.cs
/*
 * All intellectual rights of this framework, including this source file belong to Appicacy, René Vaessen.
 * Customers of Appicacy, may copy and change it, as long as this header remains.
 *
 */
using GenXdev.AsyncSockets.Handlers;
using Ninject;
using System.Collections.Concurrent;
using GenXdev.MemoryManagement;
using GenXdev.Configuration;
using GenXdev.AsyncSockets.Configuration;
using Ninject.Parameters;

namespace GenXdev.AsyncSockets.Containers
{
    /// <summary>
    /// Pool of reusable <typeparamref name="HandlerType"/> socket handlers.
    /// Idle handlers wait in <see cref="SocketQueue"/>; handlers handed out by
    /// <see cref="Pop"/> are tracked in <see cref="ActiveSockets"/> until they are
    /// returned through <see cref="Push"/>. While the pool is started, a background
    /// loop calls <see cref="PerformMaintenance"/> roughly once per second to close
    /// active handlers that report a timeout.
    /// </summary>
    /// <typeparam name="HandlerType">Concrete handler type resolved from the Ninject kernel.</typeparam>
    public class AsyncSocketHandlerPool<HandlerType> : IAsyncSocketHandlerPool
        where HandlerType : SocketHandlerBase
    {
        #region Initialization

        IKernel Kernel;
        IMemoryManagerConfiguration MemoryConfiguration;
        IServiceMemoryManager MemoryManager;
        ISocketHandlerPoolConfiguration PoolConfiguration;
#if (Logging)
        IServiceLogger Logger;
#endif
        String ServiceName;

        /// <summary>
        /// Stores the injected dependencies and, unless <paramref name="DoInitialize"/>
        /// is false, initializes and starts the pool immediately.
        /// </summary>
        /// <param name="Kernel">Ninject kernel used to resolve new handler instances.</param>
        /// <param name="MemoryConfiguration">Supplies the receive/send buffer sizes read by <see cref="Initialize"/>.</param>
        /// <param name="MemoryManager">Memory manager; stored but not used directly by this class.</param>
        /// <param name="PoolConfiguration">Supplies <c>MaxConnections</c>, consulted by <see cref="Pop"/>.</param>
        /// <param name="ServiceName">Passed as the "ServiceName" constructor argument to every new handler.</param>
        /// <param name="DoInitialize">When true (default) <see cref="Initialize"/> is called from the constructor.</param>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2214:DoNotCallOverridableMethodsInConstructors")]
        [Inject]
        public AsyncSocketHandlerPool(
            IKernel Kernel,
            IMemoryManagerConfiguration MemoryConfiguration,
            IServiceMemoryManager MemoryManager,
#if (Logging)
            IServiceLogger Logger,
#endif
            ISocketHandlerPoolConfiguration PoolConfiguration,
            String ServiceName,
            bool DoInitialize = true
        )
        {
            this.Kernel = Kernel;
#if (Logging)
            this.Logger = Logger;
#endif
            this.MemoryConfiguration = MemoryConfiguration;
            this.MemoryManager = MemoryManager;
            this.PoolConfiguration = PoolConfiguration;
            this.ServiceName = ServiceName;

            if (DoInitialize)
            {
                Initialize();
            }
        }

        /// <summary>Finalizer; runs the non-disposing cleanup path.</summary>
        ~AsyncSocketHandlerPool()
        {
            Dispose(false);
        }

        /// <summary>True while the maintenance loop is supposed to keep running.</summary>
        protected bool started = false;

        /// <summary>
        /// Optional callback invoked when a handler closes. When set, it replaces the
        /// default behavior of pushing the handler back into the pool.
        /// </summary>
        public EventHandler<HandlerClosedEventArgs> RemoveProtocolHandlerCallback { get; set; }

        /// <summary>
        /// Called when a handler reports that it has closed. Forwards the event to
        /// <see cref="RemoveProtocolHandlerCallback"/> when one is set (with this pool
        /// as sender); otherwise returns the handler to the pool via <see cref="Push"/>.
        /// </summary>
        /// <param name="sender">Original event sender; not used.</param>
        /// <param name="e">Event data carrying the closed handler.</param>
        protected virtual void OnHandlerClosed(object sender, HandlerClosedEventArgs e)
        {
            SocketHandlerBase handler = e.handler;

            // Copy to a local so the null check and the invocation see the same delegate.
            var callback = RemoveProtocolHandlerCallback;

            if (callback != null)
            {
                callback(this, new HandlerClosedEventArgs(handler));
            }
            else
            {
                Push(handler);
            }
        }

        /// <summary>
        /// Computes the buffer size, creates the idle-handler queue and starts the pool.
        /// </summary>
        protected void Initialize()
        {
            saeaBufferSize = Math.Max(
                MemoryConfiguration.ReceiveBufferSize,
                MemoryConfiguration.SendBufferSize);

            SocketQueue = new ConcurrentQueue<SocketHandlerBase>();

            Start();
        }

        /// <summary>
        /// Resolves a new handler from the kernel, passing the service name and this
        /// pool as constructor arguments, and registers the close callback on it.
        /// </summary>
        /// <returns>The newly created handler.</returns>
        HandlerType CreateNewHandler()
        {
            // create handler
            var handler = Kernel.Get<HandlerType>(
                new IParameter[2]
                {
                    new ConstructorArgument("ServiceName", ServiceName),
                    new ConstructorArgument("Pool", this)
                });

            handler.RegisterHandler(handler.saeaHandler, OnHandlerClosed);

            return handler;
        }

        /// <summary>
        /// Closes every active handler for which <see cref="IsExpired"/> returns true.
        /// Works on a snapshot of <see cref="ActiveSockets"/> so handlers can be removed
        /// from the set while the snapshot is being walked.
        /// </summary>
        protected virtual void PerformMaintenance()
        {
            SocketHandlerBase[] activeSockets;

            lock (ActiveSockets)
            {
                activeSockets = ActiveSockets.ToArray<SocketHandlerBase>();
            }

            foreach (var activeSocket in activeSockets)
            {
                if (IsExpired(activeSocket))
                {
                    #region ---------------------------------------------------------------------------------------------[LOG]-
#if (Logging)
                    Logger.LogHandlerFlowMessage(activeSocket.ControllingHandler.saeaHandler, () =>
                    {
                        return "Socket has timed out!";
                    });
#endif
                    #endregion ------------------------------------------------------------------------------------------------

                    activeSocket.ControllingHandler.Close();
                }
            }
        }

        /// <summary>
        /// Tells whether the handler controlling <paramref name="activeSocket"/> reports a timeout.
        /// </summary>
        /// <param name="activeSocket">Handler to inspect.</param>
        /// <returns>The controlling handler's <c>IsTimedOut</c> value.</returns>
        protected virtual bool IsExpired(SocketHandlerBase activeSocket)
        {
            return activeSocket.ControllingHandler.IsTimedOut;
        }

        /// <summary>
        /// Set once <see cref="Dispose()"/> has begun; makes <see cref="Push"/> dispose
        /// returned handlers instead of queueing them.
        /// </summary>
        protected bool Disposing;

        /// <summary>
        /// Stops the pool, closes active connections, disposes all idle handlers and
        /// suppresses finalization.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Shared cleanup path. Always clears <see cref="started"/> so the maintenance
        /// loop ends; only when <paramref name="disposing"/> is true are handlers touched.
        /// </summary>
        /// <param name="disposing">True when called from <see cref="Dispose()"/>, false from the finalizer.</param>
        protected virtual void Dispose(bool disposing)
        {
            started = false;

            if (disposing)
            {
                Disposing = true;

                // Initialize() was never run (DoInitialize == false): nothing to clean up.
                if (SocketQueue == null) return;

                StopAllActiveConnections();

                // empty SocketQueue
                while (SocketQueue.TryDequeue(out SocketHandlerBase handler))
                {
                    handler.Dispose();
                }
            }
        }

        /// <summary>Ends the maintenance loop and closes all active connections.</summary>
        public virtual void Stop()
        {
            started = false;

            StopAllActiveConnections();
        }

        /// <summary>
        /// Starts the background maintenance loop if the pool is not already started.
        /// The loop runs <see cref="PerformMaintenance"/>, waits one second, and repeats
        /// for as long as <see cref="started"/> stays true.
        /// </summary>
        public virtual void Start()
        {
            if (!started)
            {
                started = true;

                // NOTE(review): a Stop() followed by Start() within the 1000 ms delay leaves
                // the previous loop alive next to the new one, because both only test 'started'.
                Task.Factory.StartNew(async delegate
                {
                    while (started)
                    {
                        try
                        {
                            PerformMaintenance();
                        }
                        catch
                        {
                            // ignore: maintenance is best-effort, a failure must not end the loop
                        }

                        await Task.Delay(1000);
                    }
                },
                CancellationToken.None,
                TaskCreationOptions.None,
                TaskScheduler.Default
                ).Unwrap();
            }
        }

        /// <summary>
        /// Closes the controlling handler of every currently active handler. Iterates
        /// over a snapshot taken under the <see cref="ActiveSockets"/> lock.
        /// </summary>
        public void StopAllActiveConnections()
        {
            if (ActiveSockets != null)
            {
                lock (ActiveSockets)
                {
                    var activeSockets = ActiveSockets.ToArray<SocketHandlerBase>();

                    foreach (var activeSocket in activeSockets)
                    {
                        activeSocket.ControllingHandler.Close();
                    }
                }
            }
        }

        #endregion

        #region Fields

        // Larger of the configured receive/send buffer sizes; assigned in Initialize()
        // and not read anywhere else in this class.
        int saeaBufferSize;

        /// <summary>Idle handlers waiting to be reused.</summary>
        protected ConcurrentQueue<SocketHandlerBase> SocketQueue;

        /// <summary>Handlers currently handed out by <see cref="Pop"/>; also used as the lock object.</summary>
        protected HashSet<SocketHandlerBase> ActiveSockets = new HashSet<SocketHandlerBase>();
        //protected Timer Timer;

        #endregion

        #region Protected

        /// <summary>
        /// Forwards a captured-socket-reset notification to <paramref name="socketHandler"/>.
        /// </summary>
        /// <param name="socketHandler">Handler to notify.</param>
        /// <param name="resetWasRequested">Passed through unchanged to the handler.</param>
        protected void FireHandleCapturedSocketReset(SocketHandlerBase socketHandler, bool resetWasRequested)
        {
            socketHandler.HandleCapturedSocketReset(resetWasRequested);
        }

        #endregion

        #region Public

        /// <summary>
        /// Takes a handler out of the pool, creating a new one when no idle handler is
        /// available and the connection limit allows it. The handler is prepared for use
        /// and added to <see cref="ActiveSockets"/>.
        /// </summary>
        /// <param name="handler">Receives the handler, or null when none could be provided.</param>
        /// <returns>True when a handler was provided; false when the limit has been reached.</returns>
        public bool Pop(out SocketHandlerBase handler)
        {
            lock (ActiveSockets)
            {
                if (!SocketQueue.TryDequeue(out handler))
                {
                    // No idle handler is left, so the handlers that currently exist are the
                    // active ones. The limit therefore has to be compared with the active
                    // count: the idle queue is empty at this point, and comparing its count
                    // (always 0 here) would never enforce the limit.
                    // MaxConnections == 0 means "no limit".
                    var maxConnections = this.PoolConfiguration.MaxConnections;

                    if ((maxConnections == 0) || (ActiveSockets.Count < maxConnections))
                    {
                        handler = CreateNewHandler();
                    }
                }

                if (handler != null)
                {
                    handler.OnDequeueHandler();

                    // NOTE(review): freshly created handlers were already registered in
                    // CreateNewHandler(); kept as-is since RegisterHandler's semantics
                    // are not visible from this file.
                    handler.RegisterHandler(handler.saeaHandler, OnHandlerClosed);
                    handler.SetCurrentStageTimeoutSeconds(10);
                    handler.SetActivityTimestamp();

                    ActiveSockets.Add(handler);

                    return true;
                }
            }

            return false;
        }

        /// <summary>
        /// Returns a handler to the pool: removes it from <see cref="ActiveSockets"/>,
        /// notifies it, and either queues it for reuse or disposes it when the pool is
        /// being disposed.
        /// </summary>
        /// <param name="handler">Handler being returned.</param>
        public void Push(SocketHandlerBase handler)
        {
            lock (ActiveSockets)
            {
                ActiveSockets.Remove(handler);

                handler.OnEnqueueHandler();

                if (Disposing)
                {
                    handler.Dispose();
                }
                else
                {
                    SocketQueue.Enqueue(handler);
                }
            }
        }

        #endregion
    }
}