Telemetry Ingestion
This page explains how telemetry data enters the Beacon Tower platform — from the message format and required headers to how data is routed through providers, parsed by the ingress processor, and delivered to asset grains.
Ingestion Pipeline
Telemetry flows through a multi-stage pipeline before reaching flows and storage:
Data Source (device, simulator, CLI)
↓
NATS JetStream: telemetry.{providerId}::{clientId}
↓
IngressProcessor (NATS consumer)
→ Reads headers (providerId, providerClientId, messageType, parser)
→ Selects parser based on messageType + parser header
→ Splits multi-signal payload into atoms
↓
Orleans Stream: {providerId}::{clientId}.{signal}-out
↓
TelemetryGrain (one per asset signal)
→ Applies binding transforms
→ Forwards to subscribed flows
↓
Orleans Stream: {assetId}.{signal}-out
↓
FlowGrain (subscribed via input mapping)
NATS Message Format
All telemetry enters the system as messages on the NATS TELEMETRY JetStream stream. Each message has three parts: a subject, headers, and a JSON payload.
Subject
telemetry.{providerId}::{clientId}
| Segment | Description | Example |
|---|---|---|
| providerId | Identifies the data source system | flowctl, iothub, jemac |
| clientId | Identifies the specific device or asset within that provider | pump-042, D43D39819853 |
Examples:
telemetry.flowctl::pump-042
telemetry.iothub::temp-sensor-001
telemetry.jemac::D43D39819853
The :: separator is part of the subject — it connects provider and client into a compound identifier used throughout Beacon Tower.
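As a minimal sketch of the subject convention, the helpers below build and split a subject string. The function names are hypothetical, and splitting at the first "::" is an assumption; this page does not say whether identifiers may themselves contain that sequence.

```python
SUBJECT_PREFIX = "telemetry."
SEPARATOR = "::"


def build_subject(provider_id: str, client_id: str) -> str:
    """Compose the NATS subject for a provider/client pair."""
    return f"{SUBJECT_PREFIX}{provider_id}{SEPARATOR}{client_id}"


def parse_subject(subject: str) -> tuple[str, str]:
    """Split a subject into (providerId, clientId).

    Assumption: the compound identifier is split at the first "::".
    """
    if not subject.startswith(SUBJECT_PREFIX):
        raise ValueError(f"not a telemetry subject: {subject!r}")
    compound = subject[len(SUBJECT_PREFIX):]
    provider_id, sep, client_id = compound.partition(SEPARATOR)
    if not sep or not provider_id or not client_id:
        raise ValueError(f"malformed compound identifier: {compound!r}")
    return provider_id, client_id
```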
Headers
| Header | Value | Required | Description |
|---|---|---|---|
| messageType | See Message Types | Yes | Determines how the payload is processed |
| providerId | string | Yes | Provider identifier (must match subject) |
| providerClientId | string | Yes | Client/device identifier (must match subject) |
| enqueuedTime | ISO 8601 timestamp | No | When the data was produced. If omitted, the system uses receive time. |
| parser | jemac, jemac-v2, iqr, jmes | No | Selects a non-default parser. Omit for standard JSON telemetry. |
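A publisher can check the required headers and the "must match subject" rule before sending. The sketch below is a hypothetical client-side check, not the platform's own validation; the function name and the error strings are assumptions.

```python
REQUIRED_HEADERS = ("messageType", "providerId", "providerClientId")


def validate_headers(subject: str, headers: dict[str, str]) -> list[str]:
    """Return a list of problems; an empty list means the message looks consistent."""
    problems = [
        f"missing required header: {name}"
        for name in REQUIRED_HEADERS
        if not headers.get(name)
    ]
    if problems:
        return problems
    # providerId and providerClientId must agree with the subject.
    expected = f"telemetry.{headers['providerId']}::{headers['providerClientId']}"
    if subject != expected:
        problems.append(
            f"subject {subject!r} does not match headers (expected {expected!r})"
        )
    return problems
```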
Message Types
The messageType header determines how the IngressProcessor routes and parses the message:
| messageType | Default Parser | Description |
|---|---|---|
| ProviderClientTelemetry | DefaultTelemetryParser | Telemetry data: sensor readings, metrics, measurements |
| ProviderClientPropertiesChanged | DefaultPropertyParser | Property change notifications: device configuration, reported state |
When a parser header is also present, it overrides the default parser for that message type.
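The selection rule can be sketched as a lookup with an override. This is an illustration only: the function is hypothetical, and raising on an unknown messageType or parser value is an assumption, since this page does not describe how the IngressProcessor handles those cases.

```python
DEFAULT_PARSERS = {
    "ProviderClientTelemetry": "DefaultTelemetryParser",
    "ProviderClientPropertiesChanged": "DefaultPropertyParser",
}
NAMED_PARSERS = {"jemac", "jemac-v2", "iqr", "jmes"}


def select_parser(headers: dict[str, str]) -> str:
    """Pick a parser name from the messageType and optional parser header."""
    message_type = headers.get("messageType")
    if message_type not in DEFAULT_PARSERS:
        raise ValueError(f"unknown messageType: {message_type!r}")
    parser = headers.get("parser")
    if parser is None:
        # No override: fall back to the default for this message type.
        return DEFAULT_PARSERS[message_type]
    if parser not in NAMED_PARSERS:
        raise ValueError(f"unknown parser: {parser!r}")
    return parser
```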
JSON Payload
The standard telemetry payload (default parser) is a flat JSON object where each key is a signal name and each value is the signal's current reading:
{"motor_temp": 85.5}
Multi-signal messages are supported — the IngressProcessor splits them into individual atoms:
{"motor_temp": 85.5, "inlet_pressure": 101.3, "flow_rate": 42.7}
This single message produces three separate telemetry atoms, one per signal.
Reserved Keys
The following keys have special meaning and are not treated as signal values:
| Key | Purpose |
|---|---|
| timestamp | Parsed as the telemetry timestamp (ISO 8601). If present, takes precedence over the enqueuedTime header. |
| componentId | Identifies a sub-component within the asset |
Timestamp Resolution
The system resolves timestamps in this priority order:
- JSON payload timestamp field: If the payload contains a "timestamp" key, it is parsed and used as the telemetry timestamp
- enqueuedTime header: Falls back to the NATS message header
- Receive time: If neither is present, uses current UTC time
{"motor_temp": 85.5, "timestamp": "2026-02-09T10:30:00Z"}
This means the timestamp field in the JSON payload is consumed by the parser and not forwarded as a telemetry signal.
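The priority order can be expressed as a short fallback chain. The sketch below is illustrative; the function names are hypothetical, and the optional now argument exists only to make the receive-time branch easy to test.

```python
from datetime import datetime, timezone
from typing import Optional


def parse_iso8601(text: str) -> datetime:
    # datetime.fromisoformat rejects a trailing "Z" before Python 3.11.
    return datetime.fromisoformat(text.replace("Z", "+00:00"))


def resolve_timestamp(
    payload: dict, headers: dict, now: Optional[datetime] = None
) -> datetime:
    """Apply the priority order: payload timestamp, enqueuedTime header, receive time."""
    if "timestamp" in payload:
        return parse_iso8601(payload["timestamp"])
    if "enqueuedTime" in headers:
        return parse_iso8601(headers["enqueuedTime"])
    return now or datetime.now(timezone.utc)
```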
Complete Example
A complete NATS telemetry message for a pump sending temperature and pressure:
Subject: telemetry.flowctl::pump-042
Headers: messageType=ProviderClientTelemetry
providerId=flowctl
providerClientId=pump-042
enqueuedTime=2026-02-09T10:30:00Z
Payload: {"motor_temp": 85.5, "inlet_pressure": 101.3}
Using the flowctl CLI:
flowctl nats pub "telemetry.flowctl::pump-042" \
'{"motor_temp": 85.5, "inlet_pressure": 101.3}' \
--header "messageType=ProviderClientTelemetry" \
--header "providerId=flowctl" \
--header "providerClientId=pump-042"
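For publishers that are not using the CLI, a roughly equivalent message can be sent from Python. This is a sketch that assumes the third-party nats-py client library; build_message and publish are hypothetical helper names, and the server URL is whatever your environment uses.

```python
import json


def build_message(provider_id, client_id, readings, enqueued_time=None):
    """Return (subject, headers, payload bytes) for a telemetry message."""
    subject = f"telemetry.{provider_id}::{client_id}"
    headers = {
        "messageType": "ProviderClientTelemetry",
        "providerId": provider_id,
        "providerClientId": client_id,
    }
    if enqueued_time is not None:
        headers["enqueuedTime"] = enqueued_time
    return subject, headers, json.dumps(readings).encode("utf-8")


async def publish(server_url, subject, headers, payload):
    """Publish one message through JetStream (requires nats-py)."""
    import nats  # third-party: pip install nats-py

    nc = await nats.connect(server_url)
    try:
        js = nc.jetstream()
        await js.publish(subject, payload, headers=headers)
    finally:
        await nc.close()
```

To send the pump example, pass the result of build_message("flowctl", "pump-042", {"motor_temp": 85.5, "inlet_pressure": 101.3}) to publish and run it with asyncio.run.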
Property Change Messages
Property change messages use messageType=ProviderClientPropertiesChanged and support two payload formats:
Simple flat format (same as telemetry):
{"firmware_version": "2.1.0", "reporting_interval": 60}
IoT Hub format (Azure IoT Hub device twin reported properties):
{
"properties": {
"reported": {
"firmware_version": "2.1.0",
"$metadata": {
"firmware_version": {
"$lastUpdated": "2026-02-09T10:30:00Z"
}
}
}
}
}
In the IoT Hub format, timestamps are extracted from $metadata[key].$lastUpdated.
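The following sketch shows one way to read both payload formats and pair each property with its $lastUpdated value. It is an illustration rather than the DefaultPropertyParser itself: detecting the IoT Hub format by the presence of properties.reported and skipping keys that start with "$" are assumptions.

```python
def extract_reported_properties(payload: dict) -> dict:
    """Return {property name: (value, lastUpdated string or None)}."""
    properties = payload.get("properties")
    reported = properties.get("reported") if isinstance(properties, dict) else None
    if not isinstance(reported, dict):
        # Simple flat format: no per-property timestamps in the payload.
        return {key: (value, None) for key, value in payload.items()}

    metadata = reported.get("$metadata", {})
    result = {}
    for key, value in reported.items():
        if key.startswith("$"):
            continue  # bookkeeping entries such as $metadata, not properties
        last_updated = metadata.get(key, {}).get("$lastUpdated")
        result[key] = (value, last_updated)
    return result
```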
Providers and Provider Clients
A provider is a data source — an external system that pushes telemetry into Beacon Tower. Each provider has a unique identifier string.
A provider client is a specific device or connection within a provider. The combination {providerId}::{clientId} uniquely identifies a data source in the system.
| Term | What it represents | Example |
|---|---|---|
| Provider | External system or integration | iothub (Azure IoT Hub), jemac (JEMAC energy dongles) |
| Provider Client | Individual device within a provider | pump-042, D43D39819853 |
| Compound ID | Combined identifier | iothub::pump-042, jemac::D43D39819853 |
Built-in Provider: flowctl
For local development, flowctl uses a built-in provider called flowctl. When you run flowctl instance send or flowctl feed, data is published with providerId=flowctl by default.
# These commands use providerId "flowctl" automatically
flowctl instance send dev-1 temperature 85.5
flowctl feed data.jsonl
To simulate a different provider:
flowctl feed data.jsonl --provider-id acme-iot
Provider-to-Asset Binding
When an asset is initialized with a model, flowctl automatically creates input bindings that subscribe the asset to the correct Orleans streams. For example, initializing asset pump-042 with a model that has a motor_temp signal creates an endpoint subscription to flowctl::pump-042.motor_temp.
See Assets and Bindings for the full binding model.
Signal Names and Models
Signal names in the JSON payload must correspond to telemetry definitions in the asset's DTDL model. For example, given this DTDL model:
{
"@type": "Interface",
"@id": "dtmi:myorg:Pump;1",
"contents": [
{ "@type": "Telemetry", "name": "motor_temp", "schema": "double" },
{ "@type": "Telemetry", "name": "inlet_pressure", "schema": "double" }
]
}
Valid payloads use those exact signal names:
{"motor_temp": 85.5}
{"inlet_pressure": 101.3}
{"motor_temp": 85.5, "inlet_pressure": 101.3}
A signal name like temperature would not match and would not be routed to the asset's telemetry grains.
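A quick pre-flight check can catch mismatched names before data is sent. The sketch below compares payload keys with the Telemetry entries of a DTDL interface; the helper names are hypothetical, and treating timestamp and componentId as exempt follows the reserved keys listed earlier on this page.

```python
RESERVED_KEYS = {"timestamp", "componentId"}


def telemetry_names(model: dict) -> set[str]:
    """Collect the names of all Telemetry entries in a DTDL interface."""
    names = set()
    for content in model.get("contents", []):
        types = content.get("@type", [])
        if isinstance(types, str):
            types = [types]  # DTDL allows a single type or a list of types
        if "Telemetry" in types:
            names.add(content["name"])
    return names


def unmatched_signals(model: dict, payload: dict) -> list[str]:
    """Return payload keys that have no matching telemetry definition."""
    known = telemetry_names(model)
    return sorted(
        key for key in payload if key not in RESERVED_KEYS and key not in known
    )
```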
Note: flowctl also supports an abbreviated YAML format for models during local development. See Local Flow Development for details.
Tip: Use flowctl asset get <assetId> to see the asset's registered endpoints and verify signal names.
Value Types
Signal values are not limited to numbers; any JSON type is accepted:
| JSON Type | Example | Notes |
|---|---|---|
| Number | 85.5, 42, -10 | Most common for telemetry |
| Boolean | true, false | Useful for binary states |
| String | "running", "idle" | Status codes, enums |
| Object | {"lat": 59.33, "lon": 18.07} | Complex/structured values |
| Array | [1.0, 2.0, 3.0] | Batch or vector values |
Parsers
The IngressProcessor selects a parser based on the messageType and optional parser header. When no parser header is present, the default parser for that message type is used.
Default Telemetry Parser
Used when messageType=ProviderClientTelemetry and no parser header is set.
Expects a flat JSON object. Each property (except reserved keys) becomes a separate telemetry atom:
{"motor_temp": 85.5, "inlet_pressure": 101.3, "timestamp": "2026-02-09T10:30:00Z"}
This payload produces two atoms (motor_temp and inlet_pressure), both stamped with the timestamp from the payload.
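The splitting step can be sketched in a few lines. The atom shape used here (a dict with signal, value, timestamp, and componentId) is hypothetical; this page does not define the internal atom type.

```python
RESERVED_KEYS = {"timestamp", "componentId"}


def split_into_atoms(payload: dict) -> list[dict]:
    """Turn a flat telemetry payload into one atom per signal."""
    timestamp = payload.get("timestamp")
    component_id = payload.get("componentId")
    return [
        {
            "signal": key,
            "value": value,
            "timestamp": timestamp,
            "componentId": component_id,
        }
        for key, value in payload.items()
        if key not in RESERVED_KEYS  # reserved keys never become signals
    ]
```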
Default Property Parser
Used when messageType=ProviderClientPropertiesChanged and no parser header is set.
Supports both the simple flat format and the Azure IoT Hub properties.reported format with $metadata timestamps.
JEMAC Parser (parser=jemac)
Parses OBIS-coded energy meter data from JEMAC energy dongles. The timestamp is read from the ts field:
{
"id": "D43D39819853",
"rssi": -71,
"ts": "2026-02-09T07:01:12Z",
"data": [
"/ELL5\\253926427_A",
"0-0:1.0.0(260209080110W)",
"1-0:1.8.0(00089649.718*kWh)",
"1-0:32.7.0(223.4*V)",
"!F8AA"
]
}
OBIS codes are normalized to signal names: 1-0:1.8.0 becomes v1_8_0.
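The only normalization example given here is 1-0:1.8.0 becoming v1_8_0, so the sketch below generalizes from that single case: it assumes the leading "A-B:" group is dropped, dots become underscores, and a "v" prefix is added. The function name and the returned tuple shape are hypothetical, and the real parser may differ.

```python
import re
from typing import Optional

# One data line: "<A>-<B>:<C>.<D>.<E>(<value>[*<unit>])"
OBIS_LINE = re.compile(r"^\d+-\d+:(\d+\.\d+\.\d+)\(([^*)]*)(?:\*([^)]*))?\)$")


def parse_obis_line(line: str) -> Optional[tuple]:
    """Return (signal name, raw value, unit or None), or None for non-data lines."""
    match = OBIS_LINE.match(line)
    if match is None:
        return None  # header and checksum lines carry no OBIS value
    code, raw_value, unit = match.groups()
    # Assumed rule, inferred from the single documented example.
    signal = "v" + code.replace(".", "_")
    return signal, raw_value, unit
```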
JEMAC v2 Parser (parser=jemac-v2)
Parses JEMAC data with an embedded custom timestamp format (YYMMDDHHMMSS{W|Z}), used by newer firmware versions.
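A value in the YYMMDDHHMMSS{W|Z} layout can be decoded as shown below. This is a sketch with a hypothetical function name. Because this page does not explain what the trailing W or Z flag means, the flag is returned as-is and the datetime is left without a time zone.

```python
import re
from datetime import datetime

JEMAC_V2_TIMESTAMP = re.compile(r"^(\d{12})([WZ])$")


def parse_jemac_v2_timestamp(text: str) -> tuple:
    """Return (naive datetime, flag) for a YYMMDDHHMMSS{W|Z} value."""
    match = JEMAC_V2_TIMESTAMP.match(text)
    if match is None:
        raise ValueError(f"not a YYMMDDHHMMSS{{W|Z}} timestamp: {text!r}")
    digits, flag = match.groups()
    # %y maps two-digit years 00-68 to 2000-2068.
    return datetime.strptime(digits, "%y%m%d%H%M%S"), flag
```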
IQR Parser (parser=iqr)
Parses structured metric arrays. Each metric has its own name and timestamp (Unix milliseconds):
{
"metrics": [
{"name": "signal_name", "value": 123.45, "timestamp": 1642250400123}
]
}
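As an illustration of this format, the sketch below converts each metric into a (name, value, timestamp) tuple with a UTC datetime. The function name and the tuple shape are hypothetical; only the payload layout comes from this page.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_iqr(payload: dict) -> list:
    """Return one (name, value, UTC datetime) tuple per metric."""
    atoms = []
    for metric in payload.get("metrics", []):
        # Integer millisecond arithmetic avoids float rounding.
        timestamp = EPOCH + timedelta(milliseconds=metric["timestamp"])
        atoms.append((metric["name"], metric["value"], timestamp))
    return atoms
```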
JMESPath Parser (parser=jmes)
Uses a JMESPath expression to transform the payload before delegating to the default telemetry parser. Useful for extracting telemetry from arbitrarily nested JSON structures.
JSONL Feed Format (flowctl)
Note: The JSONL feed format described in this section is a flowctl CLI convenience format for local development and testing. In production, devices publish NATS messages directly using the message format described above.
When feeding telemetry from files via flowctl feed, records use JSONL (one JSON object per line). Two record formats are supported:
Asset Form (Recommended)
The simplest format — specify the asset, signal, and value:
{"assetId": "pump-042", "signal": "motor_temp", "value": 85.5}
{"assetId": "pump-042", "signal": "motor_temp", "value": 82.1, "delay": "100ms"}
{"assetId": "pump-042", "signal": "inlet_pressure", "value": 101.3, "timestamp": "2026-02-09T10:00:00Z"}
| Field | Type | Required | Description |
|---|---|---|---|
| assetId | string | Yes | Asset identifier (becomes the providerClientId) |
| signal | string | Yes | Signal name (becomes the JSON payload key) |
| value | any | Yes | Signal value (number, string, boolean, object, array) |
| timestamp | string | No | ISO 8601 timestamp for enqueuedTime header |
| delay | string | No | Pacing delay before sending: "100ms", "1s", "1m" (flowctl only) |
The flowctl feed command converts each record into a NATS message:
- Subject: telemetry.{providerId}::{assetId} (default providerId is flowctl)
- Payload: {"{signal}": value}
- Headers: messageType, providerId, providerClientId, and optionally enqueuedTime
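The conversion can be sketched as a pure function. This is not flowctl's implementation: the function name is hypothetical, and setting messageType to ProviderClientTelemetry is an assumption based on the feed carrying telemetry.

```python
import json


def asset_record_to_message(record: dict, provider_id: str = "flowctl") -> tuple:
    """Map an asset-form JSONL record to (subject, headers, JSON payload)."""
    asset_id = record["assetId"]
    subject = f"telemetry.{provider_id}::{asset_id}"
    headers = {
        "messageType": "ProviderClientTelemetry",  # assumed for feed records
        "providerId": provider_id,
        "providerClientId": asset_id,
    }
    if "timestamp" in record:
        headers["enqueuedTime"] = record["timestamp"]
    # The delay field only paces the sender, so it is not part of the message.
    payload = json.dumps({record["signal"]: record["value"]})
    return subject, headers, payload
```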
Provider Form
For cross-provider testing or simulating third-party data sources, use the provider form with a compound endpoint ID:
{"endpointId": "acme-iot::pump-042.motor_temp", "value": 85.5}
{"endpointId": "jemac::D43D39819853.v1_8_0", "value": 89649.718}
| Field | Type | Required | Description |
|---|---|---|---|
| endpointId | string | Yes | Format: {providerId}::{clientId}.{signalName} |
| value | any | Yes | Signal value |
| timestamp | string | No | ISO 8601 timestamp |
| delay | string | No | Pacing delay before sending (flowctl only) |
Validation
A record must be either asset form or provider form, not both. The flowctl feed --validate flag checks records without sending:
flowctl feed data.jsonl --validate
Validation checks:
- Each line is valid JSON
- Record has either assetId + signal + value (asset form) or endpointId + value (provider form)
- Provider form endpointId matches the pattern {providerId}::{clientId}.{signal}
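The checks listed above can be approximated in a few lines, which is handy when generating feed files programmatically. This sketch is not the flowctl validator: the function name is hypothetical, and the endpointId pattern assumes the client ID contains no dot, which this page does not state.

```python
import json
import re

# Assumption: providerId has no ":" and clientId has no ".".
ENDPOINT_ID = re.compile(r"^([^:]+)::([^.]+)\.(.+)$")


def classify_record(line: str) -> str:
    """Return "asset" or "provider" for a valid JSONL line, else raise ValueError."""
    record = json.loads(line)  # json.JSONDecodeError is a ValueError subclass
    if not isinstance(record, dict):
        raise ValueError("record must be a JSON object")
    has_asset = "assetId" in record
    has_endpoint = "endpointId" in record
    if has_asset and has_endpoint:
        raise ValueError("record mixes asset form and provider form")
    if "value" not in record:
        raise ValueError("record has no value")
    if has_asset:
        if "signal" not in record:
            raise ValueError("asset form requires signal")
        return "asset"
    if has_endpoint:
        endpoint_id = record["endpointId"]
        if not isinstance(endpoint_id, str) or not ENDPOINT_ID.match(endpoint_id):
            raise ValueError(f"malformed endpointId: {endpoint_id!r}")
        return "provider"
    raise ValueError("record has neither assetId nor endpointId")
```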
Next Steps
- Assets and Bindings — How assets connect to providers via bindings
- Models and DTDL — Define signal schemas using DTDL
- Telemetry and Timeseries — Querying and streaming stored telemetry
- Local Flow Development — Hands-on guide to feeding data locally
- Flows — How flows process telemetry data