OpexFlow Documentation (v2.0)

Industrial data middleware: the OpexMX Connector — ingest from any protocol, transform in flight with JavaScript or the native DSL, deliver to any sink, all from a single static Rust binary with an embedded UI.

Welcome to the OpexFlow docs. OpexFlow is the product layer over the open-source OpexMX Connector. New here? Start with Quick start. If you only read one section, make it the Transform DSL — it's the fastest path through your data.

Two transform engines: JavaScript runs in a sandboxed QuickJS runtime (a worker pool, one runtime per scripted route) for full flexibility. The native DSL is evaluated directly against the in-memory CanonicalMachineData struct, with no JSON marshalling and no interpreter VM, so it is the lowest-latency path for filters and field math. You rarely write either by hand: the AI transform generator emits both.

Installation

The connector ships as a single static binary (musl) with the React UI embedded. No runtime, no host dependencies.

Docker (full stack)

The repo's docker-compose.yml brings up Postgres (TimescaleDB), ClickHouse, NATS, and the connector together:

git clone https://github.com/Opex-Technologies-Pte-Ltd/opex-connector.git
cd opex-connector
cp config.example.yaml config.yaml      # edit devices + push_endpoint
docker compose up -d                    # UI + API on :3080

Build from source

cargo build --release
# default features: modbus,opcua,nats,http,mqtt,clickhouse,postgres,transform,static-ui
./target/release/opexmx-connector config.yaml

Feature flags are additive — build only the adapters you ship to the edge to keep the binary small. The QuickJS transform layer is the transform feature; the DSL needs no extra feature.

Quick start

Put this in config.yaml: a connector identity, one Modbus source, and a default DSL transform.

connector:
  name: "shop-floor-pilot"
  push_endpoint: "https://api.opex.mx/ingest"
  hmac_secret: "${OPEXMX_HMAC_SECRET}"
  health_check_port: 3000

adapters:
  modbus:
    enabled: true
    devices:
      - id: "fanuc-01"
        host: "192.168.1.10"
        port: 502
        registers: [ ... ]

transform:
  enabled: true
  default:
    backend: "dsl"
    script_inline: |
      when metadata.temp > 80 || metadata.vib > 5.0
      set metadata.alert = metadata.temp > 80 || metadata.vib > 5.0

Export the secret and start the connector:

export OPEXMX_HMAC_SECRET=change-me
./target/release/opexmx-connector config.yaml

Open the canvas UI at http://localhost:3000. Every polled record now runs through the DSL before fanning out to your sinks.

Core concepts

OpexFlow is built around four primitives. Everything else is plumbing.

Primitive                      Role
Source (adapter)               Ingests events from a protocol: Modbus poll, OPC-UA subscription, HTTP REST poll, MQTT topic, NATS subject.
Transform (per-source route)   Reshapes each event before fan-out. Keyed by {source}:{machine.id}, with a shared default. Either a DSL program or a JavaScript function.
Sink (fan-out)                 Delivers the result: hub (WebSocket), HMAC-signed HTTP push, NATS, MQTT, SQLite, ClickHouse, Postgres.
Flow (canvas tab)              A UI grouping of sources → transforms → sinks on the canvas. Layouts persist to flows.layout.json; routing stays global.

Every event is normalized to CanonicalMachineData at the source boundary, so transforms and sinks always see the same shape regardless of protocol.

Flows

A flow is a canvas tab — a saved layout of source, transform, and sink nodes. The engine runs each source as an async polling task; each record is routed through its transform, then fans out to the configured sink set. Events are passed as native CanonicalMachineData structs, never re-serialized at the DSL boundary — which is why the DSL is so fast.

Per-source routing

Transforms are addressed by {source}:{machine.id}. A record looks up its own route first, then falls back to default. Each route carries its own script and its own sink set, so one source can drop noisy machines, enrich others, and push to different backends:

transform:
  enabled: true
  default:
    backend: "dsl"
    script_inline: "set metadata.processedAt = timestamp"
    sinks: ["storage", "push"]
  sources:
    - key: "http:ERP-01"
      backend: "js"
      script: "./transforms/erp.js"
      sinks: ["nats", "clickhouse"]
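
The lookup order described above can be sketched in JavaScript. This is an illustration only, not the connector's Rust implementation; `resolveRoute` and the shape of the `routes` object are hypothetical names chosen to mirror the YAML.

```javascript
// Hypothetical sketch of per-source route resolution.
// The table mirrors the YAML: a default plus routes keyed by "{source}:{machine.id}".
const routes = {
  default: { backend: "dsl", sinks: ["storage", "push"] },
  sources: {
    "http:ERP-01": { backend: "js", sinks: ["nats", "clickhouse"] },
  },
};

function resolveRoute(routes, record) {
  const key = `${record.source}:${record.machine.id}`;
  // A record uses its own route first, then falls back to the shared default.
  return routes.sources[key] ?? routes.default;
}

console.log(resolveRoute(routes, { source: "http", machine: { id: "ERP-01" } }).sinks);
console.log(resolveRoute(routes, { source: "modbus", machine: { id: "fanuc-01" } }).sinks);
```

The first call resolves to the ERP route and its own sink set; the second has no dedicated route and inherits the default.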

Configuration reference

OpexFlow reads YAML (config.yaml by default). Any string value supports ${ENV_VAR} substitution — useful for secrets. Top-level sections:

Key                     Description
connector               Identity + delivery: name, push_endpoint, hmac_secret, health_check_port (default 3000), poll_interval_ms.
adapters                modbus / opcua / http device definitions.
nats / mqtt             Stream sources (ingest), also usable as sinks.
transform               Per-source routes, default route, caps (timeout_ms, memory_limit_mb, on_error).
clickhouse / postgres   Sink connection config. Postgres expects TimescaleDB.
ai                      OpenAI-compatible codegen endpoint (see AI generator).
telemetry               Stall-alert threshold, sample interval, alert targets (webhook / Telegram).
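
To make the ${ENV_VAR} substitution concrete, here is a minimal JavaScript sketch of how such expansion could work. The function name `substituteEnv` is hypothetical, and leaving unknown variables untouched is an assumption; the connector's actual behavior for unset variables is not stated here.

```javascript
// Hypothetical sketch: expand ${VAR} references in a config string.
// Assumption: variables that are not set are left as-is.
function substituteEnv(value, env = process.env) {
  return value.replace(/\$\{([A-Za-z_][A-Za-z0-9_]*)\}/g, (match, name) =>
    Object.prototype.hasOwnProperty.call(env, name) ? String(env[name]) : match
  );
}

console.log(substituteEnv("${OPEXMX_HMAC_SECRET}", { OPEXMX_HMAC_SECRET: "change-me" }));
```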

Transform DSL

The OpexFlow DSL is a tiny, statement-based language evaluated natively against the in-memory event struct. It exists for the 90% of transforms that are filters and field math — where spinning up a JS interpreter per event is pure waste.

Why a DSL? Every JavaScript transform pays two taxes: it marshals the event to a JS object and back (four serialize passes across the FFI boundary), then interprets the function. The DSL skips both: it compiles once at route load, then reads and mutates typed fields in place with no thread hop. It runs 15–50× faster than the equivalent JS for simple transforms. For anything the DSL can't express (loops, regex, string munging), drop into JavaScript.

DSL programs live inline (script_inline) or in a file (script), exactly where a JS script would. Pick the backend with backend: "dsl".

Syntax overview

A program is a list of statements, one per line. Comments start with #. There are exactly three statements:

# a complete DSL program
when metadata.temp > 80 || metadata.vib > 5.0   # keep iff true; else DROP the record
set metadata.alert = true                 # mutate a field
set status.state = "alarm"                 # ... another field

Statement             Meaning
when <expr>           Filter. The record is kept iff <expr> is true; otherwise it's dropped (nothing flows to sinks). At most one when per program.
set <path> = <expr>   Mutate a field. Targets: metadata.* (nested, auto-created), status.state, status.alarmCode, status.alarmMessage, machine.id, machine.name.
drop                  Drop the record unconditionally. Equivalent to JS return null.

If a program has no when and no drop, the (possibly mutated) record always passes through to its sinks.
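
For readers who think in JavaScript, the statements map onto the transform contract roughly as follows. This is a hand-written equivalent of the sample program above, not output of the DSL compiler, and the record literals are invented test data.

```javascript
// Hand-written JS equivalent of:
//   when metadata.temp > 80 || metadata.vib > 5.0
//   set metadata.alert = true
//   set status.state = "alarm"
function transform(record) {
  const m = record.metadata ?? {};
  // when: keep the record only if the expression is true.
  if (!(m.temp > 80 || m.vib > 5.0)) return null; // dropped, nothing reaches the sinks
  // set: mutate the target fields.
  record.metadata = { ...m, alert: true };
  record.status = { ...record.status, state: "alarm" };
  return record;
}

console.log(transform({ metadata: { temp: 91, vib: 1.2 }, status: { state: "running" } }));
console.log(transform({ metadata: { temp: 40, vib: 1.2 }, status: { state: "running" } }));
```

The first record passes the filter and comes back flagged; the second fails it and is dropped.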

Expressions & operators

Operator          Description
|| && !           Boolean logic (or / and / not).
== != < <= > >=   Comparisons, return booleans. Numeric first, string ordering as fallback.
+ - * /           Arithmetic. + also concatenates two strings.
( )               Grouping.

Literals

42   3.14             # numbers (float)
"running"            # string
true  false  null    # booleans, null

Paths

Paths read directly from CanonicalMachineData. Dotted, with numeric segments indexing arrays:

Path                                     Reads
source                                   Origin protocol (modbus, opcua, http, …).
machine.id / machine.name                Stable machine id / display name.
status.state                             running | idle | alarm | offline.
status.alarmCode / status.alarmMessage   Active alarm (or null).
spindle.speed / spindle.load             Spindle readings (null if no spindle).
axes.<i>.position                        Indexed axis position (also .name, .unit).
metadata.<key>...                        Free-form nested JSON, where sensors like temp, vib, rpm live.
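
The path rules (dotted segments, numeric segments indexing arrays, auto-created nested metadata on set) can be sketched with two small helpers. `getPath` and `setPath` are hypothetical names, and returning null for any missing path is an assumption generalized from the spindle row above; the native evaluator works on typed struct fields rather than plain objects.

```javascript
// Hypothetical helpers illustrating dotted-path semantics; not the connector's code.
function getPath(record, path) {
  let node = record;
  for (const seg of path.split(".")) {
    if (node === null || node === undefined) return null;
    // Numeric segments index arrays, e.g. "axes.0.position".
    node = Array.isArray(node) ? node[Number(seg)] : node[seg];
  }
  return node === undefined ? null : node;
}

function setPath(record, path, value) {
  const segs = path.split(".");
  let node = record;
  for (const seg of segs.slice(0, -1)) {
    // Auto-create missing intermediate objects, as documented for metadata.*.
    if (typeof node[seg] !== "object" || node[seg] === null) node[seg] = {};
    node = node[seg];
  }
  node[segs[segs.length - 1]] = value;
}

const record = { axes: [{ name: "X", position: 12.5 }], metadata: {} };
console.log(getPath(record, "axes.0.position")); // 12.5
setPath(record, "metadata.flags.hot", true);
console.log(record.metadata.flags.hot); // true
console.log(getPath(record, "spindle.speed")); // null
```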

Examples

Conditional alert (filter + flag)

when metadata.temp > 80 || metadata.vib > 5.0
set metadata.alert = metadata.temp > 80 || metadata.vib > 5.0

Force alarm state on high load

when spindle.load > 90
set status.state = "alarm"
set status.alarmMessage = "spindle overload"

Unit conversion + nested metadata

set metadata.temp_f = metadata.temp_c * 9 / 5 + 32
set metadata.flags.hot = metadata.temp_c > 60

Pass everything except offline machines

when !(status.state == "offline")

Tip: Don't memorize this. Describe what you want in the AI transform panel and OpexFlow writes the DSL (or JS) for you, server-validated before it lands.

DSL cheat sheet

when expr              keep iff true
set path = expr        mutate field
drop                   drop the record
metadata.x             sensor values
status.state           run state
|| && !                boolean logic
== != > <              comparisons
+ - * /                arithmetic
# comment              one statement per line
"str" / true / null    literals

JavaScript transforms

When a transform needs loops, regex, or logic the DSL can't express, use a JavaScript transform. It runs in an embedded, sandboxed QuickJS runtime — no filesystem, no network, hard memory + stack limits, and a deadline interrupt. A worker pool (one runtime per scripted route, sharded by machine id) gives parallelism with backpressure.

// enrich.js — runs once per record
function transform(record, ctx) {
  if (record.metadata.temp > 95) return null;        // drop
  record.metadata.risk = record.metadata.temp * 0.8
                      + record.metadata.vib * 1.2;
  return record;                         // pass on (mutated)
}

Contract                       Behavior
transform(record, ctx)         Called per record. record is the CanonicalMachineData object; ctx is { source, timestamp, machineId }.
Return an object               The object replaces the record and flows to its sinks (must deserialize back to CanonicalMachineData).
Return null / undefined        The record is dropped (same as DSL drop).
Throw / timeout / bad output   Applies on_error policy: passthrough (default, never lose data), drop, or fail.

Choose wisely: Prefer the DSL for filters and field math. Drop to JS only when you need expressiveness; it costs the marshal + interpreter tax per record.
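
The on_error policies can be pictured as a wrapper around the transform call. The sketch below is an assumption-laden illustration: `applyTransform` is a hypothetical name, and only the throw case is modelled (timeouts and invalid output are enforced by the runtime, not shown here).

```javascript
// Hypothetical harness showing the three on_error policies around a transform call.
function applyTransform(transform, record, ctx, onError = "passthrough") {
  try {
    // Work on a copy so a transform that throws midway cannot half-mutate the original.
    const copy = JSON.parse(JSON.stringify(record));
    const out = transform(copy, ctx);
    return out === null || out === undefined ? null : out; // null means "dropped"
  } catch (err) {
    if (onError === "passthrough") return record; // default: never lose data
    if (onError === "drop") return null;
    throw err; // "fail"
  }
}

const broken = () => { throw new Error("boom"); };
console.log(applyTransform(broken, { metadata: { temp: 70 } }, {}, "passthrough"));
console.log(applyTransform(broken, { metadata: { temp: 70 } }, {}, "drop"));
```

With passthrough the untouched record still reaches the sinks; with drop it is discarded.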

AI transform generator

Describe the transform in plain language; OpexFlow writes it. The generator talks to any OpenAI-compatible /chat/completions endpoint, is steered to emit DSL first (falling back to JS when the rule isn't expressible), and server-validates the output before returning it.

ai:
  enabled: true
  base_url: "https://api.openai.com/v1"   # any OpenAI-compatible base
  model: "gpt-4o-mini"
  api_key: "${OPEX_AI_API_KEY}"
  max_tokens: 1200

In the UI's AI tab you sketch the input shape, the rule, and the desired output — e.g. "emit only when temp exceeds 80 or vibration crosses 5.0" — and hit Generate. The model maps your fields onto metadata.* and returns a runnable program you paste into the editor and save. Endpoint: POST /api/transform/generate.
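
As a rough picture of what "talks to any OpenAI-compatible /chat/completions endpoint" means, the sketch below builds the kind of upstream request a generator could send from the ai config above. The function name `buildGenerateRequest` and the system prompt wording are invented for illustration; the connector's real prompt and the request body of POST /api/transform/generate are not documented here.

```javascript
// Hypothetical sketch of an OpenAI-compatible chat completion request.
// Only base_url, model, api_key, and max_tokens come from the ai config section.
function buildGenerateRequest(ai, description) {
  return {
    url: `${ai.base_url.replace(/\/+$/, "")}/chat/completions`,
    init: {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${ai.api_key}`,
      },
      body: JSON.stringify({
        model: ai.model,
        max_tokens: ai.max_tokens,
        messages: [
          // Invented prompt: steer toward DSL first, JS as fallback.
          { role: "system", content: "Prefer the OpexFlow DSL; fall back to JavaScript." },
          { role: "user", content: description },
        ],
      }),
    },
  };
}

const req = buildGenerateRequest(
  { base_url: "https://api.openai.com/v1", model: "gpt-4o-mini", api_key: "sk-example", max_tokens: 1200 },
  "emit only when temp exceeds 80 or vibration crosses 5.0"
);
console.log(req.url);
```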

Sources

Each source is a protocol adapter configured under adapters (or nats/mqtt for streaming). All normalize to CanonicalMachineData.

Type     Key options
modbus   host, port, unit, registers (with mapping to axes/spindle/metadata)
opcua    endpoint, nodes (mapping types), security
http     url, interval, headers; polls any REST endpoint
mqtt     url, topic, qos (ingests under opexmx.ingest.*)
nats     url, subject (ingests under opexmx.ingest.*)
inject   Manual / test source fired from the UI or POST /api/sources/{key}/inject

Sinks

Each route declares which sinks it fans out to (omit to inherit the default, which is all sinks). Names map case-insensitively.

Name         Aliases         Delivers to
hub          ws              In-process WebSocket hub (live canvas preview).
push         http, webhook   HMAC-signed POST to connector.push_endpoint.
nats         jetstream       NATS subject.
mqtt         -               MQTT topic.
storage      sqlite          Local SQLite retention.
clickhouse   ch              ClickHouse table.
postgres     pg, timescale   Postgres / TimescaleDB hypertable.
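
Case-insensitive name and alias matching can be sketched as a small lookup. The alias table is transcribed from the table above; the function `canonicalSink` and its choice to return null for unknown names are hypothetical.

```javascript
// Alias table transcribed from the sinks table; the lookup itself is a hypothetical sketch.
const SINK_ALIASES = {
  hub: ["ws"],
  push: ["http", "webhook"],
  nats: ["jetstream"],
  mqtt: [],
  storage: ["sqlite"],
  clickhouse: ["ch"],
  postgres: ["pg", "timescale"],
};

function canonicalSink(name) {
  const wanted = name.toLowerCase(); // names map case-insensitively
  for (const [canonical, aliases] of Object.entries(SINK_ALIASES)) {
    if (canonical === wanted || aliases.includes(wanted)) return canonical;
  }
  return null; // assumption: unknown names resolve to nothing
}

console.log(["PG", "Webhook", "sqlite"].map(canonicalSink));
```

Under these assumptions a route declaring sinks: ["PG", "Webhook"] would fan out to the postgres and push sinks.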

HMAC signing: The push sink signs every request with the connector's hmac_secret so receivers can verify origin and integrity.
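
On the receiving side, verification could look like the Node.js sketch below. It assumes HMAC-SHA256 over the raw request body with a hex-encoded signature; the actual algorithm, encoding, and header name are not specified in this document, so check them against your connector before relying on this.

```javascript
const crypto = require("node:crypto");

// Assumptions: HMAC-SHA256 over the raw body, hex-encoded signature.
function sign(body, secret) {
  return crypto.createHmac("sha256", secret).update(body).digest("hex");
}

function verify(body, signatureHex, secret) {
  const expected = Buffer.from(sign(body, secret), "hex");
  const received = Buffer.from(signatureHex, "hex");
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return expected.length === received.length && crypto.timingSafeEqual(expected, received);
}

const body = '{"machine":{"id":"fanuc-01"}}';
const signature = sign(body, "change-me");
console.log(verify(body, signature, "change-me")); // true
console.log(verify(body + " ", signature, "change-me")); // false
```

Comparing with timingSafeEqual rather than === avoids leaking how many leading characters of a forged signature matched.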

Observability

Observability is fully native — no sidecar collector required. The connector serves Prometheus metrics and keeps in-process traces + a SQLite history series, all viewable in its own Observability tab.

  • Metrics at /metrics: opex_connector_records_total, opex_connector_sink_total, opex_connector_push_duration_seconds, opex_connector_last_publish_ts.
  • Traces: one span per record through source → transform → sink, queryable via /api/obs/traces.
  • History: sampled metric series in SQLite (/api/obs/series).
  • Alerts: sink-stall detection; delivers to a webhook and/or Telegram.
  • Grafana: provisioned dashboards ship in the grafana/ directory.

Stall alerts are configured under telemetry:

telemetry:
  stall_threshold_secs: 300
  sample_interval_secs: 15
  alerts:
    webhook_url: "https://hooks/ops"
    telegram: { bot_token: "${TG_TOKEN}", chat_id: "-100…" }
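
One plausible reading of sink-stall detection is a comparison between the last successful publish time and stall_threshold_secs. The sketch below is an assumption, not the connector's logic: `isStalled` is a hypothetical name, and it treats timestamps as Unix seconds.

```javascript
// Hypothetical stall check: a sink counts as stalled when its last successful
// publish is older than the threshold. Timestamps in Unix seconds (assumption).
function isStalled(lastPublishTs, nowTs, stallThresholdSecs = 300) {
  return nowTs - lastPublishTs > stallThresholdSecs;
}

const now = Math.floor(Date.now() / 1000);
console.log(isStalled(now - 30, now));  // false: published 30 s ago
console.log(isStalled(now - 900, now)); // true: silent for 15 min
```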

REST API

The same API the canvas UI uses. JSON over HTTP, served on health_check_port (default 3000).

Method & path                                Description
GET /api/health / /api/ready                 Liveness / readiness + poller status.
GET /api/sources                             Configured sources + live status.
POST /api/sources/{key}/inject               Manually fire an inject source.
GET /api/transform                           Transform caps + default + routes.
GET/PUT /api/transform/default               Read / hot-replace the default route.
GET/PUT/DELETE /api/transform/routes/{key}   Manage a per-source route.
POST /api/transform/validate                 Syntax-check a candidate script (JS or DSL).
POST /api/transform/test                     Dry-run a script against a sample record.
POST /api/transform/generate                 AI-generate a transform from a description.
GET /api/devices / /api/data                 Known devices + recent retained events.
GET /api/flows / PUT /api/flows/{id}         Canvas flow layouts.
GET /api/config / /api/config/reload         Read / hot-reload config.yaml.
GET /api/obs/{traces,series,alerts}          Native traces, history, alerts.
GET /metrics                                 Prometheus scrape endpoint.
WS /api/ws/live                              Live event stream (canvas preview).
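
A minimal liveness probe against this API might look like the sketch below. Only the path and default port come from this page; the response body shape is not documented here, so the sketch relies on the HTTP status alone. `isHealthy` is a hypothetical helper, and the injectable `fetchImpl` parameter exists only to make it testable without a running connector.

```javascript
// Minimal liveness probe (sketch). Uses the global fetch available in Node 18+.
async function isHealthy(baseUrl = "http://localhost:3000", fetchImpl = fetch) {
  try {
    const res = await fetchImpl(new URL("/api/health", baseUrl));
    return res.ok; // any 2xx status counts as healthy
  } catch {
    return false; // connection refused, DNS failure, etc.
  }
}

// Usage, against a running connector:
//   isHealthy().then((ok) => console.log("healthy:", ok));
```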

Deployment

systemd

# /etc/systemd/system/opexmx-connector.service
[Unit]
Description=OpexMX Connector
After=network.target

[Service]
Environment=OPEXMX_HMAC_SECRET=change-me
ExecStart=/usr/local/bin/opexmx-connector /etc/opexmx/config.yaml
Restart=on-failure
User=opexmx

[Install]
WantedBy=multi-user.target

Edge (ARM64)

The same static binary cross-compiles to aarch64-unknown-linux-musl. Deploy identical artifacts from Raspberry Pi to rack server.

CLI

The connector takes an optional config path (defaults to config.yaml):

opexmx-connector                 # run with ./config.yaml
opexmx-connector /etc/opexmx/config.yaml
opexmx-connector --help

Validate transforms from the UI (Check) or the API (POST /api/transform/validate) rather than a separate CLI subcommand. Set secrets via environment variables and reference them as ${VAR} in the config.