Fix async/await with ConcurrentHashSet

Now this is a nice bug that was found accidentally by ArchiBoT...
ReaderWriterLockSlim() is very decent solution, but it's thread-based, and we're using our ConcurrentHashSet in mixed async/sync context. This means that if we use something like:
foreach (var item in concHashSet) {
    await AnythingAsync().ConfigureAwait(false);
}
It's totally possible that we'll request read lock as thread 1, and release the read lock as thread 2, which will lead to RWLock exception => System.Threading.SynchronizationLockException: The read lock is being released without being held.
Fortunately it looks like we didn't have any scenario like this in ASF, as this was possible only when we async/await while enumerating over ConcurrentHashSet, so that specific bug didn't affect ASF codebase (yet). Still, I must fix this as current implementation is not thread-safe, so our HashSet is in fact not concurrent in the first place.
I analyzed possible solutions and there are basically 3: either using ConcurrentDictionary and wrapping around it, replacing lock with SemaphoreSlim, or using third-party AsyncReaderWriterLock from StephenCleary. SemaphoreSlim entirely kills the concept of multiple readers one writer, and could affect performance negatively, moreover - it doesn't support upgreadable lock scenario we have with ReplaceIfNeededWith(). Concurrent dictionary would be nice if I didn't have that awful memory hit from storing mandatory pointless value, plus I don't really like concept of wrapping around conc dictionary if I can simply use it right away and drop my conc hashset entirely. AsyncReaderWriterLock seem to be really well written, and works on Mono + should be compatible with .NET core in the future, so we should go for it as it's the best bet both performance-wise and memory-wise.
This brings another package dependency and changes a bit backend of ConcurrentHashSet
This commit is contained in:
JustArchi
2017-02-07 20:14:51 +01:00
parent f97379bf60
commit a8045ac50b
206 changed files with 114696 additions and 74 deletions

View File

@@ -22,21 +22,16 @@
*/
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using Nito.AsyncEx;
namespace ArchiSteamFarm {
internal sealed class ConcurrentHashSet<T> : ICollection<T>, IDisposable {
internal sealed class ConcurrentHashSet<T> : ICollection<T> {
public int Count {
get {
Lock.EnterReadLock();
try {
using (Lock.ReaderLock()) {
return HashSet.Count;
} finally {
Lock.ExitReadLock();
}
}
}
@@ -44,48 +39,31 @@ namespace ArchiSteamFarm {
public bool IsReadOnly => false;
private readonly HashSet<T> HashSet = new HashSet<T>();
private readonly ReaderWriterLockSlim Lock = new ReaderWriterLockSlim();
private readonly AsyncReaderWriterLock Lock = new AsyncReaderWriterLock();
public void Clear() {
Lock.EnterWriteLock();
try {
using (Lock.WriterLock()) {
HashSet.Clear();
} finally {
Lock.ExitWriteLock();
}
}
public bool Contains(T item) {
Lock.EnterReadLock();
try {
using (Lock.ReaderLock()) {
return HashSet.Contains(item);
} finally {
Lock.ExitReadLock();
}
}
public void CopyTo(T[] array, int arrayIndex) {
Lock.EnterReadLock();
try {
using (Lock.ReaderLock()) {
HashSet.CopyTo(array, arrayIndex);
} finally {
Lock.ExitReadLock();
}
}
public void Dispose() => Lock.Dispose();
public IEnumerator<T> GetEnumerator() => new ConcurrentEnumerator<T>(HashSet, Lock);
public bool Remove(T item) {
Lock.EnterWriteLock();
try {
using (Lock.WriterLock()) {
return HashSet.Remove(item);
} finally {
Lock.ExitWriteLock();
}
}
@@ -94,45 +72,43 @@ namespace ArchiSteamFarm {
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
internal void Add(T item) {
Lock.EnterWriteLock();
try {
using (Lock.WriterLock()) {
HashSet.Add(item);
} finally {
Lock.ExitWriteLock();
}
}
internal void ClearAndTrim() {
Lock.EnterWriteLock();
try {
using (Lock.WriterLock()) {
HashSet.Clear();
HashSet.TrimExcess();
} finally {
Lock.ExitWriteLock();
}
}
internal bool ReplaceIfNeededWith(ICollection<T> items) {
Lock.EnterUpgradeableReadLock();
try {
using (AsyncReaderWriterLock.UpgradeableReaderKey readerKey = Lock.UpgradeableReaderLock()) {
if (HashSet.SetEquals(items)) {
return false;
}
ReplaceWith(items);
ReplaceWith(items, readerKey);
return true;
} finally {
Lock.ExitUpgradeableReadLock();
}
}
internal void ReplaceWith(IEnumerable<T> items) {
Lock.EnterWriteLock();
try {
using (Lock.WriterLock()) {
HashSet.Clear();
foreach (T item in items) {
HashSet.Add(item);
}
HashSet.TrimExcess();
}
}
private void ReplaceWith(IEnumerable<T> items, AsyncReaderWriterLock.UpgradeableReaderKey readerKey) {
using (readerKey.Upgrade()) {
HashSet.Clear();
foreach (T item in items) {
@@ -140,8 +116,6 @@ namespace ArchiSteamFarm {
}
HashSet.TrimExcess();
} finally {
Lock.ExitWriteLock();
}
}
}