Modbus¤

ModbusDevice is a config-driven Modbus client. Describe your device in a JSON file (registers, addresses, data types), then read and write it through human-readable aliases instead of raw addresses.

from instro.modbus import ModbusDevice

connection = {"transport": "tcp", "host": "192.168.1.10", "port": 502}
device = ModbusDevice("my_device.json", connection=connection, autostart=True)
device.read("temperature")
device.write("setpoint", 75.5)
device.write("mode", "auto")  # string values via write_value_map
device.close()

Or build the config in code with full IDE autocomplete:

from instro.modbus import ModbusDevice, ModbusConfig, TCPConnection
from instro.modbus.types import (
    DeviceInfo, RegisterDef, TimingConfig,
)

config = ModbusConfig(
    device=DeviceInfo(name="my_device"),
    timing=TimingConfig(poll_interval=1.0, write_delay_ms=300),
    registers=[
        RegisterDef(name="temperature", starting_address=0, data_type="float32",
                    write_min=-40.0, write_max=500.0),
        RegisterDef(name="mode", starting_address=100, data_type="uint16",
                    write_value_map={"off": 0, "auto": 1, "manual": 2}),
    ],
)
connection = TCPConnection(host="192.168.1.10")
device = ModbusDevice(config, connection=connection, autostart=True)

Sample Config¤

A complete config for a heat exchanger with two temperature sensors, flow rate and pressure inputs, a setpoint, an operating mode, a pump coil, and a status register with bitmap extraction:

{
  "version": 1,
  "protocol": "modbus",
  "device": {
    "name": "heat_exchanger",
    "description": "Heat exchanger monitoring and control",
    "manufacturer": "Acme Thermal",
    "model": "HX-200"
  },
  "timing": {
    "poll_interval": 1.0,
    "write_delay_ms": 100
  },
  "registers": [
    {
      "name": "inlet_temp",
      "starting_address": 0,
      "register_type": "input",
      "data_type": "float32",
      "word_swap": true,
      "read_group": "temperatures"
    },
    {
      "name": "outlet_temp",
      "starting_address": 2,
      "register_type": "input",
      "data_type": "float32",
      "word_swap": true,
      "read_group": "temperatures"
    },
    {
      "name": "flow_rate",
      "starting_address": 4,
      "register_type": "input",
      "data_type": "uint16",
      "scale": {
        "type": "linear",
        "gain": 0.1,
        "offset": 0
      }
    },
    {
      "name": "pressure_psi",
      "starting_address": 5,
      "register_type": "input",
      "data_type": "uint16",
      "scale": {
        "type": "linear",
        "gain": 0.01,
        "offset": 0
      }
    },
    {
      "name": "setpoint",
      "starting_address": 100,
      "register_type": "holding",
      "data_type": "float32",
      "word_swap": true,
      "write_min": 50.0,
      "write_max": 250.0
    },
    {
      "name": "operating_mode",
      "starting_address": 102,
      "register_type": "holding",
      "data_type": "uint16",
      "write_value_map": {
        "off": 0,
        "standby": 1,
        "run": 2,
        "flush": 3
      },
      "write_min": 0,
      "write_max": 3
    },
    {
      "name": "pump_enable",
      "starting_address": 0,
      "register_type": "coil"
    },
    {
      "name": "status_register",
      "starting_address": 200,
      "register_type": "input",
      "data_type": "uint16",
      "bitmap": [
        {"name": "pump_running", "bit_index": 0},
        {"name": "alarm_high_temp", "bit_index": 1},
        {"name": "alarm_low_flow", "bit_index": 2},
        {"name": "fault", "bit_index": 15}
      ]
    }
  ]
}

JSON Config Reference¤

Connection¤

Connection can be provided in the config or passed to the ModbusDevice constructor. The constructor parameter takes precedence, allowing the config to be a standalone device description shared across environments.

device = ModbusDevice(
    "my_device.json",
    connection={"transport": "tcp", "host": "192.168.1.10", "port": 502},
)

TCP connection section in the config:

{
    "connection": {
        "transport": "tcp",
        "host": "192.168.1.10",
        "port": 502,
        "unit_id": 1,
        "timeout": 3.0
    }
}

RTU (serial) connection section in the config:

{
    "connection": {
        "transport": "rtu",
        "port": "/dev/ttyUSB0",
        "baudrate": 9600,
        "parity": "N",
        "stopbits": 1,
        "bytesize": 8,
        "unit_id": 1,
        "timeout": 3.0
    }
}

Serial port paths vary by platform:

  • Linux: /dev/ttyUSB0, /dev/ttyACM0
  • macOS: /dev/cu.usbserial-1234, /dev/cu.usbmodem1234
  • Windows: COM3, COM4
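
The precedence rule described at the start of this section can be sketched in plain Python. `resolve_connection` is a hypothetical helper written for illustration, not part of instro; it assumes only what the text states: the constructor argument wins, and having no connection at all is an error.

```python
from typing import Optional


def resolve_connection(config: dict, override: Optional[dict] = None) -> dict:
    """Pick the connection to use: an explicit override wins over the config."""
    if override is not None:
        return override
    connection = config.get("connection")
    if connection is None:
        raise ValueError("no connection in arguments or config")
    return connection


# The config ships a default; the override points at a test bench instead.
config = {"connection": {"transport": "tcp", "host": "192.168.1.10", "port": 502}}
bench = {"transport": "tcp", "host": "10.0.0.5", "port": 1502}

chosen = resolve_connection(config, override=bench)  # uses the bench host
fallback = resolve_connection(config)                # uses the config host
```

Keeping the connection out of the JSON file lets the same device description be reused on a bench, in CI, and in production, with only the constructor argument changing.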

Timing¤

Controls background polling interval and write delay:

{
    "timing": {
        "poll_interval": 1.0,
        "write_delay_ms": 300
    }
}

| Field | Type | Default | Description |
|---|---|---|---|
| poll_interval | float | required | Seconds between polling cycles (0.01 to 10.0) |
| write_delay_ms | int | 0 | Milliseconds to sleep after each write |

Activate polling with autostart=True in the constructor, or call open() and then start() manually. The write delay is applied automatically after every write() call, so no manual time.sleep() is needed.

Registers¤

Each register entry defines a named channel:

| Field | Type | Default | Description |
|---|---|---|---|
| name | string | required | Alias used in read() and write() |
| starting_address | int | required | Modbus register address (0 to 65535) |
| register_type | string | "holding" | "holding", "input", "coil", or "discrete" |
| data_type | string | "uint16" | "uint16", "int16", "uint32", "int32", "uint64", "int64", "float32", "float64", "bool" |
| byte_swap | bool | false | Swap bytes within 16-bit words |
| word_swap | bool | false | Swap 16-bit words (32-bit and 64-bit types) |
| long_swap | bool | false | Swap 32-bit halves (64-bit types only) |
| scale | object | null | Linear scaling config, e.g. {"type": "linear", "gain": <float>, "offset": <float>}. See Scaling |
| bitmap | list | null | Bit extraction: [{"name": "alarm", "bit_index": 0}] |
| poll | bool | true | Include in background polling |
| write_min | number | null | Minimum allowed write value, in the same units the caller passes to write() (scaled if scale is set, raw otherwise). Holding registers only. |
| write_max | number | null | Maximum allowed write value, in the same units the caller passes to write() (scaled if scale is set, raw otherwise). Holding registers only. |
| write_value_map | object | null | Map string labels to register values (holding registers only) |
| read_group | string | null | Group ID for batched reads (all registers in a group are read in one transaction) |
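
To make the swap flags concrete, here is a minimal sketch that decodes a float32 from two 16-bit registers using only the standard library. `decode_float32` is a hypothetical function, not the library's decoder; it assumes that word_swap exchanges the two 16-bit words and byte_swap exchanges the bytes inside each word, as the field descriptions suggest.

```python
import struct


def decode_float32(registers, byte_swap=False, word_swap=False):
    """Decode two 16-bit registers into a float32 (big-endian by default)."""
    words = list(registers)
    if word_swap:
        words.reverse()  # exchange the two 16-bit words
    if byte_swap:
        # exchange the high and low byte inside each word
        words = [((w & 0xFF) << 8) | (w >> 8) for w in words]
    return struct.unpack(">f", struct.pack(">HH", *words))[0]


# 75.5 encoded as an IEEE 754 float32 is 0x42970000.
big_endian = decode_float32([0x4297, 0x0000])
word_swapped = decode_float32([0x0000, 0x4297], word_swap=True)
byte_swapped = decode_float32([0x9742, 0x0000], byte_swap=True)
```

All three calls yield 75.5. If a device returns nonsense values for a float register, trying the swap flags one at a time is usually the quickest way to find its word order.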

Scaling¤

Linear scaling converts between raw register values and physical units:

physical = offset + (gain * raw)

Example register with a gain of 0.01:

{
    "name": "pressure_psi",
    "starting_address": 10,
    "data_type": "uint16",
    "scale": {"type": "linear", "gain": 0.01, "offset": 0}
}
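
As a worked illustration of the formula, the sketch below applies it in both directions. The two functions are standalone stand-ins written for this example (the names mirror the documented LinearScale methods, but this is not the library's code); the zero-gain check reflects the documented rule that gain must not be zero.

```python
def to_physical(raw, gain=1.0, offset=0.0):
    """Raw register value -> physical units: offset + gain * raw."""
    return offset + gain * raw


def to_raw(physical, gain=1.0, offset=0.0):
    """Physical units -> raw register value (inverse of to_physical)."""
    if gain == 0:
        raise ValueError("gain must not be zero")
    return (physical - offset) / gain


# pressure_psi uses gain 0.01, so a raw reading of 1234 is about 12.34 psi.
psi = to_physical(1234, gain=0.01)

# Writing 12.34 psi back gives roughly 1234 before rounding to an integer.
raw = round(to_raw(12.34, gain=0.01))
```

The inverse is computed in floating point, so an integer register needs an explicit rounding step, as the last line shows.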

Write Value Map¤

Map human-readable strings to raw register values. Eliminates magic numbers in application code:

{
    "name": "control_mode",
    "starting_address": 100,
    "data_type": "uint16",
    "write_value_map": {
        "off": 0,
        "auto": 1,
        "manual": 2
    }
}

device.write("control_mode", "auto")   # writes 1
device.write("control_mode", 1)        # also works

Values in the map must be unique and must fall within write_min/write_max if those are set.
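
The lookup and the uniqueness rule can be sketched as follows. Both helpers are hypothetical and exist only to illustrate the behavior described here: string labels are translated through the map, numbers pass through unchanged, and an unknown label raises KeyError.

```python
def resolve_write_value(value, write_value_map=None):
    """Translate a string label to its register value; pass numbers through."""
    if isinstance(value, str):
        if not write_value_map or value not in write_value_map:
            raise KeyError(value)
        return write_value_map[value]
    return value


def check_unique_values(write_value_map):
    """Raise ValueError if two labels map to the same register value."""
    values = list(write_value_map.values())
    if len(values) != len(set(values)):
        raise ValueError("write_value_map values must be unique")


modes = {"off": 0, "auto": 1, "manual": 2}
check_unique_values(modes)                    # passes: 0, 1, 2 are distinct

by_label = resolve_write_value("auto", modes)  # label looked up in the map
by_number = resolve_write_value(1, modes)      # number passed through
```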

Write Limits¤

Reject writes outside a safe range before they reach the device:

{
    "name": "setpoint",
    "starting_address": 0,
    "data_type": "float32",
    "write_min": 32.0,
    "write_max": 300.0
}

device.write("setpoint", 150.0)   # ok
device.write("setpoint", 999.0)   # raises ValueError

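Limits are checked against the value passed to write(): in physical units when a scale is configured (before the value is converted to raw), and in raw units otherwise.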
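
A minimal sketch of such a range check, using the setpoint limits from the example above. `check_write_limits` and `try_write` are hypothetical names introduced for illustration; the only behavior taken from the documentation is that an out-of-range value raises ValueError before anything is sent.

```python
def check_write_limits(value, write_min=None, write_max=None):
    """Return value unchanged, or raise ValueError if it is outside the limits."""
    if write_min is not None and value < write_min:
        raise ValueError(f"{value} is below write_min {write_min}")
    if write_max is not None and value > write_max:
        raise ValueError(f"{value} is above write_max {write_max}")
    return value


def try_write(value):
    """Report whether a setpoint write would be accepted (limits 32.0 to 300.0)."""
    try:
        check_write_limits(value, write_min=32.0, write_max=300.0)
        return "ok"
    except ValueError as exc:
        return f"rejected: {exc}"


accepted = try_write(150.0)   # inside the range
rejected = try_write(999.0)   # above write_max
```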

Read Groups¤

Registers with the same read_group are read in a single Modbus transaction, reducing the number of round trips per polling cycle:

{
    "name": "heat_power",
    "starting_address": 100,
    "data_type": "float32",
    "read_group": "power"
},
{
    "name": "cool_power",
    "starting_address": 102,
    "data_type": "float32",
    "read_group": "power"
}

Constraints:

  • All registers in a group must share the same register_type
  • All registers in a group must have poll: true
  • Holding/input groups cannot span more than 125 registers
  • Coil/discrete groups cannot span more than 2000 addresses
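
The span constraint can be checked by hand with a few lines of Python. This is an illustrative sketch, not the library's validator: `REGISTER_COUNT` and `group_span` are hypothetical names, and the table assumes each data type occupies its natural number of 16-bit registers (two for 32-bit types, four for 64-bit types).

```python
# Number of 16-bit registers each data type occupies.
REGISTER_COUNT = {
    "uint16": 1, "int16": 1,
    "uint32": 2, "int32": 2, "float32": 2,
    "uint64": 4, "int64": 4, "float64": 4,
}
MAX_REGISTERS_PER_READ = 125  # limit for holding/input groups


def group_span(registers):
    """Return (start, count) of the single read covering every register."""
    start = min(r["starting_address"] for r in registers)
    end = max(
        r["starting_address"] + REGISTER_COUNT[r["data_type"]] for r in registers
    )
    return start, end - start


power = [
    {"name": "heat_power", "starting_address": 100, "data_type": "float32"},
    {"name": "cool_power", "starting_address": 102, "data_type": "float32"},
]
start, count = group_span(power)  # one read of 4 registers starting at 100
fits = count <= MAX_REGISTERS_PER_READ
```

Note that the span covers any gaps between grouped registers as well, so two registers far apart can exceed the limit even though they occupy only a few addresses themselves.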

Bitmap¤

Extract individual bits from a uint16 holding or input register as named channels:

{
    "name": "status_register",
    "starting_address": 100,
    "data_type": "uint16",
    "bitmap": [
        {"name": "alarm_high", "bit_index": 0},
        {"name": "alarm_low", "bit_index": 1},
        {"name": "motor_running", "bit_index": 5}
    ]
}

Reading status_register returns the raw value plus each bit as a separate channel (0 or 1).
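
Bit extraction itself is a shift and a mask. The sketch below uses the bitmap from the example above; `extract_bits` is a hypothetical helper, and bit_index is counted from the least significant bit, as documented for BitDef.

```python
def extract_bits(raw, bitmap):
    """Return {channel name: 0 or 1} for each configured bit of a raw value."""
    return {bit["name"]: (raw >> bit["bit_index"]) & 1 for bit in bitmap}


bitmap = [
    {"name": "alarm_high", "bit_index": 0},
    {"name": "alarm_low", "bit_index": 1},
    {"name": "motor_running", "bit_index": 5},
]

# 0b100001 has bits 0 and 5 set, bit 1 clear.
channels = extract_bits(0b100001, bitmap)
```

Here `channels` is `{"alarm_high": 1, "alarm_low": 0, "motor_running": 1}`.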

API Reference¤

ModbusDevice¤

Bases: Instrument

Config-driven Modbus client. Semantic access by register alias from a ModbusConfig.

Parameters:

  • config ¤

    (ModbusConfig | dict | Path | str) –

    A ModbusConfig, a dict (validated via Pydantic), or a path to a JSON config.

  • connection ¤

    (TCPConnection | RTUConnection | dict | None, default: None ) –

    Overrides config.connection. Accepts a TCPConnection, RTUConnection, or a dict (with transport = "tcp" / "rtu"). Required if the config has no connection section.

  • name ¤

    (str | None, default: None ) –

    Channel-name prefix; falls back to config.device.name.

  • publishers ¤

    (list[Publisher] | None, default: None ) –

    Publishers that receive emitted Measurement/Command data.

  • autostart ¤

    (bool, default: False ) –

    When True, open the connection and start background polling. Requires a timing section (with poll_interval); passing autostart=True without one raises ValueError.

  • **kwargs ¤

    Default tags applied to every emitted Measurement/Command.

Raises:

  • ValueError

    No connection in args or config, or autostart=True with no timing section.

name instance-attribute ¤

name = name

legacy_naming instance-attribute ¤

legacy_naming = legacy_naming

publishers instance-attribute ¤

publishers = publishers or []

default_tags instance-attribute ¤

default_tags: dict[str, str] = {}

background_enable property writable ¤

background_enable

Whether the background daemon is enabled. Enabling it does not start it; call start() to run it.

background_interval instance-attribute ¤

background_interval = poll_interval

unit_id property ¤

unit_id: int

Modbus unit/slave ID from the active connection config.

add_publisher ¤

add_publisher(publisher: Publisher)

Register a publisher to receive this instrument's Measurement/Command data.

publish ¤

publish(data: Measurement | Command, **kwargs)

Fan data out to every configured publisher; kwargs pass through.

add_background_daemon_function ¤

add_background_daemon_function(method: Callable, *args, **kwargs)

Append method to the daemon's call list. Use define_background_daemon to replace instead.

start ¤

start()

Start the background daemon thread. No-op if already running.

stop ¤

stop()

Signal the background daemon to stop and join it. No-op if not running.

get_channel ¤

get_channel(
    channel_name: str,
    length: int = 1,
    wait_for_latest: bool = False,
    timeout: float = 10.0,
) -> Measurement

Return the most recent length samples for channel_name from the in-memory buffer.

Parameters:

  • channel_name ¤

    (str) –

    Name of the channel to retrieve.

  • length ¤

    (int, default: 1 ) –

    Number of trailing samples to return.

  • wait_for_latest ¤

    (bool, default: False ) –

    Block until at least length new values arrive.

  • timeout ¤

    (float, default: 10.0 ) –

    Seconds to wait when wait_for_latest=True.

Raises:

  • RuntimeError

    No background buffer; start() was not called.

  • ChannelNotFoundTimeoutError

    wait_for_latest=True and channel did not appear within timeout.

  • ChannelValueTimeoutError

    wait_for_latest=True and values did not arrive within timeout.

define_background_daemon ¤

define_background_daemon(method: Callable, *args, **kwargs)

Replace all daemon functions with a single method (called with the given args).

open ¤

open() -> None

Open the Modbus TCP/RTU connection.

close ¤

close() -> None

Close the connection and stop the daemon.

read ¤

read(alias: str, **kwargs) -> Measurement

Read the register named alias and return the scaled value.

write ¤

write(alias: str, value: float | int | bool | str, **kwargs) -> Command

Write value to the register named alias.

value is in physical units when a scale is configured. For coils, pass True/False. For registers with a write_value_map, pass the string key to look up the mapped value.

Raises:

  • TypeError

    Value type does not match the register's data type.

  • KeyError

    String value not found in write_value_map.

  • ValueError

    Read-only register, value violates write_min/write_max, or scaled raw is out of range for the data type.

Configuration Types¤

Modbus configuration types (Pydantic). DeviceInfo/LinearScale/ScaleType come from instro.lib.types.

ModbusConfig ¤

Bases: BaseModel

Complete Modbus device configuration. Load from JSON via ModbusConfig.from_json(path).

version class-attribute instance-attribute ¤

version: int = 1

protocol class-attribute instance-attribute ¤

protocol: str = 'modbus'

device instance-attribute ¤

device: DeviceInfo

timing class-attribute instance-attribute ¤

timing: TimingConfig | None = None

connection class-attribute instance-attribute ¤

connection: ConnectionType | None = Field(
    default=None, discriminator="transport"
)

registers class-attribute instance-attribute ¤

registers: list[RegisterDef] = Field(default_factory=list)

model_post_init ¤

model_post_init(__context) -> None

Cross-register validation: uniqueness, overlap, groups.
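
The kind of cross-register check named here might look like the sketch below. It is not the library's validator: `find_conflicts` is a hypothetical function, and it assumes that overlap is only meaningful between registers of the same register_type, since each Modbus register type has its own address space.

```python
from itertools import combinations

# Registers occupied per data type; anything unlisted is treated as 1.
REGISTER_COUNT = {"uint32": 2, "int32": 2, "float32": 2,
                  "uint64": 4, "int64": 4, "float64": 4}


def find_conflicts(registers):
    """Return a list of problems; the list is empty when the set is consistent."""
    problems = []
    names = [r["name"] for r in registers]
    for name in sorted(set(names)):
        if names.count(name) > 1:
            problems.append(f"duplicate name: {name}")
    for a, b in combinations(registers, 2):
        if a.get("register_type", "holding") != b.get("register_type", "holding"):
            continue  # assumed: different types never collide
        a_start, b_start = a["starting_address"], b["starting_address"]
        a_end = a_start + REGISTER_COUNT.get(a.get("data_type", "uint16"), 1)
        b_end = b_start + REGISTER_COUNT.get(b.get("data_type", "uint16"), 1)
        if a_start < b_end and b_start < a_end:
            problems.append(f"overlap: {a['name']} / {b['name']}")
    return problems


bad = find_conflicts([
    {"name": "temp", "starting_address": 0, "data_type": "float32"},
    {"name": "flow", "starting_address": 1, "data_type": "uint16"},
])
good = find_conflicts([
    {"name": "inlet_temp", "starting_address": 0, "register_type": "input",
     "data_type": "float32"},
    {"name": "pump_enable", "starting_address": 0, "register_type": "coil"},
])
```

In this sketch `bad` reports one overlap because the float32 at address 0 also occupies address 1, while `good` is empty because the input register and the coil are of different types.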

get_group ¤

get_group(group_id: str) -> list[RegisterDef]

Return the registers in group_id, sorted by starting_address.

from_json classmethod ¤

from_json(path: Path | str) -> ModbusConfig

Load and validate a configuration from a JSON file.

get_register ¤

get_register(name: str) -> RegisterDef

Return the register definition for name. Raises KeyError if not found.

TimingConfig ¤

Bases: BaseModel

Timing configuration for Modbus polling and write delays.

poll_interval class-attribute instance-attribute ¤

poll_interval: float = Field(
    ge=0.01, le=10.0, description="Polling interval in seconds"
)

write_delay_ms class-attribute instance-attribute ¤

write_delay_ms: int = Field(
    default=0,
    ge=0,
    description="Delay in milliseconds applied after every write",
)

TCPConnection ¤

Bases: BaseModel

Modbus TCP connection configuration.

transport class-attribute instance-attribute ¤

transport: Literal['tcp'] = 'tcp'

host instance-attribute ¤

host: str

port class-attribute instance-attribute ¤

port: int = Field(default=502, ge=1, le=65535)

unit_id class-attribute instance-attribute ¤

unit_id: int = Field(default=1, ge=0, le=255)

timeout class-attribute instance-attribute ¤

timeout: float = Field(
    default=3.0, gt=0, description="Response timeout in seconds"
)

RTUConnection ¤

Bases: BaseModel

Modbus RTU (serial) connection configuration.

transport class-attribute instance-attribute ¤

transport: Literal['rtu'] = 'rtu'

port instance-attribute ¤

port: str

baudrate class-attribute instance-attribute ¤

baudrate: int = 9600

parity class-attribute instance-attribute ¤

parity: Literal['N', 'E', 'O'] = 'N'

stopbits class-attribute instance-attribute ¤

stopbits: Literal[1, 2] = 1

bytesize class-attribute instance-attribute ¤

bytesize: Literal[5, 6, 7, 8] = 8

unit_id class-attribute instance-attribute ¤

unit_id: int = Field(default=1, ge=0, le=255)

timeout class-attribute instance-attribute ¤

timeout: float = Field(
    default=3.0, gt=0, description="Response timeout in seconds"
)

RegisterDef ¤

Bases: BaseModel

Definition of a Modbus register.

Note
  • Swap options control byte ordering for multi-byte values:
      • byte_swap: swap bytes within each 16-bit word
      • word_swap: swap 16-bit words (for 32-bit and 64-bit types)
      • long_swap: swap 32-bit longs (for 64-bit types only)
  • All swap options default to False (big-endian / network byte order).
  • scale is not allowed for coils and discrete inputs.

name class-attribute instance-attribute ¤

name: str = Field(description='Unique name/alias for this register')

description class-attribute instance-attribute ¤

description: str | None = None

starting_address class-attribute instance-attribute ¤

starting_address: int = Field(ge=0, le=65535)

register_type class-attribute instance-attribute ¤

register_type: Literal["holding", "input", "coil", "discrete"] = (
    "holding"
)

data_type class-attribute instance-attribute ¤

data_type: DataType = 'uint16'

byte_swap class-attribute instance-attribute ¤

byte_swap: bool = False

word_swap class-attribute instance-attribute ¤

word_swap: bool = False

long_swap class-attribute instance-attribute ¤

long_swap: bool = False

scale class-attribute instance-attribute ¤

scale: ScaleType | None = None

bitmap class-attribute instance-attribute ¤

bitmap: list[BitDef] | None = None

poll class-attribute instance-attribute ¤

poll: bool = True

write_min class-attribute instance-attribute ¤

write_min: float | int | None = None

write_max class-attribute instance-attribute ¤

write_max: float | int | None = None

write_value_map class-attribute instance-attribute ¤

write_value_map: dict[str, int | float] | None = None

read_group class-attribute instance-attribute ¤

read_group: str | None = None

register_count property ¤

register_count: int

Number of 16-bit registers this data type spans (uint16→1, uint32→2, uint64→4).

BitDef ¤

Bases: BaseModel

Definition of a single bit to extract from a uint16 register.

name class-attribute instance-attribute ¤

name: str = Field(description='Channel name for this bit')

bit_index class-attribute instance-attribute ¤

bit_index: int = Field(
    ge=0, le=15, description="0-based bit position from LSB"
)

Shared Types¤

Shared types: runtime dataclasses (Measurement/Command) and cross-protocol Pydantic configs.

DeviceInfo ¤

Bases: BaseModel

Device metadata. name is the channel-name prefix on publish (e.g. my_device.temperature).

name instance-attribute ¤

name: str

description class-attribute instance-attribute ¤

description: str = ''

manufacturer class-attribute instance-attribute ¤

manufacturer: str = ''

model class-attribute instance-attribute ¤

model: str = ''

LinearScale ¤

Bases: BaseModel

Linear scaling: physical = offset + (gain * raw).

Applied automatically on reads (raw -> physical) and reversed on writes (physical -> raw). Not all protocols or data point types support scaling; check the protocol-specific documentation.

type class-attribute instance-attribute ¤

type: Literal['linear'] = 'linear'

gain class-attribute instance-attribute ¤

gain: float = Field(
    default=1.0, description="Scale factor (must not be zero)"
)

offset class-attribute instance-attribute ¤

offset: float = 0.0

model_post_init ¤

model_post_init(__context) -> None

Validate that gain is not zero.

to_physical ¤

to_physical(raw: float) -> float

Convert raw register value to physical units.

to_raw ¤

to_raw(physical: float) -> float

Convert physical value to raw register value.

Returns float to preserve precision for float registers. Integer registers should handle conversion in _encode_value.