Publishers¤
Publisher Protocol & Base Classes¤
File Publishers¤
Files (internal support module).
Primary entry point: FileWriter.
FileWriter
¤
FilePublisher
¤
FilePublisher(
directory: str | Path,
format: Literal["json", "csv", "avro"] = "avro",
custom_file_name: str | None = None,
)
Parameters:
-
(directory¤str | Path) –Directory where the output file will be saved.
-
(format¤Literal['json', 'csv', 'avro'], default:'avro') –File format to use, either "json","csv","avro". Defaults to "avro".
-
(custom_file_name¤str | None, default:None) –Custom file name (without extension). If not provided, a name based on the current datetime will be generated.
publish
¤
publish(data: Measurement | Command, **kwargs)
Publish data to file using the appropriate writer.
AvroFileWriter
¤
AvroFileWriter(file_path: Path)
Handles Avro format writing with proper file management using fastavro.
schema
instance-attribute
¤
schema = {
"type": "record",
"namespace": "io.nominal.ingest",
"name": "AvroStream",
"fields": [
{"name": "channel", "type": "string"},
{
"name": "timestamps",
"type": {"type": "array", "items": "long"},
},
{
"name": "values",
"type": {"type": "array", "items": ["double", "string"]},
},
{
"name": "tags",
"type": {"type": "map", "values": "string"},
"default": {},
},
],
}
Nominal Core Publisher¤
Nominal Core (internal support module).
Primary entry point: NominalCorePublisher.
NominalCorePublisher
¤
NominalCorePublisher(
dataset_rid: str,
batch_size: int | None = None,
max_wait: timedelta | None = None,
file_fallback: Path | None = None,
profile: str | None = None,
api_key: str | None = None,
)
Any optional arguments will use the Nominal Core Python API's default values if omitted.
Parameters:
-
(dataset_rid¤str) –The RID of the dataset/datasource you wish to push data to.
-
(batch_size¤int | None, default:None) –How big the batch can get before writing to Nominal.
-
(max_wait¤timedelta | None, default:None) –How long a batch can exist before being flushed to Nominal.
-
(file_fallback¤Path | None, default:None) –Path to the backup '.avro' file, written to when connectivity is intermittent.
-
(profile¤str | None, default:None) –Uses a preconfigured configuration stored on disk. Default is "default".
-
(api_key¤str | None, default:None) –Pass in a Nominal Core API Key. Default is to use credentials stored on disk. See https://docs.nominal.io/core/sdk/python-client/authentication#using-the-api-key
Nominal Connect Publisher¤
Nominal Connect (internal support module).
Primary entry point: NominalConnectPublisher.
NominalConnectPublisher
¤
Publisher for sending measurement and command data to Nominal Connect.
Implements the Publisher protocol to publish Measurement and Command data to Nominal Connect as streams.
Note
String values are silently skipped as Nominal Connect does not support string data types.
Parameters:
-
(client¤Client) –The Nominal Connect client instance used for publishing.
-
(stream_id¤str) –The identifier of the stream to publish data to.
DEFAULT_STREAM_SOURCE
class-attribute
instance-attribute
¤
DEFAULT_STREAM_SOURCE = 'nominal_instrumentation'
publish
¤
publish(data: Measurement | Command, **kwargs)
Publish measurement or command data to Nominal Connect.
Publishes each channel in the data as a separate stream batch. For Measurement objects, all timestamps and values are published. For Command objects, a single timestamp-value pair is published per channel.
Parameters:
-
(data¤Measurement | Command) –Measurement or Command object containing channel data to publish.
-
–**kwargs¤Additional keyword arguments (currently unused).
Note
String values are silently skipped as Nominal Connect does not support string data types.
close
¤
close()
Close the publisher and release any resources.
Currently a no-op as no resources need to be explicitly released.
Channel Buffers¤
ChannelNotFoundError
¤
Bases: TimeoutError
Raised when a channel does not appear within the specified timeout.
ChannelValueTimeoutError
¤
Bases: TimeoutError
Raised when sufficient channel values are not available within the specified timeout.
ChannelBufferPublisher
¤
ChannelBufferPublisher(maxlen: int)
Bases: ABC
Base class for channel buffer publishers that store measurements in memory.
publish
¤
publish(data: Measurement | Command, **kwargs)
Publish measurement data to the buffers with thread synchronization.
get
¤
get(
channel_name: str,
length: int = 1,
wait_for_latest: bool = False,
timeout: float = 10.0,
) -> Measurement
Get the latest measurements from a channel.
Parameters:
-
(channel_name¤str) –The name of the channel from which to retrieve data.
-
(length¤int, default:1) –The number of most recent samples to return.
-
(wait_for_latest¤bool, default:False) –Block and wait for the next channel value(s).
-
(timeout¤float, default:10.0) –Timeout in seconds when waiting for channel or values. Defaults to 10.0 seconds. When wait_for_latest=True, waits for new samples. When wait_for_latest=False, waits for sufficient samples to exist in the buffer.
Returns:
-
Measurement(Measurement) –A Measurement object containing the requested channel data and timestamps.
Raises:
-
ChannelNotFoundError–If the channel does not appear within timeout.
-
ChannelValueTimeoutError–If sufficient values are not available within timeout.
DequeInMemoryPublisher
¤
DequeInMemoryPublisher(maxlen: int)
Bases: ChannelBufferPublisher
publish
¤
publish(data: Measurement | Command, **kwargs)
Publish measurement data to the buffers with thread synchronization.
get
¤
get(
channel_name: str,
length: int = 1,
wait_for_latest: bool = False,
timeout: float = 10.0,
) -> Measurement
Get the latest measurements from a channel.
Parameters:
-
(channel_name¤str) –The name of the channel from which to retrieve data.
-
(length¤int, default:1) –The number of most recent samples to return.
-
(wait_for_latest¤bool, default:False) –Block and wait for the next channel value(s).
-
(timeout¤float, default:10.0) –Timeout in seconds when waiting for channel or values. Defaults to 10.0 seconds. When wait_for_latest=True, waits for new samples. When wait_for_latest=False, waits for sufficient samples to exist in the buffer.
Returns:
-
Measurement(Measurement) –A Measurement object containing the requested channel data and timestamps.
Raises:
-
ChannelNotFoundError–If the channel does not appear within timeout.
-
ChannelValueTimeoutError–If sufficient values are not available within timeout.
NumpyInMemoryPublisher
¤
Bases: ChannelBufferPublisher
publish
¤
publish(data: Measurement | Command, **kwargs)
Publish measurement data to the buffers with thread synchronization.
get
¤
get(
channel_name: str,
length: int = 1,
wait_for_latest: bool = False,
timeout: float = 10.0,
) -> Measurement
Get the latest measurements from a channel.
Parameters:
-
(channel_name¤str) –The name of the channel from which to retrieve data.
-
(length¤int, default:1) –The number of most recent samples to return.
-
(wait_for_latest¤bool, default:False) –Block and wait for the next channel value(s).
-
(timeout¤float, default:10.0) –Timeout in seconds when waiting for channel or values. Defaults to 10.0 seconds. When wait_for_latest=True, waits for new samples. When wait_for_latest=False, waits for sufficient samples to exist in the buffer.
Returns:
-
Measurement(Measurement) –A Measurement object containing the requested channel data and timestamps.
Raises:
-
ChannelNotFoundError–If the channel does not appear within timeout.
-
ChannelValueTimeoutError–If sufficient values are not available within timeout.