Complete Notifications when Plc is disposed

This commit is contained in:
Peter Butzhammer
2024-02-07 08:51:40 +01:00
parent d678924b6e
commit 956f39cc66
3 changed files with 17 additions and 38 deletions

View File

@@ -85,7 +85,8 @@ namespace Sharp7.Rx.Basics
return; return;
if (disposing && dictionary != null) if (disposing && dictionary != null)
{ {
dictionary.Values.DisposeItems(); foreach (var subjectWithRefCounter in dictionary)
subjectWithRefCounter.Value.Subject.OnCompleted();
dictionary.Clear(); dictionary.Clear();
dictionary = null; dictionary = null;
} }

View File

@@ -1,6 +1,4 @@
using System; using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables; using System.Reactive.Disposables;
namespace Sharp7.Rx.Extensions namespace Sharp7.Rx.Extensions
@@ -11,11 +9,5 @@ namespace Sharp7.Rx.Extensions
{ {
compositeDisposable.Add(disposable); compositeDisposable.Add(disposable);
} }
public static void DisposeItems(this IEnumerable<object> disposables)
{
foreach (IDisposable disposable in disposables.OfType<IDisposable>())
disposable?.Dispose();
}
} }
} }

View File

@@ -5,7 +5,6 @@ using System.Linq;
using System.Reactive; using System.Reactive;
using System.Reactive.Disposables; using System.Reactive.Disposables;
using System.Reactive.Linq; using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@@ -22,7 +21,6 @@ namespace Sharp7.Rx
{ {
private readonly IS7VariableNameParser varaibleNameParser = new CacheVariableNameParser(new S7VariableNameParser()); private readonly IS7VariableNameParser varaibleNameParser = new CacheVariableNameParser(new S7VariableNameParser());
private bool disposed; private bool disposed;
private ISubject<Unit> disposingSubject = new Subject<Unit>();
private IS7Connector s7Connector; private IS7Connector s7Connector;
private readonly PlcConnectionSettings plcConnectionSettings; private readonly PlcConnectionSettings plcConnectionSettings;
private readonly ConcurrentSubjectDictionary<string, byte[]> multiVariableSubscriptions = new ConcurrentSubjectDictionary<string, byte[]>(StringComparer.InvariantCultureIgnoreCase); private readonly ConcurrentSubjectDictionary<string, byte[]> multiVariableSubscriptions = new ConcurrentSubjectDictionary<string, byte[]>(StringComparer.InvariantCultureIgnoreCase);
@@ -182,33 +180,26 @@ namespace Sharp7.Rx
public void Dispose() public void Dispose()
{ {
Dispose(true); Dispose(true);
GC.SuppressFinalize(this);
} }
protected virtual void Dispose(bool disposing) protected virtual void Dispose(bool disposing)
{ {
if (!disposed) if (disposed) return;
{ disposed = true;
if (disposing)
{
Disposables.Dispose();
if (disposingSubject != null) if (disposing)
{ {
disposingSubject.OnNext(Unit.Default); Disposables.Dispose();
disposingSubject.OnCompleted();
var disposable = (disposingSubject as IDisposable); if (s7Connector != null)
if (disposable != null) disposable.Dispose(); {
disposingSubject = null; s7Connector.Disconnect().Wait();
} s7Connector.Dispose();
if (s7Connector != null) s7Connector = null;
{
s7Connector.Disconnect().Wait();
s7Connector.Dispose();
s7Connector = null;
}
} }
disposed = true; multiVariableSubscriptions.Dispose();
} }
} }
@@ -224,7 +215,6 @@ namespace Sharp7.Rx
.SelectMany(connected => GetAllValues(connected, connector)) .SelectMany(connected => GetAllValues(connected, connector))
.RepeatAfterDelay(cycle) .RepeatAfterDelay(cycle)
.LogAndRetryAfterDelay(Logger, cycle, "Error while getting batch notifications from plc") .LogAndRetryAfterDelay(Logger, cycle, "Error while getting batch notifications from plc")
.TakeUntil(disposingSubject)
.Subscribe(); .Subscribe();
} }
@@ -239,15 +229,11 @@ namespace Sharp7.Rx
var stopWatch = Stopwatch.StartNew(); var stopWatch = Stopwatch.StartNew();
foreach (var partsOfMultiVarRequest in multiVariableSubscriptions.ExistingKeys.Buffer(MultiVarRequestMaxItems)) foreach (var partsOfMultiVarRequest in multiVariableSubscriptions.ExistingKeys.Buffer(MultiVarRequestMaxItems))
{ {
var multiVarRequest = await connector.ExecuteMultiVarRequest(partsOfMultiVarRequest as IReadOnlyList<string>??partsOfMultiVarRequest.ToList()); var multiVarRequest = await connector.ExecuteMultiVarRequest(partsOfMultiVarRequest as IReadOnlyList<string>);
foreach (var pair in multiVarRequest) foreach (var pair in multiVarRequest)
{
if (multiVariableSubscriptions.TryGetObserver(pair.Key, out var subject)) if (multiVariableSubscriptions.TryGetObserver(pair.Key, out var subject))
{
subject.OnNext(pair.Value); subject.OnNext(pair.Value);
}
}
} }
stopWatch.Stop(); stopWatch.Stop();