Writing Flows in C#
While BtScript is the recommended way to write flows, you can also write flows, tasks, and functions directly in C#. This gives you full access to the .NET ecosystem, IDE tooling, and type safety.
C# flows implement the same interfaces that the BtScript compiler targets. The Flow Service compiles your C# source using Roslyn and loads the resulting assembly identically to BtScript-compiled artifacts.
When to Use C#
| Use Case | Recommended |
|---|---|
| Simple transformations and aggregations | BtScript |
| Complex business logic | C# |
| Integration with .NET libraries | C# |
| Prototyping and exploration | BtScript |
| Team prefers Lisp/Scheme syntax | BtScript |
| Team prefers C# / .NET | C# |
Both approaches produce the same runtime artifact (a .NET assembly) and run identically in the FlowGrain.
Quick Start
Scaffold a C# flow:
flowctl new flow temperature-monitor --cs
This creates flows/temperature-monitor.cs with a working scaffold. Upload it:
flowctl artifact upload flows/temperature-monitor.cs
The Flow Service compiles the C# to a .NET assembly and stores it in the artifact registry. From here, instance creation, telemetry, and monitoring work exactly the same as BtScript flows.
Writing a Flow
A flow implements IFlow from the BeaconTower.Flows namespace. Here's a complete example:
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using BeaconTower.Flows;
namespace GeneratedFlows;
[Flow("btf:temperature-monitor;1.0.0")]
public class TemperatureMonitorFlow : IFlow
{
// --- Metadata (read by FlowGrain during initialization) ---
public FlowMetadata Metadata { get; } = new()
{
Id = "temperature-monitor",
Version = "1.0.0",
Triggers = new FlowTriggerInfo[]
{
new(FlowTriggerKind.AnyValue, Inputs: new[] { "celsius" }),
},
Inputs = new[]
{
new FlowInput("celsius", DtmiRef: null, ConcreteSignal: "sensor.celsius", ClrType: typeof(double)),
new FlowInput("threshold", DtmiRef: null, ConcreteSignal: "config.threshold", ClrType: typeof(double)),
},
Outputs = new FlowOutput[] { },
PersistenceMode = FlowPersistenceMode.Timer,
};
// --- State schema (rolling buffers, gates) ---
public FlowStateSchema GetStateSchema() => new()
{
RollingBuffers = { },
Gates = new List<GateSchema> { },
};
// --- Execution logic ---
public async Task<FlowResult<object?>> Execute(
IFlowContext ctx, IFlowState state, CancellationToken ct = default)
{
var celsius = state.GetInput<double>("celsius")!;
var threshold = state.GetInput<double>("threshold")!;
var fahrenheit = (celsius * 9.0 / 5.0) + 32;
var isAlert = celsius > threshold;
await ctx.EmitAsync("Fahrenheit", fahrenheit);
await ctx.EmitAsync("Alert", isAlert);
if (isAlert)
await ctx.EmitAsync("Status", "HIGH TEMPERATURE ALERT");
else
await ctx.EmitAsync("Status", "Normal");
return FlowResult.Ok<object?>(null);
}
}
Key Elements
[Flow("btf:name;version")] attribute — Declares the BTI. The Flow Service reads this to register the artifact.
FlowMetadata — Static metadata read by the FlowGrain during initialization:
Id/Version— Artifact identityTriggers— When the flow executes (AnyValue,AllValues,Timer)Inputs— Input bindings with signal paths and CLR typesOutputs— Output declarationsPersistenceMode— How often state is persisted (Synchronous,Async,Timer,OnDeactivate,None)
FlowStateSchema — Declares rolling buffers and gates for the state system.
Execute() — Called when the trigger condition is met. Read inputs via state.GetInput<T>(), emit outputs via ctx.EmitAsync().
Input Signals
The ConcreteSignal in FlowInput maps to the asset's telemetry signal path. The format is {assetSignalName} — the asset ID is bound at instance creation time.
// This input binds to the "temperature" signal on whichever asset
// the instance is attached to
new FlowInput("temp", DtmiRef: null, ConcreteSignal: "sensor.temperature", ClrType: typeof(double))
Trigger Modes
// Trigger when ANY listed input receives a new value
new FlowTriggerInfo(FlowTriggerKind.AnyValue, Inputs: new[] { "celsius", "pressure" })
// Trigger when ALL listed inputs have new values (synchronized)
new FlowTriggerInfo(FlowTriggerKind.AllValues, Inputs: new[] { "pressure_in", "pressure_out" })
Emitting Output
// Emit a value (default channel — published to NATS events stream)
await ctx.EmitAsync("output-name", value);
// Emit with explicit channel routing
await ctx.EmitAsync("CriticalTemp", EmitChannel.Alarm, temp);
await ctx.EmitAsync("HighTemp", EmitChannel.Notification, temp);
await ctx.EmitAsync("TempNormal", EmitChannel.Event, temp);
// Emit multiple outputs
await ctx.EmitAsync("Fahrenheit", fahrenheit);
await ctx.EmitAsync("Alert", isAlert);
await ctx.EmitAsync("Status", statusMessage);
Available channels: EmitChannel.Event, EmitChannel.Alarm, EmitChannel.Notification.
Returning a Value
Use FlowResult.Ok<object?>() to return an explicit value from the flow:
// Flow with explicit return value
return FlowResult.Ok<object?>(computedValue);
// Emit-only flow (no return value)
return FlowResult.Ok<object?>(null);
Using State
Access persistent key-value state that survives restarts:
// Read state
var count = state.GetValue<int>("execution-count", defaultValue: 0);
// Write state (persisted according to PersistenceMode)
state.SetValue("execution-count", count + 1);
state.SetValue("last-value", celsius);
// Read the latest value of any input (even passive ones)
var lastThreshold = state.Latest<double>("threshold");
Rolling Aggregations
Use built-in rolling window functions for time-windowed computations:
// 5-minute rolling average
var avgTemp = state.RollingAvg("temp-avg", celsius, TimeSpan.FromMinutes(5));
// Rolling min/max/sum
var minTemp = state.RollingMin("temp-min", celsius, TimeSpan.FromMinutes(5));
var maxTemp = state.RollingMax("temp-max", celsius, TimeSpan.FromMinutes(5));
var sumFlow = state.RollingSum("flow-sum", flowRate, TimeSpan.FromHours(1));
Gates (Synchronized Inputs)
Gates wait for multiple inputs to arrive before processing them together:
public FlowStateSchema GetStateSchema() => new()
{
Gates = new List<GateSchema>
{
new("pressure-gate", new[] { "pressure_in", "pressure_out" }),
},
};
public async Task<FlowResult<object?>> Execute(
IFlowContext ctx, IFlowState state, CancellationToken ct = default)
{
if (state.GateReady("pressure-gate"))
{
state.GateReset("pressure-gate");
var pIn = state.GetInput<double>("pressure_in")!;
var pOut = state.GetInput<double>("pressure_out")!;
await ctx.EmitAsync("PressureDelta", pOut - pIn);
}
return FlowResult.Ok<object?>(null);
}
Writing a Task
Tasks are reusable, stateful computation units invoked by flows via ctx.CallTaskAsync().
using BeaconTower.Flows;
namespace GeneratedFlows;
[FlowTask("btft:rolling-std;1.0.0")]
public class RollingStdTask : IFlowTask<RollingStdInput, double>
{
public Task<double> ExecuteAsync(RollingStdInput input, ITaskState state, CancellationToken ct)
{
// Maintain a buffer in task state
var buffer = state.GetValue<List<double>>("buffer") ?? new List<double>();
buffer.Add(input.Value);
// Trim to window size
while (buffer.Count > input.WindowSize)
buffer.RemoveAt(0);
state.SetValue("buffer", buffer);
// Compute standard deviation
var mean = buffer.Average();
var variance = buffer.Sum(x => (x - mean) * (x - mean)) / buffer.Count;
return Task.FromResult(Math.Sqrt(variance));
}
}
public record RollingStdInput(double Value, int WindowSize = 100);
Key differences from flows:
- Tasks implement
IFlowTask<TInput, TOutput> - Tasks are invoked by flows, not triggered by telemetry
- Tasks maintain their own state via
ITaskState - BTI prefix is
btft:instead ofbtf:
Calling a Task from a Flow
// In a flow's Execute() method:
var stdDev = await ctx.CallTaskAsync<double>(
"btft:rolling-std;1.0.0",
new RollingStdInput(celsius, WindowSize: 50),
ct);
await ctx.EmitAsync("StdDev", stdDev);
Writing a Function
Functions are stateless, pure computations. They take input and return output with no side effects.
using BeaconTower.Flows;
namespace GeneratedFlows;
[FlowFunction("btff:clamp;1.0.0")]
public class ClampFunction : IFlowFunction<ClampInput, double>
{
public double Execute(ClampInput input)
{
return Math.Clamp(input.Value, input.Min, input.Max);
}
}
public record ClampInput(double Value, double Min, double Max);
Key differences from tasks:
- Functions implement
IFlowFunction<TInput, TOutput> - Functions are stateless — no
ITaskStateparameter - Functions are synchronous (no
Taskreturn type) - BTI prefix is
btff:instead ofbtft:
Upload and Deploy
C# artifacts are uploaded the same way as BtScript:
# Upload a flow
flowctl artifact upload flows/temperature-monitor.cs
# Upload a task
flowctl artifact upload tasks/rolling-std.cs
# Upload a function
flowctl artifact upload functions/clamp.cs
# List all artifacts
flowctl artifact list
# Create an instance (same as BtScript flows)
flowctl instance create monitor-1 \
--bti "btf:temperature-monitor;1.0.0" \
--asset pump-042
Available Interfaces
| Interface | Artifact Type | BTI Prefix | State | Invocation |
|---|---|---|---|---|
IFlow | Flow | btf: | Full (inputs, rolling buffers, gates, key-value) | Triggered by telemetry |
IFlowTask<TIn, TOut> | Task | btft: | Key-value via ITaskState | Called by flows |
IFlowFunction<TIn, TOut> | Function | btff: | None (stateless) | Called by flows |
IFlowContext Reference
Available in Execute() via the ctx parameter:
// Emit output (default channel)
Task EmitAsync(string outputName, object? value);
// Emit output with channel routing
Task EmitAsync(string outputName, EmitChannel channel, object? value);
// Call reusable artifacts
Task<T> CallTaskAsync<T>(string taskBti, object args, CancellationToken ct);
// Metadata
DateTimeOffset Now { get; }
CancellationToken CancellationToken { get; }
IFlowState Reference
Available in Execute():
// Input values
T? GetInput<T>(string inputName);
T? Latest<T>(string inputName);
bool HasInput(string inputName);
// Key-value state
T? GetValue<T>(string key);
void SetValue<T>(string key, T value);
// Rolling aggregations
double RollingAvg(string key, double value, TimeSpan window);
double RollingMin(string key, double value, TimeSpan window);
double RollingMax(string key, double value, TimeSpan window);
double RollingSum(string key, double value, TimeSpan window);
// Gates
bool GateReady(string gateName);
void GateReset(string gateName);
Next Steps
- BtScript Overview — If you prefer a DSL approach over C#
- Compilation & Runtime — How C# source becomes a running flow
- Persistence & State — State management strategies
- Local Flow Development — Set up your dev environment