Skip to content

Example: DAQ read analog HW timed.

"""Example: DAQ read analog HW timed.

Demonstrates publishing measurements/commands to a dataset (Nominal Core publisher).

"""

import time

from nominal_instro.instruments import NominalDAQ
from nominal_instro.instruments.daq.types import DAQVendor, Direction
from nominal_instro.lib.publishers.nominal_core import NominalCorePublisher

# Configuration: Choose your vendor.
VENDOR = DAQVendor.LABJACK_T_SERIES

# Vendor-specific configuration. For example purposes.
# Each vendor uses different channel naming conventions and resource ID formats. See vendor documentation for details.
match VENDOR:
    case DAQVendor.LABJACK_T_SERIES:
        CHANNEL_0 = "AIN0"
        CHANNEL_1 = "AIN1"
        RESOURCE_ID = "440020473"  # LabJack serial number
    case DAQVendor.NI:
        CHANNEL_0 = "ai0"
        CHANNEL_1 = "ai1"
        RESOURCE_ID = "Dev1"  # NI device name, as defined in MAX (e.g., "Dev1", "Dev2")
    case DAQVendor.KEYSIGHT_34980:
        from nominal_instro.lib import ConnectConfig

        CHANNEL_0 = "1009"
        CHANNEL_1 = "1010"
        RESOURCE_ID = ConnectConfig(visa_resource="USB0::0x0957::0x0507::MY44001757::INSTR")  # VISA resource
    case DAQVendor.MCC:
        CHANNEL_0 = "0"
        CHANNEL_1 = "1"
        RESOURCE_ID = "344371:0"  # MCC DAQ device ID, optionally suffixed with ":<board_number>" (default 0)

# Nominal Core dataset to send data to as the instrument is operated.
DATASET_RID = "<dataset_rid>"  # Replace with your dataset RID.

### Main code

# Instrument name and channel aliases used by this example. The channel lookups
# below use keys of the form "<instrument name>.<alias>".
DAQ_NAME = "myDAQ"
ALIAS_0 = "ch_0"
ALIAS_1 = "ch_1"

daq = NominalDAQ.create(name=DAQ_NAME, vendor=VENDOR, resource=RESOURCE_ID)
daq.add_publisher(NominalCorePublisher(dataset_rid=DATASET_RID))

daq.open()

# Outer try/finally: the DAQ is always closed once it has been opened.
try:
    # Configure both analog inputs with the same 0..5 range
    # (presumably volts -- confirm against the vendor documentation).
    daq.configure_analog_channel(
        direction=Direction.INPUT, physical_channel=CHANNEL_0, alias=ALIAS_0, range_min=0, range_max=5
    )
    daq.configure_analog_channel(
        direction=Direction.INPUT, physical_channel=CHANNEL_1, alias=ALIAS_1, range_min=0, range_max=5
    )

    daq.configure_ai_sample_rate(sample_rate=100)

    # Start the acquisition.
    # This launches a background daemon that fetches samples from the DAQ device buffer.
    daq.start()

    # Inner try/finally: once the acquisition has started it is always stopped,
    # whether the loop ends via Ctrl+C or via an unexpected error.
    try:
        while True:
            # This will block for the latest sample.
            first_channel = daq.get_channel(f"{DAQ_NAME}.{ALIAS_0}", 1, True)
            # This will immediately return with 10 samples.
            second_channel = daq.get_channel(f"{DAQ_NAME}.{ALIAS_1}", 10, False)
            print(f"Channel 1 latest: {first_channel.latest}")
            print(f"Channel 1 samples: {first_channel.values}")
            print(f"Channel 2 latest: {second_channel.latest}")
            print(f"Channel 2 samples: {second_channel.values}")
    except KeyboardInterrupt:
        # Ctrl+C is the intended way to end the example.
        print("Exiting main loop")
    finally:
        daq.stop()
finally:
    print("Closing DAQ")
    daq.close()