Skip to main content

Compilation and Runtime

This page describes the end-to-end compilation pipeline and runtime execution of BtScript flows, from source code to running instances within Orleans grains.

Pipeline Overview

BtScript flows follow a multi-stage pipeline from developer-authored .bts source to executable .NET assemblies running in distributed Orleans grains:

┌──────────┐    ┌──────────────┐    ┌───────────┐    ┌──────────────┐    ┌───────────────┐    ┌──────────┐
│ .bts │ -> │ BtScript │ -> │ C# Code │ -> │ Roslyn │ -> │ Artifact │ -> │ FlowGrain│
│ Source │ │ Compiler │ │ Generated │ │ Compiler │ │ Store │ │ Execution│
└──────────┘ └──────────────┘ └───────────┘ └──────────────┘ └───────────────┘ └──────────┘

Each stage transforms the flow definition into a lower-level representation, ultimately producing a self-contained .NET assembly that can be loaded and executed by the runtime.

Compilation Stages

Stage 1: BtScript Source

Developers write .bts files containing flow, task, or function definitions. These use Lisp/Scheme syntax with special forms tailored for reactive dataflow processing.

Example flow with task dependencies:

(flow id: pump-monitor

(require
(|btft:fft-analysis;2.1.0| as: fft-analysis)
(|acme:pump-efficiency;1.2.0| as: efficiency-calc))

(inputs
(vibration "pump_42.accelerometer")
(temperature "pump_42.motor.temp"))

(trigger on-any: vibration temperature)

(efficiency-calc rated-power: 100 flow-rate: vibration as: eff)
(emit efficiency-metric value: eff))

Key features:

  • Inputs map local variable names to asset telemetry paths (EndpointId format)
  • Require declarations specify external task dependencies by BTI
  • Trigger clauses define when the flow executes

Stage 2: C# Code Generation

The BtScript compiler (btscript package) parses .bts source and generates C# classes implementing the IFlow, IFlowTask, or IFlowFunction interface from BeaconTower.Flows namespace (provided by btcommon).

Compiler process:

  1. Lexing/Parsing — Tokenize .bts source and build abstract syntax tree
  2. Macro Expansion — Evaluate through stdlib.scm macros (transform-flow, emit-csharp)
  3. C# Emission — Generate C# source implementing the appropriate interface

Generated C# example:

using BeaconTower.Flows;

namespace GeneratedFlows;

[Flow("btf:pump-monitor;1.0.0")]
public class PumpMonitorFlow : IFlow
{
public FlowMetadata Metadata { get; } = new()
{
Id = "pump-monitor",
Version = "1.0.0",
Triggers = new FlowTriggerInfo[]
{
new(FlowTriggerKind.AnyValue, Inputs: new[] { "vibration", "temperature" }),
},
Inputs = new[]
{
new FlowInput("vibration", DtmiRef: null, ConcreteSignal: "pump_42.accelerometer", ClrType: typeof(double)),
new FlowInput("temperature", DtmiRef: null, ConcreteSignal: "pump_42.motor.temp", ClrType: typeof(double)),
},
Outputs = new FlowOutput[] { },
PersistenceMode = FlowPersistenceMode.Timer,
};

public FlowStateSchema GetStateSchema() => new()
{
RollingBuffers = { },
Gates = new List<GateSchema> { },
};

public async Task<FlowResult<object?>> Execute(
IFlowContext ctx, IFlowState state, CancellationToken ct = default)
{
var vibration = state.GetInput<double>("vibration")!;
var temperature = state.GetInput<double>("temperature")!;

var eff = await ctx.CallTaskAsync<double>(
"acme:pump-efficiency;1.2.0",
new { RatedPower = 100, FlowRate = vibration }, ct);

await ctx.EmitAsync("EfficiencyMetric", eff);
return FlowResult.Ok<object?>(null);
}
}

Key implementation details:

  • [Flow] attribute — BTI identifier read by the Flow Service during registration
  • Metadata — Instance property with trigger, input, output, and persistence configuration
  • GetStateSchema() — Declares rolling buffers and gates for the state system
  • Execute() — Receives both IFlowContext (for side effects) and IFlowState (for inputs and state); runs when trigger fires

Stage 3: Roslyn Compilation

The flow service receives C# source (either from the BtScript compiler or directly from developers) and compiles it to a .NET assembly using the Roslyn compiler.

Compilation process:

public CompilationResult Compile(string sourceCode, string? assemblyName = null)
{
// Parse C# source
var syntaxTree = CSharpSyntaxTree.ParseText(sourceCode, _parseOptions);

// Create compilation with references
var compilation = CSharpCompilation.Create(
assemblyName ?? $"Flow_{Guid.NewGuid():N}",
new[] { syntaxTree },
_references, // Runtime assemblies from TRUSTED_PLATFORM_ASSEMBLIES
_compilationOptions);

// Emit to memory stream
using var ms = new MemoryStream();
var result = compilation.Emit(ms);

if (!result.Success)
{
// Return diagnostics for compilation errors
return CompilationResult.Failure(result.Diagnostics);
}

return CompilationResult.Success(ms.ToArray());
}

Referenced assemblies:

  • BeaconTower.Common — Provides IFlow, IFlowState, IFlowContext, and metadata types
  • .NET runtime assemblies — System.Runtime, System.Linq, etc.

Stage 4: Artifact Storage

Compiled assemblies are stored in PostgreSQL via IArtifactStore. Each artifact has a complete record with metadata and dependency information.

StoredArtifact record:

public sealed record StoredArtifact(
string Bti, // e.g., "btf:pump-monitor;1.0.0"
BtiType ArtifactType, // Flow, Task, or Function
string Name, // "pump-monitor"
string Version, // "1.0.0"
ImmutableArray<byte> AssemblyBytes, // Compiled .NET assembly
ImmutableArray<byte>? SourceBytes, // Original .bts or .cs source
ArtifactInfo Manifest, // Extracted metadata
ImmutableArray<ArtifactDependency> Dependencies, // Task/function BTIs
DateTimeOffset CreatedAt,
string? CreatedBy);

Storage operations:

  • StoreAssemblyAsync(bti, assemblyBytes) — Save new artifact
  • GetAssemblyAsync(bti) — Retrieve assembly bytes by BTI
  • ListArtifactsAsync() — Query registry

Stage 5: Assembly Loading

When a FlowGrain needs to execute a flow, it fetches the assembly from the flow service and loads it into a CollectibleFlowLoadContext (an AssemblyLoadContext that supports hot-unloading).

FlowAssemblyCache implementation:

public async Task<Type?> GetFlowTypeAsync(string bti, CancellationToken ct)
{
// Check cache first
if (_cache.TryGetValue(bti, out var cached))
return cached.FlowType;

// Fetch from flow service
var artifact = await _client.GetArtifactAsync(bti, ct);

// Load into collectible ALC
var loadContext = new CollectibleFlowLoadContext(bti);
var assembly = loadContext.LoadFromBytes(artifact.AssemblyBytes);

// Find IFlow implementation
var flowType = assembly.GetTypes()
.FirstOrDefault(t => typeof(IFlow).IsAssignableFrom(t));

// Cache and return
_cache[bti] = new CacheEntry(flowType, loadContext);
return flowType;
}

CollectibleFlowLoadContext:

public sealed class CollectibleFlowLoadContext : AssemblyLoadContext
{
public CollectibleFlowLoadContext(string bti)
: base(name: $"Flow-{bti}", isCollectible: true)
{
}
}

Why collectible? — When no grains reference a flow assembly, it can be garbage collected and unloaded from memory. This enables hot-reloading of flow definitions without restarting the service.

Type identity via ALC fallback:

  • CollectibleFlowLoadContext has no custom Load() override
  • When the flow assembly references BeaconTower.Common, the ALC falls back to the Default ALC
  • Default ALC has btcommon already loaded (btap references it)
  • Result: IFlow in flow assembly = IFlow in btap, preserving type identity

Stage 6: Flow Execution

The FlowGrain (an Orleans grain in btap) hosts and executes the flow instance.

Basic FlowGrain interaction:

public async Task Init(string flowBti, Dictionary<string, string>? inputMapping)
{
// Load flow type from assembly cache
var flowType = await _assemblyCache.GetFlowTypeAsync(flowBti);

// Extract metadata (static properties)
var metadata = FlowLoader.GetMetadata(flowType);
var stateSchema = FlowLoader.GetStateSchema(flowType);

// Create flow state with rolling buffers and gates
_stateStore.State = new FlowState(this.GetPrimaryKeyString(), stateSchema);

// Create flow instance
_flow = _assemblyCache.CreateInstance(flowType);

// Subscribe to Orleans streams for each input
await this.SubscribeToStreams<Message>(
inputMapping.Values,
StreamDirection.Out,
OnInputMessage);

// Persist config and state
await Task.WhenAll(
_configStore.WriteStateAsync(),
_stateStore.WriteStateAsync());
}

The Four Packages

Each package in the BtScript ecosystem has a distinct responsibility:

PackageResponsibilityKey Types
btscriptDSL compiler — parses .bts, generates C# codeBtScriptCompiler, Lexer, Parser, stdlib.scm macros
flow serviceRoslyn compilation, artifact registry API, PostgreSQL storageRoslynCompiler, IArtifactStore, StoredArtifact
btapFlowGrain runtime — assembly loading, flow execution, persistenceFlowGrain, FlowAssemblyCache, FlowState, FlowContext
btcommonShared interfaces and types used across packagesIFlow, IFlowState, IFlowContext, FlowMetadata

Package boundaries:

  • btscript and flow service are independent — flow service can accept .bts, .cs, or .dll artifacts
  • btap depends on btcommon for flow contracts, but not on btscript
  • btcommon is the contract layer — defines IFlow interface and execution model

FlowGrain Lifecycle

FlowGrain manages the entire lifecycle of a flow instance, from initialization to deactivation. Each phase interacts with the flow via the IFlow interface.

Init() — First-Time Setup

Called once when a flow is first deployed to an asset.

Steps:

  1. Load flow assembly — Fetch from flow service via FlowAssemblyCache
  2. Extract metadata — Read static Metadata property and GetStateSchema() method
  3. Create input mapping — Map flow input names to Orleans stream IDs (EndpointIds)
  4. Initialize state — Create FlowState with rolling buffers and gates from schema
  5. Create flow instance — Instantiate flow class
  6. Subscribe to streams — Subscribe to Orleans streams for each input
  7. Persist configuration — Save flow BTI, input mapping, and state to persistent storage

What's stored:

  • FlowGrainConfig — BTI, input mapping, metadata, state schema, persistence mode
  • FlowState — Input values, rolling buffers, gate state, key-value pairs

OnActivateAsync() — Grain Reactivation

Called by Orleans when the grain is activated (after deactivation or silo restart).

Steps:

  1. Check initialization — If _configStore.RecordExists() is false, skip (not initialized)
  2. Reload flow assembly — Assembly may have been unloaded; fetch from cache/service
  3. Recreate flow instance — Instantiate flow from cached type
  4. Resubscribe to streams — Orleans stream subscriptions don't persist; resubscribe
  5. Start persist timer — If PersistenceMode == Timer, register Orleans grain timer

State restoration:

  • Orleans automatically loads _configStore.State and _stateStore.State from storage
  • FlowState includes all inputs, rolling buffers, gates, and key-value pairs

OnInputMessage() — Input Arrival

Called when a subscribed Orleans stream delivers a message (new telemetry value).

Steps:

  1. Map stream ID → input name — Look up input name from inputMapping
  2. Store in state — Call _stateStore.State.SetInput(name, value, timestamp)
    • This is the single source of truth for input values
    • State is persisted according to PersistenceMode
  3. Update gate state — For gated inputs, state.GateInput(gateName, inputName, value, timestamp)
  4. Track last input time — Used for AllValues trigger logic
  5. Check trigger condition — Call ShouldTrigger(inputName)
  6. Execute flow — If triggered, call ExecuteFlow(timestamp, inputName)

Trigger evaluation logic:

ShouldTrigger(inputName):
1. if (!AllInputsInitialized()) return false
- ALL inputs (active + passive) must have at least one value
2. if (inputName is not in TriggerInputs) return false
- Passive inputs don't trigger execution (used via `(latest x)`)
3. Evaluate trigger mode:
- AnyValue → true (any active input triggers)
- AllValues → all TriggerInputs have new values since last execution
- OnChange → value differs from previous value
- Timer → timer fired (inputName check not applicable)

Active vs. Passive inputs:

Input TypeIn Trigger?Can Trigger?Access Method
ActiveYesYesstate.GetInput<T>(name) or (latest name)
PassiveNoNo(latest name) only

Example:

(inputs
(pressure "pump.pressure")
(temperature "pump.temp")
(diameter "asset.config.pipe-diameter")) ; Passive - from property

(trigger on-any: pressure temperature) ; Active inputs

; diameter is passive: doesn't trigger, but can be read via (latest diameter)

Flow waits until all 3 inputs have values, then triggers on pressure or temperature changes.

ExecuteFlow() — Trigger Fired

Called when the trigger condition is satisfied.

Steps:

  1. Create execution contextnew FlowContext(timestamp, ct, triggerId)
    • FlowContext provides EmitAsync(), CallTaskAsync(), and execution metadata
  2. Execute flow logic — Call _flow.Execute(ctx, state, ct)
    • Flow reads inputs via state.GetInput<T>(name)
    • Uses rolling windows: state.RollingAvg(key, value, window)
    • Checks gates: if (state.GateReady(gateName)) { ... state.GateReset(gateName); }
    • Calls tasks: await ctx.CallTaskAsync<T>(bti, args, ct)
    • Emits outputs: await ctx.EmitAsync(outputName, value) or await ctx.EmitAsync(outputName, EmitChannel.Alarm, value)
  3. Process result — Extract FlowResult<T>.Value; read side effects from context
  4. Persist state — Call PersistByModeAsync() based on PersistenceMode

Inside Execute() — generated code:

public async Task<FlowResult<object?>> Execute(
IFlowContext ctx, IFlowState state, CancellationToken ct = default)
{
var vibration = state.GetInput<double>("vibration")!;
var pressureIn = state.GetInput<double>("pressure-in")!;
var pressureOut = state.GetInput<double>("pressure-out")!;

if (state.GateReady("gate_pressure-in_pressure-out"))
{
state.GateReset("gate_pressure-in_pressure-out");
await ctx.EmitAsync("PressureDelta", pressureOut - pressureIn);
}

var vibAvg = state.RollingAvg("vib_avg", vibration, TimeSpan.FromMinutes(5));
if (vibration > 0.8)
await ctx.EmitAsync("VibrationAlarm", vibration);

return FlowResult.Ok<object?>(null);
}

OnDeactivateAsync() — Grain Deactivation

Called by Orleans when the grain is being deactivated (idle timeout, silo shutdown).

Steps:

  1. Best-effort persist — If PersistenceMode != None, call PersistIfDirtyAsync()
    • Check _stateStore.State.IsDirty
    • Mark persisted: _stateStore.State.MarkPersisted()
    • Write: await _stateStore.WriteStateAsync()
  2. Stop persist timer — Dispose Orleans timer if running
  3. Release assembly reference — Call _assemblyCache.ReleaseReference(flowBti)
    • If refcount → 0, assembly can be unloaded (collectible ALC)

Gate Mechanism

Gates synchronize multiple inputs so they're processed together. Used when you need correlated values (e.g., inlet and outlet pressure to compute delta).

DSL Syntax

(gate zip: pressure-in pressure-out)  ; Wait for both before processing

How It Works

When inputs are gated:

  1. FlowGrain tracks gate inputs — calls state.GateInput(gateName, inputName, value, timestamp) for each gated input
  2. In Execute(), check state.GateReady(gateName) before using gated inputs
  3. After processing, call state.GateReset(gateName) to clear for next batch

Gate state structure:

internal class Gate
{
private readonly HashSet<string> _requiredInputs; // e.g., {"pressure-in", "pressure-out"}
private readonly Dictionary<string, (object?, DateTimeOffset)> _receivedInputs;

public void SetInput(string inputName, object value, DateTimeOffset timestamp)
{
_receivedInputs[inputName] = (value, timestamp);
}

public bool IsReady()
{
// True when ALL required inputs have been received
return _requiredInputs.All(input => _receivedInputs.ContainsKey(input));
}

public void Reset()
{
// Clear for next synchronized batch
_receivedInputs.Clear();
}
}

Example flow with gate:

(flow id: pressure-delta

(inputs
(pressure-in "pump.inlet.pressure")
(pressure-out "pump.outlet.pressure"))

(gate zip: pressure-in pressure-out) ; Synchronize these two

(trigger on-any: pressure-in pressure-out)

(let ((delta (- pressure-out pressure-in)))
(emit pressure-delta value: delta)))

Generated Execute() with gate check:

Gate input tracking is handled by the FlowGrain directly. The generated flow code checks gate readiness and resets after processing:

public async Task<FlowResult<object?>> Execute(IFlowContext ctx, IFlowState state, CancellationToken ct = default)
{
var pressureIn = state.GetInput<double>("pressure-in")!;
var pressureOut = state.GetInput<double>("pressure-out")!;
// gate :zip - synchronize all inputs
if (!state.GateReady("gate_pressure-in_pressure-out"))
{
return FlowResult.Ok<object?>(null);
}
state.GateReset("gate_pressure-in_pressure-out");
var delta = (pressureOut - pressureIn);
await ctx.EmitAsync("PressureDelta", delta);
return FlowResult.Ok<object?>(null);
}

Key insight: Gates ensure temporal correlation. Without gates, you might compute pressureOut - pressureIn using values from different time periods.

Core Interfaces

These interfaces from btcommon (BeaconTower.Flows namespace) define the contract between the runtime and compiled flows.

IFlow

public interface IFlow
{
FlowMetadata Metadata { get; }
FlowStateSchema GetStateSchema();
Task<FlowResult<object?>> Execute(IFlowContext ctx, IFlowState state, CancellationToken ct = default);
}

Usage:

  • Metadata — FlowGrain reads this during initialization for trigger, input, and persistence config
  • GetStateSchema() — Declares rolling buffers and gates for the state system
  • Execute() — FlowGrain calls this when trigger condition is satisfied, passing context and state

IFlowState

public interface IFlowState
{
string StateId { get; }

// Input storage (used by FlowGrain)
void SetInput(string inputName, object? value, DateTimeOffset timestamp);
T? GetInput<T>(string inputName);
bool HasInput(string inputName);

// Key-value storage
T? GetValue<T>(string key);
T GetValue<T>(string key, T defaultValue);
void SetValue<T>(string key, T value);

// Latest value access (for passive inputs and (latest x) form)
T? Latest<T>(string inputName);
T Latest<T>(string inputName, T defaultValue);

// Rolling window aggregations
double RollingAvg(string key, double value, TimeSpan window);
double RollingMin(string key, double value, TimeSpan window);
double RollingMax(string key, double value, TimeSpan window);
double RollingSum(string key, double value, TimeSpan window);

// Gate operations
void GateInput(string gateName, string inputName, object value, DateTimeOffset timestamp);
bool GateReady(string gateName);
void GateReset(string gateName);

// Persistence tracking
bool IsDirty { get; }
DateTimeOffset LastPersisted { get; }
void MarkPersisted();
void MarkDirty();
}

Implementation: FlowState in btap (Orleans serializable with [GenerateSerializer])

Persistence: State is persisted to Orleans IPersistentState<FlowState> according to PersistenceMode

IFlowContext

public interface IFlowContext
{
// Side effects
Task EmitAsync(string outputName, object? value);
Task EmitAsync(string outputName, EmitChannel channel, object? value);
Task<T> CallTaskAsync<T>(string taskBti, object args, CancellationToken ct = default);
Task CallTaskAsync(string taskBti, object args, CancellationToken ct = default);

// Execution metadata
DateTimeOffset Now { get; }
CancellationToken CancellationToken { get; }
}

Implementation: FlowContext in btap

Usage in generated code:

  • ctx.EmitAsync(output, value) — Emit output (2-arg, default channel)
  • ctx.EmitAsync(output, EmitChannel.Alarm, value) — Emit output with channel routing
  • ctx.CallTaskAsync<T>(bti, args) — Call external task (requires task resolution)

Supporting Types

TypeDescription
FlowMetadataFlow metadata: Id, Version, Inputs, Outputs, TriggerType, TriggerInputs, PersistenceMode
FlowStateSchemaState schema: RollingBuffers, Gates
FlowInputInput definition: Name, DtmiRef, ConcreteSignal, ClrType
FlowOutputOutput definition: Name, ClrType
FlowResult<T>Execution result: Value, SideEffects
FlowSideEffectSide effect record: Type, Name, Data
FlowTriggerTypeEnum: AnyValue, AllValues, OnInput, Timer
FlowPersistenceModeEnum: Synchronous, Async, Timer, OnDeactivate, None
GateSchemaGate definition: Name, InputNames
IFlowTask<TInput, TOutput>Interface for stateful task implementations
IFlowFunction<TInput, TOutput>Interface for stateless function implementations
FlowTaskAttributeAttribute: [FlowTask("btft:name;version")]
FlowFunctionAttributeAttribute: [FlowFunction("btff:name;version")]

Persistence Modes

FlowGrain supports multiple persistence strategies for FlowState, configurable via the persist: keyword in the flow definition.

ModeWhen Persisted
SynchronousAfter each Execute(), blocks until complete
AsyncAfter each Execute(), fire-and-forget
Timer (default)Every N seconds (default 5s) via Orleans timer
OnDeactivateOnly on grain deactivation
NoneNever

See Persistence & State for detailed trade-offs, decision guide, and BtScript examples.

State serialization:

[GenerateSerializer]
public class FlowState : IFlowState
{
[Id(0)] private readonly Dictionary<string, object?> _values = new();
[Id(1)] private readonly Dictionary<string, PersistedInput> _inputs = new();
[Id(2)] private readonly Dictionary<string, RollingBuffer> _rollingBuffers = new();
[Id(3)] private readonly Dictionary<string, Gate> _gates = new();
[Id(4)] public DateTimeOffset LastPersisted { get; set; }
[Id(5)] public string StateId { get; }

[NonSerialized] private bool _isDirty; // Transient
}

Next Steps

For more details on: