using System;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;
using Sharp7.Rx.Enums;
using Sharp7.Rx.Interfaces;

namespace Sharp7.Rx.Extensions
{
    /// <summary>
    /// Extension methods for <see cref="IPlc"/> that implement a trigger/acknowledge
    /// handshake around a user supplied read operation.
    /// </summary>
    public static class PlcExtensions
    {
        /// <summary>
        /// Creates an observable that runs <paramref name="readData"/> every time the boolean variable at
        /// <paramref name="triggerAddress"/> is reported as <c>true</c>, writes <c>true</c> to
        /// <paramref name="ackTriggerAddress"/> once the read has finished (successfully or not), and writes
        /// <c>false</c> to <paramref name="ackTriggerAddress"/> when the trigger is reported as <c>false</c>.
        /// </summary>
        /// <typeparam name="TReturn">Type of the value produced by <paramref name="readData"/>.</typeparam>
        /// <param name="plc">The PLC to observe and to read from.</param>
        /// <param name="triggerAddress">Address of the boolean trigger variable.</param>
        /// <param name="ackTriggerAddress">Address of the boolean acknowledge variable.</param>
        /// <param name="readData">Asynchronous function that reads the payload from the PLC.</param>
        /// <param name="initialTransfer">
        /// When <c>true</c>, each subscription first waits until the PLC reports
        /// <see cref="ConnectionState.Connected"/>, reads the data once and emits it
        /// (without touching the acknowledge variable) before the trigger is observed.
        /// </param>
        /// <returns>An observable sequence of the values returned by <paramref name="readData"/>.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="plc"/> or <paramref name="readData"/> is <c>null</c>.
        /// </exception>
        public static IObservable<TReturn> CreateDatatransferWithHandshake<TReturn>(this IPlc plc, string triggerAddress, string ackTriggerAddress, Func<IPlc, Task<TReturn>> readData, bool initialTransfer)
        {
            if (plc == null)
            {
                throw new ArgumentNullException(nameof(plc));
            }

            if (readData == null)
            {
                throw new ArgumentNullException(nameof(readData));
            }

            // The cancellation token is signalled when the subscriber unsubscribes. It is used to
            // abandon the wait for the connection instead of leaving it pending forever.
            return Observable.Create<TReturn>(async (observer, cancellationToken) =>
            {
                var subscriptions = new CompositeDisposable();

                // Share a single notification subscription between the transfer and the ack-reset pipeline.
                var notification = plc
                    .CreateNotification<bool>(triggerAddress, TransmissionMode.OnChange, TimeSpan.Zero)
                    .Publish()
                    .RefCount();

                if (initialTransfer)
                {
                    await plc.ConnectionState
                        .FirstAsync(state => state == ConnectionState.Connected)
                        .ToTask(cancellationToken);

                    var initialValue = await ReadData(plc, readData);
                    observer.OnNext(initialValue);
                }

                // Trigger is set: read the data, acknowledge and publish the result.
                notification
                    .Where(trigger => trigger)
                    .SelectMany(_ => ReadDataAndAcknowledge(plc, readData, ackTriggerAddress))
                    .Subscribe(observer)
                    .AddDisposableTo(subscriptions);

                // Trigger is cleared: reset the acknowledge flag. Errors are forwarded to the subscriber;
                // a parameterless Subscribe() would rethrow them as unhandled exceptions instead.
                notification
                    .Where(trigger => !trigger)
                    .SelectMany(async _ =>
                    {
                        await plc.SetValue(ackTriggerAddress, false);
                        return Unit.Default;
                    })
                    .Subscribe(unit => { }, observer.OnError)
                    .AddDisposableTo(subscriptions);

                return subscriptions;
            });
        }

        /// <summary>
        /// Same as the five-parameter overload of <c>CreateDatatransferWithHandshake</c>
        /// with <c>initialTransfer</c> set to <c>false</c>.
        /// </summary>
        /// <typeparam name="TReturn">Type of the value produced by <paramref name="readData"/>.</typeparam>
        /// <param name="plc">The PLC to observe and to read from.</param>
        /// <param name="triggerAddress">Address of the boolean trigger variable.</param>
        /// <param name="ackTriggerAddress">Address of the boolean acknowledge variable.</param>
        /// <param name="readData">Asynchronous function that reads the payload from the PLC.</param>
        /// <returns>An observable sequence of the values returned by <paramref name="readData"/>.</returns>
        public static IObservable<TReturn> CreateDatatransferWithHandshake<TReturn>(this IPlc plc, string triggerAddress, string ackTriggerAddress, Func<IPlc, Task<TReturn>> readData)
        {
            return CreateDatatransferWithHandshake(plc, triggerAddress, ackTriggerAddress, readData, false);
        }

        /// <summary>
        /// Invokes <paramref name="receiveData"/> with <paramref name="plc"/> and returns its result.
        /// </summary>
        private static async Task<TReturn> ReadData<TReturn>(IPlc plc, Func<IPlc, Task<TReturn>> receiveData)
        {
            return await receiveData(plc);
        }

        /// <summary>
        /// Reads the data and afterwards writes <c>true</c> to <paramref name="ackTriggerAddress"/>.
        /// The acknowledge is written in a <c>finally</c> block, i.e. also when the read fails.
        /// </summary>
        private static async Task<TReturn> ReadDataAndAcknowledge<TReturn>(IPlc plc, Func<IPlc, Task<TReturn>> readData, string ackTriggerAddress)
        {
            try
            {
                return await ReadData(plc, readData);
            }
            finally
            {
                // NOTE(review): if this write throws while a read error is propagating,
                // the write error replaces the read error.
                await plc.SetValue(ackTriggerAddress, true);
            }
        }
    }
}