using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Linq.Expressions;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Sharp7.Rx.Resources;

namespace Sharp7.Rx.Extensions
{
    /// <summary>
    /// Reactive helper operators: asynchronous projections, logging combined with
    /// retry/repeat (optionally delayed), and disposal of superseded items.
    /// </summary>
    internal static class ObservableExtensions
    {
        /// <summary>
        /// Invokes the asynchronous <paramref name="selector"/> for every element of <paramref name="source"/>.
        /// The invocations are concatenated, i.e. the selector for the next element is only started
        /// after the task of the previous element has completed.
        /// </summary>
        /// <param name="source">Sequence whose elements are passed to the selector.</param>
        /// <param name="selector">Asynchronous action to run per element.</param>
        /// <returns>A sequence that emits one <see cref="Unit"/> per completed selector invocation.</returns>
        public static IObservable<Unit> Select<TSource>(this IObservable<TSource> source, Func<TSource, Task> selector)
        {
            return source
                .Select(x => Observable.FromAsync(async () => await selector(x)))
                .Concat();
        }

        /// <summary>
        /// Projects every element of <paramref name="source"/> through the asynchronous
        /// <paramref name="selector"/>. The invocations are concatenated, so results are emitted
        /// in source order and the selector never runs for two elements at the same time.
        /// </summary>
        /// <param name="source">Sequence whose elements are projected.</param>
        /// <param name="selector">Asynchronous projection to run per element.</param>
        /// <returns>A sequence of the results produced by the selector tasks.</returns>
        public static IObservable<TResult> Select<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector)
        {
            return source
                .Select(x => Observable.FromAsync(async () => await selector(x)))
                .Concat();
        }

        /// <summary>
        /// Logs every error of <paramref name="source"/> and re-subscribes immediately, without any limit.
        /// </summary>
        /// <param name="source">Sequence to observe.</param>
        /// <param name="logger">Logger receiving the error; logging is skipped when it is <c>null</c>.</param>
        /// <param name="message">Message logged together with the exception.</param>
        /// <returns>A sequence that never surfaces the errors of <paramref name="source"/>.</returns>
        public static IObservable<T> LogAndRetry<T>(this IObservable<T> source, ILogger logger, string message)
        {
            return source
                .Do(
                    _ => { },
                    ex => logger?.LogError(ex, message))
                .Retry();
        }

        /// <summary>
        /// Re-subscribes to <paramref name="source"/> after an error. The first subscription is
        /// immediate; every following subscription is delayed by <paramref name="retryDelay"/>.
        /// </summary>
        /// <param name="source">Sequence to retry.</param>
        /// <param name="retryDelay">Delay applied before each re-subscription.</param>
        /// <param name="retryCount">
        /// When greater than zero, this value is passed as the count to <c>Observable.Retry</c>;
        /// any other value (including the default -1) retries without limit.
        /// </param>
        /// <param name="scheduler">Scheduler used for the delay; <see cref="TaskPoolScheduler.Default"/> when <c>null</c>.</param>
        public static IObservable<T> RetryAfterDelay<T>(
            this IObservable<T> source,
            TimeSpan retryDelay,
            int retryCount = -1,
            IScheduler scheduler = null)
        {
            return RedoAfterDelay(source, retryDelay, retryCount, scheduler, Observable.Retry, Observable.Retry);
        }

        /// <summary>
        /// Re-subscribes to <paramref name="source"/> after it completed. The first subscription is
        /// immediate; every following subscription is delayed by <paramref name="retryDelay"/>.
        /// </summary>
        /// <param name="source">Sequence to repeat.</param>
        /// <param name="retryDelay">Delay applied before each re-subscription.</param>
        /// <param name="repeatCount">
        /// When greater than zero, this value is passed as the count to <c>Observable.Repeat</c>;
        /// any other value (including the default -1) repeats without limit.
        /// </param>
        /// <param name="scheduler">Scheduler used for the delay; <see cref="TaskPoolScheduler.Default"/> when <c>null</c>.</param>
        public static IObservable<T> RepeatAfterDelay<T>(
            this IObservable<T> source,
            TimeSpan retryDelay,
            int repeatCount = -1,
            IScheduler scheduler = null)
        {
            return RedoAfterDelay(source, retryDelay, repeatCount, scheduler, Observable.Repeat, Observable.Repeat);
        }

        /// <summary>
        /// Logs every error of <paramref name="source"/> and then retries as described for
        /// <see cref="RetryAfterDelay{T}"/>.
        /// </summary>
        /// <param name="source">Sequence to observe and retry.</param>
        /// <param name="logger">Logger receiving the error; logging is skipped when it is <c>null</c>.</param>
        /// <param name="retryDelay">Delay applied before each re-subscription.</param>
        /// <param name="message">Message logged together with the exception.</param>
        /// <param name="retryCount">Retry count; values that are not greater than zero retry without limit.</param>
        /// <param name="scheduler">Scheduler used for the delay; <see cref="TaskPoolScheduler.Default"/> when <c>null</c>.</param>
        public static IObservable<T> LogAndRetryAfterDelay<T>(
            this IObservable<T> source,
            ILogger logger,
            TimeSpan retryDelay,
            string message,
            int retryCount = -1,
            IScheduler scheduler = null)
        {
            var sourceLogged = source
                .Do(
                    _ => { },
                    ex => logger?.LogError(ex, message));

            return RetryAfterDelay(sourceLogged, retryDelay, retryCount, scheduler);
        }

        /// <summary>
        /// Shared implementation of the delayed retry/repeat operators.
        /// </summary>
        /// <param name="source">Sequence to re-subscribe to.</param>
        /// <param name="retryDelay">Delay applied to every subscription except the first one.</param>
        /// <param name="retryCount">Count handed to <paramref name="reDoCount"/> when greater than zero.</param>
        /// <param name="scheduler">Scheduler used for the delay; <see cref="TaskPoolScheduler.Default"/> when <c>null</c>.</param>
        /// <param name="reDo">Unbounded re-subscription operator, used when <paramref name="retryCount"/> is not greater than zero.</param>
        /// <param name="reDoCount">Bounded re-subscription operator, used when <paramref name="retryCount"/> is greater than zero.</param>
        private static IObservable<T> RedoAfterDelay<T>(
            IObservable<T> source,
            TimeSpan retryDelay,
            int retryCount,
            IScheduler scheduler,
            Func<IObservable<T>, IObservable<T>> reDo,
            Func<IObservable<T>, int, IObservable<T>> reDoCount)
        {
            scheduler = scheduler ?? TaskPoolScheduler.Default;

            // The outer Defer gives every subscriber its own attempt counter. With a counter shared
            // by all subscriptions, only the very first subscriber would start without delay; any
            // later (re-)subscription to the returned sequence would have its first attempt delayed.
            return Observable.Defer(() =>
            {
                var attempt = 0;
                var deferredObs = Observable.Defer(() =>
                    ++attempt == 1
                        ? source
                        : source.DelaySubscription(retryDelay, scheduler));

                return retryCount > 0
                    ? reDoCount(deferredObs, retryCount)
                    : reDo(deferredObs);
            });
        }

        /// <summary>
        /// Forwards all notifications of <paramref name="source"/> and disposes the previously
        /// emitted item as soon as the next item arrives. The item held last is disposed together
        /// with the subscription. Items that do not implement <see cref="IDisposable"/> are
        /// forwarded without being tracked.
        /// </summary>
        /// <param name="source">Sequence whose items are forwarded.</param>
        public static IObservable<T> DisposeMany<T>(this IObservable<T> source)
        {
            return Observable.Create<T>(obs =>
            {
                var serialDisposable = new SerialDisposable();
                var subscription = source.Subscribe(
                    item =>
                    {
                        // Assigning the slot disposes whatever item was stored before.
                        serialDisposable.Disposable = item as IDisposable;
                        obs.OnNext(item);
                    },
                    obs.OnError,
                    obs.OnCompleted);

                return new CompositeDisposable(serialDisposable, subscription);
            });
        }

        /// <summary>
        /// Invokes the asynchronous <paramref name="selector"/> for every element of
        /// <paramref name="source"/> and emits one <see cref="Unit"/> per completed invocation.
        /// Unlike the <c>Select</c> overloads of this class the invocations are merged, not concatenated.
        /// </summary>
        /// <param name="source">Sequence whose elements are passed to the selector.</param>
        /// <param name="selector">Asynchronous action to run per element.</param>
        public static IObservable<Unit> SelectMany<T>(this IObservable<T> source, Func<T, Task> selector)
        {
            return source.SelectMany(async item =>
            {
                await selector(item);
                return Unit.Default;
            });
        }
    }
}