Skip to content

Publishers¤

Publisher Protocol & Base Classes¤

Publisher protocol and buffering wrappers.

logger module-attribute ¤

logger = getLogger(__name__)

Publisher ¤

Bases: Protocol

publish ¤

publish(data: Measurement | Command, **kwargs)

close ¤

close()

BufferedPublisher ¤

BufferedPublisher(publisher: Publisher, buffer_size: int = 1000)

Bases: ABC

publisher instance-attribute ¤

publisher = publisher

buffer instance-attribute ¤

buffer: list[Measurement | Command] = []

buffer_size instance-attribute ¤

buffer_size = buffer_size

publish ¤

publish(data: Measurement | Command, **kwargs)

publish_batch abstractmethod ¤

publish_batch()

close ¤

close()

BasicBufferedPublisher ¤

BasicBufferedPublisher(publisher: Publisher, buffer_size: int = 1000)

Bases: BufferedPublisher

publisher instance-attribute ¤

publisher = publisher

buffer instance-attribute ¤

buffer: list[Measurement | Command] = []

buffer_size instance-attribute ¤

buffer_size = buffer_size

publish_batch ¤

publish_batch()

publish ¤

publish(data: Measurement | Command, **kwargs)

close ¤

close()

QueuedPublisher ¤

QueuedPublisher(
    publisher: Publisher,
    max_queue_size: int = 1000,
    wait_for_queue: bool = False,
)

Bases: Publisher

publisher instance-attribute ¤

publisher = publisher

publish ¤

publish(data: Measurement | Command, **kwargs)

close ¤

close()

File Publishers¤

File-based publishers (JSON, CSV, Avro).

FileWriter ¤

Bases: Protocol

file_path instance-attribute ¤

file_path: Path

write ¤

write(data: Measurement | Command)

open ¤

open()

close ¤

close()

FilePublisher ¤

FilePublisher(
    directory: str | Path,
    format: Literal["json", "csv", "avro"] = "avro",
    custom_file_name: str | None = None,
)

Parameters:

  • directory ¤

    (str | Path) –

    Output directory.

  • format ¤

    (Literal['json', 'csv', 'avro'], default: 'avro' ) –

    "json", "csv", or "avro" (default).

  • custom_file_name ¤

    (str | None, default: None ) –

    Filename without extension; defaults to measurements-<UTC-timestamp>.

directory instance-attribute ¤

directory = Path(directory)

format instance-attribute ¤

format = format

custom_file_name instance-attribute ¤

custom_file_name = custom_file_name

file_path instance-attribute ¤

file_path = directory / file_name

publish ¤

publish(data: Measurement | Command, **kwargs)

Publish data to file using the appropriate writer.

open ¤

open()

close ¤

close()

Close the publisher and ensure all data is written.

JsonFileWriter ¤

JsonFileWriter(file_path: Path)

Handles JSON format writing with proper file management.

file_path instance-attribute ¤

file_path = file_path

write ¤

write(data: Measurement | Command)

Append data to JSON file.

open ¤

open()

close ¤

close()

Close the file writer.

CsvFileWriter ¤

CsvFileWriter(file_path: Path)

Handles CSV format writing with proper file management.

file_path instance-attribute ¤

file_path = file_path

write ¤

write(data: Measurement | Command)

Append data to CSV file.

open ¤

open()

close ¤

close()

Close the file writer.

AvroFileWriter ¤

AvroFileWriter(file_path: Path)

Handles Avro format writing with proper file management using fastavro.

file_path instance-attribute ¤

file_path = file_path

schema instance-attribute ¤

schema = {
    "type": "record",
    "namespace": "io.nominal.ingest",
    "name": "AvroStream",
    "fields": [
        {"name": "channel", "type": "string"},
        {
            "name": "timestamps",
            "type": {"type": "array", "items": "long"},
        },
        {
            "name": "values",
            "type": {"type": "array", "items": ["double", "string"]},
        },
        {
            "name": "tags",
            "type": {"type": "map", "values": "string"},
            "default": {},
        },
    ],
}

write ¤

write(data: Measurement | Command)

Append data to Avro file.

open ¤

open()

close ¤

close()

Close the file writer.

Nominal Core Publisher¤

Publisher that streams Measurement/Command data to Nominal Core datasets.

NominalCorePublisher ¤

NominalCorePublisher(
    dataset_rid: str,
    batch_size: int | None = None,
    max_wait: timedelta | None = None,
    file_fallback: Path | None = None,
    profile: str | None = None,
    api_key: str | None = None,
)

Omitted optional args fall through to the Nominal Core Python API defaults.

Parameters:

  • dataset_rid ¤

    (str) –

    Target dataset RID.

  • batch_size ¤

    (int | None, default: None ) –

    Max items per write batch.

  • max_wait ¤

    (timedelta | None, default: None ) –

    Max time a batch can age before being flushed.

  • file_fallback ¤

    (Path | None, default: None ) –

    .avro path used when connectivity is intermittent.

  • profile ¤

    (str | None, default: None ) –

    Named profile from the on-disk config (defaults to "default").

  • api_key ¤

    (str | None, default: None ) –

    Inline API key; falls back to on-disk credentials when omitted. See https://docs.nominal.io/core/sdk/python-client/authentication#using-the-api-key

publish ¤

publish(data: Measurement | Command, **kwargs)

close ¤

close()

Nominal Connect Publisher¤

Publisher that streams Measurement/Command data to Nominal Connect.

logger module-attribute ¤

logger = getLogger(__name__)

NominalConnectPublisher ¤

NominalConnectPublisher(client: Client, stream_id: str)

Publish Measurement/Command data to a Nominal Connect stream.

String values are silently dropped — Connect does not accept strings.

DEFAULT_STREAM_SOURCE class-attribute instance-attribute ¤

DEFAULT_STREAM_SOURCE = 'nominal_instrumentation'

publish ¤

publish(data: Measurement | Command, **kwargs)

Publish data to Nominal Connect, one stream_batch per channel. Strings are skipped.

close ¤

close()

No-op; no resources to release.

Channel Buffers¤

ChannelNotFoundError ¤

Bases: TimeoutError

Raised when a channel does not appear within the specified timeout.

ChannelValueTimeoutError ¤

Bases: TimeoutError

Raised when sufficient channel values are not available within the specified timeout.

ChannelBufferPublisher ¤

ChannelBufferPublisher(maxlen: int)

Bases: ABC

Base class for channel buffer publishers that store measurements in memory.

maxlen instance-attribute ¤

maxlen = maxlen

size_bytes abstractmethod property ¤

size_bytes: int

Return the current memory in bytes.

publish ¤

publish(data: Measurement | Command, **kwargs)

Publish measurement data to the buffers with thread synchronization.

get ¤

get(
    channel_name: str,
    length: int = 1,
    wait_for_latest: bool = False,
    timeout: float = 10.0,
) -> Measurement

Return the trailing length samples for channel_name.

With wait_for_latest=True blocks for new samples; otherwise waits for the buffer to already hold length.

Raises:

close ¤

close() -> None

Close the publisher and clear all buffers with thread synchronization.

DequeInMemoryPublisher ¤

DequeInMemoryPublisher(maxlen: int)

Bases: ChannelBufferPublisher

size_bytes property ¤

size_bytes: int

Return the current memory in bytes.

maxlen instance-attribute ¤

maxlen = maxlen

publish ¤

publish(data: Measurement | Command, **kwargs)

Publish measurement data to the buffers with thread synchronization.

get ¤

get(
    channel_name: str,
    length: int = 1,
    wait_for_latest: bool = False,
    timeout: float = 10.0,
) -> Measurement

Return the trailing length samples for channel_name.

With wait_for_latest=True blocks for new samples; otherwise waits for the buffer to already hold length.

Raises:

close ¤

close() -> None

Close the publisher and clear all buffers with thread synchronization.

NumpyInMemoryPublisher ¤

NumpyInMemoryPublisher(maxlen: int, value_dtype: Any = float32)

Bases: ChannelBufferPublisher

size_bytes property ¤

size_bytes: int

Return the current memory in bytes.

maxlen instance-attribute ¤

maxlen = maxlen

publish ¤

publish(data: Measurement | Command, **kwargs)

Publish measurement data to the buffers with thread synchronization.

get ¤

get(
    channel_name: str,
    length: int = 1,
    wait_for_latest: bool = False,
    timeout: float = 10.0,
) -> Measurement

Return the trailing length samples for channel_name.

With wait_for_latest=True blocks for new samples; otherwise waits for the buffer to already hold length.

Raises:

close ¤

close() -> None

Close the publisher and clear all buffers with thread synchronization.

NumpyRingBuffer ¤

NumpyRingBuffer(maxlen: int, dtype: Any)

extend ¤

extend(values) -> None

get_latest ¤

get_latest(n: int) -> NDArray