mirror of
https://github.com/evopro-ag/Sharp7Reactive.git
synced 2025-12-16 03:42:51 +00:00
Added multivar create notification
This commit is contained in:
@@ -1,16 +1,21 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
using System.Reactive;
|
||||
using System.Reactive.Disposables;
|
||||
using System.Reactive.Linq;
|
||||
using System.Reactive.Subjects;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Sharp7.Rx.Basics;
|
||||
using Sharp7.Rx.Enums;
|
||||
using Sharp7.Rx.Extensions;
|
||||
using Sharp7.Rx.Interfaces;
|
||||
using Sharp7.Rx.Resources;
|
||||
using Sharp7.Rx.Settings;
|
||||
|
||||
namespace Sharp7.Rx
|
||||
{
|
||||
@@ -19,16 +24,26 @@ namespace Sharp7.Rx
|
||||
private readonly string ipAddress;
|
||||
private readonly int rackNumber;
|
||||
private readonly int cpuMpiAddress;
|
||||
private readonly S7VariableNameParser varaibleNameParser;
|
||||
private readonly int port;
|
||||
private readonly IS7VariableNameParser varaibleNameParser;
|
||||
private bool disposed;
|
||||
private ISubject<Unit> disposingSubject = new Subject<Unit>();
|
||||
private IS7Connector s7Connector;
|
||||
private readonly PlcConnectionSettings plcConnectionSettings;
|
||||
private readonly ConcurrentSubjectDictionary<string, byte[]> multiVariableSubscriptions = new ConcurrentSubjectDictionary<string, byte[]>(StringComparer.InvariantCultureIgnoreCase);
|
||||
protected readonly CompositeDisposable Disposables = new CompositeDisposable();
|
||||
private readonly List<long> performanceCoutner = new List<long>(1000);
|
||||
|
||||
public Sharp7Plc(string ipAddress, int rackNumber, int cpuMpiAddress)
|
||||
|
||||
|
||||
public Sharp7Plc(string ipAddress, int rackNumber, int cpuMpiAddress, int port = 102)
|
||||
{
|
||||
this.ipAddress = ipAddress;
|
||||
this.rackNumber = rackNumber;
|
||||
this.cpuMpiAddress = cpuMpiAddress;
|
||||
this.port = port;
|
||||
|
||||
plcConnectionSettings = new PlcConnectionSettings(){IpAddress = ipAddress, RackNumber = rackNumber, CpuMpiAddress = cpuMpiAddress, Port = port};
|
||||
|
||||
varaibleNameParser = new S7VariableNameParser();
|
||||
}
|
||||
@@ -37,10 +52,13 @@ namespace Sharp7.Rx
|
||||
|
||||
public async Task<bool> InitializeAsync()
|
||||
{
|
||||
s7Connector = new Sharp7Connector(ipAddress, rackNumber, cpuMpiAddress);
|
||||
s7Connector = new Sharp7Connector(plcConnectionSettings, varaibleNameParser);
|
||||
ConnectionState = s7Connector.ConnectionState;
|
||||
|
||||
await s7Connector.InitializeAsync();
|
||||
|
||||
RunNotifications(s7Connector, TimeSpan.FromMilliseconds(100))
|
||||
.AddDisposableTo(Disposables);
|
||||
|
||||
#pragma warning disable 4014
|
||||
Task.Run(async () =>
|
||||
@@ -64,6 +82,72 @@ namespace Sharp7.Rx
|
||||
return GetValue<TValue>(variableName, CancellationToken.None);
|
||||
}
|
||||
|
||||
private TValue ConvertToType<TValue>(byte[] buffer, S7VariableAddress address)
|
||||
{
|
||||
if (typeof(TValue) == typeof(bool))
|
||||
{
|
||||
return (TValue) (object) Convert.ToBoolean(buffer[0] & (1 << address.Bit));
|
||||
}
|
||||
|
||||
if (typeof(TValue) == typeof(int))
|
||||
{
|
||||
if (address.Length == 2)
|
||||
return (TValue)(object)((buffer[0] << 8) + buffer[1]);
|
||||
if (address.Length == 4)
|
||||
{
|
||||
Array.Reverse(buffer);
|
||||
return (TValue)(object)BitConverter.ToInt32(buffer,0);
|
||||
}
|
||||
|
||||
throw new InvalidOperationException($"length must be 2 or 4 but is {address.Length}");
|
||||
}
|
||||
|
||||
if (typeof(TValue) == typeof(long))
|
||||
{
|
||||
Array.Reverse(buffer);
|
||||
return (TValue)(object)BitConverter.ToInt64(buffer,0);
|
||||
}
|
||||
|
||||
if (typeof(TValue) == typeof(ulong))
|
||||
{
|
||||
Array.Reverse(buffer);
|
||||
return (TValue)(object)BitConverter.ToUInt64(buffer, 0);
|
||||
}
|
||||
|
||||
if (typeof(TValue) == typeof(short))
|
||||
{
|
||||
return (TValue)(object)(short)((buffer[0] << 8) + buffer[1]);
|
||||
}
|
||||
|
||||
if (typeof(TValue) == typeof(byte) || typeof(TValue) == typeof(char))
|
||||
{
|
||||
return (TValue)(object)buffer[0];
|
||||
}
|
||||
|
||||
if (typeof(TValue) == typeof(byte[]))
|
||||
{
|
||||
return (TValue)(object)buffer;
|
||||
}
|
||||
|
||||
if (typeof(TValue) == typeof(double) || typeof(TValue) == typeof(float))
|
||||
{
|
||||
var d = BitConverter.ToSingle(buffer.Reverse().ToArray(),0);
|
||||
return (TValue)(object)d;
|
||||
}
|
||||
|
||||
if (typeof(TValue) == typeof(string))
|
||||
if (address.Type == DbType.String)
|
||||
{
|
||||
return (TValue) (object) Encoding.ASCII.GetString(buffer);
|
||||
}
|
||||
else
|
||||
{
|
||||
return (TValue) (object) Encoding.ASCII.GetString(buffer).Trim();
|
||||
}
|
||||
|
||||
throw new InvalidOperationException(string.Format("type '{0}' not supported.", typeof(TValue)));
|
||||
}
|
||||
|
||||
public async Task<TValue> GetValue<TValue>(string variableName, CancellationToken token)
|
||||
{
|
||||
var address = varaibleNameParser.Parse(variableName);
|
||||
@@ -235,44 +319,30 @@ namespace Sharp7.Rx
|
||||
}
|
||||
}
|
||||
|
||||
public IObservable<TValue> CreateNotification<TValue>(string variableName, TransmissionMode transmissionMode, TimeSpan cycle)
|
||||
public IObservable<TValue> CreateNotification<TValue>(string variableName, TransmissionMode transmissionMode, TimeSpan cycleTime)
|
||||
{
|
||||
var address = varaibleNameParser.Parse(variableName);
|
||||
if (address == null) throw new ArgumentException("Input variable name is not valid", nameof(variableName));
|
||||
return Observable.Create<TValue>(observer =>
|
||||
{
|
||||
var address = varaibleNameParser.Parse(variableName);
|
||||
if (address == null) throw new ArgumentException("Input variable name is not valid", nameof(variableName));
|
||||
|
||||
if (cycle < TimeSpan.FromMilliseconds(100))
|
||||
cycle = TimeSpan.FromMilliseconds(100);
|
||||
var disposables = new CompositeDisposable();
|
||||
var disposeableContainer = multiVariableSubscriptions.GetOrCreateObservable(variableName);
|
||||
disposeableContainer.AddDisposableTo(disposables);
|
||||
|
||||
var notification = ConnectionState.FirstAsync().Select(states => states == Enums.ConnectionState.Connected)
|
||||
.SelectMany(async connected =>
|
||||
{
|
||||
var value = default(TValue);
|
||||
if (connected)
|
||||
{
|
||||
value = await GetValue<TValue>(variableName, CancellationToken.None);
|
||||
}
|
||||
var observable = disposeableContainer.Observable
|
||||
.Select(bytes => ConvertToType<TValue>(bytes, address));
|
||||
|
||||
return new
|
||||
{
|
||||
HasValue = connected,
|
||||
Value = value
|
||||
};
|
||||
})
|
||||
.RepeatAfterDelay(cycle)
|
||||
.LogAndRetryAfterDelay(s7Connector.Logger, cycle, StringResources.StrLogErrorReadingDataFromPlc)
|
||||
.TakeUntil(disposingSubject)
|
||||
.Where(union => union.HasValue)
|
||||
.Select(union => union.Value);
|
||||
if (transmissionMode == TransmissionMode.OnChange)
|
||||
observable = observable.DistinctUntilChanged();
|
||||
|
||||
if (transmissionMode == TransmissionMode.Cyclic)
|
||||
return notification;
|
||||
observable.Subscribe(observer)
|
||||
.AddDisposableTo(disposables);
|
||||
|
||||
if (transmissionMode == TransmissionMode.OnChange)
|
||||
return notification.DistinctUntilChanged();
|
||||
|
||||
throw new ArgumentException("Transmission mode can either be Cyclic or OnChange", nameof(transmissionMode));
|
||||
return disposables;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
Dispose(true);
|
||||
@@ -284,6 +354,8 @@ namespace Sharp7.Rx
|
||||
{
|
||||
if (disposing)
|
||||
{
|
||||
Disposables.Dispose();
|
||||
|
||||
if (disposingSubject != null)
|
||||
{
|
||||
disposingSubject.OnNext(Unit.Default);
|
||||
@@ -308,5 +380,62 @@ namespace Sharp7.Rx
|
||||
{
|
||||
Dispose(false);
|
||||
}
|
||||
|
||||
private IDisposable RunNotifications(IS7Connector connector, TimeSpan cycle)
|
||||
{
|
||||
return ConnectionState.FirstAsync()
|
||||
.Select(states => states == Enums.ConnectionState.Connected)
|
||||
.SelectMany(connected => GetAllValues(connected, connector))
|
||||
.RepeatAfterDelay(cycle)
|
||||
.LogAndRetryAfterDelay(s7Connector.Logger, cycle, "Error while getting batch notifications from plc")
|
||||
.TakeUntil(disposingSubject)
|
||||
.Subscribe();
|
||||
}
|
||||
|
||||
private async Task<Unit> GetAllValues(bool connected, IS7Connector connector)
|
||||
{
|
||||
if (!connected)
|
||||
return Unit.Default;
|
||||
|
||||
if (multiVariableSubscriptions.ExistingKeys.IsEmpty())
|
||||
return Unit.Default;
|
||||
|
||||
var stopWatch = Stopwatch.StartNew();
|
||||
foreach (var partsOfMultiVarRequest in multiVariableSubscriptions.ExistingKeys.Buffer(MultiVarRequestMaxItems))
|
||||
{
|
||||
var multiVarRequest = await connector.ExecuteMultiVarRequest(partsOfMultiVarRequest);
|
||||
|
||||
foreach (var pair in multiVarRequest)
|
||||
{
|
||||
if (multiVariableSubscriptions.TryGetObserver(pair.Key, out var subject))
|
||||
{
|
||||
subject.OnNext(pair.Value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
stopWatch.Stop();
|
||||
performanceCoutner.Add(stopWatch.ElapsedMilliseconds);
|
||||
|
||||
PrintAndResetPerformanceStatistik();
|
||||
|
||||
return Unit.Default;
|
||||
}
|
||||
|
||||
private void PrintAndResetPerformanceStatistik()
|
||||
{
|
||||
if (performanceCoutner.Count == performanceCoutner.Capacity)
|
||||
{
|
||||
var average = performanceCoutner.Average();
|
||||
var min = performanceCoutner.Min();
|
||||
var max = performanceCoutner.Max();
|
||||
|
||||
s7Connector.Logger.LogInformation("Performance statistic during {0} elements of plc notification. Min: {1}, Max: {2}, Average: {3}, Plc: '{4}', Number of variables: {5}, Batch size: {6}", performanceCoutner.Capacity, min, max, average, plcConnectionSettings.IpAddress, multiVariableSubscriptions.ExistingKeys.Count(),
|
||||
MultiVarRequestMaxItems);
|
||||
performanceCoutner.Clear();
|
||||
}
|
||||
}
|
||||
|
||||
public int MultiVarRequestMaxItems { get; set; } = 16;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user