Skip to content

Commit

Permalink
Merge pull request #7 from LittleLittleCloud/u/fix#1
Browse files Browse the repository at this point in the history
refactor over workflow engine
  • Loading branch information
LittleLittleCloud authored Sep 11, 2024
2 parents 3f61560 + c367ba4 commit bce08f5
Show file tree
Hide file tree
Showing 15 changed files with 654 additions and 376 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/dotnet-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ jobs:
build:
name: Build
runs-on: ubuntu-latest
timeout-minutes: 30
steps:
- uses: actions/checkout@v4
- name: Setup .NET
Expand All @@ -35,5 +36,5 @@ jobs:
echo "Build"
dotnet build --no-restore --configuration Debug -bl /p:SignAssembly=true
- name: Unit Test
run: dotnet test --no-build -bl --configuration Debug -m:1
run: dotnet test --no-build -bl --configuration Debug -m:3 -v d

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# StepWise
</div>

StepWise is a powerful and flexible C# library for defining and executing workflows. It allows you to break down complex processes into manageable steps, define dependencies between them, and execute them in the correct order.
StepWise is a powerful and flexible C# library for defining and executing workflows. It allows you to break down complex workflows into manageable steps, define dependencies between them, and execute them in the correct order.

## Features

Expand Down
21 changes: 6 additions & 15 deletions example/CodeInterpreter/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
builder.AddConsole();
});

var logger = loggerFactory.CreateLogger<WorkflowEngine>();
var logger = loggerFactory.CreateLogger<StepWiseEngine>();

// Follow the configuration instruction on setting up dotnet interactive and python kernel
// https://github.com/LittleLittleCloud/code-interpreter-workflow?tab=readme-ov-file#pre-requisite
Expand All @@ -35,30 +35,21 @@
.RegisterMessageConnector();

var codeInterpreter = new Workflow(agent, kernel);
var engine = WorkflowEngine.CreateFromInstance(codeInterpreter, maxConcurrency: 1, logger);
var engine = StepWiseEngine.CreateFromInstance(codeInterpreter, maxConcurrency: 1, logger);

var task = "use python to switch my system to dark mode";
var input = new Dictionary<string, object>
var input = new Dictionary<string, StepVariable>
{
["task"] = task
["task"] = StepVariable.Create(task),
};

await foreach ((var stepName, var value) in engine.ExecuteStepAsync(nameof(Workflow.GenerateReply), input))
await foreach (var stepResult in engine.ExecuteAsync(nameof(Workflow.GenerateReply), input))
{
if (stepName == nameof(Workflow.GenerateReply) && value is string reply)
if (stepResult.StepName == nameof(Workflow.GenerateReply) && stepResult.Result?.As<string>() is string reply)
{
Console.WriteLine($"Final Reply: {reply}");
break;
}

Console.WriteLine($"Step {stepName} is completed");

var json = JsonSerializer.Serialize(value, new JsonSerializerOptions { WriteIndented = true });

Console.WriteLine($"""
Value:
{json}
""");
}

public class Workflow
Expand Down
21 changes: 5 additions & 16 deletions example/GetWeather/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@
});

var getWeather = new Workflow();
var workflowEngine = WorkflowEngine.CreateFromInstance(getWeather, maxConcurrency: 3, loggerFactory.CreateLogger<WorkflowEngine>());

var input = new Dictionary<string, object>
var workflowEngine = StepWiseEngine.CreateFromInstance(getWeather, maxConcurrency: 3, loggerFactory.CreateLogger<StepWiseEngine>());
var input = new Dictionary<string, StepVariable>
{
{ "cities", new string[] { "Seattle", "Redmond" } }
{ "cities", StepVariable.Create(new string[] { "Seattle", "Redmond" }) }
};

await foreach( (var stepName, var value) in workflowEngine.ExecuteStepAsync(nameof(Workflow.GetWeatherAsync), input))
await foreach(var stepResult in workflowEngine.ExecuteAsync(nameof(Workflow.GetWeatherAsync), input))
{
if (stepName == nameof(Workflow.GetWeatherAsync) && value is Workflow.Weather[] weathers)
if (stepResult.StepName == nameof(Workflow.GetWeatherAsync) && stepResult.Result?.As<Workflow.Weather[]>() is Workflow.Weather[] weathers)
{
Console.WriteLine("Weather forecast:");
foreach (var weather in weathers)
Expand All @@ -27,17 +26,7 @@

break;
}

Console.WriteLine($"Step {stepName} is completed");

var json = JsonSerializer.Serialize(value, new JsonSerializerOptions { WriteIndented = true });

Console.WriteLine($"""
Value:
{json}
""");
}

public class Workflow
{
[Step]
Expand Down
33 changes: 33 additions & 0 deletions src/StepWise.Core/Extension/StepWiseEngineExtension.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace StepWise.Core.Extension;

public static class StepWiseEngineExtension
{
/// <summary>
/// Execute the workflow until the target step is reached or no further steps can be executed.
/// If the <paramref name="earlyStop"/> is true, the workflow will stop as soon as the target step is reached and completed.
/// Otherwise, the workflow will continue to execute until no further steps can be executed.
/// </summary>
public static async Task<TResult> ExecuteAsync<TResult>(
this IStepWiseEngine engine,
string targetStepName,
Dictionary<string, StepVariable>? inputs = null,
bool earlyStop = true,
int? maxSteps = null,
CancellationToken ct = default)
{
maxSteps ??= int.MaxValue;
await foreach (var stepResult in engine.ExecuteAsync(targetStepName, inputs, earlyStop, maxSteps, ct))
{
if (stepResult.Result != null && stepResult.StepName == targetStepName)
{
return stepResult.Result.As<TResult>() ?? throw new Exception($"Step '{targetStepName}' did not return the expected result type.");
}
}

throw new Exception($"Step '{targetStepName}' did not return the expected result type.");
}
}
16 changes: 16 additions & 0 deletions src/StepWise.Core/IStepWiseEngine.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

namespace StepWise.Core;

public interface IStepWiseEngine
{
/// <summary>
/// Execute the workflow until the target step is reached or no further steps can be executed.
/// If the <paramref name="earlyStop"/> is true, the workflow will stop as soon as the target step is reached and completed.
/// Otherwise, the workflow will continue to execute until no further steps can be executed.
IAsyncEnumerable<StepResult> ExecuteAsync(
string targetStep,
Dictionary<string, StepVariable>? inputs = null,
bool earlyStop = true,
int? maxSteps = null,
CancellationToken ct = default);
}
9 changes: 0 additions & 9 deletions src/StepWise.Core/IWorkflowEngine.cs

This file was deleted.

95 changes: 87 additions & 8 deletions src/StepWise.Core/Step.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,25 @@ public static Step CreateFromMethod(Delegate stepMethod)
public List<string> Dependencies { get; set; }
public Delegate StepMethod { get; set; }

public bool IsExecuctionConditionSatisfied(Dictionary<string, object> inputs)
public bool IsExecuctionConditionSatisfied(Dictionary<string, StepVariable> inputs)
{
foreach (var param in InputParameters)
{
if (param.HasDefaultValue)
{
continue;
}

if (!inputs.ContainsKey(param.SourceStep ?? param.Name))
{
return false;
}
}

return true;
}

private bool IsExecuctionConditionSatisfied(Dictionary<string, object> inputs)
{
foreach (var param in InputParameters)
{
Expand Down Expand Up @@ -126,31 +144,92 @@ public override string ToString()
}
}

public class StepBean
public class StepVariable
{
public StepVariable(int generation, object value)
{
Generation = generation;
Value = value;
}

public static StepVariable Create(object value, int generation = 0)
{
return new StepVariable(generation, value);
}

public int Generation { get; set; }

public object Value { get; set; }

/// <summary>
/// A convenient method to cast the result to the specified type.
/// </summary>
public T As<T>()
{
return (T)Value;
}
}

/// <summary>
/// The step run represents the minimal unit of execute a step.
/// It contains the step, the generation of the step, and the inputs(parameter) of the step.
/// </summary>
public class StepRun
{
private readonly Step _step;
private readonly int _generation = 0;
private readonly Dictionary<string, object> _inputs = new();
private readonly Dictionary<string, StepVariable> _inputs = new();

private StepBean(Step step, int generation, Dictionary<string, object> inputs)
private StepRun(Step step, int generation, Dictionary<string, StepVariable> inputs)
{
_step = step;
_generation = generation;
_inputs = inputs;
}

public static StepBean Create(Step step, int generation, Dictionary<string, object>? inputs = null)
public Step Step => _step;

public int Generation => _generation;

public Dictionary<string, StepVariable> Inputs => _inputs;

public static StepRun Create(Step step, int generation, Dictionary<string, StepVariable>? inputs = null)
{
return new StepBean(step, generation, inputs ?? new Dictionary<string, object>());
return new StepRun(step, generation, inputs ?? new Dictionary<string, StepVariable>());
}

public async Task<object?> ExecuteAsync(CancellationToken ct = default)
{
return await _step.ExecuteAsync(_inputs, ct);
return await _step.ExecuteAsync(_inputs.ToDictionary(kv => kv.Key, kv => kv.Value.Value), ct);
}

public override string ToString()
{
return $"{_step.Name} (gen: {_generation})";
// format [gen] stepName([gen]input1, [gen]input2, ...)
var inputs = string.Join(", ", _inputs.Select(kv => $"{kv.Key}[{_inputs[kv.Key].Generation}]"));
return $"{_step.Name}[{_generation}]({inputs})";
}
}

public class StepResult
{
public StepResult(StepRun stepBean, object? result)
{
StepRun = stepBean;
Result = result is null ? null : StepVariable.Create(result, stepBean.Generation);
}

public static StepResult Create(StepRun stepBean, object? result)
{
return new StepResult(stepBean, result);
}

public StepRun StepRun { get; }

public string StepName => StepRun.Step.Name;

/// <summary>
/// The result of the step.
/// </summary>
public StepVariable? Result { get; }
}
Loading

0 comments on commit bce08f5

Please sign in to comment.