diff --git a/Sharp7.Rx/Extensions/ObservableExtensions.cs b/Sharp7.Rx/Extensions/ObservableExtensions.cs index 0b98c69..052b6ee 100644 --- a/Sharp7.Rx/Extensions/ObservableExtensions.cs +++ b/Sharp7.Rx/Extensions/ObservableExtensions.cs @@ -16,20 +16,6 @@ namespace Sharp7.Rx.Extensions { internal static class ObservableExtensions { - public static IObservable Select(this IObservable source, Func selector) - { - return source - .Select(x => Observable.FromAsync(async () => await selector(x))) - .Concat(); - } - - public static IObservable Select(this IObservable source, Func> selector) - { - return source - .Select(x => Observable.FromAsync(async () => await selector(x))) - .Concat(); - } - public static IObservable LogAndRetry(this IObservable source, ILogger logger, string message) { return source @@ -101,14 +87,5 @@ namespace Sharp7.Rx.Extensions return new CompositeDisposable(serialDisposable, subscription); }); } - - public static IObservable SelectMany(this IObservable source, Func selector) - { - return source.SelectMany(async item => - { - await selector(item); - return Unit.Default; - }); - } } } \ No newline at end of file diff --git a/Sharp7.Rx/Extensions/PlcExtensions.cs b/Sharp7.Rx/Extensions/PlcExtensions.cs index 23cbcf5..88259cf 100644 --- a/Sharp7.Rx/Extensions/PlcExtensions.cs +++ b/Sharp7.Rx/Extensions/PlcExtensions.cs @@ -1,4 +1,5 @@ using System; +using System.Reactive; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Threading.Tasks; @@ -28,13 +29,17 @@ namespace Sharp7.Rx.Extensions notification .Where(trigger => trigger) - .Select(_ => ReadDataAndAcknowlodge(plc, readData, ackTriggerAddress)) + .SelectMany(_ => ReadDataAndAcknowlodge(plc, readData, ackTriggerAddress)) .Subscribe(observer) .AddDisposableTo(subscriptions); notification .Where(trigger => !trigger) - .Select(_ => plc.SetValue(ackTriggerAddress, false)) + .SelectMany(async _ => + { + await plc.SetValue(ackTriggerAddress, false); + return Unit.Default; + }) .Subscribe() .AddDisposableTo(subscriptions);