Skip to content

Publishers¤

Publisher Protocol & Base Classes¤

Publisher (internal support module).

Primary entry point: Publisher.

logger module-attribute ¤

logger = getLogger(__name__)

Publisher ¤

Bases: Protocol

publish ¤

publish(data: Measurement | Command, **kwargs)

close ¤

close()

BufferedPublisher ¤

BufferedPublisher(publisher: Publisher, buffer_size: int = 1000)

Bases: ABC

publisher instance-attribute ¤

publisher = publisher

buffer instance-attribute ¤

buffer: list[Measurement | Command] = []

buffer_size instance-attribute ¤

buffer_size = buffer_size

publish ¤

publish(data: Measurement | Command, **kwargs)

publish_batch abstractmethod ¤

publish_batch()

close ¤

close()

BasicBufferedPublisher ¤

BasicBufferedPublisher(publisher: Publisher, buffer_size: int = 1000)

Bases: BufferedPublisher

publisher instance-attribute ¤

publisher = publisher

buffer instance-attribute ¤

buffer: list[Measurement | Command] = []

buffer_size instance-attribute ¤

buffer_size = buffer_size

publish_batch ¤

publish_batch()

publish ¤

publish(data: Measurement | Command, **kwargs)

close ¤

close()

QueuedPublisher ¤

QueuedPublisher(
    publisher: Publisher,
    max_queue_size: int = 1000,
    wait_for_queue: bool = False,
)

Bases: Publisher

publisher instance-attribute ¤

publisher = publisher

publish ¤

publish(data: Measurement | Command, **kwargs)

close ¤

close()

File Publishers¤

Files (internal support module).

Primary entry point: FileWriter.

FileWriter ¤

Bases: Protocol

file_path instance-attribute ¤

file_path: Path

write ¤

write(data: Measurement | Command)

open ¤

open()

close ¤

close()

FilePublisher ¤

FilePublisher(
    directory: str | Path,
    format: Literal["json", "csv", "avro"] = "avro",
    custom_file_name: str | None = None,
)

Parameters:

  • directory ¤

    (str | Path) –

    Directory where the output file will be saved.

  • format ¤

    (Literal['json', 'csv', 'avro'], default: 'avro' ) –

    File format to use, either "json","csv","avro". Defaults to "avro".

  • custom_file_name ¤

    (str | None, default: None ) –

    Custom file name (without extension). If not provided, a name based on the current datetime will be generated.

directory instance-attribute ¤

directory = Path(directory)

format instance-attribute ¤

format = format

custom_file_name instance-attribute ¤

custom_file_name = custom_file_name

file_path instance-attribute ¤

file_path = directory / file_name

publish ¤

publish(data: Measurement | Command, **kwargs)

Publish data to file using the appropriate writer.

open ¤

open()

close ¤

close()

Close the publisher and ensure all data is written.

JsonFileWriter ¤

JsonFileWriter(file_path: Path)

Handles JSON format writing with proper file management.

file_path instance-attribute ¤

file_path = file_path

write ¤

write(data: Measurement | Command)

Append data to JSON file.

open ¤

open()

close ¤

close()

Close the file writer.

CsvFileWriter ¤

CsvFileWriter(file_path: Path)

Handles CSV format writing with proper file management.

file_path instance-attribute ¤

file_path = file_path

write ¤

write(data: Measurement | Command)

Append data to CSV file.

open ¤

open()

close ¤

close()

Close the file writer.

AvroFileWriter ¤

AvroFileWriter(file_path: Path)

Handles Avro format writing with proper file management using fastavro.

file_path instance-attribute ¤

file_path = file_path

schema instance-attribute ¤

schema = {
    "type": "record",
    "namespace": "io.nominal.ingest",
    "name": "AvroStream",
    "fields": [
        {"name": "channel", "type": "string"},
        {
            "name": "timestamps",
            "type": {"type": "array", "items": "long"},
        },
        {
            "name": "values",
            "type": {"type": "array", "items": ["double", "string"]},
        },
        {
            "name": "tags",
            "type": {"type": "map", "values": "string"},
            "default": {},
        },
    ],
}

write ¤

write(data: Measurement | Command)

Append data to Avro file.

open ¤

open()

close ¤

close()

Close the file writer.

Nominal Core Publisher¤

Nominal Core (internal support module).

Primary entry point: NominalCorePublisher.

NominalCorePublisher ¤

NominalCorePublisher(
    dataset_rid: str,
    batch_size: int | None = None,
    max_wait: timedelta | None = None,
    file_fallback: Path | None = None,
    profile: str | None = None,
    api_key: str | None = None,
)

Any optional arguments will use the Nominal Core Python API's default values if omitted.

Parameters:

  • dataset_rid ¤

    (str) –

    The RID of the dataset/datasource you wish to push data to.

  • batch_size ¤

    (int | None, default: None ) –

    How big the batch can get before writing to Nominal.

  • max_wait ¤

    (timedelta | None, default: None ) –

    How long a batch can exist before being flushed to Nominal.

  • file_fallback ¤

    (Path | None, default: None ) –

    Path to the backup '.avro' file, written to when connectivity is intermittent.

  • profile ¤

    (str | None, default: None ) –

    Uses a preconfigured configuration stored on disk. Default is "default".

  • api_key ¤

    (str | None, default: None ) –

    Pass in a Nominal Core API Key. Default is to use credentials stored on disk. See https://docs.nominal.io/core/sdk/python-client/authentication#using-the-api-key

publish ¤

publish(data: Measurement | Command, **kwargs)

close ¤

close()

Nominal Connect Publisher¤

Nominal Connect (internal support module).

Primary entry point: NominalConnectPublisher.

logger module-attribute ¤

logger = getLogger(__name__)

NominalConnectPublisher ¤

NominalConnectPublisher(client: Client, stream_id: str)

Publisher for sending measurement and command data to Nominal Connect.

Implements the Publisher protocol to publish Measurement and Command data to Nominal Connect as streams.

Note

String values are silently skipped as Nominal Connect does not support string data types.

Parameters:

  • client ¤

    (Client) –

    The Nominal Connect client instance used for publishing.

  • stream_id ¤

    (str) –

    The identifier of the stream to publish data to.

DEFAULT_STREAM_SOURCE class-attribute instance-attribute ¤

DEFAULT_STREAM_SOURCE = 'nominal_instrumentation'

publish ¤

publish(data: Measurement | Command, **kwargs)

Publish measurement or command data to Nominal Connect.

Publishes each channel in the data as a separate stream batch. For Measurement objects, all timestamps and values are published. For Command objects, a single timestamp-value pair is published per channel.

Parameters:

  • data ¤
    (Measurement | Command) –

    Measurement or Command object containing channel data to publish.

  • **kwargs ¤

    Additional keyword arguments (currently unused).

Note

String values are silently skipped as Nominal Connect does not support string data types.

close ¤

close()

Close the publisher and release any resources.

Currently a no-op as no resources need to be explicitly released.

Channel Buffers¤

ChannelNotFoundError ¤

Bases: TimeoutError

Raised when a channel does not appear within the specified timeout.

ChannelValueTimeoutError ¤

Bases: TimeoutError

Raised when sufficient channel values are not available within the specified timeout.

ChannelBufferPublisher ¤

ChannelBufferPublisher(maxlen: int)

Bases: ABC

Base class for channel buffer publishers that store measurements in memory.

maxlen instance-attribute ¤

maxlen = maxlen

size_bytes abstractmethod property ¤

size_bytes: int

Return the current memory in bytes.

publish ¤

publish(data: Measurement | Command, **kwargs)

Publish measurement data to the buffers with thread synchronization.

get ¤

get(
    channel_name: str,
    length: int = 1,
    wait_for_latest: bool = False,
    timeout: float = 10.0,
) -> Measurement

Get the latest measurements from a channel.

Parameters:

  • channel_name ¤
    (str) –

    The name of the channel from which to retrieve data.

  • length ¤
    (int, default: 1 ) –

    The number of most recent samples to return.

  • wait_for_latest ¤
    (bool, default: False ) –

    Block and wait for the next channel value(s).

  • timeout ¤
    (float, default: 10.0 ) –

    Timeout in seconds when waiting for channel or values. Defaults to 10.0 seconds. When wait_for_latest=True, waits for new samples. When wait_for_latest=False, waits for sufficient samples to exist in the buffer.

Returns:

  • Measurement ( Measurement ) –

    A Measurement object containing the requested channel data and timestamps.

Raises:

close ¤

close() -> None

Close the publisher and clear all buffers with thread synchronization.

DequeInMemoryPublisher ¤

DequeInMemoryPublisher(maxlen: int)

Bases: ChannelBufferPublisher

size_bytes property ¤

size_bytes: int

Return the current memory in bytes.

maxlen instance-attribute ¤

maxlen = maxlen

publish ¤

publish(data: Measurement | Command, **kwargs)

Publish measurement data to the buffers with thread synchronization.

get ¤

get(
    channel_name: str,
    length: int = 1,
    wait_for_latest: bool = False,
    timeout: float = 10.0,
) -> Measurement

Get the latest measurements from a channel.

Parameters:

  • channel_name ¤
    (str) –

    The name of the channel from which to retrieve data.

  • length ¤
    (int, default: 1 ) –

    The number of most recent samples to return.

  • wait_for_latest ¤
    (bool, default: False ) –

    Block and wait for the next channel value(s).

  • timeout ¤
    (float, default: 10.0 ) –

    Timeout in seconds when waiting for channel or values. Defaults to 10.0 seconds. When wait_for_latest=True, waits for new samples. When wait_for_latest=False, waits for sufficient samples to exist in the buffer.

Returns:

  • Measurement ( Measurement ) –

    A Measurement object containing the requested channel data and timestamps.

Raises:

close ¤

close() -> None

Close the publisher and clear all buffers with thread synchronization.

NumpyInMemoryPublisher ¤

NumpyInMemoryPublisher(maxlen: int, value_dtype: Any = float32)

Bases: ChannelBufferPublisher

size_bytes property ¤

size_bytes: int

Return the current memory in bytes.

maxlen instance-attribute ¤

maxlen = maxlen

publish ¤

publish(data: Measurement | Command, **kwargs)

Publish measurement data to the buffers with thread synchronization.

get ¤

get(
    channel_name: str,
    length: int = 1,
    wait_for_latest: bool = False,
    timeout: float = 10.0,
) -> Measurement

Get the latest measurements from a channel.

Parameters:

  • channel_name ¤
    (str) –

    The name of the channel from which to retrieve data.

  • length ¤
    (int, default: 1 ) –

    The number of most recent samples to return.

  • wait_for_latest ¤
    (bool, default: False ) –

    Block and wait for the next channel value(s).

  • timeout ¤
    (float, default: 10.0 ) –

    Timeout in seconds when waiting for channel or values. Defaults to 10.0 seconds. When wait_for_latest=True, waits for new samples. When wait_for_latest=False, waits for sufficient samples to exist in the buffer.

Returns:

  • Measurement ( Measurement ) –

    A Measurement object containing the requested channel data and timestamps.

Raises:

close ¤

close() -> None

Close the publisher and clear all buffers with thread synchronization.

NumpyRingBuffer ¤

NumpyRingBuffer(maxlen: int, dtype: Any)

extend ¤

extend(values) -> None

get_latest ¤

get_latest(n: int) -> NDArray