Modbus¤

InstroModbus is a config-driven Modbus client. Describe your device in a JSON file — registers, addresses, data types — and interact using human-readable aliases instead of raw addresses.

from instro.protocols.modbus import InstroModbus

connection = {"transport": "tcp", "host": "192.168.1.10", "port": 502}
device = InstroModbus("my_device.json", connection=connection, autostart=True)
device.read("temperature")
device.write("setpoint", 75.5)
device.write("mode", "auto")  # string values via write_value_map
device.close()

Or build the config in code with full IDE autocomplete:

from instro.protocols.modbus import InstroModbus, ModbusConfig, TCPConnection
from instro.protocols.modbus.modbus_types import (
    DeviceInfo, RegisterDef, TimingConfig,
)

config = ModbusConfig(
    device=DeviceInfo(name="my_device"),
    timing=TimingConfig(poll_interval=1.0, write_delay_ms=300),
    registers=[
        RegisterDef(name="temperature", starting_address=0, data_type="float32",
                    write_min=-40.0, write_max=500.0),
        RegisterDef(name="mode", starting_address=100, data_type="uint16",
                    write_value_map={"off": 0, "auto": 1, "manual": 2}),
    ],
)
connection = TCPConnection(host="192.168.1.10")
device = InstroModbus(config, connection=connection, autostart=True)

Sample Config¤

A complete config for a heat exchanger with temperature sensors, flow rate, a setpoint, a pump coil, and a status register with bitmap extraction:

{
  "version": 1,
  "protocol": "modbus",
  "device": {
    "name": "heat_exchanger",
    "description": "Heat exchanger monitoring and control",
    "manufacturer": "Acme Thermal",
    "model": "HX-200"
  },
  "timing": {
    "poll_interval": 1.0,
    "write_delay_ms": 100
  },
  "registers": [
    {
      "name": "inlet_temp",
      "starting_address": 0,
      "register_type": "input",
      "data_type": "float32",
      "word_swap": true,
      "read_group": "temperatures"
    },
    {
      "name": "outlet_temp",
      "starting_address": 2,
      "register_type": "input",
      "data_type": "float32",
      "word_swap": true,
      "read_group": "temperatures"
    },
    {
      "name": "flow_rate",
      "starting_address": 4,
      "register_type": "input",
      "data_type": "uint16",
      "scale": {
        "type": "linear",
        "gain": 0.1,
        "offset": 0
      }
    },
    {
      "name": "pressure_psi",
      "starting_address": 5,
      "register_type": "input",
      "data_type": "uint16",
      "scale": {
        "type": "linear",
        "gain": 0.01,
        "offset": 0
      }
    },
    {
      "name": "setpoint",
      "starting_address": 100,
      "register_type": "holding",
      "data_type": "float32",
      "word_swap": true,
      "write_min": 50.0,
      "write_max": 250.0
    },
    {
      "name": "operating_mode",
      "starting_address": 102,
      "register_type": "holding",
      "data_type": "uint16",
      "write_value_map": {
        "off": 0,
        "standby": 1,
        "run": 2,
        "flush": 3
      },
      "write_min": 0,
      "write_max": 3
    },
    {
      "name": "pump_enable",
      "starting_address": 0,
      "register_type": "coil"
    },
    {
      "name": "status_register",
      "starting_address": 200,
      "register_type": "input",
      "data_type": "uint16",
      "bitmap": [
        {"name": "pump_running", "bit_index": 0},
        {"name": "alarm_high_temp", "bit_index": 1},
        {"name": "alarm_low_flow", "bit_index": 2},
        {"name": "fault", "bit_index": 15}
      ]
    }
  ]
}

JSON Config Reference¤

Connection¤

Connection can be provided in the config or passed to the InstroModbus constructor. The constructor parameter takes precedence, allowing the config to be a standalone device description shared across environments.

Passed to the constructor:

device = InstroModbus(
    "my_device.json",
    connection={"transport": "tcp", "host": "192.168.1.10", "port": 502},
)

In the config, for Modbus TCP:

{
    "connection": {
        "transport": "tcp",
        "host": "192.168.1.10",
        "port": 502,
        "unit_id": 1,
        "timeout": 3.0
    }
}

In the config, for Modbus RTU (serial):

{
    "connection": {
        "transport": "rtu",
        "port": "/dev/ttyUSB0",
        "baudrate": 9600,
        "parity": "N",
        "stopbits": 1,
        "bytesize": 8,
        "unit_id": 1,
        "timeout": 3.0
    }
}
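
The precedence rule described in this section can be sketched in a few lines. This is an illustration only, not the library's implementation: `resolve_connection` is a hypothetical helper, and plain dicts stand in for the config and connection models.

```python
from typing import Optional


def resolve_connection(config: dict, connection: Optional[dict] = None) -> dict:
    """Pick the connection to use (hypothetical helper, not part of instro)."""
    # The constructor argument wins over anything stored in the config.
    if connection is not None:
        return connection
    # Otherwise fall back to the config's own "connection" section.
    if config.get("connection") is not None:
        return config["connection"]
    # Neither source supplied a connection: mirror the documented ValueError.
    raise ValueError("no connection provided as a parameter or in the config")


# A device description with no connection section can be shared across sites.
shared_config = {"device": {"name": "heat_exchanger"}, "registers": []}
lab = resolve_connection(shared_config, {"transport": "tcp", "host": "192.168.1.10"})
print(lab["host"])
```

Keeping the connection out of the shared file means the same device description can be reused on a bench (TCP) and in the field (RTU) without editing it.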

Serial port paths vary by platform:

  • Linux: /dev/ttyUSB0, /dev/ttyACM0
  • macOS: /dev/cu.usbserial-1234, /dev/cu.usbmodem1234
  • Windows: COM3, COM4

Timing¤

Controls background polling interval and write delay:

{
    "timing": {
        "poll_interval": 1.0,
        "write_delay_ms": 300
    }
}

| Field | Type | Default | Description |
|---|---|---|---|
| poll_interval | float | required | Seconds between polling cycles (0.01–10.0) |
| write_delay_ms | int | 0 | Milliseconds to sleep after each write |

Activate polling with autostart=True in the constructor, or call open() then start() manually. The write delay is applied automatically after every write() call — no manual time.sleep() needed.

Registers¤

Each register entry defines a named channel:

| Field | Type | Default | Description |
|---|---|---|---|
| name | string | required | Alias used in read() and write() |
| starting_address | int | required | Modbus register address (0–65535) |
| register_type | string | "holding" | "holding", "input", "coil", or "discrete" |
| data_type | string | "uint16" | "uint16", "int16", "uint32", "int32", "uint64", "int64", "float32", "float64", "bool" |
| byte_swap | bool | false | Swap bytes within 16-bit words |
| word_swap | bool | false | Swap 16-bit words (32-bit and 64-bit types) |
| long_swap | bool | false | Swap 32-bit halves (64-bit types only) |
| scale | object | null | Linear scaling config, e.g. {"type": "linear", "gain": <float>, "offset": <float>} (see Scaling) |
| bitmap | list | null | Bit extraction: [{"name": "alarm", "bit_index": 0}] |
| poll | bool | true | Include in background polling |
| write_min | number | null | Minimum allowed write value, in the same units the caller passes to write() (scaled if scale is set, raw otherwise). Holding registers only. |
| write_max | number | null | Maximum allowed write value, in the same units the caller passes to write() (scaled if scale is set, raw otherwise). Holding registers only. |
| write_value_map | object | null | Map string labels to register values (holding registers only) |
| read_group | string | null | Group ID for batched reads (all registers in a group are read in one transaction) |

Scaling¤

Linear scaling converts between raw register values and physical units:

physical = offset + (gain * raw)

A register definition with scaling:

{
    "name": "pressure_psi",
    "starting_address": 10,
    "data_type": "uint16",
    "scale": {"type": "linear", "gain": 0.01, "offset": 0}
}
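
To make the formula concrete, here is a minimal sketch of the conversion in both directions. The standalone functions below are stand-ins written for illustration; they borrow the names of the LinearScale methods documented later on this page but are not the library's code.

```python
def to_physical(raw: float, gain: float = 1.0, offset: float = 0.0) -> float:
    """Raw register value -> physical units: physical = offset + (gain * raw)."""
    return offset + gain * raw


def to_raw(physical: float, gain: float = 1.0, offset: float = 0.0) -> float:
    """Physical units -> raw register value (the inverse of to_physical)."""
    if gain == 0:
        # A zero gain cannot be inverted, which is why the config rejects it.
        raise ValueError("gain must not be zero")
    return (physical - offset) / gain


# pressure_psi from the example above: gain 0.01, offset 0.
raw_reading = 1234
psi = to_physical(raw_reading, gain=0.01)
print(round(psi, 2))  # 12.34
```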

Write Value Map¤

Map human-readable strings to raw register values. Eliminates magic numbers in application code:

{
    "name": "control_mode",
    "starting_address": 100,
    "data_type": "uint16",
    "write_value_map": {
        "off": 0,
        "auto": 1,
        "manual": 2
    }
}

In application code:

device.write("control_mode", "auto")   # writes 1
device.write("control_mode", 1)        # also works

Values in the map must be unique and must fall within write_min/write_max if those are set.
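
The lookup and the uniqueness rule can be sketched as follows. Both helpers are hypothetical and exist only to illustrate the behaviour described above; the library's internal names and checks may differ.

```python
def resolve_write_value(value, write_value_map=None):
    """Translate a string label to its mapped register value (illustrative)."""
    if isinstance(value, str):
        # Unknown labels surface as KeyError, matching the documented write() behaviour.
        if write_value_map is None or value not in write_value_map:
            raise KeyError(value)
        return write_value_map[value]
    # Numeric values pass through unchanged.
    return value


def check_unique_values(write_value_map):
    """Reject maps where two labels point at the same register value."""
    values = list(write_value_map.values())
    if len(values) != len(set(values)):
        raise ValueError("write_value_map values must be unique")


control_mode_map = {"off": 0, "auto": 1, "manual": 2}
check_unique_values(control_mode_map)
print(resolve_write_value("auto", control_mode_map))  # 1
print(resolve_write_value(1, control_mode_map))       # 1
```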

Write Limits¤

Reject writes outside a safe range before they reach the device:

{
    "name": "setpoint",
    "starting_address": 0,
    "data_type": "float32",
    "write_min": 32.0,
    "write_max": 300.0
}

In application code:

device.write("setpoint", 150.0)   # ok
device.write("setpoint", 999.0)   # raises ValueError

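Limits are checked against the value passed to write(), in physical units when the register defines a scale, before that value is converted to a raw register value.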
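
A minimal sketch of such a range check is shown below. `check_write_limits` is a hypothetical helper, and treating both bounds as inclusive is an assumption based on the "minimum/maximum allowed" wording in the register table.

```python
def check_write_limits(value, write_min=None, write_max=None):
    """Return value unchanged if it is within the limits, else raise ValueError."""
    # A limit of None means that side of the range is unbounded.
    if write_min is not None and value < write_min:
        raise ValueError(f"{value} is below write_min {write_min}")
    if write_max is not None and value > write_max:
        raise ValueError(f"{value} is above write_max {write_max}")
    return value


# setpoint from the example above: 32.0 to 300.0.
print(check_write_limits(150.0, write_min=32.0, write_max=300.0))  # 150.0

try:
    check_write_limits(999.0, write_min=32.0, write_max=300.0)
except ValueError as exc:
    print("rejected:", exc)
```

Because the check runs on the caller's value, a rejected write never produces any Modbus traffic.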

Read Groups¤

Registers with the same read_group are read in a single Modbus transaction, reducing the number of round trips per polling cycle:

{
    "name": "heat_power",
    "starting_address": 100,
    "data_type": "float32",
    "read_group": "power"
},
{
    "name": "cool_power",
    "starting_address": 102,
    "data_type": "float32",
    "read_group": "power"
}

Constraints:

  • All registers in a group must share the same register_type
  • All registers in a group must have poll: true
  • Holding/input groups cannot span more than 125 registers
  • Coil/discrete groups cannot span more than 2000 addresses
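
The constraints above can be checked with a short routine like the one below. It is a sketch under stated assumptions, not the library's validator: `group_span` and `WORDS_PER_TYPE` are hypothetical names, registers are plain dicts, the span is measured from the lowest starting address to the end of the highest register, and a bool is assumed to occupy one address.

```python
# Number of 16-bit registers each data type occupies (bool assumed to be 1).
WORDS_PER_TYPE = {
    "uint16": 1, "int16": 1, "bool": 1,
    "uint32": 2, "int32": 2, "float32": 2,
    "uint64": 4, "int64": 4, "float64": 4,
}


def group_span(registers):
    """Return (first_address, count) for one batched read of the group."""
    types = {r.get("register_type", "holding") for r in registers}
    if len(types) != 1:
        raise ValueError("all registers in a group must share the same register_type")
    if not all(r.get("poll", True) for r in registers):
        raise ValueError("all registers in a group must have poll: true")

    start = min(r["starting_address"] for r in registers)
    end = max(
        r["starting_address"] + WORDS_PER_TYPE[r.get("data_type", "uint16")]
        for r in registers
    )
    count = end - start

    # 125 registers for holding/input, 2000 addresses for coil/discrete.
    limit = 125 if types.pop() in ("holding", "input") else 2000
    if count > limit:
        raise ValueError(f"group spans {count} addresses, limit is {limit}")
    return start, count


power = [
    {"name": "heat_power", "starting_address": 100, "data_type": "float32"},
    {"name": "cool_power", "starting_address": 102, "data_type": "float32"},
]
print(group_span(power))  # (100, 4)
```

The two float32 registers occupy addresses 100 to 103, so the whole group fits in a single four-register read.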

Bitmap¤

Extract individual bits from a uint16 holding or input register as named channels:

{
    "name": "status_register",
    "starting_address": 100,
    "data_type": "uint16",
    "bitmap": [
        {"name": "alarm_high", "bit_index": 0},
        {"name": "alarm_low", "bit_index": 1},
        {"name": "motor_running", "bit_index": 5}
    ]
}

Reading status_register returns the raw value plus each bit as a separate channel (0 or 1).
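
Bit extraction itself is a shift and a mask. The sketch below uses a hypothetical `extract_bits` helper and plain dicts in place of the config models, with bit_index counted from the least significant bit as in the BitDef reference.

```python
def extract_bits(raw, bitmap):
    """Return {bit name: 0 or 1} for each entry in the bitmap."""
    return {bit["name"]: (raw >> bit["bit_index"]) & 1 for bit in bitmap}


bitmap = [
    {"name": "alarm_high", "bit_index": 0},
    {"name": "alarm_low", "bit_index": 1},
    {"name": "motor_running", "bit_index": 5},
]

raw_status = 0b0000_0000_0010_0001  # bits 0 and 5 set
print(extract_bits(raw_status, bitmap))
# {'alarm_high': 1, 'alarm_low': 0, 'motor_running': 1}
```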

API Reference¤

InstroModbus¤

Bases: Instrument

Modbus client with config-driven register access and publishing.

Provides semantic access to Modbus devices by register alias from config.

Parameters:

  • config ¤

    (ModbusConfig | dict | Path | str) –

    A ModbusConfig instance, a dict (validated via Pydantic), or a path to a JSON configuration file.

  • connection ¤

    (TCPConnection | RTUConnection | dict | None, default: None ) –

    Connection configuration. Takes precedence over any connection in the config. Accepts a TCPConnection, RTUConnection, or a dict. Required if the config does not include a connection section.

  • name ¤

    (str | None, default: None ) –

    Instrument name used as the channel prefix for all data and commands. If omitted, config.device.name is used.

  • publishers ¤

    (list[Publisher] | None, default: None ) –

    Optional list of publishers for data output.

  • autostart ¤

    (bool, default: False ) –

    If True, automatically open the connection and start background polling. Requires a timing section in the config (with poll_interval); passing autostart=True without it is an error, since there is nothing to auto-start. For manual / one-shot usage, leave this False and call open() yourself.

  • **kwargs ¤

    Additional tags passed to Instrument.

Raises:

  • ValueError

    If no connection is provided either as a parameter or in the config, or if autostart=True without a timing section.

background_interval instance-attribute ¤

background_interval = poll_interval

unit_id property ¤

unit_id: int

Get the unit/slave ID from configuration.

name instance-attribute ¤

name = name

data_handler instance-attribute ¤

data_handler = (
    None
    if connection_config is None
    else DataHandler(select(connection_config))
)

publishers instance-attribute ¤

publishers = publishers or []

default_tags instance-attribute ¤

default_tags: dict[str, str] = {}

background_enable property writable ¤

background_enable

Get the background worker enable state.

Returns:

  • bool

    True if the background worker is enabled, False otherwise.

open ¤

open() -> None

Open connection to the Modbus device.

close ¤

close() -> None

Close connection and clean up resources.

read ¤

read(alias: str, **kwargs) -> Measurement

Read a register by alias and publish the result.

Parameters:

  • alias ¤

    (str) –

    Register alias as defined in the configuration.

  • **kwargs ¤

    Additional tags for the measurement.

Returns:

  • Measurement

    Measurement containing the scaled value.

write ¤

write(alias: str, value: float | int | bool | str, **kwargs) -> Command

Write a value to a register by alias and publish the command.

Parameters:

  • alias ¤

    (str) –

    Register alias as defined in the configuration.

  • value ¤

    (float | int | bool | str) –

    Value to write (in physical units if scaling is defined). For coil registers, pass True/False directly. For registers with a write_value_map, pass a string key to write the corresponding mapped value.

  • **kwargs ¤

    Additional tags for the command.

Returns:

  • Command

    Command recording the write operation.

Raises:

  • TypeError

    If value type doesn't match the register's data type.

  • KeyError

    If a string value is not found in the register's write_value_map.

  • ValueError

    If the register is read-only, the value violates write_min/write_max, or the value (or its scaled raw equivalent) is out of range for the data type.

add_publisher ¤

add_publisher(publisher: Publisher)

Add a publisher to the list of publishers for this instrument.

Publishers receive all Measurement and Command objects that are published by this instrument. Multiple publishers can be added to send data to different destinations.

Parameters:

  • publisher ¤

    (Publisher) –

    The publisher instance to add.

publish ¤

publish(data: Measurement | Command, **kwargs)

Publish a Measurement or Command to all configured publishers.

This method sends the data to all publishers that have been added to this instrument. Additional keyword arguments are passed through to each publisher's publish method.

Parameters:

  • data ¤

    (Measurement | Command) –

    The measurement or command data to publish.

  • **kwargs ¤

    Optional keyword arguments passed to each publisher's publish method.

get_identity ¤

get_identity() -> str

Query the instrument's identity string.

This method sends the SCPI "*IDN?" command to retrieve the instrument's identification information, which typically includes manufacturer, model number, serial number, and firmware version.

Returns:

  • str ( str ) –

    The instrument's identity string.

Raises:

  • RuntimeError

    If no connection is configured for this instrument (data_handler is None).

open_and_get_identity staticmethod ¤

open_and_get_identity(connection_config: ConnectConfig) -> str

Convenience method to open a connection, query identity, and close.

This static method creates a temporary instrument instance, opens a connection, queries the identity, closes the connection, and returns the identity string. Useful for instrument discovery without maintaining a long-lived connection.

Parameters:

  • connection_config ¤

    (ConnectConfig) –

    The connection configuration for the instrument.

Returns:

  • str ( str ) –

    The instrument's identity string.

Raises:

  • RuntimeError

    If the connection cannot be established or the identity cannot be queried.

send_arbitrary_command ¤

send_arbitrary_command(command: str)

Send an arbitrary command string to the instrument.

This method allows sending custom commands directly to the instrument without going through the instrument's high-level API. Useful for accessing instrument-specific features not exposed by the standard interface.

Parameters:

  • command ¤

    (str) –

    The command string to send to the instrument.

Raises:

  • RuntimeError

    If no connection is configured for this instrument (data_handler is None).

query_arbitrary_command ¤

query_arbitrary_command(command: str) -> str

Send an arbitrary command string and return the instrument's response.

This method sends a command and waits for a response, useful for querying instrument state or configuration. The response is returned as a string.

Parameters:

  • command ¤

    (str) –

    The query command string to send to the instrument.

Returns:

  • str ( str ) –

    The instrument's response to the query command.

Raises:

  • RuntimeError

    If no connection is configured for this instrument (data_handler is None).

add_background_daemon_function ¤

add_background_daemon_function(method: Callable, *args, **kwargs)

Adds a function (and its arguments) to be periodically called by the background worker daemon.

There may already be functions defined and this will add to the end of that list. Call define_background_daemon to clear and define a new function if you do not want the default functions to run.

Parameters:

  • method ¤

    (Callable) –

    The method/function to be invoked in the background thread.

  • *args ¤

    Positional arguments to pass to the method.

  • **kwargs ¤

    Keyword arguments to pass to the method.

start ¤

start()

Start the background worker thread.

This method creates and starts a daemon thread that periodically executes all registered background daemon functions. If the thread is already running, this method does nothing. The thread will continue running until stop() is called.

stop ¤

stop()

Stop the background worker thread.

This method signals the background worker thread to stop and waits for it to complete. If the thread is not running, this method does nothing.

get_channel ¤

get_channel(
    channel_name: str,
    length: int = 1,
    wait_for_latest: bool = False,
    timeout: float = 10.0,
) -> Measurement

Get most recent Measurement data for a specific channel from the buffer.

Parameters:

  • channel_name ¤

    (str) –

    The name of the channel from which to retrieve data.

  • length ¤

    (int, default: 1 ) –

    The number of most recent samples to return.

  • wait_for_latest ¤

    (bool, default: False ) –

    Block and wait for the next channel value(s).

  • timeout ¤

    (float, default: 10.0 ) –

    Timeout in seconds when waiting for channel or values. Defaults to 10.0 seconds. Only applies when wait_for_latest=True.

Returns:

  • Measurement ( Measurement ) –

    A Measurement object containing the requested channel data and timestamps.

Raises:

  • RuntimeError

    If no background buffer exists because start() was not called.

  • ChannelNotFoundTimeoutError

    If wait_for_latest=True and the channel does not appear within timeout.

  • ChannelValueTimeoutError

    If wait_for_latest=True and sufficient values are not available within timeout.

define_background_daemon ¤

define_background_daemon(method: Callable, *args, **kwargs)

Clear all background daemon functions and register a single custom method.

This method clears any previously registered background daemon functions and registers a new method to be called in the background daemon loop. This is useful when you want to replace the default background functions with a custom implementation.

Parameters:

  • method ¤

    (Callable) –

    The method to call as part of the background daemon.

  • *args ¤

    Positional arguments to pass to the method when it is called.

  • **kwargs ¤

    Keyword arguments to pass to the method when it is called.

Configuration Types¤

Modbus protocol configuration types.

Pydantic models for JSON-based Modbus device configuration.

Shared types (DeviceInfo, LinearScale, ScaleType) are imported from instro.protocols.common_types. See that module for details.

Public API

ModbusConfig, TimingConfig, TCPConnection, RTUConnection, RegisterDef, LinearScale

ModbusConfig ¤

Bases: BaseModel

Complete Modbus device configuration.

Load from JSON using ModbusConfig.from_json(path).

version class-attribute instance-attribute ¤

version: int = 1

protocol class-attribute instance-attribute ¤

protocol: str = 'modbus'

device instance-attribute ¤

device: DeviceInfo

timing class-attribute instance-attribute ¤

timing: TimingConfig | None = None

connection class-attribute instance-attribute ¤

connection: ConnectionType | None = Field(
    default=None, discriminator="transport"
)

registers class-attribute instance-attribute ¤

registers: list[RegisterDef] = Field(default_factory=list)

model_post_init ¤

model_post_init(__context) -> None

Validate registers.

get_group ¤

get_group(group_id: str) -> list[RegisterDef]

Get all registers in a group, sorted by starting address.

from_json classmethod ¤

from_json(path: Path | str) -> ModbusConfig

Load configuration from a JSON file.

Parameters:

  • path ¤

    (Path | str) –

    Path to the JSON configuration file.

Returns:

  • ModbusConfig

    The loaded and validated configuration.

Raises:

  • FileNotFoundError

    If the file doesn't exist.

  • ValidationError

    If the JSON doesn't match the schema.

get_register ¤

get_register(name: str) -> RegisterDef

Get a register definition by name.

Parameters:

  • name ¤

    (str) –

    Register name as defined in the config.

Returns:

  • RegisterDef

    RegisterDef for the specified register.

Raises:

  • KeyError

    If the register name doesn't exist.

TimingConfig ¤

Bases: BaseModel

Timing configuration for Modbus polling and write delays.

poll_interval class-attribute instance-attribute ¤

poll_interval: float = Field(
    ge=0.01, le=10.0, description="Polling interval in seconds"
)

write_delay_ms class-attribute instance-attribute ¤

write_delay_ms: int = Field(
    default=0,
    ge=0,
    description="Delay in milliseconds applied after every write",
)

TCPConnection ¤

Bases: BaseModel

Modbus TCP connection configuration.

transport class-attribute instance-attribute ¤

transport: Literal['tcp'] = 'tcp'

host instance-attribute ¤

host: str

port class-attribute instance-attribute ¤

port: int = Field(default=502, ge=1, le=65535)

unit_id class-attribute instance-attribute ¤

unit_id: int = Field(default=1, ge=0, le=255)

timeout class-attribute instance-attribute ¤

timeout: float = Field(
    default=3.0, gt=0, description="Response timeout in seconds"
)

RTUConnection ¤

Bases: BaseModel

Modbus RTU (serial) connection configuration.

transport class-attribute instance-attribute ¤

transport: Literal['rtu'] = 'rtu'

port instance-attribute ¤

port: str

baudrate class-attribute instance-attribute ¤

baudrate: int = 9600

parity class-attribute instance-attribute ¤

parity: Literal['N', 'E', 'O'] = 'N'

stopbits class-attribute instance-attribute ¤

stopbits: Literal[1, 2] = 1

bytesize class-attribute instance-attribute ¤

bytesize: Literal[5, 6, 7, 8] = 8

unit_id class-attribute instance-attribute ¤

unit_id: int = Field(default=1, ge=0, le=255)

timeout class-attribute instance-attribute ¤

timeout: float = Field(
    default=3.0, gt=0, description="Response timeout in seconds"
)

RegisterDef ¤

Bases: BaseModel

Definition of a Modbus register.

Note
  • Swap options control byte ordering for multi-byte values:
      • byte_swap: swap bytes within each 16-bit word
      • word_swap: swap 16-bit words (for 32-bit and 64-bit types)
      • long_swap: swap 32-bit longs (for 64-bit types only)
  • All swap options default to False (big-endian / network byte order).
  • scale is not allowed for coils and discrete inputs.
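
To show what the swap options mean in practice, the sketch below decodes a float32 from two 16-bit registers using the standard struct module. `decode_float32` is a hypothetical helper, and the sketch is limited to 32-bit values; how the library combines the options for 64-bit types is not covered here.

```python
import struct


def decode_float32(registers, byte_swap=False, word_swap=False):
    """Decode two 16-bit register values into a float32 (illustrative only)."""
    words = list(registers)
    if word_swap:
        # Device sends the low word first: restore high-word-first order.
        words.reverse()
    if byte_swap:
        # Exchange the two bytes inside each 16-bit word.
        words = [((w & 0xFF) << 8) | (w >> 8) for w in words]
    # With no swaps the data is big-endian (network byte order).
    payload = struct.pack(">HH", *words)
    return struct.unpack(">f", payload)[0]


# 75.5 is 0x42970000 as a big-endian IEEE 754 float32.
print(decode_float32([0x4297, 0x0000]))                  # 75.5
print(decode_float32([0x0000, 0x4297], word_swap=True))  # 75.5
print(decode_float32([0x9742, 0x0000], byte_swap=True))  # 75.5
```

If a float register reads back as a wildly wrong or near-zero value, trying word_swap first is usually the quickest diagnostic, since low-word-first ordering is common on Modbus devices.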

name class-attribute instance-attribute ¤

name: str = Field(description='Unique name/alias for this register')

description class-attribute instance-attribute ¤

description: str | None = None

starting_address class-attribute instance-attribute ¤

starting_address: int = Field(ge=0, le=65535)

register_type class-attribute instance-attribute ¤

register_type: Literal["holding", "input", "coil", "discrete"] = (
    "holding"
)

data_type class-attribute instance-attribute ¤

data_type: DataType = 'uint16'

byte_swap class-attribute instance-attribute ¤

byte_swap: bool = False

word_swap class-attribute instance-attribute ¤

word_swap: bool = False

long_swap class-attribute instance-attribute ¤

long_swap: bool = False

scale class-attribute instance-attribute ¤

scale: ScaleType | None = None

bitmap class-attribute instance-attribute ¤

bitmap: list[BitDef] | None = None

poll class-attribute instance-attribute ¤

poll: bool = True

write_min class-attribute instance-attribute ¤

write_min: float | int | None = None

write_max class-attribute instance-attribute ¤

write_max: float | int | None = None

write_value_map class-attribute instance-attribute ¤

write_value_map: dict[str, int | float] | None = None

read_group class-attribute instance-attribute ¤

read_group: str | None = None

register_count property ¤

register_count: int

Number of 16-bit registers this data type spans.

BitDef ¤

Bases: BaseModel

Definition of a single bit to extract from a uint16 register.

name class-attribute instance-attribute ¤

name: str = Field(description='Channel name for this bit')

bit_index class-attribute instance-attribute ¤

bit_index: int = Field(
    ge=0, le=15, description="0-based bit position from LSB"
)

Shared Types¤

Shared configuration types for all protocol implementations.

This module contains Pydantic models that are protocol-agnostic and reused across multiple protocol implementations (Modbus, EtherNet/IP, OPC-UA, etc.).

If you are adding a new protocol, import these types rather than redefining them. If you need to extend a type for protocol-specific behavior, subclass it in your protocol's own types module.

Shared types

  • DeviceInfo - Device metadata (name, manufacturer, model, etc.)
  • LinearScale - Linear scaling: physical = offset + (gain * raw)
  • ScaleType - Union of all supported scaling types

DeviceInfo ¤

Bases: BaseModel

Device metadata.

The name field is particularly important -- it is used as the channel name prefix when publishing data (e.g., "my_device.temperature").

name instance-attribute ¤

name: str

description class-attribute instance-attribute ¤

description: str = ''

manufacturer class-attribute instance-attribute ¤

manufacturer: str = ''

model class-attribute instance-attribute ¤

model: str = ''

LinearScale ¤

Bases: BaseModel

Linear scaling: physical = offset + (gain * raw).

Applied automatically on reads (raw -> physical) and reversed on writes (physical -> raw). Not all protocols or data point types support scaling -- check the protocol-specific documentation.

type class-attribute instance-attribute ¤

type: Literal['linear'] = 'linear'

gain class-attribute instance-attribute ¤

gain: float = Field(
    default=1.0, description="Scale factor (must not be zero)"
)

offset class-attribute instance-attribute ¤

offset: float = 0.0

model_post_init ¤

model_post_init(__context) -> None

Validate that gain is not zero.

to_physical ¤

to_physical(raw: float) -> float

Convert raw register value to physical units.

to_raw ¤

to_raw(physical: float) -> float

Convert physical value to raw register value.

Returns float to preserve precision for float registers. Integer registers should handle conversion in _encode_value.