Skip to main content

Telemetry Ingestion

This page explains how telemetry data enters the Beacon Tower platform — from the message format and required headers to how data is routed through providers, parsed by the ingress processor, and delivered to asset grains.

Ingestion Pipeline

Telemetry flows through a multi-stage pipeline before reaching flows and storage:

Data Source (device, simulator, CLI)

NATS JetStream: telemetry.{providerId}::{clientId}

IngressProcessor (NATS consumer)
→ Reads headers (providerId, providerClientId, messageType, parser)
→ Selects parser based on messageType + parser header
→ Splits multi-signal payload into atoms

Orleans Stream: {providerId}::{clientId}.{signal}-out

TelemetryGrain (one per asset signal)
→ Applies binding transforms
→ Forwards to subscribed flows

Orleans Stream: {assetId}.{signal}-out

FlowGrain (subscribed via input mapping)

NATS Message Format

All telemetry enters the system as messages on the NATS TELEMETRY JetStream stream. Each message has three parts: a subject, headers, and a JSON payload.

Subject

telemetry.{providerId}::{clientId}
SegmentDescriptionExample
providerIdIdentifies the data source systemflowctl, iothub, jemac
clientIdIdentifies the specific device or asset within that providerpump-042, D43D39819853

Examples:

telemetry.flowctl::pump-042
telemetry.iothub::temp-sensor-001
telemetry.jemac::D43D39819853

The :: separator is part of the subject — it connects provider and client into a compound identifier used throughout Beacon Tower.

Headers

HeaderValueRequiredDescription
messageTypeSee Message TypesYesDetermines how the payload is processed
providerIdstringYesProvider identifier (must match subject)
providerClientIdstringYesClient/device identifier (must match subject)
enqueuedTimeISO 8601 timestampNoWhen the data was produced. If omitted, the system uses receive time.
parserjemac, jemac-v2, iqr, jmesNoSelects a non-default parser. Omit for standard JSON telemetry.

Message Types

The messageType header determines how the IngressProcessor routes and parses the message:

messageTypeDefault ParserDescription
ProviderClientTelemetryDefaultTelemetryParserTelemetry data — sensor readings, metrics, measurements
ProviderClientPropertiesChangedDefaultPropertyParserProperty change notifications — device configuration, reported state

When a parser header is also present, it overrides the default parser for that message type.

JSON Payload

The standard telemetry payload (default parser) is a flat JSON object where each key is a signal name and each value is the signal's current reading:

{"motor_temp": 85.5}

Multi-signal messages are supported — the IngressProcessor splits them into individual atoms:

{"motor_temp": 85.5, "inlet_pressure": 101.3, "flow_rate": 42.7}

This single message produces three separate telemetry atoms, one per signal.

Reserved Keys

The following keys have special meaning and are not treated as signal values:

KeyPurpose
timestampParsed as the telemetry timestamp (ISO 8601). If present, takes precedence over the enqueuedTime header.
componentIdIdentifies a sub-component within the asset

Timestamp Resolution

The system resolves timestamps in this priority order:

  1. JSON payload timestamp field — If the payload contains a "timestamp" key, it is parsed and used as the telemetry timestamp
  2. enqueuedTime header — Falls back to the NATS message header
  3. Receive time — If neither is present, uses current UTC time
{"motor_temp": 85.5, "timestamp": "2026-02-09T10:30:00Z"}

This means the timestamp field in the JSON payload is consumed by the parser and not forwarded as a telemetry signal.

Complete Example

A complete NATS telemetry message for a pump sending temperature and pressure:

Subject:  telemetry.flowctl::pump-042
Headers: messageType=ProviderClientTelemetry
providerId=flowctl
providerClientId=pump-042
enqueuedTime=2026-02-09T10:30:00Z
Payload: {"motor_temp": 85.5, "inlet_pressure": 101.3}

Using the flowctl CLI:

flowctl nats pub "telemetry.flowctl::pump-042" \
'{"motor_temp": 85.5, "inlet_pressure": 101.3}' \
--header "messageType=ProviderClientTelemetry" \
--header "providerId=flowctl" \
--header "providerClientId=pump-042"

Property Change Messages

Property change messages use messageType=ProviderClientPropertiesChanged and support two payload formats:

Simple flat format (same as telemetry):

{"firmware_version": "2.1.0", "reporting_interval": 60}

IoT Hub format (Azure IoT Hub device twin reported properties):

{
"properties": {
"reported": {
"firmware_version": "2.1.0",
"$metadata": {
"firmware_version": {
"$lastUpdated": "2026-02-09T10:30:00Z"
}
}
}
}
}

In the IoT Hub format, timestamps are extracted from $metadata[key].$lastUpdated.

Providers and Provider Clients

A provider is a data source — an external system that pushes telemetry into Beacon Tower. Each provider has a unique identifier string.

A provider client is a specific device or connection within a provider. The combination {providerId}::{clientId} uniquely identifies a data source in the system.

TermWhat it representsExample
ProviderExternal system or integrationiothub (Azure IoT Hub), jemac (JEMAC energy dongles)
Provider ClientIndividual device within a providerpump-042, D43D39819853
Compound IDCombined identifieriothub::pump-042, jemac::D43D39819853

Built-in Provider: flowctl

For local development, flowctl uses a built-in provider called flowctl. When you run flowctl instance send or flowctl feed, data is published with providerId=flowctl by default.

# These commands use providerId "flowctl" automatically
flowctl instance send dev-1 temperature 85.5
flowctl feed data.jsonl

To simulate a different provider:

flowctl feed data.jsonl --provider-id acme-iot

Provider-to-Asset Binding

When an asset is initialized with a model, flowctl automatically creates input bindings that subscribe the asset to the correct Orleans streams. For example, initializing asset pump-042 with a model that has a motor_temp signal creates an endpoint subscription to flowctl::pump-042.motor_temp.

See Assets and Bindings for the full binding model.

Signal Names and Models

Signal names in the JSON payload must correspond to telemetry definitions in the asset's DTDL model. For example, given this DTDL model:

{
"@type": "Interface",
"@id": "dtmi:myorg:Pump;1",
"contents": [
{ "@type": "Telemetry", "name": "motor_temp", "schema": "double" },
{ "@type": "Telemetry", "name": "inlet_pressure", "schema": "double" }
]
}

Valid payloads use those exact signal names:

{"motor_temp": 85.5}
{"inlet_pressure": 101.3}
{"motor_temp": 85.5, "inlet_pressure": 101.3}

A signal name like temperature would not match and would not be routed to the asset's telemetry grains.

Note: flowctl also supports an abbreviated YAML format for models during local development. See Local Flow Development for details.

Tip: Use flowctl asset get <assetId> to see the asset's registered endpoints and verify signal names.

Value Types

Signal values support any JSON type:

JSON TypeExampleNotes
Number85.5, 42, -10Most common for telemetry
Booleantrue, falseUseful for binary states
String"running", "idle"Status codes, enums
Object{"lat": 59.33, "lon": 18.07}Complex/structured values
Array[1.0, 2.0, 3.0]Batch or vector values

Parsers

The IngressProcessor selects a parser based on the messageType and optional parser header. When no parser header is present, the default parser for that message type is used.

Default Telemetry Parser

Used when messageType=ProviderClientTelemetry and no parser header is set.

Expects a flat JSON object. Each property (except reserved keys) becomes a separate telemetry atom:

{"motor_temp": 85.5, "inlet_pressure": 101.3, "timestamp": "2026-02-09T10:30:00Z"}

Produces two atoms (motor_temp and inlet_pressure), both with the specified timestamp.

Default Property Parser

Used when messageType=ProviderClientPropertiesChanged and no parser header is set.

Supports both the simple flat format and the Azure IoT Hub properties.reported format with $metadata timestamps.

JEMAC Parser (parser=jemac)

Parses OBIS-coded energy meter data from JEMAC energy dongles. Timestamp is read from the ts field:

{
"id": "D43D39819853",
"rssi": -71,
"ts": "2026-02-09T07:01:12Z",
"data": [
"/ELL5\\253926427_A",
"0-0:1.0.0(260209080110W)",
"1-0:1.8.0(00089649.718*kWh)",
"1-0:32.7.0(223.4*V)",
"!F8AA"
]
}

OBIS codes are normalized to signal names: 1-0:1.8.0 becomes v1_8_0.

JEMAC v2 Parser (parser=jemac-v2)

Parses JEMAC data with an embedded custom timestamp format (YYMMDDHHMMSS{W|Z}), used by newer firmware versions.

IQR Parser (parser=iqr)

Parses structured metric arrays. Each metric has its own name and timestamp (Unix milliseconds):

{
"metrics": [
{"name": "signal_name", "value": 123.45, "timestamp": 1642250400123}
]
}

JMESPath Parser (parser=jmes)

Uses a JMESPath expression to transform the payload before delegating to the default telemetry parser. Useful for extracting telemetry from arbitrarily nested JSON structures.

JSONL Feed Format (flowctl)

Note: The JSONL feed format described in this section is a flowctl CLI convenience format for local development and testing. In production, devices publish NATS messages directly using the message format described above.

When feeding telemetry from files via flowctl feed, records use JSONL (one JSON object per line). Two record formats are supported:

The simplest format — specify the asset, signal, and value:

{"assetId": "pump-042", "signal": "motor_temp", "value": 85.5}
{"assetId": "pump-042", "signal": "motor_temp", "value": 82.1, "delay": "100ms"}
{"assetId": "pump-042", "signal": "inlet_pressure", "value": 101.3, "timestamp": "2026-02-09T10:00:00Z"}
FieldTypeRequiredDescription
assetIdstringYesAsset identifier (becomes the providerClientId)
signalstringYesSignal name (becomes the JSON payload key)
valueanyYesSignal value (number, string, boolean, object, array)
timestampstringNoISO 8601 timestamp for enqueuedTime header
delaystringNoPacing delay before sending: "100ms", "1s", "1m" (flowctl only)

The flowctl feed command converts each record into a NATS message:

  • Subject: telemetry.{providerId}::{assetId} (default providerId is flowctl)
  • Payload: {"{signal}": value}
  • Headers: messageType, providerId, providerClientId, and optionally enqueuedTime

Provider Form

For cross-provider testing or simulating third-party data sources, use the provider form with a compound endpoint ID:

{"endpointId": "acme-iot::pump-042.motor_temp", "value": 85.5}
{"endpointId": "jemac::D43D39819853.v1_8_0", "value": 89649.718}
FieldTypeRequiredDescription
endpointIdstringYesFormat: {providerId}::{clientId}.{signalName}
valueanyYesSignal value
timestampstringNoISO 8601 timestamp
delaystringNoPacing delay before sending (flowctl only)

Validation

A record must be either asset form or provider form, not both. The flowctl feed --validate flag checks records without sending:

flowctl feed data.jsonl --validate

Validation checks:

  1. Each line is valid JSON
  2. Record has either assetId + signal + value (asset form) or endpointId + value (provider form)
  3. Provider form endpointId matches the pattern {providerId}::{clientId}.{signal}

Next Steps