Fix multithreadign issue with lingering subscriptions

This commit is contained in:
Peter Butzhammer
2024-02-09 11:31:08 +01:00
parent 280a894b1f
commit dd0af70262
3 changed files with 25 additions and 20 deletions

View File

@@ -53,11 +53,14 @@ internal class ConcurrentSubjectDictionary<TKey, TValue> : IDisposable
{
lock (dictionaryLock)
{
var subject = dictionary.AddOrUpdate(key, k => new SubjectWithRefCounter {Counter = 1, Subject = CreateSubject(k)}, (key1, counter) =>
{
counter.Counter = counter.Counter + 1;
return counter;
});
var subject = dictionary.AddOrUpdate(
key,
k => new SubjectWithRefCounter(CreateSubject(k)),
(_, subjectWithRefCounter) =>
{
subjectWithRefCounter.IncreaseCount();
return subjectWithRefCounter;
});
return new DisposableItem<TValue>(subject.Subject.AsObservable(), () => RemoveIfNoLongerInUse(key));
}
@@ -65,8 +68,7 @@ internal class ConcurrentSubjectDictionary<TKey, TValue> : IDisposable
public bool TryGetObserver(TKey key, out IObserver<TValue> subject)
{
SubjectWithRefCounter subjectWithRefCount;
if (dictionary.TryGetValue(key, out subjectWithRefCount))
if (dictionary.TryGetValue(key, out var subjectWithRefCount))
{
subject = subjectWithRefCount.Subject.AsObserver();
return true;
@@ -101,15 +103,9 @@ internal class ConcurrentSubjectDictionary<TKey, TValue> : IDisposable
private void RemoveIfNoLongerInUse(TKey variableName)
{
lock (dictionaryLock)
{
SubjectWithRefCounter subjectWithRefCount;
if (dictionary.TryGetValue(variableName, out subjectWithRefCount))
{
if (subjectWithRefCount.Counter == 1)
dictionary.TryRemove(variableName, out subjectWithRefCount);
else subjectWithRefCount.Counter--;
}
}
if (dictionary.TryGetValue(variableName, out var subjectWithRefCount))
if (subjectWithRefCount.DecreaseCount() < 1)
dictionary.TryRemove(variableName, out _);
}
~ConcurrentSubjectDictionary()
@@ -119,7 +115,16 @@ internal class ConcurrentSubjectDictionary<TKey, TValue> : IDisposable
class SubjectWithRefCounter
{
public int Counter { get; set; }
public ISubject<TValue> Subject { get; set; }
private int counter = 1;
public SubjectWithRefCounter(ISubject<TValue> subject)
{
Subject = subject;
}
public ISubject<TValue> Subject { get; }
public int DecreaseCount() => Interlocked.Decrement(ref counter);
public int IncreaseCount() => Interlocked.Increment(ref counter);
}
}

View File

@@ -17,5 +17,5 @@ internal interface IS7Connector : IDisposable
Task WriteBit(Operand operand, ushort startByteAddress, byte bitAdress, bool value, ushort dbNo, CancellationToken token);
Task WriteBytes(Operand operand, ushort startByteAddress, byte[] data, ushort dbNo, CancellationToken token);
Task<Dictionary<string, byte[]>> ExecuteMultiVarRequest(IReadOnlyList<string> variableNames);
Task<IReadOnlyDictionary<string, byte[]>> ExecuteMultiVarRequest(IReadOnlyList<string> variableNames);
}

View File

@@ -98,7 +98,7 @@ internal class Sharp7Connector : IS7Connector
await CloseConnection();
}
public async Task<Dictionary<string, byte[]>> ExecuteMultiVarRequest(IReadOnlyList<string> variableNames)
public async Task<IReadOnlyDictionary<string, byte[]>> ExecuteMultiVarRequest(IReadOnlyList<string> variableNames)
{
if (variableNames.IsEmpty())
return new Dictionary<string, byte[]>();