diff --git a/Sharp7.Rx/Extensions/ObservableExtensions.cs b/Sharp7.Rx/Extensions/ObservableExtensions.cs index b93038f..0b98c69 100644 --- a/Sharp7.Rx/Extensions/ObservableExtensions.cs +++ b/Sharp7.Rx/Extensions/ObservableExtensions.cs @@ -1,24 +1,114 @@ using System; +using System.Collections.Generic; +using System.ComponentModel; +using System.Linq; +using System.Linq.Expressions; using System.Reactive; +using System.Reactive.Concurrency; +using System.Reactive.Disposables; using System.Reactive.Linq; +using System.Reactive.Subjects; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Sharp7.Rx.Resources; namespace Sharp7.Rx.Extensions { - internal static class ObservableExtensions - { - public static IObservable Select(this IObservable source, Func selector) - { - return source - .Select(x => Observable.FromAsync(async () => await selector(x))) - .Concat(); - } + internal static class ObservableExtensions + { + public static IObservable Select(this IObservable source, Func selector) + { + return source + .Select(x => Observable.FromAsync(async () => await selector(x))) + .Concat(); + } - public static IObservable Select(this IObservable source, Func> selector) - { - return source - .Select(x => Observable.FromAsync(async () => await selector(x))) - .Concat(); - } - } -} + public static IObservable Select(this IObservable source, Func> selector) + { + return source + .Select(x => Observable.FromAsync(async () => await selector(x))) + .Concat(); + } + + public static IObservable LogAndRetry(this IObservable source, ILogger logger, string message) + { + return source + .Do( + _ => { }, + ex => logger?.LogError(ex, message)) + .Retry(); + } + + public static IObservable RetryAfterDelay( + this IObservable source, + TimeSpan retryDelay, + int retryCount = -1, + IScheduler scheduler = null) + { + return RedoAfterDelay(source, retryDelay, retryCount, scheduler, Observable.Retry, Observable.Retry); + } + + public static IObservable RepeatAfterDelay( + this IObservable source, + TimeSpan retryDelay, + int repeatCount = -1, + IScheduler scheduler = null) + { + return RedoAfterDelay(source, retryDelay, repeatCount, scheduler, Observable.Repeat, Observable.Repeat); + } + + public static IObservable LogAndRetryAfterDelay( + this IObservable source, + ILogger logger, + TimeSpan retryDelay, + string message, + int retryCount = -1, + IScheduler scheduler = null) + { + var sourceLogged = + source + .Do( + _ => { }, + ex => logger?.LogError(ex, message)); + + return RetryAfterDelay(sourceLogged, retryDelay, retryCount, scheduler); + } + + private static IObservable RedoAfterDelay(IObservable source, TimeSpan retryDelay, int retryCount, IScheduler scheduler, Func, IObservable> reDo, + Func, int, IObservable> reDoCount) + { + scheduler = scheduler ?? TaskPoolScheduler.Default; + var attempt = 0; + + var deferedObs = Observable.Defer(() => ((++attempt == 1) ? source : source.DelaySubscription(retryDelay, scheduler))); + return retryCount > 0 ? reDoCount(deferedObs, retryCount) : reDo(deferedObs); + } + + public static IObservable DisposeMany(this IObservable source) + { + return Observable.Create(obs => + { + var serialDisposable = new SerialDisposable(); + var subscription = + source.Subscribe( + item => + { + serialDisposable.Disposable = item as IDisposable; + obs.OnNext(item); + }, + obs.OnError, + obs.OnCompleted); + return new CompositeDisposable(serialDisposable, subscription); + }); + } + + public static IObservable SelectMany(this IObservable source, Func selector) + { + return source.SelectMany(async item => + { + await selector(item); + return Unit.Default; + }); + } + } +} \ No newline at end of file diff --git a/Sharp7.Rx/Interfaces/IPlc.cs b/Sharp7.Rx/Interfaces/IPlc.cs index cff12eb..ad83374 100644 --- a/Sharp7.Rx/Interfaces/IPlc.cs +++ b/Sharp7.Rx/Interfaces/IPlc.cs @@ -9,5 +9,6 @@ namespace Sharp7.Rx.Interfaces IObservable CreateNotification(string variableName, TransmissionMode transmissionMode, TimeSpan cycleSpan); Task SetValue(string variableName, TValue value); Task GetValue(string variableName); + IObservable ConnectionState { get; } } } diff --git a/Sharp7.Rx/Interfaces/IS7Connector.cs b/Sharp7.Rx/Interfaces/IS7Connector.cs index 28468ae..43d393b 100644 --- a/Sharp7.Rx/Interfaces/IS7Connector.cs +++ b/Sharp7.Rx/Interfaces/IS7Connector.cs @@ -1,6 +1,7 @@ using System; using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; using Sharp7.Rx.Enums; namespace Sharp7.Rx.Interfaces @@ -18,5 +19,6 @@ namespace Sharp7.Rx.Interfaces Task WriteBit(Operand operand, ushort startByteAddress, byte bitAdress, bool value, ushort dbNr, CancellationToken token); Task WriteBytes(Operand operand, ushort startByteAdress, byte[] data, ushort dBNr, CancellationToken token); + ILogger Logger { get; } } } \ No newline at end of file diff --git a/Sharp7.Rx/Sharp7.Rx.csproj b/Sharp7.Rx/Sharp7.Rx.csproj index 305f24b..adc56e5 100644 --- a/Sharp7.Rx/Sharp7.Rx.csproj +++ b/Sharp7.Rx/Sharp7.Rx.csproj @@ -13,6 +13,7 @@ + diff --git a/Sharp7.Rx/Sharp7Connector.cs b/Sharp7.Rx/Sharp7Connector.cs index eb70569..76009bd 100644 --- a/Sharp7.Rx/Sharp7Connector.cs +++ b/Sharp7.Rx/Sharp7Connector.cs @@ -4,7 +4,9 @@ using System.Reactive.Linq; using System.Reactive.Subjects; using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; using Sharp7.Rx.Enums; +using Sharp7.Rx.Extensions; using Sharp7.Rx.Interfaces; using Sharp7.Rx.Resources; @@ -18,17 +20,21 @@ namespace Sharp7.Rx private readonly string ipAddress; private readonly int rackNr; private readonly int cpuSlotNr; + private readonly int port; - private S7Client sharp7; + private S7Client sharp7; private bool disposed; - public Sharp7Connector(string ipAddress, int rackNr = 0, int cpuSlotNr = 2) - { + public ILogger Logger { get; set; } + + public Sharp7Connector(string ipAddress, int rackNr = 0, int cpuSlotNr = 2, int port = 102) + { this.ipAddress = ipAddress; this.cpuSlotNr = cpuSlotNr; - this.rackNr = rackNr; + this.port = port; + this.rackNr = rackNr; - ReconnectDelay = TimeSpan.FromSeconds(5); + ReconnectDelay = TimeSpan.FromSeconds(5); } public TimeSpan ReconnectDelay { get; set; } @@ -76,22 +82,23 @@ namespace Sharp7.Rx try { sharp7 = new S7Client(); + sharp7.PLCPort = this.port; - var subscription = + var subscription = ConnectionState .Where(state => state == Enums.ConnectionState.ConnectionLost) .Take(1) .SelectMany(_ => Reconnect()) - // TODO: .RepeatAfterDelay(ReconnectDelay) - // TODO: .LogAndRetry(logger, "Error while reconnecting to S7.") + .RepeatAfterDelay(ReconnectDelay) + .LogAndRetry(Logger, "Error while reconnecting to S7.") .Subscribe(); disposables.Add(subscription); } catch (Exception ex) { - // TODO: - } + Logger?.LogError(ex, StringResources.StrErrorS7DriverCouldNotBeInitialized); + } return Task.FromResult(true); } diff --git a/Sharp7.Rx/Sharp7Plc.cs b/Sharp7.Rx/Sharp7Plc.cs index 493e471..f295047 100644 --- a/Sharp7.Rx/Sharp7Plc.cs +++ b/Sharp7.Rx/Sharp7Plc.cs @@ -5,7 +5,9 @@ using System.Reactive.Subjects; using System.Text; using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.Logging; using Sharp7.Rx.Enums; +using Sharp7.Rx.Extensions; using Sharp7.Rx.Interfaces; using Sharp7.Rx.Resources; @@ -247,8 +249,8 @@ namespace Sharp7.Rx Value = value }; }) - // TODO: .RepeatAfterDelay(cycle) - // TODO: .LogAndRetryAfterDelay(Logger, cycle, StringResources.StrLogErrorReadingDataFromPlc) + .RepeatAfterDelay(cycle) + .LogAndRetryAfterDelay(s7Connector.Logger, cycle, StringResources.StrLogErrorReadingDataFromPlc) .TakeUntil(disposingSubject) .Where(union => union.HasValue) .Select(union => union.Value);