watlowlib

The top-level package re-exports the public surface from each subpackage. Import names directly from watlowlib; that is the stable, supported form. Importing from submodules also works, but those paths are not stability-pinned.

Public surface

watlowlib

watlowlib — Python library for Watlow temperature controllers.

Supports both wire protocols Watlow controllers expose:

  • Standard Bus: BACnet MS/TP outer framing with a small Watlow attribute service inside.
  • Modbus RTU: via the in-house anymodbus package.

The public API is semantic and protocol-neutral — a caller asks for read_pv(), set_setpoint(), read_parameter(); the session dispatches the Standard Bus or Modbus variant selected at open time. Both protocols decode into the same frozen Reading / ParameterEntry / DeviceInfo models.

The core API is async (built on anyio); :mod:watlowlib.sync provides a blocking facade for scripts, notebooks, and REPL use.

See docs/design.md for the architectural design.

AcquisitionSummary dataclass

AcquisitionSummary(
    started_at,
    finished_at=None,
    samples_emitted=0,
    samples_late=0,
    max_drift_ms=0.0,
    tick_duration_ms_p50=0.0,
    tick_duration_ms_p99=0.0,
    disconnects=0,
)

Per-run summary owned and mutated by the recorder.

Mutability contract (§M of UNIFIED_API_HANDOFF.md): the recorder is the only writer. Counters update in place during the run so progress-polling consumers (TUIs, dashboards) see live values. Consumers must treat this object as read-only.

finished_at is None while the recording is in flight and is set on context-manager exit. Percentile fields (tick_duration_ms_p50 / p99) are materialized at exit only because percentiles are batch-computed; the in-flight counters reflect the latest observation.
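The exit-time percentile step can be pictured with the standard library. This is a minimal sketch, assuming the recorder keeps a plain list of per-tick durations; `summarize_tick_durations` is a hypothetical helper, not part of watlowlib, and the real recorder may compute its percentiles differently.

```python
import statistics


def summarize_tick_durations(durations_ms: list[float]) -> tuple[float, float]:
    """Return (p50, p99) over all recorded tick durations, in milliseconds."""
    if not durations_ms:
        return (0.0, 0.0)  # same values as the dataclass defaults
    if len(durations_ms) == 1:
        only = durations_ms[0]
        return (only, only)  # quantiles() needs at least two data points
    p50 = statistics.median(durations_ms)
    # quantiles(n=100) returns 99 cut points; index 98 is the 99th percentile.
    p99 = statistics.quantiles(durations_ms, n=100, method="inclusive")[98]
    return (p50, p99)
```

Because both values need the full set of durations, they can only be final once the run has stopped, which is consistent with the fields being set on exit.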

Attributes:

started_at (datetime): Wall-clock at the first scheduled tick.

finished_at (datetime | None): Wall-clock at producer shutdown, or None while running.

samples_emitted (int): Count of per-tick batches actually pushed onto the receive stream. A tick that produced zero samples (every device errored) still counts as one emitted batch.

samples_late (int): Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch). Auto-reconnect ticks also count as late.

max_drift_ms (float): Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds. A healthy run stays well under one period; values approaching 1000 / rate_hz indicate the device or consumer is saturating the schedule.

tick_duration_ms_p50 (float): Median wall-clock duration of a single source.poll_many(...) call across the run, in milliseconds. Set on exit only. Compares directly to 1000 / rate_hz: if it approaches the period, the schedule is saturated.

tick_duration_ms_p99 (float): 99th-percentile tick duration, in milliseconds. Set on exit only. Surfaces rare-but-bad cases where a tick stalled behind a contended port lock or a slow EEPROM commit.

disconnects (int): Count of WatlowConnectionError events the producer absorbed under auto_reconnect=True. Always 0 when auto_reconnect was off.
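The comparison against 1000 / rate_hz described for the drift and tick-duration fields can be written as a small post-run check. A sketch under stated assumptions: `Summary` is a stand-in that mirrors only three of the documented fields, and `saturation_report` with its 0.8 threshold is hypothetical, not a watlowlib function.

```python
from dataclasses import dataclass


@dataclass
class Summary:
    """Stand-in with three AcquisitionSummary fields (not the real class)."""

    max_drift_ms: float = 0.0
    tick_duration_ms_p50: float = 0.0
    tick_duration_ms_p99: float = 0.0


def saturation_report(summary: Summary, rate_hz: float, threshold: float = 0.8) -> dict[str, bool]:
    """Flag each metric that reaches `threshold` of one period (1000 / rate_hz ms)."""
    period_ms = 1000.0 / rate_hz
    limit = threshold * period_ms
    return {
        "drift": summary.max_drift_ms >= limit,
        "p50": summary.tick_duration_ms_p50 >= limit,
        "p99": summary.tick_duration_ms_p99 >= limit,
    }


# Two made-up runs at 10 Hz (period 100 ms) for illustration.
healthy = Summary(max_drift_ms=4.0, tick_duration_ms_p50=12.0, tick_duration_ms_p99=30.0)
stalled = Summary(max_drift_ms=95.0, tick_duration_ms_p50=40.0, tick_duration_ms_p99=98.0)
```

A flagged p99 with an unflagged p50 points at occasional stalls rather than a schedule that is too fast overall.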

AlarmState dataclass

AlarmState(loop, high, low, silenced, raw_bits)

Decoded alarm bits for one loop.

Availability

Bases: StrEnum

Per-command session state.

Sticky for the session: once a command transitions to :attr:UNSUPPORTED, the session short-circuits subsequent invocations with a typed error pre-I/O. The transition table lives in docs/design.md §5b.
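The sticky behaviour can be illustrated with a toy session. This is a sketch only: the `UNKNOWN` and `SUPPORTED` members, `CommandUnsupportedError`, `StickySession`, and `attempt` are all hypothetical names, and the real transition table lives in docs/design.md §5b.

```python
from enum import Enum


class Availability(str, Enum):
    """Stand-in enum; only UNSUPPORTED is named on this page."""

    UNKNOWN = "unknown"  # hypothetical member
    SUPPORTED = "supported"  # hypothetical member
    UNSUPPORTED = "unsupported"


class CommandUnsupportedError(Exception):
    """Hypothetical typed error raised for unsupported commands."""


class StickySession:
    """Tracks per-command state and counts simulated wire round-trips."""

    def __init__(self) -> None:
        self.state: dict[str, Availability] = {}
        self.io_calls = 0

    def execute(self, command: str, *, device_supports: bool) -> str:
        # Short-circuit before any I/O once the command is marked UNSUPPORTED.
        if self.state.get(command) is Availability.UNSUPPORTED:
            raise CommandUnsupportedError(command)
        self.io_calls += 1  # stands in for one wire round-trip
        if not device_supports:
            self.state[command] = Availability.UNSUPPORTED
            raise CommandUnsupportedError(command)
        self.state[command] = Availability.SUPPORTED
        return "ok"


def attempt(session: StickySession, command: str, *, device_supports: bool) -> bool:
    """Return True if the command ran, False if it was rejected."""
    try:
        session.execute(command, device_supports=device_supports)
    except CommandUnsupportedError:
        return False
    return True
```

In this toy model the second call to a rejected command never reaches the wire, which is the point of making the state sticky.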

Capability

Bases: Flag

Coarse hardware capability bits.

Bits are derived from a decoded part number when one is available (see :func:watlowlib.registry.families.capabilities_for_part_number) and fall back to a per-family prior otherwise. The session widens the set at runtime when a command succeeds against a parameter that proves the capability.

The vocabulary is small on purpose — most Watlow gating is by :class:watlowlib.registry.families.ControllerFamily and by :attr:watlowlib.registry.parameters.ParameterSpec.parameter_id, not by per-feature bits. New bits are added when captured family behaviour requires them.
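Runtime widening maps naturally onto `enum.Flag` arithmetic. A minimal sketch, assuming a stand-in `Capability` enum: `NONE` and `HAS_COOLING` appear on this page, while `HAS_RAMPING` and the `widen` helper are hypothetical and exist only for illustration.

```python
from __future__ import annotations

from enum import Flag, auto


class Capability(Flag):
    """Stand-in flag enum, not the watlowlib class."""

    NONE = 0
    HAS_COOLING = auto()
    HAS_RAMPING = auto()  # hypothetical bit for illustration


def widen(current: Capability | None, proven: Capability) -> Capability:
    """Add a capability that a successful command has just proven."""
    base = Capability.NONE if current is None else current
    return base | proven  # bits are only ever added, never removed
```

Membership tests such as `Capability.HAS_COOLING in caps` then work on the widened set.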

Controller

Controller(session, transport, *, serial_settings)

Async facade for a single Watlow controller.

Source code in src/watlowlib/devices/controller.py
def __init__(
    self,
    session: Session,
    transport: Transport,
    *,
    serial_settings: SerialSettings,
) -> None:
    self._session = session
    self._transport = transport
    self._serial_settings = serial_settings
    # Cached loop count populated by :meth:`identify`. ``None`` means
    # "we haven't asked yet" — :meth:`loop` then defers validation
    # to the registry's per-spec ``max_instance``. Concrete count is
    # what the part-number decoder produced from the captured part
    # string (PM3 → 1, PM6/8/9 + ``U`` control → 2, etc.).
    self._loops: int | None = None
    # Cached SKU-derived capabilities populated by :meth:`identify`.
    # ``None`` until the part number has been decoded; downstream
    # operations that gate on bits (cool-side PID, etc.) treat
    # ``None`` as "no information, no gate" so calls work pre-
    # identify without surprising the user.
    self._capabilities: Capability | None = None
    # Full cached :class:`DeviceInfo` populated by :meth:`identify`.
    # Drives :meth:`snapshot` so no wire I/O is needed to render
    # the controller's identity. ``None`` until identify runs.
    self._device_info: DeviceInfo | None = None

capabilities property

capabilities

Cached SKU capabilities (set after :meth:identify).

None pre-identify so capability-gated operations behave permissively until the part number is captured. After :meth:identify, callers can branch on :attr:Capability.HAS_COOLING etc. without re-issuing identify.

loops property

loops

Cached loop count (set after :meth:identify).

None until the device's part number has been decoded; :meth:loop accepts any 1-indexed value while loops is None and falls back to per-spec validation at the first wire call. After :meth:identify, loops reflects the decoded value.

serial_settings property

serial_settings

Serial framing the controller was opened with.

Exposed so an identity strategy (see :mod:watlowlib.devices.profile) can stamp it onto the :class:DeviceInfo it builds.

session property

session

Underlying session used for command dispatch.

close async

close()

Close the underlying transport and dispose the protocol client.

Source code in src/watlowlib/devices/controller.py
async def close(self) -> None:
    """Close the underlying transport and dispose the protocol client."""
    # The session holds a reference to the protocol client; dispose
    # it so any pending caller learns the controller is gone before
    # the transport close races them.
    try:
        self._session.dispose()
    finally:
        await self._transport.close()

identify async

identify(
    *,
    timeout=None,
    strict=False,
    query_configured_protocol=False,
)

Read the identity parameters and return a :class:DeviceInfo.

Reads (in order): part number (1009), hardware id (1001), firmware id (1002), serial number. Missing secondary fields stay None and the result's :attr:DeviceInfo.health is downgraded from :attr:DeviceHealth.OK to :attr:DeviceHealth.PARTIAL. If the part-number read itself fails, the result's health is :attr:DeviceHealth.FAILED and capability decoding is skipped (the family prior still applies).

Parameters:

timeout (float | None, default None): Per-read timeout override.

strict (bool, default False): If True, raise the underlying error when the part-number read fails instead of returning a health=FAILED info. Use this in maintenance code paths that need to know the device actually answered before declaring success.

query_configured_protocol (bool, default False): If True, also read parameter 17009 (Protocol) and populate :attr:DeviceInfo.configured_protocol. Off by default because the read costs an extra round-trip; the maintenance verify pass and the discover CLI opt in.

Raises:

WatlowError: When strict=True and the part-number read fails. The original transport / protocol error class is preserved.

Source code in src/watlowlib/devices/controller.py
async def identify(
    self,
    *,
    timeout: float | None = None,
    strict: bool = False,
    query_configured_protocol: bool = False,
) -> DeviceInfo:
    """Read the identity parameters and return a :class:`DeviceInfo`.

    Reads (in order): part number (1009), hardware id (1001),
    firmware id (1002), serial number. Missing secondary fields
    stay ``None`` and the result's :attr:`DeviceInfo.health` is
    downgraded from :attr:`DeviceHealth.OK` to
    :attr:`DeviceHealth.PARTIAL`. If the part-number read itself
    fails, the result's health is :attr:`DeviceHealth.FAILED` and
    capability decoding is skipped (the family prior still
    applies).

    Args:
        timeout: Per-read timeout override.
        strict: If ``True``, raise the underlying error when the
            part-number read fails instead of returning a
            ``health=FAILED`` info. Use this in maintenance code
            paths that need to know the device actually answered
            before declaring success.
        query_configured_protocol: If ``True``, also read parameter
            17009 (Protocol) and populate
            :attr:`DeviceInfo.configured_protocol`. Off by default
            because the read costs an extra round-trip; the
            maintenance verify pass and the discover CLI opt in.

    Raises:
        WatlowError: When ``strict=True`` and the part-number read
            fails. The original transport / protocol error class
            is preserved.
    """
    # Device-neutral: the bound profile owns the family-specific
    # identity sequence (EZ-ZONE PM reads 1009/1001/1002/serial;
    # Series SD reads the numeric 10/11/13 + serial 7-8 + reg 18).
    info = await self._session.profile.identify(
        self,
        timeout=timeout,
        strict=strict,
        query_configured_protocol=query_configured_protocol,
    )
    # Cache for ``self.loop(n)``'s eager validator and ``snapshot``.
    # Identify is the canonical place that sets these — open()
    # doesn't have the identity yet, and a device's loop count /
    # capabilities never change mid-session.
    self._loops = info.loops
    self._capabilities = info.capabilities
    self._device_info = info
    return info
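The health rules described for identify reduce to a small decision function. A sketch, assuming a stand-in `DeviceHealth` enum (member names from this page, string values hypothetical) and a hypothetical `classify_health` helper; the real logic lives in the bound profile and may differ.

```python
from __future__ import annotations

from enum import Enum


class DeviceHealth(str, Enum):
    """Stand-in enum; the string values are assumptions."""

    OK = "ok"
    PARTIAL = "partial"
    FAILED = "failed"


def classify_health(part_number: str | None, secondary: dict[str, object | None]) -> DeviceHealth:
    """Derive health from which identity reads produced a value."""
    if part_number is None:
        return DeviceHealth.FAILED  # the primary read failed
    if any(value is None for value in secondary.values()):
        return DeviceHealth.PARTIAL  # at least one secondary field is missing
    return DeviceHealth.OK
```

With strict=True the FAILED branch would raise instead of returning, per the Raises entry above.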

loop

loop(n)

Return a sub-facade bound to loop n (1-indexed).

n is validated eagerly when :attr:loops is known, otherwise per-spec max_instance validation kicks in at the first wire call. Multi-loop access is the public way to reach loop 2 on dual-loop devices — :meth:Controller.read_pv defaults to instance=1.

Source code in src/watlowlib/devices/controller.py
def loop(self, n: int) -> ControllerLoop:
    """Return a sub-facade bound to loop ``n`` (1-indexed).

    ``n`` is validated eagerly when :attr:`loops` is known,
    otherwise per-spec ``max_instance`` validation kicks in at the
    first wire call. Multi-loop access is the public way to reach
    loop 2 on dual-loop devices —
    :meth:`Controller.read_pv` defaults to ``instance=1``.
    """
    return ControllerLoop(self, n)

poll async

poll(*, instance=1, timeout=None)

Read the active process value — the canonical no-arg snapshot.

Equivalent to :meth:read_pv. The no-arg form aligns with the ecosystem poll() convention shared by alicatlib.Device, sartoriuslib.Balance, and nidaqlib.DaqSession: a single, default-shaped reading per call.

For multi-parameter polling use :meth:poll_many.

Source code in src/watlowlib/devices/controller.py
async def poll(self, *, instance: int = 1, timeout: float | None = None) -> Reading:
    """Read the active process value — the canonical no-arg snapshot.

    Equivalent to :meth:`read_pv`. The no-arg form aligns with the
    ecosystem ``poll()`` convention shared by ``alicatlib.Device``,
    ``sartoriuslib.Balance``, and ``nidaqlib.DaqSession``: a
    single, default-shaped reading per call.

    For multi-parameter polling use :meth:`poll_many`.
    """
    return await self.read_pv(instance=instance, timeout=timeout)

poll_many async

poll_many(parameters, *, names=None, instances=(1,))

Read every (parameter × instance) pair and return the results as :class:Sample objects.

Satisfies the :class:watlowlib.streaming.PollSource Protocol so a solo :class:Controller can drive :func:watlowlib.streaming.record directly without a manager. names is accepted for Protocol compatibility but ignored — a Controller has only one device.

Failed reads are dropped from the returned list and logged at WARN. The recorder treats absence as "drop this row from the batch" and continues with the next tick.

Source code in src/watlowlib/devices/controller.py
async def poll_many(
    self,
    parameters: Sequence[str | int],
    *,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
) -> list[Sample]:
    r"""Read every (parameter × instance) and return them as :class:`Sample`\ s.

    Satisfies the :class:`watlowlib.streaming.PollSource` Protocol so
    a solo :class:`Controller` can drive :func:`watlowlib.streaming.record`
    directly without a manager. ``names`` is accepted for Protocol
    compatibility but ignored — a Controller has only one device.

    Failed reads are dropped from the returned list and logged at
    WARN. The recorder treats absence as "drop this row from the
    batch" and continues with the next tick.
    """
    del names  # solo controller has no name-keyed device map
    from watlowlib.streaming._poll import poll_controller  # noqa: PLC0415 — avoid cycle

    return await poll_controller(
        self,
        name=self._transport.label,
        parameters=parameters,
        instances=instances,
    )
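The drop-and-log behaviour for failed reads can be sketched without hardware. Everything here is an assumption made for illustration: `fake_read` is a hypothetical reader, samples are plain tuples rather than :class:Sample objects, asyncio stands in for anyio, and the iteration order of the real `poll_controller` is not documented on this page.

```python
from __future__ import annotations

import asyncio
import logging

logger = logging.getLogger("poll_sketch")


async def fake_read(parameter: str, instance: int) -> float:
    """Hypothetical reader: output_power on instance 2 never answers."""
    if parameter == "output_power" and instance == 2:
        raise TimeoutError("no reply")
    return 20.0 + instance


async def poll_many_sketch(
    parameters: list[str],
    instances: tuple[int, ...] = (1,),
) -> list[tuple[str, int, float]]:
    """Read every (parameter, instance) pair and keep only the successes."""
    samples: list[tuple[str, int, float]] = []
    for parameter in parameters:
        for instance in instances:
            try:
                value = await fake_read(parameter, instance)
            except Exception as exc:  # a failed read is logged, then skipped
                logger.warning("read %s[%d] failed: %s", parameter, instance, exc)
                continue
            samples.append((parameter, instance, value))
    return samples
```

A caller sees a shorter list rather than an exception, so one unresponsive parameter does not stop the tick.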

read_comms_unit_label async

read_comms_unit_label(*, timeout=None)

Read (and cache) the value parameter 17050 reports.

Inspection / diagnostics helper. Does not drive :class:Reading.unit: on at least one PM3 firmware revision 17050 is a label-only register that changes the enum the device reports for itself but does not affect the scale of temperature values exchanged over comms.

To tell watlowlib what scale temperatures actually travel in over the wire, pass assert_wire_temperature_unit= to :func:watlowlib.open_device. That assertion is what feeds :class:Reading.unit.

Distinct from read_parameter("units"), which targets parameter 3005 (front-panel display). The two can disagree on a real device.

Returns None if the device doesn't report a known code.

Source code in src/watlowlib/devices/controller.py
async def read_comms_unit_label(self, *, timeout: float | None = None) -> Unit | None:
    """Read (and cache) the value parameter 17050 reports.

    Inspection / diagnostics helper. **Does not** drive
    :class:`Reading.unit`: on at least one PM3 firmware revision
    17050 is a label-only register that changes the enum the
    device reports for itself but does not affect the scale of
    temperature values exchanged over comms.

    To tell watlowlib what scale temperatures actually travel in
    over the wire, pass ``assert_wire_temperature_unit=`` to
    :func:`watlowlib.open_device`. That assertion is what feeds
    :class:`Reading.unit`.

    Distinct from ``read_parameter("units")``, which targets
    parameter 3005 (front-panel display). The two can disagree on
    a real device.

    Returns ``None`` if the device doesn't report a known code.
    """
    del timeout  # comms_unit_label() is cached + uses session defaults
    return await self._session.comms_unit_label()

read_parameter async

read_parameter(name_or_id, *, instance=1, timeout=None)

Read any registry parameter.

instance=1 is the default for single-loop devices and the first loop / channel on multi-loop devices.

Source code in src/watlowlib/devices/controller.py
async def read_parameter(
    self,
    name_or_id: str | int,
    *,
    instance: int = 1,
    timeout: float | None = None,
) -> ParameterEntry:
    """Read any registry parameter.

    ``instance=1`` is the default for single-loop devices and the
    first loop / channel on multi-loop devices.
    """
    return await self._session.execute(
        READ_PARAMETER,
        ReadParameterRequest(name_or_id, instance=instance),
        timeout=timeout,
    )

read_pv async

read_pv(*, instance=1, timeout=None)

Read the process value for instance (loop number, 1-indexed).

Source code in src/watlowlib/devices/controller.py
async def read_pv(self, *, instance: int = 1, timeout: float | None = None) -> Reading:
    """Read the process value for ``instance`` (loop number, 1-indexed)."""
    entry = await self.read_parameter("process_value", instance=instance, timeout=timeout)
    return await reading_from_entry(self._session, entry)

read_setpoint async

read_setpoint(*, instance=1, timeout=None)

Read the active setpoint for instance.

Source code in src/watlowlib/devices/controller.py
async def read_setpoint(self, *, instance: int = 1, timeout: float | None = None) -> Reading:
    """Read the active setpoint for ``instance``."""
    entry = await self.read_parameter("setpoint", instance=instance, timeout=timeout)
    return await reading_from_entry(self._session, entry)

set_comms_unit_label async

set_comms_unit_label(unit, *, confirm=False, timeout=None)

Set parameter 17050 ("Communications - Display Units").

Accepts a :class:Unit or a case-insensitive string alias ("C" / "F" / "celsius" / "fahrenheit" / "degC" / "degF" / "°C" / "°F"). :attr:Unit.PERCENT is rejected pre-I/O — the register is temperature-only.

Raw enumeration codes (15 / 30) are not accepted here. Callers who want the lower-level path use write_parameter("display_units", 30).

.. warning::

On at least one PM3 firmware revision this register is
**label-only**: writing it changes the enum the device
reports when 17050 is read back, but does not change the
scale of temperature values exchanged over comms. This
setter therefore does **not** affect
:class:`Reading.unit`. To tell watlowlib what scale
temperatures are actually on, pass
``assert_wire_temperature_unit=`` to
:func:`watlowlib.open_device`.

Persistent write (parameter 17050 is RWE); pass confirm=True to acknowledge the EEPROM write. The session raises :class:WatlowConfirmationRequiredError pre-I/O if missing.

Returns the device-echoed label after the write. None if the device's echo decodes outside the known codes.

Source code in src/watlowlib/devices/controller.py
async def set_comms_unit_label(
    self,
    unit: Unit | str,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> Unit | None:
    """Set parameter 17050 ("Communications - Display Units").

    Accepts a :class:`Unit` or a case-insensitive string alias
    (``"C"`` / ``"F"`` / ``"celsius"`` / ``"fahrenheit"`` /
    ``"degC"`` / ``"degF"`` / ``"°C"`` / ``"°F"``).
    :attr:`Unit.PERCENT` is rejected pre-I/O — the register is
    temperature-only.

    Raw enumeration codes (15 / 30) are not accepted here. Callers
    who want the lower-level path use
    ``write_parameter("display_units", 30)``.

    .. warning::

        On at least one PM3 firmware revision this register is
        **label-only**: writing it changes the enum the device
        reports when 17050 is read back, but does not change the
        scale of temperature values exchanged over comms. This
        setter therefore does **not** affect
        :class:`Reading.unit`. To tell watlowlib what scale
        temperatures are actually on, pass
        ``assert_wire_temperature_unit=`` to
        :func:`watlowlib.open_device`.

    Persistent write (parameter 17050 is RWE); pass ``confirm=True``
    to acknowledge the EEPROM write. The session raises
    :class:`WatlowConfirmationRequiredError` pre-I/O if missing.

    Returns the device-echoed label after the write. ``None`` if
    the device's echo decodes outside the known codes.
    """
    resolved = coerce_unit(unit)
    code = display_code_for_unit(resolved)
    if code is None:
        raise WatlowValidationError(
            "set_comms_unit_label accepts CELSIUS / FAHRENHEIT only; "
            "PERCENT is not a valid display-unit code",
        )
    # PERSISTENT write — session enforces ``confirm=True`` pre-I/O.
    await self.write_parameter(
        "display_units",
        code,
        confirm=confirm,
        timeout=timeout,
    )
    self._session.invalidate_comms_unit_label()
    return await self._session.comms_unit_label()
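Alias handling of this kind is usually a lookup on a normalised string. A minimal sketch, assuming a stand-in `Unit` enum (member names from the text above, values hypothetical); `coerce_temperature_unit` and `rejects` are illustrative helpers, not the library's `coerce_unit`, and the numeric register codes are deliberately left out.

```python
from __future__ import annotations

from enum import Enum


class Unit(Enum):
    """Stand-in enum, not the watlowlib class."""

    CELSIUS = "C"
    FAHRENHEIT = "F"
    PERCENT = "%"


# Lower-cased forms of the aliases listed in the description above.
_ALIASES = {
    "c": Unit.CELSIUS,
    "celsius": Unit.CELSIUS,
    "degc": Unit.CELSIUS,
    "°c": Unit.CELSIUS,
    "f": Unit.FAHRENHEIT,
    "fahrenheit": Unit.FAHRENHEIT,
    "degf": Unit.FAHRENHEIT,
    "°f": Unit.FAHRENHEIT,
}


def coerce_temperature_unit(unit: Unit | str) -> Unit:
    """Resolve a Unit or string alias, rejecting non-temperature units."""
    if isinstance(unit, Unit):
        resolved = unit
    else:
        resolved = _ALIASES.get(unit.strip().lower())
        if resolved is None:
            raise ValueError(f"unknown unit alias: {unit!r}")
    if resolved is Unit.PERCENT:
        raise ValueError("PERCENT is not a temperature unit")
    return resolved


def rejects(value: Unit | str) -> bool:
    """Return True if the value is refused before any write would happen."""
    try:
        coerce_temperature_unit(value)
    except ValueError:
        return True
    return False
```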

set_persistent_writes async

set_persistent_writes(
    enabled, *, confirm=False, timeout=None
)

Toggle whether subsequent writes persist to non-volatile memory.

Series-SD-specific. The SD persists every register write to EEPROM by default, so a high-rate writer (ramping setpoints, a tuning loop) can wear the EEPROM out and brick the controller. Writing 0 to register 17 keeps subsequent writes in RAM only; the device resets register 17 to 1 on every power cycle, so call set_persistent_writes(False) once after each power-up before a burst of writes (see sd_manual.txt p.84).

Parameters:

enabled (bool, required): True → persist writes to EEPROM (the power-on default); False → keep writes in RAM only (lost on power cycle, but spares the EEPROM).

confirm (bool, default False): The write itself is gated like any other parameter write; pass confirm=True to acknowledge it.

timeout (float | None, default None): Per-write timeout override.

Raises:

WatlowConfirmationRequiredError: If confirm is False.

WatlowValidationError: If the bound profile's registry has no eeprom_write_enable parameter (e.g. an EZ-ZONE PM, which has no such register).

Source code in src/watlowlib/devices/controller.py
async def set_persistent_writes(
    self,
    enabled: bool,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> None:
    """Toggle whether subsequent writes persist to non-volatile memory.

    Series-SD-specific. The SD persists every register write to
    EEPROM by default, so a high-rate writer (ramping setpoints, a
    tuning loop) can wear the EEPROM out and brick the controller.
    Writing ``0`` to register 17 keeps subsequent writes in RAM
    only; the device resets register 17 to ``1`` on every power
    cycle, so call ``set_persistent_writes(False)`` once after each
    power-up before a burst of writes (see ``sd_manual.txt`` p.84).

    Args:
        enabled: ``True`` → persist writes to EEPROM (the power-on
            default); ``False`` → keep writes in RAM only (lost on
            power cycle, but spares the EEPROM).
        confirm: The write itself is gated like any other parameter
            write — pass ``confirm=True`` to acknowledge it.
        timeout: Per-write timeout override.

    Raises:
        WatlowConfirmationRequiredError: ``confirm`` is ``False``.
        WatlowValidationError: the bound profile's registry has no
            ``eeprom_write_enable`` parameter (e.g. an EZ-ZONE PM,
            which has no such register).
    """
    await self.write_parameter(
        "eeprom_write_enable",
        1 if enabled else 0,
        confirm=confirm,
        timeout=timeout,
    )
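The recommended call order (disable persistence once, then issue the burst) can be shown with a recording fake. This is a sketch under stated assumptions: `FakeSdController` and `ramp` are hypothetical, the fake only counts where each setpoint write would land, and it does not model whether toggling register 17 itself touches the EEPROM.

```python
import asyncio


class FakeSdController:
    """Hypothetical stand-in that records calls; not the watlowlib Controller."""

    def __init__(self) -> None:
        self.persistent = True  # power-on default per the text above
        self.eeprom_writes = 0
        self.ram_writes = 0

    async def set_persistent_writes(self, enabled: bool, *, confirm: bool = False) -> None:
        if not confirm:
            raise PermissionError("confirm=True required")
        self.persistent = enabled

    async def set_setpoint(self, value: float, *, confirm: bool = False) -> None:
        if not confirm:
            raise PermissionError("confirm=True required")
        if self.persistent:
            self.eeprom_writes += 1
        else:
            self.ram_writes += 1


async def ramp(controller: FakeSdController, start: float, stop: float, steps: int) -> None:
    """Disable persistence once, then write a linear ramp of setpoints."""
    await controller.set_persistent_writes(False, confirm=True)
    for i in range(steps + 1):
        value = start + (stop - start) * i / steps
        await controller.set_setpoint(value, confirm=True)
```

Because the device resets register 17 on every power cycle, the first line of `ramp` has to run again after each power-up.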

set_setpoint async

set_setpoint(
    value, *, instance=1, confirm=False, timeout=None
)

Write the setpoint and return the device-echoed value as a :class:Reading.

Setpoint is RWES — pass confirm=True to acknowledge the EEPROM write. The returned reading is the device's echo of the value it accepted.

Source code in src/watlowlib/devices/controller.py
async def set_setpoint(
    self,
    value: float,
    *,
    instance: int = 1,
    confirm: bool = False,
    timeout: float | None = None,
) -> Reading:
    """Write the setpoint and return the device-echoed value as a :class:`Reading`.

    Setpoint is RWES — pass ``confirm=True`` to acknowledge the
    EEPROM write. The returned reading is the device's echo of
    the value it accepted.
    """
    entry = await self.write_parameter(
        "setpoint",
        value,
        instance=instance,
        confirm=confirm,
        timeout=timeout,
    )
    return await reading_from_entry(self._session, entry)

snapshot async

snapshot(*, name=None)

Return an I/O-free :class:WatlowDeviceSnapshot.

Built from cached identity (populated by :meth:identify, which :func:watlowlib.open_device calls by default) plus the session's last error and per-command availability cache. Does not issue any reads — safe to call from monitoring loops at high cadence.

Parameters:

name (str | None, default None): Override the snapshot's name field. Defaults to the controller's transport label, matching the manager-assigned name surfaced into emitted samples.

Source code in src/watlowlib/devices/controller.py
async def snapshot(self, *, name: str | None = None) -> WatlowDeviceSnapshot:
    """Return an I/O-free :class:`WatlowDeviceSnapshot`.

    Built from cached identity (populated by :meth:`identify`,
    which :func:`watlowlib.open_device` calls by default) plus
    the session's last error and per-command availability cache.
    Does **not** issue any reads — safe to call from monitoring
    loops at high cadence.

    Args:
        name: Override the snapshot's ``name`` field. Defaults to
            the controller's transport label, matching the
            manager-assigned name surfaced into emitted samples.
    """
    info = self._device_info
    model = info.part_number.raw if info is not None else None
    firmware = (
        str(info.firmware_id) if info is not None and info.firmware_id is not None else None
    )
    serial = info.serial_number if info is not None else None
    # Snapshot is built from cached state; availability_summary is
    # a frozen view of the session's UNSUPPORTED-marked commands.
    availability = {
        key: state
        for key, state in self._session.availability_summary().items()
        if state.name == "UNSUPPORTED"
    }
    return WatlowDeviceSnapshot(
        name=name if name is not None else self._transport.label,
        model=model,
        firmware=firmware,
        serial=serial,
        connected=self._transport.is_open,
        last_error=self._session.last_error,
        recoverable_error_count=self._session.recoverable_error_count,
        captured_at=datetime.now(UTC),
        family=info.family if info is not None else None,
        capabilities=self._capabilities if self._capabilities is not None else Capability.NONE,
        availability_summary=availability,
    )

write_parameter async

write_parameter(
    name_or_id,
    value,
    *,
    instance=1,
    confirm=False,
    timeout=None,
)

Write any registry parameter.

Persistent (RWE / RWES) writes require confirm=True; the session raises :class:WatlowConfirmationRequiredError before any I/O if the gate is missing.

Source code in src/watlowlib/devices/controller.py
async def write_parameter(
    self,
    name_or_id: str | int,
    value: float | int | str,
    *,
    instance: int = 1,
    confirm: bool = False,
    timeout: float | None = None,
) -> ParameterEntry:
    """Write any registry parameter.

    Persistent (RWE / RWES) writes require ``confirm=True``;
    the session raises :class:`WatlowConfirmationRequiredError`
    before any I/O if the gate is missing.
    """
    return await self._session.execute(
        WRITE_PARAMETER,
        WriteParameterRequest(name_or_id, value, instance=instance),
        confirm=confirm,
        timeout=timeout,
    )
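The confirmation gate amounts to a check that runs before the request is handed to the transport. A minimal sketch: `ConfirmationRequiredError` stands in for WatlowConfirmationRequiredError, `GatedWriter` is hypothetical, and treating "RW" as a non-persistent access class is an assumption, since only RWE and RWES are named on this page.

```python
class ConfirmationRequiredError(Exception):
    """Hypothetical stand-in for WatlowConfirmationRequiredError."""


PERSISTENT_ACCESS = {"RWE", "RWES"}  # access classes named in the text above


def check_write_gate(access: str, confirm: bool) -> None:
    """Raise before any I/O when a persistent write lacks confirmation."""
    if access in PERSISTENT_ACCESS and not confirm:
        raise ConfirmationRequiredError(f"{access} write requires confirm=True")


class GatedWriter:
    """Records the writes that would have reached the wire."""

    def __init__(self) -> None:
        self.wire_writes: list[tuple[str, float]] = []

    def write(self, name: str, value: float, *, access: str, confirm: bool = False) -> None:
        check_write_gate(access, confirm)  # runs first, so a refusal costs no I/O
        self.wire_writes.append((name, value))


def refused(writer: GatedWriter, name: str, value: float, *, access: str, confirm: bool) -> bool:
    """Return True if the gate rejected the write."""
    try:
        writer.write(name, value, access=access, confirm=confirm)
    except ConfirmationRequiredError:
        return True
    return False
```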

ControllerFamily

Bases: StrEnum

Watlow controller family discriminator.

Membership here is advisory — :class:watlowlib.devices.session.Session treats family hints as priors, not gates. See docs/design.md §5b.

ControllerLoop

ControllerLoop(controller, loop_number)

A view over one control loop on a :class:Controller.

Construct via :meth:Controller.loop; never instantiated directly by user code. The sub-facade lives only as long as the parent controller's session — closing the controller is the only cleanup needed.

Source code in src/watlowlib/devices/loop.py
def __init__(self, controller: Controller, loop_number: int) -> None:
    if loop_number < 1:
        raise WatlowValidationError(
            f"loop number must be 1-indexed and >= 1; got {loop_number}",
        )
    # If the controller has identified the device, validate
    # eagerly. Otherwise defer to the registry's per-spec
    # ``validate_instance`` at first call: a registered parameter
    # with ``max_instance=1`` will raise a clear
    # ``WatlowValidationError`` when ``loop(2).read_pv()`` is
    # invoked. That keeps ``Controller.loop(2)`` cheap when called
    # before identify, but still fails before I/O.
    loops = controller.loops
    if loops is not None and loop_number > loops:
        raise WatlowValidationError(
            f"loop {loop_number} out of range for this device (1..{loops})",
        )
    self._controller = controller
    self._loop = loop_number

number property

number

The 1-indexed loop number this view binds.

read_alarms async

read_alarms()

Read the alarm word for this loop.

Currently raises :class:watlowlib.errors.WatlowProtocolUnsupportedError — see :func:watlowlib.commands.alarms.read_alarms for why the decoder is not yet wired up.

Source code in src/watlowlib/devices/loop.py
async def read_alarms(self) -> AlarmState:
    """Read the alarm word for this loop.

    Currently raises :class:`watlowlib.errors.WatlowProtocolUnsupportedError` —
    see :func:`watlowlib.commands.alarms.read_alarms` for why the
    decoder is not yet wired up.
    """
    return await _read_alarms(self._controller.session, instance=self._loop)

read_output async

read_output()

Read this loop's working output (output_power).

Source code in src/watlowlib/devices/loop.py
async def read_output(self) -> Reading:
    """Read this loop's working output (``output_power``)."""
    return await _read_output(self._controller.session, instance=self._loop)

read_pid async

read_pid()

Read every PID gain for this loop. Missing gains return None.

Cool-side gains (cool_proportional_band, dead_band) are skipped when the controller's identified capabilities lack :attr:Capability.HAS_COOLING (e.g. PM output_2 == 'A'). Pre-identify, the gate is permissive.

Source code in src/watlowlib/devices/loop.py
async def read_pid(self) -> PidGains:
    """Read every PID gain for this loop. Missing gains return ``None``.

    Cool-side gains (``cool_proportional_band``, ``dead_band``)
    are skipped when the controller's identified capabilities
    lack :attr:`Capability.HAS_COOLING` (e.g. PM ``output_2 ==
    'A'``). Pre-identify, the gate is permissive.
    """
    return await _read_pid(
        self._controller.session,
        instance=self._loop,
        capabilities=self._controller.capabilities,
    )

read_pv async

read_pv(*, timeout=None)

Read this loop's process value.

Source code in src/watlowlib/devices/loop.py
async def read_pv(self, *, timeout: float | None = None) -> Reading:
    """Read this loop's process value."""
    return await self._controller.read_pv(instance=self._loop, timeout=timeout)

read_setpoint async

read_setpoint(*, timeout=None)

Read this loop's active setpoint.

Source code in src/watlowlib/devices/loop.py
async def read_setpoint(self, *, timeout: float | None = None) -> Reading:
    """Read this loop's active setpoint."""
    return await self._controller.read_setpoint(instance=self._loop, timeout=timeout)

set_setpoint async

set_setpoint(value, *, confirm=False, timeout=None)

Write this loop's setpoint (RWES → confirm=True required).

Source code in src/watlowlib/devices/loop.py
async def set_setpoint(
    self,
    value: float,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> Reading:
    """Write this loop's setpoint (RWES → ``confirm=True`` required)."""
    return await self._controller.set_setpoint(
        value,
        instance=self._loop,
        confirm=confirm,
        timeout=timeout,
    )

write_pid async

write_pid(gains, *, confirm=False)

Write the supplied gains for this loop.

Persistent — passing confirm=True is required. Fields left None on gains skip the wire entirely. Setting a cool-side field on a controller without :attr:Capability.HAS_COOLING raises :class:watlowlib.errors.WatlowConfigurationError.

Source code in src/watlowlib/devices/loop.py
async def write_pid(self, gains: PidGains, *, confirm: bool = False) -> PidGains:
    """Write the supplied gains for this loop.

    Persistent — passing ``confirm=True`` is required. Fields
    left ``None`` on ``gains`` skip the wire entirely. Setting a
    cool-side field on a controller without
    :attr:`Capability.HAS_COOLING` raises
    :class:`watlowlib.errors.WatlowConfigurationError`.
    """
    return await _write_pid(
        self._controller.session,
        gains,
        instance=self._loop,
        confirm=confirm,
        capabilities=self._controller.capabilities,
    )

CsvSink

CsvSink(path)

Single-run CSV writer with first-batch schema lock.

Each :meth:open truncates the destination and writes a fresh header on the first :meth:write_many. Cross-run appending is intentionally not supported: the column set is inferred from the first batch and locked for the run, and a re-open against an existing file with a different column shape would silently produce a CSV with mismatched columns. For append semantics, use :class:~watlowlib.sinks.jsonl.JsonlSink (no schema to coordinate) or :class:~watlowlib.sinks.sqlite.SqliteSink (schema captured in the table).

Attributes:

Name Type Description
path Path

Destination file. Created or overwritten on :meth:open.

columns tuple[str, ...] | None

Locked column order after the first :meth:write_many. None before the first flush.

Source code in src/watlowlib/sinks/csv.py
def __init__(self, path: str | Path) -> None:
    self._path = Path(path)
    self._file: TextIOWrapper | None = None
    self._writer: csv.DictWriter[str] | None = None
    self._columns: tuple[str, ...] | None = None
    self._unknown_columns_warned: set[str] = set()

columns property

columns

The locked column order, or None if no batch has been flushed.

path property

path

Destination file path.

close async

close()

Flush and close the CSV file. Idempotent.

Source code in src/watlowlib/sinks/csv.py
async def close(self) -> None:
    """Flush and close the CSV file. Idempotent."""
    if self._file is None:
        return
    try:
        self._file.flush()
    finally:
        self._file.close()
        self._file = None
        self._writer = None

open async

open()

Open the CSV file for writing.

Truncates any existing file: the first :meth:write_many will write a fresh header row. Idempotent on already-open sinks.

Source code in src/watlowlib/sinks/csv.py
async def open(self) -> None:
    """Open the CSV file for writing.

    Truncates any existing file: the first :meth:`write_many` will
    write a fresh header row. Idempotent on already-open sinks.
    """
    if self._file is not None:
        return
    self._file = self._path.open("w", encoding="utf-8", newline="")

write_many async

write_many(samples)

Append samples as CSV rows.

On first call, infers the column set from the first sample and writes the header. Subsequent calls validate each row's keys against that locked set — unknown keys are dropped with a one-shot WARN log per unseen key.

Source code in src/watlowlib/sinks/csv.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as CSV rows.

    On first call, infers the column set from the first sample and
    writes the header. Subsequent calls validate each row's keys
    against that locked set — unknown keys are dropped with a
    one-shot WARN log per unseen key.
    """
    if self._file is None:
        raise RuntimeError("CsvSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]
    if self._writer is None:
        self._columns = tuple(rows[0].keys())
        self._writer = csv.DictWriter(self._file, fieldnames=list(self._columns))
        self._writer.writeheader()

    columns = self._columns
    assert columns is not None  # noqa: S101 — narrow for type checker

    for row in rows:
        unknown = row.keys() - set(columns)
        for key in unknown:
            if key not in self._unknown_columns_warned:
                self._unknown_columns_warned.add(key)
                _logger.warning(
                    "sinks.csv.unknown_column path=%s column=%s action=drop",
                    str(self._path),
                    key,
                )
        filtered = {k: row.get(k) for k in columns}
        self._writer.writerow(filtered)
    self._file.flush()
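
The first-batch schema lock can be tried without a controller attached. The following is a minimal standard-library sketch of the same idea, not the library's implementation: `LockedCsvWriter` is a hypothetical stand-in that takes plain dict rows instead of `Sample` objects and writes to an in-memory stream.

```python
import csv
import io
import logging

logger = logging.getLogger("csv_lock_sketch")


class LockedCsvWriter:
    """Hypothetical stand-in: locks the column set on the first batch."""

    def __init__(self, stream):
        self._stream = stream
        self._writer = None
        self.columns = None
        self._warned = set()

    def write_many(self, rows):
        if not rows:
            return
        if self._writer is None:
            # The first row of the first batch decides the header.
            self.columns = tuple(rows[0].keys())
            self._writer = csv.DictWriter(self._stream, fieldnames=list(self.columns))
            self._writer.writeheader()
        for row in rows:
            for key in row.keys() - set(self.columns):
                if key not in self._warned:
                    self._warned.add(key)
                    logger.warning("unknown column %s dropped", key)
            # Missing keys become empty cells; unknown keys are left out.
            self._writer.writerow({k: row.get(k) for k in self.columns})


buf = io.StringIO()
writer = LockedCsvWriter(buf)
writer.write_many([{"t": 1, "pv": 20.5}])
writer.write_many([{"t": 2, "pv": 20.7, "extra": 9}, {"t": 3}])
lines = buf.getvalue().splitlines()
```

The second batch shows both edge cases: the `extra` key is dropped with a single warning, and the row that lacks `pv` is written with an empty cell.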

DeviceInfo dataclass

DeviceInfo(
    part_number,
    hardware_id,
    firmware_id,
    serial_number,
    family,
    protocol,
    address,
    capabilities,
    serial_settings,
    loops,
    health=DeviceHealth.OK,
    configured_protocol=None,
)

Identity + connection metadata for an open controller.

Returned by :meth:Controller.identify. Capabilities are decoded from the part number when one is captured (see :func:watlowlib.registry.families.capabilities_for_part_number) and OR-ed with the family prior; unobserved bits stay zero rather than being guessed.

protocol is the wire protocol the host is currently talking; configured_protocol is what the device's persistent EEPROM parameter (PM 17009) reports. They normally match, but when they diverge the helper :attr:protocol_mismatch flags it — useful for catching SKU/firmware combinations where the user wrote a new protocol but the runtime stack didn't pick it up (e.g. comms position-8 = 'A', no Modbus stack present even though 17009 reads 1057).

protocol_mismatch property

protocol_mismatch

True when EEPROM says one protocol and we're talking another.

Always False when :attr:configured_protocol is None (i.e. identify did not query parameter 17009).
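
The rule behind `protocol_mismatch` is small enough to sketch in isolation. The `Proto` enum and `Info` dataclass below are hypothetical stand-ins carrying only the two fields involved; the member names are assumptions, not the library's `ProtocolKind` values.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class Proto(Enum):
    # Hypothetical member names, chosen for illustration only.
    STDBUS = "stdbus"
    MODBUS_RTU = "modbus_rtu"


@dataclass(frozen=True)
class Info:
    protocol: Proto
    configured_protocol: Optional[Proto] = None

    @property
    def protocol_mismatch(self) -> bool:
        # Unknown EEPROM setting: nothing to compare, so never flag it.
        if self.configured_protocol is None:
            return False
        return self.configured_protocol is not self.protocol


not_queried = Info(protocol=Proto.STDBUS)
matching = Info(protocol=Proto.STDBUS, configured_protocol=Proto.STDBUS)
diverged = Info(protocol=Proto.STDBUS, configured_protocol=Proto.MODBUS_RTU)
flags = [not_queried.protocol_mismatch, matching.protocol_mismatch, diverged.protocol_mismatch]
```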

DeviceProfile dataclass

DeviceProfile(
    name,
    family,
    registry,
    default_protocol,
    default_serial,
    identify,
    wire_temperature_unit=None,
)

A first-class controller type.

Attributes:

Name Type Description
name str

Stable short identifier ("ezzone" / "series_sd").

family ControllerFamily

The controller family this profile describes.

registry ParameterRegistry

Parameter registry used to decode this device's parameters.

default_protocol ProtocolKind

Wire protocol opened when the caller does not pass one explicitly.

default_serial SerialSettings

Factory serial framing for default_protocol. Its port is a placeholder — :func:open_device applies the real port via :func:dataclasses.replace.

identify IdentifyStrategy

Strategy that produces a :class:DeviceInfo.

wire_temperature_unit Unit | None

The scale temperatures travel in over the wire, when the profile knows it for certain. None means "do not guess — the user must assert it" (the EZ-ZONE PM contract; some PM firmware misreports its own unit register).

DeviceResult dataclass

DeviceResult(value, error)

Per-device result container — value or error, never both.

The protocol that produced the failure is available via result.error.context.protocol when the error carries context; keeping it off the result keeps the success-path representation clean and aligns with the ecosystem DeviceResult shape used by :mod:alicatlib and :mod:sartoriuslib.

ok property

ok

True when the controller produced a value (error is None).

failure classmethod

failure(error)

Build a failure result wrapping error.

Source code in src/watlowlib/manager.py
@classmethod
def failure(cls, error: WatlowError) -> Self:
    """Build a failure result wrapping ``error``."""
    return cls(value=None, error=error)

success classmethod

success(value)

Build a success result wrapping value.

Source code in src/watlowlib/manager.py
@classmethod
def success(cls, value: T) -> Self:
    """Build a success result wrapping ``value``."""
    return cls(value=value, error=None)
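
A value-or-error container of this shape is typically consumed by splitting a per-device mapping on `ok`. The sketch below uses a hypothetical `Result` class that mirrors the constructor, `ok`, `success` and `failure` shown above; the device names and the use of the built-in `TimeoutError` in place of `WatlowError` are assumptions for illustration.

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class Result(Generic[T]):
    """Hypothetical stand-in for a value-or-error result."""

    value: Optional[T]
    error: Optional[Exception]

    @property
    def ok(self) -> bool:
        return self.error is None

    @classmethod
    def success(cls, value: T) -> "Result[T]":
        return cls(value=value, error=None)

    @classmethod
    def failure(cls, error: Exception) -> "Result[T]":
        return cls(value=None, error=error)


# One entry per device, as a manager call might return them.
results = {
    "oven_1": Result.success(182.4),
    "oven_2": Result.failure(TimeoutError("no reply")),
}
good = {name: r.value for name, r in results.items() if r.ok}
failed = sorted(name for name, r in results.items() if not r.ok)
```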

DeviceSnapshot dataclass

DeviceSnapshot(
    name,
    model,
    firmware,
    serial,
    connected,
    last_error,
    recoverable_error_count,
    captured_at,
)

Cross-library identity + connection summary (no I/O).

Attributes:

Name Type Description
name str

Caller-supplied device name (manager-assigned, or the transport label for a solo controller).

model str | None

Best-known model / part-number string, None until :meth:Controller.identify has run.

firmware str | None

Firmware id as a string, or None.

serial str | None

Serial-number string, or None.

connected bool

True when the underlying transport is open.

last_error ErrorContext | None

Most recent :class:ErrorContext recorded by the session, or None.

recoverable_error_count int

Session counter for swallowed-and-retried transient errors. Watlow keeps this dormant until a transient transport class is introduced; the field stays at zero today.

captured_at datetime

Wall-clock at snapshot construction (tz-aware UTC).

DiscoveryResult dataclass

DiscoveryResult(
    ok,
    port,
    address,
    baudrate,
    protocol,
    device_info,
    error,
    elapsed_s,
)

One probe attempt's outcome from :func:find_devices.

Cross-library shape (mirrors :mod:alicatlib, :mod:sartoriuslib, :mod:nidaqlib) so GUI Discover dialogs and capa-style adapters can filter responsive vs silent rows on a single attribute and consume the same field set across vendors.

A populated :attr:device_info carries the full :meth:Controller.identify result, including :attr:DeviceInfo.health and (when the scan queried it) :attr:DeviceInfo.configured_protocol.


The address field is typed str | int | None to match the cross-library spec; in practice every watlow probe carries an int.

ErrorContext dataclass

ErrorContext(
    command_name=None,
    protocol=None,
    port=None,
    address=None,
    parameter_id=None,
    cls=None,
    member=None,
    instance=None,
    register_address=None,
    function_code=None,
    request=None,
    response=None,
    elapsed_s=None,
    extra=_empty_extra(),
)

Structured context attached to every :class:WatlowError.

Fields are best-effort — missing data is None rather than raising.

extra accepts any Mapping and is always frozen into a read-only :class:types.MappingProxyType at construction so the shared empty sentinel can never be mutated through error.context.extra[k] = v.

merged

merged(**updates)

Return a new context with updates overlaid. Unknown keys go to extra.

Source code in src/watlowlib/errors.py
def merged(self, **updates: Any) -> Self:
    """Return a new context with ``updates`` overlaid. Unknown keys go to ``extra``."""
    known: dict[str, Any] = {}
    extra_updates: dict[str, Any] = {}
    for key, value in updates.items():
        if key in _CONTEXT_KNOWN_FIELDS:
            known[key] = value
        else:
            extra_updates[key] = value

    new_extra: Mapping[str, Any] = (
        MappingProxyType({**self.extra, **extra_updates}) if extra_updates else self.extra
    )
    return replace(self, **known, extra=new_extra)
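
The overlay behaviour of `merged` can be reproduced on a reduced context. This is a sketch under assumptions: `Ctx` is a hypothetical two-field stand-in, and the set of known fields is derived from `dataclasses.fields` rather than from the library's `_CONTEXT_KNOWN_FIELDS`, whose contents are not shown on this page.

```python
from dataclasses import dataclass, field, fields, replace
from types import MappingProxyType
from typing import Any, Mapping, Optional

_EMPTY: Mapping[str, Any] = MappingProxyType({})


@dataclass(frozen=True)
class Ctx:
    """Hypothetical reduced error context."""

    port: Optional[str] = None
    address: Optional[int] = None
    extra: Mapping[str, Any] = field(default_factory=lambda: _EMPTY)

    def __post_init__(self) -> None:
        # Freeze any caller-supplied mapping into a read-only view.
        if not isinstance(self.extra, MappingProxyType):
            object.__setattr__(self, "extra", MappingProxyType(dict(self.extra)))

    def merged(self, **updates: Any) -> "Ctx":
        known_names = {f.name for f in fields(self)} - {"extra"}
        known = {k: v for k, v in updates.items() if k in known_names}
        spill = {k: v for k, v in updates.items() if k not in known_names}
        new_extra = MappingProxyType({**self.extra, **spill}) if spill else self.extra
        return replace(self, **known, extra=new_extra)


base = Ctx(port="/dev/ttyUSB0")
ctx = base.merged(address=2, attempt=3)  # "attempt" is not a field, so it lands in extra

try:
    ctx.extra["attempt"] = 4
    mutated = True
except TypeError:
    mutated = False  # mappingproxy rejects item assignment
```

`base` is left untouched: `merged` returns a new object and the shared empty sentinel is never written to.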

ErrorPolicy

Bases: Enum

How the manager surfaces per-device failures.

Under :attr:RAISE, the manager collects every controller's result and — if any call failed — raises an :class:ExceptionGroup containing the per-device exceptions after the task group joins. Under :attr:RETURN, each controller produces a :class:DeviceResult and the caller inspects .error per entry.

FakeTransport

FakeTransport(
    script=None,
    *,
    queue=None,
    label="fake://test",
    latency_s=0.0,
)

Scripted :class:Transport for tests.

Parameters:

Name Type Description Default
script Mapping[bytes, ScriptedReply] | None

Mapping of write_bytes → reply. Every scripted write queues the corresponding reply into the read buffer. Unknown writes are recorded but produce no reply; the next read times out.

None
label str

Identifier used in errors.

'fake://test'
latency_s float

Per-operation artificial delay, useful for simulating a slow device.

0.0

Source code in src/watlowlib/transport/fake.py
def __init__(
    self,
    script: Mapping[bytes, ScriptedReply] | None = None,
    *,
    queue: Iterable[tuple[bytes, ScriptedReply]] | None = None,
    label: str = "fake://test",
    latency_s: float = 0.0,
) -> None:
    self._script: dict[bytes, ScriptedReply] = dict(script or {})
    self._queue: deque[tuple[bytes, ScriptedReply]] = deque(queue or [])
    self._writes: list[bytes] = []
    self._unmatched: list[bytes] = []
    self._read_buffer = bytearray()
    self._is_open = False
    self._label = label
    self._latency_s = latency_s
    self._force_read_timeout = False
    self._force_write_timeout = False

remaining_queue property

remaining_queue

Queue entries that have not been consumed yet.

unmatched_writes property

unmatched_writes

Writes that didn't match any scripted reply, in order.

A test can assert transport.unmatched_writes == () to catch accidentally-unscripted traffic — the corresponding read would have timed out, but a precise assertion fails faster and points at the right call.
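
The scripted-reply idea and the unmatched-write assertion can be illustrated with a toy. `ScriptedLink` below is a hypothetical, synchronous stand-in written for this example. It is not `FakeTransport`, its method names are assumptions, and the byte strings are made up rather than real Standard Bus or Modbus frames.

```python
class ScriptedLink:
    """Hypothetical toy: maps each written payload to a canned reply."""

    def __init__(self, script):
        self._script = dict(script)
        self.writes = []
        self.unmatched = []
        self._read_buffer = bytearray()

    def write(self, data):
        data = bytes(data)
        self.writes.append(data)
        reply = self._script.get(data)
        if reply is None:
            # Recorded, but nothing is queued for the next read.
            self.unmatched.append(data)
        else:
            self._read_buffer.extend(reply)

    def read_available(self):
        data = bytes(self._read_buffer)
        self._read_buffer.clear()
        return data


link = ScriptedLink({b"\x01PV?": b"\x01PV=20.5"})
link.write(b"\x01PV?")
reply = link.read_available()

link.write(b"\x01SP?")  # not in the script
silent = link.read_available()
```

Asserting that `unmatched` is empty at the end of a test catches traffic the script did not anticipate, which is the check the paragraph above recommends for `unmatched_writes`.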

writes property

writes

Every write payload recorded since construction, in order.

add_script

add_script(command, reply)

Register or overwrite a scripted reply for command.

Source code in src/watlowlib/transport/fake.py
def add_script(self, command: bytes, reply: ScriptedReply) -> None:
    """Register or overwrite a scripted reply for ``command``."""
    self._script[bytes(command)] = reply

extend_queue

extend_queue(rounds)

Append more ordered (write, reply) pairs to the FIFO queue.

Source code in src/watlowlib/transport/fake.py
def extend_queue(self, rounds: Iterable[tuple[bytes, ScriptedReply]]) -> None:
    """Append more ordered ``(write, reply)`` pairs to the FIFO queue."""
    self._queue.extend(rounds)

feed

feed(data)

Push unsolicited bytes into the read buffer.

Useful for simulating a device that left chatter on the line which the protocol client has to drain on recovery.

Source code in src/watlowlib/transport/fake.py
def feed(self, data: bytes) -> None:
    """Push unsolicited bytes into the read buffer.

    Useful for simulating a device that left chatter on the line
    which the protocol client has to drain on recovery.
    """
    self._read_buffer.extend(data)

force_read_timeout

force_read_timeout(enabled=True)

Force the next read to raise :class:WatlowTimeoutError.

Source code in src/watlowlib/transport/fake.py
def force_read_timeout(self, enabled: bool = True) -> None:
    """Force the next read to raise :class:`WatlowTimeoutError`."""
    self._force_read_timeout = enabled

force_write_timeout

force_write_timeout(enabled=True)

Force the next :meth:write to raise :class:WatlowTimeoutError.

Source code in src/watlowlib/transport/fake.py
def force_write_timeout(self, enabled: bool = True) -> None:
    """Force the next :meth:`write` to raise :class:`WatlowTimeoutError`."""
    self._force_write_timeout = enabled

FirmwareVersion dataclass

FirmwareVersion(major, minor=0, patch=0)

Semantic firmware version (major.minor.patch).

order=True makes the dataclass orderable on its tuple of fields, which matches the major.minor.patch precedence callers expect.

parse classmethod

parse(text)

Parse a string like "1", "1.2", "1.2.3", or "v1.2".

Source code in src/watlowlib/firmware.py
@classmethod
def parse(cls, text: str) -> Self:
    """Parse a string like ``"1"``, ``"1.2"``, ``"1.2.3"``, or ``"v1.2"``."""
    m = _PARSE_RE.match(text)
    if m is None:
        raise ValueError(f"invalid firmware version: {text!r}")
    major = int(m.group(1))
    minor = int(m.group(2) or 0)
    patch = int(m.group(3) or 0)
    return cls(major, minor, patch)
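
The effect of `order=True` is easiest to see by comparing parsed versions. The sketch below is self-contained and makes one loud assumption: the regular expression is a guess at a pattern that accepts the documented inputs, since the library's `_PARSE_RE` is not shown on this page.

```python
import re
from dataclasses import dataclass

# Assumed pattern, not the library's _PARSE_RE: optional "v", then 1 to 3 numeric parts.
_PATTERN = re.compile(r"^\s*v?(\d+)(?:\.(\d+))?(?:\.(\d+))?\s*$", re.IGNORECASE)


@dataclass(frozen=True, order=True)
class Version:
    """Hypothetical stand-in for an orderable firmware version."""

    major: int
    minor: int = 0
    patch: int = 0

    @classmethod
    def parse(cls, text: str) -> "Version":
        m = _PATTERN.match(text)
        if m is None:
            raise ValueError(f"invalid firmware version: {text!r}")
        return cls(int(m.group(1)), int(m.group(2) or 0), int(m.group(3) or 0))


versions = [Version.parse(s) for s in ("1.2.3", "v1.2", "1", "1.10")]
newest = max(versions)
oldest = min(versions)
```

Because comparison runs on the integer fields, `1.10` sorts above `1.2.3`, which a plain string comparison would get wrong.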

IdentifyStrategy

Bases: Protocol

How a profile turns an open controller into a :class:DeviceInfo.

Implementations are pure with respect to the controller's cached identity — they read parameters and return a :class:DeviceInfo; :meth:Controller.identify is responsible for caching the result.

__call__ async

__call__(
    controller,
    *,
    timeout=None,
    strict=False,
    query_configured_protocol=False,
)

Return identity information for controller.

Source code in src/watlowlib/devices/profile.py
async def __call__(
    self,
    controller: Controller,
    *,
    timeout: float | None = None,
    strict: bool = False,
    query_configured_protocol: bool = False,
) -> DeviceInfo:
    """Return identity information for ``controller``."""
    ...

InMemorySink

InMemorySink()

Collect every written :class:Sample in a single list.

:attr:samples is appended to (never re-assigned) so callers can hold a reference across the sink's lifecycle. :meth:close does not clear the buffer — the point of this sink is post-run inspection.

Source code in src/watlowlib/sinks/memory.py
def __init__(self) -> None:
    self._samples: list[Sample] = []
    self._open = False
    self._closed = False

is_open property

is_open

True once :meth:open has been called and close has not.

samples property

samples

Captured samples, in write order.

close async

close()

Flip the closed flag — no I/O, buffer preserved for inspection.

Source code in src/watlowlib/sinks/memory.py
async def close(self) -> None:
    """Flip the closed flag — no I/O, buffer preserved for inspection."""
    self._closed = True

open async

open()

No backing resource — just flips the open flag.

Source code in src/watlowlib/sinks/memory.py
async def open(self) -> None:
    """No backing resource — just flips the open flag."""
    self._open = True
    self._closed = False

write_many async

write_many(samples)

Append every sample to the internal buffer.

Source code in src/watlowlib/sinks/memory.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append every sample to the internal buffer."""
    if not self.is_open:
        raise RuntimeError("InMemorySink: write_many called before open()")
    self._samples.extend(samples)

JsonlSink

JsonlSink(path)

Append-only JSONL writer — one flattened sample per line.

The on-disk format is <sample-row-as-json>\n per sample; reading back is just [json.loads(line) for line in f]. No header, no schema declaration, no framing overhead beyond the newline. Opening a sink on the same path twice extends the file, which is useful for resumable acquisitions and crash-restart scripts.

Source code in src/watlowlib/sinks/jsonl.py
def __init__(self, path: str | Path) -> None:
    self._path = Path(path)
    self._file: TextIOWrapper | None = None

path property

path

Destination file path.

close async

close()

Flush and close the JSONL file. Idempotent.

Source code in src/watlowlib/sinks/jsonl.py
async def close(self) -> None:
    """Flush and close the JSONL file. Idempotent."""
    if self._file is None:
        return
    try:
        self._file.flush()
    finally:
        self._file.close()
        self._file = None

open async

open()

Open the JSONL file for writing in append mode.

Pre-existing content is preserved; new samples are appended. Idempotent on already-open sinks.

Source code in src/watlowlib/sinks/jsonl.py
async def open(self) -> None:
    """Open the JSONL file for writing in append mode.

    Pre-existing content is preserved; new samples are appended.
    Idempotent on already-open sinks.
    """
    if self._file is not None:
        return
    self._file = self._path.open("a", encoding="utf-8", newline="")

write_many async

write_many(samples)

Serialise each sample as one JSON object per line.

Source code in src/watlowlib/sinks/jsonl.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Serialise each sample as one JSON object per line."""
    if self._file is None:
        raise RuntimeError("JsonlSink: write_many called before open()")
    if not samples:
        return
    for sample in samples:
        row = sample_to_row(sample)
        self._file.write(json.dumps(row, ensure_ascii=False))
        self._file.write("\n")
    self._file.flush()
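
The append-and-read-back round trip needs nothing beyond the standard library. The sketch below mimics the file handling shown above with two hypothetical helper functions and plain dict rows; it does not use `JsonlSink` or `sample_to_row`.

```python
import json
import tempfile
from pathlib import Path


def append_rows(path, rows):
    # Append mode: earlier content is preserved, one JSON object per line.
    with path.open("a", encoding="utf-8", newline="") as f:
        for row in rows:
            f.write(json.dumps(row, ensure_ascii=False))
            f.write("\n")


def read_rows(path):
    with path.open("r", encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


with tempfile.TemporaryDirectory() as tmp:
    log = Path(tmp) / "run.jsonl"
    append_rows(log, [{"t": 0, "pv": 20.5}])
    # A second "run" against the same path extends the file,
    # and rows may carry different keys because there is no schema.
    append_rows(log, [{"t": 1, "pv": 20.7, "unit": "°C"}])
    rows = read_rows(log)
```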

LoopState dataclass

LoopState(
    loop, pv, setpoint, output_pct, raw=(lambda: {})()
)

Snapshot of one loop. Composed from several reads.

OverflowPolicy

Bases: Enum

What record() does when the receive-stream buffer is full.

The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Await the slow consumer. Default. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the batch that was about to be enqueued. Counted as late.

DROP_OLDEST class-attribute instance-attribute

DROP_OLDEST = 'drop_oldest'

Evict the oldest queued batch and enqueue the newest. Useful for real-time monitoring where the latest reading matters more than historical buffer contents. Each evicted batch is counted as late.
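
The two dropping policies differ only in which batch survives a full buffer. The sketch below models that with a `collections.deque` and a hypothetical `offer` helper; it is a synchronous illustration, not the recorder's code, and it leaves out BLOCK because blocking needs a concurrent consumer.

```python
from collections import deque
from enum import Enum


class Policy(Enum):
    DROP_NEWEST = "drop_newest"
    DROP_OLDEST = "drop_oldest"


def offer(buffer, batch, capacity, policy):
    """Try to enqueue a batch; return how many batches were counted as late."""
    if len(buffer) < capacity:
        buffer.append(batch)
        return 0
    if policy is Policy.DROP_OLDEST:
        buffer.popleft()      # evict the oldest queued batch
        buffer.append(batch)  # keep the newest
    # Under DROP_NEWEST the incoming batch is simply discarded.
    return 1


def run(policy):
    buffer, late = deque(), 0
    for batch in range(5):  # five ticks, and the consumer never drains
        late += offer(buffer, batch, capacity=3, policy=policy)
    return list(buffer), late


kept_newest_dropped, late_a = run(Policy.DROP_NEWEST)
kept_oldest_dropped, late_b = run(Policy.DROP_OLDEST)
```

Both runs count two late batches, but DROP_NEWEST keeps the earliest three while DROP_OLDEST keeps the latest three.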

ParameterEntry dataclass

ParameterEntry(spec, instance, value, raw)

Generic registry-driven read/write result.

Returned by :data:watlowlib.commands.READ_PARAMETER and :data:watlowlib.commands.WRITE_PARAMETER. The :class:Controller translates an entry into a :class:Reading / :class:PartNumber / etc. when the public API guarantees a richer shape.

ParameterRegistry

ParameterRegistry(
    specs,
    *,
    aliases=DEFAULT_ALIASES,
    name_overrides=_NAME_OVERRIDES,
)

Indexed view over a sequence of :class:ParameterSpec rows.

Lookups are O(1) on canonical name, alias, and parameter_id. Construction is O(N).

Source code in src/watlowlib/registry/parameters.py
def __init__(
    self,
    specs: tuple[ParameterSpec, ...],
    *,
    aliases: Mapping[str, str] = DEFAULT_ALIASES,
    name_overrides: Mapping[int, str] = _NAME_OVERRIDES,
) -> None:
    # ``name_overrides`` promotes parameter ids to short canonical
    # names. The PM registry uses the bundled :data:`_NAME_OVERRIDES`
    # (its auto-derived names are verbose); the SD registry passes
    # ``{}`` and instead carries verbatim ``canonical`` names baked
    # into each spec at load time. Passing the PM table to a non-PM
    # registry would mis-claim collision keys (e.g. "units") for PM
    # parameter ids that don't exist in that registry.
    # Apply name overrides + collect aliases per spec.
    rebound: list[ParameterSpec] = []
    for spec in specs:
        override = name_overrides.get(spec.parameter_id)
        name = override or spec.name
        spec_aliases: set[str] = set()
        for alias, target in aliases.items():
            if target == name:
                spec_aliases.add(alias)
        if override and override != spec.name:
            # Keep the original auto-generated name as an alias so
            # callers that learn it from the JSON still resolve.
            spec_aliases.add(spec.name)
        if name != spec.name or spec_aliases:
            # ``dataclasses.replace`` copies every other field
            # verbatim — including any field added to ParameterSpec
            # later (``scale``, future per-row overrides). A manual
            # field-by-field rebind would silently drop new fields,
            # so never reintroduce one here.
            spec = replace(  # noqa: PLW2901 — frozen dataclass rebind
                spec,
                name=name,
                aliases=frozenset(spec_aliases),
            )
        rebound.append(spec)

    self._specs: tuple[ParameterSpec, ...] = tuple(rebound)
    # A handful of canonical names auto-derived by ``_canonical_name``
    # collide across unrelated rows (e.g. "Display - Units",
    # "Analog Input - Units", and "Linearization - Units" all
    # canonicalise to ``"units"``). When an :data:`_NAME_OVERRIDES`
    # entry targets one of those collision names, the override row
    # wins — otherwise the JSON's iteration order silently decided
    # which spec a public name like ``"units"`` resolved to.
    override_owner: dict[str, int] = {name.lower(): pid for pid, name in name_overrides.items()}
    by_id: dict[int, ParameterSpec] = {}
    by_name: dict[str, ParameterSpec] = {}
    for spec in self._specs:
        by_id[spec.parameter_id] = spec
        key = spec.name.lower()
        owner = override_owner.get(key)
        if owner is not None and owner != spec.parameter_id:
            # Another row's auto-canonical name collides with an
            # override-assigned name; the override row owns the key.
            continue
        by_name[key] = spec
        for alias in spec.aliases:
            by_name.setdefault(alias.lower(), spec)
    self._by_id: Mapping[int, ParameterSpec] = MappingProxyType(by_id)
    self._by_name: Mapping[str, ParameterSpec] = MappingProxyType(by_name)

has

has(name_or_id)

Return True if name_or_id resolves; never raises.

Source code in src/watlowlib/registry/parameters.py
def has(self, name_or_id: str | int) -> bool:
    """Return ``True`` if ``name_or_id`` resolves; never raises."""
    try:
        self.resolve(name_or_id)
    except WatlowValidationError:
        return False
    return True

resolve

resolve(name_or_id)

Look up a spec by canonical name, alias, or parameter ID.

Raises:

Type Description
WatlowValidationError

name_or_id does not resolve.

Source code in src/watlowlib/registry/parameters.py
def resolve(self, name_or_id: str | int) -> ParameterSpec:
    """Look up a spec by canonical name, alias, or parameter ID.

    Raises:
        WatlowValidationError: ``name_or_id`` does not resolve.
    """
    if isinstance(name_or_id, int):
        try:
            return self._by_id[name_or_id]
        except KeyError as exc:
            raise WatlowValidationError(
                f"unknown parameter id: {name_or_id}",
            ) from exc
    key = name_or_id.lower()
    try:
        return self._by_name[key]
    except KeyError as exc:
        raise WatlowValidationError(
            f"unknown parameter name: {name_or_id!r}",
        ) from exc
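
Lookup by canonical name, alias, or id reduces to two dictionaries with lower-cased string keys. The sketch below is a simplified, hypothetical registry: it omits name overrides and collision handling, raises the built-in `LookupError` instead of `WatlowValidationError`, and uses invented parameter ids and names.

```python
from dataclasses import dataclass
from types import MappingProxyType


@dataclass(frozen=True)
class Spec:
    parameter_id: int
    name: str
    aliases: frozenset = frozenset()


class MiniRegistry:
    """Hypothetical reduced registry: O(1) lookups after O(N) construction."""

    def __init__(self, specs):
        by_id, by_name = {}, {}
        for spec in specs:
            by_id[spec.parameter_id] = spec
            by_name[spec.name.lower()] = spec
            for alias in spec.aliases:
                # An alias never displaces a key that is already registered.
                by_name.setdefault(alias.lower(), spec)
        self._by_id = MappingProxyType(by_id)
        self._by_name = MappingProxyType(by_name)

    def resolve(self, name_or_id):
        if isinstance(name_or_id, int):
            table, key = self._by_id, name_or_id
        else:
            table, key = self._by_name, name_or_id.lower()
        try:
            return table[key]
        except KeyError:
            raise LookupError(f"unknown parameter: {name_or_id!r}") from None

    def has(self, name_or_id):
        try:
            self.resolve(name_or_id)
        except LookupError:
            return False
        return True


# Ids and names below are made up for the example.
registry = MiniRegistry([
    Spec(101, "process_value", frozenset({"pv"})),
    Spec(102, "setpoint", frozenset({"sp"})),
])
by_alias = registry.resolve("SP")
by_id = registry.resolve(102)
```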

validate_instance

validate_instance(spec, instance)

Raise if instance is out of range for spec.

Public so the variant layer can validate before encoding.

Source code in src/watlowlib/registry/parameters.py
def validate_instance(self, spec: ParameterSpec, instance: int) -> None:
    """Raise if ``instance`` is out of range for ``spec``.

    Public so the variant layer can validate before encoding.
    """
    if instance < 1 or instance > spec.max_instance:
        raise WatlowValidationError(
            f"instance {instance} out of range for {spec.name!r} (1..{spec.max_instance})",
        )

validate_value

validate_value(spec, value)

Soft range check based on the spec's parsed range metadata.

Skipped silently if range_min / range_max couldn't be parsed from the JSON range field — Watlow's range strings are not always machine-readable. STRING parameters are not range-checked.

Source code in src/watlowlib/registry/parameters.py
def validate_value(self, spec: ParameterSpec, value: float | int | str) -> None:
    """Soft range check based on the spec's parsed ``range`` metadata.

    Skipped silently if ``range_min`` / ``range_max`` couldn't be
    parsed from the JSON ``range`` field — Watlow's range strings
    are not always machine-readable. STRING parameters are not
    range-checked.
    """
    if isinstance(value, str):
        return
    if spec.range_min is None or spec.range_max is None:
        return
    v = float(value)
    if v < spec.range_min or v > spec.range_max:
        raise WatlowValidationError(
            f"value {value!r} out of range for {spec.name!r} "
            f"({spec.range_min}..{spec.range_max})",
        )

ParameterSpec dataclass

ParameterSpec(
    parameter_id,
    name,
    aliases,
    data_type,
    unit_kind,
    rwes,
    safety,
    cls,
    member,
    default_instance,
    max_instance,
    relative_addr,
    absolute_addr,
    register_count,
    word_order=None,
    range_min=None,
    range_max=None,
    default=None,
    family_hints=_empty_family_hints(),
    scale=1.0,
)

A single parameter row from pm_parameters.json.

Per-protocol fields:

  • Std Bus selector: :attr:cls, :attr:member, :attr:default_instance, :attr:max_instance.
  • Modbus selector: :attr:relative_addr, :attr:absolute_addr, :attr:register_count, :attr:word_order (None → client default HIGH_LOW per design §5a).

scale class-attribute instance-attribute

scale = 1.0

Engineering-unit scale factor for the Modbus decode / encode path.

The wire stores raw integers (e.g. the Series SD reports a process value of 68421 for 68.421 °F); scale is the multiplier that turns the raw word into engineering units on read (value * scale) and the divisor that turns engineering units back into raw words on write (round(value / scale)).

1.0 (the default) means no scaling — and is applied as a strict identity: the read path skips the multiply entirely when scale == 1.0 so an integer parameter stays an int rather than being promoted to float by int * 1.0. Std Bus rows are never scaled (the Std Bus variant ignores this field).
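
The read and write arithmetic described above fits in two short functions. This is a sketch with hypothetical helper names (`decode` and `encode`), not the Modbus variant's code; it only reproduces the multiply on read, the rounded divide on write, and the identity shortcut at `scale == 1.0`.

```python
def decode(raw: int, scale: float = 1.0):
    """Raw wire integer to engineering units."""
    if scale == 1.0:
        # Strict identity: skip the multiply so an int stays an int.
        return raw
    return raw * scale


def encode(value: float, scale: float = 1.0) -> int:
    """Engineering units back to the raw wire integer."""
    return round(value / scale)


pv = decode(68421, scale=0.001)    # about 68.421, subject to float rounding
count = decode(5)                  # unscaled integer parameter
raw_again = encode(68.421, scale=0.001)
```

The read result is a float and may differ from 68.421 in the last bits, which is why the write path rounds before sending the raw word.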

ParquetSink

ParquetSink(
    path,
    *,
    compression="zstd",
    use_dictionary=True,
    row_group_size=None,
)

Append-only Parquet writer with first-batch schema lock.

Attributes:

Name Type Description
path Path

Destination Parquet file.

compression _Compression

Codec applied to every row group.

columns tuple[ColumnSpec, ...] | None

Locked columns in order, or None before first flush.

Source code in src/watlowlib/sinks/parquet.py
def __init__(
    self,
    path: str | Path,
    *,
    compression: _Compression = "zstd",
    use_dictionary: bool = True,
    row_group_size: int | None = None,
) -> None:
    self._path = Path(path)
    self._compression: _Compression = compression
    self._use_dictionary = use_dictionary
    if row_group_size is not None and row_group_size < 1:
        raise ValueError(
            f"row_group_size must be >= 1 if set, got {row_group_size!r}",
        )
    self._row_group_size = row_group_size
    self._schema = SchemaLock(sink_name="parquet", logger=_logger)
    self._pa: Any = None
    self._pq: Any = None
    self._arrow_schema: Any = None
    self._writer: Any = None
    self._rows_written = 0

columns property

columns

Locked columns in order, or None before first :meth:write_many.

compression property

compression

The configured compression codec.

path property

path

Destination Parquet file path.

close async

close()

Flush the footer and close the writer. Idempotent.

Source code in src/watlowlib/sinks/parquet.py
async def close(self) -> None:
    """Flush the footer and close the writer. Idempotent."""
    if self._writer is not None:
        try:
            self._writer.close()
        finally:
            self._writer = None
    self._pa = None
    self._pq = None
    _logger.info(
        "sinks.parquet.close path=%s rows_written=%s",
        str(self._path),
        self._rows_written,
    )

open async

open()

Load pyarrow and create the parent directory. Idempotent.

The actual :class:pyarrow.parquet.ParquetWriter is opened lazily on the first :meth:write_many call, because the writer requires a concrete schema — which we don't have until the first batch is inspected.

Source code in src/watlowlib/sinks/parquet.py
async def open(self) -> None:
    """Load pyarrow and create the parent directory. Idempotent.

    The actual :class:`pyarrow.parquet.ParquetWriter` is opened
    lazily on the first :meth:`write_many` call, because the
    writer requires a concrete schema — which we don't have until
    the first batch is inspected.
    """
    if self._pa is not None:
        return
    self._pa, self._pq = _load_pyarrow()
    self._path.parent.mkdir(parents=True, exist_ok=True)
    _logger.info(
        "sinks.parquet.open path=%s compression=%s",
        str(self._path),
        self._compression,
    )

write_many async

write_many(samples)

Append samples as one Parquet row group.

On first call: infers the schema from the batch, locks it, constructs the matching :mod:pyarrow schema, and opens the underlying :class:~pyarrow.parquet.ParquetWriter.

Subsequent calls project each row onto the locked schema and append the rows as a new row group. Unknown columns are dropped with one-shot WARN (handled by :class:SchemaLock).

Source code in src/watlowlib/sinks/parquet.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as one Parquet row group.

    On first call: infers the schema from the batch, locks it,
    constructs the matching :mod:`pyarrow` schema, and opens the
    underlying :class:`~pyarrow.parquet.ParquetWriter`.

    Subsequent calls project each row onto the locked schema and
    append the rows as a new row group. Unknown columns are
    dropped with one-shot WARN (handled by :class:`SchemaLock`).
    """
    if self._pa is None:
        raise RuntimeError("ParquetSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked:
        self._schema.lock(rows)
        self._arrow_schema = self._build_arrow_schema()
        self._writer = self._open_writer()

    assert self._writer is not None  # noqa: S101 — narrow for type checker
    assert self._arrow_schema is not None  # noqa: S101

    projected = [self._schema.project(r) for r in rows]
    columns = self._schema.columns
    assert columns is not None  # noqa: S101

    arrays = {spec.name: [row[spec.name] for row in projected] for spec in columns}

    try:
        table = self._pa.Table.from_pydict(arrays, schema=self._arrow_schema)
        self._writer.write_table(
            table,
            row_group_size=self._row_group_size,
        )
    except Exception as exc:
        raise WatlowSinkWriteError(
            f"ParquetSink: write failed for {self._path}: {exc}",
        ) from exc
    self._rows_written += len(projected)
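
The lock-then-project behaviour that write_many relies on can be illustrated with a small stand-in. This is only a sketch: TinySchemaLock is a hypothetical class, not the library's SchemaLock, and filling missing columns with None is an assumption made for the example.

```python
from __future__ import annotations

import logging
from typing import Any

_logger = logging.getLogger("schema_lock_sketch")


class TinySchemaLock:
    """Hypothetical stand-in: lock columns on the first batch, project later rows."""

    def __init__(self) -> None:
        self.columns: tuple[str, ...] | None = None
        self._warned: set[str] = set()

    @property
    def is_locked(self) -> bool:
        return self.columns is not None

    def lock(self, rows: list[dict[str, Any]]) -> None:
        # Column order follows first appearance across the first batch.
        seen: dict[str, None] = {}
        for row in rows:
            for key in row:
                seen.setdefault(key, None)
        self.columns = tuple(seen)

    def project(self, row: dict[str, Any]) -> dict[str, Any]:
        assert self.columns is not None, "lock() must run first"
        for key in row:
            if key not in self.columns and key not in self._warned:
                # Warn once per unknown column, then stay quiet.
                self._warned.add(key)
                _logger.warning("dropping unknown column %r", key)
        # Assumption: columns missing from a row are filled with None.
        return {name: row.get(name) for name in self.columns}


lock = TinySchemaLock()
lock.lock([{"device": "oven", "value": 21.5}])
projected = lock.project({"device": "oven", "value": 22.0, "extra": "ignored"})
```

Locking on the first batch keeps every later row group compatible with the schema the writer was opened with, which is why late-arriving columns are dropped rather than appended.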

PartNumber dataclass

PartNumber(raw, family, details=(lambda: {})())

Parsed part-number string returned by read_part_number.

Per-family digit decoding is contributed by :mod:watlowlib.registry.families. Decoded fragments live in :attr:details as a free-form mapping so each family can populate only what its ordering format defines, and so adding fragments to the PM decoder later is non-breaking.

The EZ-ZONE PM decoder populates case size, control type, power input, three output codes, and options string. Other families fall through to a stub: only :attr:family is set, and :attr:details is empty.

PollSource

Bases: Protocol

Minimal shape the recorder needs from its dispatcher.

Both :class:~watlowlib.devices.controller.Controller (solo) and :class:~watlowlib.manager.WatlowManager (multi-device) satisfy this Protocol. Using a Protocol keeps :func:record testable against a lightweight stub without standing up a full controller + transport pipeline.

The contract is intentionally narrow: per call, return a flat :class:~collections.abc.Sequence of :class:Sample objects, one per (device, parameter) read that succeeded. Failed reads are dropped from the batch and logged by the source; the recorder never sees them.

poll_many async

poll_many(parameters, *, names=None, instances=(1,))

Read every parameters × instances combination on every device.

Parameters:

Name Type Description Default
parameters Sequence[str | int]

Parameter names or registry IDs.

required
names Sequence[str] | None

Subset of device names to poll (Manager-only; Controller ignores). None polls everything the source manages.

None
instances Sequence[int]

1-indexed loop / channel numbers per device. Single-loop devices use (1,) (the default).

(1,)

Returns:

Type Description
Sequence[Sample]

A flat :class:Sequence of :class:Sample. Empty when every poll failed.

Source code in src/watlowlib/streaming/recorder.py
async def poll_many(
    self,
    parameters: Sequence[str | int],
    *,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
) -> Sequence[Sample]:
    """Read every ``parameters`` × ``instances`` combination on every device.

    Args:
        parameters: Parameter names or registry IDs.
        names: Subset of device names to poll (Manager-only;
            Controller ignores). ``None`` polls everything the
            source manages.
        instances: 1-indexed loop / channel numbers per device.
            Single-loop devices use ``(1,)`` (the default).

    Returns:
        A flat :class:`Sequence` of :class:`Sample`. Empty when
        every poll failed.
    """
    ...
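
Because PollSource is a structural Protocol, a test double only needs a matching poll_many. The sketch below shows that idea under stated assumptions: FakeSample, PollSourceLike and StubSource are hypothetical names, and FakeSample carries only three of the real Sample fields.

```python
from __future__ import annotations

import asyncio
from collections.abc import Sequence
from dataclasses import dataclass
from typing import Protocol


@dataclass(frozen=True)
class FakeSample:
    """Hypothetical stand-in for Sample with a reduced field set."""

    device: str
    parameter: str | int
    instance: int


class PollSourceLike(Protocol):
    """Same call shape as the poll_many signature documented above."""

    async def poll_many(
        self,
        parameters: Sequence[str | int],
        *,
        names: Sequence[str] | None = None,
        instances: Sequence[int] = (1,),
    ) -> Sequence[FakeSample]: ...


class StubSource:
    """Returns one sample per (parameter, instance); no hardware involved."""

    async def poll_many(self, parameters, *, names=None, instances=(1,)):
        return [
            FakeSample(device="stub", parameter=p, instance=i)
            for p in parameters
            for i in instances
        ]


async def poll_once(source: PollSourceLike) -> Sequence[FakeSample]:
    # The caller depends only on the Protocol, never on a concrete class.
    return await source.poll_many(["process_value", 4001], instances=(1, 2))


batch = asyncio.run(poll_once(StubSource()))
```

StubSource never inherits from the Protocol; matching the method shape is enough for a type checker to accept it.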

PollSourceAdapter

PollSourceAdapter(name, device)

Wrap one :class:Controller as a named :class:PollSource.

Implements poll_many(parameters, *, names=None, instances=(1,)) -> Sequence[Sample], the watlow recorder Protocol. When names is supplied and does not contain this adapter's name, the call short-circuits to an empty list (cross-library Manager-style filter semantics).

Sample relabeling: :meth:Controller.poll_many tags each :class:Sample with the transport label by default. This adapter rebuilds each sample via :func:dataclasses.replace to set :attr:Sample.device to the caller-provided name. Cost is negligible at typical watlow rates (1–5 Hz × small parameter sets) and stays well under 1 ms/tick at high parameter counts.

Source code in src/watlowlib/streaming/adapter.py
def __init__(self, name: str, device: Controller) -> None:
    self._name = name
    self._device = device

device property

device

The wrapped :class:Controller.

name property

name

The caller-provided device name attached to every emitted sample.

poll_many async

poll_many(parameters, *, names=None, instances=(1,))

Poll the wrapped controller and relabel each emitted sample.

Returns [] when names is supplied and this adapter's :attr:name is not in it — same filter semantics as :meth:WatlowManager.poll_many.

Source code in src/watlowlib/streaming/adapter.py
async def poll_many(
    self,
    parameters: Sequence[str | int],
    *,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
) -> list[Sample]:
    """Poll the wrapped controller and relabel each emitted sample.

    Returns ``[]`` when ``names`` is supplied and this adapter's
    :attr:`name` is not in it — same filter semantics as
    :meth:`WatlowManager.poll_many`.
    """
    if names is not None and self._name not in set(names):
        return []
    samples = await self._device.poll_many(parameters, instances=instances)
    return [dataclasses.replace(sample, device=self._name) for sample in samples]
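
The filter-then-relabel logic can be exercised without hardware. In this sketch FakeSample, FakeController and NamedAdapter are hypothetical stand-ins, and the "/dev/ttyUSB0" label is an assumed example of a transport label rather than a value taken from the library.

```python
from __future__ import annotations

import asyncio
import dataclasses


@dataclasses.dataclass(frozen=True)
class FakeSample:
    device: str
    value: float


class FakeController:
    """Hypothetical controller that tags samples with its transport label."""

    async def poll_many(self, parameters, *, instances=(1,)):
        return [FakeSample(device="/dev/ttyUSB0", value=21.5) for _ in parameters]


class NamedAdapter:
    """Mirrors the adapter logic shown in the source listing above."""

    def __init__(self, name, device):
        self._name = name
        self._device = device

    async def poll_many(self, parameters, *, names=None, instances=(1,)):
        if names is not None and self._name not in set(names):
            return []  # Filtered out before any read is issued.
        samples = await self._device.poll_many(parameters, instances=instances)
        # Frozen dataclasses cannot be mutated, so build relabelled copies.
        return [dataclasses.replace(s, device=self._name) for s in samples]


adapter = NamedAdapter("oven_1", FakeController())
relabelled = asyncio.run(adapter.poll_many(["process_value"]))
skipped = asyncio.run(adapter.poll_many(["process_value"], names=["oven_2"]))
```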

PostgresConfig dataclass

PostgresConfig(
    dsn=None,
    host=None,
    port=5432,
    user=None,
    password=None,
    database=None,
    schema="public",
    table="samples",
    pool_min_size=1,
    pool_max_size=4,
    statement_timeout_ms=30000,
    command_timeout_s=10.0,
    create_table=False,
    use_copy=True,
)

Connection + target settings for :class:PostgresSink.

Either dsn or the discrete host/user/database set must be provided. Credentials are not logged.

Attributes:

Name Type Description
dsn str | None

Full libpq-style connection string (e.g. postgres://user:pass@host:5432/db). Mutually exclusive with the discrete fields.

host str | None

Database host. Required if dsn is not set.

port int

Database port. Defaults to 5432.

user str | None

Database role.

password str | None

Role password. Never logged.

database str | None

Database name.

schema str

Target schema. Validated against [A-Za-z_][A-Za-z0-9_]{0,62}.

table str

Target table. Validated against the same pattern.

pool_min_size int

Minimum pool size. Defaults to 1.

pool_max_size int

Maximum pool size. Defaults to 4.

statement_timeout_ms int

statement_timeout in milliseconds, applied as a server setting. Defaults to 30000 (30 s).

command_timeout_s float

asyncpg's per-call command timeout. Defaults to 10 s.

create_table bool

If True, infer the schema from the first batch and run CREATE TABLE IF NOT EXISTS. If False (the safer default), require the table to exist and lock the schema from information_schema.columns.

use_copy bool

If True (default), bulk-write via asyncpg's binary COPY path. Disable only if your environment does not grant the COPY privilege to the sink's role, in which case writes fall back to prepared executemany.
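
SQL identifiers cannot be sent as bind parameters, so a strict allow-list pattern is a common guard for schema and table names. The sketch below applies the pattern quoted above; is_valid_identifier is a hypothetical helper, and whole-string matching is an assumption about how the library applies the pattern.

```python
import re

# Pattern quoted from the schema / table attribute descriptions above.
_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]{0,62}")


def is_valid_identifier(name: str) -> bool:
    """True when the whole string matches (1 to 63 characters)."""
    return _IDENT.fullmatch(name) is not None
```

The upper bound of 63 characters matches PostgreSQL's default identifier length limit.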

target

target()

Return a log-safe description of the target: host:port/db.schema.table.

Source code in src/watlowlib/sinks/postgres.py
def target(self) -> str:
    """Return a log-safe description of the target: ``host:port/db.schema.table``."""
    if self.dsn is not None:
        parsed = urlparse(self.dsn)
        host = parsed.hostname or "?"
        port = parsed.port or self.port
        db = (parsed.path or "/?").lstrip("/") or "?"
    else:
        host = self.host or "?"
        port = self.port
        db = self.database or "?"
    return f"{host}:{port}/{db}.{self.schema}.{self.table}"

PostgresSink

PostgresSink(config)

Append-only Postgres writer using pooled asyncpg connections.

Attributes:

Name Type Description
config PostgresConfig

Frozen :class:PostgresConfig instance.

columns tuple[ColumnSpec, ...] | None

Locked columns in order, or None before first :meth:write_many.

Source code in src/watlowlib/sinks/postgres.py
def __init__(self, config: PostgresConfig) -> None:
    self._config = config
    self._schema = SchemaLock(sink_name="postgres", logger=_logger)
    self._asyncpg: Any = None
    self._pool: Any = None
    self._insert_sql: str | None = None
    self._rows_written = 0

columns property

columns

Locked columns in order, or None before first :meth:write_many.

config property

config

The frozen :class:PostgresConfig passed in at construction.

close async

close()

Close the pool. Idempotent.

Source code in src/watlowlib/sinks/postgres.py
async def close(self) -> None:
    """Close the pool. Idempotent."""
    if self._pool is None:
        return
    pool = self._pool
    self._pool = None
    try:
        await pool.close()
    finally:
        _logger.info(
            "sinks.postgres.close target=%s rows_written=%s",
            self._config.target(),
            self._rows_written,
        )
    self._asyncpg = None

open async

open()

Load asyncpg, open the pool, and (optionally) introspect the table.

Idempotent. When create_table=False (the default), the target's columns are read on open and the schema is locked immediately. When create_table=True the lock happens lazily on the first :meth:write_many.

Source code in src/watlowlib/sinks/postgres.py
async def open(self) -> None:
    """Load asyncpg, open the pool, and (optionally) introspect the table.

    Idempotent. When ``create_table=False`` (the default), the
    target's columns are read on open and the schema is locked
    immediately. When ``create_table=True`` the lock happens
    lazily on the first :meth:`write_many`.
    """
    if self._pool is not None:
        return
    self._asyncpg = _load_asyncpg()
    cfg = self._config
    server_settings = {
        "application_name": "watlowlib",
        "statement_timeout": str(int(cfg.statement_timeout_ms)),
    }
    try:
        if cfg.dsn is not None:
            self._pool = await self._asyncpg.create_pool(
                dsn=cfg.dsn,
                min_size=cfg.pool_min_size,
                max_size=cfg.pool_max_size,
                command_timeout=cfg.command_timeout_s,
                server_settings=server_settings,
            )
        else:
            self._pool = await self._asyncpg.create_pool(
                host=cfg.host,
                port=cfg.port,
                user=cfg.user,
                password=cfg.password,
                database=cfg.database,
                min_size=cfg.pool_min_size,
                max_size=cfg.pool_max_size,
                command_timeout=cfg.command_timeout_s,
                server_settings=server_settings,
            )
    except Exception as exc:
        raise WatlowSinkWriteError(
            f"PostgresSink: failed to open pool for {cfg.target()}: {exc}",
        ) from exc

    _logger.info(
        "sinks.postgres.open target=%s pool_min=%s pool_max=%s use_copy=%s create_table=%s",
        cfg.target(),
        cfg.pool_min_size,
        cfg.pool_max_size,
        cfg.use_copy,
        cfg.create_table,
    )

    if not cfg.create_table:
        await self._introspect_existing_table()

write_many async

write_many(samples)

Append samples — one COPY (or executemany) per call.

Source code in src/watlowlib/sinks/postgres.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` — one COPY (or executemany) per call."""
    if self._pool is None:
        raise RuntimeError("PostgresSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked:
        assert self._config.create_table  # noqa: S101
        self._schema.lock(rows)
        await self._create_table()
        self._insert_sql = self._build_insert_sql()

    columns = self._schema.columns
    assert columns is not None  # noqa: S101
    assert self._insert_sql is not None  # noqa: S101

    projected_tuples: list[tuple[object, ...]] = []
    for row in rows:
        fields = self._schema.project(row)
        projected_tuples.append(tuple(fields[spec.name] for spec in columns))

    try:
        if self._config.use_copy:
            await self._write_copy(projected_tuples, columns)
        else:
            await self._write_executemany(projected_tuples)
    except WatlowSinkWriteError:
        raise
    except Exception as exc:
        raise WatlowSinkWriteError(
            f"PostgresSink: write failed for {self._config.target()}: {exc}",
        ) from exc
    self._rows_written += len(projected_tuples)
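
The two except clauses in write_many form a small normalisation pattern: errors already of the sink's type pass through untouched, and anything else is wrapped with the original kept as the cause. A minimal sketch, with SinkWriteError and guarded_write as hypothetical stand-ins for the library's names:

```python
class SinkWriteError(Exception):
    """Hypothetical stand-in for WatlowSinkWriteError."""


def guarded_write(write, target: str) -> None:
    """Run ``write`` and normalise failures into SinkWriteError."""
    try:
        write()
    except SinkWriteError:
        raise  # Already the sink's own error type: do not double-wrap.
    except Exception as exc:
        # "from exc" keeps the original exception reachable via __cause__.
        raise SinkWriteError(f"write failed for {target}: {exc}") from exc


def failing_write() -> None:
    raise OSError("connection reset")


def already_wrapped() -> None:
    raise SinkWriteError("inner")


try:
    guarded_write(failing_write, "host:5432/db.public.samples")
except SinkWriteError as err:
    wrapped = err

try:
    guarded_write(already_wrapped, "host:5432/db.public.samples")
except SinkWriteError as err:
    passed_through = err
```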

ProtocolClient

Bases: Protocol

Per-device protocol client.

Implementations own the wire codec and the per-port lock. The :class:watlowlib.devices.session.Session is the only caller; it holds :attr:lock for the duration of a single command (request + reply).

disposed property

disposed

Whether :meth:dispose has been called.

kind property

kind

The :class:ProtocolKind this client speaks.

lock property

lock

Per-client lock acquired by :meth:Session.execute.

One lock per port — a single :class:Session serializes its own traffic, and :class:watlowlib.manager.WatlowManager enforces one protocol per port across sessions.

dispose

dispose()

Mark the client unusable. Subsequent execute calls raise.

Synchronous because dispose is called from teardown paths that don't always have an event loop. The client is responsible for closing its transport (or signalling the owning :class:Session to do so) — this method just trips the flag.

Source code in src/watlowlib/protocol/base.py
def dispose(self) -> None:
    """Mark the client unusable. Subsequent ``execute`` calls raise.

    Synchronous because dispose is called from teardown paths that
    don't always have an event loop. The client is responsible for
    closing its transport (or signalling the owning :class:`Session`
    to do so) — this method just trips the flag.
    """
    ...

execute async

execute(request, *, address, timeout=None, command_name='')

Send request to address, return the typed reply.

address travels with every call so one client can serve multiple devices on a multi-drop RS-485 segment without re-construction. Std Bus accepts 1..16, Modbus RTU accepts 1..247.

timeout overrides :attr:watlowlib.config.DEFAULTS.io_timeout_s for this call only. command_name is threaded into log events and error contexts; it is informational, not load-bearing for dispatch.

Source code in src/watlowlib/protocol/base.py
async def execute(
    self,
    request: Request_contra,
    *,
    address: int,
    timeout: float | None = None,
    command_name: str = "",
) -> Reply_co:
    """Send ``request`` to ``address``, return the typed reply.

    ``address`` travels with every call so one client can serve
    multiple devices on a multi-drop RS-485 segment without
    re-construction. Std Bus accepts ``1..16``, Modbus RTU accepts
    ``1..247``.

    ``timeout`` overrides :attr:`watlowlib.config.DEFAULTS.io_timeout_s`
    for this call only. ``command_name`` is threaded into log
    events and error contexts; it is informational, not load-bearing
    for dispatch.
    """
    ...
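
The interplay between the per-port lock and the per-call address can be sketched as follows. Assumptions: the library is built on anyio, while this sketch uses asyncio from the standard library to stay self-contained; FakeClient, send and the string keys in ADDRESS_RANGES are hypothetical.

```python
from __future__ import annotations

import asyncio

# Address ranges quoted from the execute() docstring above.
ADDRESS_RANGES = {"stdbus": range(1, 17), "modbus_rtu": range(1, 248)}


class FakeClient:
    """Hypothetical client: one lock per port, address supplied per call."""

    def __init__(self, kind: str) -> None:
        self.kind = kind
        self.lock = asyncio.Lock()
        self.log: list[tuple[int, str]] = []

    async def execute(self, request: str, *, address: int) -> str:
        if address not in ADDRESS_RANGES[self.kind]:
            raise ValueError(f"address {address} out of range for {self.kind}")
        self.log.append((address, "request"))
        await asyncio.sleep(0)  # Yield, as a real serial round-trip would.
        self.log.append((address, "reply"))
        return f"{request}@{address}"


async def send(client: FakeClient, request: str, address: int) -> str:
    # Hold the lock across request + reply so exchanges never interleave.
    async with client.lock:
        return await client.execute(request, address=address)


async def main() -> tuple[list[str], list[tuple[int, str]]]:
    client = FakeClient("stdbus")
    replies = await asyncio.gather(
        send(client, "read_pv", 1),
        send(client, "read_pv", 2),
    )
    return list(replies), client.log


replies, wire_log = asyncio.run(main())
```

Without the lock, the two tasks would interleave at the sleep and the log would show both requests before either reply, which a half-duplex RS-485 segment cannot tolerate.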

ProtocolKind

Bases: StrEnum

Wire protocol selected for a session.

AUTO triggers the conservative Std Bus → Modbus probe.

Reading dataclass

Reading(
    value, unit, received_at, monotonic_ns, raw, protocol
)

A single timestamped value from the controller.

protocol is set by the variant decoder, not by the facade — it reflects which wire protocol produced the value (per docs/design.md invariant 7).

Recording dataclass

Recording(stream, summary, rate_hz)

Container yielded by :func:record — stream + live summary + rate.

Cross-library shape (alicat / sartorius / watlow / nidaq), so consumers use the same recording.stream / recording.summary / recording.rate_hz accessors regardless of vendor.

Per-library payload (the T parameter):

  • alicat / sartorius: Recording[Mapping[str, Sample]]
  • watlow: Recording[Sequence[Sample]] — per-tick batches
  • nidaq: Recording[DaqReading] (polled) / Recording[DaqBlock] (block)

Attributes:

Name Type Description
stream AsyncIterator[T]

Async iterator of per-tick payloads. Consume with async for batch in recording.stream.

summary AcquisitionSummary

Live :class:AcquisitionSummary — the recorder mutates this in place; consumers read. summary.finished_at is None while running and is populated on context-manager exit.

rate_hz float

The cadence the recorder is running at, captured at record() entry. Useful for queue-sizing downstream buffers.
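
The consumption pattern for the three accessors can be sketched with stand-ins. FakeRecording, FakeSummary and produce are hypothetical; the real container is yielded by record(), whose signature is not shown in this section, so it is not called here.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator, Sequence
from dataclasses import dataclass


@dataclass
class FakeSummary:
    samples_emitted: int = 0


@dataclass
class FakeRecording:
    """Hypothetical stand-in exposing the three documented accessors."""

    stream: AsyncIterator[Sequence[float]]
    summary: FakeSummary
    rate_hz: float


async def produce(summary: FakeSummary, ticks: int) -> AsyncIterator[Sequence[float]]:
    for tick in range(ticks):
        summary.samples_emitted += 1  # The producer is the only writer.
        yield [20.0 + tick, 30.0 + tick]  # One batch per tick.


async def consume() -> tuple[list[Sequence[float]], int]:
    summary = FakeSummary()
    recording = FakeRecording(produce(summary, 3), summary, rate_hz=2.0)
    batches = []
    async for batch in recording.stream:
        batches.append(batch)
    # The consumer only reads the summary; it never mutates it.
    return batches, recording.summary.samples_emitted


batches, emitted = asyncio.run(consume())
```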

RwesFlag

Bases: StrEnum

Persistence + access flag from the EZ-ZONE register list.

  • R — read-only.
  • W — write-only (rare; typically actions like "start autotune").
  • RW — runtime read/write, not EEPROM-backed.
  • RWE — RW + persisted to EEPROM.
  • RWES — RWE + saved set ("save settings to user memory").

Mapping to :class:SafetyTier is in :func:_safety_from_rwes; the registry binds the result to :attr:ParameterSpec.safety at load time.

SafetyTier

Bases: IntEnum

How dangerous a command is to invoke.

  • READ_ONLY (R) — no state change.
  • STATEFUL — runtime state change but not EEPROM-backed. Reserved for commands like "start autotune"; no PM parameter maps here today, but the tier exists so future commands have a place to live.
  • PERSISTENT (RW / RWE / RWES) — EEPROM-backed; requires confirm=True at the facade.
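
The flag-to-tier mapping in the bullets above can be sketched as a small function. Everything here is a stand-in: the enum values are assumptions, the W mapping is a guess (the documentation does not state it), and PermissionError stands in for whatever error the facade actually raises.

```python
from enum import Enum, IntEnum


class RwesFlag(str, Enum):
    """Stand-in for the library's StrEnum; values assumed equal to the names."""

    R = "R"
    W = "W"
    RW = "RW"
    RWE = "RWE"
    RWES = "RWES"


class SafetyTier(IntEnum):
    """Stand-in; integer values are assumptions chosen so tiers order by risk."""

    READ_ONLY = 0
    STATEFUL = 1
    PERSISTENT = 2


def safety_from_rwes(flag: RwesFlag) -> SafetyTier:
    """Sketch of the mapping stated in the SafetyTier bullets."""
    if flag is RwesFlag.R:
        return SafetyTier.READ_ONLY
    if flag is RwesFlag.W:
        # Assumption: the docs do not say where W lands; STATEFUL is a guess.
        return SafetyTier.STATEFUL
    return SafetyTier.PERSISTENT  # RW, RWE and RWES per the bullets.


def check_write(flag: RwesFlag, *, confirm: bool = False) -> None:
    """Refuse PERSISTENT writes unless the caller passes confirm=True."""
    if safety_from_rwes(flag) is SafetyTier.PERSISTENT and not confirm:
        raise PermissionError("persistent write requires confirm=True")


try:
    check_write(RwesFlag.RWE)
    refused = False
except PermissionError:
    refused = True
```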

Sample dataclass

Sample(
    device,
    address,
    protocol,
    parameter,
    parameter_id,
    instance,
    value,
    unit,
    t_mono_ns,
    t_utc,
    t_midpoint_mono_ns,
    requested_at,
    received_at,
    latency_s,
    raw,
)

One parameter read with full timing provenance.

Attributes:

Name Type Description
device str

Manager-assigned name (or controller label for solo recordings). Stable downstream identifier that follows the value into sinks.

address int

Bus address of the polled device. Std Bus 1..16, Modbus RTU 1..247.

protocol ProtocolKind

Wire protocol that decoded this read. Set from the session's protocol kind, not the reading metadata, so a mixed-protocol recording records the source per row.

parameter str

Canonical parameter name (e.g. "process_value").

parameter_id int

Registry parameter id (e.g. 4001).

instance int

1-indexed loop / channel selector used for the read.

value float | int | str | bool | None

The decoded scalar. None when the device reported the value as unavailable (sensor-fail, overload, ...).

unit Unit | str | None

Concrete :class:Unit from the Watlow polling path, a free-form string for cross-vendor rows (Alicat's "psia", "sccm" etc. via examples/mixed_watlow_alicat_sqlite.py), or None when the parameter has no unit (counts, IDs, time constants). The Watlow recorder always populates a :class:Unit; the str branch only fires for hand-built cross-vendor samples.

t_mono_ns int

:func:time.monotonic_ns midpoint of the request/reply round-trip — canonical join key. Monotonic since OS boot; no calendar meaning.

t_utc datetime

Wall-clock midpoint of the request/reply round-trip (tz-aware UTC). Preferred point estimate when aligning Watlow samples against other sensor streams in human time.

t_midpoint_mono_ns int | None

Optional integration-window midpoint in monotonic nanoseconds. None for single polled reads; sensors that average over a window populate it.

requested_at datetime

Wall-clock datetime (UTC) captured just before the read leaves the host.

received_at datetime

Wall-clock datetime (UTC) captured just after the reply is decoded.

latency_s float

(received_at - requested_at).total_seconds() — precomputed for convenience.

raw bytes

The wire payload that produced the value. Available for diagnostics; tabular sinks drop it.
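
The timing fields relate to each other through simple arithmetic. The sketch below works one example with made-up timestamps; how the library captures the raw clock readings internally is an assumption, and only the latency_s formula is quoted from the table above.

```python
from datetime import datetime, timedelta, timezone

# Made-up timestamps: a read that took 40 ms end to end.
requested_at = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
received_at = requested_at + timedelta(milliseconds=40)

# latency_s as documented: (received_at - requested_at).total_seconds()
latency_s = (received_at - requested_at).total_seconds()

# Wall-clock midpoint of the round-trip, as described for t_utc.
t_utc = requested_at + (received_at - requested_at) / 2

# The same midpoint idea on the monotonic clock, as described for t_mono_ns.
# Both nanosecond readings are illustrative values.
sent_mono_ns = 1_000_000_000
recv_mono_ns = 1_040_000_000
t_mono_ns = (sent_mono_ns + recv_mono_ns) // 2
```

Taking the midpoint halves the worst-case timing error compared with stamping at either end of the exchange, since the device sampled the value somewhere inside the round-trip.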

SampleSink

Bases: Protocol

Minimal shape of an acquisition sink.

Sinks own their storage handle lifecycle. Concrete implementations typically follow this call sequence:

  1. await sink.open() — allocate file descriptors, DB connections, etc. Safe to call again on an already-open sink.
  2. await sink.write_many(samples) — one or more times. samples is a :class:~collections.abc.Sequence so the sink knows the full batch up front (CSV column inference, Parquet row groups, parameterised inserts).
  3. await sink.close() — flush and release the handle. Idempotent.

The async context-manager methods provide an async with sink: shape for the common case of "open → write → close" in one block.
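
The call sequence above can be sketched with an in-memory sink. ListSink is a hypothetical class written for this example; it stores plain objects rather than real Sample instances.

```python
from __future__ import annotations

import asyncio
from collections.abc import Sequence


class ListSink:
    """Hypothetical in-memory sink following open / write_many / close."""

    def __init__(self) -> None:
        self.rows: list[object] = []
        self.is_open = False

    async def open(self) -> None:
        self.is_open = True  # Safe to call again on an open sink.

    async def write_many(self, samples: Sequence[object]) -> None:
        if not self.is_open:
            raise RuntimeError("write_many called before open()")
        self.rows.extend(samples)

    async def close(self) -> None:
        self.is_open = False  # Idempotent: closing twice is harmless.

    async def __aenter__(self) -> ListSink:
        await self.open()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.close()


async def main() -> ListSink:
    async with ListSink() as sink:
        await sink.write_many(["sample-1", "sample-2"])
        await sink.write_many(["sample-3"])
    await sink.close()  # A second close is a no-op.
    return sink


sink = asyncio.run(main())
```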

__aenter__ async

__aenter__()

Open the sink and return self for chaining.

Source code in src/watlowlib/sinks/base.py
async def __aenter__(self) -> Self:
    """Open the sink and return ``self`` for chaining."""
    ...

__aexit__ async

__aexit__(exc_type, exc, tb)

Close the sink on exit.

Source code in src/watlowlib/sinks/base.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the sink on exit."""
    ...

close async

close()

Flush and release the backing resource. Idempotent.

Source code in src/watlowlib/sinks/base.py
async def close(self) -> None:
    """Flush and release the backing resource. Idempotent."""
    ...

open async

open()

Allocate the sink's backing resource (file handle, DB conn, ...).

Source code in src/watlowlib/sinks/base.py
async def open(self) -> None:
    """Allocate the sink's backing resource (file handle, DB conn, ...)."""
    ...

write_many async

write_many(samples)

Append samples to the sink.

Sequence (not Iterable) because every in-tree sink wants len() — CSV schema inference, batched parameterised inserts, Parquet row-group bookkeeping.

Source code in src/watlowlib/sinks/base.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` to the sink.

    ``Sequence`` (not ``Iterable``) because every in-tree sink wants
    ``len()`` — CSV schema inference, batched parameterised inserts,
    Parquet row-group bookkeeping.
    """
    ...

SerialSettings dataclass

SerialSettings(
    port,
    baudrate=38400,
    bytesize=ByteSize.EIGHT,
    parity=Parity.NONE,
    stopbits=StopBits.ONE,
    rtscts=False,
    xonxoff=False,
    exclusive=True,
)

Serial-port configuration for :class:SerialTransport.

Mirrors :class:anyserial.SerialConfig plus a port path. Default framing is 38400 8-N-1, the EZ-ZONE PM Standard Bus factory setting. exclusive defaults to True because Standard Bus is poll/response and won't tolerate a second writer.

The __post_init__ accepts int / float / str shorthand at runtime for the framing fields (bytesize=8, parity="none", stopbits=1) and normalises to the enum. The static field types are the enums themselves so mypy --strict users must pass :class:anyserial.ByteSize / :class:anyserial.Parity / :class:anyserial.StopBits directly; the runtime shorthand is primarily for CLI argument parsing and interactive scripts.

factory_for classmethod

factory_for(protocol, *, port)

Return the EZ-ZONE PM factory framing for protocol.

  • STDBUS → 38400 8-N-1 (the Standard Bus factory default).
  • MODBUS_RTU → 9600 8-E-1 (the Modbus RTU factory default per the EZ-ZONE PM manual).

AUTO raises :class:WatlowConfigurationError because there is no single factory framing for it; the detector probes both. Callers crossing protocol boundaries (the maintenance helpers that switch protocol, watlow-discover --protocol both) should rebuild settings per protocol via this method instead of inheriting whatever framing the previous call used.

Source code in src/watlowlib/transport/base.py
@classmethod
def factory_for(cls, protocol: ProtocolKind, *, port: str) -> SerialSettings:
    """Return the EZ-ZONE PM factory framing for ``protocol``.

    - ``STDBUS`` → 38400 8-N-1 (the Standard Bus factory default).
    - ``MODBUS_RTU`` → 9600 8-E-1 (the Modbus RTU factory default
      per the EZ-ZONE PM manual).

    ``AUTO`` raises :class:`WatlowConfigurationError` — there is no
    single factory framing for AUTO, the detector probes both.
    Callers crossing protocol boundaries (the maintenance helpers
    that switch protocol, ``watlow-discover --protocol both``)
    should rebuild settings per protocol via this method instead
    of inheriting whatever framing the previous call used.
    """
    # Lazy import to keep ``transport.base`` a leaf module — the
    # ProtocolKind enum lives under protocol/, which depends on
    # transport indirectly.
    from watlowlib.protocol.base import ProtocolKind  # noqa: PLC0415

    if protocol is ProtocolKind.STDBUS:
        return cls(port=port, baudrate=38400, parity=Parity.NONE)
    if protocol is ProtocolKind.MODBUS_RTU:
        return cls(port=port, baudrate=9600, parity=Parity.EVEN)
    raise WatlowConfigurationError(
        f"SerialSettings.factory_for: no single factory framing for {protocol!r}; "
        "AUTO probes both Std Bus and Modbus, build a concrete protocol's "
        "settings instead.",
    )

SerialTransport

SerialTransport(settings)

:class:Transport backed by a real serial port via anyserial.

Tests that don't need hardware should use :class:watlowlib.transport.fake.FakeTransport; the two conform to the same structural :class:Transport Protocol.

Source code in src/watlowlib/transport/serial.py
def __init__(self, settings: SerialSettings) -> None:
    self._settings = settings
    self._port: SerialPort | None = None
    # Bytes read past ``n`` in :meth:`read_exact` (e.g. when a
    # framing error makes the caller scan for a new preamble) are
    # held here so the next call sees them first. Serial I/O is
    # chunk-oriented; we can't ask the kernel "give me exactly n"
    # without buffering.
    self._pushback = bytearray()
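
The pushback buffer described in the comment above can be sketched as follows. ChunkedReader is a hypothetical class, not the library's SerialTransport, and the byte values are arbitrary.

```python
from __future__ import annotations

import asyncio


class ChunkedReader:
    """Hypothetical transport: the port returns whole chunks, never exact counts."""

    def __init__(self, chunks: list[bytes]) -> None:
        self._chunks = list(chunks)
        self._pushback = bytearray()

    async def _read_chunk(self) -> bytes:
        await asyncio.sleep(0)  # Stand-in for waiting on the serial port.
        if not self._chunks:
            raise EOFError("no more data")
        return self._chunks.pop(0)

    async def read_exact(self, n: int) -> bytes:
        # Keep reading chunks until at least n bytes are buffered.
        while len(self._pushback) < n:
            self._pushback += await self._read_chunk()
        out = bytes(self._pushback[:n])
        # Anything past n stays buffered so the next call sees it first.
        del self._pushback[:n]
        return out


async def main() -> tuple[bytes, bytes, bytes]:
    reader = ChunkedReader([b"\x55\xff\x05", b"\x10\x00", b"\x00\x06\xe8"])
    first = await reader.read_exact(2)
    second = await reader.read_exact(4)
    third = await reader.read_exact(2)
    return first, second, third


first, second, third = asyncio.run(main())
```

The second read spans three chunks: one leftover byte from the buffer, the whole middle chunk, and the first byte of the last chunk.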

Session

Session(
    client,
    *,
    profile,
    address,
    port,
    wire_temperature_unit=None,
)

Owns availability cache, gates, and the dispatch loop.

A :class:Session is bound to exactly one :class:ProtocolClient for its lifetime — one protocol per port (invariant 1).

Source code in src/watlowlib/devices/session.py
def __init__(
    self,
    client: ProtocolClient[Any, Any],
    *,
    profile: DeviceProfile,
    address: int,
    port: str,
    wire_temperature_unit: Unit | None = None,
) -> None:
    self._client = client
    # The profile is the device-type bundle (family + registry +
    # framing + identity). ``registry`` / ``family`` stay exposed as
    # thin delegating properties so the streaming / manager layers
    # that learned those names keep working.
    self._profile = profile
    self._registry = profile.registry
    self._family = profile.family
    self._address = address
    self._port = port
    self._availability: dict[str, Availability] = {}
    # Scale of temperature values on the wire. An explicit
    # ``wire_temperature_unit`` (from
    # ``open_device(assert_wire_temperature_unit=...)``) wins;
    # otherwise the profile's own ``wire_temperature_unit`` seeds it
    # (the Series SD knows it speaks °F; the EZ-ZONE PM profile
    # leaves it ``None`` so the user must assert). Drives
    # :class:`Reading.unit` / :class:`Sample.unit` for temperature
    # parameters; ``None`` means "do not guess". The library makes
    # **no** attempt to derive this from PM parameter 17050 — on at
    # least one PM3 firmware that register is label-only and does
    # not govern the wire scale. See ``docs/devices.md`` §Units.
    self._wire_temperature_unit: Unit | None = (
        wire_temperature_unit
        if wire_temperature_unit is not None
        else profile.wire_temperature_unit
    )
    self._wire_temperature_unit_warned: bool = False
    # Lazy cache of parameter 17050's reported value. Kept purely
    # as an inspection helper exposed via
    # :meth:`Controller.read_comms_unit_label` — **does not** feed
    # :class:`Reading.unit`. ``_comms_unit_label_loaded`` distinguishes
    # "haven't asked yet" from "asked and got nothing"; subsequent
    # calls don't repeat the wire turn-around after a rejection.
    self._comms_unit_label: Unit | None = None
    self._comms_unit_label_loaded: bool = False
    # Most recent error's :class:`ErrorContext`, captured from any
    # :class:`WatlowError` that propagates out of :meth:`execute`.
    # Drives :class:`DeviceSnapshot.last_error`. Cleared by
    # :meth:`clear_last_error` (manual reset for diagnostics).
    self._last_error: ErrorContext | None = None
    # Count of transient transport hiccups the session swallowed
    # and retried. Watlow has no current transient class (§F of
    # the unified spec — deferred), so this counter stays at zero
    # unless a future transport path increments it. Reset on
    # :meth:`open` / fresh transport.
    self.recoverable_error_count: int = 0

address property

address

Session bus address.

client property

client

The bound protocol client.

Exposed for the watlow-raw escape hatch and for diagnostics that need to issue an unframed wire op outside the registry. Callers must acquire :attr:ProtocolClient.lock before :meth:ProtocolClient.execute to honour the per-port serialization invariant, and must pass this session's :attr:address (or another concrete address for multi-drop diagnostics) to execute.

family property

family

Best-known controller family for this session (delegates to the profile).

last_error property

last_error

Most-recent error context captured by :meth:execute.

port property

port

Transport label (for logs / error context).

profile property

profile

The device profile (family + registry + framing + identity).

protocol_kind property

protocol_kind

The wire protocol this session speaks.

registry property

registry

Parameter registry bound to this session.

Exposed for the streaming layer so polling code can resolve a name / id to a :class:ParameterSpec without an extra import of the module-level :data:PARAMETERS.

availability

availability(command_name)

Cached availability for command_name.

Source code in src/watlowlib/devices/session.py
def availability(self, command_name: str) -> Availability:
    """Cached availability for ``command_name``."""
    return self._availability.get(command_name, Availability.UNKNOWN)

availability_summary

availability_summary()

Return the current command availability cache.

Includes every command the session has dispatched; the snapshot path filters to the UNSUPPORTED entries.

Source code in src/watlowlib/devices/session.py
def availability_summary(self) -> Mapping[str, Availability]:
    """Return the current command availability cache.

    Includes every command the session has dispatched; the
    snapshot path filters to the UNSUPPORTED entries.
    """
    return MappingProxyType(dict(self._availability))
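
As a rough illustration of the read-only snapshot and the UNSUPPORTED filter mentioned above, the sketch below uses a stand-in `Availability` enum and a plain dict in place of the session's cache. The enum values, the cache keys, and the `unsupported_only` helper are assumptions for illustration, not part of watlowlib.

```python
from enum import Enum
from types import MappingProxyType
from typing import Dict, Mapping


class Availability(Enum):
    # Stand-in for watlowlib's Availability; member values are assumed.
    UNKNOWN = "unknown"
    SUPPORTED = "supported"
    UNSUPPORTED = "unsupported"


def snapshot(cache: Dict[str, Availability]) -> Mapping[str, Availability]:
    # Copy first, then wrap: later cache mutations do not leak into the view.
    return MappingProxyType(dict(cache))


def unsupported_only(summary: Mapping[str, Availability]) -> Dict[str, Availability]:
    # Hypothetical helper mirroring the filter the snapshot path applies.
    return {k: v for k, v in summary.items() if v is Availability.UNSUPPORTED}


cache = {
    "read_pv": Availability.SUPPORTED,
    "read_parameter:17050": Availability.UNSUPPORTED,
}
view = snapshot(cache)
cache["set_setpoint"] = Availability.SUPPORTED  # not visible through `view`
filtered = unsupported_only(view)

try:
    view["read_pv"] = Availability.UNKNOWN  # type: ignore[index]
    writable = True
except TypeError:
    writable = False  # MappingProxyType rejects item assignment
```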

clear_last_error

clear_last_error()

Reset :attr:last_error to None (manual diagnostics aid).

Source code in src/watlowlib/devices/session.py
def clear_last_error(self) -> None:
    """Reset :attr:`last_error` to ``None`` (manual diagnostics aid)."""
    self._last_error = None

comms_unit_label async

comms_unit_label()

Return the value parameter 17050 reports for this session.

Inspection helper, not a source of truth: on at least one PM3 firmware revision 17050 is a label-only register that does not govern the wire scale. :class:Reading.unit is sourced from :meth:wire_temperature_unit instead.

Reads the parameter on first call and caches the result for the lifetime of the session. Returns None when the device rejects the read or reports an unknown code; the cache distinguishes "haven't asked yet" from "asked and got nothing" so a rejection does not cost another wire turn-around.

Invalidated by :meth:invalidate_comms_unit_label (called by :meth:Controller.set_comms_unit_label after a successful write).

Source code in src/watlowlib/devices/session.py
async def comms_unit_label(self) -> Unit | None:
    """Return the value parameter 17050 reports for this session.

    Inspection helper, not a source of truth: on at least one PM3
    firmware revision 17050 is a label-only register that does not
    govern the wire scale. :class:`Reading.unit` is sourced from
    :meth:`wire_temperature_unit` instead.

    Reads the parameter on first call and caches the result for
    the lifetime of the session. Returns ``None`` when the device
    rejects the read or reports an unknown code; the cache
    distinguishes "haven't asked yet" from "asked and got nothing"
    so a rejection does not cost another wire turn-around.

    Invalidated by :meth:`invalidate_comms_unit_label` (called by
    :meth:`Controller.set_comms_unit_label` after a successful
    write).
    """
    if self._comms_unit_label_loaded:
        return self._comms_unit_label
    self._comms_unit_label = await self._fetch_comms_unit_label()
    self._comms_unit_label_loaded = True
    return self._comms_unit_label
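
The "haven't asked yet" versus "asked and got nothing" distinction comes down to keeping a separate loaded flag next to the cached value. A minimal sketch of that pattern, with a hypothetical `LabelCache` class and a fake fetch that always returns `None`:

```python
import asyncio


class LabelCache:
    """Caches one async lookup, including a ``None`` result."""

    def __init__(self, fetch):
        self._fetch = fetch      # async callable returning a value or None
        self._value = None
        self._loaded = False     # separate flag: None is a valid cached value

    async def get(self):
        if self._loaded:
            return self._value
        self._value = await self._fetch()
        self._loaded = True
        return self._value

    def invalidate(self):
        self._value = None
        self._loaded = False


calls = 0


async def rejecting_fetch():
    # Simulates a device that rejects the read, so the result is None.
    global calls
    calls += 1
    return None


async def main():
    cache = LabelCache(rejecting_fetch)
    first = await cache.get()
    second = await cache.get()   # served from cache, no second fetch
    cache.invalidate()
    third = await cache.get()    # re-queries after invalidation
    return first, second, third


results = asyncio.run(main())
```

Testing `self._value is None` instead of the flag would re-query the device on every call after a rejection, which is the cost the docstring says the cache avoids.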

dispose

dispose()

Dispose the bound protocol client.

Source code in src/watlowlib/devices/session.py
def dispose(self) -> None:
    """Dispose the bound protocol client."""
    self._client.dispose()

execute async

execute(command, request, *, confirm=False, timeout=None)

Dispatch command with request and return the typed response.

Source code in src/watlowlib/devices/session.py
async def execute[Req, Resp](
    self,
    command: Command[Req, Resp],
    request: Req,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> Resp:
    """Dispatch ``command`` with ``request`` and return the typed response."""
    kind = self._client.kind
    # Variant resolution. The session picks the variant matching
    # the bound protocol; one protocol per port (invariant 1).
    # Resolve to a single ``variant`` local so the rest of the
    # method is protocol-agnostic; we still branch on ``kind``
    # for the encode/decode call shapes (stdbus takes ``reply``;
    # modbus takes ``words, ctx, request``).
    if kind is ProtocolKind.STDBUS:
        stdbus_variant = command.stdbus
        if stdbus_variant is None:
            raise WatlowProtocolUnsupportedError(
                f"command {command.name!r} has no Std Bus variant",
                context=self._error_context(command, request),
            )
        modbus_variant = None
    elif kind is ProtocolKind.MODBUS_RTU:
        modbus_variant = command.modbus
        if modbus_variant is None:
            raise WatlowProtocolUnsupportedError(
                f"command {command.name!r} has no Modbus variant",
                context=self._error_context(command, request),
            )
        stdbus_variant = None
    else:
        raise WatlowProtocolUnsupportedError(
            f"session has unsupported protocol kind {kind!r}",
            context=self._error_context(command, request),
        )

    # Cache key. We key on ``command_name:parameter_id`` for
    # registry-driven commands so that one ``read_parameter("foo")``
    # rejection doesn't sticky-block every other parameter; bare
    # commands fall back to ``command.name``.
    cache_key = self._cache_key(command, request)

    cached = self._availability.get(cache_key, Availability.UNKNOWN)
    if cached is Availability.UNSUPPORTED:
        raise WatlowProtocolUnsupportedError(
            f"command {command.name!r} is unsupported on this device",
            context=self._error_context(command, request),
        )

    # Safety gate: PERSISTENT writes need explicit confirm.
    if command.safety is SafetyTier.PERSISTENT and not confirm:
        raise WatlowConfirmationRequiredError(
            f"command {command.name!r} is PERSISTENT and requires confirm=True",
            context=self._error_context(command, request),
        )

    ctx = CommandContext(
        registry=self._registry,
        family=self._family,
        address=self._address,
        port=self._port,
    )

    bound_timeout = timeout if timeout is not None else DEFAULTS.io_timeout_s

    # Encode under the variant. Errors here are pre-I/O — typically
    # validation failures — and should propagate untouched.
    # Exactly one of ``stdbus_variant`` / ``modbus_variant`` is
    # non-None per the resolution above; the type narrowing is
    # explicit so neither mypy nor pyright needs an ``assert`` it
    # can't enforce at runtime under ``-O``.
    wire_request: Any
    if stdbus_variant is not None:
        wire_request = stdbus_variant.encode(ctx, request)
    elif modbus_variant is not None:
        wire_request = modbus_variant.encode(ctx, request)
    else:  # pragma: no cover — variant resolution above guarantees one is set
        raise WatlowProtocolUnsupportedError(
            f"command {command.name!r} variant resolution lost",
            context=self._error_context(command, request),
        )

    started = time.monotonic()
    # Hold the per-port client lock only for the I/O turn-around.
    # Decode is CPU-only and does not need to block the next
    # request waiting on the same RS-485 segment; ``reply`` is
    # snapshotted before the lock releases. ``maybe_acquire``
    # reuses an existing acquisition when the caller (e.g.
    # ``poll_controller``'s tick batch, or
    # ``WatlowManager._run_group``'s port batch) already holds
    # the lock — avoids the FIFO-queue starvation that would
    # otherwise let unrelated writers interleave between a
    # batch's reads.
    async with maybe_acquire(self._client.lock):
        try:
            reply = await self._client.execute(
                wire_request,
                address=self._address,
                timeout=bound_timeout,
                command_name=command.name,
            )
        except (
            WatlowNoSuchObjectError,
            WatlowNoSuchAttributeError,
            WatlowProtocolUnsupportedError,
        ) as exc:
            self._availability[cache_key] = Availability.UNSUPPORTED
            self._record_error(exc)
            _log.warning(
                "command unsupported: protocol=%s cmd=%s key=%s exc=%s",
                kind.value,
                command.name,
                cache_key,
                exc,
            )
            raise
        except WatlowProtocolError as exc:
            self._record_error(exc)
            raise
        except WatlowError as exc:
            self._record_error(exc)
            _log.warning(
                "command error: protocol=%s cmd=%s key=%s exc=%s",
                kind.value,
                command.name,
                cache_key,
                exc,
            )
            raise

    # Decode outside the lock — pure compute on the captured reply.
    try:
        if stdbus_variant is not None:
            response = stdbus_variant.decode(reply, ctx)
        else:
            # ``modbus_variant is not None`` per the resolution above, but
            # type checkers cannot see that invariant from this branch,
            # hence the targeted ``type: ignore`` instead of an ``assert``.
            response = modbus_variant.decode(reply, ctx, request)  # type: ignore[union-attr]
    except (
        WatlowNoSuchObjectError,
        WatlowNoSuchAttributeError,
        WatlowProtocolUnsupportedError,
    ) as exc:
        # Decode-side "we don't have this": same availability
        # transition as the wire-side rejection above.
        self._availability[cache_key] = Availability.UNSUPPORTED
        self._record_error(exc)
        _log.warning(
            "command unsupported: protocol=%s cmd=%s key=%s exc=%s",
            kind.value,
            command.name,
            cache_key,
            exc,
        )
        raise
    except WatlowProtocolError as exc:
        # Decode-failure parity with the inside-lock branch above:
        # NoSuchInstance / IllegalDataValue / generic decode errors
        # don't transition availability per design §5b.
        self._record_error(exc)
        raise

    elapsed = time.monotonic() - started
    self._availability[cache_key] = Availability.SUPPORTED
    _log.debug(
        "session exec ok protocol=%s cmd=%s key=%s elapsed=%.4fs",
        kind.value,
        command.name,
        cache_key,
        elapsed,
    )
    return response
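
The per-parameter cache key is what keeps one rejected parameter from blocking the others. The sketch below models only that part of `execute`, under simplifying assumptions: `MiniSession` and `UnsupportedError` are stand-ins, the fake "device" is a dict, and the parameter ids 4001 and 9999 are arbitrary.

```python
from enum import Enum


class Availability(Enum):
    UNKNOWN = 0
    SUPPORTED = 1
    UNSUPPORTED = 2


class UnsupportedError(Exception):
    """Stand-in for WatlowProtocolUnsupportedError."""


class MiniSession:
    def __init__(self, device_params):
        self._device_params = device_params  # ids the fake device knows
        self._availability = {}
        self.wire_calls = 0

    def _cache_key(self, command_name, parameter_id=None):
        # Registry-driven commands key on name:parameter_id; bare ones on name.
        if parameter_id is None:
            return command_name
        return f"{command_name}:{parameter_id}"

    def read_parameter(self, parameter_id):
        key = self._cache_key("read_parameter", parameter_id)
        cached = self._availability.get(key, Availability.UNKNOWN)
        if cached is Availability.UNSUPPORTED:
            raise UnsupportedError(key)  # short-circuit: no wire traffic
        self.wire_calls += 1
        if parameter_id not in self._device_params:
            self._availability[key] = Availability.UNSUPPORTED
            raise UnsupportedError(key)
        self._availability[key] = Availability.SUPPORTED
        return self._device_params[parameter_id]


session = MiniSession({4001: 21.5})  # 4001 is an arbitrary id for the demo
rejections = 0
for _ in range(2):
    try:
        session.read_parameter(9999)
    except UnsupportedError:
        rejections += 1
value = session.read_parameter(4001)
```

The second read of 9999 is rejected from the cache without touching the wire, while 4001 still goes through because it has its own key.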

invalidate_comms_unit_label

invalidate_comms_unit_label()

Drop the cached 17050 value so the next read re-queries the device.

Source code in src/watlowlib/devices/session.py
def invalidate_comms_unit_label(self) -> None:
    """Drop the cached 17050 value so the next read re-queries the device."""
    self._comms_unit_label = None
    self._comms_unit_label_loaded = False

set_wire_temperature_unit

set_wire_temperature_unit(unit)

Set the wire temperature scale from an authoritative source.

Called by an identity strategy that has read the device's own unit register (e.g. the Series SD reg 18). Unlike the PM path, where the unit register can lie and the value therefore stays a user assertion, here the device reports its comms scale directly, so the value can be adopted as-is. Also suppresses the one-shot "trusting user-asserted unit" warning, since this value is device-sourced, not user-asserted.

Source code in src/watlowlib/devices/session.py
def set_wire_temperature_unit(self, unit: Unit | None) -> None:
    """Set the wire temperature scale from an authoritative source.

    Called by an identity strategy that has read the device's own
    unit register (e.g. the Series SD reg 18). Unlike the PM path,
    where the unit register can lie and the value therefore stays a
    user assertion, here the device reports its comms scale
    directly, so the value can be adopted as-is. Also suppresses the
    one-shot "trusting user-asserted unit" warning, since this value
    is device-sourced, not user-asserted.
    """
    self._wire_temperature_unit = unit
    self._wire_temperature_unit_warned = True

wire_temperature_unit

wire_temperature_unit()

Return the user-asserted scale of temperature values on the wire.

This is what :class:Reading.unit / :class:Sample.unit get tagged with for temperature parameters. None when the user did not pass assert_wire_temperature_unit= to :func:watlowlib.open_device; in that case temperature readings carry unit=None rather than guess.

Pure accessor — no I/O. Logs a one-shot WARN the first time an asserted value is consumed so the user-assertion shows up plainly in capture logs.

Source code in src/watlowlib/devices/session.py
def wire_temperature_unit(self) -> Unit | None:
    """Return the user-asserted scale of temperature values on the wire.

    This is what :class:`Reading.unit` / :class:`Sample.unit` get
    tagged with for temperature parameters. ``None`` when the user
    did not pass ``assert_wire_temperature_unit=`` to
    :func:`watlowlib.open_device`; in that case temperature
    readings carry ``unit=None`` rather than guess.

    Pure accessor — no I/O. Logs a one-shot WARN the first time
    an asserted value is consumed so the user-assertion shows up
    plainly in capture logs.
    """
    if self._wire_temperature_unit is not None and not self._wire_temperature_unit_warned:
        _log.warning(
            "trusting user-asserted wire temperature unit %s for port=%s "
            "address=%s; not independently verified by the library "
            "(parameter 17050 is label-only on at least one PM firmware)",
            self._wire_temperature_unit.value,
            self._port,
            self._address,
        )
        self._wire_temperature_unit_warned = True
    return self._wire_temperature_unit
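
The interplay between the accessor and `set_wire_temperature_unit` can be sketched with a warned flag and a list-capturing log handler. `UnitHolder`, `ListHandler`, and the logger name are hypothetical, and plain strings stand in for `Unit` members.

```python
import logging

log = logging.getLogger("sketch.wire_unit")
log.propagate = False  # keep the demo output out of the root logger
log.setLevel(logging.WARNING)


class ListHandler(logging.Handler):
    """Collects formatted messages so the demo can count them."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


class UnitHolder:
    def __init__(self, asserted_unit=None):
        self._unit = asserted_unit
        self._warned = False

    def set_from_device(self, unit):
        # Device-sourced value: mark as already warned so no WARN is logged.
        self._unit = unit
        self._warned = True

    def unit(self):
        if self._unit is not None and not self._warned:
            log.warning("trusting user-asserted unit %s", self._unit)
            self._warned = True
        return self._unit


handler = ListHandler()
log.addHandler(handler)

asserted = UnitHolder("C")
asserted.unit()
asserted.unit()
warnings_after_asserted = len(handler.messages)

device_sourced = UnitHolder()
device_sourced.set_from_device("F")
device_sourced.unit()
warnings_after_device = len(handler.messages)
```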

SqliteSink

SqliteSink(
    path,
    *,
    table="samples",
    create_table=True,
    journal_mode="WAL",
    synchronous="NORMAL",
    busy_timeout_ms=5000,
)

Append-only SQLite writer with WAL journaling and first-batch schema lock.

Attributes:

path (Path): Destination SQLite file. Created on :meth:open.

table (str): Target table name.

columns (tuple[ColumnSpec, ...] | None): The locked :class:ColumnSpec tuple, or None before the first flush.

Source code in src/watlowlib/sinks/sqlite.py
def __init__(
    self,
    path: str | Path,
    *,
    table: str = "samples",
    create_table: bool = True,
    journal_mode: _JournalMode = "WAL",
    synchronous: _Synchronous = "NORMAL",
    busy_timeout_ms: int = 5000,
) -> None:
    self._path = Path(path)
    self._table = _validate_identifier(table, label="table name")
    self._create_table = create_table
    self._journal_mode: _JournalMode = journal_mode
    self._synchronous: _Synchronous = synchronous
    if busy_timeout_ms < 0:
        raise ValueError(f"busy_timeout_ms must be >= 0, got {busy_timeout_ms!r}")
    self._busy_timeout_ms = busy_timeout_ms
    self._conn: sqlite3.Connection | None = None
    self._schema = SchemaLock(sink_name="sqlite", logger=_logger)
    self._insert_sql: str | None = None
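
For orientation, the three tuning arguments correspond to standard SQLite PRAGMAs. The sketch below applies the default values with the stdlib `sqlite3` module against a temporary file; it is not the sink's own `_connect_blocking`, whose body is not shown in this section.

```python
import sqlite3
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    # WAL needs a real file; an in-memory database reports "memory" instead.
    conn = sqlite3.connect(Path(tmp) / "samples.db")
    try:
        # PRAGMA journal_mode returns the mode actually in effect.
        mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
        conn.execute("PRAGMA synchronous=NORMAL")
        conn.execute("PRAGMA busy_timeout=5000")
        synchronous = conn.execute("PRAGMA synchronous").fetchone()[0]
        busy_timeout = conn.execute("PRAGMA busy_timeout").fetchone()[0]
    finally:
        conn.close()
```

Reading the PRAGMAs back is a cheap way to confirm they took effect; SQLite reports `synchronous` as an integer, with 1 meaning NORMAL.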

columns property

columns

Locked columns in order, or None before first :meth:write_many.

path property

path

Destination SQLite file path.

table property

table

Target table name (validated).

close async

close()

Close the connection. Idempotent.

Source code in src/watlowlib/sinks/sqlite.py
async def close(self) -> None:
    """Close the connection. Idempotent."""
    if self._conn is None:
        return
    conn = self._conn
    self._conn = None
    try:
        await run_sync(conn.close)
    finally:
        _logger.info(
            "sinks.sqlite.close path=%s table=%s",
            str(self._path),
            self._table,
        )

open async

open()

Open the SQLite connection, apply PRAGMAs, and introspect the target.

Idempotent: calling :meth:open on an already-open sink is a no-op. Runs in a worker thread because sqlite3.connect and PRAGMA execution are blocking I/O.

Source code in src/watlowlib/sinks/sqlite.py
async def open(self) -> None:
    """Open the SQLite connection, apply PRAGMAs, and introspect the target.

    Idempotent: calling :meth:`open` on an already-open sink is a
    no-op. Runs in a worker thread because ``sqlite3.connect`` and
    ``PRAGMA`` execution are blocking I/O.
    """
    if self._conn is not None:
        return
    self._conn = await run_sync(self._connect_blocking)
    _logger.info(
        "sinks.sqlite.open path=%s table=%s journal_mode=%s synchronous=%s",
        str(self._path),
        self._table,
        self._journal_mode,
        self._synchronous,
    )
    if not self._create_table:
        try:
            await run_sync(self._introspect_existing_table_blocking)
        except BaseException:
            # Introspection raised (most commonly WatlowSinkSchemaError on
            # a missing table). Release the connection so we don't leak a
            # resource on a failed open.
            conn = self._conn
            self._conn = None
            await run_sync(conn.close)
            raise

write_many async

write_many(samples)

Append samples as rows in a single transaction.

On the first call (when create_table=True), infers the schema from the batch and runs CREATE TABLE IF NOT EXISTS. Subsequent calls insert directly. All values pass through ? placeholders — never string-formatted into SQL.

Source code in src/watlowlib/sinks/sqlite.py
async def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` as rows in a single transaction.

    On the first call (when ``create_table=True``), infers the
    schema from the batch and runs ``CREATE TABLE IF NOT EXISTS``.
    Subsequent calls insert directly. All values pass through
    ``?`` placeholders — never string-formatted into SQL.
    """
    if self._conn is None:
        raise RuntimeError("SqliteSink: write_many called before open()")
    if not samples:
        return

    rows = [sample_to_row(s) for s in samples]

    if not self._schema.is_locked and self._create_table:
        # else (create_table=False): _introspect_existing_table_blocking
        # already ran in open().
        self._schema.lock(rows)
        await run_sync(self._create_table_blocking)
        self._insert_sql = self._build_insert_sql()

    assert self._insert_sql is not None  # noqa: S101 — narrow for type checker
    columns = self._schema.columns
    assert columns is not None  # noqa: S101

    projected: list[tuple[object, ...]] = []
    for row in rows:
        fields = self._schema.project(row)
        projected.append(tuple(fields[spec.name] for spec in columns))

    await run_sync(self._executemany_blocking, projected)
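
A stripped-down, synchronous version of the same flow (lock columns from the first batch, create the table, insert through `?` placeholders in one transaction) might look like this. The column names and sample values are made up for the demo, and the real sink infers typed :class:ColumnSpec entries rather than bare names.

```python
import sqlite3

rows = [
    {"ts": "2024-01-01T00:00:00", "name": "process_value", "value": 21.5},
    {"ts": "2024-01-01T00:00:01", "name": "process_value", "value": 21.7},
]
columns = tuple(rows[0])  # column order locked from the first batch

conn = sqlite3.connect(":memory:")
# Identifiers cannot be bound as parameters, so they are quoted here;
# the sink validates its table name up front (see _validate_identifier).
col_list = ", ".join(f'"{c}"' for c in columns)
conn.execute(f'CREATE TABLE IF NOT EXISTS "samples" ({col_list})')
placeholders = ", ".join("?" for _ in columns)
insert_sql = f'INSERT INTO "samples" ({col_list}) VALUES ({placeholders})'

# Project every row onto the locked column order before binding.
projected = [tuple(row[c] for c in columns) for row in rows]
with conn:  # one transaction: commit on success, roll back on error
    conn.executemany(insert_sql, projected)

stored = conn.execute(
    'SELECT "name", "value" FROM "samples" ORDER BY "ts"'
).fetchall()
conn.close()
```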

Transport

Bases: Protocol

Byte-level transport.

Every I/O boundary takes an explicit timeout. On expiry, implementations raise :class:watlowlib.errors.WatlowTimeoutError; they never silently return empty or partial bytes. Backend exceptions normalise to :class:watlowlib.errors.WatlowTransportError (or a subclass) with __cause__ preserving the original exception.

Lifecycle is single-shot: :meth:open once, :meth:close once.

is_open property

is_open

Whether :meth:open has run without a matching :meth:close.

label property

label

Short identifier (port path, "fake://...") used in errors.

close async

close()

Close the underlying port. Safe to call when already closed.

Source code in src/watlowlib/transport/base.py
async def close(self) -> None:
    """Close the underlying port. Safe to call when already closed."""
    ...

drain_input async

drain_input()

Discard any buffered input bytes. Best-effort; never raises.

Source code in src/watlowlib/transport/base.py
async def drain_input(self) -> None:
    """Discard any buffered input bytes. Best-effort; never raises."""
    ...

open async

open()

Open the underlying port. Re-opening an already-open transport is an error.

Source code in src/watlowlib/transport/base.py
async def open(self) -> None:
    """Open the underlying port. Re-opening an already-open transport is an error."""
    ...

read_available async

read_available(*, idle_timeout, max_bytes=None)

Read until the line goes idle for idle_timeout seconds.

Never raises on idle expiry — an idle timeout is the expected exit. Returns whatever was accumulated (possibly empty). Used for best-effort drain and ProtocolKind.AUTO probe gaps.

Source code in src/watlowlib/transport/base.py
async def read_available(
    self,
    *,
    idle_timeout: float,
    max_bytes: int | None = None,
) -> bytes:
    """Read until the line goes idle for ``idle_timeout`` seconds.

    Never raises on idle expiry — an idle timeout is the *expected*
    exit. Returns whatever was accumulated (possibly empty). Used
    for best-effort drain and ``ProtocolKind.AUTO`` probe gaps.
    """
    ...

read_exact async

read_exact(n, *, timeout)

Read exactly n bytes.

Raises :class:watlowlib.errors.WatlowTimeoutError if fewer than n bytes arrive before timeout. Partial buffers are retained for the next call — implementations must not discard them.

Source code in src/watlowlib/transport/base.py
async def read_exact(self, n: int, *, timeout: float) -> bytes:
    """Read exactly ``n`` bytes.

    Raises :class:`watlowlib.errors.WatlowTimeoutError` if fewer
    than ``n`` bytes arrive before ``timeout``. Partial buffers are
    retained for the next call — implementations must not discard
    them.
    """
    ...
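
One way to satisfy the "partial buffers are retained" rule is to keep an internal buffer and only slice from it once enough bytes have arrived. The sketch below is an in-memory stand-in built on `asyncio`; `BufferedReader`, `feed`, and `TimeoutErrorSketch` are hypothetical names, and the real transports are built on anyio.

```python
import asyncio


class TimeoutErrorSketch(Exception):
    """Stand-in for watlowlib.errors.WatlowTimeoutError."""


class BufferedReader:
    def __init__(self):
        self._buf = bytearray()
        self._event = asyncio.Event()

    def feed(self, data: bytes) -> None:
        # Simulates bytes arriving from the port.
        self._buf += data
        self._event.set()

    async def read_exact(self, n: int, *, timeout: float) -> bytes:
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while len(self._buf) < n:
            remaining = deadline - loop.time()
            self._event.clear()
            try:
                if remaining <= 0:
                    raise asyncio.TimeoutError
                await asyncio.wait_for(self._event.wait(), remaining)
            except asyncio.TimeoutError as exc:
                # Partial bytes stay in self._buf for the next call.
                raise TimeoutErrorSketch(
                    f"wanted {n} bytes, have {len(self._buf)}"
                ) from exc
        out = bytes(self._buf[:n])
        del self._buf[:n]
        return out


async def main():
    reader = BufferedReader()
    reader.feed(b"\x01\x02")
    try:
        await reader.read_exact(4, timeout=0.01)
        timed_out = False
    except TimeoutErrorSketch:
        timed_out = True
    reader.feed(b"\x03\x04")
    return timed_out, await reader.read_exact(4, timeout=0.01)


timed_out, frame = asyncio.run(main())
```

The first call times out with two bytes buffered; the second call completes the frame from those retained bytes plus the two that arrive later.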

write async

write(data, *, timeout)

Write every byte of data. Bounded by timeout.

Source code in src/watlowlib/transport/base.py
async def write(self, data: bytes, *, timeout: float) -> None:
    """Write every byte of ``data``. Bounded by ``timeout``."""
    ...

Unit

Bases: StrEnum

Concrete display unit attached to a :class:Reading value.

UnitKind

Bases: StrEnum

Structural unit family of a parameter, as declared by the registry.

Maps to a concrete :class:Unit at read time via :func:resolve_unit. TEMPERATURE resolves to °C or °F depending on the device's comms display setting (parameter 17050); the rest are independent of device state.

WatlowCapabilityError

WatlowCapabilityError(message='', *, context=None)

Bases: WatlowError

Command is not available on this device / firmware / family.

Reserved for the planned capability-gate hierarchy. The library does not currently raise this — capability mismatches surface as :class:WatlowConfigurationError today (see commands/loop.py).

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

WatlowCapabilityWarning

Bases: UserWarning

Reserved warning class — not currently emitted.

The planned use is for non-strict family-prior mismatches (attempt the command, warn, update availability from the device's response). The library does not currently emit this; the warning class is exported so callers can pre-register filters.

WatlowConfigurationError

WatlowConfigurationError(message='', *, context=None)

Bases: WatlowError

Configuration-level error (bad args, conflicting settings).

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

WatlowConfirmationRequiredError

WatlowConfirmationRequiredError(
    message="", *, context=None
)

Bases: WatlowConfigurationError

A PERSISTENT command was attempted without confirm=True.

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

WatlowConnectionError

WatlowConnectionError(message='', *, context=None)

Bases: WatlowTransportError

Could not open / lost the connection to the device.

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

WatlowDeviceSnapshot dataclass

WatlowDeviceSnapshot(
    name,
    model,
    firmware,
    serial,
    connected,
    last_error,
    recoverable_error_count,
    captured_at,
    family,
    capabilities,
    availability_summary=(lambda: {})(),
)

Bases: DeviceSnapshot

Watlow-specific extension of :class:DeviceSnapshot.

Attributes:

family (ControllerFamily | None): Decoded :class:ControllerFamily, or None before :meth:identify has run.

capabilities (Capability): SKU-decoded :class:Capability flags.

availability_summary (Mapping[str, Availability]): Frozen mapping of command names that the session has marked :attr:Availability.UNSUPPORTED. The mapping is bounded by the parameter registry size (typically small).

WatlowError

WatlowError(message='', *, context=None)

Bases: Exception

Base class for every :mod:watlowlib exception.

Carries a typed :class:ErrorContext. The message is the human-readable summary; the context is the machine-readable detail.

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

with_context

with_context(**updates)

Return a copy of this error with its context updated.

Useful when an inner layer raises and an outer layer wants to enrich the context (for instance adding port or elapsed_s).

Source code in src/watlowlib/errors.py
def with_context(self, **updates: Any) -> Self:
    """Return a copy of this error with its context updated.

    Useful when an inner layer raises and an outer layer wants to enrich
    the context (for instance adding ``port`` or ``elapsed_s``).
    """
    cls = type(self)
    new = cls.__new__(cls)
    new.args = self.args
    try:
        new.__dict__.update(self.__dict__)
    except AttributeError:  # pragma: no cover — no slotted subclass today
        for slot in getattr(cls, "__slots__", ()):
            if hasattr(self, slot):
                object.__setattr__(new, slot, getattr(self, slot))
    new.context = self.context.merged(**updates)
    new.__cause__ = self.__cause__
    new.__context__ = self.__context__
    new.__traceback__ = self.__traceback__
    return new
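
The copy-and-enrich behaviour can be tried out with stand-ins. In this sketch `Ctx` is a hypothetical two-field replacement for :class:ErrorContext (whose real fields are not listed in this section) and `SketchError` mirrors only the non-slotted path of `with_context`.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Ctx:
    # Stand-in for ErrorContext; the field set here is assumed.
    port: Optional[str] = None
    address: Optional[int] = None

    def merged(self, **updates):
        return replace(self, **updates)


class SketchError(Exception):
    def __init__(self, message="", *, context=None):
        super().__init__(message)
        self.context = context if context is not None else Ctx()

    def with_context(self, **updates):
        cls = type(self)
        new = cls.__new__(cls)  # bypass __init__; subclass signatures may differ
        new.args = self.args
        new.__dict__.update(self.__dict__)
        new.context = self.context.merged(**updates)
        new.__cause__ = self.__cause__
        new.__traceback__ = self.__traceback__
        return new


inner = SketchError("timeout", context=Ctx(address=1))
outer = inner.with_context(port="/dev/ttyUSB0")
```

The original exception is left untouched, so an outer layer can enrich and re-raise without mutating an object that other handlers may still hold.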

WatlowFirmwareError

WatlowFirmwareError(message='', *, context=None)

Bases: WatlowCapabilityError

Command is outside the supported firmware window.

Reserved alongside :class:WatlowCapabilityError; not currently emitted by the library.

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

WatlowFrameError

WatlowFrameError(message='', *, context=None)

Bases: WatlowProtocolError

Bad CRC, bad length, malformed framing, etc.

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

WatlowManager

WatlowManager(*, error_policy=ErrorPolicy.RAISE)

Coordinator for many controllers across one or more serial ports.

Operations run concurrently across different physical ports (via :func:anyio.create_task_group) and serialise on the same-port client lock. Per-controller failures are surfaced per :attr:error_policy:

  • :attr:ErrorPolicy.RAISE: the manager still collects results from every controller, then raises an :class:ExceptionGroup if any failed.
  • :attr:ErrorPolicy.RETURN: per-name :class:DeviceResult containers carry .value or .error.

Usage:

async with WatlowManager() as mgr:
    await mgr.add("ctl1", "/dev/ttyUSB0", address=1)
    await mgr.add("ctl2", "/dev/ttyUSB1", address=1)
    samples = await mgr.poll_many(["process_value", "setpoint"])

Source code in src/watlowlib/manager.py
def __init__(self, *, error_policy: ErrorPolicy = ErrorPolicy.RAISE) -> None:
    self._error_policy = error_policy
    self._devices: dict[str, _DeviceEntry] = {}
    self._ports: dict[str, _PortEntry] = {}
    self._state_lock = anyio.Lock()
    self._closed = False
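
The RETURN policy amounts to "run everything, then hand back one container per name". The sketch below illustrates that shape with `asyncio.gather(return_exceptions=True)`, which is a substitute for the anyio task group the manager is described as using; `Result`, `poll`, and `poll_all` are hypothetical, and the failing "ctl2" is contrived.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class Result:
    # Stand-in for DeviceResult: exactly one of value / error is set.
    value: Any = None
    error: Optional[BaseException] = None


async def poll(name: str) -> float:
    # Fake per-controller read; "ctl2" fails to show a partial failure.
    await asyncio.sleep(0)
    if name == "ctl2":
        raise ConnectionError(f"{name}: no reply")
    return 21.5


async def poll_all(names):
    # return_exceptions=True lets every controller finish before we look
    # at failures, instead of aborting on the first error.
    outcomes = await asyncio.gather(
        *(poll(n) for n in names), return_exceptions=True
    )
    return {
        n: Result(error=o) if isinstance(o, BaseException) else Result(value=o)
        for n, o in zip(names, outcomes)
    }


results = asyncio.run(poll_all(["ctl1", "ctl2"]))
```

Under the RAISE policy the same collected failures would instead be bundled into a single exception group after every controller has reported.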

closed property

closed

True once :meth:close has been called.

error_policy property

error_policy

The :class:ErrorPolicy this manager was constructed with.

names property

names

Insertion-ordered tuple of managed controller names.

add async

add(
    name,
    source,
    *,
    protocol=ProtocolKind.STDBUS,
    address=1,
    serial_settings=None,
    profile=EZZONE_PROFILE,
    assert_wire_temperature_unit=None,
)

Register and open a controller under name.

The source discriminates lifecycle ownership:

  • :class:Controller — pre-built (via :func:watlowlib.open_device outside the manager). The manager only tracks the name mapping; it does not take lifecycle ownership.
  • str — serial port path ("/dev/ttyUSB0", "COM3"). The manager creates a transport, canonicalises the port key, and shares the transport + client across controllers on the same bus. Mixing Std Bus and Modbus on a shared physical port is refused; one serial link has one active protocol.
  • :class:Transport — duck-typed transport. The manager builds a session against it but does not take transport ownership.

Parameters:

name (str, required): Unique manager-level identifier.

source (Controller | str | Transport, required): One of the three lifecycle shapes above.

protocol (ProtocolKind, default STDBUS): Wire protocol (STDBUS or MODBUS_RTU). Ignored when source is a pre-built :class:Controller. AUTO is rejected; open the controller via :func:open_device first and register the resulting :class:Controller.

address (int, default 1): Bus address. Std Bus accepts 1..16; Modbus RTU accepts 1..247.

serial_settings (SerialSettings | None, default None): Override default serial framing. Only honoured when source is a port-string.

profile (DeviceProfile, default EZZONE_PROFILE): Device profile to open against. Defaults to :data:~watlowlib.devices.profile.EZZONE_PROFILE (EZ-ZONE PM). Pass :data:~watlowlib.devices.profile.SERIES_SD_PROFILE so a rig can mix an SD and a PM on different ports. Ignored when source is a pre-built :class:Controller (which already carries its own profile).

assert_wire_temperature_unit (Unit | str | None, default None): Same semantics as :func:watlowlib.open_device: :class:Reading.unit / :class:Sample.unit for temperature parameters get this value. None means temperature readings carry unit=None. Ignored when source is a pre-built :class:Controller (which already carries its own assertion from the open call).

Returns:

Controller: The opened :class:Controller.

Raises:

WatlowValidationError: name already exists or an invalid combination of kwargs was supplied.

WatlowConfigurationError: protocol mismatches an existing lock on the same port, or protocol=AUTO.

WatlowConnectionError: Manager is closed.

Source code in src/watlowlib/manager.py
async def add(
    self,
    name: str,
    source: Controller | str | Transport,
    *,
    protocol: ProtocolKind = ProtocolKind.STDBUS,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
    profile: DeviceProfile = EZZONE_PROFILE,
    assert_wire_temperature_unit: Unit | str | None = None,
) -> Controller:
    """Register and open a controller under ``name``.

    The ``source`` discriminates lifecycle ownership:

    - :class:`Controller` — pre-built (via
      :func:`watlowlib.open_device` outside the manager). The
      manager only tracks the name mapping; it does *not* take
      lifecycle ownership.
    - ``str`` — serial port path (``"/dev/ttyUSB0"``, ``"COM3"``).
      The manager creates a transport, canonicalises the port key,
      and shares the transport + client across controllers on the
      same bus. Mixing Std Bus and Modbus on a shared physical
      port is refused; one serial link has one active protocol.
    - :class:`Transport` — duck-typed transport. The manager builds
      a session against it but does *not* take transport ownership.

    Args:
        name: Unique manager-level identifier.
        source: One of the three lifecycle shapes above.
        protocol: Wire protocol (``STDBUS`` or ``MODBUS_RTU``).
            Ignored when ``source`` is a pre-built :class:`Controller`.
            ``AUTO`` is rejected — open the controller via
            :func:`open_device` first and register the resulting
            :class:`Controller`.
        address: Bus address. Std Bus accepts ``1..16``; Modbus RTU
            accepts ``1..247``.
        serial_settings: Override default serial framing. Only
            honoured when ``source`` is a port-string.
        profile: Device profile to open against. Defaults to
            :data:`~watlowlib.devices.profile.EZZONE_PROFILE`
            (EZ-ZONE PM). Pass
            :data:`~watlowlib.devices.profile.SERIES_SD_PROFILE` so a
            rig can mix an SD and a PM on different ports. Ignored
            when ``source`` is a pre-built :class:`Controller`
            (which already carries its own profile).
        assert_wire_temperature_unit: Same semantics as
            :func:`watlowlib.open_device` —
            :class:`Reading.unit` / :class:`Sample.unit` for
            temperature parameters get this value. ``None``
            means temperature readings carry ``unit=None``.
            Ignored when ``source`` is a pre-built
            :class:`Controller` (which already carries its own
            assertion from the open call).

    Returns:
        The opened :class:`Controller`.

    Raises:
        WatlowValidationError: ``name`` already exists or an
            invalid combination of kwargs was supplied.
        WatlowConfigurationError: protocol mismatches an existing
            lock on the same port, or ``protocol=AUTO``.
        WatlowConnectionError: Manager is closed.
    """
    async with self._state_lock:
        self._check_open()
        if name in self._devices:
            raise WatlowValidationError(
                f"manager: name {name!r} already in use",
                context=ErrorContext(address=address),
            )
        if serial_settings is not None and not isinstance(source, str):
            raise WatlowValidationError(
                "manager.add(serial_settings=...) only applies to string port "
                "sources; pre-built Transport / Controller carry their own settings",
            )

        from watlowlib.devices.factory import coerce_wire_temperature_unit  # noqa: PLC0415

        wire_unit = coerce_wire_temperature_unit(assert_wire_temperature_unit)

        port_key, port_entry, controller = await self._resolve_source(
            source,
            protocol=protocol,
            address=address,
            serial_settings=serial_settings,
            profile=profile,
            wire_temperature_unit=wire_unit,
        )

        self._devices[name] = _DeviceEntry(
            name=name,
            controller=controller,
            port_key=port_key,
        )
        if port_entry is not None:
            port_entry.refs.add(name)

        _logger.info(
            "manager.add device_name=%s port_key=%s protocol=%s address=%s",
            name,
            port_key,
            controller.session.protocol_kind.value,
            controller.session.address,
        )
        return controller
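
The port-sharing rule in the docstring (one active protocol per serial link, with the transport shared by every controller on that bus) can be illustrated with a small stand-alone model. This is only a sketch under assumptions, not the manager's implementation: `PortRegistry`, `PortEntry`, and `ProtocolMismatch` are hypothetical names, and plain strings stand in for `ProtocolKind` members.

```python
from dataclasses import dataclass, field


class ProtocolMismatch(Exception):
    """Hypothetical stand-in for the library's configuration error."""


@dataclass
class PortEntry:
    protocol: str                            # protocol locked in by the first device
    refs: set = field(default_factory=set)   # names of the devices sharing the port


class PortRegistry:
    """Toy model: one serial link carries exactly one active protocol."""

    def __init__(self):
        self.ports = {}

    def attach(self, name, port, protocol):
        entry = self.ports.get(port)
        if entry is None:
            # First device on this port: create the entry and lock the protocol.
            entry = self.ports[port] = PortEntry(protocol=protocol)
        elif entry.protocol != protocol:
            raise ProtocolMismatch(
                f"{port} is locked to {entry.protocol}; refusing {protocol}"
            )
        entry.refs.add(name)
        return entry

    def detach(self, name, port):
        """Drop one reference; return True when the port has no users left."""
        entry = self.ports[port]
        entry.refs.discard(name)
        if not entry.refs:
            del self.ports[port]
            return True
        return False


registry = PortRegistry()
registry.attach("oven", "/dev/ttyUSB0", "stdbus")
registry.attach("chiller", "/dev/ttyUSB0", "stdbus")     # same protocol: shared
try:
    registry.attach("sd", "/dev/ttyUSB0", "modbus_rtu")  # other protocol: refused
    mixed_refused = False
except ProtocolMismatch:
    mixed_refused = True
```

The `detach` half mirrors what `remove` describes: the port is released only when its last named user goes away.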

close async

close()

Tear down every managed controller and port (LIFO).

Per-device teardown errors are collected; if any occurred, they are raised after the close completes as an :class:ExceptionGroup. This makes explicit await mgr.close() calls fail loud on resource leaks. The async-CM exit path swallows the errors instead so an in-flight exception still wins (see :meth:__aexit__).

Source code in src/watlowlib/manager.py
async def close(self) -> None:
    """Tear down every managed controller and port (LIFO).

    Per-device teardown errors are collected; if any occurred,
    they are raised after the close completes as an
    :class:`ExceptionGroup`. This makes explicit ``await mgr.close()``
    calls fail loud on resource leaks. The async-CM exit path
    swallows the errors instead so an in-flight exception still
    wins (see :meth:`__aexit__`).
    """
    await self._close(suppress_errors=False)
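
The teardown contract (close everything in LIFO order, collect per-device errors, raise only after the sweep finishes) might look like the following stdlib-only sketch. The names `close_all` and `TeardownErrors` are hypothetical; the library itself raises an `ExceptionGroup`, which is replaced here by a small aggregate class so the example does not depend on a particular Python version.

```python
import asyncio


class TeardownErrors(Exception):
    """Hypothetical aggregate; the library raises an ExceptionGroup instead."""

    def __init__(self, errors):
        super().__init__(f"{len(errors)} teardown error(s)")
        self.errors = errors


async def close_all(closers, *, suppress_errors=False):
    """Run every closer newest-first; never stop at the first failure."""
    errors = []
    for name, closer in reversed(closers):
        try:
            await closer()
        except Exception as err:  # collect the error and keep tearing down
            errors.append((name, err))
    if errors and not suppress_errors:
        raise TeardownErrors(errors)


closed = []


async def ok(name):
    closed.append(name)


async def broken():
    raise OSError("port vanished")


# Registration order is a, b, c; teardown order is therefore c, b, a.
closers = [
    ("a", lambda: ok("a")),
    ("b", broken),
    ("c", lambda: ok("c")),
]

try:
    asyncio.run(close_all(closers))
    raised = None
except TeardownErrors as group:
    raised = group
```

Passing `suppress_errors=True` corresponds to the context-manager exit path, where teardown errors must not mask an exception that is already propagating.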

execute_each async

execute_each(op, names=None)

Run op(controller) on every (or named) controller concurrently.

General-purpose dispatcher used for cross-device snapshots (identify, read_pid, etc.) where each controller runs the same coroutine and the result is keyed by name. Cross-port runs concurrently; same-port serialises on the shared client lock.

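Under :attr:ErrorPolicy.RAISE every controller is still attempted: the task group joins first, and only then is an :class:ExceptionGroup of every per-device error raised. In that case no mapping is returned; under this policy the result mapping reaches the caller only when no controller failed.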

Source code in src/watlowlib/manager.py
async def execute_each[T](
    self,
    op: Callable[[Controller], Awaitable[T]],
    names: Sequence[str] | None = None,
) -> dict[str, DeviceResult[T]]:
    """Run ``op(controller)`` on every (or named) controller concurrently.

    General-purpose dispatcher used for cross-device snapshots
    (``identify``, ``read_pid``, etc.) where each controller runs
    the same coroutine and the result is keyed by name. Cross-port
    runs concurrently; same-port serialises on the shared client
    lock.

    Under :attr:`ErrorPolicy.RAISE` every controller is still
    attempted: the task group joins first, and only then is an
    :class:`ExceptionGroup` of every per-device error raised. The
    result mapping is returned only when no controller failed.
    """
    targets = self._resolve_names(names)
    groups = self._group_by_port(targets)
    results: dict[str, DeviceResult[T]] = {}
    errors: list[WatlowError] = []
    result_lock = anyio.Lock()

    async def _run_group(member_names: list[str]) -> None:
        for member in member_names:
            entry = self._devices[member]
            controller = entry.controller
            try:
                value = await op(controller)
            except WatlowError as err:
                async with result_lock:
                    results[member] = DeviceResult.failure(err)
                    errors.append(err)
            else:
                async with result_lock:
                    results[member] = DeviceResult(value=value, error=None)

    async with anyio.create_task_group() as tg:
        for member_names in groups.values():
            _ = tg.start_soon(_run_group, member_names)

    if self._error_policy is ErrorPolicy.RAISE and errors:
        raise ExceptionGroup("manager.execute_each: one or more controllers failed", errors)
    return results
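
The dispatch pattern used here (group controllers by port, run the groups concurrently, walk each group sequentially) can be sketched without the library. This is an illustration under assumptions: `run_grouped` and `fake_identify` are hypothetical, `asyncio.gather` is swapped in for the anyio task group, and plain tuples stand in for `DeviceResult`.

```python
import asyncio
from collections import defaultdict


async def run_grouped(devices, op):
    """devices maps name -> port; op is an async callable taking a name."""
    groups = defaultdict(list)
    for name, port in devices.items():
        groups[port].append(name)

    results = {}

    async def run_group(members):
        # Members of one port run one after another, as on a shared bus.
        for name in members:
            try:
                results[name] = ("ok", await op(name))
            except Exception as err:
                results[name] = ("error", err)

    # Groups on different ports proceed concurrently.
    await asyncio.gather(*(run_group(m) for m in groups.values()))
    return results


async def fake_identify(name):
    await asyncio.sleep(0)
    if name == "chiller":
        raise TimeoutError("no reply")
    return f"{name}-info"


devices = {
    "oven": "/dev/ttyUSB0",
    "chiller": "/dev/ttyUSB0",
    "press": "/dev/ttyUSB1",
}
results = asyncio.run(run_grouped(devices, fake_identify))
```

A failure on one controller does not prevent the others from being attempted, so every requested name ends up with an entry.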

get

get(name)

Return the controller registered under name.

Source code in src/watlowlib/manager.py
def get(self, name: str) -> Controller:
    """Return the controller registered under ``name``."""
    try:
        return self._devices[name].controller
    except KeyError:
        raise WatlowValidationError(
            f"manager: no controller named {name!r}",
        ) from None

poll async

poll(names=None, *, instance=1)

Read the active process value on every (or named) controller.

The canonical no-arg snapshot — aligns with the ecosystem Manager.poll() shape shared by alicatlib.AlicatManager, sartoriuslib.SartoriusManager, and nidaqlib.DaqManager: one :class:DeviceResult per device, keyed by name. Cross-port reads run concurrently; same-port reads serialise on the shared client lock.

For multi-parameter / multi-instance polling use :meth:poll_many.

Source code in src/watlowlib/manager.py
async def poll(
    self,
    names: Sequence[str] | None = None,
    *,
    instance: int = 1,
) -> Mapping[str, DeviceResult[Reading]]:
    """Read the active process value on every (or named) controller.

    The canonical no-arg snapshot — aligns with the ecosystem
    ``Manager.poll()`` shape shared by ``alicatlib.AlicatManager``,
    ``sartoriuslib.SartoriusManager``, and
    ``nidaqlib.DaqManager``: one :class:`DeviceResult` per device,
    keyed by name. Cross-port reads run concurrently; same-port
    reads serialise on the shared client lock.

    For multi-parameter / multi-instance polling use :meth:`poll_many`.
    """
    targets = self._resolve_names(names)
    groups = self._group_by_port(targets)
    results: dict[str, DeviceResult[Reading]] = {}
    result_lock = anyio.Lock()

    async def _run_group(member_names: list[str]) -> None:
        for member in member_names:
            entry = self._devices[member]
            try:
                reading = await entry.controller.read_pv(instance=instance)
            except WatlowError as err:
                async with result_lock:
                    results[member] = DeviceResult.failure(err)
            else:
                async with result_lock:
                    results[member] = DeviceResult(value=reading, error=None)

    async with anyio.create_task_group() as tg:
        for member_names in groups.values():
            _ = tg.start_soon(_run_group, member_names)

    return results
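
Because `poll` returns one result per device rather than raising, callers usually split the mapping into successes and failures. The sketch below assumes only the two fields visible in the source above (`value` and `error`); the `DeviceResult` dataclass here is a local stand-in, `split_results` is a hypothetical helper, and floats stand in for `Reading` objects.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class DeviceResult:
    """Local stand-in carrying the two fields seen in the source listing."""

    value: Any
    error: Optional[Exception]


def split_results(results):
    """Separate successful values from per-device errors."""
    ok = {name: r.value for name, r in results.items() if r.error is None}
    failed = {name: r.error for name, r in results.items() if r.error is not None}
    return ok, failed


snapshot = {
    "oven": DeviceResult(value=151.2, error=None),
    "chiller": DeviceResult(value=None, error=TimeoutError("no reply")),
}
ok, failed = split_results(snapshot)
```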

poll_many async

poll_many(parameters, *, names=None, instances=(1,))

Poll every (or named) controller concurrently across ports.

Returns a flat list of :class:Sample, one per (device, parameter, instance) read that succeeded. Failed reads are dropped from the list and logged at WARN. Cross-port reads run concurrently; same-port reads serialise on the shared client lock, which is acquired once per port-group batch so a queued writer cannot land between two reads of the same poll. Lock occupancy therefore scales O(devices × parameters × per-read time) per port, which is the trade-off for a coherent multi-device snapshot.

This satisfies the :class:watlowlib.streaming.PollSource Protocol so a manager can drive :func:watlowlib.streaming.record directly.

Source code in src/watlowlib/manager.py
async def poll_many(
    self,
    parameters: Sequence[str | int],
    *,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
) -> list[Sample]:
    """Poll every (or named) controller concurrently across ports.

    Returns a flat list of :class:`Sample` — one per (device,
    parameter, instance) read that succeeded. Failed reads are
    dropped from the list and logged at WARN. Cross-port reads run
    concurrently; same-port reads serialise on the shared client
    lock, which is acquired **once per port-group batch** so a
    queued writer cannot land between two reads of the same poll.
    Lock occupancy therefore scales O(devices × parameters ×
    per-read time) per port, the trade-off for a coherent
    multi-device snapshot.

    This satisfies the :class:`watlowlib.streaming.PollSource`
    Protocol so a manager can drive :func:`watlowlib.streaming.record`
    directly.
    """
    targets = self._resolve_names(names)
    groups = self._group_by_port(targets)

    result_lock = anyio.Lock()
    all_samples: list[Sample] = []

    async def _run_group(member_names: list[str]) -> None:
        local: list[Sample] = []
        # All controllers in ``member_names`` share the same physical
        # port and therefore the same protocol client and lock.
        # Acquire once around the whole group so the inner
        # ``poll_controller`` (which uses ``maybe_acquire``) reuses
        # the acquisition rather than queueing per-controller.
        port_lock = self._devices[member_names[0]].controller.session.client.lock
        async with maybe_acquire(port_lock):
            for member in member_names:
                entry = self._devices[member]
                local.extend(
                    await poll_controller(
                        entry.controller,
                        name=member,
                        parameters=parameters,
                        instances=instances,
                    ),
                )
        async with result_lock:
            all_samples.extend(local)

    async with anyio.create_task_group() as tg:
        for member_names in groups.values():
            _ = tg.start_soon(_run_group, member_names)

    return all_samples
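
The effect of acquiring the port lock once per batch can be seen in a stdlib-only sketch. Nothing here is library code: `asyncio.Lock` stands in for the shared client lock, and `batch_reader` and `writer` are hypothetical tasks. The reader yields to the event loop between reads, yet the queued writer cannot run until the whole batch is done.

```python
import asyncio


async def demo():
    lock = asyncio.Lock()
    log = []

    async def batch_reader():
        # One acquisition around the whole batch of reads.
        async with lock:
            for i in range(3):
                log.append(f"read:{i}")
                await asyncio.sleep(0)  # yield, giving the writer a chance to run

    async def writer():
        # Queues behind the reader and runs only after the lock is released.
        async with lock:
            log.append("write")

    await asyncio.gather(batch_reader(), writer())
    return log


log = asyncio.run(demo())
print(log)  # ['read:0', 'read:1', 'read:2', 'write']
```

Had the lock been taken per read instead, the writer could have been scheduled between two reads of the same poll, which is the interleaving the batch-level acquisition rules out.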

remove async

remove(name)

Unregister and close the controller named name.

If name was the last controller on a shared port, the transport for that port is closed too. A pre-built :class:Controller source is only dropped from the manager's registry — the caller retains lifecycle ownership.

Source code in src/watlowlib/manager.py
async def remove(self, name: str) -> None:
    """Unregister and close the controller named ``name``.

    If ``name`` was the last controller on a shared port, the
    transport for that port is closed too. A pre-built
    :class:`Controller` source is only dropped from the manager's
    registry — the caller retains lifecycle ownership.
    """
    async with self._state_lock:
        self._check_open()
        if name not in self._devices:
            raise WatlowValidationError(
                f"manager: no controller named {name!r}",
            )
        entry = self._devices.pop(name)
        await self._teardown_device(entry)
        _logger.info("manager.remove device_name=%s", name)

WatlowModbusError

WatlowModbusError(message='', *, context=None)

Bases: WatlowProtocolError

Base class for Modbus-layer errors.

Wraps every anymodbus exception so callers see one error hierarchy regardless of protocol. __cause__ preserves the original anymodbus exception for callers that need it.

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

WatlowModbusIllegalDataAddressError

WatlowModbusIllegalDataAddressError(
    message="", *, context=None
)

Bases: WatlowModbusError, WatlowProtocolUnsupportedError

Modbus exception 0x02 — register address not allowed.

Maps to :class:Availability.UNSUPPORTED in the session cache.

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

WatlowModbusIllegalDataValueError

WatlowModbusIllegalDataValueError(
    message="", *, context=None
)

Bases: WatlowModbusError

Modbus exception 0x03 — bad argument, not absence.

Does not affect availability (the parameter exists; the write value was simply rejected).

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

WatlowModbusIllegalFunctionError

WatlowModbusIllegalFunctionError(
    message="", *, context=None
)

Bases: WatlowModbusError, WatlowProtocolUnsupportedError

Modbus exception 0x01 — slave does not implement the function.

Maps to :class:Availability.UNSUPPORTED in the session cache. Inherits :class:WatlowProtocolUnsupportedError so the session's sticky-unsupported handling treats it like Std Bus 0x81 / 0x83.

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

WatlowModbusSlaveFailureError

WatlowModbusSlaveFailureError(message='', *, context=None)

Bases: WatlowModbusError

Modbus exception 0x04 — unrecoverable slave-side error.

Does not affect availability — non-response is not a refusal of the parameter (per design §5b table).

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

WatlowModbusTimeoutError

WatlowModbusTimeoutError(message='', *, context=None)

Bases: WatlowModbusError, WatlowTimeoutError

Modbus request timed out at the protocol layer.

Inherits :class:WatlowTimeoutError so callers with existing timeout-handling code see this as a transport timeout. Does not affect availability.

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT
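
The double inheritance means an existing `except WatlowTimeoutError` handler also catches Modbus timeouts. The sketch below re-declares the relevant classes locally, following the "Bases" lines on this page, purely to show the effect; the constructors are simplified and `classify` is a hypothetical helper.

```python
# Local stand-ins mirroring the documented base classes.
class WatlowError(Exception): ...
class WatlowTransportError(WatlowError): ...
class WatlowTimeoutError(WatlowTransportError): ...
class WatlowProtocolError(WatlowError): ...
class WatlowModbusError(WatlowProtocolError): ...
class WatlowModbusTimeoutError(WatlowModbusError, WatlowTimeoutError): ...


def classify(err):
    """Return the label of the first handler that matches the error."""
    try:
        raise err
    except WatlowTimeoutError:
        return "timeout"
    except WatlowProtocolError:
        return "protocol"
    except WatlowError:
        return "other"


timeout_err = WatlowModbusTimeoutError("no reply within budget")
kind = classify(timeout_err)
```

Handler order matters: the same error is also a `WatlowProtocolError`, so a chain that lists the protocol handler first would take that branch instead.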

WatlowNoSuchAttributeError

WatlowNoSuchAttributeError(message='', *, context=None)

Bases: WatlowProtocolError

Standard Bus error 0x83 — valid class, invalid member.

Maps to :class:Availability.UNSUPPORTED in the session cache.

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

WatlowNoSuchInstanceError

WatlowNoSuchInstanceError(message='', *, context=None)

Bases: WatlowProtocolError

Standard Bus error 0x84 — valid class+member, invalid instance.

The parameter exists but the requested loop / channel does not. Does not affect availability (a different instance may succeed).

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

WatlowNoSuchObjectError

WatlowNoSuchObjectError(message='', *, context=None)

Bases: WatlowProtocolError

Standard Bus error 0x81 — invalid class.

The device does not expose the requested object class. Maps to :class:Availability.UNSUPPORTED in the session cache.

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

WatlowProtocolError

WatlowProtocolError(message='', *, context=None)

Bases: WatlowError

Protocol-level error (framing, parsing, unrecognised reply).

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

WatlowProtocolUnsupportedError

WatlowProtocolUnsupportedError(message='', *, context=None)

Bases: WatlowProtocolError

The active protocol cannot satisfy this command on this device.

Sticky for the session: subsequent attempts at the same command short-circuit with this error pre-I/O. Set on Std Bus 0x81 / 0x83 and on Modbus IllegalFunction / IllegalDataAddress per docs/design.md §5b.

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT
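
The sticky behaviour (refuse once on the wire, then short-circuit before any I/O) can be modelled in a few lines. This is a sketch under assumptions rather than the session's real cache: `StickySession`, `UnsupportedError`, `fake_send`, and the command names are all hypothetical.

```python
class UnsupportedError(Exception):
    """Stand-in for WatlowProtocolUnsupportedError."""


class StickySession:
    """Toy session that remembers commands the device refused."""

    def __init__(self, send):
        self._send = send          # callable performing the (fake) I/O
        self._unsupported = set()

    def execute(self, command):
        if command in self._unsupported:
            # Short-circuit before any I/O happens.
            raise UnsupportedError(f"{command} is unsupported (cached)")
        try:
            return self._send(command)
        except UnsupportedError:
            self._unsupported.add(command)  # remember the refusal
            raise


io_calls = []


def fake_send(command):
    io_calls.append(command)
    if command == "read_humidity":
        raise UnsupportedError("device refused")
    return 42


session = StickySession(fake_send)
outcomes = []
for _ in range(2):
    try:
        session.execute("read_humidity")
    except UnsupportedError as err:
        outcomes.append(str(err))
value = session.execute("read_pv")
```

Only the first attempt reaches `fake_send`; the second fails from the cache, and unrelated commands are unaffected.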

WatlowSinkDependencyError

WatlowSinkDependencyError(message='', *, context=None)

Bases: WatlowSinkError

An optional sink extra is not installed.

Raised by sinks behind a [parquet] / [postgres] extra when the backing dependency (pyarrow, asyncpg) is missing at :meth:open time. Instantiating the sink succeeds on bare-core installs; the dependency check is deferred so import-time errors don't break callers that never reach for the extra.

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT
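
Deferring the dependency check to open time can be done with a lazy import. The following is a minimal sketch of that pattern, not the sinks' actual code: `LazySink` and `SinkDependencyError` are hypothetical, and a deliberately non-existent module name plays the role of a missing optional backend.

```python
import importlib


class SinkDependencyError(Exception):
    """Stand-in for WatlowSinkDependencyError."""


class LazySink:
    """Toy sink whose optional backend is imported at open time."""

    def __init__(self, module_name, extra):
        # No import here, so construction works on a bare install.
        self.module_name = module_name
        self.extra = extra
        self.backend = None

    def open(self):
        try:
            self.backend = importlib.import_module(self.module_name)
        except ImportError as err:
            raise SinkDependencyError(
                f"install the [{self.extra}] extra to use this sink"
            ) from err
        return self


sink = LazySink("no_such_backend_module_xyz", "parquet")  # constructing succeeds
try:
    sink.open()
    opened = True
    cause = None
except SinkDependencyError as err:
    opened = False
    cause = err.__cause__

json_sink = LazySink("json", "none").open()  # stdlib module, always importable
```

Chaining with `from err` keeps the original `ImportError` available on `__cause__` for callers that want the underlying detail.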

WatlowSinkError

WatlowSinkError(message='', *, context=None)

Bases: WatlowError

Base class for :mod:watlowlib.sinks errors.

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

WatlowSinkSchemaError

WatlowSinkSchemaError(message='', *, context=None)

Bases: WatlowSinkError

The target sink's existing schema does not match what the row carries.

Raised when a sink configured with create_table=False is pointed at a missing or incompatible backing schema.

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

WatlowSinkWriteError

WatlowSinkWriteError(message='', *, context=None)

Bases: WatlowSinkError

A sink-backend write failed (CREATE TABLE, INSERT, file IO, ...).

__cause__ preserves the backend-native exception (sqlite3.Error, OSError, ...) for callers that want to inspect it.

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

WatlowTimeoutError

WatlowTimeoutError(message='', *, context=None)

Bases: WatlowTransportError

A transport read or write timed out.

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

WatlowTransportError

WatlowTransportError(message='', *, context=None)

Bases: WatlowError

I/O-layer error from the transport.

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

WatlowValidationError

WatlowValidationError(message='', *, context=None)

Bases: WatlowConfigurationError

Request validation failed before I/O (bad instance, bad value).

Source code in src/watlowlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

classify_family

classify_family(part_number)

Return the :class:ControllerFamily for a part-number string.

Only the leading family discriminator is parsed; per-family digit decoding is in :func:decode_part_number.

Source code in src/watlowlib/registry/families.py
def classify_family(part_number: str) -> ControllerFamily:
    """Return the :class:`ControllerFamily` for a part-number string.

    Only the leading family discriminator is parsed; per-family digit
    decoding is in :func:`decode_part_number`.
    """
    head = part_number.strip().upper()
    if head.startswith("PM"):
        return ControllerFamily.PM
    if head.startswith("RM"):
        return ControllerFamily.RM
    if head.startswith("ST"):
        return ControllerFamily.ST
    if head.startswith("F4T"):
        return ControllerFamily.F4T
    if head.startswith("SD"):
        return ControllerFamily.SD
    return ControllerFamily.UNKNOWN

find_devices async

find_devices(
    *,
    ports=None,
    addresses=None,
    baudrates=None,
    protocols=None,
    profiles=None,
    serial_template=None,
    per_probe_timeout_s=_DEFAULT_PROBE_TIMEOUT_S,
)

Probe local serial ports for Watlow controllers.

Parameters:

Name Type Description Default
ports Sequence[str] | None

Serial-port paths to scan. None enumerates every visible port via :func:anyserial.list_serial_ports. An empty sequence returns [] without enumeration.

None
addresses Sequence[int] | None

Bus addresses to probe per (port, baudrate, protocol) combination. Defaults to :data:DEFAULT_DISCOVERY_ADDRESSES ((1,)). Std Bus accepts 1..16; Modbus RTU accepts 1..247. Out-of-range addresses for a given protocol are still emitted as ok=False rows carrying a :class:WatlowConfigurationError.

None
baudrates Sequence[int] | None

Baud rates to try. Defaults to :data:DEFAULT_DISCOVERY_BAUDRATES.

None
protocols Sequence[ProtocolKind] | None

Wire protocols to probe. Defaults to :data:DEFAULT_DISCOVERY_PROTOCOLS. ProtocolKind.AUTO is not accepted here (one row per concrete probe). Ignored when profiles is given.

None
profiles Sequence[DeviceProfile] | None

Device profiles to probe. When given, discovery iterates profiles instead of protocols — each profile contributes its own default_protocol, factory serial framing (so the Series SD's 8-N-1 is used, not the PM Modbus 8-E-1), parameter registry, and identity strategy. Pass :data:~watlowlib.devices.profile.DEVICE_PROFILES to sweep for every known device type (PM over Std Bus + SD over Modbus). None (default) keeps the historical protocol-centric scan against the EZ-ZONE PM profile.

None
serial_template SerialSettings | None

Optional :class:SerialSettings whose parity / bytesize / stopbits / rtscts / xonxoff / exclusive fields override the per-protocol factory framing for every probe. port and baudrate are always overwritten per iteration.

None
per_probe_timeout_s float

Per-probe budget. Bounds the :meth:Controller.identify call (four bounded sub-reads) so a silent address bails after one round-trip rather than four. Defaults to 0.5; a four-port × three-baud × two-protocol scan with one address per combo takes about 12 s of wall-clock (24 probes × 0.5 s) if no address answers.

_DEFAULT_PROBE_TIMEOUT_S

Returns:

Type Description
list[DiscoveryResult]

One :class:DiscoveryResult per (port × baudrate × protocol × address) tuple, in input order. The cartesian product is iterated outermost-port, then baudrate, then protocol, then address, so the same input always yields the same output ordering.

Raises:

Type Description
WatlowConfigurationError

protocols contains :attr:ProtocolKind.AUTO, or per_probe_timeout_s is non-positive.

Notes
  • Read-only. Discovery never writes to the device; it only calls :meth:Controller.identify (four parameter reads). Safe to run on rigs that already have other software talking to the controller.
  • Per-port short-circuit. If a port fails to open with a :class:WatlowConnectionError, the rest of the scan for that port emits ok=False rows without re-attempting the open. This avoids hammering a port the kernel won't give us.

Source code in src/watlowlib/devices/discovery.py
async def find_devices(
    *,
    ports: Sequence[str] | None = None,
    addresses: Sequence[int] | None = None,
    baudrates: Sequence[int] | None = None,
    protocols: Sequence[ProtocolKind] | None = None,
    profiles: Sequence[DeviceProfile] | None = None,
    serial_template: SerialSettings | None = None,
    per_probe_timeout_s: float = _DEFAULT_PROBE_TIMEOUT_S,
) -> list[DiscoveryResult]:
    """Probe local serial ports for Watlow controllers.

    Args:
        ports: Serial-port paths to scan. ``None`` enumerates every
            visible port via :func:`anyserial.list_serial_ports`. An
            empty sequence returns ``[]`` without enumeration.
        addresses: Bus addresses to probe per (port, baudrate, protocol)
            combination. Defaults to :data:`DEFAULT_DISCOVERY_ADDRESSES`
            (``(1,)``). Std Bus accepts ``1..16``; Modbus RTU accepts
            ``1..247``. Out-of-range addresses for a given protocol are
            still emitted as ``ok=False`` rows carrying a
            :class:`WatlowConfigurationError`.
        baudrates: Baud rates to try. Defaults to
            :data:`DEFAULT_DISCOVERY_BAUDRATES`.
        protocols: Wire protocols to probe. Defaults to
            :data:`DEFAULT_DISCOVERY_PROTOCOLS`. ``ProtocolKind.AUTO``
            is not accepted here (one row per concrete probe). Ignored
            when ``profiles`` is given.
        profiles: Device profiles to probe. When given, discovery
            iterates profiles instead of ``protocols`` — each profile
            contributes its own ``default_protocol``, factory serial
            framing (so the Series SD's 8-N-1 is used, not the PM
            Modbus 8-E-1), parameter registry, and identity strategy.
            Pass :data:`~watlowlib.devices.profile.DEVICE_PROFILES` to
            sweep for every known device type (PM over Std Bus + SD over
            Modbus). ``None`` (default) keeps the historical
            protocol-centric scan against the EZ-ZONE PM profile.
        serial_template: Optional :class:`SerialSettings` whose
            ``parity`` / ``bytesize`` / ``stopbits`` / ``rtscts`` /
            ``xonxoff`` / ``exclusive`` fields override the
            per-protocol factory framing for every probe. ``port``
            and ``baudrate`` are always overwritten per iteration.
        per_probe_timeout_s: Per-probe budget. Bounds the
            :meth:`Controller.identify` call (four bounded sub-reads)
            so a silent address bails after one round-trip rather than
            four. Defaults to ``0.5``; a four-port × three-baud ×
            two-protocol scan with one address per combo takes about
            12 s of wall-clock (24 probes × 0.5 s) if no address answers.

    Returns:
        One :class:`DiscoveryResult` per (port × baudrate × protocol ×
        address) tuple, in input order. The cartesian product is
        iterated outermost-port, then baudrate, then protocol, then
        address — same input → same output ordering.

    Raises:
        WatlowConfigurationError: ``protocols`` contains
            :attr:`ProtocolKind.AUTO`, or ``per_probe_timeout_s`` is
            non-positive.

    Notes:
        - **Read-only.** Discovery never writes to the device; it
          only calls :meth:`Controller.identify` (four parameter
          reads). Safe to run on rigs that already have other
          software talking to the controller.
        - **Per-port short-circuit.** If a port fails to open with a
          :class:`WatlowConnectionError`, the rest of the scan for
          that port emits ``ok=False`` rows without re-attempting the
          open. This avoids hammering a port the kernel won't give us.
    """
    if per_probe_timeout_s <= 0:
        from watlowlib.errors import WatlowConfigurationError  # noqa: PLC0415 — cold path

        raise WatlowConfigurationError(
            f"per_probe_timeout_s must be positive; got {per_probe_timeout_s!r}",
        )

    resolved_ports = await _resolve_ports(ports)
    resolved_addresses = tuple(addresses) if addresses is not None else DEFAULT_DISCOVERY_ADDRESSES
    resolved_baudrates = tuple(baudrates) if baudrates is not None else DEFAULT_DISCOVERY_BAUDRATES

    # Build the per-probe plan: one entry per (protocol, profile,
    # framing_base). ``profiles`` (if given) drives the device type,
    # protocol, and factory framing per probe; otherwise we keep the
    # historical protocol-centric scan against the EZ-ZONE PM profile,
    # taking framing from ``SerialSettings.factory_for`` (None below).
    plan: tuple[tuple[ProtocolKind, DeviceProfile, SerialSettings | None], ...]
    if profiles is not None:
        plan = tuple((p.default_protocol, p, p.default_serial) for p in profiles)
    else:
        resolved_protocols = (
            tuple(protocols) if protocols is not None else DEFAULT_DISCOVERY_PROTOCOLS
        )
        if ProtocolKind.AUTO in resolved_protocols:
            from watlowlib.errors import WatlowConfigurationError  # noqa: PLC0415 — cold path

            raise WatlowConfigurationError(
                "find_devices does not accept ProtocolKind.AUTO; pass concrete "
                "protocols (STDBUS, MODBUS_RTU). Auto-detection is a single-port "
                "API on open_device.",
            )
        plan = tuple((protocol, EZZONE_PROFILE, None) for protocol in resolved_protocols)

    results: list[DiscoveryResult] = []
    dead_ports: set[str] = set()
    for port in resolved_ports:
        for baud in resolved_baudrates:
            for protocol, profile, framing_base in plan:
                if port in dead_ports:
                    # Emit one row per planned address so callers see
                    # the same cartesian-product shape regardless of
                    # whether the port opened.
                    error = WatlowConnectionError(
                        f"port {port!r} previously failed to open in this scan",
                        context=ErrorContext(
                            port=port,
                            protocol=protocol,
                        ),
                    )
                    results.extend(
                        DiscoveryResult(
                            ok=False,
                            port=port,
                            address=address,
                            baudrate=baud,
                            protocol=protocol,
                            device_info=None,
                            error=error,
                            elapsed_s=0.0,
                        )
                        for address in resolved_addresses
                    )
                    continue

                settings = _build_settings(
                    port=port,
                    baudrate=baud,
                    protocol=protocol,
                    template=serial_template,
                    framing_base=framing_base,
                )
                rows, port_died = await _probe_combo(
                    port=port,
                    baudrate=baud,
                    protocol=protocol,
                    profile=profile,
                    addresses=resolved_addresses,
                    serial_settings=settings,
                    timeout_s=per_probe_timeout_s,
                )
                results.extend(rows)
                if port_died:
                    dead_ports.add(port)
    return results
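
The dead-port handling in the loop above can be illustrated in isolation. The following is a minimal sketch, not the library's code: `scan`, `fake_probe`, and the tuple rows are hypothetical stand-ins for `find_devices`, `_probe_combo`, and `DiscoveryResult`, and the probe is synchronous for brevity. It shows that once a port fails to open, later combinations on that port produce synthesized failure rows without further I/O, so the result keeps its full cartesian-product shape.

```python
from itertools import product


def fake_probe(port, baud, protocol, addresses):
    """Hypothetical stand-in for _probe_combo: returns (rows, port_died)."""
    if port == "COM9":  # pretend this port cannot be opened
        return [(port, baud, protocol, a, False) for a in addresses], True
    return [(port, baud, protocol, a, True) for a in addresses], False


def scan(ports, bauds, protocols, addresses, probe=fake_probe):
    """Probe every combination; skip real I/O once a port is known dead."""
    results = []
    dead_ports = set()
    probe_calls = 0
    for port, baud, protocol in product(ports, bauds, protocols):
        if port in dead_ports:
            # Synthesize one failed row per address so the result keeps the
            # full cartesian-product shape without touching the port again.
            results.extend((port, baud, protocol, a, False) for a in addresses)
            continue
        rows, port_died = probe(port, baud, protocol, addresses)
        probe_calls += 1
        results.extend(rows)
        if port_died:
            dead_ports.add(port)
    return results, probe_calls


rows, calls = scan(["COM3", "COM9"], [9600, 38400], ["STDBUS", "MODBUS_RTU"], [1, 2])
print(len(rows), calls)
```

Callers can therefore index the result by (port, baudrate, protocol, address) without special-casing ports that never opened.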

open_device async

open_device(
    port,
    *,
    profile=EZZONE_PROFILE,
    protocol=None,
    address=1,
    serial_settings=None,
    assert_wire_temperature_unit=None,
    identify=True,
)

Open a controller on a serial port.

Parameters:

Name Type Description Default
port str

Serial-port path (/dev/ttyUSB0, COM3, ...).

required
profile DeviceProfile

The device profile to open against. Defaults to :data:~watlowlib.devices.profile.EZZONE_PROFILE (EZ-ZONE PM), which preserves all historical behaviour. Pass :data:~watlowlib.devices.profile.SERIES_SD_PROFILE for a Series SD. The profile supplies the default protocol, factory serial framing, parameter registry, identity strategy, and (for the SD) the wire temperature unit.

EZZONE_PROFILE
protocol ProtocolKind | None

Wire protocol. None (default) adopts profile.default_protocol (Std Bus for PM, Modbus RTU for SD). STDBUS / MODBUS_RTU open directly; AUTO runs the conservative detector (Std Bus → Modbus → fail) per docs/design.md §7.

None
address int

Bus address. Std Bus accepts 1..16; Modbus RTU accepts 1..247. Under AUTO the same address is tried against both probes.

1
serial_settings SerialSettings | None

Optional framing override. None (default) adopts profile.default_serial with port applied — 38400 8-N-1 for the EZ-ZONE PM Std Bus profile, 9600 8-N-1 for the Series SD profile. An explicit :class:SerialSettings still has its port forced to the positional port arg. There is no baud sweeping in the open path (cross-cutting invariant 5).

None
identify bool

When True (default), :meth:Controller.identify runs after the transport opens so :meth:Controller.snapshot renders without further wire I/O. Set False for fast-path opens where the caller drives identification itself or wants open_device to return immediately.

True
assert_wire_temperature_unit Unit | str | None

User-asserted scale of temperature values on the wire. Sets :class:Reading.unit / :class:Sample.unit for temperature parameters. Accepts a :class:Unit or a case-insensitive string alias ("C", "F", "celsius", "degF", "°C", ...). :attr:Unit.PERCENT is rejected. None (the default) falls back to profile.wire_temperature_unit; for the EZ-ZONE PM profile, temperature readings then carry unit=None. The library does not infer this from parameter 17050: on at least one PM3 firmware 17050 is a label-only register and would silently mis-tag. Verify the actual scale externally before asserting it here (the bundled watlow-diag probe-unit CLI automates the comparison against a known panel reading; see docs/devices.md §Units).

None

Returns:

Type Description
Controller

An opened :class:Controller whose transport is ready for :meth:Controller.poll / :meth:Controller.poll_many calls. Every protocol (STDBUS, MODBUS_RTU, AUTO) returns an opened controller; __aenter__ is a no-op and __aexit__ closes the transport.

Raises:

Type Description
WatlowConfigurationError

address is out of range or protocol is unsupported.

WatlowValidationError

assert_wire_temperature_unit is :attr:Unit.PERCENT or an unrecognised alias.

WatlowProtocolUnsupportedError

protocol=AUTO and both probes failed.
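
The serial_settings rule described above (the positional port always wins) can be sketched with `dataclasses.replace`. This is an illustration under stated assumptions: the `SerialSettings` class below is a hypothetical two-field stand-in (the real class has more fields, and the `baudrate` field name is assumed), and `resolve_settings` is a made-up helper name, not part of watlowlib.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class SerialSettings:
    """Hypothetical stand-in for the library's SerialSettings."""

    port: str
    baudrate: int = 38400


def resolve_settings(port, serial_settings, default_serial):
    """Return the settings to open with; the positional port always wins."""
    if serial_settings is None:
        # No override: adopt the profile's factory framing on this port.
        return replace(default_serial, port=port)
    if serial_settings.port != port:
        # Explicit override naming a different port: keep its framing,
        # but force the port to the positional argument.
        return replace(serial_settings, port=port)
    return serial_settings


factory = SerialSettings(port="", baudrate=38400)
override = SerialSettings(port="/dev/ttyUSB9", baudrate=19200)

a = resolve_settings("/dev/ttyUSB0", None, factory)
b = resolve_settings("/dev/ttyUSB0", override, factory)
print(a, b)
```

Because `replace` returns a new instance, the caller's original settings object is left unchanged.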

Source code in src/watlowlib/devices/factory.py
async def open_device(
    port: str,
    *,
    profile: DeviceProfile = EZZONE_PROFILE,
    protocol: ProtocolKind | None = None,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
    assert_wire_temperature_unit: Unit | str | None = None,
    identify: bool = True,
) -> Controller:
    """Open a controller on a serial port.

    Args:
        port: Serial-port path (``/dev/ttyUSB0``, ``COM3``, ...).
        profile: The device profile to open against. Defaults to
            :data:`~watlowlib.devices.profile.EZZONE_PROFILE` (EZ-ZONE
            PM), which preserves all historical behaviour. Pass
            :data:`~watlowlib.devices.profile.SERIES_SD_PROFILE` for a
            Series SD. The profile supplies the default protocol,
            factory serial framing, parameter registry, identity
            strategy, and (for the SD) the wire temperature unit.
        protocol: Wire protocol. ``None`` (default) adopts
            ``profile.default_protocol`` (Std Bus for PM, Modbus RTU for
            SD). ``STDBUS`` / ``MODBUS_RTU`` open directly; ``AUTO``
            runs the conservative detector (Std Bus → Modbus → fail)
            per ``docs/design.md`` §7.
        address: Bus address. Std Bus accepts ``1..16``; Modbus RTU
            accepts ``1..247``. Under ``AUTO`` the same address is
            tried against both probes.
        serial_settings: Optional framing override. ``None`` (default)
            adopts ``profile.default_serial`` with ``port`` applied —
            **38400 8-N-1** for the EZ-ZONE PM Std Bus profile, **9600
            8-N-1** for the Series SD profile. An explicit
            :class:`SerialSettings` still has its ``port`` forced to the
            positional ``port`` arg. There is no baud sweeping in the
            open path (cross-cutting invariant 5).
        identify: When ``True`` (default), :meth:`Controller.identify`
            runs after the transport opens so :meth:`Controller.snapshot`
            renders without further wire I/O. Set ``False`` for
            fast-path opens where the caller drives identification
            itself or wants ``open_device`` to return immediately.
        assert_wire_temperature_unit: User-asserted scale of
            temperature values on the wire. Sets
            :class:`Reading.unit` / :class:`Sample.unit` for
            temperature parameters. Accepts a :class:`Unit` or a
            case-insensitive string alias (``"C"``, ``"F"``,
            ``"celsius"``, ``"degF"``, ``"°C"``, ...).
            :attr:`Unit.PERCENT` is rejected. ``None`` (the default)
            falls back to ``profile.wire_temperature_unit``; for the
            EZ-ZONE PM profile, temperature readings then carry
            ``unit=None``. The library does **not** infer this from
            parameter 17050: on at least one PM3 firmware 17050 is a
            label-only register and would silently mis-tag. Verify the
            actual scale externally before asserting it here (the
            bundled ``watlow-diag probe-unit`` CLI automates the
            comparison against a known panel reading; see
            ``docs/devices.md`` §Units).

    Returns:
        An *opened* :class:`Controller` whose transport is ready for
        :meth:`Controller.poll` / :meth:`Controller.poll_many` calls.
        Every protocol (``STDBUS``, ``MODBUS_RTU``, ``AUTO``) returns
        an opened controller; ``__aenter__`` is a no-op and
        ``__aexit__`` closes the transport.

    Raises:
        WatlowConfigurationError: ``address`` is out of range or
            ``protocol`` is unsupported.
        WatlowValidationError: ``assert_wire_temperature_unit`` is
            :attr:`Unit.PERCENT` or an unrecognised alias.
        WatlowProtocolUnsupportedError: ``protocol=AUTO`` and both
            probes failed.
    """
    # ``protocol=None`` adopts the profile's factory protocol (Std Bus
    # for EZ-ZONE PM, Modbus RTU for Series SD).
    resolved_protocol = protocol if protocol is not None else profile.default_protocol
    if resolved_protocol not in (
        ProtocolKind.STDBUS,
        ProtocolKind.MODBUS_RTU,
        ProtocolKind.AUTO,
    ):
        raise WatlowConfigurationError(
            f"unsupported protocol kind: {resolved_protocol!r}",
            context=ErrorContext(port=port),
        )

    # ``assert_wire_temperature_unit`` overrides the profile default
    # (the SD knows it speaks °F; the PM contract is "user must assert").
    wire_unit = (
        profile.wire_temperature_unit
        if assert_wire_temperature_unit is None
        else coerce_wire_temperature_unit(assert_wire_temperature_unit)
    )

    # ``serial_settings=None`` adopts the profile's factory framing
    # (port applied from the positional arg). An explicit override still
    # has ``port`` forced to the positional arg to avoid silent surprise.
    if serial_settings is None:
        settings = replace(profile.default_serial, port=port)
    elif serial_settings.port != port:
        settings = replace(serial_settings, port=port)
    else:
        settings = serial_settings

    if resolved_protocol is ProtocolKind.AUTO:
        # Lazy import — keep the Std-Bus-only callers off the anymodbus
        # dep graph until they actually opt in to AUTO.
        from watlowlib.protocol.detect import detect_protocol  # noqa: PLC0415

        resolved = await detect_protocol(
            port,
            address=address,
            serial_settings=settings,
        )
        # Detector returned an *open* transport already paired with the
        # right client; build the controller around them and skip
        # ``Controller.__aenter__``'s open() (it short-circuits when
        # ``transport.is_open`` is already True).
        session = Session(
            resolved.client,
            profile=profile,
            address=address,
            port=resolved.transport.label,
            wire_temperature_unit=wire_unit,
        )
        controller = Controller(session, resolved.transport, serial_settings=settings)
        if identify:
            await controller.identify()
        return controller

    transport: Transport
    if resolved_protocol is ProtocolKind.MODBUS_RTU:
        # Lazy import — keep the Std-Bus path off the anymodbus dep
        # graph for users who never reach for Modbus.
        from watlowlib.protocol.modbus.transport import (  # noqa: PLC0415
            ModbusBusTransport,
        )

        transport = ModbusBusTransport(settings)
    else:
        transport = SerialTransport(settings)
    controller = await _open_controller(
        transport,
        profile=profile,
        protocol=resolved_protocol,
        address=address,
        serial_settings=settings,
        wire_temperature_unit=wire_unit,
    )
    if identify:
        await controller.identify()
    return controller

pipe async

pipe(stream, sink, *, batch_size=64, flush_interval=1.0)

Drain stream into sink with buffered flushes.

Reads per-tick batches from the recorder and accumulates the individual :class:Sample objects into a list. A flush happens when either threshold is first crossed:

  • the buffer reaches batch_size samples, or
  • flush_interval seconds have elapsed since the last flush.

The time-based check fires on every incoming batch, so the actual inter-flush latency is bounded below by the recorder's tick period: effective_flush_period ≈ max(flush_interval, 1 / rate_hz). For low-rate acquisitions (rate_hz < 1 / flush_interval) the recorder cadence dominates; for high-rate acquisitions the configured flush_interval dominates. Either way, on stream exhaustion any leftover buffer is flushed before the summary is returned.

The samples_late / max_drift_ms fields on the returned summary stay at zero — those are recorder-layer concepts. The recorder emits its own summary on CM exit; this summary is the sink-side view.
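
The flush timing described above can be checked with a small simulation. This is a sketch only: `simulate_flushes` is a hypothetical helper that replays the two-threshold check against a list of made-up batch arrival times instead of a real clock, starts its timer at 0.0, and omits the final end-of-stream flush.

```python
def simulate_flushes(arrivals, batch_len, batch_size, flush_interval):
    """Return the arrival times at which a flush would happen.

    arrivals: batch arrival times in seconds (a fake clock).
    batch_len: number of samples in each incoming batch.
    """
    buffered = 0
    last_flush = 0.0
    flush_times = []
    for now in arrivals:
        buffered += batch_len
        # The time check only runs when a batch arrives, so a flush can
        # never happen more often than the recorder ticks.
        if buffered >= batch_size or (now - last_flush) >= flush_interval:
            flush_times.append(now)
            buffered = 0
            last_flush = now
    return flush_times


# Low rate: one batch every 2 s with flush_interval=1.0.
slow = simulate_flushes([2.0, 4.0, 6.0], batch_len=2, batch_size=64, flush_interval=1.0)

# High rate: one batch every 0.25 s with flush_interval=1.0.
fast = simulate_flushes(
    [0.25 * n for n in range(1, 13)], batch_len=2, batch_size=64, flush_interval=1.0
)
print(slow, fast)
```

In the low-rate case every batch triggers a flush, so the recorder cadence sets the period. In the high-rate case flushes settle at roughly one per flush_interval.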

Parameters:

Name Type Description Default
stream AsyncIterator[Sequence[Sample]]

The async iterator yielded by :func:~watlowlib.streaming.record.

required
sink SampleSink

Any :class:SampleSink. Must already be open.

required
batch_size int

Buffer threshold in samples (not batches).

64
flush_interval float

Time threshold in seconds between flushes.

1.0

Returns:

Type Description
AcquisitionSummary

An :class:AcquisitionSummary with samples_emitted set to the count actually handed to the sink.

Raises:

Type Description
ValueError

batch_size < 1 or flush_interval <= 0.

Source code in src/watlowlib/sinks/base.py
async def pipe(
    stream: AsyncIterator[Sequence[Sample]],
    sink: SampleSink,
    *,
    batch_size: int = 64,
    flush_interval: float = 1.0,
) -> AcquisitionSummary:
    r"""Drain ``stream`` into ``sink`` with buffered flushes.

    Reads per-tick batches from the recorder and accumulates the
    individual :class:`Sample`\ s into a list. A flush happens when
    either threshold is first crossed:

    - the buffer reaches ``batch_size`` samples, or
    - ``flush_interval`` seconds have elapsed since the last flush.

    The time-based check fires on every incoming batch, so the actual
    inter-flush latency is bounded below by the recorder's tick
    period: ``effective_flush_period ≈ max(flush_interval,
    1 / rate_hz)``. For low-rate acquisitions (rate_hz < 1 / flush_interval)
    the recorder cadence dominates; for high-rate acquisitions the
    configured ``flush_interval`` dominates. Either way, on stream
    exhaustion any leftover buffer is flushed before the summary is
    returned.

    The ``samples_late`` / ``max_drift_ms`` fields on the returned
    summary stay at zero — those are recorder-layer concepts. The
    recorder emits its own summary on CM exit; this summary is the
    sink-side view.

    Args:
        stream: The async iterator yielded by
            :func:`~watlowlib.streaming.record`.
        sink: Any :class:`SampleSink`. Must already be open.
        batch_size: Buffer threshold in samples (not batches).
        flush_interval: Time threshold in seconds between flushes.

    Returns:
        An :class:`AcquisitionSummary` with ``samples_emitted`` set to
        the count actually handed to the sink.

    Raises:
        ValueError: ``batch_size < 1`` or ``flush_interval <= 0``.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    if flush_interval <= 0:
        raise ValueError(f"flush_interval must be > 0, got {flush_interval!r}")

    started_at = datetime.now(UTC)
    emitted = 0
    buffer: list[Sample] = []
    last_flush = anyio.current_time()

    async def _flush() -> None:
        nonlocal emitted
        if not buffer:
            return
        await sink.write_many(buffer)
        emitted += len(buffer)
        buffer.clear()

    async for batch in stream:
        buffer.extend(batch)
        now = anyio.current_time()
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            await _flush()
            last_flush = now

    await _flush()
    finished_at = datetime.now(UTC)
    _logger.info(
        "sinks.pipe_done sink=%s samples_emitted=%s duration_s=%.3f",
        type(sink).__name__,
        emitted,
        (finished_at - started_at).total_seconds(),
    )
    return AcquisitionSummary(
        started_at=started_at,
        finished_at=finished_at,
        samples_emitted=emitted,
    )

record async

record(
    source,
    *,
    parameters,
    rate_hz,
    duration=None,
    names=None,
    instances=(1,),
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
    auto_reconnect=False,
    reconnect_factory=None,
)

Record polled samples into a receive stream at an absolute cadence.

Usage::

async with record(
    controller, parameters=["process_value", "setpoint"], rate_hz=2, duration=10
) as recording:
    async for batch in recording.stream:
        for sample in batch:
            print(sample.parameter, sample.value)
    # recording.summary is live; recording.summary.finished_at is None
    # while running and set on CM exit.

The CM yields a :class:Recording[Sequence[Sample]] exposing .stream (async iterator of per-tick :class:Sample batches), .summary (live :class:AcquisitionSummary — recorder is sole writer), and .rate_hz (the cadence the recorder is running at).

Each batch is a flat :class:Sequence — one entry per (device, parameter, instance) read that succeeded. Failed reads are dropped by the source and logged at WARN.

Parameters:

Name Type Description Default
source PollSource

Any :class:PollSource (a :class:Controller or a :class:WatlowManager).

required
parameters Sequence[str | int]

Parameter names or registry IDs to poll each tick.

required
rate_hz float

Target cadence. Absolute targets are computed target[n] = start + n * (1 / rate_hz). Must be > 0.

required
duration float | None

Total acquisition duration in seconds. None means "until the caller exits the CM".

None
names Sequence[str] | None

Subset of device names to poll per tick. None polls everything the source manages. Ignored for solo controllers.

None
instances Sequence[int]

1-indexed loop / channel numbers per device. Single-loop devices use (1,).

(1,)
overflow OverflowPolicy

Backpressure policy when the receive-stream buffer is full. See :class:OverflowPolicy.

BLOCK
buffer_size int

Receive-stream capacity, in per-tick batches.

64
auto_reconnect bool

When True, treat :class:WatlowConnectionError raised by source.poll_many as a transient transport drop rather than a fatal error. The producer logs recorder.disconnected, waits per the backoff schedule, and either rebuilds the source via reconnect_factory (if supplied) or simply retries the same source.poll_many on the next tick. samples_late ticks up for each tick missed during the gap.

False
reconnect_factory Callable[[], Awaitable[PollSource]] | None

When supplied alongside auto_reconnect, invoked to rebuild the :class:PollSource after a disconnect. Useful when the source's transport needs to be re-opened explicitly (e.g. a fresh :func:watlowlib.open_device call). The returned source replaces source for subsequent ticks. Without a factory, the recorder relies on source.poll_many itself to recover (which works for callers that wrap their own transport-reopen logic inside poll_many).

None

Yields:

Type Description
AsyncGenerator[Recording[Sequence[Sample]]]

A :class:Recording[Sequence[Sample]] exposing .stream, .summary, and .rate_hz.

Raises:

Type Description
ValueError

rate_hz <= 0, duration <= 0, buffer_size < 1, or parameters is empty.
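
The absolute-cadence rule for rate_hz and duration can be worked through numerically. The sketch below assumes tick indices count from 0 and uses a hypothetical helper name, `tick_targets`; the period and tick-count arithmetic follow the formula given for rate_hz and the rounding used in the source listing.

```python
def tick_targets(start, rate_hz, duration):
    """Absolute tick deadlines: target[n] = start + n * (1 / rate_hz)."""
    if rate_hz <= 0:
        raise ValueError(f"rate_hz must be > 0, got {rate_hz!r}")
    period = 1.0 / rate_hz
    # At least one tick is always scheduled, even for very short durations.
    total_ticks = max(1, round(duration * rate_hz))
    return [start + n * period for n in range(total_ticks)]


targets = tick_targets(start=100.0, rate_hz=2, duration=2)
short = tick_targets(start=0.0, rate_hz=2, duration=0.1)
print(targets, short)
```

Because each deadline is computed from the start time rather than from the previous tick, a tick that runs late does not shift the deadlines that follow it.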

Source code in src/watlowlib/streaming/recorder.py
@asynccontextmanager
async def record(
    source: PollSource,
    *,
    parameters: Sequence[str | int],
    rate_hz: float,
    duration: float | None = None,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
    auto_reconnect: bool = False,
    reconnect_factory: Callable[[], Awaitable[PollSource]] | None = None,
) -> AsyncGenerator[Recording[Sequence[Sample]]]:
    """Record polled samples into a receive stream at an absolute cadence.

    Usage::

        async with record(
            controller, parameters=["process_value", "setpoint"], rate_hz=2, duration=10
        ) as recording:
            async for batch in recording.stream:
                for sample in batch:
                    print(sample.parameter, sample.value)
            # recording.summary is live; recording.summary.finished_at is None
            # while running and set on CM exit.

    The CM yields a :class:`Recording[Sequence[Sample]]` exposing
    ``.stream`` (async iterator of per-tick :class:`Sample` batches),
    ``.summary`` (live :class:`AcquisitionSummary` — recorder is sole
    writer), and ``.rate_hz`` (the cadence the recorder is running
    at).

    Each batch is a flat :class:`Sequence` — one entry per (device,
    parameter, instance) read that succeeded. Failed reads are dropped
    by the source and logged at WARN.

    Args:
        source: Any :class:`PollSource` (a :class:`Controller` or a
            :class:`WatlowManager`).
        parameters: Parameter names or registry IDs to poll each tick.
        rate_hz: Target cadence. Absolute targets are computed
            ``target[n] = start + n * (1 / rate_hz)``. Must be > 0.
        duration: Total acquisition duration in seconds. ``None``
            means "until the caller exits the CM".
        names: Subset of device names to poll per tick. ``None`` polls
            everything the source manages. Ignored for solo controllers.
        instances: 1-indexed loop / channel numbers per device.
            Single-loop devices use ``(1,)``.
        overflow: Backpressure policy when the receive-stream buffer
            is full. See :class:`OverflowPolicy`.
        buffer_size: Receive-stream capacity, in per-tick batches.
        auto_reconnect: When ``True``, treat
            :class:`WatlowConnectionError` raised by ``source.poll_many``
            as a transient transport drop rather than a fatal error.
            The producer logs ``recorder.disconnected``, waits per the
            backoff schedule, and either rebuilds the source via
            ``reconnect_factory`` (if supplied) or simply retries the
            same ``source.poll_many`` on the next tick. ``samples_late``
            ticks up for each tick missed during the gap.
        reconnect_factory: When supplied alongside ``auto_reconnect``,
            invoked to rebuild the :class:`PollSource` after a
            disconnect. Useful when the source's transport needs to be
            re-opened explicitly (e.g. a fresh
            :func:`watlowlib.open_device` call). The returned source
            replaces ``source`` for subsequent ticks. Without a
            factory, the recorder relies on ``source.poll_many`` itself to
            recover (which works for callers that wrap their own
            transport-reopen logic inside ``poll_many``).

    Yields:
        A :class:`Recording[Sequence[Sample]]` exposing ``.stream``,
        ``.summary``, and ``.rate_hz``.

    Raises:
        ValueError: ``rate_hz <= 0``, ``duration <= 0``,
            ``buffer_size < 1``, or ``parameters`` is empty.
    """
    if rate_hz <= 0:
        raise ValueError(f"rate_hz must be > 0, got {rate_hz!r}")
    if duration is not None and duration <= 0:
        raise ValueError(f"duration must be > 0 or None, got {duration!r}")
    if buffer_size < 1:
        raise ValueError(f"buffer_size must be >= 1, got {buffer_size!r}")
    if not parameters:
        raise ValueError("parameters must be a non-empty sequence")

    period = 1.0 / rate_hz
    total_ticks = None if duration is None else max(1, round(duration * rate_hz))

    send_stream, receive_stream = anyio.create_memory_object_stream[Sequence[Sample]](
        max_buffer_size=buffer_size,
    )
    # Producer-side clone of the receive stream — used to evict the
    # oldest queued batch under DROP_OLDEST. Cloning here (before the
    # consumer starts iterating) keeps the eviction path off the
    # consumer's iterator and avoids racing with it.
    drop_rx = receive_stream.clone()

    started_at = datetime.now(UTC)
    summary = AcquisitionSummary(started_at=started_at)
    tick_durations_ms: list[float] = []
    _logger.info(
        "recorder.start rate_hz=%s duration_s=%s overflow=%s buffer_size=%s names=%s",
        rate_hz,
        duration,
        overflow.value,
        buffer_size,
        list(names) if names is not None else None,
    )

    async with anyio.create_task_group() as tg, receive_stream:

        async def _producer_entrypoint() -> None:
            await _run_producer(
                source,
                send_stream,
                drop_rx,
                tuple(parameters),
                tuple(instances),
                names,
                period,
                total_ticks,
                overflow,
                summary,
                tick_durations_ms,
                auto_reconnect=auto_reconnect,
                reconnect_factory=reconnect_factory,
            )

        # ``start_soon`` returns a TaskHandle (anyio >= 4.14); the producer is
        # fire-and-forget and cancelled via the group below, so discard it.
        _ = tg.start_soon(_producer_entrypoint)
        try:
            yield Recording(stream=receive_stream, summary=summary, rate_hz=rate_hz)
        finally:
            # Cancel + drain before the CM returns — producer lifetime
            # is strictly nested inside the ``async with``.
            tg.cancel()

    finished_at = datetime.now(UTC)
    p50, p99 = _tick_percentiles(tick_durations_ms)
    summary.finished_at = finished_at
    summary.tick_duration_ms_p50 = p50
    summary.tick_duration_ms_p99 = p99
    _logger.info(
        "recorder.stop emitted=%s late=%s max_drift_ms=%.3f "
        "tick_p50_ms=%.3f tick_p99_ms=%.3f duration_s=%.3f",
        summary.samples_emitted,
        summary.samples_late,
        summary.max_drift_ms,
        summary.tick_duration_ms_p50,
        summary.tick_duration_ms_p99,
        (finished_at - started_at).total_seconds(),
    )

sample_to_row

sample_to_row(sample)

Flatten a :class:Sample into a single row dict for tabular sinks.

Long-format schema (one row per parameter read), stable across all in-tree sinks:

  • device — manager-assigned name (or controller transport label).
  • address — bus address.
  • protocol — wire protocol that produced the read (string).
  • parameter — canonical parameter name.
  • parameter_id — registry parameter id.
  • instance — 1-indexed loop / channel selector.
  • value — decoded value, coerced to a sink-friendly scalar (bools become "true" / "false" strings so SQLite type inference doesn't pin the column to INTEGER for the run).
  • unit — :class:Unit's .value ("C" / "F" / "%") for Watlow rows, a free-form string for cross-vendor rows ("psia", "sccm", ...), or None when the parameter has no unit.
  • t_mono_ns — monotonic-ns canonical join key.
  • t_utc — wall-clock acquisition midpoint, ISO 8601 string.
  • requested_at / received_at — ISO 8601 strings.
  • latency_s — poll round-trip in seconds.

The sample's raw payload is intentionally not in the row: bytes don't fit cleanly into CSV / JSONL / SQLite affinities, and tabular sinks are for time-series queries, not byte-level diagnostics. Callers that need raw consume :class:Sample directly via :class:~watlowlib.sinks.memory.InMemorySink.
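
The value coercion in the schema above depends on checking for bool before int, because bool is a subclass of int in Python. The following sketch contrasts the two orderings; `coerce_value` and `naive_coerce` are hypothetical helper names used only for illustration.

```python
def coerce_value(raw_value):
    """Coerce a decoded value to a sink-friendly scalar, bools first."""
    if isinstance(raw_value, bool):
        # Must come first: isinstance(True, int) is True in Python.
        return "true" if raw_value else "false"
    if isinstance(raw_value, (int, float, str)):
        return raw_value
    return None


def naive_coerce(raw_value):
    """Wrong order: the int check swallows bools before they are seen."""
    if isinstance(raw_value, (int, float, str)):
        return raw_value
    if isinstance(raw_value, bool):
        return "true" if raw_value else "false"
    return None


print(coerce_value(True), naive_coerce(True))
```

With the naive ordering a boolean parameter would reach the sink as a Python bool, which is the integer-typed outcome the schema is trying to avoid.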

Source code in src/watlowlib/sinks/base.py
def sample_to_row(sample: Sample) -> dict[str, float | int | str | bool | None]:
    """Flatten a :class:`Sample` into a single row dict for tabular sinks.

    Long-format schema (one row per parameter read), stable across all
    in-tree sinks:

    - ``device`` — manager-assigned name (or controller transport label).
    - ``address`` — bus address.
    - ``protocol`` — wire protocol that produced the read (string).
    - ``parameter`` — canonical parameter name.
    - ``parameter_id`` — registry parameter id.
    - ``instance`` — 1-indexed loop / channel selector.
    - ``value`` — decoded value, coerced to a sink-friendly scalar
      (bools become ``"true"`` / ``"false"`` strings so SQLite type
      inference doesn't pin the column to INTEGER for the run).
    - ``unit`` — :class:`Unit`'s ``.value`` (``"C"`` / ``"F"`` / ``"%"``)
      for Watlow rows, a free-form string for cross-vendor rows
      (``"psia"``, ``"sccm"``, ...), or ``None`` when the parameter has
      no unit.
    - ``t_mono_ns`` — monotonic-ns canonical join key.
    - ``t_utc`` — wall-clock acquisition midpoint, ISO 8601 string.
    - ``requested_at`` / ``received_at`` — ISO 8601 strings.
    - ``latency_s`` — poll round-trip in seconds.

    The sample's ``raw`` payload is intentionally **not** in the row:
    bytes don't fit cleanly into CSV / JSONL / SQLite affinities, and
    tabular sinks are for time-series queries, not byte-level
    diagnostics. Callers that need ``raw`` consume :class:`Sample`
    directly via :class:`~watlowlib.sinks.memory.InMemorySink`.
    """
    raw_value = sample.value
    coerced: float | int | str | None
    if isinstance(raw_value, bool):
        # Coerce before the int-isinstance check below; bool is an int.
        coerced = "true" if raw_value else "false"
    elif isinstance(raw_value, int | float | str):
        coerced = raw_value
    else:
        # raw_value is None — Sample.value's type rules out anything else.
        coerced = None

    unit = sample.unit.value if isinstance(sample.unit, Unit) else sample.unit
    return {
        "device": sample.device,
        "address": sample.address,
        "protocol": sample.protocol.value,
        "parameter": sample.parameter,
        "parameter_id": sample.parameter_id,
        "instance": sample.instance,
        "value": coerced,
        "unit": unit,
        "t_mono_ns": sample.t_mono_ns,
        "t_utc": sample.t_utc.isoformat(),
        "requested_at": sample.requested_at.isoformat(),
        "received_at": sample.received_at.isoformat(),
        "latency_s": sample.latency_s,
    }