mirror of
https://github.com/evopro-ag/Sharp7Reactive.git
synced 2025-12-16 11:42:52 +00:00
82 lines
2.7 KiB
C#
82 lines
2.7 KiB
C#
using System.Reactive.Concurrency;
|
|
using System.Reactive.Disposables;
|
|
using System.Reactive.Linq;
|
|
using Microsoft.Extensions.Logging;
|
|
|
|
namespace Sharp7.Rx.Extensions;
|
|
|
|
internal static class ObservableExtensions
|
|
{
|
|
public static IObservable<T> DisposeMany<T>(this IObservable<T> source)
|
|
{
|
|
return Observable.Create<T>(obs =>
|
|
{
|
|
var serialDisposable = new SerialDisposable();
|
|
var subscription =
|
|
source.Subscribe(
|
|
item =>
|
|
{
|
|
serialDisposable.Disposable = item as IDisposable;
|
|
obs.OnNext(item);
|
|
},
|
|
obs.OnError,
|
|
obs.OnCompleted);
|
|
return new CompositeDisposable(serialDisposable, subscription);
|
|
});
|
|
}
|
|
|
|
public static IObservable<T> LogAndRetry<T>(this IObservable<T> source, ILogger logger, string message)
|
|
{
|
|
return source
|
|
.Do(
|
|
_ => { },
|
|
ex => logger?.LogError(ex, message))
|
|
.Retry();
|
|
}
|
|
|
|
public static IObservable<T> LogAndRetryAfterDelay<T>(
|
|
this IObservable<T> source,
|
|
ILogger logger,
|
|
TimeSpan retryDelay,
|
|
string message,
|
|
int retryCount = -1,
|
|
IScheduler scheduler = null)
|
|
{
|
|
var sourceLogged =
|
|
source
|
|
.Do(
|
|
_ => { },
|
|
ex => logger?.LogError(ex, message));
|
|
|
|
return RetryAfterDelay(sourceLogged, retryDelay, retryCount, scheduler);
|
|
}
|
|
|
|
public static IObservable<T> RepeatAfterDelay<T>(
|
|
this IObservable<T> source,
|
|
TimeSpan retryDelay,
|
|
int repeatCount = -1,
|
|
IScheduler scheduler = null)
|
|
{
|
|
return RedoAfterDelay(source, retryDelay, repeatCount, scheduler, Observable.Repeat, Observable.Repeat);
|
|
}
|
|
|
|
public static IObservable<T> RetryAfterDelay<T>(
|
|
this IObservable<T> source,
|
|
TimeSpan retryDelay,
|
|
int retryCount = -1,
|
|
IScheduler scheduler = null)
|
|
{
|
|
return RedoAfterDelay(source, retryDelay, retryCount, scheduler, Observable.Retry, Observable.Retry);
|
|
}
|
|
|
|
private static IObservable<T> RedoAfterDelay<T>(IObservable<T> source, TimeSpan retryDelay, int retryCount, IScheduler scheduler, Func<IObservable<T>, IObservable<T>> reDo,
|
|
Func<IObservable<T>, int, IObservable<T>> reDoCount)
|
|
{
|
|
scheduler = scheduler ?? TaskPoolScheduler.Default;
|
|
var attempt = 0;
|
|
|
|
var deferedObs = Observable.Defer(() => ((++attempt == 1) ? source : source.DelaySubscription(retryDelay, scheduler)));
|
|
return retryCount > 0 ? reDoCount(deferedObs, retryCount) : reDo(deferedObs);
|
|
}
|
|
}
|