Compilation and Runtime
This page describes the end-to-end compilation pipeline and runtime execution of BtScript flows, from source code to running instances within Orleans grains.
Pipeline Overview
BtScript flows follow a multi-stage pipeline from developer-authored .bts source to executable .NET assemblies running in distributed Orleans grains:
┌──────────┐ ┌──────────────┐ ┌───────────┐ ┌──────────────┐ ┌───────────────┐ ┌──────────┐
│ .bts │ -> │ BtScript │ -> │ C# Code │ -> │ Roslyn │ -> │ Artifact │ -> │ FlowGrain│
│ Source │ │ Compiler │ │ Generated │ │ Compiler │ │ Store │ │ Execution│
└──────────┘ └──────────────┘ └───────────┘ └──────────────┘ └───────────────┘ └──────────┘
Each stage transforms the flow definition into a lower-level representation, ultimately producing a self-contained .NET assembly that can be loaded and executed by the runtime.
Compilation Stages
Stage 1: BtScript Source
Developers write .bts files containing flow, task, or function definitions. These use Lisp/Scheme syntax with special forms tailored for reactive dataflow processing.
Example flow with task dependencies:
(flow id: pump-monitor
(require
(|btft:fft-analysis;2.1.0| as: fft-analysis)
(|acme:pump-efficiency;1.2.0| as: efficiency-calc))
(inputs
(vibration "pump_42.accelerometer")
(temperature "pump_42.motor.temp"))
(trigger on-any: vibration temperature)
(efficiency-calc rated-power: 100 flow-rate: vibration as: eff)
(emit efficiency-metric value: eff))
Key features:
- Inputs map local variable names to asset telemetry paths (EndpointId format)
- Require declarations specify external task dependencies by BTI
- Trigger clauses define when the flow executes
Stage 2: C# Code Generation
The BtScript compiler (btscript package) parses .bts source and generates C# classes implementing the IFlow, IFlowTask, or IFlowFunction interface from the BeaconTower.Flows namespace (provided by btcommon).
Compiler process:
- Lexing/Parsing: Tokenize .bts source and build an abstract syntax tree
- Macro Expansion: Evaluate through stdlib.scm macros (transform-flow, emit-csharp)
- C# Emission: Generate C# source implementing the appropriate interface
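To make the lexing step concrete, here is a minimal tokenizer sketch for the S-expression surface syntax used in the examples on this page. It is not the btscript package's Lexer: the `SketchLexer` name and the token rules are assumptions inferred from those examples (parentheses, `"strings"`, `|pipe-quoted symbols|`, and `;` line comments).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch; not the btscript package's Lexer.
public static class SketchLexer
{
    public static List<string> Tokenize(string source)
    {
        var tokens = new List<string>();
        int i = 0;
        while (i < source.Length)
        {
            char c = source[i];
            if (char.IsWhiteSpace(c))
            {
                i++;
            }
            else if (c == ';')
            {
                // Line comment: skip to end of line.
                while (i < source.Length && source[i] != '\n') i++;
            }
            else if (c == '(' || c == ')')
            {
                tokens.Add(c.ToString());
                i++;
            }
            else if (c == '"' || c == '|')
            {
                // Quoted string or |quoted symbol|: read through the closing delimiter,
                // so a ';' inside |btft:name;1.0.0| is not mistaken for a comment.
                int end = source.IndexOf(c, i + 1);
                if (end < 0) throw new FormatException($"Unterminated {c} at offset {i}");
                tokens.Add(source.Substring(i, end - i + 1));
                i = end + 1;
            }
            else
            {
                // Bare atom (symbol, keyword such as as:, or number).
                int start = i;
                while (i < source.Length && !IsDelimiter(source[i])) i++;
                tokens.Add(source.Substring(start, i - start));
            }
        }
        return tokens;
    }

    private static bool IsDelimiter(char c) =>
        char.IsWhiteSpace(c) || c == '(' || c == ')' || c == ';';
}
```

A parser would then fold this flat token list into nested lists before macro expansion. Pipe-quoted symbols get their own branch because BTIs contain `;`, which would otherwise start a comment.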
Generated C# example:
using BeaconTower.Flows;
namespace GeneratedFlows;
[Flow("btf:pump-monitor;1.0.0")]
public class PumpMonitorFlow : IFlow
{
public FlowMetadata Metadata { get; } = new()
{
Id = "pump-monitor",
Version = "1.0.0",
Triggers = new FlowTriggerInfo[]
{
new(FlowTriggerKind.AnyValue, Inputs: new[] { "vibration", "temperature" }),
},
Inputs = new[]
{
new FlowInput("vibration", DtmiRef: null, ConcreteSignal: "pump_42.accelerometer", ClrType: typeof(double)),
new FlowInput("temperature", DtmiRef: null, ConcreteSignal: "pump_42.motor.temp", ClrType: typeof(double)),
},
Outputs = new FlowOutput[] { },
PersistenceMode = FlowPersistenceMode.Timer,
};
public FlowStateSchema GetStateSchema() => new()
{
RollingBuffers = { },
Gates = new List<GateSchema> { },
};
public async Task<FlowResult<object?>> Execute(
IFlowContext ctx, IFlowState state, CancellationToken ct = default)
{
var vibration = state.GetInput<double>("vibration")!;
var temperature = state.GetInput<double>("temperature")!;
var eff = await ctx.CallTaskAsync<double>(
"acme:pump-efficiency;1.2.0",
new { RatedPower = 100, FlowRate = vibration }, ct);
await ctx.EmitAsync("EfficiencyMetric", eff);
return FlowResult.Ok<object?>(null);
}
}
Key implementation details:
- [Flow] attribute: BTI identifier read by the Flow Service during registration
- Metadata: Instance property with trigger, input, output, and persistence configuration
- GetStateSchema(): Declares rolling buffers and gates for the state system
- Execute(): Receives both IFlowContext (for side effects) and IFlowState (for inputs and state); runs when the trigger fires
Stage 3: Roslyn Compilation
The flow service receives C# source (either from the BtScript compiler or directly from developers) and compiles it to a .NET assembly using the Roslyn compiler.
Compilation process:
public CompilationResult Compile(string sourceCode, string? assemblyName = null)
{
// Parse C# source
var syntaxTree = CSharpSyntaxTree.ParseText(sourceCode, _parseOptions);
// Create compilation with references
var compilation = CSharpCompilation.Create(
assemblyName ?? $"Flow_{Guid.NewGuid():N}",
new[] { syntaxTree },
_references, // Runtime assemblies from TRUSTED_PLATFORM_ASSEMBLIES
_compilationOptions);
// Emit to memory stream
using var ms = new MemoryStream();
var result = compilation.Emit(ms);
if (!result.Success)
{
// Return diagnostics for compilation errors
return CompilationResult.Failure(result.Diagnostics);
}
return CompilationResult.Success(ms.ToArray());
}
Referenced assemblies:
- BeaconTower.Common: Provides IFlow, IFlowState, IFlowContext, and metadata types
- .NET runtime assemblies: System.Runtime, System.Linq, etc.
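The comment on `_references` mentions TRUSTED_PLATFORM_ASSEMBLIES. As a rough sketch of where such a list could come from, the .NET host exposes it through `AppContext.GetData` as a single string joined with the platform path separator. The `PlatformAssemblies` helper below is hypothetical and only splits that string. How the flow service actually builds `_references` is not shown on this page.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper; not part of the flow service.
public static class PlatformAssemblies
{
    // Splits a TRUSTED_PLATFORM_ASSEMBLIES-style value into individual file paths.
    public static IReadOnlyList<string> Split(string? raw) =>
        string.IsNullOrEmpty(raw)
            ? Array.Empty<string>()
            : raw.Split(Path.PathSeparator, StringSplitOptions.RemoveEmptyEntries);

    // Reads the list the .NET host publishes for the current process.
    // The value can be absent in some hosting models, which yields an empty list.
    public static IReadOnlyList<string> ForCurrentProcess() =>
        Split(AppContext.GetData("TRUSTED_PLATFORM_ASSEMBLIES") as string);
}
```

Each resulting path could then be turned into a Roslyn reference with `MetadataReference.CreateFromFile(path)` and passed to `CSharpCompilation.Create`.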
Stage 4: Artifact Storage
Compiled assemblies are stored in PostgreSQL via IArtifactStore. Each artifact has a complete record with metadata and dependency information.
StoredArtifact record:
public sealed record StoredArtifact(
string Bti, // e.g., "btf:pump-monitor;1.0.0"
BtiType ArtifactType, // Flow, Task, or Function
string Name, // "pump-monitor"
string Version, // "1.0.0"
ImmutableArray<byte> AssemblyBytes, // Compiled .NET assembly
ImmutableArray<byte>? SourceBytes, // Original .bts or .cs source
ArtifactInfo Manifest, // Extracted metadata
ImmutableArray<ArtifactDependency> Dependencies, // Task/function BTIs
DateTimeOffset CreatedAt,
string? CreatedBy);
Storage operations:
- StoreAssemblyAsync(bti, assemblyBytes): Save new artifact
- GetAssemblyAsync(bti): Retrieve assembly bytes by BTI
- ListArtifactsAsync(): Query registry
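The record stores the BTI alongside a separate Name and Version, so it may help to see how the three relate. The sketch below assumes a BTI has the shape `prefix:name;version`, inferred only from the examples on this page (such as "btf:pump-monitor;1.0.0"). `BtiParts` is a hypothetical type, and the real BTI grammar may have rules this sketch does not capture.

```csharp
using System;

// Hypothetical helper; assumes the "prefix:name;version" shape seen in the examples.
public sealed record BtiParts(string Prefix, string Name, string Version)
{
    public static BtiParts Parse(string bti)
    {
        int colon = bti.IndexOf(':');
        int semi = bti.LastIndexOf(';');
        // Require a non-empty prefix, name, and version.
        if (colon <= 0 || semi <= colon + 1 || semi == bti.Length - 1)
            throw new FormatException($"Not a prefix:name;version identifier: '{bti}'");
        return new BtiParts(bti[..colon], bti[(colon + 1)..semi], bti[(semi + 1)..]);
    }
}
```

For "btf:pump-monitor;1.0.0" this yields the prefix "btf", the name "pump-monitor", and the version "1.0.0", matching the Name and Version comments in the record above. The sketch does not interpret the prefix.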
Stage 5: Assembly Loading
When a FlowGrain needs to execute a flow, it fetches the assembly from the flow service and loads it into a CollectibleFlowLoadContext (an AssemblyLoadContext that supports hot-unloading).
FlowAssemblyCache implementation:
public async Task<Type?> GetFlowTypeAsync(string bti, CancellationToken ct)
{
// Check cache first
if (_cache.TryGetValue(bti, out var cached))
return cached.FlowType;
// Fetch from flow service
var artifact = await _client.GetArtifactAsync(bti, ct);
// Load into collectible ALC
var loadContext = new CollectibleFlowLoadContext(bti);
var assembly = loadContext.LoadFromBytes(artifact.AssemblyBytes);
// Find IFlow implementation
var flowType = assembly.GetTypes()
.FirstOrDefault(t => typeof(IFlow).IsAssignableFrom(t));
// Cache and return
_cache[bti] = new CacheEntry(flowType, loadContext);
return flowType;
}
CollectibleFlowLoadContext:
public sealed class CollectibleFlowLoadContext : AssemblyLoadContext
{
public CollectibleFlowLoadContext(string bti)
: base(name: $"Flow-{bti}", isCollectible: true)
{
}
}
Why collectible? — When no grains reference a flow assembly, it can be garbage collected and unloaded from memory. This enables hot-reloading of flow definitions without restarting the service.
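Unloading a collectible context is cooperative: `Unload()` only requests it, and the runtime reclaims the context once nothing references it any more. The sketch below shows that pattern with a plain `AssemblyLoadContext` from the .NET base library. `CollectibleDemo` and its method names are hypothetical, and no flow assembly is loaded here.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Runtime.Loader;

// Hypothetical demo of the collectible-ALC unload pattern.
public static class CollectibleDemo
{
    // Creates a collectible context, requests unload, and returns only a weak reference.
    // NoInlining keeps the strong local reference confined to this method's frame.
    [MethodImpl(MethodImplOptions.NoInlining)]
    public static WeakReference CreateAndUnload(string name)
    {
        var alc = new AssemblyLoadContext(name, isCollectible: true);
        var weak = new WeakReference(alc);
        alc.Unload(); // Request only; actual unloading happens during a later GC.
        return weak;
    }

    // Nudges the GC a few times and reports whether the context has been reclaimed.
    public static bool WaitForUnload(WeakReference weak, int maxAttempts = 10)
    {
        for (int i = 0; weak.IsAlive && i < maxAttempts; i++)
        {
            GC.Collect();
            GC.WaitForPendingFinalizers();
        }
        return !weak.IsAlive;
    }
}
```

In a real cache, any surviving reference to a type, instance, or delegate from the loaded assembly keeps the context alive, which is why dropping the last grain reference matters.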
Type identity via ALC fallback:
- CollectibleFlowLoadContext has no custom Load() override
- When the flow assembly references BeaconTower.Common, the ALC falls back to the Default ALC
- Default ALC has btcommon already loaded (btap references it)
- Result: IFlow in flow assembly = IFlow in btap, preserving type identity
Stage 6: Flow Execution
The FlowGrain (an Orleans grain in btap) hosts and executes the flow instance.
Basic FlowGrain interaction:
public async Task Init(string flowBti, Dictionary<string, string>? inputMapping)
{
    // Treat a missing mapping as "no inputs" so the subscription step cannot dereference null
    inputMapping ??= new Dictionary<string, string>();
    // Load flow type from assembly cache (null when the assembly has no IFlow implementation)
    var flowType = await _assemblyCache.GetFlowTypeAsync(flowBti, CancellationToken.None)
        ?? throw new InvalidOperationException($"No IFlow implementation found in '{flowBti}'");
    // Extract metadata and state schema
    var metadata = FlowLoader.GetMetadata(flowType);
    var stateSchema = FlowLoader.GetStateSchema(flowType);
    // Create flow state with rolling buffers and gates
    _stateStore.State = new FlowState(this.GetPrimaryKeyString(), stateSchema);
    // Create flow instance
    _flow = _assemblyCache.CreateInstance(flowType);
    // Subscribe to Orleans streams for each input
    await this.SubscribeToStreams<Message>(
        inputMapping.Values,
        StreamDirection.Out,
        OnInputMessage);
    // Persist config and state
    await Task.WhenAll(
        _configStore.WriteStateAsync(),
        _stateStore.WriteStateAsync());
}
The Four Packages
Each package in the BtScript ecosystem has a distinct responsibility:
| Package | Responsibility | Key Types |
|---|---|---|
| btscript | DSL compiler — parses .bts, generates C# code | BtScriptCompiler, Lexer, Parser, stdlib.scm macros |
| flow service | Roslyn compilation, artifact registry API, PostgreSQL storage | RoslynCompiler, IArtifactStore, StoredArtifact |
| btap | FlowGrain runtime — assembly loading, flow execution, persistence | FlowGrain, FlowAssemblyCache, FlowState, FlowContext |
| btcommon | Shared interfaces and types used across packages | IFlow, IFlowState, IFlowContext, FlowMetadata |
Package boundaries:
- btscript and flow service are independent: flow service can accept .bts, .cs, or .dll artifacts
- btap depends on btcommon for flow contracts, but not on btscript
- btcommon is the contract layer: defines the IFlow interface and execution model
FlowGrain Lifecycle
FlowGrain manages the entire lifecycle of a flow instance, from initialization to deactivation. Each phase interacts with the flow via the IFlow interface.
Init() — First-Time Setup
Called once when a flow is first deployed to an asset.
Steps:
- Load flow assembly: Fetch from flow service via FlowAssemblyCache
- Extract metadata: Read the Metadata property and GetStateSchema() method
- Create input mapping: Map flow input names to Orleans stream IDs (EndpointIds)
- Initialize state: Create FlowState with rolling buffers and gates from schema
- Create flow instance: Instantiate flow class
- Subscribe to streams: Subscribe to Orleans streams for each input
- Persist configuration: Save flow BTI, input mapping, and state to persistent storage
What's stored:
- FlowGrainConfig — BTI, input mapping, metadata, state schema, persistence mode
- FlowState — Input values, rolling buffers, gate state, key-value pairs
OnActivateAsync() — Grain Reactivation
Called by Orleans when the grain is activated (after deactivation or silo restart).
Steps:
- Check initialization: If _configStore.RecordExists is false, skip (not initialized)
- Reload flow assembly: Assembly may have been unloaded; fetch from cache/service
- Recreate flow instance: Instantiate flow from cached type
- Resubscribe to streams: Orleans stream subscriptions don't persist; resubscribe
- Start persist timer: If PersistenceMode == Timer, register Orleans grain timer
State restoration:
- Orleans automatically loads _configStore.State and _stateStore.State from storage
- FlowState includes all inputs, rolling buffers, gates, and key-value pairs
OnInputMessage() — Input Arrival
Called when a subscribed Orleans stream delivers a message (new telemetry value).
Steps:
- Map stream ID → input name: Look up input name from inputMapping
- Store in state: Call _stateStore.State.SetInput(name, value, timestamp)
  - This is the single source of truth for input values
  - State is persisted according to PersistenceMode
- Update gate state: For gated inputs, state.GateInput(gateName, inputName, value, timestamp)
- Track last input time: Used for AllValues trigger logic
- Check trigger condition: Call ShouldTrigger(inputName)
- Execute flow: If triggered, call ExecuteFlow(timestamp, inputName)
Trigger evaluation logic:
ShouldTrigger(inputName):
1. if (!AllInputsInitialized()) return false
- ALL inputs (active + passive) must have at least one value
2. if (inputName is not in TriggerInputs) return false
- Passive inputs don't trigger execution (used via `(latest x)`)
3. Evaluate trigger mode:
- AnyValue → true (any active input triggers)
- AllValues → all TriggerInputs have new values since last execution
- OnChange → value differs from previous value
- Timer → timer fired (inputName check not applicable)
Active vs. Passive inputs:
| Input Type | In Trigger? | Can Trigger? | Access Method |
|---|---|---|---|
| Active | Yes | Yes | state.GetInput<T>(name) or (latest name) |
| Passive | No | No | (latest name) only |
Example:
(inputs
(pressure "pump.pressure")
(temperature "pump.temp")
(diameter "asset.config.pipe-diameter")) ; Passive - from property
(trigger on-any: pressure temperature) ; Active inputs
; diameter is passive: doesn't trigger, but can be read via (latest diameter)
The flow waits until all three inputs have values, then executes on every new pressure or temperature value. A new diameter value updates state but never triggers execution.
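The trigger rules above can be sketched as a small state machine. This is an illustrative model, not the FlowGrain's implementation: `TriggerEvaluator`, `TriggerMode`, and the internal bookkeeping are hypothetical names, and in this sketch a Timer-mode flow is never fired by input arrival because the timer path is outside its scope.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Linq;

public enum TriggerMode { AnyValue, AllValues, OnChange, Timer }

// Hypothetical sketch; the FlowGrain's actual bookkeeping may differ.
public sealed class TriggerEvaluator
{
    private readonly TriggerMode _mode;
    private readonly HashSet<string> _allInputs;      // active + passive
    private readonly HashSet<string> _triggerInputs;  // active only
    private readonly Dictionary<string, object?> _latest = new();
    private readonly HashSet<string> _fresh = new();  // trigger inputs updated since last execution

    public TriggerEvaluator(TriggerMode mode, IEnumerable<string> allInputs, IEnumerable<string> triggerInputs)
    {
        _mode = mode;
        _allInputs = new HashSet<string>(allInputs);
        _triggerInputs = new HashSet<string>(triggerInputs);
    }

    // Records a new value and reports whether the flow should execute now.
    public bool OnInput(string name, object? value)
    {
        bool changed = !_latest.TryGetValue(name, out var previous) || !object.Equals(previous, value);
        _latest[name] = value;
        if (_triggerInputs.Contains(name)) _fresh.Add(name);

        // 1. Every input, active or passive, needs at least one value.
        if (!_allInputs.All(n => _latest.ContainsKey(n))) return false;
        // 2. Passive inputs never trigger.
        if (!_triggerInputs.Contains(name)) return false;
        // 3. Mode-specific check.
        bool fire = _mode switch
        {
            TriggerMode.AnyValue => true,
            TriggerMode.AllValues => _triggerInputs.All(n => _fresh.Contains(n)),
            TriggerMode.OnChange => changed,
            _ => false, // Timer: assumed to fire from a timer callback, not from inputs
        };
        if (fire) _fresh.Clear();
        return fire;
    }
}
```

With the pressure/temperature/diameter example, the first pressure and temperature values return false because diameter is still missing, the diameter value returns false because it is passive, and the next pressure value returns true.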
ExecuteFlow() — Trigger Fired
Called when the trigger condition is satisfied.
Steps:
- Create execution context: new FlowContext(timestamp, ct, triggerId)
  - FlowContext provides EmitAsync(), CallTaskAsync(), and execution metadata
- Execute flow logic: Call _flow.Execute(ctx, state, ct)
  - Flow reads inputs via state.GetInput<T>(name)
  - Uses rolling windows: state.RollingAvg(key, value, window)
  - Checks gates: if (state.GateReady(gateName)) { ... state.GateReset(gateName); }
  - Calls tasks: await ctx.CallTaskAsync<T>(bti, args, ct)
  - Emits outputs: await ctx.EmitAsync(outputName, value) or await ctx.EmitAsync(outputName, EmitChannel.Alarm, value)
- Process result: Extract FlowResult<T>.Value; read side effects from context
- Persist state: Call PersistByModeAsync() based on PersistenceMode
Inside Execute() — generated code:
public async Task<FlowResult<object?>> Execute(
IFlowContext ctx, IFlowState state, CancellationToken ct = default)
{
var vibration = state.GetInput<double>("vibration")!;
var pressureIn = state.GetInput<double>("pressure-in")!;
var pressureOut = state.GetInput<double>("pressure-out")!;
if (state.GateReady("gate_pressure-in_pressure-out"))
{
state.GateReset("gate_pressure-in_pressure-out");
await ctx.EmitAsync("PressureDelta", pressureOut - pressureIn);
}
var vibAvg = state.RollingAvg("vib_avg", vibration, TimeSpan.FromMinutes(5));
if (vibration > 0.8)
await ctx.EmitAsync("VibrationAlarm", vibration);
return FlowResult.Ok<object?>(null);
}
OnDeactivateAsync() — Grain Deactivation
Called by Orleans when the grain is being deactivated (idle timeout, silo shutdown).
Steps:
- Best-effort persist: If PersistenceMode != None, call PersistIfDirtyAsync()
  - Check _stateStore.State.IsDirty
  - Mark persisted: _stateStore.State.MarkPersisted()
  - Write: await _stateStore.WriteStateAsync()
- Stop persist timer: Dispose Orleans timer if running
- Release assembly reference: Call _assemblyCache.ReleaseReference(flowBti)
  - If refcount → 0, assembly can be unloaded (collectible ALC)
Gate Mechanism
Gates synchronize multiple inputs so they're processed together. Use them when you need correlated values (e.g., inlet and outlet pressure to compute a delta).
DSL Syntax
(gate zip: pressure-in pressure-out) ; Wait for both before processing
How It Works
When inputs are gated:
- FlowGrain tracks gate inputs: calls state.GateInput(gateName, inputName, value, timestamp) for each gated input
- In Execute(), check state.GateReady(gateName) before using gated inputs
- After processing, call state.GateReset(gateName) to clear for next batch
Gate state structure:
internal class Gate
{
private readonly HashSet<string> _requiredInputs; // e.g., {"pressure-in", "pressure-out"}
private readonly Dictionary<string, (object?, DateTimeOffset)> _receivedInputs;
public void SetInput(string inputName, object value, DateTimeOffset timestamp)
{
_receivedInputs[inputName] = (value, timestamp);
}
public bool IsReady()
{
// True when ALL required inputs have been received
return _requiredInputs.All(input => _receivedInputs.ContainsKey(input));
}
public void Reset()
{
// Clear for next synchronized batch
_receivedInputs.Clear();
}
}
Example flow with gate:
(flow id: pressure-delta
(inputs
(pressure-in "pump.inlet.pressure")
(pressure-out "pump.outlet.pressure"))
(gate zip: pressure-in pressure-out) ; Synchronize these two
(trigger on-any: pressure-in pressure-out)
(let ((delta (- pressure-out pressure-in)))
(emit pressure-delta value: delta)))
Generated Execute() with gate check:
Gate input tracking is handled by the FlowGrain directly. The generated flow code checks gate readiness and resets after processing:
public async Task<FlowResult<object?>> Execute(IFlowContext ctx, IFlowState state, CancellationToken ct = default)
{
var pressureIn = state.GetInput<double>("pressure-in")!;
var pressureOut = state.GetInput<double>("pressure-out")!;
// gate :zip - synchronize all inputs
if (!state.GateReady("gate_pressure-in_pressure-out"))
{
return FlowResult.Ok<object?>(null);
}
state.GateReset("gate_pressure-in_pressure-out");
var delta = (pressureOut - pressureIn);
await ctx.EmitAsync("PressureDelta", delta);
return FlowResult.Ok<object?>(null);
}
Key insight: Gates ensure temporal correlation. Without gates, you might compute pressureOut - pressureIn using values from different time periods.
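To see the effect of a zip gate on a stream of arrivals, the following sketch replays a sequence of (input, value) pairs through the same ready/reset cycle as the pressure-delta example. `GateSimulation` is a hypothetical stand-in: a dictionary plays the role of `state.GetInput`, and a set plays the role of the gate's received inputs.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical simulation of a zip gate over pressure-in and pressure-out.
public static class GateSimulation
{
    // Returns the deltas a gated flow would emit for the given arrival order.
    public static List<double> Run(IEnumerable<(string Input, double Value)> arrivals)
    {
        var required = new[] { "pressure-in", "pressure-out" };
        var latest = new Dictionary<string, double>(); // stands in for state.GetInput
        var received = new HashSet<string>();          // stands in for the gate's received set
        var deltas = new List<double>();

        foreach (var (input, value) in arrivals)
        {
            latest[input] = value;
            received.Add(input);                                      // state.GateInput(...)

            if (!required.All(name => received.Contains(name))) continue; // state.GateReady(...)
            received.Clear();                                         // state.GateReset(...)
            deltas.Add(latest["pressure-out"] - latest["pressure-in"]);
        }
        return deltas;
    }
}
```

For the arrivals in=1, in=2, out=5, out=7, in=3, the sketch emits two deltas, 3 and 4. The second outlet reading does not produce an output on its own; it waits for the next inlet reading.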
Core Interfaces
These interfaces from btcommon (BeaconTower.Flows namespace) define the contract between the runtime and compiled flows.
IFlow
public interface IFlow
{
FlowMetadata Metadata { get; }
FlowStateSchema GetStateSchema();
Task<FlowResult<object?>> Execute(IFlowContext ctx, IFlowState state, CancellationToken ct = default);
}
Usage:
- Metadata — FlowGrain reads this during initialization for trigger, input, and persistence config
- GetStateSchema() — Declares rolling buffers and gates for the state system
- Execute() — FlowGrain calls this when trigger condition is satisfied, passing context and state
IFlowState
public interface IFlowState
{
string StateId { get; }
// Input storage (used by FlowGrain)
void SetInput(string inputName, object? value, DateTimeOffset timestamp);
T? GetInput<T>(string inputName);
bool HasInput(string inputName);
// Key-value storage
T? GetValue<T>(string key);
T GetValue<T>(string key, T defaultValue);
void SetValue<T>(string key, T value);
// Latest value access (for passive inputs and (latest x) form)
T? Latest<T>(string inputName);
T Latest<T>(string inputName, T defaultValue);
// Rolling window aggregations
double RollingAvg(string key, double value, TimeSpan window);
double RollingMin(string key, double value, TimeSpan window);
double RollingMax(string key, double value, TimeSpan window);
double RollingSum(string key, double value, TimeSpan window);
// Gate operations
void GateInput(string gateName, string inputName, object value, DateTimeOffset timestamp);
bool GateReady(string gateName);
void GateReset(string gateName);
// Persistence tracking
bool IsDirty { get; }
DateTimeOffset LastPersisted { get; }
void MarkPersisted();
void MarkDirty();
}
Implementation: FlowState in btap (Orleans serializable with [GenerateSerializer])
Persistence: State is persisted to Orleans IPersistentState<FlowState> according to PersistenceMode
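The rolling aggregations take a key, a new value, and a window. A minimal sketch of one time-windowed buffer follows, as it might sit behind a single key. It is not the btap RollingBuffer: `RollingWindow` is a hypothetical type, and unlike `IFlowState.RollingAvg` it takes the sample time as an explicit argument so the behavior is deterministic. It also assumes timestamps arrive in non-decreasing order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch of one time-windowed buffer (one instance per key).
public sealed class RollingWindow
{
    private readonly Queue<(DateTimeOffset At, double Value)> _samples = new();

    // Adds a sample, evicts samples older than the window, and returns the mean of the rest.
    public double Avg(double value, DateTimeOffset now, TimeSpan window)
    {
        if (window < TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(window));
        _samples.Enqueue((now, value));
        // The newest sample has age zero, so the queue is never emptied here.
        while (now - _samples.Peek().At > window) _samples.Dequeue();
        return _samples.Average(s => s.Value);
    }
}
```

With a 5-minute window, samples 1.0 at t, 3.0 at t+2min, and 5.0 at t+6min give averages 1.0, 2.0, and 4.0: the first sample has aged out by the third call. Min, max, and sum follow the same eviction step with a different final aggregate.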
IFlowContext
public interface IFlowContext
{
// Side effects
Task EmitAsync(string outputName, object? value);
Task EmitAsync(string outputName, EmitChannel channel, object? value);
Task<T> CallTaskAsync<T>(string taskBti, object args, CancellationToken ct = default);
Task CallTaskAsync(string taskBti, object args, CancellationToken ct = default);
// Execution metadata
DateTimeOffset Now { get; }
CancellationToken CancellationToken { get; }
}
Implementation: FlowContext in btap
Usage in generated code:
- ctx.EmitAsync(output, value): Emit output (2-arg, default channel)
- ctx.EmitAsync(output, EmitChannel.Alarm, value): Emit output with channel routing
- ctx.CallTaskAsync<T>(bti, args): Call external task (requires task resolution)
Supporting Types
| Type | Description |
|---|---|
| FlowMetadata | Flow metadata: Id, Version, Inputs, Outputs, TriggerType, TriggerInputs, PersistenceMode |
| FlowStateSchema | State schema: RollingBuffers, Gates |
| FlowInput | Input definition: Name, DtmiRef, ConcreteSignal, ClrType |
| FlowOutput | Output definition: Name, ClrType |
| FlowResult<T> | Execution result: Value, SideEffects |
| FlowSideEffect | Side effect record: Type, Name, Data |
| FlowTriggerType | Enum: AnyValue, AllValues, OnInput, Timer |
| FlowPersistenceMode | Enum: Synchronous, Async, Timer, OnDeactivate, None |
| GateSchema | Gate definition: Name, InputNames |
| IFlowTask<TInput, TOutput> | Interface for stateful task implementations |
| IFlowFunction<TInput, TOutput> | Interface for stateless function implementations |
| FlowTaskAttribute | Attribute: [FlowTask("btft:name;version")] |
| FlowFunctionAttribute | Attribute: [FlowFunction("btff:name;version")] |
Persistence Modes
FlowGrain supports multiple persistence strategies for FlowState, configurable via the persist: keyword in the flow definition.
| Mode | When Persisted |
|---|---|
| Synchronous | After each Execute(), blocks until complete |
| Async | After each Execute(), fire-and-forget |
| Timer (default) | Every N seconds (default 5s) via Orleans timer |
| OnDeactivate | Only on grain deactivation |
| None | Never |
See Persistence & State for detailed trade-offs, decision guide, and BtScript examples.
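As a rough illustration of how the table translates into a post-Execute() decision, the sketch below dispatches on the mode. It is not the FlowGrain's PersistByModeAsync: `PersistMode`, `PersistencePolicy`, and the `write` delegate (standing in for `_stateStore.WriteStateAsync()`) are hypothetical.

```csharp
using System;
using System.Threading.Tasks;

public enum PersistMode { Synchronous, Async, Timer, OnDeactivate, None }

// Hypothetical sketch of the per-execution persistence decision.
public static class PersistencePolicy
{
    // Called after each Execute(); `write` stands in for the state store write.
    public static async Task AfterExecuteAsync(PersistMode mode, Func<Task> write)
    {
        switch (mode)
        {
            case PersistMode.Synchronous:
                await write();   // Caller waits until the write completes.
                break;
            case PersistMode.Async:
                _ = write();     // Fire-and-forget: failures are not observed here.
                break;
            default:
                // Timer, OnDeactivate, None: nothing happens per execution.
                // Timer and OnDeactivate writes are driven elsewhere.
                break;
        }
    }
}
```

Real code would normally attach error handling to the fire-and-forget task so a failed write is at least logged. The sketch leaves that out to keep the mode dispatch visible.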
State serialization:
[GenerateSerializer]
public class FlowState : IFlowState
{
[Id(0)] private readonly Dictionary<string, object?> _values = new();
[Id(1)] private readonly Dictionary<string, PersistedInput> _inputs = new();
[Id(2)] private readonly Dictionary<string, RollingBuffer> _rollingBuffers = new();
[Id(3)] private readonly Dictionary<string, Gate> _gates = new();
[Id(4)] public DateTimeOffset LastPersisted { get; set; }
[Id(5)] public string StateId { get; }
[NonSerialized] private bool _isDirty; // Transient
}
Next Steps
For more details on:
- BtScript syntax — See Language Reference
- Local development — See Local Flow Development
- Flow deployment — See Flow Service API
- Task and function definitions — See Language Reference