# Example: Publishers: publish custom.
"""Example: Publishers: publish to a custom publisher that prints one channel."""
import time
from nominal_instro.instruments import NominalPSU
from nominal_instro.lib import ConnectConfig
from nominal_instro.lib.types import Command, Measurement
# VISA raw-socket resource string: TCPIP0::<host>::<port>::SOCKET.
# The host is spelled as the full four-octet loopback address; the previous
# three-part form ("127.0.1") only resolves via legacy inet_aton shorthand.
VISA_RESOURCE = "TCPIP0::127.0.0.1::5025::SOCKET"
class PrintChannelDelegate:
    """Custom publisher that prints data for one named channel.

    Instances are registered on an instrument through ``add_publisher``.
    NOTE(review): presumably the instrument then calls ``publish`` for every
    measurement/command it produces and ``close`` on shutdown - confirm
    against the publisher interface.
    """

    def __init__(self, channel_name: str):
        """Store the channel to watch.

        Args:
            channel_name: Key looked up in ``Measurement.channel_data``.
        """
        self.channel_name = channel_name

    def publish(self, data: Measurement | Command, **kwargs) -> bool:
        """Handle the data - return True on success, False on failure.

        Only ``Measurement`` objects whose ``channel_data`` holds a truthy
        entry for ``self.channel_name`` are printed; anything else is ignored
        and still counts as success.

        Args:
            data: The measurement or command being published.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            True if the data was handled (printed or ignored) without error,
            False if an exception was raised while handling it.
        """
        try:
            if isinstance(data, Measurement):
                # NOTE(review): this is a truthiness test, so a falsy entry
                # (e.g. 0 or an empty container) is skipped just like a
                # missing channel - confirm that is intended.
                if data.channel_data.get(self.channel_name, None):
                    print(f"{self.channel_name}: {data.latest}")
        except Exception as e:
            # Report and signal failure rather than letting a publisher
            # error propagate to the caller.
            print(f"Error publishing data: {e}")
            return False
        return True

    def close(self) -> None:
        """Release publisher resources; nothing to do for a print-only publisher."""
        pass
# Create the instrument instance and attach the custom publisher, which only
# prints data for the "myPSU_ch2_i" channel.
psu = NominalPSU.auto_create(name="myPSU", resource=ConnectConfig(visa_resource=VISA_RESOURCE), num_channels=2)
psu.add_publisher(PrintChannelDelegate(channel_name="myPSU_ch2_i"))

try:
    # Set up initial state of test: output off, current limited, 0 V.
    psu.open()
    psu.output_enable(False, channel=2)
    psu.set_current_limit(0.2, channel=2)
    psu.set_voltage(0, channel=2)
    psu.get_current(channel=2)
    psu.get_voltage(channel=2)

    # Start the voltage ramp. The inner try/finally makes sure the output is
    # switched off again even if a step of the ramp raises.
    try:
        psu.output_enable(True, channel=2)
        for v in range(10):
            psu.set_voltage(v, channel=2)
            time.sleep(1)  # Simulate some delay
            psu.get_current(channel=2)  # This value will be printed
            psu.get_voltage(channel=2)  # This value will not be printed
    finally:
        psu.output_enable(False, channel=2)
finally:
    # Shut it down
    psu.close()