watlowlib.streaming¶
Sample, record(), OverflowPolicy, AcquisitionSummary, and the
PollSource Protocol that the recorder drives. See
Streaming and Logging and acquisition.
Public surface¶
watlowlib.streaming ¶
Streaming primitives — :func:record + :class:Sample.
The streaming layer drives a :class:PollSource (a
:class:~watlowlib.devices.controller.Controller or
:class:~watlowlib.manager.WatlowManager) at an absolute-target
cadence and publishes :class:Sample batches into an async receive
stream. Pair with :func:watlowlib.sinks.pipe to drain into a
:class:~watlowlib.sinks.SampleSink.
Design reference: docs/design.md §6.
AcquisitionSummary
dataclass
¶
AcquisitionSummary(
started_at,
finished_at=None,
samples_emitted=0,
samples_late=0,
max_drift_ms=0.0,
tick_duration_ms_p50=0.0,
tick_duration_ms_p99=0.0,
disconnects=0,
)
Per-run summary owned and mutated by the recorder.
Mutability contract (§M of UNIFIED_API_HANDOFF.md): the recorder is the only writer. Counters update in place during the run so progress-polling consumers (TUIs, dashboards) see live values. Consumers must treat this object as read-only.
finished_at is None while the recording is in flight and
is set on context-manager exit. Percentile fields
(tick_duration_ms_p50 / p99) are materialized at exit
only because percentiles are batch-computed; the in-flight
counters reflect the latest observation.
Attributes:
| Name | Type | Description |
|---|---|---|
started_at |
datetime
|
Wall-clock at the first scheduled tick. |
finished_at |
datetime | None
|
Wall-clock at producer shutdown, or |
samples_emitted |
int
|
Count of per-tick batches actually pushed onto the receive stream. A tick that produced zero samples (every device errored) still counts as one emitted batch. |
samples_late |
int
|
Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch). Auto-reconnect ticks also count as late. |
max_drift_ms |
float
|
Largest observed positive drift of an emitted
batch relative to its absolute target, in milliseconds.
A healthy run stays well under one period; values
approaching |
tick_duration_ms_p50 |
float
|
Median wall-clock duration of a single
|
tick_duration_ms_p99 |
float
|
99th-percentile tick duration, in milliseconds. Set on exit only. Surfaces rare-but-bad cases where a tick stalled behind a contended port lock or a slow EEPROM commit. |
disconnects |
int
|
Count of WatlowConnectionError events the
producer absorbed under |
OverflowPolicy ¶
Bases: Enum
What record() does when the receive-stream buffer is full.
The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.
BLOCK
class-attribute
instance-attribute
¶
Await the slow consumer. Default. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples.
DROP_NEWEST
class-attribute
instance-attribute
¶
Drop the batch that was about to be enqueued. Counted as late.
DROP_OLDEST
class-attribute
instance-attribute
¶
Evict the oldest queued batch and enqueue the newest. Useful for real-time monitoring where the latest reading matters more than historical buffer contents. Each evicted batch is counted as late.
PollSource ¶
Bases: Protocol
Minimal shape the recorder needs from its dispatcher.
Both :class:~watlowlib.devices.controller.Controller (solo) and
:class:~watlowlib.manager.WatlowManager (multi-device) satisfy
this Protocol. Using a Protocol keeps :func:record testable
against a lightweight stub without standing up a full controller +
transport pipeline.
The contract is intentionally narrow: per call, return a flat
:class:~collections.abc.Sequence of :class:Sample\ s — one
per (device, parameter) read that succeeded. Failed reads are
dropped from the batch and logged by the source; the recorder
never sees them.
poll_many
async
¶
Read every parameters × instances combination on every device.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
parameters
|
Sequence[str | int]
|
Parameter names or registry IDs. |
required |
names
|
Sequence[str] | None
|
Subset of device names to poll (Manager-only;
Controller ignores). |
None
|
instances
|
Sequence[int]
|
1-indexed loop / channel numbers per device.
Single-loop devices use |
(1,)
|
Returns:
| Type | Description |
|---|---|
Sequence[Sample]
|
A flat :class: |
Sequence[Sample]
|
every poll failed. |
Source code in src/watlowlib/streaming/recorder.py
PollSourceAdapter ¶
Wrap one :class:Controller as a named :class:PollSource.
Implements poll_many(parameters, *, names=None, instances=(1,))
-> Sequence[Sample] — the watlow recorder Protocol. When
names is supplied and does not contain this adapter's name,
the call short-circuits to an empty list (cross-library Manager-
style filter semantics).
Sample relabeling: :meth:Controller.poll_many tags each
:class:Sample with the transport label by default. This adapter
rebuilds each sample via :func:dataclasses.replace to set
:attr:Sample.device to the caller-provided name. Cost is
negligible at typical watlow rates (1–5 Hz × small parameter
sets) and stays well under 1 ms/tick at high parameter counts.
Source code in src/watlowlib/streaming/adapter.py
poll_many
async
¶
Poll the wrapped controller and relabel each emitted sample.
Returns [] when names is supplied and this adapter's
:attr:name is not in it — same filter semantics as
:meth:WatlowManager.poll_many.
Source code in src/watlowlib/streaming/adapter.py
Recording
dataclass
¶
Container yielded by :func:record — stream + live summary + rate.
Cross-library shape (alicat / sartorius / watlow / nidaq) so
consumers consume the same recording.stream /
recording.summary / recording.rate_hz accessors regardless
of vendor.
Per-library payload (the T parameter):
- alicat / sartorius:
Recording[Mapping[str, Sample]] - watlow:
Recording[Sequence[Sample]]— per-tick batches - nidaq:
Recording[DaqReading](polled) /Recording[DaqBlock](block)
Attributes:
| Name | Type | Description |
|---|---|---|
stream |
AsyncIterator[T]
|
Async iterator of per-tick payloads. Consume with
|
summary |
AcquisitionSummary
|
Live :class: |
rate_hz |
float
|
The cadence the recorder is running at, captured at
|
Sample
dataclass
¶
Sample(
device,
address,
protocol,
parameter,
parameter_id,
instance,
value,
unit,
t_mono_ns,
t_utc,
t_midpoint_mono_ns,
requested_at,
received_at,
latency_s,
raw,
)
One parameter read with full timing provenance.
Attributes:
| Name | Type | Description |
|---|---|---|
device |
str
|
Manager-assigned name (or controller label for solo recordings). Stable downstream identifier that follows the value into sinks. |
address |
int
|
Bus address of the polled device. Std Bus 1..16, Modbus RTU 1..247. |
protocol |
ProtocolKind
|
Wire protocol that decoded this read. Set from the session's protocol kind, not the reading metadata, so a mixed-protocol recording records the source per row. |
parameter |
str
|
Canonical parameter name (e.g. |
parameter_id |
int
|
Registry parameter id (e.g. |
instance |
int
|
1-indexed loop / channel selector used for the read. |
value |
float | int | str | bool | None
|
The decoded scalar. |
unit |
Unit | str | None
|
Concrete :class: |
t_mono_ns |
int
|
:func: |
t_utc |
datetime
|
Wall-clock midpoint of the request/reply round-trip (tz-aware UTC). Preferred point estimate when aligning Watlow samples against other sensor streams in human time. |
t_midpoint_mono_ns |
int | None
|
Optional integration-window midpoint in
monotonic nanoseconds. |
requested_at |
datetime
|
Wall-clock |
received_at |
datetime
|
Wall-clock |
latency_s |
float
|
|
raw |
bytes
|
The wire payload that produced the value. Available for diagnostics; tabular sinks drop it. |
record
async
¶
record(
source,
*,
parameters,
rate_hz,
duration=None,
names=None,
instances=(1,),
overflow=OverflowPolicy.BLOCK,
buffer_size=64,
auto_reconnect=False,
reconnect_factory=None,
)
Record polled samples into a receive stream at an absolute cadence.
Usage::
async with record(
controller, parameters=["process_value", "setpoint"], rate_hz=2, duration=10
) as recording:
async for batch in recording.stream:
for sample in batch:
print(sample.parameter, sample.value)
# recording.summary is live; recording.summary.finished_at is None
# while running and set on CM exit.
The CM yields a :class:Recording[Sequence[Sample]] exposing
.stream (async iterator of per-tick :class:Sample batches),
.summary (live :class:AcquisitionSummary — recorder is sole
writer), and .rate_hz (the cadence the recorder is running
at).
Each batch is a flat :class:Sequence — one entry per (device,
parameter, instance) read that succeeded. Failed reads are dropped
by the source and logged at WARN.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
source
|
PollSource
|
Any :class: |
required |
parameters
|
Sequence[str | int]
|
Parameter names or registry IDs to poll each tick. |
required |
rate_hz
|
float
|
Target cadence. Absolute targets are computed
|
required |
duration
|
float | None
|
Total acquisition duration in seconds. |
None
|
names
|
Sequence[str] | None
|
Subset of device names to poll per tick. |
None
|
instances
|
Sequence[int]
|
1-indexed loop / channel numbers per device. Single-
loop devices use |
(1,)
|
overflow
|
OverflowPolicy
|
Backpressure policy when the receive-stream buffer
is full. See :class: |
BLOCK
|
buffer_size
|
int
|
Receive-stream capacity, in per-tick batches. |
64
|
auto_reconnect
|
bool
|
When |
False
|
reconnect_factory
|
Callable[[], Awaitable[PollSource]] | None
|
When supplied alongside |
None
|
Yields:
| Name | Type | Description |
|---|---|---|
A |
AsyncGenerator[Recording[Sequence[Sample]]]
|
class: |
AsyncGenerator[Recording[Sequence[Sample]]]
|
|
Raises:
| Type | Description |
|---|---|
ValueError
|
|
Source code in src/watlowlib/streaming/recorder.py
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 | |
Sample¶
watlowlib.streaming.sample ¶
Timed sample — one parameter read with send/receive provenance.
A :class:Sample is the unit the recorder emits into its memory-object
stream. Watlow polls a small group of parameters per device per tick
(unlike Alicat, which returns one wide DataFrame per poll), so a
recorder tick produces N×M samples — one per (device, parameter) pair
that succeeded.
Timestamp contract (uniform across the sibling libraries):
t_mono_ns— :func:time.monotonic_nsmidpoint of the request/ reply round-trip; canonical join key for cross-stream alignment (monotonic, never wall-clock).t_utc— wall-clock midpoint of the request/reply round-trip (tz-aware UTC). Used for human-readable sink timestamps.t_midpoint_mono_ns— optional integration-window midpoint in monotonic nanoseconds. For polled reads this isNone; sensors with integration windows (e.g. multi-sample averages) populate it.
I/O provenance stays alongside the canonical timestamps:
requested_at / received_at / latency_s are the per-round-
trip wire boundaries, available for diagnostics but not the join key.
The shape is deliberately long-format (one row per parameter) so the SQLite cross-vendor test can union Watlow rows with Alicat rows under one schema.
Design reference: docs/design.md §6.
Sample
dataclass
¶
Sample(
device,
address,
protocol,
parameter,
parameter_id,
instance,
value,
unit,
t_mono_ns,
t_utc,
t_midpoint_mono_ns,
requested_at,
received_at,
latency_s,
raw,
)
One parameter read with full timing provenance.
Attributes:
| Name | Type | Description |
|---|---|---|
device |
str
|
Manager-assigned name (or controller label for solo recordings). Stable downstream identifier that follows the value into sinks. |
address |
int
|
Bus address of the polled device. Std Bus 1..16, Modbus RTU 1..247. |
protocol |
ProtocolKind
|
Wire protocol that decoded this read. Set from the session's protocol kind, not the reading metadata, so a mixed-protocol recording records the source per row. |
parameter |
str
|
Canonical parameter name (e.g. |
parameter_id |
int
|
Registry parameter id (e.g. |
instance |
int
|
1-indexed loop / channel selector used for the read. |
value |
float | int | str | bool | None
|
The decoded scalar. |
unit |
Unit | str | None
|
Concrete :class: |
t_mono_ns |
int
|
:func: |
t_utc |
datetime
|
Wall-clock midpoint of the request/reply round-trip (tz-aware UTC). Preferred point estimate when aligning Watlow samples against other sensor streams in human time. |
t_midpoint_mono_ns |
int | None
|
Optional integration-window midpoint in
monotonic nanoseconds. |
requested_at |
datetime
|
Wall-clock |
received_at |
datetime
|
Wall-clock |
latency_s |
float
|
|
raw |
bytes
|
The wire payload that produced the value. Available for diagnostics; tabular sinks drop it. |
Recorder¶
watlowlib.streaming.recorder ¶
Absolute-target recorder — record() emits timed :class:Sample batches.
:func:record is the v1 acquisition primitive. It drives a
:class:PollSource (an opened :class:~watlowlib.devices.controller.Controller
or a :class:~watlowlib.manager.WatlowManager) at an absolute-target
cadence and publishes the polled :class:Sample values into an
:class:anyio.abc.ObjectReceiveStream as per-tick batches.
Key invariants:
- Absolute-target scheduling. Target times are computed from
:func:
anyio.current_timeatrecord()-entry, not from a running monotonic; drift across cycles is bounded by one tick and never accumulates.anyio.sleep_untiladvances to the next target slot; overruns skip missed slots and incrementsamples_late. - Structured concurrency. The producer task lives strictly inside the async CM body. The CM yields the receive stream, user code iterates it, and on CM exit the task group is cancelled and joined before the CM returns.
- Wall-clock provenance.
datetime.now(UTC)is captured at the send/receive boundaries of each device's poll and attached to the emitted :class:Sample— used for sink timestamps, never for scheduling. - Backpressure.
buffer_sizesets the memory-object stream capacity; :class:OverflowPolicycontrols what happens when the producer wants to enqueue but the consumer is behind.
The recorder consumes a :class:PollSource — a narrow Protocol both
:class:~watlowlib.devices.controller.Controller and
:class:~watlowlib.manager.WatlowManager satisfy. Kept as a Protocol
so the recorder is unit-testable against a lightweight stub.
Design reference: docs/design.md §6.
AcquisitionSummary
dataclass
¶
AcquisitionSummary(
started_at,
finished_at=None,
samples_emitted=0,
samples_late=0,
max_drift_ms=0.0,
tick_duration_ms_p50=0.0,
tick_duration_ms_p99=0.0,
disconnects=0,
)
Per-run summary owned and mutated by the recorder.
Mutability contract (§M of UNIFIED_API_HANDOFF.md): the recorder is the only writer. Counters update in place during the run so progress-polling consumers (TUIs, dashboards) see live values. Consumers must treat this object as read-only.
finished_at is None while the recording is in flight and
is set on context-manager exit. Percentile fields
(tick_duration_ms_p50 / p99) are materialized at exit
only because percentiles are batch-computed; the in-flight
counters reflect the latest observation.
Attributes:
| Name | Type | Description |
|---|---|---|
started_at |
datetime
|
Wall-clock at the first scheduled tick. |
finished_at |
datetime | None
|
Wall-clock at producer shutdown, or |
samples_emitted |
int
|
Count of per-tick batches actually pushed onto the receive stream. A tick that produced zero samples (every device errored) still counts as one emitted batch. |
samples_late |
int
|
Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch). Auto-reconnect ticks also count as late. |
max_drift_ms |
float
|
Largest observed positive drift of an emitted
batch relative to its absolute target, in milliseconds.
A healthy run stays well under one period; values
approaching |
tick_duration_ms_p50 |
float
|
Median wall-clock duration of a single
|
tick_duration_ms_p99 |
float
|
99th-percentile tick duration, in milliseconds. Set on exit only. Surfaces rare-but-bad cases where a tick stalled behind a contended port lock or a slow EEPROM commit. |
disconnects |
int
|
Count of WatlowConnectionError events the
producer absorbed under |
OverflowPolicy ¶
Bases: Enum
What record() does when the receive-stream buffer is full.
The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.
BLOCK
class-attribute
instance-attribute
¶
Await the slow consumer. Default. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples.
DROP_NEWEST
class-attribute
instance-attribute
¶
Drop the batch that was about to be enqueued. Counted as late.
DROP_OLDEST
class-attribute
instance-attribute
¶
Evict the oldest queued batch and enqueue the newest. Useful for real-time monitoring where the latest reading matters more than historical buffer contents. Each evicted batch is counted as late.
PollSource ¶
Bases: Protocol
Minimal shape the recorder needs from its dispatcher.
Both :class:~watlowlib.devices.controller.Controller (solo) and
:class:~watlowlib.manager.WatlowManager (multi-device) satisfy
this Protocol. Using a Protocol keeps :func:record testable
against a lightweight stub without standing up a full controller +
transport pipeline.
The contract is intentionally narrow: per call, return a flat
:class:~collections.abc.Sequence of :class:Sample\ s — one
per (device, parameter) read that succeeded. Failed reads are
dropped from the batch and logged by the source; the recorder
never sees them.
poll_many
async
¶
Read every parameters × instances combination on every device.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
parameters
|
Sequence[str | int]
|
Parameter names or registry IDs. |
required |
names
|
Sequence[str] | None
|
Subset of device names to poll (Manager-only;
Controller ignores). |
None
|
instances
|
Sequence[int]
|
1-indexed loop / channel numbers per device.
Single-loop devices use |
(1,)
|
Returns:
| Type | Description |
|---|---|
Sequence[Sample]
|
A flat :class: |
Sequence[Sample]
|
every poll failed. |
Source code in src/watlowlib/streaming/recorder.py
Recording
dataclass
¶
Container yielded by :func:record — stream + live summary + rate.
Cross-library shape (alicat / sartorius / watlow / nidaq) so
consumers consume the same recording.stream /
recording.summary / recording.rate_hz accessors regardless
of vendor.
Per-library payload (the T parameter):
- alicat / sartorius:
Recording[Mapping[str, Sample]] - watlow:
Recording[Sequence[Sample]]— per-tick batches - nidaq:
Recording[DaqReading](polled) /Recording[DaqBlock](block)
Attributes:
| Name | Type | Description |
|---|---|---|
stream |
AsyncIterator[T]
|
Async iterator of per-tick payloads. Consume with
|
summary |
AcquisitionSummary
|
Live :class: |
rate_hz |
float
|
The cadence the recorder is running at, captured at
|
record
async
¶
record(
source,
*,
parameters,
rate_hz,
duration=None,
names=None,
instances=(1,),
overflow=OverflowPolicy.BLOCK,
buffer_size=64,
auto_reconnect=False,
reconnect_factory=None,
)
Record polled samples into a receive stream at an absolute cadence.
Usage::
async with record(
controller, parameters=["process_value", "setpoint"], rate_hz=2, duration=10
) as recording:
async for batch in recording.stream:
for sample in batch:
print(sample.parameter, sample.value)
# recording.summary is live; recording.summary.finished_at is None
# while running and set on CM exit.
The CM yields a :class:Recording[Sequence[Sample]] exposing
.stream (async iterator of per-tick :class:Sample batches),
.summary (live :class:AcquisitionSummary — recorder is sole
writer), and .rate_hz (the cadence the recorder is running
at).
Each batch is a flat :class:Sequence — one entry per (device,
parameter, instance) read that succeeded. Failed reads are dropped
by the source and logged at WARN.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
source
|
PollSource
|
Any :class: |
required |
parameters
|
Sequence[str | int]
|
Parameter names or registry IDs to poll each tick. |
required |
rate_hz
|
float
|
Target cadence. Absolute targets are computed
|
required |
duration
|
float | None
|
Total acquisition duration in seconds. |
None
|
names
|
Sequence[str] | None
|
Subset of device names to poll per tick. |
None
|
instances
|
Sequence[int]
|
1-indexed loop / channel numbers per device. Single-
loop devices use |
(1,)
|
overflow
|
OverflowPolicy
|
Backpressure policy when the receive-stream buffer
is full. See :class: |
BLOCK
|
buffer_size
|
int
|
Receive-stream capacity, in per-tick batches. |
64
|
auto_reconnect
|
bool
|
When |
False
|
reconnect_factory
|
Callable[[], Awaitable[PollSource]] | None
|
When supplied alongside |
None
|
Yields:
| Name | Type | Description |
|---|---|---|
A |
AsyncGenerator[Recording[Sequence[Sample]]]
|
class: |
AsyncGenerator[Recording[Sequence[Sample]]]
|
|
Raises:
| Type | Description |
|---|---|
ValueError
|
|
Source code in src/watlowlib/streaming/recorder.py
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 | |