I2C¤

Interface¤

Bases: Instrument

I2C interface supporting register-based and command-based devices.

Commands are enum values OR'd together and sent as a single byte stream. The SystemDefinition is read-only — I2CInterface resolves names to transactions on every call; the vendor driver never sees it.

Parameters:

  • name ¤

    (str) –

    Channel-name prefix for published data.

  • driver ¤

    (I2CDriverBase) –

    Concrete I2C-adapter driver; owns its own transport:

    i2c = I2CInterface(
        "main",
        driver=Aardvark(serial_number="2239-764425"),
        system_definition=system,
    )

  • system_definition ¤

    (SystemDefinition) –

    Bus description (devices, register maps, comms params).

  • publishers ¤

    (Optional[list[Publisher]], default: None ) –

    Publishers that receive emitted Measurement/Command data.

  • **kwargs ¤

    Default tags applied to every emitted Measurement/Command. Pass dataset_rid="<rid>" to auto-create a NominalCorePublisher (uses the on-disk 'default' Nominal credential).

name instance-attribute ¤

name = name

legacy_naming instance-attribute ¤

legacy_naming = legacy_naming

publishers instance-attribute ¤

publishers = publishers or []

default_tags instance-attribute ¤

default_tags: dict[str, str] = {}

background_interval property writable ¤

background_interval

Seconds between background daemon iterations (0 = no wait).

background_enable property writable ¤

background_enable

Whether the background daemon is enabled (must still be start()-ed).

add_publisher ¤

add_publisher(publisher: Publisher)

Register a publisher to receive this instrument's Measurement/Command data.

publish ¤

publish(data: Measurement | Command, **kwargs)

Fan data out to every configured publisher; kwargs pass through.

add_background_daemon_function ¤

add_background_daemon_function(method: Callable, *args, **kwargs)

Append method to the daemon's call list. Use define_background_daemon to replace instead.

start ¤

start()

Start the background daemon thread. No-op if already running.

stop ¤

stop()

Signal the background daemon to stop and join it. No-op if not running.
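The daemon lifecycle described above (append or replace functions, start as a no-op when already running, stop by signalling and joining) can be sketched with a thread and an event. `BackgroundDaemon` and its method names are hypothetical stand-ins for illustration, not the library's implementation.

```python
import threading
from typing import Callable, Optional


class BackgroundDaemon:
    """Hypothetical stand-in for the instrument's background daemon."""

    def __init__(self, interval: float = 0.0) -> None:
        self.interval = interval  # seconds between iterations (0 = no wait)
        self._calls = []          # list of (method, args, kwargs)
        self._stop = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def add(self, method: Callable, *args, **kwargs) -> None:
        # Append to the call list (compare add_background_daemon_function).
        self._calls.append((method, args, kwargs))

    def define(self, method: Callable, *args, **kwargs) -> None:
        # Replace the whole call list (compare define_background_daemon).
        self._calls = [(method, args, kwargs)]

    def _run(self) -> None:
        while not self._stop.is_set():
            for method, args, kwargs in list(self._calls):
                method(*args, **kwargs)
            # Event.wait acts as a sleep that stop() can interrupt.
            self._stop.wait(self.interval)

    def start(self) -> None:
        if self._thread is not None and self._thread.is_alive():
            return  # no-op if already running
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        if self._thread is None:
            return  # no-op if not running
        self._stop.set()
        self._thread.join()
        self._thread = None
```

Waiting on the event rather than calling `time.sleep` lets `stop()` return promptly even with a long interval.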

get_channel ¤

get_channel(
    channel_name: str,
    length: int = 1,
    wait_for_latest: bool = False,
    timeout: float = 10.0,
) -> Measurement

Return the most recent length samples for channel_name from the in-memory buffer.

Parameters:

  • channel_name ¤

    (str) –

    Name of the channel to retrieve.

  • length ¤

    (int, default: 1 ) –

    Number of trailing samples to return.

  • wait_for_latest ¤

    (bool, default: False ) –

    Block until at least length new values arrive.

  • timeout ¤

    (float, default: 10.0 ) –

    Seconds to wait when wait_for_latest=True.

Raises:

  • RuntimeError

    No background buffer; start() was not called.

  • ChannelNotFoundTimeoutError

    wait_for_latest=True and channel did not appear within timeout.

  • ChannelValueTimeoutError

    wait_for_latest=True and values did not arrive within timeout.
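As a rough illustration of the `length`, `wait_for_latest`, and `timeout` semantics, the sketch below keeps a bounded buffer per channel and blocks on a condition variable until enough new samples arrive. `ChannelBuffer` is hypothetical, it returns a plain list rather than a Measurement, and it raises the built-in `TimeoutError` in place of the two library timeout errors.

```python
import threading
from collections import deque


class ChannelBuffer:
    """Hypothetical in-memory buffer; not the library's implementation."""

    def __init__(self, maxlen: int = 1000) -> None:
        self._data = {}    # channel name -> deque of samples
        self._counts = {}  # channel name -> total samples ever pushed
        self._cond = threading.Condition()
        self._maxlen = maxlen

    def push(self, channel: str, value: float) -> None:
        with self._cond:
            buf = self._data.setdefault(channel, deque(maxlen=self._maxlen))
            buf.append(value)
            self._counts[channel] = self._counts.get(channel, 0) + 1
            self._cond.notify_all()

    def get(self, channel: str, length: int = 1,
            wait_for_latest: bool = False, timeout: float = 10.0) -> list:
        with self._cond:
            if wait_for_latest:
                # Require `length` samples newer than anything seen so far.
                target = self._counts.get(channel, 0) + length
                arrived = self._cond.wait_for(
                    lambda: self._counts.get(channel, 0) >= target, timeout
                )
                if not arrived:
                    raise TimeoutError(f"no new values on {channel!r}")
            return list(self._data.get(channel, ()))[-length:]
```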

define_background_daemon ¤

define_background_daemon(method: Callable, *args, **kwargs)

Replace all daemon functions with a single method (called with the given args).

write_read_raw ¤

write_read_raw(
    address: int,
    payload: bytes,
    length: int,
    endianness: Literal["little", "big"],
) -> int

Write-then-read without an intermediate I2C STOP (uses repeated START); decode as integer.

Typical use: write a register address, then read its contents.

Parameters:

  • address ¤

    (int) –

    7-bit I2C device address.

  • payload ¤

    (bytes) –

    Bytes to write (typically the register address).

  • length ¤

    (int) –

    Number of bytes to read back.

  • endianness ¤

    (Literal['little', 'big']) –

    Byte order for the integer decode.
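The effect of `endianness` on the decoded integer can be shown with a small stand-alone sketch. `FakeDriver` is a hypothetical in-memory adapter, and the free function below only mirrors the documented signature; neither is the library's code.

```python
from typing import Literal


class FakeDriver:
    """Hypothetical in-memory adapter: register address -> stored bytes."""

    def __init__(self, memory: dict) -> None:
        self.memory = memory

    def write_read(self, address: int, data: bytes, read_len: int) -> bytes:
        # A real driver would issue START, write, repeated START, read, STOP.
        register = int.from_bytes(data, "big")
        return self.memory[register][:read_len]


def write_read_raw(driver: FakeDriver, address: int, payload: bytes,
                   length: int, endianness: Literal["little", "big"]) -> int:
    raw = driver.write_read(address, payload, length)
    # The same two bytes decode differently depending on byte order.
    return int.from_bytes(raw, endianness)


driver = FakeDriver({0x02: b"\x12\x34"})
big = write_read_raw(driver, 0x20, bytes([0x02]), 2, "big")        # 0x1234
little = write_read_raw(driver, 0x20, bytes([0x02]), 2, "little")  # 0x3412
```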

write_raw ¤

write_raw(address: int, data: bytes)

Write data to the 7-bit I2C address.

read_raw ¤

read_raw(
    address: int, length: int, endianness: Literal["little", "big"]
) -> int

Read length bytes from the 7-bit I2C address and decode as an integer.

write_then_read_raw ¤

write_then_read_raw(
    address: int,
    payload: bytes,
    length: int,
    endianness: Literal["little", "big"],
) -> int

Write-then-read with an I2C STOP between operations (required by devices that need STOP to process input).
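The difference between the two write-then-read variants is the bus sequence. The sketch below logs bus events with a hypothetical `RecordingDriver`; mapping the STOP variant onto separate `write` and `read` driver calls is an assumption made for illustration.

```python
class RecordingDriver:
    """Hypothetical driver that logs bus events instead of touching hardware."""

    def __init__(self) -> None:
        self.events = []

    def write(self, address: int, data: bytes) -> None:
        self.events += ["START", f"W@{address:#04x}", "STOP"]

    def read(self, address: int, length: int) -> bytes:
        self.events += ["START", f"R@{address:#04x}", "STOP"]
        return bytes(length)

    def write_read(self, address: int, data: bytes, read_len: int) -> bytes:
        # One transaction: the bus is held between the write and the read.
        self.events += ["START", f"W@{address:#04x}",
                        "RESTART", f"R@{address:#04x}", "STOP"]
        return bytes(read_len)


def repeated_start(driver, address: int, payload: bytes, length: int) -> bytes:
    # Compare write_read_raw: no STOP between write and read.
    return driver.write_read(address, payload, length)


def with_stop(driver, address: int, payload: bytes, length: int) -> bytes:
    # Compare write_then_read_raw: two transactions, each ended by STOP.
    driver.write(address, payload)
    return driver.read(address, length)
```

Devices that only act on a command once they see STOP need the second form; register reads usually use the first.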

read ¤

read(
    peripheral: str, register_alias: str, field: str = "", **kwargs
) -> Measurement

Read a register (or one of its named fields) on peripheral.

With field, only the named bit field is extracted (masked and shifted). Raises ValueError if peripheral is not a register-based device.

write ¤

write(
    peripheral: str,
    register_alias: str,
    value: int,
    field: str = "",
    **kwargs,
) -> Command

Write a register (or one of its named fields) on peripheral.

With field, performs a thread-safe read-modify-write of just that field (mask, insert, write). Raises ValueError if peripheral is not a register-based device.
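A field write is a read-modify-write: read the register, clear the field's bits, insert the new value, write the result back. The sketch below shows that sequence under a lock; the dictionary standing in for hardware, the module-level lock, and both function names are hypothetical.

```python
import threading

_bus_lock = threading.Lock()  # hypothetical lock guarding the whole sequence


def insert_field(current: int, value: int, lsb: int, width_bits: int) -> int:
    """Return `current` with the field at [lsb + width_bits - 1 : lsb] replaced."""
    if not 0 <= value < (1 << width_bits):
        raise ValueError(f"value {value} does not fit in {width_bits} bits")
    mask = ((1 << width_bits) - 1) << lsb
    return (current & ~mask) | (value << lsb)


def write_field(registers: dict, register: int, value: int,
                lsb: int, width_bits: int) -> int:
    # Holding the lock across read and write keeps another thread from
    # changing neighbouring bits in between.
    with _bus_lock:
        current = registers[register]                            # read
        updated = insert_field(current, value, lsb, width_bits)  # modify
        registers[register] = updated                            # write
    return updated
```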

reset_reg ¤

reset_reg(peripheral: str, register_alias: str, **kwargs)

Write a register's defined default value back to hardware (initialization/recovery).

query ¤

query(peripheral: str, batch_command: str, **kwargs) -> Measurement

Send a batch command (OR of its enum members) to peripheral and read the response.

Uses write-then-read with an I2C STOP between operations. Raises ValueError if peripheral is not a command-based device, or if batch_command references undefined command enums.
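Building the command byte from a batch of enum members amounts to a bitwise OR over their values. The enums below are invented for illustration, and the single-byte result is an assumption based on the description of command-based devices.

```python
from enum import Enum
from functools import reduce
from operator import or_


class Channel(Enum):  # hypothetical command enum
    CH0 = 0x00
    CH1 = 0x10


class Mode(Enum):     # hypothetical command enum
    SINGLE_ENDED = 0x80
    DIFFERENTIAL = 0x00


def command_byte(members: list) -> bytes:
    """OR the member values together and return them as one byte."""
    value = reduce(or_, (m.value for m in members), 0)
    return bytes([value])  # raises ValueError if the result exceeds 0xFF
```

For example, `command_byte([Mode.SINGLE_ENDED, Channel.CH1])` yields `b"\x90"`.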

open ¤

open()

Open the underlying I2C-adapter driver.

close ¤

close()

Close the underlying driver and stop the daemon.

Types & Configuration¤

I2C system definition: scaling, data formats, fields, registers, devices.

Device module-attribute ¤

ScalingFunction ¤

Bases: ABC

Convert between raw register integers and physical units (forward and inverse).

to_physical abstractmethod ¤

to_physical(raw: int) -> float

Raw register value → physical units.

to_raw abstractmethod ¤

to_raw(physical: float) -> int

Physical value → raw integer. May raise NotImplementedError if inverse is not supported.

LinearScaling ¤

LinearScaling(gain: float = 1.0, offset: float = 0.0)

Bases: ScalingFunction

Linear scaling: physical = offset + gain * raw.

Example

>>> # Temperature sensor: 0.1°C per count, -40°C offset
>>> LinearScaling(gain=0.1, offset=-40.0).to_physical(500)
10.0
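The inverse direction follows from solving physical = offset + gain * raw for raw. The stand-alone functions below sketch both directions; rounding to the nearest integer in `to_raw` is an assumption, since the rounding mode is not documented here.

```python
def to_physical(raw: int, gain: float = 1.0, offset: float = 0.0) -> float:
    # Forward: physical = offset + gain * raw
    return offset + gain * raw


def to_raw(physical: float, gain: float = 1.0, offset: float = 0.0) -> int:
    # Inverse: raw = (physical - offset) / gain, rounded to the nearest count.
    # Rounding (rather than truncating) is an assumption of this sketch.
    return round((physical - offset) / gain)
```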

gain instance-attribute ¤

gain = gain

offset instance-attribute ¤

offset = offset

to_physical ¤

to_physical(raw: int) -> float

to_raw ¤

to_raw(physical: float) -> int

CustomScaling ¤

CustomScaling(
    to_physical_fn: Callable[[int], float],
    to_raw_fn: Callable[[float], int] | None = None,
)

Bases: ScalingFunction

User-defined scaling. to_raw_fn is optional; to_raw raises NotImplementedError without it.

Example

>>> CustomScaling(
...     to_physical_fn=lambda x: x / 4095 * 5.0 * 7.2,
...     to_raw_fn=lambda v: int(v / 7.2 / 5.0 * 4095),
... )

to_physical ¤

to_physical(raw: int) -> float

to_raw ¤

to_raw(physical: float) -> int

DataFormat dataclass ¤

DataFormat(
    transfer_bits: int,
    data_width_bits: Optional[int] = None,
    data_lsb: int = 0,
    signed: bool = False,
    scaling: Optional[ScalingFunction] = None,
    units: str = "",
)

How to extract logical data from an I2C transfer and convert to physical units.

Attributes:

transfer_bits instance-attribute ¤

transfer_bits: int

data_width_bits class-attribute instance-attribute ¤

data_width_bits: Optional[int] = None

data_lsb class-attribute instance-attribute ¤

data_lsb: int = 0

signed class-attribute instance-attribute ¤

signed: bool = False

scaling class-attribute instance-attribute ¤

scaling: Optional[ScalingFunction] = None

units class-attribute instance-attribute ¤

units: str = ''

data_width_bytes property ¤

data_width_bytes: int

Return number of bytes for transfer.

extract_data ¤

extract_data(transfer_raw: int) -> int

Slice the logical data field out of transfer_raw (sign-extended when signed).

pack_data ¤

pack_data(data_raw: int) -> int

Pack data_raw into the transfer container at data_lsb; validates range vs signed.
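Extraction and packing are shift-and-mask operations on the transfer container. The functions below are a stand-alone sketch that takes the DataFormat attributes as plain arguments; the exact range checks the library performs are assumed.

```python
def extract_data(transfer_raw: int, data_width_bits: int,
                 data_lsb: int = 0, signed: bool = False) -> int:
    mask = (1 << data_width_bits) - 1
    value = (transfer_raw >> data_lsb) & mask
    # Sign-extend when the top bit of the field is set.
    if signed and value & (1 << (data_width_bits - 1)):
        value -= 1 << data_width_bits
    return value


def pack_data(data_raw: int, data_width_bits: int,
              data_lsb: int = 0, signed: bool = False) -> int:
    if signed:
        lo = -(1 << (data_width_bits - 1))
        hi = (1 << (data_width_bits - 1)) - 1
    else:
        lo, hi = 0, (1 << data_width_bits) - 1
    if not lo <= data_raw <= hi:
        raise ValueError(f"{data_raw} outside [{lo}, {hi}]")
    # Masking converts a negative value to its two's-complement bit pattern.
    return (data_raw & ((1 << data_width_bits) - 1)) << data_lsb
```

With a 12-bit value left-aligned in a 16-bit transfer (`data_lsb=4`), the raw word `0xFFF0` extracts to 4095 unsigned or -1 signed.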

float_from_raw ¤

float_from_raw(transfer_raw: int) -> float

Extract logical data and apply scaling; falls back to raw float if no scaling is set.

raw_from_float ¤

raw_from_float(physical: float) -> int

Pack physical into a raw transfer value. Raises ValueError if out of range.

FieldDef dataclass ¤

FieldDef(name: str, lsb: int, width_bits: int = 1)

A contiguous bit field within a register (e.g. FieldDef("mode", lsb=3, width_bits=2) for bits [4:3]).

name instance-attribute ¤

name: str

lsb instance-attribute ¤

lsb: int

width_bits class-attribute instance-attribute ¤

width_bits: int = 1

mask ¤

mask() -> int

Bitmask covering this field's bits (e.g. lsb=3, width_bits=2 gives 0b00011000).
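The mask is `width_bits` ones shifted left by `lsb`. The helper names below are hypothetical; the arithmetic is the standard construction.

```python
def field_mask(lsb: int, width_bits: int = 1) -> int:
    # width_bits ones, shifted up to start at bit `lsb`.
    return ((1 << width_bits) - 1) << lsb


def extract_field(register_value: int, lsb: int, width_bits: int = 1) -> int:
    # Mask off the field, then shift it down to bit 0.
    return (register_value & field_mask(lsb, width_bits)) >> lsb
```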

RegisterDef dataclass ¤

RegisterDef(
    alias: str,
    register: int,
    default_value: int = 0,
    format: DataFormat = field(
        default_factory=lambda: DataFormat(transfer_bits=8)
    ),
    endianness: Literal["little", "big"] = "big",
    fields: dict[str, FieldDef] = field(default_factory=dict),
)

I2C register: address, data format, power-on default, byte order, and named bit fields.

alias instance-attribute ¤

alias: str

register instance-attribute ¤

register: int

default_value class-attribute instance-attribute ¤

default_value: int = 0

format class-attribute instance-attribute ¤

format: DataFormat = field(
    default_factory=lambda: DataFormat(transfer_bits=8)
)

endianness class-attribute instance-attribute ¤

endianness: Literal['little', 'big'] = 'big'

fields class-attribute instance-attribute ¤

fields: dict[str, FieldDef] = field(default_factory=dict)

data_width_bytes property ¤

data_width_bytes: int

field ¤

field(field_name: str) -> FieldDef

extract_data ¤

extract_data(transfer_raw: int) -> int

pack_data ¤

pack_data(data_raw: int) -> int

float_from_raw ¤

float_from_raw(transfer_raw: int) -> float

raw_from_float ¤

raw_from_float(physical: float) -> int

CommandDef dataclass ¤

CommandDef(name: str, values: type[Enum])

A named command enum whose members are OR'd together when sent.

name instance-attribute ¤

name: str

values instance-attribute ¤

values: type[Enum]

RegisterDevice dataclass ¤

RegisterDevice(
    name: str,
    address: int,
    addr_width_bytes: int = 1,
    registers: dict[str, RegisterDef] = dict(),
)

Register-based I2C device (e.g. a GPIO expander). Read/write by register alias.

name instance-attribute ¤

name: str

address instance-attribute ¤

address: int

addr_width_bytes class-attribute instance-attribute ¤

addr_width_bytes: int = 1

registers class-attribute instance-attribute ¤

registers: dict[str, RegisterDef] = field(default_factory=dict)

register ¤

register(register_alias: str) -> RegisterDef

CommandDevice dataclass ¤

CommandDevice(
    name: str,
    address: int,
    data_format: DataFormat,
    valid_commands: dict[str, CommandDef] = dict(),
    batch_commands: dict[str, list[Enum]] = dict(),
    endianness: Literal["little", "big"] = "big",
)

Command-based I2C device (e.g. an ADC). Sends a command byte; no register addressing.

name instance-attribute ¤

name: str

address instance-attribute ¤

address: int

data_format instance-attribute ¤

data_format: DataFormat

valid_commands class-attribute instance-attribute ¤

valid_commands: dict[str, CommandDef] = field(default_factory=dict)

batch_commands class-attribute instance-attribute ¤

batch_commands: dict[str, list[Enum]] = field(default_factory=dict)

endianness class-attribute instance-attribute ¤

endianness: Literal['little', 'big'] = 'big'

data_width_bytes property ¤

data_width_bytes: int

command ¤

command(command_alias: str) -> CommandDef

SystemDefinition dataclass ¤

SystemDefinition(devices: dict[str, Device] = dict())

Top-level container of all I2C device definitions, passed to I2CInterface.

>>> system = SystemDefinition()
>>> system.add_device(RegisterDevice(name="power_gpio", address=0x20))
>>> system.add_device(CommandDevice(name="adc", address=0x09, data_format=DataFormat(...)))
>>> i2c = I2CInterface(
...     name="main_i2c",
...     driver=Aardvark(serial_number="123456"),
...     system_definition=system,
... )
>>> i2c.write("power_gpio", "LED_OUTPUT_STATE", 0xFF)

devices class-attribute instance-attribute ¤

devices: dict[str, Device] = field(default_factory=dict)

add_device ¤

add_device(dev: Device) -> None

device ¤

device(periph: str) -> Device

Driver Interface¤

Bases: ABC

Vendor I2C-adapter driver contract. Concrete drivers own their transport and lifecycle.

address arguments are 7-bit device addresses (the adapter SDK handles the R/W bit). Drivers translate this contract to the vendor SDK's master primitives.

open abstractmethod ¤

open() -> None

Open the adapter (typically a USB host adapter) and prepare it for transactions.

close abstractmethod ¤

close() -> None

Close the adapter handle. Idempotent.

read abstractmethod ¤

read(address: int, length: int) -> bytes

Read length bytes from the 7-bit address (master read transaction).

write abstractmethod ¤

write(address: int, data: bytes) -> None

Write data to the 7-bit address (master write transaction).

write_read abstractmethod ¤

write_read(address: int, data: bytes, read_len: int) -> bytes

Write data then immediately read read_len bytes from address using a repeated START.

Used for register reads: write the register address, then read its contents without releasing the bus. No STOP is issued between the write and the read.

set_bitrate abstractmethod ¤

set_bitrate(bitrate: int) -> None

Set the master clock rate in kHz. Drivers may snap to the nearest supported rate.

set_pullups abstractmethod ¤

set_pullups(enable: bool) -> None

Enable or disable the adapter's internal bus pull-ups (typical: ~10 kΩ to VCC).

set_power_enable abstractmethod ¤

set_power_enable(enable: bool) -> None

Enable or disable target-power output on the adapter (where supported).
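For testing without hardware, the contract above can be satisfied by an in-memory double. The sketch below is duck-typed and hypothetical: a real driver would subclass the driver base class, and the register-pointer behaviour and the list of supported bit rates are assumptions made for illustration.

```python
class InMemoryDriver:
    """Hypothetical test double covering the eight contract methods."""

    SUPPORTED_KHZ = (100, 400, 1000)  # assumed rates, for illustration only

    def __init__(self) -> None:
        self.is_open = False
        self.bitrate_khz = 100
        self.pullups = False
        self.power = False
        self.devices = {}   # 7-bit address -> bytearray of register memory
        self._pointer = {}  # 7-bit address -> current register pointer

    def open(self) -> None:
        self.is_open = True

    def close(self) -> None:
        self.is_open = False  # idempotent: safe to call repeatedly

    def write(self, address: int, data: bytes) -> None:
        # First byte selects the register; any further bytes are stored there.
        start = data[0]
        self._pointer[address] = start
        self.devices[address][start:start + len(data) - 1] = data[1:]

    def read(self, address: int, length: int) -> bytes:
        start = self._pointer.get(address, 0)
        return bytes(self.devices[address][start:start + length])

    def write_read(self, address: int, data: bytes, read_len: int) -> bytes:
        # On hardware this is one transaction joined by a repeated START.
        self.write(address, data)
        return self.read(address, read_len)

    def set_bitrate(self, bitrate: int) -> None:
        # Snap to the nearest supported rate, in kHz.
        self.bitrate_khz = min(self.SUPPORTED_KHZ, key=lambda r: abs(r - bitrate))

    def set_pullups(self, enable: bool) -> None:
        self.pullups = enable

    def set_power_enable(self, enable: bool) -> None:
        self.power = enable
```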