diff --git a/ArchiSteamFarm/IPC.cs b/ArchiSteamFarm/IPC.cs index 9dbb6a2ac..cb854df14 100644 --- a/ArchiSteamFarm/IPC.cs +++ b/ArchiSteamFarm/IPC.cs @@ -44,7 +44,7 @@ namespace ArchiSteamFarm { internal static bool IsRunning => IsHandlingRequests || IsListening; - private static readonly ConcurrentHashSet ActiveLogWebSockets = new ConcurrentHashSet(); + private static readonly ConcurrentDictionary ActiveLogWebSockets = new ConcurrentDictionary(); private static readonly SemaphoreSlim AuthorizationSemaphore = new SemaphoreSlim(1, 1); private static readonly HashSet CompressableContentTypes = new HashSet { @@ -582,12 +582,16 @@ namespace ArchiSteamFarm { try { HttpListenerWebSocketContext webSocketContext = await context.AcceptWebSocketAsync(null).ConfigureAwait(false); - ActiveLogWebSockets.Add(webSocketContext.WebSocket); + SemaphoreSlim sendSemaphore = new SemaphoreSlim(1, 1); + if (!ActiveLogWebSockets.TryAdd(webSocketContext.WebSocket, sendSemaphore)) { + sendSemaphore.Dispose(); + return true; + } try { // Push initial history if available if (HistoryTarget != null) { - await Task.WhenAll(HistoryTarget.ArchivedMessages.Select(archivedMessage => PostLoggedMessageUpdate(webSocketContext.WebSocket, archivedMessage))).ConfigureAwait(false); + await Task.WhenAll(HistoryTarget.ArchivedMessages.Select(archivedMessage => PostLoggedMessageUpdate(webSocketContext.WebSocket, sendSemaphore, archivedMessage))).ConfigureAwait(false); } while (webSocketContext.WebSocket.State == WebSocketState.Open) { @@ -602,9 +606,15 @@ namespace ArchiSteamFarm { break; } } finally { - ActiveLogWebSockets.Remove(webSocketContext.WebSocket); + if (ActiveLogWebSockets.TryRemove(webSocketContext.WebSocket, out SemaphoreSlim closedSemaphore)) { + await closedSemaphore.WaitAsync().ConfigureAwait(false); // Ensure that our semaphore is truly closed by now + closedSemaphore.Dispose(); + } } + return true; + } catch (HttpListenerException e) { + ASF.ArchiLogger.LogGenericDebuggingException(e); return true; } catch (WebSocketException e) { ASF.ArchiLogger.LogGenericDebuggingException(e); @@ -965,11 +975,11 @@ namespace ArchiSteamFarm { } string json = JsonConvert.SerializeObject(new GenericResponse(true, "OK", newHistoryEntryArgs.Message)); - await Task.WhenAll(ActiveLogWebSockets.Where(webSocket => webSocket.State == WebSocketState.Open).Select(webSocket => PostLoggedJsonUpdate(webSocket, json))).ConfigureAwait(false); + await Task.WhenAll(ActiveLogWebSockets.Where(kv => kv.Key.State == WebSocketState.Open).Select(kv => PostLoggedJsonUpdate(kv.Key, kv.Value, json))).ConfigureAwait(false); } - private static async Task PostLoggedJsonUpdate(WebSocket webSocket, string json) { - if ((webSocket == null) || string.IsNullOrEmpty(json)) { + private static async Task PostLoggedJsonUpdate(WebSocket webSocket, SemaphoreSlim sendSemaphore, string json) { + if ((webSocket == null) || (sendSemaphore == null) || string.IsNullOrEmpty(json)) { ASF.ArchiLogger.LogNullError(nameof(webSocket) + " || " + nameof(json)); return; } @@ -978,15 +988,25 @@ namespace ArchiSteamFarm { return; } + await sendSemaphore.WaitAsync().ConfigureAwait(false); + try { + if (webSocket.State != WebSocketState.Open) { + return; + } + await webSocket.SendAsync(Encoding.UTF8.GetBytes(json), WebSocketMessageType.Text, true, CancellationToken.None).ConfigureAwait(false); + } catch (HttpListenerException e) { + ASF.ArchiLogger.LogGenericDebuggingException(e); } catch (WebSocketException e) { ASF.ArchiLogger.LogGenericDebuggingException(e); + } finally { + sendSemaphore.Release(); } } - private static async Task PostLoggedMessageUpdate(WebSocket webSocket, string loggedMessage) { - if ((webSocket == null) || string.IsNullOrEmpty(loggedMessage)) { + private static async Task PostLoggedMessageUpdate(WebSocket webSocket, SemaphoreSlim sendSemaphore, string loggedMessage) { + if ((webSocket == null) || (sendSemaphore == null) || string.IsNullOrEmpty(loggedMessage)) { ASF.ArchiLogger.LogNullError(nameof(webSocket) + " || " + nameof(loggedMessage)); return; } @@ -996,7 +1016,7 @@ namespace ArchiSteamFarm { } string response = JsonConvert.SerializeObject(new GenericResponse(true, "OK", loggedMessage)); - await PostLoggedJsonUpdate(webSocket, response).ConfigureAwait(false); + await PostLoggedJsonUpdate(webSocket, sendSemaphore, response).ConfigureAwait(false); } private static async Task ResponseBase(HttpListenerRequest request, HttpListenerResponse response, byte[] content, HttpStatusCode statusCode = HttpStatusCode.OK) {