I2C¤
Interface¤
Bases: Instrument
I2C interface supporting register-based and command-based devices.
Commands are enum values OR'd together and sent as a single byte stream.
The SystemDefinition is read-only — I2CInterface resolves names to
transactions on every call; the vendor driver never sees it.
Parameters:
-
(name¤str) –Channel-name prefix for published data.
-
(driver¤I2CDriverBase) –Concrete I2C-adapter driver; owns its own transport::
i2c = I2CInterface( "main", driver=Aardvark(serial_number="2239-764425"), system_definition=system, )
-
(system_definition¤SystemDefinition) –Bus description (devices, register maps, comms params).
-
(publishers¤Optional[list[Publisher]], default:None) –Publishers that receive emitted Measurement/Command data.
-
–**kwargs¤Default tags applied to every emitted Measurement/Command. Pass
dataset_rid="<rid>"to auto-create a NominalCorePublisher (uses the on-disk 'default' Nominal credential).
background_interval
property
writable
¤
background_interval
Seconds between background daemon iterations (0 = no wait).
background_enable
property
writable
¤
background_enable
Whether the background daemon is enabled (must still be start()-ed).
add_publisher
¤
add_publisher(publisher: Publisher)
Register a publisher to receive this instrument's Measurement/Command data.
publish
¤
publish(data: Measurement | Command, **kwargs)
Fan data out to every configured publisher; kwargs pass through.
add_background_daemon_function
¤
add_background_daemon_function(method: Callable, *args, **kwargs)
Append method to the daemon's call list. Use define_background_daemon to replace instead.
get_channel
¤
get_channel(
channel_name: str,
length: int = 1,
wait_for_latest: bool = False,
timeout: float = 10.0,
) -> Measurement
Return the most recent length samples for channel_name from the in-memory buffer.
Parameters:
-
(channel_name¤str) –Name of the channel to retrieve.
-
(length¤int, default:1) –Number of trailing samples to return.
-
(wait_for_latest¤bool, default:False) –Block until at least
lengthnew values arrive. -
(timeout¤float, default:10.0) –Seconds to wait when
wait_for_latest=True.
Raises:
-
RuntimeError–No background buffer;
start()was not called. -
ChannelNotFoundTimeoutError–wait_for_latest=Trueand channel did not appear withintimeout. -
ChannelValueTimeoutError–wait_for_latest=Trueand values did not arrive withintimeout.
define_background_daemon
¤
define_background_daemon(method: Callable, *args, **kwargs)
Replace all daemon functions with a single method (called with the given args).
write_read_raw
¤
write_read_raw(
address: int,
payload: bytes,
length: int,
endianness: Literal["little", "big"],
) -> int
Write-then-read without an intermediate I2C STOP (uses repeated START); decode as integer.
Typical use: write a register address, then read its contents.
Parameters:
read_raw
¤
Read length bytes from the 7-bit I2C address and decode as an integer.
write_then_read_raw
¤
write_then_read_raw(
address: int,
payload: bytes,
length: int,
endianness: Literal["little", "big"],
) -> int
Write-then-read with an I2C STOP between operations (required by devices that need STOP to process input).
read
¤
read(
peripheral: str, register_alias: str, field: str = "", **kwargs
) -> Measurement
Read a register (or one of its named fields) on peripheral.
With field, only the named bit field is extracted (masked and shifted).
Raises ValueError if peripheral is not a register-based device.
write
¤
Write a register (or one of its named fields) on peripheral.
With field, performs a thread-safe read-modify-write of just that
field (mask, insert, write). Raises ValueError if peripheral is
not a register-based device.
reset_reg
¤
Write a register's defined default value back to hardware (initialization/recovery).
query
¤
query(peripheral: str, batch_command: str, **kwargs) -> Measurement
Send a batch command (OR of its enum members) to peripheral and read the response.
Uses write-then-read with an I2C STOP between operations. Raises
ValueError if peripheral is not a command-based device, or if
batch_command references undefined command enums.
Types & Configuration¤
I2C system definition: scaling, data formats, fields, registers, devices.
ScalingFunction
¤
Bases: ABC
Convert between raw register integers and physical units (forward and inverse).
LinearScaling
¤
Bases: ScalingFunction
Linear scaling: physical = offset + gain * raw.
Example
Temperature sensor: 0.1°C per count, -40°C offset¤
LinearScaling(gain=0.1, offset=-40.0).to_physical(500) 10.0
CustomScaling
¤
CustomScaling(
to_physical_fn: Callable[[int], float],
to_raw_fn: Callable[[float], int] | None = None,
)
Bases: ScalingFunction
User-defined scaling. to_raw_fn is optional; to_raw raises NotImplementedError without it.
Example
CustomScaling( ... to_physical_fn=lambda x: x / 4095 * 5.0 * 7.2, ... to_raw_fn=lambda v: int(v / 7.2 / 5.0 * 4095), ... )
DataFormat
dataclass
¤
DataFormat(
transfer_bits: int,
data_width_bits: Optional[int] = None,
data_lsb: int = 0,
signed: bool = False,
scaling: Optional[ScalingFunction] = None,
units: str = "",
)
How to extract logical data from an I2C transfer and convert to physical units.
Attributes:
-
transfer_bits(int) –Total bits transferred over I2C.
-
data_width_bits(Optional[int]) –Logical data width if narrower than transfer_bits;
Noneuses all. -
data_lsb(int) –Starting bit position of the logical data.
-
signed(bool) –Whether the logical data is 2's-complement signed.
-
scaling(Optional[ScalingFunction]) –Optional scaling to convert raw → physical.
-
units(str) –Display string for the physical units.
FieldDef
dataclass
¤
A contiguous bit field within a register (e.g. FieldDef("mode", lsb=3, width_bits=2) for bits [4:3]).
RegisterDef
dataclass
¤
RegisterDef(
alias: str,
register: int,
default_value: int = 0,
format: DataFormat = field(
default_factory=lambda: DataFormat(transfer_bits=8)
),
endianness: Literal["little", "big"] = "big",
fields: dict[str, FieldDef] = field(default_factory=dict),
)
I2C register: address, data format, power-on default, byte order, and named bit fields.
format
class-attribute
instance-attribute
¤
format: DataFormat = field(
default_factory=lambda: DataFormat(transfer_bits=8)
)
fields
class-attribute
instance-attribute
¤
CommandDef
dataclass
¤
RegisterDevice
dataclass
¤
RegisterDevice(
name: str,
address: int,
addr_width_bytes: int = 1,
registers: dict[str, RegisterDef] = dict(),
)
CommandDevice
dataclass
¤
CommandDevice(
name: str,
address: int,
data_format: DataFormat,
valid_commands: dict[str, CommandDef] = dict(),
batch_commands: dict[str, list[Enum]] = dict(),
endianness: Literal["little", "big"] = "big",
)
SystemDefinition
dataclass
¤
Top-level container of all I2C device definitions, passed to I2CInterface.
system = SystemDefinition() system.add_device(RegisterDevice(name="power_gpio", address=0x20)) system.add_device(CommandDevice(name="adc", address=0x09, data_format=DataFormat(...))) i2c = I2CInterface( ... name="main_i2c", ... driver=Aardvark(serial_number="123456"), ... system_definition=system, ... ) i2c.write("power_gpio", "LED_OUTPUT_STATE", 0xFF)
Driver Interface¤
Bases: ABC
Vendor I2C-adapter driver contract. Concrete drivers own their transport and lifecycle.
address arguments are 7-bit device addresses (the adapter SDK handles
the R/W bit). Drivers translate this contract to the vendor SDK's master
primitives.
open
abstractmethod
¤
open() -> None
Open the adapter (typically a USB host adapter) and prepare it for transactions.
read
abstractmethod
¤
Read length bytes from the 7-bit address (master read transaction).
write
abstractmethod
¤
Write data to the 7-bit address (master write transaction).
write_read
abstractmethod
¤
Write data then immediately read read_len bytes from address using a repeated START.
Used for register reads: write the register address, then read its contents without releasing the bus. No STOP is issued between the write and the read.
set_bitrate
abstractmethod
¤
set_bitrate(bitrate: int) -> None
Set the master clock rate in kHz. Drivers may snap to the nearest supported rate.