This commit is contained in:
Federico Barresi
2019-11-21 14:43:03 +01:00
3 changed files with 9 additions and 25 deletions

View File

@@ -16,20 +16,6 @@ namespace Sharp7.Rx.Extensions
{ {
internal static class ObservableExtensions internal static class ObservableExtensions
{ {
public static IObservable<Unit> Select<TSource>(this IObservable<TSource> source, Func<TSource, Task> selector)
{
return source
.Select(x => Observable.FromAsync(async () => await selector(x)))
.Concat();
}
public static IObservable<TResult> Select<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector)
{
return source
.Select(x => Observable.FromAsync(async () => await selector(x)))
.Concat();
}
public static IObservable<T> LogAndRetry<T>(this IObservable<T> source, ILogger logger, string message) public static IObservable<T> LogAndRetry<T>(this IObservable<T> source, ILogger logger, string message)
{ {
return source return source
@@ -101,14 +87,5 @@ namespace Sharp7.Rx.Extensions
return new CompositeDisposable(serialDisposable, subscription); return new CompositeDisposable(serialDisposable, subscription);
}); });
} }
public static IObservable<Unit> SelectMany<T>(this IObservable<T> source, Func<T, Task> selector)
{
return source.SelectMany(async item =>
{
await selector(item);
return Unit.Default;
});
}
} }
} }

View File

@@ -1,4 +1,5 @@
using System; using System;
using System.Reactive;
using System.Reactive.Disposables; using System.Reactive.Disposables;
using System.Reactive.Linq; using System.Reactive.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
@@ -28,13 +29,17 @@ namespace Sharp7.Rx.Extensions
notification notification
.Where(trigger => trigger) .Where(trigger => trigger)
.Select(_ => ReadDataAndAcknowlodge(plc, readData, ackTriggerAddress)) .SelectMany(_ => ReadDataAndAcknowlodge(plc, readData, ackTriggerAddress))
.Subscribe(observer) .Subscribe(observer)
.AddDisposableTo(subscriptions); .AddDisposableTo(subscriptions);
notification notification
.Where(trigger => !trigger) .Where(trigger => !trigger)
.Select(_ => plc.SetValue(ackTriggerAddress, false)) .SelectMany(async _ =>
{
await plc.SetValue(ackTriggerAddress, false);
return Unit.Default;
})
.Subscribe() .Subscribe()
.AddDisposableTo(subscriptions); .AddDisposableTo(subscriptions);

View File

@@ -10,6 +10,8 @@
<Description>Reactive framework for Sharp7, the Ethernet S7 PLC communication suite</Description> <Description>Reactive framework for Sharp7, the Ethernet S7 PLC communication suite</Description>
<PackageProjectUrl>https://github.com/evopro-ag/Sharp7Reactive</PackageProjectUrl> <PackageProjectUrl>https://github.com/evopro-ag/Sharp7Reactive</PackageProjectUrl>
<PackageLicenseUrl>https://raw.githubusercontent.com/evopro-ag/Sharp7Reactive/master/LICENSE</PackageLicenseUrl> <PackageLicenseUrl>https://raw.githubusercontent.com/evopro-ag/Sharp7Reactive/master/LICENSE</PackageLicenseUrl>
<IncludeSymbols>true</IncludeSymbols>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>