# Example: DAQ read analog HW timed without a background daemon.
"""Example: DAQ read analog HW timed without a background daemon.
Demonstrates publishing measurements/commands to a dataset (Nominal Core publisher).
"""
from nominal_instro.instruments import NominalDAQ
from nominal_instro.instruments.daq.types import DAQVendor, Direction
from nominal_instro.lib.publishers.nominal_core import NominalCorePublisher
# Configuration: Choose your vendor.
VENDOR = DAQVendor.LABJACK_T_SERIES
# Vendor-specific configuration. For example purposes.
# Each vendor uses different channel naming conventions and resource ID formats. See vendor documentation for details.
match VENDOR:
case DAQVendor.LABJACK_T_SERIES:
CHANNEL_0 = "AIN0"
CHANNEL_1 = "AIN1"
RESOURCE_ID = "440020473" # LabJack serial number
case DAQVendor.NI:
CHANNEL_0 = "ai0"
CHANNEL_1 = "ai1"
RESOURCE_ID = "Dev1" # NI device name, as defined in MAX (e.g., "Dev1", "Dev2")
case DAQVendor.KEYSIGHT_34980:
from nominal_instro.lib import ConnectConfig
CHANNEL_0 = "1009"
CHANNEL_1 = "1010"
RESOURCE_ID = ConnectConfig(visa_resource="USB0::0x0957::0x0507::MY44001757::INSTR") # VISA resource
case DAQVendor.MCC:
CHANNEL_0 = "0"
CHANNEL_1 = "1"
RESOURCE_ID = "344371:0" # MCC DAQ device ID, optionally suffixed with ":<board_number>" (default 0)
# Nominal Core dataset to send data to as the instrument is operated.
DATASET_RID = "<dataset_rid>" # Replace with your dataset RID.
### Main code
# Create the DAQ for the selected vendor and attach the Nominal Core publisher
# that targets DATASET_RID, then open the connection to the device.
daq = NominalDAQ.create(name="myDAQ", vendor=VENDOR, resource=RESOURCE_ID)
daq.add_publisher(NominalCorePublisher(dataset_rid=DATASET_RID))
daq.open()

# Everything after open() runs under try/finally so the DAQ is always closed,
# including when configuration fails.
try:
    # Do not allow a background daemon to manage fetching data from the DAQ.
    # We will do that ourselves in the main app loop.
    daq.background_enable = False

    daq.configure_analog_channel(
        direction=Direction.INPUT, physical_channel=CHANNEL_0, alias="ch_0", range_min=0, range_max=5
    )
    daq.configure_analog_channel(
        direction=Direction.INPUT, physical_channel=CHANNEL_1, alias="ch_1", range_min=0, range_max=5
    )

    # Set the sample rate but also the number of samples we will fetch each time daq.read_analog() is called.
    daq.configure_ai_sample_rate(sample_rate=100, samples_per_channel=50)

    # Start the acquisition
    daq.start()
    try:
        while True:
            # Main program loop: fetch the next batch of samples ourselves.
            # `data` is where an application would process the readings.
            data = daq.read_analog()
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to end this example.
        print("Exiting main loop")
    finally:
        # Stop the acquisition once it has been started, even if
        # read_analog() fails with an unexpected error.
        daq.stop()
finally:
    print("Closing DAQ")
    daq.close()