Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add workflow state watch to debug mode #1975

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
77 changes: 63 additions & 14 deletions Bonsai.Core/Expressions/InspectBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
namespace Bonsai.Expressions
{
/// <summary>
/// Represents an expression builder that replays the latest notification from all the
/// Represents an expression builder that monitors the notifications from all the
/// subscriptions made to its decorated builder.
/// </summary>
public sealed class InspectBuilder : ExpressionBuilder, INamedElement
Expand All @@ -23,7 +23,7 @@ public sealed class InspectBuilder : ExpressionBuilder, INamedElement
/// specified expression builder.
/// </summary>
/// <param name="builder">
/// The expression builder whose notifications will be replayed by this inspector.
/// The expression builder whose notifications will be monitored by this inspector.
/// </param>
public InspectBuilder(ExpressionBuilder builder)
: base(builder, decorator: false)
Expand Down Expand Up @@ -101,6 +101,12 @@ internal IReadOnlyList<VisualizerMapping> VisualizerMappings
/// </summary>
public IObservable<Exception> ErrorEx { get; private set; }

/// <summary>
/// Gets an observable sequence that multicasts watch notifications from all
/// the subscriptions made to the output of the decorated expression builder.
/// </summary>
public IObservable<IObservable<WatchNotification>> Watch { get; private set; }

/// <summary>
/// Gets the range of input arguments that the decorated expression builder accepts.
/// </summary>
Expand Down Expand Up @@ -142,6 +148,7 @@ public override Expression Build(IEnumerable<Expression> arguments)
if (VisualizerElement != null)
{
Output = VisualizerElement.Output;
Watch = VisualizerElement.Watch;
ErrorEx = Observable.Empty<Exception>();
VisualizerElement = BuildVisualizerElement(VisualizerElement, VisualizerMappings);
return source;
Expand All @@ -162,6 +169,7 @@ public override Expression Build(IEnumerable<Expression> arguments)
else
{
Output = Observable.Empty<IObservable<object>>();
Watch = Observable.Empty<IObservable<WatchNotification>>();
ErrorEx = Observable.Empty<Exception>();
if (VisualizerElement != null)
{
Expand Down Expand Up @@ -241,17 +249,15 @@ methodCall.Arguments[0] is MethodCallExpression lazy &&
return null;
}

ReplaySubject<IObservable<TSource>> CreateInspectorSubject<TSource>()
ReplaySubject<Inspector<TSource>> CreateInspectorSubject<TSource>()
{
var subject = new ReplaySubject<IObservable<TSource>>(1);
Output = subject.Select(ys => ys.Select(xs => (object)xs));
var subject = new ReplaySubject<Inspector<TSource>>(1);
Output = subject.Select(ys => ys.Output);
#pragma warning disable CS0612 // Type or member is obsolete
Error = subject.Merge().IgnoreElements().Select(xs => Unit.Default);
Error = subject.SelectMany(ys => ys.Error);
#pragma warning restore CS0612 // Type or member is obsolete
ErrorEx = subject.SelectMany(xs => xs
.IgnoreElements()
.Select(x => default(Exception))
.Catch<Exception, Exception>(ex => Observable.Return(ex)));
ErrorEx = subject.SelectMany(xs => xs.ErrorEx);
Watch = subject.Select(xs => xs.Watch);
return subject;
}

Expand All @@ -260,13 +266,13 @@ IObservable<TSource> Process<TSource>(IObservable<TSource> source)
return source;
}

IObservable<TSource> Process<TSource>(IObservable<TSource> source, ReplaySubject<IObservable<TSource>> subject)
IObservable<TSource> Process<TSource>(IObservable<TSource> source, ReplaySubject<Inspector<TSource>> subject)
{
return Observable.Create<TSource>(observer =>
{
var sourceInspector = new Subject<TSource>();
var sourceInspector = new Inspector<TSource>();
subject.OnNext(sourceInspector);
var subscription = source.Do(sourceInspector).SubscribeSafe(observer);
var subscription = source.Do(sourceInspector.Subject).SubscribeSafe(observer);
return Disposable.Create(() =>
{
try { subscription.Dispose(); }
Expand All @@ -275,11 +281,54 @@ IObservable<TSource> Process<TSource>(IObservable<TSource> source, ReplaySubject
{
throw new WorkflowRuntimeException(ex.Message, this, ex);
}
finally { sourceInspector.OnCompleted(); }
finally
{
if (!sourceInspector.Subject.HasTerminated)
{
sourceInspector.IsCanceled = true;
sourceInspector.Subject.OnCompleted();
}
}
});
});
}

class Inspector<T>
{
public InspectSubject<T> Subject { get; } = new();

public bool IsCanceled { get; internal set; }

public IObservable<object> Output => Subject.Select(value => (object)value);

public IObservable<Unit> Error => Subject.IgnoreElements().Select(xs => Unit.Default);

public IObservable<Exception> ErrorEx => Subject
.IgnoreElements()
.Select(x => default(Exception))
.Catch<Exception, Exception>(ex => Observable.Return(ex));

public IObservable<WatchNotification> Watch =>
Observable.Create<WatchNotification>(observer =>
{
observer.OnNext(WatchNotification.Subscribe);
var notificationObserver = Observer.Create<T>(
value => observer.OnNext(WatchNotification.OnNext),
error =>
{
observer.OnNext(WatchNotification.OnError);
observer.OnNext(WatchNotification.Unsubscribe);
},
() =>
{
if (!IsCanceled)
observer.OnNext(WatchNotification.OnCompleted);
observer.OnNext(WatchNotification.Unsubscribe);
});
return Subject.SubscribeSafe(notificationObserver);
});
}

class VisualizerMappingList
{
readonly SortedList<int, VisualizerMapping> localMappings = new SortedList<int, VisualizerMapping>();
Expand Down
Loading