Skip to main content

Writing Flows in C#

While BtScript is the recommended way to write flows, you can also write flows, tasks, and functions directly in C#. This gives you full access to the .NET ecosystem, IDE tooling, and type safety.

C# flows implement the same interfaces that the BtScript compiler targets. The Flow Service compiles your C# source using Roslyn and loads the resulting assembly identically to BtScript-compiled artifacts.

When to Use C#

Use CaseRecommended
Simple transformations and aggregationsBtScript
Complex business logicC#
Integration with .NET librariesC#
Prototyping and explorationBtScript
Team prefers Lisp/Scheme syntaxBtScript
Team prefers C# / .NETC#

Both approaches produce the same runtime artifact (a .NET assembly) and run identically in the FlowGrain.

Quick Start

Scaffold a C# flow:

flowctl new flow temperature-monitor --cs

This creates flows/temperature-monitor.cs with a working scaffold. Upload it:

flowctl artifact upload flows/temperature-monitor.cs

The Flow Service compiles the C# to a .NET assembly and stores it in the artifact registry. From here, instance creation, telemetry, and monitoring work exactly the same as BtScript flows.

Writing a Flow

A flow implements IFlow from the BeaconTower.Flows namespace. Here's a complete example:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using BeaconTower.Flows;

namespace GeneratedFlows;

[Flow("btf:temperature-monitor;1.0.0")]
public class TemperatureMonitorFlow : IFlow
{
// --- Metadata (read by FlowGrain during initialization) ---

public FlowMetadata Metadata { get; } = new()
{
Id = "temperature-monitor",
Version = "1.0.0",
Triggers = new FlowTriggerInfo[]
{
new(FlowTriggerKind.AnyValue, Inputs: new[] { "celsius" }),
},
Inputs = new[]
{
new FlowInput("celsius", DtmiRef: null, ConcreteSignal: "sensor.celsius", ClrType: typeof(double)),
new FlowInput("threshold", DtmiRef: null, ConcreteSignal: "config.threshold", ClrType: typeof(double)),
},
Outputs = new FlowOutput[] { },
PersistenceMode = FlowPersistenceMode.Timer,
};

// --- State schema (rolling buffers, gates) ---

public FlowStateSchema GetStateSchema() => new()
{
RollingBuffers = { },
Gates = new List<GateSchema> { },
};

// --- Execution logic ---

public async Task<FlowResult<object?>> Execute(
IFlowContext ctx, IFlowState state, CancellationToken ct = default)
{
var celsius = state.GetInput<double>("celsius")!;
var threshold = state.GetInput<double>("threshold")!;

var fahrenheit = (celsius * 9.0 / 5.0) + 32;
var isAlert = celsius > threshold;

await ctx.EmitAsync("Fahrenheit", fahrenheit);
await ctx.EmitAsync("Alert", isAlert);

if (isAlert)
await ctx.EmitAsync("Status", "HIGH TEMPERATURE ALERT");
else
await ctx.EmitAsync("Status", "Normal");

return FlowResult.Ok<object?>(null);
}
}

Key Elements

[Flow("btf:name;version")] attribute — Declares the BTI. The Flow Service reads this to register the artifact.

FlowMetadata — Static metadata read by the FlowGrain during initialization:

  • Id / Version — Artifact identity
  • Triggers — When the flow executes (AnyValue, AllValues, Timer)
  • Inputs — Input bindings with signal paths and CLR types
  • Outputs — Output declarations
  • PersistenceMode — How often state is persisted (Synchronous, Async, Timer, OnDeactivate, None)

FlowStateSchema — Declares rolling buffers and gates for the state system.

Execute() — Called when the trigger condition is met. Read inputs via state.GetInput<T>(), emit outputs via ctx.EmitAsync().

Input Signals

The ConcreteSignal in FlowInput maps to the asset's telemetry signal path. The format is {assetSignalName} — the asset ID is bound at instance creation time.

// This input binds to the "temperature" signal on whichever asset
// the instance is attached to
new FlowInput("temp", DtmiRef: null, ConcreteSignal: "sensor.temperature", ClrType: typeof(double))

Trigger Modes

// Trigger when ANY listed input receives a new value
new FlowTriggerInfo(FlowTriggerKind.AnyValue, Inputs: new[] { "celsius", "pressure" })

// Trigger when ALL listed inputs have new values (synchronized)
new FlowTriggerInfo(FlowTriggerKind.AllValues, Inputs: new[] { "pressure_in", "pressure_out" })

Emitting Output

// Emit a value (default channel — published to NATS events stream)
await ctx.EmitAsync("output-name", value);

// Emit with explicit channel routing
await ctx.EmitAsync("CriticalTemp", EmitChannel.Alarm, temp);
await ctx.EmitAsync("HighTemp", EmitChannel.Notification, temp);
await ctx.EmitAsync("TempNormal", EmitChannel.Event, temp);

// Emit multiple outputs
await ctx.EmitAsync("Fahrenheit", fahrenheit);
await ctx.EmitAsync("Alert", isAlert);
await ctx.EmitAsync("Status", statusMessage);

Available channels: EmitChannel.Event, EmitChannel.Alarm, EmitChannel.Notification.

Returning a Value

Use FlowResult.Ok<object?>() to return an explicit value from the flow:

// Flow with explicit return value
return FlowResult.Ok<object?>(computedValue);

// Emit-only flow (no return value)
return FlowResult.Ok<object?>(null);

Using State

Access persistent key-value state that survives restarts:

// Read state
var count = state.GetValue<int>("execution-count", defaultValue: 0);

// Write state (persisted according to PersistenceMode)
state.SetValue("execution-count", count + 1);
state.SetValue("last-value", celsius);

// Read the latest value of any input (even passive ones)
var lastThreshold = state.Latest<double>("threshold");

Rolling Aggregations

Use built-in rolling window functions for time-windowed computations:

// 5-minute rolling average
var avgTemp = state.RollingAvg("temp-avg", celsius, TimeSpan.FromMinutes(5));

// Rolling min/max/sum
var minTemp = state.RollingMin("temp-min", celsius, TimeSpan.FromMinutes(5));
var maxTemp = state.RollingMax("temp-max", celsius, TimeSpan.FromMinutes(5));
var sumFlow = state.RollingSum("flow-sum", flowRate, TimeSpan.FromHours(1));

Gates (Synchronized Inputs)

Gates wait for multiple inputs to arrive before processing them together:

public FlowStateSchema GetStateSchema() => new()
{
Gates = new List<GateSchema>
{
new("pressure-gate", new[] { "pressure_in", "pressure_out" }),
},
};

public async Task<FlowResult<object?>> Execute(
IFlowContext ctx, IFlowState state, CancellationToken ct = default)
{
if (state.GateReady("pressure-gate"))
{
state.GateReset("pressure-gate");

var pIn = state.GetInput<double>("pressure_in")!;
var pOut = state.GetInput<double>("pressure_out")!;
await ctx.EmitAsync("PressureDelta", pOut - pIn);
}

return FlowResult.Ok<object?>(null);
}

Writing a Task

Tasks are reusable, stateful computation units invoked by flows via ctx.CallTaskAsync().

using BeaconTower.Flows;

namespace GeneratedFlows;

[FlowTask("btft:rolling-std;1.0.0")]
public class RollingStdTask : IFlowTask<RollingStdInput, double>
{
public Task<double> ExecuteAsync(RollingStdInput input, ITaskState state, CancellationToken ct)
{
// Maintain a buffer in task state
var buffer = state.GetValue<List<double>>("buffer") ?? new List<double>();
buffer.Add(input.Value);

// Trim to window size
while (buffer.Count > input.WindowSize)
buffer.RemoveAt(0);

state.SetValue("buffer", buffer);

// Compute standard deviation
var mean = buffer.Average();
var variance = buffer.Sum(x => (x - mean) * (x - mean)) / buffer.Count;
return Task.FromResult(Math.Sqrt(variance));
}
}

public record RollingStdInput(double Value, int WindowSize = 100);

Key differences from flows:

  • Tasks implement IFlowTask<TInput, TOutput>
  • Tasks are invoked by flows, not triggered by telemetry
  • Tasks maintain their own state via ITaskState
  • BTI prefix is btft: instead of btf:

Calling a Task from a Flow

// In a flow's Execute() method:
var stdDev = await ctx.CallTaskAsync<double>(
"btft:rolling-std;1.0.0",
new RollingStdInput(celsius, WindowSize: 50),
ct);

await ctx.EmitAsync("StdDev", stdDev);

Writing a Function

Functions are stateless, pure computations. They take input and return output with no side effects.

using BeaconTower.Flows;

namespace GeneratedFlows;

[FlowFunction("btff:clamp;1.0.0")]
public class ClampFunction : IFlowFunction<ClampInput, double>
{
public double Execute(ClampInput input)
{
return Math.Clamp(input.Value, input.Min, input.Max);
}
}

public record ClampInput(double Value, double Min, double Max);

Key differences from tasks:

  • Functions implement IFlowFunction<TInput, TOutput>
  • Functions are stateless — no ITaskState parameter
  • Functions are synchronous (no Task return type)
  • BTI prefix is btff: instead of btft:

Upload and Deploy

C# artifacts are uploaded the same way as BtScript:

# Upload a flow
flowctl artifact upload flows/temperature-monitor.cs

# Upload a task
flowctl artifact upload tasks/rolling-std.cs

# Upload a function
flowctl artifact upload functions/clamp.cs

# List all artifacts
flowctl artifact list

# Create an instance (same as BtScript flows)
flowctl instance create monitor-1 \
--bti "btf:temperature-monitor;1.0.0" \
--asset pump-042

Available Interfaces

InterfaceArtifact TypeBTI PrefixStateInvocation
IFlowFlowbtf:Full (inputs, rolling buffers, gates, key-value)Triggered by telemetry
IFlowTask<TIn, TOut>Taskbtft:Key-value via ITaskStateCalled by flows
IFlowFunction<TIn, TOut>Functionbtff:None (stateless)Called by flows

IFlowContext Reference

Available in Execute() via the ctx parameter:

// Emit output (default channel)
Task EmitAsync(string outputName, object? value);

// Emit output with channel routing
Task EmitAsync(string outputName, EmitChannel channel, object? value);

// Call reusable artifacts
Task<T> CallTaskAsync<T>(string taskBti, object args, CancellationToken ct);

// Metadata
DateTimeOffset Now { get; }
CancellationToken CancellationToken { get; }

IFlowState Reference

Available in Execute():

// Input values
T? GetInput<T>(string inputName);
T? Latest<T>(string inputName);
bool HasInput(string inputName);

// Key-value state
T? GetValue<T>(string key);
void SetValue<T>(string key, T value);

// Rolling aggregations
double RollingAvg(string key, double value, TimeSpan window);
double RollingMin(string key, double value, TimeSpan window);
double RollingMax(string key, double value, TimeSpan window);
double RollingSum(string key, double value, TimeSpan window);

// Gates
bool GateReady(string gateName);
void GateReset(string gateName);

Next Steps