Publishers¤
Publisher Protocol & Base Classes¤
File Publishers¤
File-based publishers (JSON, CSV, Avro).
FileWriter
¤
FilePublisher
¤
FilePublisher(
directory: str | Path,
format: Literal["json", "csv", "avro"] = "avro",
custom_file_name: str | None = None,
)
Parameters:
-
(directory¤str | Path) –Output directory.
-
(format¤Literal['json', 'csv', 'avro'], default:'avro') –"json","csv", or"avro"(default). -
(custom_file_name¤str | None, default:None) –Filename without extension; defaults to
measurements-<UTC-timestamp>.
publish
¤
publish(data: Measurement | Command, **kwargs)
Publish data to file using the appropriate writer.
AvroFileWriter
¤
AvroFileWriter(file_path: Path)
Handles Avro format writing with proper file management using fastavro.
schema
instance-attribute
¤
schema = {
"type": "record",
"namespace": "io.nominal.ingest",
"name": "AvroStream",
"fields": [
{"name": "channel", "type": "string"},
{
"name": "timestamps",
"type": {"type": "array", "items": "long"},
},
{
"name": "values",
"type": {"type": "array", "items": ["double", "string"]},
},
{
"name": "tags",
"type": {"type": "map", "values": "string"},
"default": {},
},
],
}
Nominal Core Publisher¤
Publisher that streams Measurement/Command data to Nominal Core datasets.
NominalCorePublisher
¤
NominalCorePublisher(
dataset_rid: str,
batch_size: int | None = None,
max_wait: timedelta | None = None,
file_fallback: Path | None = None,
profile: str | None = None,
api_key: str | None = None,
)
Omitted optional args fall through to the Nominal Core Python API defaults.
Parameters:
-
(dataset_rid¤str) –Target dataset RID.
-
(batch_size¤int | None, default:None) –Max items per write batch.
-
(max_wait¤timedelta | None, default:None) –Max time a batch can age before being flushed.
-
(file_fallback¤Path | None, default:None) –.avropath used when connectivity is intermittent. -
(profile¤str | None, default:None) –Named profile from the on-disk config (defaults to
"default"). -
(api_key¤str | None, default:None) –Inline API key; falls back to on-disk credentials when omitted. See https://docs.nominal.io/core/sdk/python-client/authentication#using-the-api-key
Nominal Connect Publisher¤
Publisher that streams Measurement/Command data to Nominal Connect.
NominalConnectPublisher
¤
NominalConnectPublisher(client: Client, stream_id: str)
Publish Measurement/Command data to a Nominal Connect stream.
String values are silently dropped — Connect does not accept strings.
DEFAULT_STREAM_SOURCE
class-attribute
instance-attribute
¤
DEFAULT_STREAM_SOURCE = 'nominal_instrumentation'
publish
¤
publish(data: Measurement | Command, **kwargs)
Publish data to Nominal Connect, one stream_batch per channel. Strings are skipped.
Channel Buffers¤
ChannelNotFoundError
¤
Bases: TimeoutError
Raised when a channel does not appear within the specified timeout.
ChannelValueTimeoutError
¤
Bases: TimeoutError
Raised when sufficient channel values are not available within the specified timeout.
ChannelBufferPublisher
¤
ChannelBufferPublisher(maxlen: int)
Bases: ABC
Base class for channel buffer publishers that store measurements in memory.
publish
¤
publish(data: Measurement | Command, **kwargs)
Publish measurement data to the buffers with thread synchronization.
get
¤
get(
channel_name: str,
length: int = 1,
wait_for_latest: bool = False,
timeout: float = 10.0,
) -> Measurement
Return the trailing length samples for channel_name.
With wait_for_latest=True blocks for new samples; otherwise waits for
the buffer to already hold length.
Raises:
-
ChannelNotFoundError–Channel did not appear within
timeout. -
ChannelValueTimeoutError–Values did not arrive within
timeout.
DequeInMemoryPublisher
¤
DequeInMemoryPublisher(maxlen: int)
Bases: ChannelBufferPublisher
publish
¤
publish(data: Measurement | Command, **kwargs)
Publish measurement data to the buffers with thread synchronization.
get
¤
get(
channel_name: str,
length: int = 1,
wait_for_latest: bool = False,
timeout: float = 10.0,
) -> Measurement
Return the trailing length samples for channel_name.
With wait_for_latest=True blocks for new samples; otherwise waits for
the buffer to already hold length.
Raises:
-
ChannelNotFoundError–Channel did not appear within
timeout. -
ChannelValueTimeoutError–Values did not arrive within
timeout.
NumpyInMemoryPublisher
¤
Bases: ChannelBufferPublisher
publish
¤
publish(data: Measurement | Command, **kwargs)
Publish measurement data to the buffers with thread synchronization.
get
¤
get(
channel_name: str,
length: int = 1,
wait_for_latest: bool = False,
timeout: float = 10.0,
) -> Measurement
Return the trailing length samples for channel_name.
With wait_for_latest=True blocks for new samples; otherwise waits for
the buffer to already hold length.
Raises:
-
ChannelNotFoundError–Channel did not appear within
timeout. -
ChannelValueTimeoutError–Values did not arrive within
timeout.