I2C
Interface
I2C instrument interface and helpers.
Defines InstroI2C and related helpers.
Public API:
InstroI2C, InstroI2CFacade
InstroI2C
InstroI2C(
name: str,
driver: I2CDriverBase,
system_definition: SystemDefinition,
publishers: Optional[list[Publisher]] = None,
**kwargs,
)
Bases: Instrument
I2C interface with support for both register-based and command-based operations.
This class provides methods for:
- Reading and writing registers with field-level access
- Sending commands to devices and reading responses
Commands are defined as enums that can be reused across multiple devices. Users simply pass the enum values they want to combine, and the system automatically ORs them together and sends the command.
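The OR-combination of command enums described above can be sketched in plain Python. This is a minimal illustration, not the library's implementation: `combine_commands` and `ADCMode` are hypothetical names introduced here, and `ADCChannel` mirrors the enum used in the CommandDef example on this page.

```python
from enum import Enum
from functools import reduce
from operator import or_


class ADCChannel(Enum):
    CH0 = 0x00
    CH1 = 0x10


class ADCMode(Enum):  # hypothetical second command enum, for illustration only
    DIFFERENTIAL = 0x00
    SINGLE_ENDED = 0x80


def combine_commands(commands: list[Enum]) -> int:
    """OR the integer values of the given enum members into one command word."""
    return reduce(or_, (command.value for command in commands), 0)


command_word = combine_commands([ADCChannel.CH1, ADCMode.SINGLE_ENDED])
print(hex(command_word))  # 0x90
```

Because each enum owns a distinct group of bits, the combined word carries both selections without either overwriting the other.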
Parameters:
-
(name¤str) –A name to identify this I2C instance. Used in channel naming and published data.
-
(driver¤I2CDriverBase) –The driver instance for the specific I2C adapter vendor.
-
(system_definition¤SystemDefinition) –Complete system definition containing device configurations, register maps, and communication parameters.
-
(publishers¤list[Publisher] | None, default:None) –List of publishers to send data to when executing methods. Defaults to None.
-
–**kwargs¤Optional keyword arguments used as tags throughout the life of the instrument. These tags are applied to the Measurement and Command objects and can be utilized by publishers like NominalCorePublisher as added metadata.
Special keyword arguments: dataset_rid (str): If provided, automatically creates and adds a NominalCorePublisher with the specified dataset RID. Assumes a Nominal 'default' credential is stored on disk.
data_handler
instance-attribute
data_handler = (
None
if connection_config is None
else DataHandler(select(connection_config))
)
background_interval
property
writable
background_interval
Get the background worker interval setting.
Returns:
-
float–The current interval in seconds between background worker iterations. The worker waits this duration between executing background daemon functions.
background_enable
property
writable
background_enable
Get the background worker enable state.
Returns:
-
bool–True if the background worker is enabled, False otherwise.
create
classmethod
create(
name: str,
vendor: I2CVendor,
resource: str,
system_definition: SystemDefinition,
publishers: list[Publisher] | None = None,
**kwargs,
) -> InstroI2C
Create an InstroI2C instrument with the appropriate driver.
This factory method selects and initializes the correct I2C driver for the specified vendor, then creates an InstroI2C instrument instance configured with the provided system definition.
Parameters:
-
(name¤str) –A name to identify this I2C instance. Used in channel naming and published data.
-
(vendor¤I2CVendor) –The I2C adapter vendor enum (e.g., TOTAL_PHASE).
-
(resource¤str) –Vendor-specific resource identifier (e.g., serial number, device_id).
-
(system_definition¤SystemDefinition) –Complete system definition containing device configurations, register maps, and communication parameters.
-
(publishers¤list[Publisher] | None, default:None) –List of publishers to send data to when executing methods. Defaults to None.
-
–**kwargs¤Optional keyword arguments used as tags throughout the life of the instrument. These tags are applied to the Measurement and Command objects and can be utilized by publishers like NominalCorePublisher as added metadata.
Special keyword arguments: dataset_rid (str): If provided, automatically creates and adds a NominalCorePublisher with the specified dataset RID. Assumes a Nominal 'default' credential is stored on disk.
Returns:
-
InstroI2C(InstroI2C) –A fully initialized InstroI2C instrument ready for I2C communication.
write_read_raw
write_read_raw(
address: int,
payload: bytes,
length: int,
endianness: Literal["little", "big"],
) -> int
Perform a raw I2C write-read operation without an intermediate stop condition.
This method writes data to the specified I2C address and immediately reads back a response without issuing an I2C stop condition between operations. This is useful for register reads where the register address is written first, followed by reading the register contents.
Parameters:
-
(address¤int) –7-bit I2C device address.
-
(payload¤bytes) –Bytes to write (typically the register address).
-
(length¤int) –Number of bytes to read back.
-
(endianness¤Literal['little', 'big']) –Byte order for integer conversion.
Returns:
-
int(int) –The read data converted to an integer using the specified endianness.
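To make the payload and the integer conversion concrete, here is a small sketch using only the standard library. The helper names `register_payload` and `response_to_int` are hypothetical, and encoding the register address big-endian is an assumption made for this example, not something the documentation states.

```python
from typing import Literal


def register_payload(register: int, addr_width_bytes: int = 1) -> bytes:
    """Encode a register address as the bytes written before the read.

    Big-endian address encoding is an assumption made for this sketch.
    """
    return register.to_bytes(addr_width_bytes, "big")


def response_to_int(data: bytes, endianness: Literal["little", "big"]) -> int:
    """Convert the bytes read back from the device into an unsigned integer."""
    return int.from_bytes(data, endianness)


payload = register_payload(0x02)   # b"\x02", the register address to write
raw = bytes([0x12, 0x34])          # pretend the device returned these two bytes

print(hex(response_to_int(raw, "big")))     # 0x1234
print(hex(response_to_int(raw, "little")))  # 0x3412
```

The same two response bytes yield different integers depending on `endianness`, which is why the raw methods require the caller to state it explicitly.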
write_raw
read_raw
Perform a raw I2C read operation.
Reads arbitrary bytes from the specified I2C device address.
Parameters:
-
(address¤int) –7-bit I2C device address.
-
(length¤int) –Number of bytes to read.
-
(endianness¤Literal['little', 'big']) –Byte order for integer conversion.
Returns:
-
int(int) –The read data converted to an integer using the specified endianness.
write_then_read_raw
write_then_read_raw(
address: int,
payload: bytes,
length: int,
endianness: Literal["little", "big"],
) -> int
Perform a raw I2C write-then-read operation with an I2C stop condition between operations.
This method writes data to the specified I2C address and then reads back a response, with an I2C stop condition issued between the write and read operations. This is critical for devices that require the stop condition to process the data before responding.
Parameters:
-
(address¤int) –7-bit I2C device address.
-
(payload¤bytes) –Bytes to write.
-
(length¤int) –Number of bytes to read back.
-
(endianness¤Literal['little', 'big']) –Byte order for integer conversion.
Returns:
-
int(int) –The read data converted to an integer using the specified endianness.
read
read(
peripheral: str, register_alias: str, field: str = "", **kwargs
) -> Measurement
Read a register or specific field of a register from an I2C device using the system definition.
Reads from a device register defined in the system definition. The method handles all low-level details including register addressing, byte width, and endianness automatically based on the system definition configuration.
If a field name is provided, only that specific bit field within the register is extracted and returned. The field is masked and shifted according to its definition.
Parameters:
-
(peripheral¤str) –Name of the peripheral device in the system definition.
-
(register_alias¤str) –Alias of the register to read from.
-
(field¤str, default:'') –Optional field name within the register to extract. If provided, only that specific bit field is extracted. Defaults to "".
-
–**kwargs¤Optional keyword arguments used as tags. These tags are applied to the Measurement object and can be utilized by publishers like NominalCorePublisher as added metadata.
Returns:
-
Measurement(Measurement) –A Measurement object containing the channel name, read value, timestamp, and tags. The measurement is automatically published to all configured publishers.
Raises:
-
ValueError–If the peripheral is not a register-based device.
write
write(
peripheral: str,
register_alias: str,
value: int,
field: str = "",
**kwargs,
) -> Command
Write to a register or specific field of the register on an I2C device using the system definition.
Writes to a device register defined in the system definition. The method handles all low-level details including register addressing, byte width, and endianness automatically based on the system definition configuration.
If a field name is provided, this performs a read-modify-write (RMW) operation:
1. Read the current register value
2. Mask out the field bits
3. Insert the new field value
4. Write the modified register back
The RMW operation is thread-safe, protected by an internal lock.
Parameters:
-
(peripheral¤str) –Name of the peripheral device in the system definition.
-
(register_alias¤str) –Alias of the register to write to.
-
(value¤int) –Value to write. For fields, this is the field value, not the full register.
-
(field¤str, default:'') –Optional field name within the register to modify. If provided, performs a read-modify-write operation. Defaults to "".
-
–**kwargs¤Optional keyword arguments used as tags. These tags are applied to the Command object and can be utilized by publishers like NominalCorePublisher as added metadata.
Returns:
-
Command(Command) –A Command object containing the channel name, written value, timestamp, and tags. The command is automatically published to all configured publishers.
Raises:
-
ValueError–If the peripheral is not a register-based device.
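The lock-protected read-modify-write sequence for field writes can be sketched as follows. `FakeRegisterBus` is a hypothetical in-memory stand-in for a device, used here only so the example runs without hardware; it is not the library's implementation.

```python
import threading


class FakeRegisterBus:
    """In-memory stand-in for a device register file (hypothetical)."""

    def __init__(self) -> None:
        self.registers: dict[int, int] = {}
        self._lock = threading.Lock()

    def write_field(self, register: int, lsb: int, width_bits: int, value: int) -> int:
        mask = ((1 << width_bits) - 1) << lsb
        with self._lock:  # keep the four steps atomic across threads
            current = self.registers.get(register, 0)    # 1. read current value
            cleared = current & ~mask                    # 2. mask out the field bits
            updated = cleared | ((value << lsb) & mask)  # 3. insert the new field value
            self.registers[register] = updated           # 4. write the register back
        return updated


bus = FakeRegisterBus()
bus.registers[0x01] = 0b1110_0111
result = bus.write_field(0x01, lsb=3, width_bits=2, value=0b10)
print(bin(result))  # 0b11110111
```

Holding the lock across all four steps prevents a second thread from writing the register between the read and the write-back, which would otherwise be silently overwritten.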
reset_reg
reset_reg(peripheral: str, register_alias: str, **kwargs)
Reset a register to its default value as defined in the system definition.
Writes the default value specified in the system definition's register configuration back to the hardware register. This is useful for initialization or recovery operations.
Parameters:
-
(peripheral¤str) –Name of the peripheral device in the system definition.
-
(register_alias¤str) –Alias of the register to reset.
-
–**kwargs¤Optional keyword arguments used as tags. These tags are applied to the Command object and can be utilized by publishers like NominalCorePublisher as added metadata.
Raises:
-
ValueError–If the peripheral is not a register-based device.
query
query(peripheral: str, batch_command: str, **kwargs) -> Measurement
Send a command to a device and read back the response with an I2C stop condition between operations.
This method performs a write operation followed by a read operation with an I2C stop condition in between. This is critical for devices that require the stop condition to process the command before responding.
Parameters:
-
(peripheral¤str) –The peripheral device name in the system definition.
-
(batch_command¤str) –A predefined batch command name that combines multiple command enums. The system automatically ORs the enum values together.
-
–**kwargs¤Optional keyword arguments used as tags. These tags are applied to the Measurement object and can be utilized by publishers like NominalCorePublisher as added metadata.
Returns:
-
Measurement(Measurement) –A Measurement object containing the channel name, response value, timestamp, and tags. The measurement is automatically published to all configured publishers.
Raises:
-
ValueError–If the peripheral is not a command-based device, or if any of the command enums in the batch_command are not defined in the device's commands.
close
close()
Close the I2C instrument and release hardware resources.
Performs cleanup by closing the underlying driver connection and releasing any hardware resources. This should be called when the instrument is no longer needed to ensure proper resource cleanup.
add_publisher
publish
publish(data: Measurement | Command, **kwargs)
Publish a Measurement or Command to all configured publishers.
This method sends the data to all publishers that have been added to this instrument. Additional keyword arguments are passed through to each publisher's publish method.
Parameters:
-
(data¤Measurement | Command) –The measurement or command data to publish.
-
–**kwargs¤Optional keyword arguments passed to each publisher's publish method.
get_identity
get_identity() -> str
Query the instrument's identity string.
This method sends the SCPI "*IDN?" command to retrieve the instrument's identification information, which typically includes manufacturer, model number, serial number, and firmware version.
Returns:
-
str(str) –The instrument's identity string.
Raises:
-
RuntimeError–If no connection is configured for this instrument (data_handler is None).
open_and_get_identity
staticmethod
open_and_get_identity(connection_config: ConnectConfig) -> str
Convenience method to open a connection, query identity, and close.
This static method creates a temporary instrument instance, opens a connection, queries the identity, closes the connection, and returns the identity string. Useful for instrument discovery without maintaining a long-lived connection.
Parameters:
-
(connection_config¤ConnectConfig) –The connection configuration for the instrument.
Returns:
-
str(str) –The instrument's identity string.
Raises:
-
RuntimeError–If the connection cannot be established or the identity cannot be queried.
open
open()
Open a connection to the instrument.
This method establishes communication with the instrument if a connection configuration has been provided. If no data_handler exists (connection_config was None), this method does nothing.
send_arbitrary_command
Send an arbitrary command string to the instrument.
This method allows sending custom commands directly to the instrument without going through the instrument's high-level API. Useful for accessing instrument-specific features not exposed by the standard interface.
Parameters:
Raises:
-
RuntimeError–If no connection is configured for this instrument (data_handler is None).
query_arbitrary_command
Send an arbitrary command string and return the instrument's response.
This method sends a command and waits for a response, useful for querying instrument state or configuration. The response is returned as a string.
Parameters:
Returns:
-
str(str) –The instrument's response to the query command.
Raises:
-
RuntimeError–If no connection is configured for this instrument (data_handler is None).
add_background_daemon_function
Add a function (and its arguments) to be called periodically by the background worker daemon.
The function is appended to the end of any functions that are already registered.
To replace the default functions instead, call define_background_daemon, which clears the list and registers a single new function.
Parameters:
start
start()
Start the background worker thread.
This method creates and starts a daemon thread that periodically executes all
registered background daemon functions. If the thread is already running, this
method does nothing. The thread will continue running until stop() is called.
stop
stop()
Stop the background worker thread.
This method signals the background worker thread to stop and waits for it to complete. If the thread is not running, this method does nothing.
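The start/stop behavior described above follows a common daemon-thread pattern. The sketch below is one possible shape for it, assuming a `threading.Event` is used as the stop signal; `BackgroundWorker` and `add_function` are hypothetical names, and the real class may be organized differently.

```python
import threading
import time
from typing import Callable, Optional


class BackgroundWorker:
    """Hypothetical periodic worker built on a daemon thread."""

    def __init__(self, interval: float = 0.01) -> None:
        self.interval = interval
        self._functions: list[Callable[[], None]] = []
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def add_function(self, fn: Callable[[], None]) -> None:
        self._functions.append(fn)

    def start(self) -> None:
        if self._thread is not None and self._thread.is_alive():
            return  # already running: do nothing
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        if self._thread is None:
            return  # not running: do nothing
        self._stop_event.set()  # signal the loop to exit
        self._thread.join()     # wait for the thread to finish
        self._thread = None

    def _run(self) -> None:
        while not self._stop_event.is_set():
            for fn in list(self._functions):
                fn()
            # Sleep for the interval, but wake immediately if stop() is called.
            self._stop_event.wait(self.interval)


calls: list[float] = []
worker = BackgroundWorker(interval=0.01)
worker.add_function(lambda: calls.append(time.monotonic()))
worker.start()
worker.start()  # second call is a no-op
time.sleep(0.05)
worker.stop()
worker.stop()   # second call is a no-op
print(f"background function ran {len(calls)} time(s)")
```

Waiting on the event rather than calling `time.sleep` lets `stop()` return promptly even when the interval is long.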
get_channel
get_channel(
channel_name: str,
length: int = 1,
wait_for_latest: bool = False,
timeout: float = 10.0,
) -> Measurement
Get the most recent Measurement data for a specific channel from the buffer.
Parameters:
-
(channel_name¤str) –The name of the channel from which to retrieve data.
-
(length¤int, default:1) –The number of most recent samples to return.
-
(wait_for_latest¤bool, default:False) –Block and wait for the next channel value(s).
-
(timeout¤float, default:10.0) –Timeout in seconds when waiting for channel or values. Defaults to 10.0 seconds. Only applies when wait_for_latest=True.
Returns:
-
Measurement(Measurement) –A Measurement object containing the requested channel data and timestamps.
Raises:
-
RuntimeError–No background buffer exists because start() was not called.
-
ChannelNotFoundTimeoutError–If wait_for_latest=True and the channel does not appear within timeout.
-
ChannelValueTimeoutError–If wait_for_latest=True and sufficient values are not available within timeout.
define_background_daemon
Clear all background daemon functions and register a single custom method.
This method clears any previously registered background daemon functions and registers a new method to be called in the background daemon loop. This is useful when you want to replace the default background functions with a custom implementation.
Parameters:
InstroI2CFacade
InstroI2CFacade(nominal_i2c: InstroI2C)
Facade class that provides drivers with access to InstroI2C configuration.
This class implements the APIInstroI2C interface and allows drivers to access I2C configuration without direct access to the InstroI2C instance's internal state.
Parameters:
Types & Configuration
I2C instrument interface and helpers.
Defines I2CVendor and related helpers.
Public API:
I2CVendor, ScalingFunction, LinearScaling, CustomScaling, DataFormat, FieldDef, RegisterDef, CommandDef, RegisterDevice, CommandDevice, SystemDefinition
I2CVendor
ScalingFunction
LinearScaling
Bases: ScalingFunction
Linear scaling transformation: physical = offset + gain * raw.
Most common scaling type for sensors with linear response. Works with both signed and unsigned raw integer values (sign extension handled at register level).
Parameters:
-
(gain¤float, default:1.0) –Multiplication factor (default: 1.0)
-
(offset¤float, default:0.0) –Additive offset (default: 0.0)
Example
# Temperature sensor: 0.1°C per count, -40°C offset
>>> temp_scaling = LinearScaling(gain=0.1, offset=-40.0)
>>> temp_scaling.to_physical(500)  # Returns 10.0°C
10.0
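For readers who want to see the arithmetic behind `physical = offset + gain * raw`, here is a self-contained stand-in. `LinearScalingSketch` is a hypothetical class written for this example, and its `to_raw` inverse is an assumption; the page only shows `to_physical` on LinearScaling.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LinearScalingSketch:
    """Hypothetical stand-in that mirrors the documented linear formula."""

    gain: float = 1.0
    offset: float = 0.0

    def to_physical(self, raw: int) -> float:
        # physical = offset + gain * raw
        return self.offset + self.gain * raw

    def to_raw(self, physical: float) -> int:
        # Assumed inverse: solve the formula for raw and round to the nearest count.
        return round((physical - self.offset) / self.gain)


temp = LinearScalingSketch(gain=0.1, offset=-40.0)
print(temp.to_physical(500))  # 10.0
print(temp.to_raw(10.0))      # 500
```

Rounding in `to_raw` guards against floating-point results that land just below an integer count.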
CustomScaling
CustomScaling(
to_physical_fn: Callable[[int], float],
to_raw_fn: Callable[[float], int] | None = None,
)
Bases: ScalingFunction
Custom scaling using user-provided callable functions.
Allows arbitrary scaling transformations for non-linear sensors or complex conversion requirements. Optionally supports inverse transformation.
Parameters:
-
(to_physical_fn¤Callable[[int], float]) –Function to convert raw → physical
-
(to_raw_fn¤Callable[[float], int] | None, default:None) –Optional function to convert physical → raw
Example
# ADC with non-linear voltage divider
>>> adc_scaling = CustomScaling(
...     to_physical_fn=lambda x: x / 4095 * 5.0 * 7.2,
...     to_raw_fn=lambda v: int(v / 7.2 / 5.0 * 4095),
... )
DataFormat
dataclass
DataFormat(
transfer_bits: int,
data_width_bits: Optional[int] = None,
data_lsb: int = 0,
signed: bool = False,
scaling: Optional[ScalingFunction] = None,
units: str = "",
)
Unified data format specification for I2C data extraction and scaling.
Defines how to extract logical data from transfer bits, handle sign extension, and convert between raw integer values and physical units.
Attributes:
-
transfer_bits(int) –Total bits transferred over I2C
-
data_width_bits(Optional[int]) –Logical data bits if different from transfer_bits (None = use all)
-
data_lsb(int) –Starting bit position of logical data (default: 0)
-
signed(bool) –Whether data is signed (default: False)
-
scaling(Optional[ScalingFunction]) –Optional scaling function to convert raw data to physical units
-
units(str) –Physical units string (default: "")
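The interplay of `data_lsb`, `data_width_bits`, and `signed` can be illustrated with a standalone function. This is a sketch of one plausible extraction (shift, mask, then two's-complement sign extension); `extract_logical_data` is a hypothetical name and the library's own extraction may differ in detail.

```python
def extract_logical_data(
    transfer_value: int, data_lsb: int, data_width_bits: int, signed: bool
) -> int:
    """Pull the logical data bits out of a transferred word."""
    # Shift the data down to bit 0 and keep only data_width_bits bits.
    raw = (transfer_value >> data_lsb) & ((1 << data_width_bits) - 1)
    # Two's-complement sign extension when the top data bit is set.
    if signed and raw & (1 << (data_width_bits - 1)):
        raw -= 1 << data_width_bits
    return raw


# A 16-bit transfer carrying 12 bits of data in the upper bits (data_lsb=4).
print(extract_logical_data(0x7FF0, data_lsb=4, data_width_bits=12, signed=True))   # 2047
print(extract_logical_data(0xFFF0, data_lsb=4, data_width_bits=12, signed=True))   # -1
print(extract_logical_data(0xFFF0, data_lsb=4, data_width_bits=12, signed=False))  # 4095
```

The same transferred word decodes to -1 or 4095 depending on `signed`, so this flag needs to match the device's datasheet.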
FieldDef
dataclass
Definition of a bit field within a register.
Represents a contiguous group of bits within a register that stores a specific setting or value (e.g., enable flag, mode selection).
Attributes:
-
name(str) –Field identifier (e.g., "enable", "mode")
-
lsb(int) –Least significant bit position (0-indexed)
-
width_bits(int) –Number of bits in the field (default: 1)
Example
# A 2-bit MODE field at bits [4:3]
>>> mode_field = FieldDef(name="mode", lsb=3, width_bits=2)
mask
mask() -> int
Return a bitmask for this field.
Purpose
A field is a contiguous set of bits inside a register that stores a
specific setting or value (e.g., ENABLE flag, MODE selection). The
mask() method computes a bitmask where those field bits are set to 1
and all other bits are 0. This allows easy extraction or modification of
just the field bits without disturbing the rest of the register.
For example: If lsb = 3 and width = 2, the mask is: ((1 << 2) - 1) << 3 -> 0b00011000
This mask is used in:
• Reading a field: (reg_value & mask) >> lsb
• Writing a field: reg_value = (reg_value & ~mask) | ((new_val << lsb) & mask)
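The mask formula and the two expressions above can be checked with a short standalone example. `FieldSketch` is a hypothetical stand-in with the same three attributes as FieldDef, written so the snippet runs without the library installed.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FieldSketch:
    """Hypothetical stand-in with the same attributes as FieldDef."""

    name: str
    lsb: int
    width_bits: int = 1

    def mask(self) -> int:
        # width_bits ones, shifted up to start at bit position lsb.
        return ((1 << self.width_bits) - 1) << self.lsb


mode = FieldSketch(name="mode", lsb=3, width_bits=2)
reg_value = 0b1011_0101

# Reading a field: isolate the field bits, then shift them down to bit 0.
field_value = (reg_value & mode.mask()) >> mode.lsb

# Writing a field: clear the field bits, then OR in the shifted new value.
new_reg = (reg_value & ~mode.mask()) | ((0b01 << mode.lsb) & mode.mask())

print(bin(mode.mask()))  # 0b11000
print(bin(field_value))  # 0b10
print(bin(new_reg))      # 0b10101101
```

Bits outside the mask (here bits 7 to 5 and 2 to 0) are identical in `reg_value` and `new_reg`, which is the property the mask exists to guarantee.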
RegisterDef
dataclass
RegisterDef(
alias: str,
register: int,
default_value: int = 0,
format: DataFormat = field(
default_factory=lambda: DataFormat(transfer_bits=8)
),
endianness: Literal["little", "big"] = "big",
fields: dict[str, FieldDef] = field(default_factory=dict),
)
Definition of an I2C device register.
Complete specification of a register including its address, data format, default value, and bit field definitions.
Attributes:
-
alias(str) –Human-readable register name (e.g., "LED_OUTPUT_STATE")
-
register(int) –Register address value
-
default_value(int) –Power-on reset value (default: 0x00)
-
format(DataFormat) –Data format for extraction and scaling
-
endianness(Literal['little', 'big']) –Multi-byte register byte order (default: "big")
-
fields(dict[str, FieldDef]) –Named bit fields within this register
format
class-attribute
instance-attribute
format: DataFormat = field(
default_factory=lambda: DataFormat(transfer_bits=8)
)
fields
class-attribute
instance-attribute
extract_data
Extract logical data bits from transfer container.
float_from_raw
Convert raw register value to physical units.
CommandDef
dataclass
Definition of a command enum for command-based I2C operations.
Associates a command name with its enum type. Commands are enum values that can be OR'd together and sent to devices.
Attributes:
Example
>>> class ADCChannel(Enum):
...     CH0 = 0x00
...     CH1 = 0x10
>>> cmd = CommandDef(name="channel", values=ADCChannel)
RegisterDevice
dataclass
RegisterDevice(
name: str,
address: int,
addr_width_bytes: int = 1,
registers: dict[str, RegisterDef] = dict(),
)
Register-based I2C device (e.g., GPIO expanders).
Uses register addressing to read/write specific registers.
Attributes:
-
name(str) –Human-readable device identifier
-
address(int) –7-bit I2C address
-
addr_width_bytes(int) –Register address width (1 or 2 bytes, default: 1)
-
registers(dict[str, RegisterDef]) –Register definitions keyed by alias
CommandDevice
dataclass
CommandDevice(
name: str,
address: int,
data_format: DataFormat,
valid_commands: dict[str, CommandDef] = dict(),
batch_commands: dict[str, list[Enum]] = dict(),
endianness: Literal["little", "big"] = "big",
)
Command-based I2C device (e.g., ADCs).
Sends commands and reads back responses. No register addressing.
Attributes:
-
name(str) –Human-readable device identifier
-
address(int) –7-bit I2C address
-
data_format(DataFormat) –Data format for command responses
-
valid_commands(dict[str, CommandDef]) –Valid command enums keyed by name
-
batch_commands(dict[str, list[Enum]]) –Predefined command combinations keyed by alias
-
endianness(Literal['little', 'big']) –Multi-byte response byte order (default: "big")
SystemDefinition
dataclass
Complete I2C system definition containing all devices.
Top-level container for all I2C device definitions in a system. Pass this to InstroI2C.create() to enable register and command access by name.
Attributes:
Example
>>> system = SystemDefinition()
>>> system.add_device(RegisterDevice(name="power_gpio", address=0x20))
>>> system.add_device(CommandDevice(name="adc", address=0x09, data_format=DataFormat(...)))
# Use with InstroI2C
>>> i2c = InstroI2C.create(
...     name="main_i2c",
...     vendor=I2CVendor.TOTAL_PHASE,
...     resource="123456",
...     system_definition=system,
... )
>>> i2c.write("power_gpio", "LED_OUTPUT_STATE", 0xFF)