
watlowlib.streaming

Sample, record(), OverflowPolicy, AcquisitionSummary, and the PollSource Protocol that the recorder drives. See Streaming and Logging and acquisition.

Public surface

watlowlib.streaming

Streaming primitives — :func:record + :class:Sample.

The streaming layer drives a :class:PollSource (a :class:~watlowlib.devices.controller.Controller or :class:~watlowlib.manager.WatlowManager) at an absolute-target cadence and publishes :class:Sample batches into an async receive stream. Pair with :func:watlowlib.sinks.pipe to drain into a :class:~watlowlib.sinks.SampleSink.

Design reference: docs/design.md §6.

AcquisitionSummary dataclass

AcquisitionSummary(
    started_at,
    finished_at=None,
    samples_emitted=0,
    samples_late=0,
    max_drift_ms=0.0,
    tick_duration_ms_p50=0.0,
    tick_duration_ms_p99=0.0,
    disconnects=0,
)

Per-run summary owned and mutated by the recorder.

Mutability contract (§M of UNIFIED_API_HANDOFF.md): the recorder is the only writer. Counters update in place during the run so progress-polling consumers (TUIs, dashboards) see live values. Consumers must treat this object as read-only.

finished_at is None while the recording is in flight and is set on context-manager exit. Percentile fields (tick_duration_ms_p50 / p99) are materialized at exit only because percentiles are batch-computed; the in-flight counters reflect the latest observation.

Attributes:

Name Type Description
started_at datetime

Wall-clock time captured at record() entry, immediately before the producer starts its first scheduled tick.

finished_at datetime | None

Wall-clock at producer shutdown, or None while running.

samples_emitted int

Count of per-tick batches actually pushed onto the receive stream. A tick that produced zero samples (every device errored) still counts as one emitted batch.

samples_late int

Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch). Ticks missed during an auto-reconnect gap also count as late.

max_drift_ms float

Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds. A healthy run stays well under one period; values approaching 1000 / rate_hz indicate the device or consumer is saturating the schedule.

tick_duration_ms_p50 float

Median wall-clock duration of a single source.poll_many(...) call across the run, in milliseconds. Set on exit only. Compares directly to 1000 / rate_hz — if it approaches the period, the schedule is saturated.

tick_duration_ms_p99 float

99th-percentile tick duration, in milliseconds. Set on exit only. Surfaces rare-but-bad cases where a tick stalled behind a contended port lock or a slow EEPROM commit.

disconnects int

Count of WatlowConnectionError events the producer absorbed under auto_reconnect=True. Always 0 when auto_reconnect was off.
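
As a rough illustration of reading these fields after a run, the sketch below expresses max_drift_ms and tick_duration_ms_p99 as fractions of the period 1000 / rate_hz. SummaryView and schedule_health are hypothetical names written for this example and are not part of watlowlib; only the field names come from the attribute list above.

```python
from dataclasses import dataclass


@dataclass
class SummaryView:
    """Hypothetical stand-in with only the fields this check reads."""

    samples_late: int
    max_drift_ms: float
    tick_duration_ms_p99: float  # meaningful only after the recording exits


def schedule_health(summary: SummaryView, rate_hz: float) -> dict[str, float]:
    """Express drift and tick duration as fractions of one period."""
    period_ms = 1000.0 / rate_hz
    return {
        "period_ms": period_ms,
        "drift_fraction": summary.max_drift_ms / period_ms,
        "p99_fraction": summary.tick_duration_ms_p99 / period_ms,
        "late_ticks": float(summary.samples_late),
    }


# Illustrative numbers only: a 2 Hz run has a 500 ms period.
report = schedule_health(SummaryView(2, 125.0, 400.0), rate_hz=2)
print(report)
```

Fractions close to 1.0 correspond to the saturation cases described for max_drift_ms and tick_duration_ms_p50 above.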

OverflowPolicy

Bases: Enum

What record() does when the receive-stream buffer is full.

The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Await the slow consumer. Default. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the batch that was about to be enqueued. Counted as late.

DROP_OLDEST class-attribute instance-attribute

DROP_OLDEST = 'drop_oldest'

Evict the oldest queued batch and enqueue the newest. Useful for real-time monitoring where the latest reading matters more than historical buffer contents. Each evicted batch is counted as late.
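
The three policies can be compared with a small synchronous model of a bounded queue. This is only a sketch under stated assumptions: Policy, offer, and simulate are hypothetical names, the queue holds integers instead of sample batches, and BLOCK is reported as "would_block" because a plain function cannot await the consumer the way the recorder does.

```python
from collections import deque
from enum import Enum


class Policy(Enum):
    """Local mirror of the three values documented above."""

    BLOCK = "block"
    DROP_NEWEST = "drop_newest"
    DROP_OLDEST = "drop_oldest"


def offer(queue: deque[int], capacity: int, batch: int, policy: Policy) -> str:
    """Try to enqueue one batch and report what happened."""
    if len(queue) < capacity:
        queue.append(batch)
        return "enqueued"
    if policy is Policy.DROP_NEWEST:
        return "dropped_newest"  # the incoming batch is discarded
    if policy is Policy.DROP_OLDEST:
        queue.popleft()  # evict the oldest queued batch
        queue.append(batch)
        return "evicted_oldest"
    return "would_block"  # the real producer would wait here


def simulate(policy: Policy, capacity: int = 2, batches: int = 4):
    """Offer batches 0..batches-1 with no consumer draining the queue."""
    queue: deque[int] = deque()
    outcomes = [offer(queue, capacity, b, policy) for b in range(batches)]
    return list(queue), outcomes


for policy in Policy:
    print(policy.value, simulate(policy))
```

With no consumer, DROP_OLDEST ends up holding the two most recent batches while DROP_NEWEST keeps the two earliest, which is the trade-off the descriptions above point at.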

PollSource

Bases: Protocol

Minimal shape the recorder needs from its dispatcher.

Both :class:~watlowlib.devices.controller.Controller (solo) and :class:~watlowlib.manager.WatlowManager (multi-device) satisfy this Protocol. Using a Protocol keeps :func:record testable against a lightweight stub without standing up a full controller + transport pipeline.

The contract is intentionally narrow: per call, return a flat :class:~collections.abc.Sequence of :class:Sample objects, one per (device, parameter, instance) read that succeeded. Failed reads are dropped from the batch and logged by the source; the recorder never sees them.

poll_many async

poll_many(parameters, *, names=None, instances=(1,))

Read every parameters × instances combination on every device.

Parameters:

Name Type Description Default
parameters Sequence[str | int]

Parameter names or registry IDs.

required
names Sequence[str] | None

Subset of device names to poll (Manager-only; Controller ignores). None polls everything the source manages.

None
instances Sequence[int]

1-indexed loop / channel numbers per device. Single-loop devices use (1,) (the default).

(1,)

Returns:

Type Description
Sequence[Sample]

A flat :class:Sequence of :class:Sample. Empty when every poll failed.

Source code in src/watlowlib/streaming/recorder.py
async def poll_many(
    self,
    parameters: Sequence[str | int],
    *,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
) -> Sequence[Sample]:
    """Read every ``parameters`` × ``instances`` combination on every device.

    Args:
        parameters: Parameter names or registry IDs.
        names: Subset of device names to poll (Manager-only;
            Controller ignores). ``None`` polls everything the
            source manages.
        instances: 1-indexed loop / channel numbers per device.
            Single-loop devices use ``(1,)`` (the default).

    Returns:
        A flat :class:`Sequence` of :class:`Sample`. Empty when
        every poll failed.
    """
    ...

PollSourceAdapter

PollSourceAdapter(name, device)

Wrap one :class:Controller as a named :class:PollSource.

Implements poll_many(parameters, *, names=None, instances=(1,)) -> Sequence[Sample], the watlow recorder Protocol. When names is supplied and does not contain this adapter's name, the call short-circuits to an empty list (cross-library Manager-style filter semantics).

Sample relabeling: :meth:Controller.poll_many tags each :class:Sample with the transport label by default. This adapter rebuilds each sample via :func:dataclasses.replace to set :attr:Sample.device to the caller-provided name. Cost is negligible at typical watlow rates (1–5 Hz × small parameter sets) and stays well under 1 ms/tick at high parameter counts.

Source code in src/watlowlib/streaming/adapter.py
def __init__(self, name: str, device: Controller) -> None:
    self._name = name
    self._device = device

device property

device

The wrapped :class:Controller.

name property

name

The caller-provided device name attached to every emitted sample.

poll_many async

poll_many(parameters, *, names=None, instances=(1,))

Poll the wrapped controller and relabel each emitted sample.

Returns [] when names is supplied and this adapter's :attr:name is not in it — same filter semantics as :meth:WatlowManager.poll_many.

Source code in src/watlowlib/streaming/adapter.py
async def poll_many(
    self,
    parameters: Sequence[str | int],
    *,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
) -> list[Sample]:
    """Poll the wrapped controller and relabel each emitted sample.

    Returns ``[]`` when ``names`` is supplied and this adapter's
    :attr:`name` is not in it — same filter semantics as
    :meth:`WatlowManager.poll_many`.
    """
    if names is not None and self._name not in set(names):
        return []
    samples = await self._device.poll_many(parameters, instances=instances)
    return [dataclasses.replace(sample, device=self._name) for sample in samples]
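
The relabeling step relies on dataclasses.replace, which builds a new instance with selected fields overridden and leaves the original untouched. A minimal sketch, using a hypothetical Reading dataclass in place of Sample (whether Sample itself is frozen is not stated here):

```python
import dataclasses


@dataclasses.dataclass(frozen=True)
class Reading:
    """Hypothetical stand-in for Sample with three fields."""

    device: str
    parameter: str
    value: float


original = Reading(device="COM3", parameter="process_value", value=21.5)

# Copy every field, overriding only the device label.
relabeled = dataclasses.replace(original, device="oven_1")

print(original.device, relabeled.device, relabeled.value)
```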

Recording dataclass

Recording(stream, summary, rate_hz)

Container yielded by :func:record — stream + live summary + rate.

Cross-library shape (alicat / sartorius / watlow / nidaq), so consumers use the same recording.stream / recording.summary / recording.rate_hz accessors regardless of vendor.

Per-library payload (the T parameter):

  • alicat / sartorius: Recording[Mapping[str, Sample]]
  • watlow: Recording[Sequence[Sample]] — per-tick batches
  • nidaq: Recording[DaqReading] (polled) / Recording[DaqBlock] (block)

Attributes:

Name Type Description
stream AsyncIterator[T]

Async iterator of per-tick payloads. Consume with async for batch in recording.stream.

summary AcquisitionSummary

Live :class:AcquisitionSummary — the recorder mutates this in place; consumers read. summary.finished_at is None while running and is populated on context-manager exit.

rate_hz float

The cadence the recorder is running at, captured at record() entry. Useful for queue-sizing downstream buffers.

Sample dataclass

Sample(
    device,
    address,
    protocol,
    parameter,
    parameter_id,
    instance,
    value,
    unit,
    t_mono_ns,
    t_utc,
    t_midpoint_mono_ns,
    requested_at,
    received_at,
    latency_s,
    raw,
)

One parameter read with full timing provenance.

Attributes:

Name Type Description
device str

Manager-assigned name (or controller label for solo recordings). Stable downstream identifier that follows the value into sinks.

address int

Bus address of the polled device. Std Bus 1..16, Modbus RTU 1..247.

protocol ProtocolKind

Wire protocol that decoded this read. Set from the session's protocol kind, not the reading metadata, so a mixed-protocol recording records the source per row.

parameter str

Canonical parameter name (e.g. "process_value").

parameter_id int

Registry parameter id (e.g. 4001).

instance int

1-indexed loop / channel selector used for the read.

value float | int | str | bool | None

The decoded scalar. None when the device reported the value as unavailable (sensor-fail, overload, ...).

unit Unit | str | None

Concrete :class:Unit from the Watlow polling path, a free-form string for cross-vendor rows (Alicat's "psia", "sccm" etc. via examples/mixed_watlow_alicat_sqlite.py), or None when the parameter has no unit (counts, IDs, time constants). The Watlow recorder always populates a :class:Unit; the str branch only fires for hand-built cross-vendor samples.

t_mono_ns int

:func:time.monotonic_ns midpoint of the request/reply round-trip; the canonical join key. Monotonic with an arbitrary reference point (commonly OS boot); no calendar meaning.

t_utc datetime

Wall-clock midpoint of the request/reply round-trip (tz-aware UTC). Preferred point estimate when aligning Watlow samples against other sensor streams in human time.

t_midpoint_mono_ns int | None

Optional integration-window midpoint in monotonic nanoseconds. None for single polled reads; sensors that average over a window populate it.

requested_at datetime

Wall-clock datetime (UTC) captured just before the read leaves the host.

received_at datetime

Wall-clock datetime (UTC) captured just after the reply is decoded.

latency_s float

(received_at - requested_at).total_seconds() — precomputed for convenience.

raw bytes

The wire payload that produced the value. Available for diagnostics; tabular sinks drop it.
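
To make the relationship between the timing fields concrete, the sketch below derives t_mono_ns, t_utc, and latency_s from the two boundaries of one round-trip. midpoint_stamps is a hypothetical helper; the exact arithmetic watlowlib uses internally is not shown in this reference, so treat the integer-halving choice as an assumption.

```python
from datetime import datetime, timedelta, timezone


def midpoint_stamps(
    requested_at: datetime,
    received_at: datetime,
    mono_start_ns: int,
    mono_end_ns: int,
) -> dict[str, object]:
    """Derive midpoint timestamps and latency from round-trip boundaries."""
    return {
        # Monotonic midpoint: suitable as a join key, no calendar meaning.
        "t_mono_ns": mono_start_ns + (mono_end_ns - mono_start_ns) // 2,
        # Wall-clock midpoint: suitable for human-readable timestamps.
        "t_utc": requested_at + (received_at - requested_at) / 2,
        "latency_s": (received_at - requested_at).total_seconds(),
    }


# Illustrative values: a 40 ms round-trip.
requested = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
received = requested + timedelta(milliseconds=40)
stamps = midpoint_stamps(requested, received, 1_000_000_000, 1_040_000_000)
print(stamps)
```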

record async

record(
    source,
    *,
    parameters,
    rate_hz,
    duration=None,
    names=None,
    instances=(1,),
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
    auto_reconnect=False,
    reconnect_factory=None,
)

Record polled samples into a receive stream at an absolute cadence.

Usage::

async with record(
    controller, parameters=["process_value", "setpoint"], rate_hz=2, duration=10
) as recording:
    async for batch in recording.stream:
        for sample in batch:
            print(sample.parameter, sample.value)
    # recording.summary is live; recording.summary.finished_at is None
    # while running and set on CM exit.

The CM yields a :class:Recording[Sequence[Sample]] exposing .stream (async iterator of per-tick :class:Sample batches), .summary (live :class:AcquisitionSummary — recorder is sole writer), and .rate_hz (the cadence the recorder is running at).

Each batch is a flat :class:Sequence — one entry per (device, parameter, instance) read that succeeded. Failed reads are dropped by the source and logged at WARN.

Parameters:

Name Type Description Default
source PollSource

Any :class:PollSource (a :class:Controller or a :class:WatlowManager).

required
parameters Sequence[str | int]

Parameter names or registry IDs to poll each tick.

required
rate_hz float

Target cadence. Absolute targets are computed target[n] = start + n * (1 / rate_hz). Must be > 0.

required
duration float | None

Total acquisition duration in seconds. None means "until the caller exits the CM".

None
names Sequence[str] | None

Subset of device names to poll per tick. None polls everything the source manages. Ignored for solo controllers.

None
instances Sequence[int]

1-indexed loop / channel numbers per device. Single-loop devices use (1,).

(1,)
overflow OverflowPolicy

Backpressure policy when the receive-stream buffer is full. See :class:OverflowPolicy.

BLOCK
buffer_size int

Receive-stream capacity, in per-tick batches.

64
auto_reconnect bool

When True, treat :class:WatlowConnectionError raised by source.poll_many as a transient transport drop rather than a fatal error. The producer logs recorder.disconnected, waits per the backoff schedule, and either rebuilds the source via reconnect_factory (if supplied) or simply retries the same source.poll_many on the next tick. samples_late ticks up for each tick missed during the gap.

False
reconnect_factory Callable[[], Awaitable[PollSource]] | None

When supplied alongside auto_reconnect, invoked to rebuild the :class:PollSource after a disconnect. Useful when the source's transport needs to be re-opened explicitly (e.g. a fresh :func:watlowlib.open_device call). The returned source replaces source for subsequent ticks. Without a factory, the recorder relies on source.poll_many itself to recover (which works for callers that wrap their own transport-reopen logic inside poll_many).

None

Yields:

Type Description
AsyncGenerator[Recording[Sequence[Sample]]]

A :class:Recording[Sequence[Sample]] exposing .stream, .summary, and .rate_hz.

Raises:

Type Description
ValueError

rate_hz <= 0, duration <= 0, buffer_size < 1, or parameters is empty.

Source code in src/watlowlib/streaming/recorder.py
@asynccontextmanager
async def record(
    source: PollSource,
    *,
    parameters: Sequence[str | int],
    rate_hz: float,
    duration: float | None = None,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
    auto_reconnect: bool = False,
    reconnect_factory: Callable[[], Awaitable[PollSource]] | None = None,
) -> AsyncGenerator[Recording[Sequence[Sample]]]:
    """Record polled samples into a receive stream at an absolute cadence.

    Usage::

        async with record(
            controller, parameters=["process_value", "setpoint"], rate_hz=2, duration=10
        ) as recording:
            async for batch in recording.stream:
                for sample in batch:
                    print(sample.parameter, sample.value)
            # recording.summary is live; recording.summary.finished_at is None
            # while running and set on CM exit.

    The CM yields a :class:`Recording[Sequence[Sample]]` exposing
    ``.stream`` (async iterator of per-tick :class:`Sample` batches),
    ``.summary`` (live :class:`AcquisitionSummary` — recorder is sole
    writer), and ``.rate_hz`` (the cadence the recorder is running
    at).

    Each batch is a flat :class:`Sequence` — one entry per (device,
    parameter, instance) read that succeeded. Failed reads are dropped
    by the source and logged at WARN.

    Args:
        source: Any :class:`PollSource` (a :class:`Controller` or a
            :class:`WatlowManager`).
        parameters: Parameter names or registry IDs to poll each tick.
        rate_hz: Target cadence. Absolute targets are computed
            ``target[n] = start + n * (1 / rate_hz)``. Must be > 0.
        duration: Total acquisition duration in seconds. ``None``
            means "until the caller exits the CM".
        names: Subset of device names to poll per tick. ``None`` polls
            everything the source manages. Ignored for solo controllers.
        instances: 1-indexed loop / channel numbers per device. Single-
            loop devices use ``(1,)``.
        overflow: Backpressure policy when the receive-stream buffer
            is full. See :class:`OverflowPolicy`.
        buffer_size: Receive-stream capacity, in per-tick batches.
        auto_reconnect: When ``True``, treat
            :class:`WatlowConnectionError` raised by ``source.poll_many``
            as a transient transport drop rather than a fatal error.
            The producer logs ``recorder.disconnected``, waits per the
            backoff schedule, and either rebuilds the source via
            ``reconnect_factory`` (if supplied) or simply retries the
            same ``source.poll_many`` on the next tick. ``samples_late``
            ticks up for each tick missed during the gap.
        reconnect_factory: When supplied alongside ``auto_reconnect``,
            invoked to rebuild the :class:`PollSource` after a
            disconnect. Useful when the source's transport needs to be
            re-opened explicitly (e.g. a fresh
            :func:`watlowlib.open_device` call). The returned source
            replaces ``source`` for subsequent ticks. Without a
            factory, the recorder relies on ``source.poll_many`` itself to
            recover (which works for callers that wrap their own
            transport-reopen logic inside ``poll_many``).

    Yields:
        A :class:`Recording[Sequence[Sample]]` exposing ``.stream``,
        ``.summary``, and ``.rate_hz``.

    Raises:
        ValueError: ``rate_hz <= 0``, ``duration <= 0``,
            ``buffer_size < 1``, or ``parameters`` is empty.
    """
    if rate_hz <= 0:
        raise ValueError(f"rate_hz must be > 0, got {rate_hz!r}")
    if duration is not None and duration <= 0:
        raise ValueError(f"duration must be > 0 or None, got {duration!r}")
    if buffer_size < 1:
        raise ValueError(f"buffer_size must be >= 1, got {buffer_size!r}")
    if not parameters:
        raise ValueError("parameters must be a non-empty sequence")

    period = 1.0 / rate_hz
    total_ticks = None if duration is None else max(1, round(duration * rate_hz))

    send_stream, receive_stream = anyio.create_memory_object_stream[Sequence[Sample]](
        max_buffer_size=buffer_size,
    )
    # Producer-side clone of the receive stream — used to evict the
    # oldest queued batch under DROP_OLDEST. Cloning here (before the
    # consumer starts iterating) keeps the eviction path off the
    # consumer's iterator and avoids racing with it.
    drop_rx = receive_stream.clone()

    started_at = datetime.now(UTC)
    summary = AcquisitionSummary(started_at=started_at)
    tick_durations_ms: list[float] = []
    _logger.info(
        "recorder.start rate_hz=%s duration_s=%s overflow=%s buffer_size=%s names=%s",
        rate_hz,
        duration,
        overflow.value,
        buffer_size,
        list(names) if names is not None else None,
    )

    async with anyio.create_task_group() as tg, receive_stream:

        async def _producer_entrypoint() -> None:
            await _run_producer(
                source,
                send_stream,
                drop_rx,
                tuple(parameters),
                tuple(instances),
                names,
                period,
                total_ticks,
                overflow,
                summary,
                tick_durations_ms,
                auto_reconnect=auto_reconnect,
                reconnect_factory=reconnect_factory,
            )

        # ``start_soon`` returns a TaskHandle (anyio >= 4.14); the producer is
        # fire-and-forget and cancelled via the group below, so discard it.
        _ = tg.start_soon(_producer_entrypoint)
        try:
            yield Recording(stream=receive_stream, summary=summary, rate_hz=rate_hz)
        finally:
            # Cancel + drain before the CM returns — producer lifetime
            # is strictly nested inside the ``async with``.
            tg.cancel()

    finished_at = datetime.now(UTC)
    p50, p99 = _tick_percentiles(tick_durations_ms)
    summary.finished_at = finished_at
    summary.tick_duration_ms_p50 = p50
    summary.tick_duration_ms_p99 = p99
    _logger.info(
        "recorder.stop emitted=%s late=%s max_drift_ms=%.3f "
        "tick_p50_ms=%.3f tick_p99_ms=%.3f duration_s=%.3f",
        summary.samples_emitted,
        summary.samples_late,
        summary.max_drift_ms,
        summary.tick_duration_ms_p50,
        summary.tick_duration_ms_p99,
        (finished_at - started_at).total_seconds(),
    )
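
The exit path above calls a private helper, _tick_percentiles, whose body is not reproduced on this page. Purely as an illustration of batch-computing p50 and p99 from the collected tick durations, here is a nearest-rank sketch; the function name and the nearest-rank method are assumptions, and the library may interpolate differently.

```python
def tick_percentiles(durations_ms: list[float]) -> tuple[float, float]:
    """Nearest-rank p50 and p99; returns (0.0, 0.0) for an empty run."""
    if not durations_ms:
        return 0.0, 0.0
    ordered = sorted(durations_ms)
    count = len(ordered)

    def nearest_rank(percent: int) -> float:
        # Ceiling division in integers avoids floating-point rank errors.
        rank = max(1, -(-percent * count // 100))
        return ordered[rank - 1]

    return nearest_rank(50), nearest_rank(99)


# Illustrative data: 100 ticks taking 1 ms, 2 ms, ..., 100 ms.
p50, p99 = tick_percentiles([float(ms) for ms in range(1, 101)])
print(p50, p99)
```

Sorting the whole list once at exit is why these two fields can only be filled in after the run ends.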

Sample

watlowlib.streaming.sample

Timed sample — one parameter read with send/receive provenance.

A :class:Sample is the unit the recorder emits into its memory-object stream. Watlow polls a small group of parameters per device per tick (unlike Alicat, which returns one wide DataFrame per poll), so a recorder tick produces up to N×M×K samples: one per (device, parameter, instance) read that succeeded.

Timestamp contract (uniform across the sibling libraries):

  • t_mono_ns: the :func:time.monotonic_ns midpoint of the request/reply round-trip; canonical join key for cross-stream alignment (monotonic, never wall-clock).
  • t_utc: the wall-clock midpoint of the request/reply round-trip (tz-aware UTC). Used for human-readable sink timestamps.
  • t_midpoint_mono_ns: optional integration-window midpoint in monotonic nanoseconds. For polled reads this is None; sensors with integration windows (e.g. multi-sample averages) populate it.

I/O provenance stays alongside the canonical timestamps: requested_at / received_at / latency_s are the per-round-trip wire boundaries, available for diagnostics but not the join key.

The shape is deliberately long-format (one row per parameter) so the SQLite cross-vendor test can union Watlow rows with Alicat rows under one schema.
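
A long-format table makes the cross-vendor union straightforward, since each vendor only contributes rows. The sketch below uses an in-memory SQLite database with a hypothetical, reduced schema; the table name, column subset, device names, and the Alicat-style row are illustrative assumptions and not the schema used by the library's sinks or tests.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE samples (
        device TEXT, parameter TEXT, instance INTEGER,
        value REAL, unit TEXT, t_mono_ns INTEGER
    )
    """
)

# One row per parameter read, regardless of which vendor produced it.
rows = [
    ("oven_1", "process_value", 1, 21.5, "degC", 1_000),
    ("oven_1", "setpoint", 1, 25.0, "degC", 1_010),
    ("mfc_1", "mass_flow", 1, 4.2, "sccm", 1_050),  # hypothetical Alicat row
]
conn.executemany("INSERT INTO samples VALUES (?, ?, ?, ?, ?, ?)", rows)

result = conn.execute(
    "SELECT device, parameter, value FROM samples ORDER BY t_mono_ns"
).fetchall()
print(result)
conn.close()
```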

Design reference: docs/design.md §6.

Recorder

watlowlib.streaming.recorder

Absolute-target recorder — record() emits timed :class:Sample batches.

:func:record is the v1 acquisition primitive. It drives a :class:PollSource (an opened :class:~watlowlib.devices.controller.Controller or a :class:~watlowlib.manager.WatlowManager) at an absolute-target cadence and publishes the polled :class:Sample values into an :class:anyio.abc.ObjectReceiveStream as per-tick batches.

Key invariants:

  • Absolute-target scheduling. Target times are offsets from a single :func:anyio.current_time reading taken at record() entry, not increments from the previous tick's finish time, so drift across cycles is bounded by one tick and never accumulates. anyio.sleep_until advances to the next target slot; overruns skip missed slots and increment samples_late.
  • Structured concurrency. The producer task lives strictly inside the async CM body. The CM yields the receive stream, user code iterates it, and on CM exit the task group is cancelled and joined before the CM returns.
  • Wall-clock provenance. datetime.now(UTC) is captured at the send/receive boundaries of each device's poll and attached to the emitted :class:Sample — used for sink timestamps, never for scheduling.
  • Backpressure. buffer_size sets the memory-object stream capacity; :class:OverflowPolicy controls what happens when the producer wants to enqueue but the consumer is behind.
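
The scheduling invariant can be illustrated with plain arithmetic. The target formula target[n] = start + n * (1 / rate_hz) is the one given for rate_hz; the slot-skipping rule in next_tick is an assumption about how "skip missed slots" could be computed and is not taken from the producer's source. Both function names are hypothetical.

```python
import math


def tick_targets(start: float, rate_hz: float, n_ticks: int) -> list[float]:
    """Absolute targets: every slot is an offset from the same start time."""
    period = 1.0 / rate_hz
    return [start + n * period for n in range(n_ticks)]


def next_tick(start: float, period: float, last_index: int, now: float) -> tuple[int, int]:
    """Return (next slot index, number of slots skipped) after a tick finishes."""
    # Earliest slot whose target time has not already passed.
    due = math.ceil((now - start) / period)
    next_index = max(last_index + 1, due)
    return next_index, next_index - (last_index + 1)


targets = tick_targets(start=100.0, rate_hz=2, n_ticks=3)
on_time = next_tick(start=100.0, period=0.5, last_index=0, now=100.2)
overrun = next_tick(start=100.0, period=0.5, last_index=1, now=101.7)
print(targets, on_time, overrun)
```

In the overrun case the tick for slot 1 finishes after slots 2 and 3 have passed, so the sketch resumes at slot 4 and reports two skipped slots, which would be the late ticks.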

The recorder consumes a :class:PollSource — a narrow Protocol both :class:~watlowlib.devices.controller.Controller and :class:~watlowlib.manager.WatlowManager satisfy. Kept as a Protocol so the recorder is unit-testable against a lightweight stub.

Design reference: docs/design.md §6.

AcquisitionSummary dataclass

AcquisitionSummary(
    started_at,
    finished_at=None,
    samples_emitted=0,
    samples_late=0,
    max_drift_ms=0.0,
    tick_duration_ms_p50=0.0,
    tick_duration_ms_p99=0.0,
    disconnects=0,
)

Per-run summary owned and mutated by the recorder.

Mutability contract (§M of UNIFIED_API_HANDOFF.md): the recorder is the only writer. Counters update in place during the run so progress-polling consumers (TUIs, dashboards) see live values. Consumers must treat this object as read-only.

finished_at is None while the recording is in flight and is set on context-manager exit. Percentile fields (tick_duration_ms_p50 / p99) are materialized at exit only because percentiles are batch-computed; the in-flight counters reflect the latest observation.

Attributes:

Name Type Description
started_at datetime

Wall-clock at the first scheduled tick.

finished_at datetime | None

Wall-clock at producer shutdown, or None while running.

samples_emitted int

Count of per-tick batches actually pushed onto the receive stream. A tick that produced zero samples (every device errored) still counts as one emitted batch.

samples_late int

Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch). Auto-reconnect ticks also count as late.

max_drift_ms float

Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds. A healthy run stays well under one period; values approaching 1000 / rate_hz indicate the device or consumer is saturating the schedule.

tick_duration_ms_p50 float

Median wall-clock duration of a single source.poll_many(...) call across the run, in milliseconds. Set on exit only. Compares directly to 1000 / rate_hz — if it approaches the period, the schedule is saturated.

tick_duration_ms_p99 float

99th-percentile tick duration, in milliseconds. Set on exit only. Surfaces rare-but-bad cases where a tick stalled behind a contended port lock or a slow EEPROM commit.

disconnects int

Count of WatlowConnectionError events the producer absorbed under auto_reconnect=True. Always 0 when auto_reconnect was off.

OverflowPolicy

Bases: Enum

What record() does when the receive-stream buffer is full.

The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Await the slow consumer. Default. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the batch that was about to be enqueued. Counted as late.

DROP_OLDEST class-attribute instance-attribute

DROP_OLDEST = 'drop_oldest'

Evict the oldest queued batch and enqueue the newest. Useful for real-time monitoring where the latest reading matters more than historical buffer contents. Each evicted batch is counted as late.

PollSource

Bases: Protocol

Minimal shape the recorder needs from its dispatcher.

Both :class:~watlowlib.devices.controller.Controller (solo) and :class:~watlowlib.manager.WatlowManager (multi-device) satisfy this Protocol. Using a Protocol keeps :func:record testable against a lightweight stub without standing up a full controller + transport pipeline.

The contract is intentionally narrow: per call, return a flat :class:~collections.abc.Sequence of :class:Sample\ s — one per (device, parameter) read that succeeded. Failed reads are dropped from the batch and logged by the source; the recorder never sees them.

poll_many async

poll_many(parameters, *, names=None, instances=(1,))

Read every parameters × instances combination on every device.

Parameters:

Name Type Description Default
parameters Sequence[str | int]

Parameter names or registry IDs.

required
names Sequence[str] | None

Subset of device names to poll (Manager-only; Controller ignores). None polls everything the source manages.

None
instances Sequence[int]

1-indexed loop / channel numbers per device. Single-loop devices use (1,) (the default).

(1,)

Returns:

Type Description
Sequence[Sample]

A flat :class:Sequence of :class:Sample. Empty when

Sequence[Sample]

every poll failed.

Source code in src/watlowlib/streaming/recorder.py
async def poll_many(
    self,
    parameters: Sequence[str | int],
    *,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
) -> Sequence[Sample]:
    """Read every ``parameters`` × ``instances`` combination on every device.

    Args:
        parameters: Parameter names or registry IDs.
        names: Subset of device names to poll (Manager-only;
            Controller ignores). ``None`` polls everything the
            source manages.
        instances: 1-indexed loop / channel numbers per device.
            Single-loop devices use ``(1,)`` (the default).

    Returns:
        A flat :class:`Sequence` of :class:`Sample`. Empty when
        every poll failed.
    """
    ...

Recording dataclass

Recording(stream, summary, rate_hz)

Container yielded by :func:record — stream + live summary + rate.

Cross-library shape (alicat / sartorius / watlow / nidaq) so consumers consume the same recording.stream / recording.summary / recording.rate_hz accessors regardless of vendor.

Per-library payload (the T parameter):

  • alicat / sartorius: Recording[Mapping[str, Sample]]
  • watlow: Recording[Sequence[Sample]] — per-tick batches
  • nidaq: Recording[DaqReading] (polled) / Recording[DaqBlock] (block)

Attributes:

Name Type Description
stream AsyncIterator[T]

Async iterator of per-tick payloads. Consume with async for batch in recording.stream.

summary AcquisitionSummary

Live :class:AcquisitionSummary — the recorder mutates this in place; consumers read. summary.finished_at is None while running and is populated on context-manager exit.

rate_hz float

The cadence the recorder is running at, captured at record() entry. Useful for queue-sizing downstream buffers.
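One way to read the queue-sizing remark: since the stream carries one batch per tick, the number of batches that accumulate while a consumer stalls is roughly rate_hz multiplied by the stall time. The helper below is a hypothetical sketch of that arithmetic, not part of watlowlib.

```python
import math


def batches_for_stall(rate_hz: float, stall_seconds: float) -> int:
    """Estimate how many per-tick batches arrive while a consumer is stalled."""
    if rate_hz <= 0 or stall_seconds < 0:
        raise ValueError("rate_hz must be > 0 and stall_seconds must be >= 0")
    # One batch per tick; always leave room for at least one batch.
    return max(1, math.ceil(rate_hz * stall_seconds))


print(batches_for_stall(2.0, 10.0))  # 20
print(batches_for_stall(0.5, 1.0))   # 1
```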

record async

record(
    source,
    *,
    parameters,
    rate_hz,
    duration=None,
    names=None,
    instances=(1,),
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
    auto_reconnect=False,
    reconnect_factory=None,
)

Record polled samples into a receive stream at an absolute cadence.

Usage::

async with record(
    controller, parameters=["process_value", "setpoint"], rate_hz=2, duration=10
) as recording:
    async for batch in recording.stream:
        for sample in batch:
            print(sample.parameter, sample.value)
    # recording.summary is live; recording.summary.finished_at is None
    # while running and set on CM exit.

The CM yields a :class:Recording[Sequence[Sample]] exposing .stream (async iterator of per-tick :class:Sample batches), .summary (live :class:AcquisitionSummary — recorder is sole writer), and .rate_hz (the cadence the recorder is running at).

Each batch is a flat :class:Sequence — one entry per (device, parameter, instance) read that succeeded. Failed reads are dropped by the source and logged at WARN.
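Because a batch is flat, consumers that want per-parameter series usually regroup it themselves. The sketch below relies only on the `.parameter` and `.value` attributes used in the usage example; `values_by_parameter` is a hypothetical helper, and the `SimpleNamespace` objects stand in for real samples.

```python
from collections import defaultdict
from types import SimpleNamespace


def values_by_parameter(batch):
    """Group one flat per-tick batch into {parameter: [value, ...]}."""
    grouped = defaultdict(list)
    for sample in batch:
        grouped[sample.parameter].append(sample.value)
    return dict(grouped)


# Stand-in objects carrying only the two attributes the helper reads.
batch = [
    SimpleNamespace(parameter="process_value", value=72.4),
    SimpleNamespace(parameter="setpoint", value=75.0),
    SimpleNamespace(parameter="process_value", value=68.1),
]
print(values_by_parameter(batch))
```

Since failed reads are dropped, a parameter can be missing from a given tick, so lookups on the grouped result are safer with `.get(...)` than with direct indexing.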

Parameters:

Name Type Description Default
source PollSource

Any :class:PollSource (a :class:Controller or a :class:WatlowManager).

required
parameters Sequence[str | int]

Parameter names or registry IDs to poll each tick.

required
rate_hz float

Target cadence. Absolute targets are computed target[n] = start + n * (1 / rate_hz). Must be > 0.

required
duration float | None

Total acquisition duration in seconds. None means "until the caller exits the CM".

None
names Sequence[str] | None

Subset of device names to poll per tick. None polls everything the source manages. Ignored for solo controllers.

None
instances Sequence[int]

1-indexed loop / channel numbers per device. Single-loop devices use (1,).

(1,)
overflow OverflowPolicy

Backpressure policy when the receive-stream buffer is full. See :class:OverflowPolicy.

BLOCK
buffer_size int

Receive-stream capacity, in per-tick batches.

64
auto_reconnect bool

When True, treat :class:WatlowConnectionError raised by source.poll_many as a transient transport drop rather than a fatal error. The producer logs recorder.disconnected, waits per the backoff schedule, and either rebuilds the source via reconnect_factory (if supplied) or simply retries the same source.poll_many on the next tick. samples_late ticks up for each tick missed during the gap.

False
reconnect_factory Callable[[], Awaitable[PollSource]] | None

When supplied alongside auto_reconnect, invoked to rebuild the :class:PollSource after a disconnect. Useful when the source's transport needs to be re-opened explicitly (e.g. a fresh :func:watlowlib.open_device call). The returned source replaces source for subsequent ticks. Without a factory, the recorder relies on source.poll_many itself to recover (which works for callers that wrap their own transport-reopen logic inside poll_many).

None
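The source listing for record mentions a DROP_OLDEST policy that evicts the oldest queued batch when the buffer is full. The sketch below illustrates that idea with a standard-library `asyncio.Queue` rather than the anyio memory stream the recorder uses; `put_drop_oldest` is a hypothetical helper and the real eviction path may differ in detail.

```python
import asyncio


def put_drop_oldest(queue: asyncio.Queue, item) -> bool:
    """Enqueue item, evicting the oldest entry first if the queue is full.

    Returns True when an entry was evicted to make room.
    """
    evicted = False
    if queue.full():
        queue.get_nowait()  # discard the oldest queued batch
        evicted = True
    queue.put_nowait(item)
    return evicted


queue = asyncio.Queue(maxsize=2)
dropped = sum(put_drop_oldest(queue, tick) for tick in range(5))
remaining = [queue.get_nowait() for _ in range(queue.qsize())]
print(dropped, remaining)  # 3 [3, 4]
```

The trade-off is that the consumer always sees the most recent data, at the cost of silently losing older batches.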

Yields:

Type Description
AsyncGenerator[Recording[Sequence[Sample]]]

A :class:Recording[Sequence[Sample]] exposing .stream, .summary, and .rate_hz.

Raises:

Type Description
ValueError

rate_hz <= 0, duration <= 0, buffer_size < 1, or parameters is empty.
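The absolute-target formula given for rate_hz can be sketched as below. The tick count follows the `max(1, round(duration * rate_hz))` expression in the source listing; `tick_targets` is a hypothetical helper, and starting the index at n = 0 is an assumption.

```python
def tick_targets(start: float, rate_hz: float, duration: float) -> list[float]:
    """Return absolute target times: target[n] = start + n * (1 / rate_hz)."""
    if rate_hz <= 0 or duration <= 0:
        raise ValueError("rate_hz and duration must be > 0")
    period = 1.0 / rate_hz
    total_ticks = max(1, round(duration * rate_hz))
    # Each target is computed from the start time, not from the previous
    # tick, so a slow tick does not shift every later target.
    return [start + n * period for n in range(total_ticks)]


print(tick_targets(start=100.0, rate_hz=2, duration=2))
# [100.0, 100.5, 101.0, 101.5]
```

Scheduling against fixed targets keeps timing error from accumulating the way it would with a plain "sleep for one period after each tick" loop.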

Source code in src/watlowlib/streaming/recorder.py
@asynccontextmanager
async def record(
    source: PollSource,
    *,
    parameters: Sequence[str | int],
    rate_hz: float,
    duration: float | None = None,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
    auto_reconnect: bool = False,
    reconnect_factory: Callable[[], Awaitable[PollSource]] | None = None,
) -> AsyncGenerator[Recording[Sequence[Sample]]]:
    """Record polled samples into a receive stream at an absolute cadence.

    Usage::

        async with record(
            controller, parameters=["process_value", "setpoint"], rate_hz=2, duration=10
        ) as recording:
            async for batch in recording.stream:
                for sample in batch:
                    print(sample.parameter, sample.value)
            # recording.summary is live; recording.summary.finished_at is None
            # while running and set on CM exit.

    The CM yields a :class:`Recording[Sequence[Sample]]` exposing
    ``.stream`` (async iterator of per-tick :class:`Sample` batches),
    ``.summary`` (live :class:`AcquisitionSummary` — recorder is sole
    writer), and ``.rate_hz`` (the cadence the recorder is running
    at).

    Each batch is a flat :class:`Sequence` — one entry per (device,
    parameter, instance) read that succeeded. Failed reads are dropped
    by the source and logged at WARN.

    Args:
        source: Any :class:`PollSource` (a :class:`Controller` or a
            :class:`WatlowManager`).
        parameters: Parameter names or registry IDs to poll each tick.
        rate_hz: Target cadence. Absolute targets are computed
            ``target[n] = start + n * (1 / rate_hz)``. Must be > 0.
        duration: Total acquisition duration in seconds. ``None``
            means "until the caller exits the CM".
        names: Subset of device names to poll per tick. ``None`` polls
            everything the source manages. Ignored for solo controllers.
        instances: 1-indexed loop / channel numbers per device.
            Single-loop devices use ``(1,)``.
        overflow: Backpressure policy when the receive-stream buffer
            is full. See :class:`OverflowPolicy`.
        buffer_size: Receive-stream capacity, in per-tick batches.
        auto_reconnect: When ``True``, treat
            :class:`WatlowConnectionError` raised by ``source.poll_many``
            as a transient transport drop rather than a fatal error.
            The producer logs ``recorder.disconnected``, waits per the
            backoff schedule, and either rebuilds the source via
            ``reconnect_factory`` (if supplied) or simply retries the
            same ``source.poll_many`` on the next tick. ``samples_late``
            ticks up for each tick missed during the gap.
        reconnect_factory: When supplied alongside ``auto_reconnect``,
            invoked to rebuild the :class:`PollSource` after a
            disconnect. Useful when the source's transport needs to be
            re-opened explicitly (e.g. a fresh
            :func:`watlowlib.open_device` call). The returned source
            replaces ``source`` for subsequent ticks. Without a
            factory, the recorder relies on ``source.poll_many`` itself to
            recover (which works for callers that wrap their own
            transport-reopen logic inside ``poll_many``).

    Yields:
        A :class:`Recording[Sequence[Sample]]` exposing ``.stream``,
        ``.summary``, and ``.rate_hz``.

    Raises:
        ValueError: ``rate_hz <= 0``, ``duration <= 0``,
            ``buffer_size < 1``, or empty ``parameters``.
    """
    if rate_hz <= 0:
        raise ValueError(f"rate_hz must be > 0, got {rate_hz!r}")
    if duration is not None and duration <= 0:
        raise ValueError(f"duration must be > 0 or None, got {duration!r}")
    if buffer_size < 1:
        raise ValueError(f"buffer_size must be >= 1, got {buffer_size!r}")
    if not parameters:
        raise ValueError("parameters must be a non-empty sequence")

    period = 1.0 / rate_hz
    total_ticks = None if duration is None else max(1, round(duration * rate_hz))

    send_stream, receive_stream = anyio.create_memory_object_stream[Sequence[Sample]](
        max_buffer_size=buffer_size,
    )
    # Producer-side clone of the receive stream — used to evict the
    # oldest queued batch under DROP_OLDEST. Cloning here (before the
    # consumer starts iterating) keeps the eviction path off the
    # consumer's iterator and avoids racing with it.
    drop_rx = receive_stream.clone()

    started_at = datetime.now(UTC)
    summary = AcquisitionSummary(started_at=started_at)
    tick_durations_ms: list[float] = []
    _logger.info(
        "recorder.start rate_hz=%s duration_s=%s overflow=%s buffer_size=%s names=%s",
        rate_hz,
        duration,
        overflow.value,
        buffer_size,
        list(names) if names is not None else None,
    )

    async with anyio.create_task_group() as tg, receive_stream:

        async def _producer_entrypoint() -> None:
            await _run_producer(
                source,
                send_stream,
                drop_rx,
                tuple(parameters),
                tuple(instances),
                names,
                period,
                total_ticks,
                overflow,
                summary,
                tick_durations_ms,
                auto_reconnect=auto_reconnect,
                reconnect_factory=reconnect_factory,
            )

        # ``start_soon`` returns a TaskHandle (anyio >= 4.14); the producer is
        # fire-and-forget and cancelled via the group below, so discard it.
        _ = tg.start_soon(_producer_entrypoint)
        try:
            yield Recording(stream=receive_stream, summary=summary, rate_hz=rate_hz)
        finally:
            # Cancel + drain before the CM returns — producer lifetime
            # is strictly nested inside the ``async with``.
            tg.cancel()

    finished_at = datetime.now(UTC)
    p50, p99 = _tick_percentiles(tick_durations_ms)
    summary.finished_at = finished_at
    summary.tick_duration_ms_p50 = p50
    summary.tick_duration_ms_p99 = p99
    _logger.info(
        "recorder.stop emitted=%s late=%s max_drift_ms=%.3f "
        "tick_p50_ms=%.3f tick_p99_ms=%.3f duration_s=%.3f",
        summary.samples_emitted,
        summary.samples_late,
        summary.max_drift_ms,
        summary.tick_duration_ms_p50,
        summary.tick_duration_ms_p99,
        (finished_at - started_at).total_seconds(),
    )
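The listing calls `_tick_percentiles`, whose body is not shown here. As a rough idea of what a batch-computed p50 / p99 can look like, the sketch below uses the nearest-rank method. Both `nearest_rank` and the choice of method are assumptions, and the library may interpolate differently.

```python
import math


def nearest_rank(values: list[float], pct: float) -> float:
    """Nearest-rank percentile; returns 0.0 for an empty input."""
    if not values:
        return 0.0
    ordered = sorted(values)
    # Rank is 1-based: the smallest rank whose share of the data is >= pct.
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


tick_durations_ms = [4.0, 5.0, 5.5, 6.0, 30.0]
p50 = nearest_rank(tick_durations_ms, 50)
p99 = nearest_rank(tick_durations_ms, 99)
print(p50, p99)  # 5.5 30.0
```

Sorting requires the full list of tick durations, which is consistent with the percentile fields being filled in only after the context manager exits.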