OpexFlow Documentation
Industrial data middleware: the OpexMX Connector — ingest from any protocol, transform in flight with JavaScript or the native DSL, deliver to any sink, all from a single static Rust binary with an embedded UI.
Welcome to the OpexFlow docs. OpexFlow is the product layer over the open-source OpexMX Connector. New here? Start with Quick start. If you only read one section, make it the Transform DSL — it's the fastest path through your data.
The DSL evaluates natively against the in-memory CanonicalMachineData struct, with no JSON marshalling and no interpreter VM, so it's the lowest-latency path for filters and field math. You rarely write DSL or JavaScript by hand: the AI transform generator emits both.
Installation
The connector ships as a single static binary (musl) with the React UI embedded. No runtime, no host dependencies.
Docker (full stack)
The repo's docker-compose.yml brings up Postgres (TimescaleDB), ClickHouse, NATS, and the connector together:
git clone https://github.com/Opex-Technologies-Pte-Ltd/opex-connector.git
cd opex-connector
cp config.example.yaml config.yaml # edit devices + push_endpoint
docker compose up -d # UI + API on :3080
Build from source
cargo build --release
# default features: modbus,opcua,nats,http,mqtt,clickhouse,postgres,transform,static-ui
./target/release/opexmx-connector config.yaml
Feature flags are additive — build only the adapters you ship to the edge to keep the binary small. The QuickJS transform layer is the transform feature; the DSL needs no extra feature.
Quick start
Put this in config.yaml: a connector identity, one Modbus source, and a default DSL transform.
connector:
  name: "shop-floor-pilot"
  push_endpoint: "https://api.opex.mx/ingest"
  hmac_secret: "${OPEXMX_HMAC_SECRET}"
  health_check_port: 3000
adapters:
  modbus:
    enabled: true
    devices:
      - id: "fanuc-01"
        host: "192.168.1.10"
        port: 502
        registers: [ ... ]
transform:
  enabled: true
  default:
    backend: "dsl"
    script_inline: |
      when metadata.temp > 80 || metadata.vib > 5.0
      set metadata.alert = metadata.temp > 80 || metadata.vib > 5.0
export OPEXMX_HMAC_SECRET=change-me
./target/release/opexmx-connector config.yaml
Open the canvas UI at http://localhost:3000. Every polled record now runs through the DSL before fanning out to your sinks.
Core concepts
OpexFlow is built around four primitives. Everything else is plumbing.
| Primitive | Role |
|---|---|
| Source (adapter) | Ingests events from a protocol: Modbus poll, OPC-UA subscription, HTTP REST poll, MQTT topic, NATS subject. |
| Transform (per-source route) | Reshapes each event before fan-out. Keyed by {source}:{machine.id}, with a shared default. Either a DSL program or a JavaScript function. |
| Sink (fan-out) | Delivers the result: hub (WebSocket), HMAC-signed HTTP push, NATS, MQTT, SQLite, ClickHouse, Postgres. |
| Flow (canvas tab) | A UI grouping of sources → transforms → sinks on the canvas. Layouts persist to flows.layout.json; routing stays global. |
Every event is normalized to CanonicalMachineData at the source boundary, so transforms and sinks always see the same shape regardless of protocol.
Flows
A flow is a canvas tab — a saved layout of source, transform, and sink nodes. The engine runs each source as an async polling task; each record is routed through its transform, then fans out to the configured sink set. Events are passed as native CanonicalMachineData structs, never re-serialized at the DSL boundary — which is why the DSL is so fast.
Per-source routing
Transforms are addressed by {source}:{machine.id}. A record looks up its own route first, then falls back to default. Each route carries its own script and its own sink set, so one source can drop noisy machines, enrich others, and push to different backends:
transform:
  enabled: true
  default:
    backend: "dsl"
    script_inline: "set metadata.processedAt = timestamp"
    sinks: ["storage", "push"]
  sources:
    - key: "http:ERP-01"
      backend: "js"
      script: "./transforms/erp.js"
      sinks: ["nats", "clickhouse"]
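The lookup order described above (own route first, then default) can be sketched in a few lines of JavaScript. This is an illustration only, not the connector's Rust implementation: resolveRoute and the transformConfig object are hypothetical names that mirror the YAML shown in this section.

```javascript
// Hypothetical in-memory view of the `transform` config section.
const transformConfig = {
  default: { backend: "dsl", sinks: ["storage", "push"] },
  sources: [
    { key: "http:ERP-01", backend: "js", sinks: ["nats", "clickhouse"] },
  ],
};

// Resolve the route for one record: an exact "{source}:{machine.id}" match
// wins, otherwise the shared default route is used.
function resolveRoute(config, record) {
  const key = `${record.source}:${record.machine.id}`;
  const match = config.sources.find((route) => route.key === key);
  return match ?? config.default;
}

const erp = resolveRoute(transformConfig, { source: "http", machine: { id: "ERP-01" } });
const cnc = resolveRoute(transformConfig, { source: "modbus", machine: { id: "fanuc-01" } });
console.log(erp.sinks.join(",")); // nats,clickhouse
console.log(cnc.sinks.join(",")); // storage,push
```

Because each route carries its own sink list, the ERP record above never reaches storage or push, while the Modbus record inherits the default pair.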
Configuration reference
OpexFlow reads YAML (config.yaml by default). Any string value supports ${ENV_VAR} substitution — useful for secrets. Top-level sections:
| Key | Description |
|---|---|
| connector | Identity + delivery: name, push_endpoint, hmac_secret, health_check_port (default 3000), poll_interval_ms. |
| adapters | modbus / opcua / http device definitions. |
| nats / mqtt | Stream sources (ingest), also usable as sinks. |
| transform | Per-source routes, default route, caps (timeout_ms, memory_limit_mb, on_error). |
| clickhouse / postgres | Sink connection config. Postgres expects TimescaleDB. |
| ai | OpenAI-compatible codegen endpoint (see AI generator). |
| telemetry | Stall-alert threshold, sample interval, alert targets (webhook / Telegram). |
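To make the ${ENV_VAR} substitution mentioned at the top of this section concrete, here is a minimal JavaScript sketch of the idea. The function name substituteEnv is hypothetical, and the choice to raise an error for an unset variable is an assumption; the connector's actual behavior for missing variables is not documented here.

```javascript
// Replace every ${NAME} in a config string with the matching entry of `env`.
// Assumption: an unset variable is treated as an error rather than left blank.
function substituteEnv(value, env) {
  return value.replace(/\$\{([A-Za-z_][A-Za-z0-9_]*)\}/g, (_, name) => {
    if (!Object.prototype.hasOwnProperty.call(env, name)) {
      throw new Error(`missing environment variable: ${name}`);
    }
    return env[name];
  });
}

const env = { OPEXMX_HMAC_SECRET: "change-me" }; // stand-in for process.env
console.log(substituteEnv("${OPEXMX_HMAC_SECRET}", env)); // change-me
```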
Transform DSL
The OpexFlow DSL is a tiny, statement-based language evaluated natively against the in-memory event struct. It exists for the 90% of transforms that are filters and field math — where spinning up a JS interpreter per event is pure waste.
DSL programs live inline (script_inline) or in a file (script), exactly where a JS script would. Pick the backend with backend: "dsl".
Syntax overview
A program is a list of statements, one per line. Comments start with #. There are exactly three statements:
# a complete DSL program
when metadata.temp > 80 || metadata.vib > 5.0 # keep iff true; else DROP the record
set metadata.alert = true # mutate a field
set status.state = "alarm" # ... another field
| Statement | Meaning |
|---|---|
| when <expr> | Filter. The record is kept iff <expr> is true; otherwise it's dropped (nothing flows to sinks). At most one when per program. |
| set <path> = <expr> | Mutate a field. Targets: metadata.* (nested, auto-created), status.state, status.alarmCode, status.alarmMessage, machine.id, machine.name. |
| drop | Drop the record unconditionally. Equivalent to JS return null. |
If a program has no when and no drop, the (possibly mutated) record always passes through to its sinks.
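For readers who think in JavaScript, the three-line program from the syntax overview behaves roughly like the transform below. This is a behavioral sketch, not generated output: in particular, how the DSL compares a missing field is not specified in this section, whereas in JavaScript a comparison against undefined is simply false.

```javascript
// JavaScript rendering of the DSL program:
//   when metadata.temp > 80 || metadata.vib > 5.0
//   set metadata.alert = true
//   set status.state = "alarm"
function transform(record, ctx) {
  const keep = record.metadata.temp > 80 || record.metadata.vib > 5.0;
  if (!keep) return null;        // `when` is false: drop the record
  record.metadata.alert = true;  // set metadata.alert = true
  record.status.state = "alarm"; // set status.state = "alarm"
  return record;                 // record passes on to its sinks
}

const hot = transform({ metadata: { temp: 91, vib: 1.2 }, status: { state: "running" } }, {});
const ok = transform({ metadata: { temp: 40, vib: 1.2 }, status: { state: "running" } }, {});
console.log(hot.status.state); // alarm
console.log(ok);               // null
```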
Expressions & operators
| Operator | Description |
|---|---|
| \|\| && ! | Boolean logic (or / and / not). |
| == != < <= > >= | Comparisons, return booleans. Numeric first, string ordering as fallback. |
| + - * / | Arithmetic. + also concatenates two strings. |
| ( ) | Grouping. |
Literals
42 3.14 # numbers (float)
"running" # string
true false null # booleans, null
Paths
Paths read directly from CanonicalMachineData. Dotted, with numeric segments indexing arrays:
| Path | Reads |
|---|---|
| source | Origin protocol (modbus, opcua, http, …). |
| machine.id / machine.name | Stable machine id / display name. |
| status.state | One of running, idle, alarm, offline. |
| status.alarmCode / status.alarmMessage | Active alarm (or null). |
| spindle.speed / spindle.load | Spindle readings (null if no spindle). |
| axes.<i>.position | Indexed axis position (also .name, .unit). |
| metadata.<key>... | Free-form nested JSON, where sensors like temp, vib, rpm live. |
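Dotted paths with numeric array segments can be illustrated with a small JavaScript resolver. This is only a sketch of the addressing scheme: readPath and the sample record are hypothetical, and returning null for a missing segment is an assumption modelled on the "null if no spindle" note in the table.

```javascript
// Read a dotted path from a record; numeric segments index into arrays.
// Assumption: a missing segment yields null instead of throwing.
function readPath(record, path) {
  let current = record;
  for (const segment of path.split(".")) {
    if (current === null || current === undefined) return null;
    current = Array.isArray(current) ? current[Number(segment)] : current[segment];
  }
  return current === undefined ? null : current;
}

// Hypothetical record, shaped after the path table.
const record = {
  source: "modbus",
  machine: { id: "fanuc-01", name: "Fanuc 01" },
  axes: [{ name: "X", unit: "mm", position: 12.5 }],
  metadata: { temp: 83 },
};
console.log(readPath(record, "axes.0.position")); // 12.5
console.log(readPath(record, "spindle.load"));    // null
```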
Examples
Conditional alert (filter + flag)
when metadata.temp > 80 || metadata.vib > 5.0
set metadata.alert = metadata.temp > 80 || metadata.vib > 5.0
Force alarm state on high load
when spindle.load > 90
set status.state = "alarm"
set status.alarmMessage = "spindle overload"
Unit conversion + nested metadata
set metadata.temp_f = metadata.temp_c * 9 / 5 + 32
set metadata.flags.hot = metadata.temp_c > 60
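The two set statements above rely on nested metadata keys being created on demand. A minimal JavaScript sketch of that auto-creation follows; setPath is a hypothetical helper written for illustration, and the 65 °C input is an invented sample value.

```javascript
// Write a value at a dotted path, creating intermediate objects as needed.
function setPath(record, path, value) {
  const segments = path.split(".");
  const last = segments.pop();
  let current = record;
  for (const segment of segments) {
    if (typeof current[segment] !== "object" || current[segment] === null) {
      current[segment] = {}; // auto-create the missing level
    }
    current = current[segment];
  }
  current[last] = value;
  return record;
}

const record = { metadata: { temp_c: 65 } };
setPath(record, "metadata.temp_f", record.metadata.temp_c * 9 / 5 + 32);
setPath(record, "metadata.flags.hot", record.metadata.temp_c > 60);
console.log(record.metadata.temp_f);    // 149
console.log(record.metadata.flags.hot); // true
```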
Drop offline machines
when !(status.state == "offline")
- when expr (keep iff true)
- set path = expr (mutate field)
- drop (drop the record)
- metadata.x (sensor values)
- status.state (run state)
- || && ! (boolean logic)
- == != > < (comparisons)
- + - * / (arithmetic)
- # comment (one per line)
- "str" / true / null
JavaScript transforms
When a transform needs loops, regex, or logic the DSL can't express, use a JavaScript transform. It runs in an embedded, sandboxed QuickJS runtime — no filesystem, no network, hard memory + stack limits, and a deadline interrupt. A worker pool (one runtime per scripted route, sharded by machine id) gives parallelism with backpressure.
// enrich.js — runs once per record
function transform(record, ctx) {
  if (record.metadata.temp > 95) return null; // drop
  record.metadata.risk = record.metadata.temp * 0.8
    + record.metadata.vib * 1.2;
  return record; // pass on (mutated)
}
| Contract | Behavior |
|---|---|
| transform(record, ctx) | Called per record. record is the CanonicalMachineData object; ctx is { source, timestamp, machineId }. |
| Return an object | The object replaces the record and flows to its sinks (must deserialize back to CanonicalMachineData). |
| Return null / undefined | The record is dropped (same as DSL drop). |
| Throw / timeout / bad output | Applies on_error policy: passthrough (default, never lose data), drop, or fail. |
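The contract above can be exercised locally with a small harness before a script is deployed. The sketch below covers only the throw case of on_error. Timeouts and output validation are enforced by the connector and are not modelled. runWithPolicy is a hypothetical name, and forwarding the untouched original record on passthrough is an assumption about what "never lose data" means.

```javascript
// Apply a transform and map a thrown error onto the documented on_error
// policies. Returns the record to forward, or null when it is dropped.
function runWithPolicy(transform, record, ctx, onError = "passthrough") {
  try {
    // Work on a copy so a half-mutated record is never forwarded on failure.
    const copy = JSON.parse(JSON.stringify(record));
    const result = transform(copy, ctx);
    return result === undefined ? null : result; // null / undefined both drop
  } catch (err) {
    if (onError === "passthrough") return record; // forward the original
    if (onError === "drop") return null;
    throw err; // "fail": surface the error to the caller
  }
}

const broken = () => { throw new Error("boom"); };
const sample = { metadata: { temp: 50 } };
console.log(runWithPolicy(broken, sample, {}, "passthrough") === sample); // true
console.log(runWithPolicy(broken, sample, {}, "drop"));                   // null
```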
AI transform generator
Describe the transform in plain language; OpexFlow writes it. The generator talks to any OpenAI-compatible /chat/completions endpoint, is steered to emit DSL first (falling back to JS when the rule isn't expressible), and server-validates the output before returning it.
ai:
  enabled: true
  base_url: "https://api.openai.com/v1" # any OpenAI-compatible base
  model: "gpt-4o-mini"
  api_key: "${OPEX_AI_API_KEY}"
  max_tokens: 1200
In the UI's AI tab you sketch the input shape, the rule, and the desired output — e.g. "emit only when temp exceeds 80 or vibration crosses 5.0" — and hit Generate. The model maps your fields onto metadata.* and returns a runnable program you paste into the editor and save. Endpoint: POST /api/transform/generate.
Sources
Each source is a protocol adapter configured under adapters (or nats/mqtt for streaming). All normalize to CanonicalMachineData.
| Type | Key options |
|---|---|
| modbus | host, port, unit, registers (with mapping to axes/spindle/metadata) |
| opcua | endpoint, nodes (mapping types), security |
| http | url, interval, headers; polls any REST endpoint |
| mqtt | url, topic, qos (ingests under opexmx.ingest.*) |
| nats | url, subject (ingests under opexmx.ingest.*) |
| inject | Manual / test source fired from the UI or POST /api/sources/{key}/inject |
Sinks
Each route declares which sinks it fans out to (omit to inherit the default, which is all sinks). Names map case-insensitively.
| Name | Aliases | Delivers to |
|---|---|---|
| hub | ws | In-process WebSocket hub (live canvas preview). |
| push | http, webhook | HMAC-signed POST to connector.push_endpoint. |
| nats | jetstream | NATS subject. |
| mqtt | — | MQTT topic. |
| storage | sqlite | Local SQLite retention. |
| clickhouse | ch | ClickHouse table. |
| postgres | pg, timescale | Postgres / TimescaleDB hypertable. |
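The case-insensitive alias mapping in the table can be sketched as a lookup. The alias list is copied from the table; normalizeSink is a hypothetical helper, and rejecting unknown names with an error is an assumption rather than documented behavior.

```javascript
// Alias table copied from the sink list: alias -> canonical sink name.
const SINK_ALIASES = new Map(Object.entries({
  hub: "hub", ws: "hub",
  push: "push", http: "push", webhook: "push",
  nats: "nats", jetstream: "nats",
  mqtt: "mqtt",
  storage: "storage", sqlite: "storage",
  clickhouse: "clickhouse", ch: "clickhouse",
  postgres: "postgres", pg: "postgres", timescale: "postgres",
}));

// Resolve a configured sink name case-insensitively.
// Assumption: an unknown name is rejected rather than silently ignored.
function normalizeSink(name) {
  const canonical = SINK_ALIASES.get(name.trim().toLowerCase());
  if (canonical === undefined) throw new Error(`unknown sink: ${name}`);
  return canonical;
}

console.log(["PG", "Webhook", "sqlite"].map((n) => normalizeSink(n)).join(","));
// postgres,push,storage
```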
The push sink signs every request with the connector's hmac_secret so receivers can verify origin and integrity.
Observability
Observability is fully native — no sidecar collector required. The connector serves Prometheus metrics and keeps in-process traces + a SQLite history series, all viewable in its own Observability tab.
- Metrics at /metrics: opex_connector_records_total, opex_connector_sink_total, opex_connector_push_duration_seconds, opex_connector_last_publish_ts.
- Traces: one span per record through source → transform → sink, queryable via /api/obs/traces.
- History: sampled metric series in SQLite (/api/obs/series).
- Alerts: sink-stall detection; delivers to a webhook and/or Telegram.
- Grafana: provisioned dashboards ship in the grafana/ directory.
telemetry:
  stall_threshold_secs: 300
  sample_interval_secs: 15
  alerts:
    webhook_url: "https://hooks/ops"
    telegram: { bot_token: "${TG_TOKEN}", chat_id: "-100…" }
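An external check can approximate the same stall logic from a /metrics scrape. The sketch below makes several assumptions: that opex_connector_last_publish_ts is a Unix timestamp in seconds, that it is exposed without labels, and that "stalled" means no publish within stall_threshold_secs. The scrape text is a hypothetical sample, and readGauge and isStalled are illustrative names.

```javascript
// Extract one unlabelled gauge value from Prometheus text exposition format.
function readGauge(metricsText, name) {
  for (const line of metricsText.split("\n")) {
    if (line.startsWith("#")) continue; // skip HELP / TYPE comments
    const [metric, value] = line.trim().split(/\s+/);
    if (metric === name) return Number(value);
  }
  return null;
}

// A sink counts as stalled when its last publish is older than the threshold.
function isStalled(lastPublishTs, nowSecs, thresholdSecs) {
  return nowSecs - lastPublishTs > thresholdSecs;
}

// Hypothetical scrape output; real label sets and values will differ.
const scrape = [
  "# TYPE opex_connector_last_publish_ts gauge",
  "opex_connector_last_publish_ts 1700000000",
].join("\n");
const last = readGauge(scrape, "opex_connector_last_publish_ts");
console.log(isStalled(last, 1700000400, 300)); // true
console.log(isStalled(last, 1700000100, 300)); // false
```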
REST API
The same API the canvas UI uses. JSON over HTTP, served on health_check_port (default 3000).
| Method & path | Description |
|---|---|
| GET /api/health / /api/ready | Liveness / readiness + poller status. |
| GET /api/sources | Configured sources + live status. |
| POST /api/sources/{key}/inject | Manually fire an inject source. |
| GET /api/transform | Transform caps + default + routes. |
| GET/PUT /api/transform/default | Read / hot-replace the default route. |
| GET/PUT/DELETE /api/transform/routes/{key} | Manage a per-source route. |
| POST /api/transform/validate | Syntax-check a candidate script (JS or DSL). |
| POST /api/transform/test | Dry-run a script against a sample record. |
| POST /api/transform/generate | AI-generate a transform from a description. |
| GET /api/devices / /api/data | Known devices + recent retained events. |
| GET /api/flows / PUT /api/flows/{id} | Canvas flow layouts. |
| GET /api/config / /api/config/reload | Read / hot-reload config.yaml. |
| GET /api/obs/{traces,series,alerts} | Native traces, history, alerts. |
| GET /metrics | Prometheus scrape endpoint. |
| WS /api/ws/live | Live event stream (canvas preview). |
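Route keys such as http:ERP-01 contain a colon, so a client probably wants to percent-encode the key before placing it in the /api/transform/routes/{key} path. The sketch below shows one way to build that URL and issue a DELETE with the standard fetch API. Whether the server requires or accepts the encoded form is an assumption, the response shape is not documented here, and routeUrl and deleteRoute are hypothetical helpers.

```javascript
// Build the URL for a per-source route. Route keys look like "http:ERP-01",
// so the key is percent-encoded before it is placed in the path.
function routeUrl(baseUrl, routeKey) {
  const base = baseUrl.replace(/\/+$/, ""); // tolerate a trailing slash
  return `${base}/api/transform/routes/${encodeURIComponent(routeKey)}`;
}

// Remove a per-source route. Defined for illustration and not invoked here;
// relies on the global fetch available in current Node.js and browsers.
async function deleteRoute(baseUrl, routeKey) {
  const response = await fetch(routeUrl(baseUrl, routeKey), { method: "DELETE" });
  if (!response.ok) throw new Error(`DELETE failed with HTTP ${response.status}`);
}

console.log(routeUrl("http://localhost:3000/", "http:ERP-01"));
// http://localhost:3000/api/transform/routes/http%3AERP-01
```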
Deployment
systemd
# /etc/systemd/system/opexmx-connector.service
[Unit]
Description=OpexMX Connector
After=network.target
[Service]
Environment=OPEXMX_HMAC_SECRET=change-me
ExecStart=/usr/local/bin/opexmx-connector /etc/opexmx/config.yaml
Restart=on-failure
User=opexmx
[Install]
WantedBy=multi-user.target
Edge (ARM64)
The same static binary cross-compiles to aarch64-unknown-linux-musl. Deploy identical artifacts from Raspberry Pi to rack server.
CLI
The connector takes an optional config path (defaults to config.yaml):
opexmx-connector # run with ./config.yaml
opexmx-connector /etc/opexmx/config.yaml
opexmx-connector --help
Validate transforms from the UI (Check) or the API (POST /api/transform/validate) rather than a separate CLI subcommand. Set secrets via environment variables and reference them as ${VAR} in the config.