Use the cancellation token logic also with sends

This commit is contained in:
JustArchi
2020-12-16 12:47:46 +01:00
parent 79a9f97157
commit 626fa7f059

View File

@@ -39,7 +39,7 @@ using Newtonsoft.Json;
namespace ArchiSteamFarm.IPC.Controllers.Api {
[Route("Api/NLog")]
public sealed class NLogController : ArchiController {
private static readonly ConcurrentDictionary<WebSocket, SemaphoreSlim> ActiveLogWebSockets = new();
private static readonly ConcurrentDictionary<WebSocket, (SemaphoreSlim Semaphore, CancellationToken CancellationToken)> ActiveLogWebSockets = new();
/// <summary>
/// Fetches ASF log in realtime.
@@ -66,7 +66,7 @@ namespace ArchiSteamFarm.IPC.Controllers.Api {
SemaphoreSlim sendSemaphore = new(1, 1);
if (!ActiveLogWebSockets.TryAdd(webSocket, sendSemaphore)) {
if (!ActiveLogWebSockets.TryAdd(webSocket, (sendSemaphore, cancellationToken))) {
sendSemaphore.Dispose();
return new EmptyResult();
@@ -76,7 +76,7 @@ namespace ArchiSteamFarm.IPC.Controllers.Api {
// Push initial history if available
if (ArchiKestrel.HistoryTarget != null) {
// ReSharper disable once AccessToDisposedClosure - we're waiting for completion with Task.WhenAll(), we're not going to exit using block
await Task.WhenAll(ArchiKestrel.HistoryTarget.ArchivedMessages.Select(archivedMessage => PostLoggedMessageUpdate(webSocket, sendSemaphore, archivedMessage))).ConfigureAwait(false);
await Task.WhenAll(ArchiKestrel.HistoryTarget.ArchivedMessages.Select(archivedMessage => PostLoggedMessageUpdate(webSocket, archivedMessage, sendSemaphore, cancellationToken))).ConfigureAwait(false);
}
while (webSocket.State == WebSocketState.Open) {
@@ -93,9 +93,9 @@ namespace ArchiSteamFarm.IPC.Controllers.Api {
break;
}
} finally {
if (ActiveLogWebSockets.TryRemove(webSocket, out SemaphoreSlim? closedSemaphore)) {
await closedSemaphore.WaitAsync(CancellationToken.None).ConfigureAwait(false); // Ensure that our semaphore is truly closed by now
closedSemaphore.Dispose();
if (ActiveLogWebSockets.TryRemove(webSocket, out (SemaphoreSlim Semaphore, CancellationToken CancellationToken) entry)) {
await entry.Semaphore.WaitAsync(CancellationToken.None).ConfigureAwait(false); // Ensure that our semaphore is truly closed by now
entry.Semaphore.Dispose();
}
}
} catch (ConnectionAbortedException e) {
@@ -120,34 +120,44 @@ namespace ArchiSteamFarm.IPC.Controllers.Api {
string json = JsonConvert.SerializeObject(new GenericResponse<string>(newHistoryEntryArgs.Message));
await Task.WhenAll(ActiveLogWebSockets.Where(kv => kv.Key.State == WebSocketState.Open).Select(kv => PostLoggedJsonUpdate(kv.Key, kv.Value, json))).ConfigureAwait(false);
await Task.WhenAll(ActiveLogWebSockets.Where(kv => kv.Key.State == WebSocketState.Open).Select(kv => PostLoggedJsonUpdate(kv.Key, json, kv.Value.Semaphore, kv.Value.CancellationToken))).ConfigureAwait(false);
}
private static async Task PostLoggedJsonUpdate(WebSocket webSocket, SemaphoreSlim sendSemaphore, string json) {
private static async Task PostLoggedJsonUpdate(WebSocket webSocket, string json, SemaphoreSlim sendSemaphore, CancellationToken cancellationToken) {
if (webSocket == null) {
throw new ArgumentNullException(nameof(webSocket));
}
if (sendSemaphore == null) {
throw new ArgumentNullException(nameof(sendSemaphore));
}
if (string.IsNullOrEmpty(json)) {
throw new ArgumentNullException(nameof(json));
}
if (webSocket.State != WebSocketState.Open) {
if (sendSemaphore == null) {
throw new ArgumentNullException(nameof(sendSemaphore));
}
if (cancellationToken.IsCancellationRequested || (webSocket.State != WebSocketState.Open)) {
return;
}
await sendSemaphore.WaitAsync().ConfigureAwait(false);
try {
await sendSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
} catch (OperationCanceledException e) {
ASF.ArchiLogger.LogGenericDebuggingException(e);
return;
}
try {
if (webSocket.State != WebSocketState.Open) {
if (cancellationToken.IsCancellationRequested || (webSocket.State != WebSocketState.Open)) {
return;
}
await webSocket.SendAsync(Encoding.UTF8.GetBytes(json), WebSocketMessageType.Text, true, CancellationToken.None).ConfigureAwait(false);
await webSocket.SendAsync(Encoding.UTF8.GetBytes(json), WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false);
} catch (ConnectionAbortedException e) {
ASF.ArchiLogger.LogGenericDebuggingException(e);
} catch (OperationCanceledException e) {
ASF.ArchiLogger.LogGenericDebuggingException(e);
} catch (WebSocketException e) {
ASF.ArchiLogger.LogGenericDebuggingException(e);
} finally {
@@ -155,26 +165,26 @@ namespace ArchiSteamFarm.IPC.Controllers.Api {
}
}
private static async Task PostLoggedMessageUpdate(WebSocket webSocket, SemaphoreSlim sendSemaphore, string loggedMessage) {
private static async Task PostLoggedMessageUpdate(WebSocket webSocket, string loggedMessage, SemaphoreSlim sendSemaphore, CancellationToken cancellationToken) {
if (webSocket == null) {
throw new ArgumentNullException(nameof(webSocket));
}
if (sendSemaphore == null) {
throw new ArgumentNullException(nameof(sendSemaphore));
}
if (string.IsNullOrEmpty(loggedMessage)) {
throw new ArgumentNullException(nameof(loggedMessage));
}
if (webSocket.State != WebSocketState.Open) {
if (sendSemaphore == null) {
throw new ArgumentNullException(nameof(sendSemaphore));
}
if (cancellationToken.IsCancellationRequested || (webSocket.State != WebSocketState.Open)) {
return;
}
string response = JsonConvert.SerializeObject(new GenericResponse<string>(loggedMessage));
await PostLoggedJsonUpdate(webSocket, sendSemaphore, response).ConfigureAwait(false);
await PostLoggedJsonUpdate(webSocket, response, sendSemaphore, cancellationToken).ConfigureAwait(false);
}
}
}