From 6e8c3f79c479a39ac52bc982651f581a0930252b Mon Sep 17 00:00:00 2001 From: Federico Barresi Date: Fri, 17 Jan 2020 14:49:01 +0100 Subject: [PATCH] Added multivar create notification --- .../Basics/ConcurrentSubjectDictionary.cs | 128 ++++++++++++ Sharp7.Rx/Basics/DisposableItem.cs | 38 ++++ Sharp7.Rx/Extensions/DisposableExtensions.cs | 8 + Sharp7.Rx/Interfaces/IS7Connector.cs | 2 + Sharp7.Rx/Interfaces/IS7VariableNameParser.cs | 7 + Sharp7.Rx/S7VariableNameParser.cs | 3 +- Sharp7.Rx/Settings/PlcConnectionSettings.cs | 10 + Sharp7.Rx/Sharp7.Rx.csproj | 1 + Sharp7.Rx/Sharp7Connector.cs | 47 ++++- Sharp7.Rx/Sharp7Plc.cs | 197 +++++++++++++++--- 10 files changed, 401 insertions(+), 40 deletions(-) create mode 100644 Sharp7.Rx/Basics/ConcurrentSubjectDictionary.cs create mode 100644 Sharp7.Rx/Basics/DisposableItem.cs create mode 100644 Sharp7.Rx/Interfaces/IS7VariableNameParser.cs create mode 100644 Sharp7.Rx/Settings/PlcConnectionSettings.cs diff --git a/Sharp7.Rx/Basics/ConcurrentSubjectDictionary.cs b/Sharp7.Rx/Basics/ConcurrentSubjectDictionary.cs new file mode 100644 index 0000000..fa5184e --- /dev/null +++ b/Sharp7.Rx/Basics/ConcurrentSubjectDictionary.cs @@ -0,0 +1,128 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Reactive; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using Sharp7.Rx.Extensions; + +namespace Sharp7.Rx.Basics +{ + internal class ConcurrentSubjectDictionary : IDisposable + { + private readonly object dictionaryLock = new object(); + private readonly Func valueFactory; + private ConcurrentDictionary dictionary; + + public ConcurrentSubjectDictionary() + { + dictionary = new ConcurrentDictionary(); + } + + public ConcurrentSubjectDictionary(IEqualityComparer comparer) + { + dictionary = new ConcurrentDictionary(comparer); + } + + public ConcurrentSubjectDictionary(TValue initialValue, IEqualityComparer comparer) + { + valueFactory = _ => initialValue; + dictionary = new ConcurrentDictionary(comparer); + } + + public ConcurrentSubjectDictionary(TValue initialValue) + { + valueFactory = _ => initialValue; + dictionary = new ConcurrentDictionary(); + } + + public ConcurrentSubjectDictionary(Func valueFactory = null) + { + this.valueFactory = valueFactory; + dictionary = new ConcurrentDictionary(); + } + + public IEnumerable ExistingKeys => dictionary.Keys; + + public bool IsDisposed { get; private set; } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + public DisposableItem GetOrCreateObservable(TKey key) + { + lock (dictionaryLock) + { + var subject = dictionary.AddOrUpdate(key, k => new SubjectWithRefCounter {Counter = 1, Subject = CreateSubject(k)}, (key1, counter) => + { + counter.Counter = counter.Counter + 1; + return counter; + }); + + return new DisposableItem(subject.Subject.AsObservable(), () => RemoveIfNoLongerInUse(key)); + } + } + + public bool TryGetObserver(TKey key, out IObserver subject) + { + SubjectWithRefCounter subjectWithRefCount; + if (dictionary.TryGetValue(key, out subjectWithRefCount)) + { + subject = subjectWithRefCount.Subject.AsObserver(); + return true; + } + + subject = null; + return false; + } + + protected virtual void Dispose(bool disposing) + { + if (IsDisposed) + return; + if (disposing && dictionary != null) + { + dictionary.Values.DisposeItems(); + dictionary.Clear(); + dictionary = null; + } + + IsDisposed = true; + } + + private ISubject CreateSubject(TKey key) + { + if (valueFactory == null) + return new Subject(); + return new BehaviorSubject(valueFactory(key)); + } + + private void RemoveIfNoLongerInUse(TKey variableName) + { + lock (dictionaryLock) + { + SubjectWithRefCounter subjectWithRefCount; + if (dictionary.TryGetValue(variableName, out subjectWithRefCount)) + { + if (subjectWithRefCount.Counter == 1) + dictionary.TryRemove(variableName, out subjectWithRefCount); + else subjectWithRefCount.Counter--; + } + } + } + + ~ConcurrentSubjectDictionary() + { + Dispose(false); + } + + class SubjectWithRefCounter + { + public int Counter { get; set; } + public ISubject Subject { get; set; } + } + } +} \ No newline at end of file diff --git a/Sharp7.Rx/Basics/DisposableItem.cs b/Sharp7.Rx/Basics/DisposableItem.cs new file mode 100644 index 0000000..f6ab006 --- /dev/null +++ b/Sharp7.Rx/Basics/DisposableItem.cs @@ -0,0 +1,38 @@ +using System; +using System.Linq; + +namespace Sharp7.Rx.Basics +{ + internal class DisposableItem : IDisposable + { + private readonly Action disposeAction; + + bool disposed; + + public DisposableItem(IObservable observable, Action disposeAction) + { + this.disposeAction = disposeAction; + Observable = observable; + } + + public IObservable Observable { get; } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (disposed) return; + + if (disposing) + { + disposeAction(); + } + + disposed = true; + } + } +} \ No newline at end of file diff --git a/Sharp7.Rx/Extensions/DisposableExtensions.cs b/Sharp7.Rx/Extensions/DisposableExtensions.cs index 89f3c75..0ad5c06 100644 --- a/Sharp7.Rx/Extensions/DisposableExtensions.cs +++ b/Sharp7.Rx/Extensions/DisposableExtensions.cs @@ -1,4 +1,6 @@ using System; +using System.Collections.Generic; +using System.Linq; using System.Reactive.Disposables; namespace Sharp7.Rx.Extensions @@ -9,5 +11,11 @@ namespace Sharp7.Rx.Extensions { compositeDisposable.Add(disposable); } + + public static void DisposeItems(this IEnumerable disposables) + { + foreach (IDisposable disposable in disposables.OfType()) + disposable?.Dispose(); + } } } diff --git a/Sharp7.Rx/Interfaces/IS7Connector.cs b/Sharp7.Rx/Interfaces/IS7Connector.cs index 43d393b..74767a0 100644 --- a/Sharp7.Rx/Interfaces/IS7Connector.cs +++ b/Sharp7.Rx/Interfaces/IS7Connector.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -20,5 +21,6 @@ namespace Sharp7.Rx.Interfaces Task WriteBit(Operand operand, ushort startByteAddress, byte bitAdress, bool value, ushort dbNr, CancellationToken token); Task WriteBytes(Operand operand, ushort startByteAdress, byte[] data, ushort dBNr, CancellationToken token); ILogger Logger { get; } + Task> ExecuteMultiVarRequest(IEnumerable variableNames); } } \ No newline at end of file diff --git a/Sharp7.Rx/Interfaces/IS7VariableNameParser.cs b/Sharp7.Rx/Interfaces/IS7VariableNameParser.cs new file mode 100644 index 0000000..dd8e272 --- /dev/null +++ b/Sharp7.Rx/Interfaces/IS7VariableNameParser.cs @@ -0,0 +1,7 @@ +namespace Sharp7.Rx.Interfaces +{ + internal interface IS7VariableNameParser + { + S7VariableAddress Parse(string input); + } +} \ No newline at end of file diff --git a/Sharp7.Rx/S7VariableNameParser.cs b/Sharp7.Rx/S7VariableNameParser.cs index bbe5e0e..b5213b3 100644 --- a/Sharp7.Rx/S7VariableNameParser.cs +++ b/Sharp7.Rx/S7VariableNameParser.cs @@ -4,10 +4,11 @@ using System.Globalization; using System.Linq; using System.Text.RegularExpressions; using Sharp7.Rx.Enums; +using Sharp7.Rx.Interfaces; namespace Sharp7.Rx { - internal class S7VariableNameParser + internal class S7VariableNameParser : IS7VariableNameParser { private readonly Regex regex = new Regex(@"^(?db{1})(?\d{1,4})\.?(?dbx|x|s|string|b|dbb|d|int|dbw|w|dint|dul|dulint|dulong|){1}(?\d+)(\.(?\d+))?$", RegexOptions.IgnoreCase); diff --git a/Sharp7.Rx/Settings/PlcConnectionSettings.cs b/Sharp7.Rx/Settings/PlcConnectionSettings.cs new file mode 100644 index 0000000..b4b5f85 --- /dev/null +++ b/Sharp7.Rx/Settings/PlcConnectionSettings.cs @@ -0,0 +1,10 @@ +namespace Sharp7.Rx.Settings +{ + internal class PlcConnectionSettings + { + public string IpAddress { get; set; } + public int RackNumber { get; set; } + public int CpuMpiAddress { get; set; } + public int Port { get; set; } + } +} \ No newline at end of file diff --git a/Sharp7.Rx/Sharp7.Rx.csproj b/Sharp7.Rx/Sharp7.Rx.csproj index 3b8b25d..7a71dbe 100644 --- a/Sharp7.Rx/Sharp7.Rx.csproj +++ b/Sharp7.Rx/Sharp7.Rx.csproj @@ -17,6 +17,7 @@ + diff --git a/Sharp7.Rx/Sharp7Connector.cs b/Sharp7.Rx/Sharp7Connector.cs index d4ce062..8a648c7 100644 --- a/Sharp7.Rx/Sharp7Connector.cs +++ b/Sharp7.Rx/Sharp7Connector.cs @@ -1,4 +1,7 @@ using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; @@ -9,12 +12,16 @@ using Sharp7.Rx.Enums; using Sharp7.Rx.Extensions; using Sharp7.Rx.Interfaces; using Sharp7.Rx.Resources; +using Sharp7.Rx.Settings; namespace Sharp7.Rx { internal class Sharp7Connector : IS7Connector { + private readonly IS7VariableNameParser variableNameParser; private readonly BehaviorSubject connectionStateSubject = new BehaviorSubject(Enums.ConnectionState.Initial); + private ConcurrentDictionary s7VariableAddresses = new ConcurrentDictionary(); + private readonly CompositeDisposable disposables = new CompositeDisposable(); private readonly TaskScheduler scheduler = TaskScheduler.Current; private readonly string ipAddress; @@ -26,13 +33,43 @@ namespace Sharp7.Rx private bool disposed; public ILogger Logger { get; set; } + public async Task> ExecuteMultiVarRequest(IEnumerable variableNames) + { + var enumerable = variableNames as string[] ?? variableNames.ToArray(); - public Sharp7Connector(string ipAddress, int rackNr = 0, int cpuSlotNr = 2, int port = 102) + if (enumerable.IsEmpty()) + return new Dictionary(); + + var s7MultiVar = new S7MultiVar(sharp7); + + var buffers = enumerable.Select(key => new {VariableName = key, Address = s7VariableAddresses.GetOrAdd(key, s => variableNameParser.Parse(s))}) + .Select(x => + { + var buffer = new byte[x.Address.Length]; + s7MultiVar.Add(S7Consts.S7AreaDB, S7Consts.S7WLByte, x.Address.DbNr, x.Address.Start,x.Address.Length, ref buffer); + return new { x.VariableName, Buffer = buffer}; + }) + .ToArray(); + + var result = await Task.Factory.StartNew(() => s7MultiVar.Read(), CancellationToken.None, TaskCreationOptions.None, scheduler); + if (result != 0) + { + await EvaluateErrorCode(result); + throw new InvalidOperationException($"Error in MultiVar request for variables: {string.Join(",", enumerable)}"); + } + + return buffers.ToDictionary(arg => arg.VariableName, arg => arg.Buffer); + } + + + + public Sharp7Connector(PlcConnectionSettings settings, IS7VariableNameParser variableNameParser) { - this.ipAddress = ipAddress; - this.cpuSlotNr = cpuSlotNr; - this.port = port; - this.rackNr = rackNr; + this.variableNameParser = variableNameParser; + this.ipAddress = settings.IpAddress; + this.cpuSlotNr = settings.CpuMpiAddress; + this.port = settings.Port; + this.rackNr = settings.Port; ReconnectDelay = TimeSpan.FromSeconds(5); } diff --git a/Sharp7.Rx/Sharp7Plc.cs b/Sharp7.Rx/Sharp7Plc.cs index 2772877..6bc4f38 100644 --- a/Sharp7.Rx/Sharp7Plc.cs +++ b/Sharp7.Rx/Sharp7Plc.cs @@ -1,16 +1,21 @@ using System; +using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Reactive; +using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; +using Sharp7.Rx.Basics; using Sharp7.Rx.Enums; using Sharp7.Rx.Extensions; using Sharp7.Rx.Interfaces; using Sharp7.Rx.Resources; +using Sharp7.Rx.Settings; namespace Sharp7.Rx { @@ -19,16 +24,26 @@ namespace Sharp7.Rx private readonly string ipAddress; private readonly int rackNumber; private readonly int cpuMpiAddress; - private readonly S7VariableNameParser varaibleNameParser; + private readonly int port; + private readonly IS7VariableNameParser varaibleNameParser; private bool disposed; private ISubject disposingSubject = new Subject(); private IS7Connector s7Connector; + private readonly PlcConnectionSettings plcConnectionSettings; + private readonly ConcurrentSubjectDictionary multiVariableSubscriptions = new ConcurrentSubjectDictionary(StringComparer.InvariantCultureIgnoreCase); + protected readonly CompositeDisposable Disposables = new CompositeDisposable(); + private readonly List performanceCoutner = new List(1000); - public Sharp7Plc(string ipAddress, int rackNumber, int cpuMpiAddress) + + + public Sharp7Plc(string ipAddress, int rackNumber, int cpuMpiAddress, int port = 102) { this.ipAddress = ipAddress; this.rackNumber = rackNumber; this.cpuMpiAddress = cpuMpiAddress; + this.port = port; + + plcConnectionSettings = new PlcConnectionSettings(){IpAddress = ipAddress, RackNumber = rackNumber, CpuMpiAddress = cpuMpiAddress, Port = port}; varaibleNameParser = new S7VariableNameParser(); } @@ -37,10 +52,13 @@ namespace Sharp7.Rx public async Task InitializeAsync() { - s7Connector = new Sharp7Connector(ipAddress, rackNumber, cpuMpiAddress); + s7Connector = new Sharp7Connector(plcConnectionSettings, varaibleNameParser); ConnectionState = s7Connector.ConnectionState; await s7Connector.InitializeAsync(); + + RunNotifications(s7Connector, TimeSpan.FromMilliseconds(100)) + .AddDisposableTo(Disposables); #pragma warning disable 4014 Task.Run(async () => @@ -64,6 +82,72 @@ namespace Sharp7.Rx return GetValue(variableName, CancellationToken.None); } + private TValue ConvertToType(byte[] buffer, S7VariableAddress address) + { + if (typeof(TValue) == typeof(bool)) + { + return (TValue) (object) Convert.ToBoolean(buffer[0] & (1 << address.Bit)); + } + + if (typeof(TValue) == typeof(int)) + { + if (address.Length == 2) + return (TValue)(object)((buffer[0] << 8) + buffer[1]); + if (address.Length == 4) + { + Array.Reverse(buffer); + return (TValue)(object)BitConverter.ToInt32(buffer,0); + } + + throw new InvalidOperationException($"length must be 2 or 4 but is {address.Length}"); + } + + if (typeof(TValue) == typeof(long)) + { + Array.Reverse(buffer); + return (TValue)(object)BitConverter.ToInt64(buffer,0); + } + + if (typeof(TValue) == typeof(ulong)) + { + Array.Reverse(buffer); + return (TValue)(object)BitConverter.ToUInt64(buffer, 0); + } + + if (typeof(TValue) == typeof(short)) + { + return (TValue)(object)(short)((buffer[0] << 8) + buffer[1]); + } + + if (typeof(TValue) == typeof(byte) || typeof(TValue) == typeof(char)) + { + return (TValue)(object)buffer[0]; + } + + if (typeof(TValue) == typeof(byte[])) + { + return (TValue)(object)buffer; + } + + if (typeof(TValue) == typeof(double) || typeof(TValue) == typeof(float)) + { + var d = BitConverter.ToSingle(buffer.Reverse().ToArray(),0); + return (TValue)(object)d; + } + + if (typeof(TValue) == typeof(string)) + if (address.Type == DbType.String) + { + return (TValue) (object) Encoding.ASCII.GetString(buffer); + } + else + { + return (TValue) (object) Encoding.ASCII.GetString(buffer).Trim(); + } + + throw new InvalidOperationException(string.Format("type '{0}' not supported.", typeof(TValue))); + } + public async Task GetValue(string variableName, CancellationToken token) { var address = varaibleNameParser.Parse(variableName); @@ -235,44 +319,30 @@ namespace Sharp7.Rx } } - public IObservable CreateNotification(string variableName, TransmissionMode transmissionMode, TimeSpan cycle) + public IObservable CreateNotification(string variableName, TransmissionMode transmissionMode, TimeSpan cycleTime) { - var address = varaibleNameParser.Parse(variableName); - if (address == null) throw new ArgumentException("Input variable name is not valid", nameof(variableName)); + return Observable.Create(observer => + { + var address = varaibleNameParser.Parse(variableName); + if (address == null) throw new ArgumentException("Input variable name is not valid", nameof(variableName)); - if (cycle < TimeSpan.FromMilliseconds(100)) - cycle = TimeSpan.FromMilliseconds(100); + var disposables = new CompositeDisposable(); + var disposeableContainer = multiVariableSubscriptions.GetOrCreateObservable(variableName); + disposeableContainer.AddDisposableTo(disposables); - var notification = ConnectionState.FirstAsync().Select(states => states == Enums.ConnectionState.Connected) - .SelectMany(async connected => - { - var value = default(TValue); - if (connected) - { - value = await GetValue(variableName, CancellationToken.None); - } + var observable = disposeableContainer.Observable + .Select(bytes => ConvertToType(bytes, address)); - return new - { - HasValue = connected, - Value = value - }; - }) - .RepeatAfterDelay(cycle) - .LogAndRetryAfterDelay(s7Connector.Logger, cycle, StringResources.StrLogErrorReadingDataFromPlc) - .TakeUntil(disposingSubject) - .Where(union => union.HasValue) - .Select(union => union.Value); + if (transmissionMode == TransmissionMode.OnChange) + observable = observable.DistinctUntilChanged(); - if (transmissionMode == TransmissionMode.Cyclic) - return notification; + observable.Subscribe(observer) + .AddDisposableTo(disposables); - if (transmissionMode == TransmissionMode.OnChange) - return notification.DistinctUntilChanged(); - - throw new ArgumentException("Transmission mode can either be Cyclic or OnChange", nameof(transmissionMode)); + return disposables; + }); } - + public void Dispose() { Dispose(true); @@ -284,6 +354,8 @@ namespace Sharp7.Rx { if (disposing) { + Disposables.Dispose(); + if (disposingSubject != null) { disposingSubject.OnNext(Unit.Default); @@ -308,5 +380,62 @@ namespace Sharp7.Rx { Dispose(false); } + + private IDisposable RunNotifications(IS7Connector connector, TimeSpan cycle) + { + return ConnectionState.FirstAsync() + .Select(states => states == Enums.ConnectionState.Connected) + .SelectMany(connected => GetAllValues(connected, connector)) + .RepeatAfterDelay(cycle) + .LogAndRetryAfterDelay(s7Connector.Logger, cycle, "Error while getting batch notifications from plc") + .TakeUntil(disposingSubject) + .Subscribe(); + } + + private async Task GetAllValues(bool connected, IS7Connector connector) + { + if (!connected) + return Unit.Default; + + if (multiVariableSubscriptions.ExistingKeys.IsEmpty()) + return Unit.Default; + + var stopWatch = Stopwatch.StartNew(); + foreach (var partsOfMultiVarRequest in multiVariableSubscriptions.ExistingKeys.Buffer(MultiVarRequestMaxItems)) + { + var multiVarRequest = await connector.ExecuteMultiVarRequest(partsOfMultiVarRequest); + + foreach (var pair in multiVarRequest) + { + if (multiVariableSubscriptions.TryGetObserver(pair.Key, out var subject)) + { + subject.OnNext(pair.Value); + } + } + } + + stopWatch.Stop(); + performanceCoutner.Add(stopWatch.ElapsedMilliseconds); + + PrintAndResetPerformanceStatistik(); + + return Unit.Default; + } + + private void PrintAndResetPerformanceStatistik() + { + if (performanceCoutner.Count == performanceCoutner.Capacity) + { + var average = performanceCoutner.Average(); + var min = performanceCoutner.Min(); + var max = performanceCoutner.Max(); + + s7Connector.Logger.LogInformation("Performance statistic during {0} elements of plc notification. Min: {1}, Max: {2}, Average: {3}, Plc: '{4}', Number of variables: {5}, Batch size: {6}", performanceCoutner.Capacity, min, max, average, plcConnectionSettings.IpAddress, multiVariableSubscriptions.ExistingKeys.Count(), + MultiVarRequestMaxItems); + performanceCoutner.Clear(); + } + } + + public int MultiVarRequestMaxItems { get; set; } = 16; } }