mirror of
https://github.com/evopro-ag/Sharp7Reactive.git
synced 2025-12-16 11:42:52 +00:00
Use file scoped namespaces
This commit is contained in:
@@ -1,13 +1,11 @@
|
||||
using System;
|
||||
using System.Reactive.Disposables;
|
||||
using System.Reactive.Disposables;
|
||||
|
||||
namespace Sharp7.Rx.Extensions
|
||||
namespace Sharp7.Rx.Extensions;
|
||||
|
||||
internal static class DisposableExtensions
|
||||
{
|
||||
internal static class DisposableExtensions
|
||||
public static void AddDisposableTo(this IDisposable disposable, CompositeDisposable compositeDisposable)
|
||||
{
|
||||
public static void AddDisposableTo(this IDisposable disposable, CompositeDisposable compositeDisposable)
|
||||
{
|
||||
compositeDisposable.Add(disposable);
|
||||
}
|
||||
compositeDisposable.Add(disposable);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,83 +1,81 @@
|
||||
using System;
|
||||
using System.Reactive.Concurrency;
|
||||
using System.Reactive.Concurrency;
|
||||
using System.Reactive.Disposables;
|
||||
using System.Reactive.Linq;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Sharp7.Rx.Extensions
|
||||
{
|
||||
internal static class ObservableExtensions
|
||||
{
|
||||
public static IObservable<T> DisposeMany<T>(this IObservable<T> source)
|
||||
{
|
||||
return Observable.Create<T>(obs =>
|
||||
{
|
||||
var serialDisposable = new SerialDisposable();
|
||||
var subscription =
|
||||
source.Subscribe(
|
||||
item =>
|
||||
{
|
||||
serialDisposable.Disposable = item as IDisposable;
|
||||
obs.OnNext(item);
|
||||
},
|
||||
obs.OnError,
|
||||
obs.OnCompleted);
|
||||
return new CompositeDisposable(serialDisposable, subscription);
|
||||
});
|
||||
}
|
||||
namespace Sharp7.Rx.Extensions;
|
||||
|
||||
public static IObservable<T> LogAndRetry<T>(this IObservable<T> source, ILogger logger, string message)
|
||||
internal static class ObservableExtensions
|
||||
{
|
||||
public static IObservable<T> DisposeMany<T>(this IObservable<T> source)
|
||||
{
|
||||
return Observable.Create<T>(obs =>
|
||||
{
|
||||
return source
|
||||
var serialDisposable = new SerialDisposable();
|
||||
var subscription =
|
||||
source.Subscribe(
|
||||
item =>
|
||||
{
|
||||
serialDisposable.Disposable = item as IDisposable;
|
||||
obs.OnNext(item);
|
||||
},
|
||||
obs.OnError,
|
||||
obs.OnCompleted);
|
||||
return new CompositeDisposable(serialDisposable, subscription);
|
||||
});
|
||||
}
|
||||
|
||||
public static IObservable<T> LogAndRetry<T>(this IObservable<T> source, ILogger logger, string message)
|
||||
{
|
||||
return source
|
||||
.Do(
|
||||
_ => { },
|
||||
ex => logger?.LogError(ex, message))
|
||||
.Retry();
|
||||
}
|
||||
|
||||
public static IObservable<T> LogAndRetryAfterDelay<T>(
|
||||
this IObservable<T> source,
|
||||
ILogger logger,
|
||||
TimeSpan retryDelay,
|
||||
string message,
|
||||
int retryCount = -1,
|
||||
IScheduler scheduler = null)
|
||||
{
|
||||
var sourceLogged =
|
||||
source
|
||||
.Do(
|
||||
_ => { },
|
||||
ex => logger?.LogError(ex, message))
|
||||
.Retry();
|
||||
}
|
||||
ex => logger?.LogError(ex, message));
|
||||
|
||||
public static IObservable<T> LogAndRetryAfterDelay<T>(
|
||||
this IObservable<T> source,
|
||||
ILogger logger,
|
||||
TimeSpan retryDelay,
|
||||
string message,
|
||||
int retryCount = -1,
|
||||
IScheduler scheduler = null)
|
||||
{
|
||||
var sourceLogged =
|
||||
source
|
||||
.Do(
|
||||
_ => { },
|
||||
ex => logger?.LogError(ex, message));
|
||||
|
||||
return RetryAfterDelay(sourceLogged, retryDelay, retryCount, scheduler);
|
||||
}
|
||||
|
||||
public static IObservable<T> RepeatAfterDelay<T>(
|
||||
this IObservable<T> source,
|
||||
TimeSpan retryDelay,
|
||||
int repeatCount = -1,
|
||||
IScheduler scheduler = null)
|
||||
{
|
||||
return RedoAfterDelay(source, retryDelay, repeatCount, scheduler, Observable.Repeat, Observable.Repeat);
|
||||
}
|
||||
|
||||
public static IObservable<T> RetryAfterDelay<T>(
|
||||
this IObservable<T> source,
|
||||
TimeSpan retryDelay,
|
||||
int retryCount = -1,
|
||||
IScheduler scheduler = null)
|
||||
{
|
||||
return RedoAfterDelay(source, retryDelay, retryCount, scheduler, Observable.Retry, Observable.Retry);
|
||||
}
|
||||
|
||||
private static IObservable<T> RedoAfterDelay<T>(IObservable<T> source, TimeSpan retryDelay, int retryCount, IScheduler scheduler, Func<IObservable<T>, IObservable<T>> reDo,
|
||||
Func<IObservable<T>, int, IObservable<T>> reDoCount)
|
||||
{
|
||||
scheduler = scheduler ?? TaskPoolScheduler.Default;
|
||||
var attempt = 0;
|
||||
|
||||
var deferedObs = Observable.Defer(() => ((++attempt == 1) ? source : source.DelaySubscription(retryDelay, scheduler)));
|
||||
return retryCount > 0 ? reDoCount(deferedObs, retryCount) : reDo(deferedObs);
|
||||
}
|
||||
return RetryAfterDelay(sourceLogged, retryDelay, retryCount, scheduler);
|
||||
}
|
||||
}
|
||||
|
||||
public static IObservable<T> RepeatAfterDelay<T>(
|
||||
this IObservable<T> source,
|
||||
TimeSpan retryDelay,
|
||||
int repeatCount = -1,
|
||||
IScheduler scheduler = null)
|
||||
{
|
||||
return RedoAfterDelay(source, retryDelay, repeatCount, scheduler, Observable.Repeat, Observable.Repeat);
|
||||
}
|
||||
|
||||
public static IObservable<T> RetryAfterDelay<T>(
|
||||
this IObservable<T> source,
|
||||
TimeSpan retryDelay,
|
||||
int retryCount = -1,
|
||||
IScheduler scheduler = null)
|
||||
{
|
||||
return RedoAfterDelay(source, retryDelay, retryCount, scheduler, Observable.Retry, Observable.Retry);
|
||||
}
|
||||
|
||||
private static IObservable<T> RedoAfterDelay<T>(IObservable<T> source, TimeSpan retryDelay, int retryCount, IScheduler scheduler, Func<IObservable<T>, IObservable<T>> reDo,
|
||||
Func<IObservable<T>, int, IObservable<T>> reDoCount)
|
||||
{
|
||||
scheduler = scheduler ?? TaskPoolScheduler.Default;
|
||||
var attempt = 0;
|
||||
|
||||
var deferedObs = Observable.Defer(() => ((++attempt == 1) ? source : source.DelaySubscription(retryDelay, scheduler)));
|
||||
return retryCount > 0 ? reDoCount(deferedObs, retryCount) : reDo(deferedObs);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,74 +1,71 @@
|
||||
using System;
|
||||
using System.Reactive;
|
||||
using System.Reactive;
|
||||
using System.Reactive.Disposables;
|
||||
using System.Reactive.Linq;
|
||||
using System.Reactive.Threading.Tasks;
|
||||
using System.Threading.Tasks;
|
||||
using Sharp7.Rx.Enums;
|
||||
using Sharp7.Rx.Interfaces;
|
||||
|
||||
namespace Sharp7.Rx.Extensions
|
||||
namespace Sharp7.Rx.Extensions;
|
||||
|
||||
public static class PlcExtensions
|
||||
{
|
||||
public static class PlcExtensions
|
||||
public static IObservable<TReturn> CreateDatatransferWithHandshake<TReturn>(this IPlc plc, string triggerAddress, string ackTriggerAddress, Func<IPlc, Task<TReturn>> readData, bool initialTransfer)
|
||||
{
|
||||
public static IObservable<TReturn> CreateDatatransferWithHandshake<TReturn>(this IPlc plc, string triggerAddress, string ackTriggerAddress, Func<IPlc, Task<TReturn>> readData, bool initialTransfer)
|
||||
return Observable.Create<TReturn>(async observer =>
|
||||
{
|
||||
return Observable.Create<TReturn>(async observer =>
|
||||
var subscriptions = new CompositeDisposable();
|
||||
|
||||
var notification = plc
|
||||
.CreateNotification<bool>(triggerAddress, TransmissionMode.OnChange)
|
||||
.Publish()
|
||||
.RefCount();
|
||||
|
||||
if (initialTransfer)
|
||||
{
|
||||
var subscriptions = new CompositeDisposable();
|
||||
await plc.ConnectionState.FirstAsync(state => state == ConnectionState.Connected).ToTask();
|
||||
var initialValue = await ReadData(plc, readData);
|
||||
observer.OnNext(initialValue);
|
||||
}
|
||||
|
||||
var notification = plc
|
||||
.CreateNotification<bool>(triggerAddress, TransmissionMode.OnChange)
|
||||
.Publish()
|
||||
.RefCount();
|
||||
notification
|
||||
.Where(trigger => trigger)
|
||||
.SelectMany(_ => ReadDataAndAcknowlodge(plc, readData, ackTriggerAddress))
|
||||
.Subscribe(observer)
|
||||
.AddDisposableTo(subscriptions);
|
||||
|
||||
if (initialTransfer)
|
||||
notification
|
||||
.Where(trigger => !trigger)
|
||||
.SelectMany(async _ =>
|
||||
{
|
||||
await plc.ConnectionState.FirstAsync(state => state == ConnectionState.Connected).ToTask();
|
||||
var initialValue = await ReadData(plc, readData);
|
||||
observer.OnNext(initialValue);
|
||||
}
|
||||
await plc.SetValue(ackTriggerAddress, false);
|
||||
return Unit.Default;
|
||||
})
|
||||
.Subscribe()
|
||||
.AddDisposableTo(subscriptions);
|
||||
|
||||
notification
|
||||
.Where(trigger => trigger)
|
||||
.SelectMany(_ => ReadDataAndAcknowlodge(plc, readData, ackTriggerAddress))
|
||||
.Subscribe(observer)
|
||||
.AddDisposableTo(subscriptions);
|
||||
return subscriptions;
|
||||
});
|
||||
}
|
||||
|
||||
notification
|
||||
.Where(trigger => !trigger)
|
||||
.SelectMany(async _ =>
|
||||
{
|
||||
await plc.SetValue(ackTriggerAddress, false);
|
||||
return Unit.Default;
|
||||
})
|
||||
.Subscribe()
|
||||
.AddDisposableTo(subscriptions);
|
||||
public static IObservable<TReturn> CreateDatatransferWithHandshake<TReturn>(this IPlc plc, string triggerAddress, string ackTriggerAddress, Func<IPlc, Task<TReturn>> readData)
|
||||
{
|
||||
return CreateDatatransferWithHandshake(plc, triggerAddress, ackTriggerAddress, readData, false);
|
||||
}
|
||||
|
||||
return subscriptions;
|
||||
});
|
||||
}
|
||||
private static async Task<TReturn> ReadData<TReturn>(IPlc plc, Func<IPlc, Task<TReturn>> receiveData)
|
||||
{
|
||||
return await receiveData(plc);
|
||||
}
|
||||
|
||||
public static IObservable<TReturn> CreateDatatransferWithHandshake<TReturn>(this IPlc plc, string triggerAddress, string ackTriggerAddress, Func<IPlc, Task<TReturn>> readData)
|
||||
private static async Task<TReturn> ReadDataAndAcknowlodge<TReturn>(IPlc plc, Func<IPlc, Task<TReturn>> readData, string ackTriggerAddress)
|
||||
{
|
||||
try
|
||||
{
|
||||
return CreateDatatransferWithHandshake(plc, triggerAddress, ackTriggerAddress, readData, false);
|
||||
return await ReadData(plc, readData);
|
||||
}
|
||||
|
||||
private static async Task<TReturn> ReadData<TReturn>(IPlc plc, Func<IPlc, Task<TReturn>> receiveData)
|
||||
finally
|
||||
{
|
||||
return await receiveData(plc);
|
||||
}
|
||||
|
||||
private static async Task<TReturn> ReadDataAndAcknowlodge<TReturn>(IPlc plc, Func<IPlc, Task<TReturn>> readData, string ackTriggerAddress)
|
||||
{
|
||||
try
|
||||
{
|
||||
return await ReadData(plc, readData);
|
||||
}
|
||||
finally
|
||||
{
|
||||
await plc.SetValue(ackTriggerAddress, true);
|
||||
}
|
||||
await plc.SetValue(ackTriggerAddress, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user