Skip to content

watlowlib.devices

The Controller facade, Session, typed dataclasses (Reading, DeviceInfo, PartNumber, AlarmState, LoopState, DiscoveryResult, DeviceSnapshot, WatlowDeviceSnapshot, …), ControllerFamily, Capability, SafetyTier, Availability, open_device, and find_devices. See Controllers.

Public surface

watlowlib.devices

Device facade — :class:Controller, :class:Session, and dataclasses.

The facade is the public surface; everything else (:mod:watlowlib.protocol, :mod:watlowlib.commands, :mod:watlowlib.registry) is implementation detail callers don't have to import.

AlarmState dataclass

AlarmState(loop, high, low, silenced, raw_bits)

Decoded alarm bits for one loop.

Availability

Bases: StrEnum

Per-command session state.

Sticky for the session: once a command transitions to :attr:UNSUPPORTED, the session short-circuits subsequent invocations with a typed error pre-I/O. The transition table lives in docs/design.md §5b.

Capability

Bases: Flag

Coarse hardware capability bits.

Bits are derived from a decoded part number when one is available (see :func:watlowlib.registry.families.capabilities_for_part_number) and fall back to a per-family prior otherwise. The session widens the set at runtime when a command succeeds against a parameter that proves the capability.

The vocabulary is small on purpose — most Watlow gating is by :class:watlowlib.registry.families.ControllerFamily and by :attr:watlowlib.registry.parameters.ParameterSpec.parameter_id, not by per-feature bits. New bits are added when captured family behaviour requires them.

Controller

Controller(session, transport, *, serial_settings)

Async facade for a single Watlow controller.

Source code in src/watlowlib/devices/controller.py
def __init__(
    self,
    session: Session,
    transport: Transport,
    *,
    serial_settings: SerialSettings,
) -> None:
    self._session = session
    self._transport = transport
    self._serial_settings = serial_settings
    # Cached loop count populated by :meth:`identify`. ``None`` means
    # "we haven't asked yet" — :meth:`loop` then defers validation
    # to the registry's per-spec ``max_instance``. Concrete count is
    # what the part-number decoder produced from the captured part
    # string (PM3 → 1, PM6/8/9 + ``U`` control → 2, etc.).
    self._loops: int | None = None
    # Cached SKU-derived capabilities populated by :meth:`identify`.
    # ``None`` until the part number has been decoded; downstream
    # operations that gate on bits (cool-side PID, etc.) treat
    # ``None`` as "no information, no gate" so calls work pre-
    # identify without surprising the user.
    self._capabilities: Capability | None = None
    # Full cached :class:`DeviceInfo` populated by :meth:`identify`.
    # Drives :meth:`snapshot` so no wire I/O is needed to render
    # the controller's identity. ``None`` until identify runs.
    self._device_info: DeviceInfo | None = None

capabilities property

capabilities

Cached SKU capabilities (set after :meth:identify).

None pre-identify so capability-gated operations behave permissively until the part number is captured. After :meth:identify, callers can branch on :attr:Capability.HAS_COOLING etc. without re-issuing identify.

loops property

loops

Cached loop count (set after :meth:identify).

None until the device's part number has been decoded; :meth:loop accepts any 1-indexed value while loops is None and falls back to per-spec validation at the first wire call. After :meth:identify, loops reflects the decoded value.

serial_settings property

serial_settings

Serial framing the controller was opened with.

Exposed so an identity strategy (see :mod:watlowlib.devices.profile) can stamp it onto the :class:DeviceInfo it builds.

session property

session

Underlying session used for command dispatch.

close async

close()

Close the underlying transport and dispose the protocol client.

Source code in src/watlowlib/devices/controller.py
async def close(self) -> None:
    """Close the underlying transport and dispose the protocol client."""
    # The session holds a reference to the protocol client; dispose
    # it so any pending caller learns the controller is gone before
    # the transport close races them.
    try:
        self._session.dispose()
    finally:
        await self._transport.close()

identify async

identify(
    *,
    timeout=None,
    strict=False,
    query_configured_protocol=False,
)

Read the identity parameters and return a :class:DeviceInfo.

Reads (in order): part number (1009), hardware id (1001), firmware id (1002), serial number. Missing secondary fields stay None and the result's :attr:DeviceInfo.health is promoted from :attr:DeviceHealth.OK to :attr:DeviceHealth.PARTIAL. If the part-number read itself fails, the result's health is :attr:DeviceHealth.FAILED and capability decoding is skipped (the family prior still applies).

Parameters:

Name Type Description Default
timeout float | None

Per-read timeout override.

None
strict bool

If True, raise the underlying error when the part-number read fails instead of returning a health=FAILED info. Use this in maintenance code paths that need to know the device actually answered before declaring success.

False
query_configured_protocol bool

If True, also read parameter 17009 (Protocol) and populate :attr:DeviceInfo.configured_protocol. Off by default because the read costs an extra round-trip; the maintenance verify pass and the discover CLI opt in.

False

Raises:

Type Description
WatlowError

When strict=True and the part-number read fails. The original transport / protocol error class is preserved.

Source code in src/watlowlib/devices/controller.py
async def identify(
    self,
    *,
    timeout: float | None = None,
    strict: bool = False,
    query_configured_protocol: bool = False,
) -> DeviceInfo:
    """Read the identity parameters and return a :class:`DeviceInfo`.

    Reads (in order): part number (1009), hardware id (1001),
    firmware id (1002), serial number. Missing secondary fields
    stay ``None`` and the result's :attr:`DeviceInfo.health` is
    promoted from :attr:`DeviceHealth.OK` to
    :attr:`DeviceHealth.PARTIAL`. If the part-number read itself
    fails, the result's health is :attr:`DeviceHealth.FAILED` and
    capability decoding is skipped (the family prior still
    applies).

    Args:
        timeout: Per-read timeout override.
        strict: If ``True``, raise the underlying error when the
            part-number read fails instead of returning a
            ``health=FAILED`` info. Use this in maintenance code
            paths that need to know the device actually answered
            before declaring success.
        query_configured_protocol: If ``True``, also read parameter
            17009 (Protocol) and populate
            :attr:`DeviceInfo.configured_protocol`. Off by default
            because the read costs an extra round-trip; the
            maintenance verify pass and the discover CLI opt in.

    Raises:
        WatlowError: When ``strict=True`` and the part-number read
            fails. The original transport / protocol error class
            is preserved.
    """
    # Device-neutral: the bound profile owns the family-specific
    # identity sequence (EZ-ZONE PM reads 1009/1001/1002/serial;
    # Series SD reads the numeric 10/11/13 + serial 7-8 + reg 18).
    info = await self._session.profile.identify(
        self,
        timeout=timeout,
        strict=strict,
        query_configured_protocol=query_configured_protocol,
    )
    # Cache for ``self.loop(n)``'s eager validator and ``snapshot``.
    # Identify is the canonical place that sets these — open()
    # doesn't have the identity yet, and a device's loop count /
    # capabilities never change mid-session.
    self._loops = info.loops
    self._capabilities = info.capabilities
    self._device_info = info
    return info

loop

loop(n)

Return a sub-facade bound to loop n (1-indexed).

n is validated eagerly when :attr:loops is known, otherwise per-spec max_instance validation kicks in at the first wire call. Multi-loop access is the public way to reach loop 2 on dual-loop devices — :meth:Controller.read_pv defaults to instance=1.

Source code in src/watlowlib/devices/controller.py
def loop(self, n: int) -> ControllerLoop:
    """Return a sub-facade bound to loop ``n`` (1-indexed).

    ``n`` is validated eagerly when :attr:`loops` is known,
    otherwise per-spec ``max_instance`` validation kicks in at the
    first wire call. Multi-loop access is the public way to reach
    loop 2 on dual-loop devices —
    :meth:`Controller.read_pv` defaults to ``instance=1``.
    """
    return ControllerLoop(self, n)

poll async

poll(*, instance=1, timeout=None)

Read the active process value — the canonical no-arg snapshot.

Equivalent to :meth:read_pv. The no-arg form aligns with the ecosystem poll() convention shared by alicatlib.Device, sartoriuslib.Balance, and nidaqlib.DaqSession: a single, default-shaped reading per call.

For multi-parameter polling use :meth:poll_many.

Source code in src/watlowlib/devices/controller.py
async def poll(self, *, instance: int = 1, timeout: float | None = None) -> Reading:
    """Read the active process value — the canonical no-arg snapshot.

    Equivalent to :meth:`read_pv`. The no-arg form aligns with the
    ecosystem ``poll()`` convention shared by ``alicatlib.Device``,
    ``sartoriuslib.Balance``, and ``nidaqlib.DaqSession``: a
    single, default-shaped reading per call.

    For multi-parameter polling use :meth:`poll_many`.
    """
    return await self.read_pv(instance=instance, timeout=timeout)

poll_many async

poll_many(parameters, *, names=None, instances=(1,))

Read every (parameter × instance) and return them as :class:Sample\ s.

Satisfies the :class:watlowlib.streaming.PollSource Protocol so a solo :class:Controller can drive :func:watlowlib.streaming.record directly without a manager. names is accepted for Protocol compatibility but ignored — a Controller has only one device.

Failed reads are dropped from the returned list and logged at WARN. The recorder treats absence as "drop this row from the batch" and continues with the next tick.

Source code in src/watlowlib/devices/controller.py
async def poll_many(
    self,
    parameters: Sequence[str | int],
    *,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
) -> list[Sample]:
    r"""Read every (parameter × instance) and return them as :class:`Sample`\ s.

    Satisfies the :class:`watlowlib.streaming.PollSource` Protocol so
    a solo :class:`Controller` can drive :func:`watlowlib.streaming.record`
    directly without a manager. ``names`` is accepted for Protocol
    compatibility but ignored — a Controller has only one device.

    Failed reads are dropped from the returned list and logged at
    WARN. The recorder treats absence as "drop this row from the
    batch" and continues with the next tick.
    """
    del names  # solo controller has no name-keyed device map
    from watlowlib.streaming._poll import poll_controller  # noqa: PLC0415 — avoid cycle

    return await poll_controller(
        self,
        name=self._transport.label,
        parameters=parameters,
        instances=instances,
    )

read_comms_unit_label async

read_comms_unit_label(*, timeout=None)

Read (and cache) the value parameter 17050 reports.

Inspection / diagnostics helper. Does not drive :class:Reading.unit: on at least one PM3 firmware revision 17050 is a label-only register that changes the enum the device reports for itself but does not affect the scale of temperature values exchanged over comms.

To tell watlowlib what scale temperatures actually travel in over the wire, pass assert_wire_temperature_unit= to :func:watlowlib.open_device. That assertion is what feeds :class:Reading.unit.

Distinct from read_parameter("units"), which targets parameter 3005 (front-panel display). The two can disagree on a real device.

Returns None if the device doesn't report a known code.

Source code in src/watlowlib/devices/controller.py
async def read_comms_unit_label(self, *, timeout: float | None = None) -> Unit | None:
    """Read (and cache) the value parameter 17050 reports.

    Inspection / diagnostics helper. **Does not** drive
    :class:`Reading.unit`: on at least one PM3 firmware revision
    17050 is a label-only register that changes the enum the
    device reports for itself but does not affect the scale of
    temperature values exchanged over comms.

    To tell watlowlib what scale temperatures actually travel in
    over the wire, pass ``assert_wire_temperature_unit=`` to
    :func:`watlowlib.open_device`. That assertion is what feeds
    :class:`Reading.unit`.

    Distinct from ``read_parameter("units")``, which targets
    parameter 3005 (front-panel display). The two can disagree on
    a real device.

    Returns ``None`` if the device doesn't report a known code.
    """
    del timeout  # comms_unit_label() is cached + uses session defaults
    return await self._session.comms_unit_label()

read_parameter async

read_parameter(name_or_id, *, instance=1, timeout=None)

Read any registry parameter.

instance=1 is the default for single-loop devices and the first loop / channel on multi-loop devices.

Source code in src/watlowlib/devices/controller.py
async def read_parameter(
    self,
    name_or_id: str | int,
    *,
    instance: int = 1,
    timeout: float | None = None,
) -> ParameterEntry:
    """Read any registry parameter.

    ``instance=1`` is the default for single-loop devices and the
    first loop / channel on multi-loop devices.
    """
    return await self._session.execute(
        READ_PARAMETER,
        ReadParameterRequest(name_or_id, instance=instance),
        timeout=timeout,
    )

read_pv async

read_pv(*, instance=1, timeout=None)

Read the process value for instance (loop number, 1-indexed).

Source code in src/watlowlib/devices/controller.py
async def read_pv(self, *, instance: int = 1, timeout: float | None = None) -> Reading:
    """Read the process value for ``instance`` (loop number, 1-indexed)."""
    entry = await self.read_parameter("process_value", instance=instance, timeout=timeout)
    return await reading_from_entry(self._session, entry)

read_setpoint async

read_setpoint(*, instance=1, timeout=None)

Read the active setpoint for instance.

Source code in src/watlowlib/devices/controller.py
async def read_setpoint(self, *, instance: int = 1, timeout: float | None = None) -> Reading:
    """Read the active setpoint for ``instance``."""
    entry = await self.read_parameter("setpoint", instance=instance, timeout=timeout)
    return await reading_from_entry(self._session, entry)

set_comms_unit_label async

set_comms_unit_label(unit, *, confirm=False, timeout=None)

Set parameter 17050 ("Communications - Display Units").

Accepts a :class:Unit or a case-insensitive string alias ("C" / "F" / "celsius" / "fahrenheit" / "degC" / "degF" / "°C" / "°F"). :attr:Unit.PERCENT is rejected pre-I/O — the register is temperature-only.

Raw enumeration codes (15 / 30) are not accepted here. Callers who want the lower-level path use write_parameter("display_units", 30).

.. warning::

On at least one PM3 firmware revision this register is
**label-only**: writing it changes the enum the device
reports when 17050 is read back, but does not change the
scale of temperature values exchanged over comms. This
setter therefore does **not** affect
:class:`Reading.unit`. To tell watlowlib what scale
temperatures are actually on, pass
``assert_wire_temperature_unit=`` to
:func:`watlowlib.open_device`.

Persistent write (parameter 17050 is RWE); pass confirm=True to acknowledge the EEPROM write. The session raises :class:WatlowConfirmationRequiredError pre-I/O if missing.

Returns the device-echoed label after the write. None if the device's echo decodes outside the known codes.

Source code in src/watlowlib/devices/controller.py
async def set_comms_unit_label(
    self,
    unit: Unit | str,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> Unit | None:
    """Set parameter 17050 ("Communications - Display Units").

    Accepts a :class:`Unit` or a case-insensitive string alias
    (``"C"`` / ``"F"`` / ``"celsius"`` / ``"fahrenheit"`` /
    ``"degC"`` / ``"degF"`` / ``"°C"`` / ``"°F"``).
    :attr:`Unit.PERCENT` is rejected pre-I/O — the register is
    temperature-only.

    Raw enumeration codes (15 / 30) are not accepted here. Callers
    who want the lower-level path use
    ``write_parameter("display_units", 30)``.

    .. warning::

        On at least one PM3 firmware revision this register is
        **label-only**: writing it changes the enum the device
        reports when 17050 is read back, but does not change the
        scale of temperature values exchanged over comms. This
        setter therefore does **not** affect
        :class:`Reading.unit`. To tell watlowlib what scale
        temperatures are actually on, pass
        ``assert_wire_temperature_unit=`` to
        :func:`watlowlib.open_device`.

    Persistent write (parameter 17050 is RWE); pass ``confirm=True``
    to acknowledge the EEPROM write. The session raises
    :class:`WatlowConfirmationRequiredError` pre-I/O if missing.

    Returns the device-echoed label after the write. ``None`` if
    the device's echo decodes outside the known codes.
    """
    resolved = coerce_unit(unit)
    code = display_code_for_unit(resolved)
    if code is None:
        raise WatlowValidationError(
            "set_comms_unit_label accepts CELSIUS / FAHRENHEIT only; "
            "PERCENT is not a valid display-unit code",
        )
    # PERSISTENT write — session enforces ``confirm=True`` pre-I/O.
    await self.write_parameter(
        "display_units",
        code,
        confirm=confirm,
        timeout=timeout,
    )
    self._session.invalidate_comms_unit_label()
    return await self._session.comms_unit_label()

set_persistent_writes async

set_persistent_writes(
    enabled, *, confirm=False, timeout=None
)

Toggle whether subsequent writes persist to non-volatile memory.

Series-SD-specific. The SD persists every register write to EEPROM by default, so a high-rate writer (ramping setpoints, a tuning loop) can wear the EEPROM out and brick the controller. Writing 0 to register 17 keeps subsequent writes in RAM only; the device resets register 17 to 1 on every power cycle, so call set_persistent_writes(False) once after each power-up before a burst of writes (see sd_manual.txt p.84).

Parameters:

Name Type Description Default
enabled bool

True → persist writes to EEPROM (the power-on default); False → keep writes in RAM only (lost on power cycle, but spares the EEPROM).

required
confirm bool

The write itself is gated like any other parameter write — pass confirm=True to acknowledge it.

False
timeout float | None

Per-write timeout override.

None

Raises:

Type Description
WatlowConfirmationRequiredError

confirm is False.

WatlowValidationError

the bound profile's registry has no eeprom_write_enable parameter (e.g. an EZ-ZONE PM, which has no such register).

Source code in src/watlowlib/devices/controller.py
async def set_persistent_writes(
    self,
    enabled: bool,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> None:
    """Toggle whether subsequent writes persist to non-volatile memory.

    Series-SD-specific. The SD persists every register write to
    EEPROM by default, so a high-rate writer (ramping setpoints, a
    tuning loop) can wear the EEPROM out and brick the controller.
    Writing ``0`` to register 17 keeps subsequent writes in RAM
    only; the device resets register 17 to ``1`` on every power
    cycle, so call ``set_persistent_writes(False)`` once after each
    power-up before a burst of writes (see ``sd_manual.txt`` p.84).

    Args:
        enabled: ``True`` → persist writes to EEPROM (the power-on
            default); ``False`` → keep writes in RAM only (lost on
            power cycle, but spares the EEPROM).
        confirm: The write itself is gated like any other parameter
            write — pass ``confirm=True`` to acknowledge it.
        timeout: Per-write timeout override.

    Raises:
        WatlowConfirmationRequiredError: ``confirm`` is ``False``.
        WatlowValidationError: the bound profile's registry has no
            ``eeprom_write_enable`` parameter (e.g. an EZ-ZONE PM,
            which has no such register).
    """
    await self.write_parameter(
        "eeprom_write_enable",
        1 if enabled else 0,
        confirm=confirm,
        timeout=timeout,
    )

set_setpoint async

set_setpoint(
    value, *, instance=1, confirm=False, timeout=None
)

Write the setpoint and return the device-echoed value as a :class:Reading.

Setpoint is RWES — pass confirm=True to acknowledge the EEPROM write. The returned reading is the device's echo of the value it accepted.

Source code in src/watlowlib/devices/controller.py
async def set_setpoint(
    self,
    value: float,
    *,
    instance: int = 1,
    confirm: bool = False,
    timeout: float | None = None,
) -> Reading:
    """Write the setpoint and return the device-echoed value as a :class:`Reading`.

    Setpoint is RWES — pass ``confirm=True`` to acknowledge the
    EEPROM write. The returned reading is the device's echo of
    the value it accepted.
    """
    entry = await self.write_parameter(
        "setpoint",
        value,
        instance=instance,
        confirm=confirm,
        timeout=timeout,
    )
    return await reading_from_entry(self._session, entry)

snapshot async

snapshot(*, name=None)

Return an I/O-free :class:WatlowDeviceSnapshot.

Built from cached identity (populated by :meth:identify, which :func:watlowlib.open_device calls by default) plus the session's last error and per-command availability cache. Does not issue any reads — safe to call from monitoring loops at high cadence.

Parameters:

Name Type Description Default
name str | None

Override the snapshot's name field. Defaults to the controller's transport label, matching the manager-assigned name surfaced into emitted samples.

None
Source code in src/watlowlib/devices/controller.py
async def snapshot(self, *, name: str | None = None) -> WatlowDeviceSnapshot:
    """Return an I/O-free :class:`WatlowDeviceSnapshot`.

    Built from cached identity (populated by :meth:`identify`,
    which :func:`watlowlib.open_device` calls by default) plus
    the session's last error and per-command availability cache.
    Does **not** issue any reads — safe to call from monitoring
    loops at high cadence.

    Args:
        name: Override the snapshot's ``name`` field. Defaults to
            the controller's transport label, matching the
            manager-assigned name surfaced into emitted samples.
    """
    info = self._device_info
    model = info.part_number.raw if info is not None else None
    firmware = (
        str(info.firmware_id) if info is not None and info.firmware_id is not None else None
    )
    serial = info.serial_number if info is not None else None
    # Snapshot is built from cached state; availability_summary is
    # a frozen view of the session's UNSUPPORTED-marked commands.
    availability = {
        key: state
        for key, state in self._session.availability_summary().items()
        if state.name == "UNSUPPORTED"
    }
    return WatlowDeviceSnapshot(
        name=name if name is not None else self._transport.label,
        model=model,
        firmware=firmware,
        serial=serial,
        connected=self._transport.is_open,
        last_error=self._session.last_error,
        recoverable_error_count=self._session.recoverable_error_count,
        captured_at=datetime.now(UTC),
        family=info.family if info is not None else None,
        capabilities=self._capabilities if self._capabilities is not None else Capability.NONE,
        availability_summary=availability,
    )

write_parameter async

write_parameter(
    name_or_id,
    value,
    *,
    instance=1,
    confirm=False,
    timeout=None,
)

Write any registry parameter.

Persistent (RWE / RWES) writes require confirm=True; the session raises :class:WatlowConfirmationRequiredError before any I/O if the gate is missing.

Source code in src/watlowlib/devices/controller.py
async def write_parameter(
    self,
    name_or_id: str | int,
    value: float | int | str,
    *,
    instance: int = 1,
    confirm: bool = False,
    timeout: float | None = None,
) -> ParameterEntry:
    """Write any registry parameter.

    Persistent (RWE / RWES) writes require ``confirm=True``;
    the session raises :class:`WatlowConfirmationRequiredError`
    before any I/O if the gate is missing.
    """
    return await self._session.execute(
        WRITE_PARAMETER,
        WriteParameterRequest(name_or_id, value, instance=instance),
        confirm=confirm,
        timeout=timeout,
    )

ControllerFamily

Bases: StrEnum

Watlow controller family discriminator.

Membership here is advisory — :class:watlowlib.devices.session.Session treats family hints as priors, not gates. See docs/design.md §5b.

ControllerLoop

ControllerLoop(controller, loop_number)

A view over one control loop on a :class:Controller.

Construct via :meth:Controller.loop; never instantiated directly by user code. The sub-facade lives only as long as the parent controller's session — closing the controller is the only cleanup needed.

Source code in src/watlowlib/devices/loop.py
def __init__(self, controller: Controller, loop_number: int) -> None:
    if loop_number < 1:
        raise WatlowValidationError(
            f"loop number must be 1-indexed and >= 1; got {loop_number}",
        )
    # If the controller has identified the device, validate
    # eagerly. Otherwise defer to the registry's per-spec
    # ``validate_instance`` at first call: a registered parameter
    # with ``max_instance=1`` will raise a clear
    # ``WatlowValidationError`` when ``loop(2).read_pv()`` is
    # invoked. That keeps ``Controller.loop(2)`` cheap when called
    # before identify, but still fails before I/O.
    loops = controller.loops
    if loops is not None and loop_number > loops:
        raise WatlowValidationError(
            f"loop {loop_number} out of range for this device (1..{loops})",
        )
    self._controller = controller
    self._loop = loop_number

number property

number

The 1-indexed loop number this view binds.

read_alarms async

read_alarms()

Read the alarm word for this loop.

Currently raises :class:watlowlib.errors.WatlowProtocolUnsupportedError — see :func:watlowlib.commands.alarms.read_alarms for why the decoder is not yet wired up.

Source code in src/watlowlib/devices/loop.py
async def read_alarms(self) -> AlarmState:
    """Read the alarm word for this loop.

    Currently raises :class:`watlowlib.errors.WatlowProtocolUnsupportedError` —
    see :func:`watlowlib.commands.alarms.read_alarms` for why the
    decoder is not yet wired up.
    """
    return await _read_alarms(self._controller.session, instance=self._loop)

read_output async

read_output()

Read this loop's working output (output_power).

Source code in src/watlowlib/devices/loop.py
async def read_output(self) -> Reading:
    """Read this loop's working output (``output_power``)."""
    return await _read_output(self._controller.session, instance=self._loop)

read_pid async

read_pid()

Read every PID gain for this loop. Missing gains return None.

Cool-side gains (cool_proportional_band, dead_band) are skipped when the controller's identified capabilities lack :attr:Capability.HAS_COOLING (e.g. PM output_2 == 'A'). Pre-identify, the gate is permissive.

Source code in src/watlowlib/devices/loop.py
async def read_pid(self) -> PidGains:
    """Read every PID gain for this loop. Missing gains return ``None``.

    Cool-side gains (``cool_proportional_band``, ``dead_band``)
    are skipped when the controller's identified capabilities
    lack :attr:`Capability.HAS_COOLING` (e.g. PM ``output_2 ==
    'A'``). Pre-identify, the gate is permissive.
    """
    return await _read_pid(
        self._controller.session,
        instance=self._loop,
        capabilities=self._controller.capabilities,
    )

read_pv async

read_pv(*, timeout=None)

Read this loop's process value.

Source code in src/watlowlib/devices/loop.py
async def read_pv(self, *, timeout: float | None = None) -> Reading:
    """Read this loop's process value."""
    return await self._controller.read_pv(instance=self._loop, timeout=timeout)

read_setpoint async

read_setpoint(*, timeout=None)

Read this loop's active setpoint.

Source code in src/watlowlib/devices/loop.py
async def read_setpoint(self, *, timeout: float | None = None) -> Reading:
    """Read this loop's active setpoint."""
    return await self._controller.read_setpoint(instance=self._loop, timeout=timeout)

set_setpoint async

set_setpoint(value, *, confirm=False, timeout=None)

Write this loop's setpoint (RWES → confirm=True required).

Source code in src/watlowlib/devices/loop.py
async def set_setpoint(
    self,
    value: float,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> Reading:
    """Write this loop's setpoint (RWES → ``confirm=True`` required)."""
    return await self._controller.set_setpoint(
        value,
        instance=self._loop,
        confirm=confirm,
        timeout=timeout,
    )

write_pid async

write_pid(gains, *, confirm=False)

Write the supplied gains for this loop.

Persistent — passing confirm=True is required. Fields left None on gains skip the wire entirely. Setting a cool-side field on a controller without :attr:Capability.HAS_COOLING raises :class:watlowlib.errors.WatlowConfigurationError.

Source code in src/watlowlib/devices/loop.py
async def write_pid(self, gains: PidGains, *, confirm: bool = False) -> PidGains:
    """Write the supplied gains for this loop.

    Persistent — passing ``confirm=True`` is required. Fields
    left ``None`` on ``gains`` skip the wire entirely. Setting a
    cool-side field on a controller without
    :attr:`Capability.HAS_COOLING` raises
    :class:`watlowlib.errors.WatlowConfigurationError`.
    """
    return await _write_pid(
        self._controller.session,
        gains,
        instance=self._loop,
        confirm=confirm,
        capabilities=self._controller.capabilities,
    )

DeviceInfo dataclass

DeviceInfo(
    part_number,
    hardware_id,
    firmware_id,
    serial_number,
    family,
    protocol,
    address,
    capabilities,
    serial_settings,
    loops,
    health=DeviceHealth.OK,
    configured_protocol=None,
)

Identity + connection metadata for an open controller.

Returned by :meth:Controller.identify. Capabilities are decoded from the part number when one is captured (see :func:watlowlib.registry.families.capabilities_for_part_number) and OR-ed with the family prior; unobserved bits stay zero rather than being guessed.

protocol is the wire protocol the host is currently talking; configured_protocol is what the device's persistent EEPROM parameter (PM 17009) reports. They normally match, but when they diverge the helper :attr:protocol_mismatch flags it — useful for catching SKU/firmware combinations where the user wrote a new protocol but the runtime stack didn't pick it up (e.g. comms position-8 = 'A', no Modbus stack present even though 17009 reads 1057).

protocol_mismatch property

protocol_mismatch

True when EEPROM says one protocol and we're talking another.

Always False when :attr:configured_protocol is None (i.e. identify did not query parameter 17009).

DeviceProfile dataclass

DeviceProfile(
    name,
    family,
    registry,
    default_protocol,
    default_serial,
    identify,
    wire_temperature_unit=None,
)

A first-class controller type.

Attributes:

Name Type Description
name str

Stable short identifier ("ezzone" / "series_sd").

family ControllerFamily

The controller family this profile describes.

registry ParameterRegistry

Parameter registry used to decode this device's parameters.

default_protocol ProtocolKind

Wire protocol opened when the caller does not pass one explicitly.

default_serial SerialSettings

Factory serial framing for default_protocol. Its port is a placeholder — :func:open_device applies the real port via :func:dataclasses.replace.

identify IdentifyStrategy

Strategy that produces a :class:DeviceInfo.

wire_temperature_unit Unit | None

The scale temperatures travel in over the wire, when the profile knows it for certain. None means "do not guess — the user must assert it" (the EZ-ZONE PM contract; some PM firmware misreports its own unit register).

DeviceSnapshot dataclass

DeviceSnapshot(
    name,
    model,
    firmware,
    serial,
    connected,
    last_error,
    recoverable_error_count,
    captured_at,
)

Cross-library identity + connection summary (no I/O).

Attributes:

Name Type Description
name str

Caller-supplied device name (manager-assigned, or the transport label for a solo controller).

model str | None

Best-known model / part-number string, None until :meth:Controller.identify has run.

firmware str | None

Firmware id as a string, or None.

serial str | None

Serial-number string, or None.

connected bool

True when the underlying transport is open.

last_error ErrorContext | None

Most recent :class:ErrorContext recorded by the session, or None.

recoverable_error_count int

Session counter for swallowed-and- retried transient errors. Watlow keeps this dormant until a transient transport class is introduced; the field stays at zero today.

captured_at datetime

Wall-clock at snapshot construction (tz-aware UTC).

DiscoveryResult dataclass

DiscoveryResult(
    ok,
    port,
    address,
    baudrate,
    protocol,
    device_info,
    error,
    elapsed_s,
)

One probe attempt's outcome from :func:find_devices.

Cross-library shape (mirrors :mod:alicatlib, :mod:sartoriuslib, :mod:nidaqlib) so GUI Discover dialogs and capa-style adapters can filter responsive vs silent rows on a single attribute and consume the same field set across vendors.

A populated :attr:device_info carries the full :class:Controller.identify result, including :attr:DeviceInfo.health and (when the scan queried it) :attr:DeviceInfo.configured_protocol.

The address field is typed str | int | None to match the cross-library spec; in practice every watlow probe carries an int.

IdentifyStrategy

Bases: Protocol

How a profile turns an open controller into a :class:DeviceInfo.

Implementations are pure with respect to the controller's cached identity — they read parameters and return a :class:DeviceInfo; :meth:Controller.identify is responsible for caching the result.

__call__ async

__call__(
    controller,
    *,
    timeout=None,
    strict=False,
    query_configured_protocol=False,
)

Return identity information for controller.

Source code in src/watlowlib/devices/profile.py
async def __call__(
    self,
    controller: Controller,
    *,
    timeout: float | None = None,
    strict: bool = False,
    query_configured_protocol: bool = False,
) -> DeviceInfo:
    """Return identity information for ``controller``."""
    ...

LoopState dataclass

LoopState(
    loop, pv, setpoint, output_pct, raw=(lambda: {})()
)

Snapshot of one loop. Composed from several reads.

ParameterEntry dataclass

ParameterEntry(spec, instance, value, raw)

Generic registry-driven read/write result.

Returned by :data:watlowlib.commands.READ_PARAMETER and :data:watlowlib.commands.WRITE_PARAMETER. The :class:Controller translates an entry into a :class:Reading / :class:PartNumber / etc. when the public API guarantees a richer shape.

PartNumber dataclass

PartNumber(raw, family, details=(lambda: {})())

Parsed part-number string returned by read_part_number.

Per-family digit decoding is contributed by :mod:watlowlib.registry.families. Decoded fragments live in :attr:details as a free-form mapping so each family can populate only what its ordering format defines, and so adding fragments to the PM decoder later is non-breaking.

The EZ-ZONE PM decoder populates case size, control type, power input, three output codes, and options string. Other families fall through to a stub: only :attr:family is set, and :attr:details is empty.

Reading dataclass

Reading(
    value, unit, received_at, monotonic_ns, raw, protocol
)

A single timestamped value from the controller.

protocol is set by the variant decoder, not by the facade — it reflects which wire protocol produced the value (per docs/design.md invariant 7).

SafetyTier

Bases: IntEnum

How dangerous a command is to invoke.

  • READ_ONLY (R) — no state change.
  • STATEFUL — runtime state change but not EEPROM-backed. Reserved for commands like "start autotune"; no PM parameter maps here today, but the tier exists so future commands have a place to live.
  • PERSISTENT (RW / RWE / RWES) — EEPROM-backed; requires confirm=True at the facade.

Session

Session(
    client,
    *,
    profile,
    address,
    port,
    wire_temperature_unit=None,
)

Owns availability cache, gates, and the dispatch loop.

A :class:Session is bound to exactly one :class:ProtocolClient for its lifetime — one protocol per port (invariant 1).

Source code in src/watlowlib/devices/session.py
def __init__(
    self,
    client: ProtocolClient[Any, Any],
    *,
    profile: DeviceProfile,
    address: int,
    port: str,
    wire_temperature_unit: Unit | None = None,
) -> None:
    self._client = client
    # The profile is the device-type bundle (family + registry +
    # framing + identity). ``registry`` / ``family`` stay exposed as
    # thin delegating properties so the streaming / manager layers
    # that learned those names keep working.
    self._profile = profile
    self._registry = profile.registry
    self._family = profile.family
    self._address = address
    self._port = port
    self._availability: dict[str, Availability] = {}
    # Scale of temperature values on the wire. An explicit
    # ``wire_temperature_unit`` (from
    # ``open_device(assert_wire_temperature_unit=...)``) wins;
    # otherwise the profile's own ``wire_temperature_unit`` seeds it
    # (the Series SD knows it speaks °F; the EZ-ZONE PM profile
    # leaves it ``None`` so the user must assert). Drives
    # :class:`Reading.unit` / :class:`Sample.unit` for temperature
    # parameters; ``None`` means "do not guess". The library makes
    # **no** attempt to derive this from PM parameter 17050 — on at
    # least one PM3 firmware that register is label-only and does
    # not govern the wire scale. See ``docs/devices.md`` §Units.
    self._wire_temperature_unit: Unit | None = (
        wire_temperature_unit
        if wire_temperature_unit is not None
        else profile.wire_temperature_unit
    )
    self._wire_temperature_unit_warned: bool = False
    # Lazy cache of parameter 17050's reported value. Kept purely
    # as an inspection helper exposed via
    # :meth:`Controller.read_comms_unit_label` — **does not** feed
    # :class:`Reading.unit`. ``_comms_unit_label_loaded`` distinguishes
    # "haven't asked yet" from "asked and got nothing"; subsequent
    # calls don't repeat the wire turn-around after a rejection.
    self._comms_unit_label: Unit | None = None
    self._comms_unit_label_loaded: bool = False
    # Most recent error's :class:`ErrorContext`, captured from any
    # :class:`WatlowError` that propagates out of :meth:`execute`.
    # Drives :class:`DeviceSnapshot.last_error`. Cleared by
    # :meth:`clear_last_error` (manual reset for diagnostics).
    self._last_error: ErrorContext | None = None
    # Count of transient transport hiccups the session swallowed
    # and retried. Watlow has no current transient class (§F of
    # the unified spec — deferred), so this counter stays at zero
    # unless a future transport path increments it. Reset on
    # :meth:`open` / fresh transport.
    self.recoverable_error_count: int = 0

address property

address

Session bus address.

client property

client

The bound protocol client.

Exposed for the watlow-raw escape hatch and for diagnostics that need to issue an unframed wire op outside the registry. Callers must acquire :attr:ProtocolClient.lock before :meth:ProtocolClient.execute to honour the per-port serialization invariant, and must pass this session's :attr:address (or another concrete address for multi-drop diagnostics) to execute.

family property

family

Best-known controller family for this session (delegates to the profile).

last_error property

last_error

Most-recent error context captured by :meth:execute.

port property

port

Transport label (for logs / error context).

profile property

profile

The device profile (family + registry + framing + identity).

protocol_kind property

protocol_kind

The wire protocol this session speaks.

registry property

registry

Parameter registry bound to this session.

Exposed for the streaming layer so polling code can resolve a name / id to a :class:ParameterSpec without an extra import of the module-level :data:PARAMETERS.

availability

availability(command_name)

Cached availability for command_name.

Source code in src/watlowlib/devices/session.py
def availability(self, command_name: str) -> Availability:
    """Cached availability for ``command_name``."""
    return self._availability.get(command_name, Availability.UNKNOWN)

availability_summary

availability_summary()

Return the current command availability cache.

Includes every command the session has dispatched; the snapshot path filters to the UNSUPPORTED entries.

Source code in src/watlowlib/devices/session.py
def availability_summary(self) -> Mapping[str, Availability]:
    """Return the current command availability cache.

    Includes every command the session has dispatched; the
    snapshot path filters to the UNSUPPORTED entries.
    """
    return MappingProxyType(dict(self._availability))

clear_last_error

clear_last_error()

Reset :attr:last_error to None (manual diagnostics aid).

Source code in src/watlowlib/devices/session.py
def clear_last_error(self) -> None:
    """Reset :attr:`last_error` to ``None`` (manual diagnostics aid)."""
    self._last_error = None

comms_unit_label async

comms_unit_label()

Return the value parameter 17050 reports for this session.

Inspection helper, not a source of truth: on at least one PM3 firmware revision 17050 is a label-only register that does not govern the wire scale. :class:Reading.unit is sourced from :meth:wire_temperature_unit instead.

Reads the parameter on first call and caches the result for the lifetime of the session. Returns None when the device rejects the read or reports an unknown code; the cache distinguishes "haven't asked yet" from "asked and got nothing" so a rejection does not cost another wire turn-around.

Invalidated by :meth:invalidate_comms_unit_label (called by :meth:Controller.set_comms_unit_label after a successful write).

Source code in src/watlowlib/devices/session.py
async def comms_unit_label(self) -> Unit | None:
    """Return the value parameter 17050 reports for this session.

    Inspection helper, not a source of truth: on at least one PM3
    firmware revision 17050 is a label-only register that does not
    govern the wire scale. :class:`Reading.unit` is sourced from
    :meth:`wire_temperature_unit` instead.

    Reads the parameter on first call and caches the result for
    the lifetime of the session. Returns ``None`` when the device
    rejects the read or reports an unknown code; the cache
    distinguishes "haven't asked yet" from "asked and got nothing"
    so a rejection does not cost another wire turn-around.

    Invalidated by :meth:`invalidate_comms_unit_label` (called by
    :meth:`Controller.set_comms_unit_label` after a successful
    write).
    """
    if self._comms_unit_label_loaded:
        return self._comms_unit_label
    self._comms_unit_label = await self._fetch_comms_unit_label()
    self._comms_unit_label_loaded = True
    return self._comms_unit_label

dispose

dispose()

Dispose the bound protocol client.

Source code in src/watlowlib/devices/session.py
def dispose(self) -> None:
    """Dispose the bound protocol client."""
    self._client.dispose()

execute async

execute(command, request, *, confirm=False, timeout=None)

Dispatch command with request and return the typed response.

Source code in src/watlowlib/devices/session.py
async def execute[Req, Resp](
    self,
    command: Command[Req, Resp],
    request: Req,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> Resp:
    """Dispatch ``command`` with ``request`` and return the typed response."""
    kind = self._client.kind
    # Variant resolution. The session picks the variant matching
    # the bound protocol; one protocol per port (invariant 1).
    # Resolve to a single ``variant`` local so the rest of the
    # method is protocol-agnostic; we still branch on ``kind``
    # for the encode/decode call shapes (stdbus takes ``reply``;
    # modbus takes ``words, ctx, request``).
    if kind is ProtocolKind.STDBUS:
        stdbus_variant = command.stdbus
        if stdbus_variant is None:
            raise WatlowProtocolUnsupportedError(
                f"command {command.name!r} has no Std Bus variant",
                context=self._error_context(command, request),
            )
        modbus_variant = None
    elif kind is ProtocolKind.MODBUS_RTU:
        modbus_variant = command.modbus
        if modbus_variant is None:
            raise WatlowProtocolUnsupportedError(
                f"command {command.name!r} has no Modbus variant",
                context=self._error_context(command, request),
            )
        stdbus_variant = None
    else:
        raise WatlowProtocolUnsupportedError(
            f"session has unsupported protocol kind {kind!r}",
            context=self._error_context(command, request),
        )

    # Cache key. We key on ``command_name:parameter_id`` for
    # registry-driven commands so that one ``read_parameter("foo")``
    # rejection doesn't sticky-block every other parameter; bare
    # commands fall back to ``command.name``.
    cache_key = self._cache_key(command, request)

    cached = self._availability.get(cache_key, Availability.UNKNOWN)
    if cached is Availability.UNSUPPORTED:
        raise WatlowProtocolUnsupportedError(
            f"command {command.name!r} is unsupported on this device",
            context=self._error_context(command, request),
        )

    # Safety gate: PERSISTENT writes need explicit confirm.
    if command.safety is SafetyTier.PERSISTENT and not confirm:
        raise WatlowConfirmationRequiredError(
            f"command {command.name!r} is PERSISTENT and requires confirm=True",
            context=self._error_context(command, request),
        )

    ctx = CommandContext(
        registry=self._registry,
        family=self._family,
        address=self._address,
        port=self._port,
    )

    bound_timeout = timeout if timeout is not None else DEFAULTS.io_timeout_s

    # Encode under the variant. Errors here are pre-I/O — typically
    # validation failures — and should propagate untouched.
    # Exactly one of ``stdbus_variant`` / ``modbus_variant`` is
    # non-None per the resolution above; the type narrowing is
    # explicit so neither mypy nor pyright needs an ``assert`` it
    # can't enforce at runtime under ``-O``.
    wire_request: Any
    if stdbus_variant is not None:
        wire_request = stdbus_variant.encode(ctx, request)
    elif modbus_variant is not None:
        wire_request = modbus_variant.encode(ctx, request)
    else:  # pragma: no cover — variant resolution above guarantees one is set
        raise WatlowProtocolUnsupportedError(
            f"command {command.name!r} variant resolution lost",
            context=self._error_context(command, request),
        )

    started = time.monotonic()
    # Hold the per-port client lock only for the I/O turn-around.
    # Decode is CPU-only and does not need to block the next
    # request waiting on the same RS-485 segment; ``reply`` is
    # snapshotted before the lock releases. ``maybe_acquire``
    # reuses an existing acquisition when the caller (e.g.
    # ``poll_controller``'s tick batch, or
    # ``WatlowManager._run_group``'s port batch) already holds
    # the lock — avoids the FIFO-queue starvation that would
    # otherwise let unrelated writers interleave between a
    # batch's reads.
    async with maybe_acquire(self._client.lock):
        try:
            reply = await self._client.execute(
                wire_request,
                address=self._address,
                timeout=bound_timeout,
                command_name=command.name,
            )
        except (
            WatlowNoSuchObjectError,
            WatlowNoSuchAttributeError,
            WatlowProtocolUnsupportedError,
        ) as exc:
            self._availability[cache_key] = Availability.UNSUPPORTED
            self._record_error(exc)
            _log.warning(
                "command unsupported: protocol=%s cmd=%s key=%s exc=%s",
                kind.value,
                command.name,
                cache_key,
                exc,
            )
            raise
        except WatlowProtocolError as exc:
            self._record_error(exc)
            raise
        except WatlowError as exc:
            self._record_error(exc)
            _log.warning(
                "command error: protocol=%s cmd=%s key=%s exc=%s",
                kind.value,
                command.name,
                cache_key,
                exc,
            )
            raise

    # Decode outside the lock — pure compute on the captured reply.
    try:
        if stdbus_variant is not None:
            response = stdbus_variant.decode(reply, ctx)
        else:
            # ``modbus_variant is not None`` per the resolution above;
            # mypy/pyright follow the narrowing without an ``assert``.
            response = modbus_variant.decode(reply, ctx, request)  # type: ignore[union-attr]
    except (
        WatlowNoSuchObjectError,
        WatlowNoSuchAttributeError,
        WatlowProtocolUnsupportedError,
    ) as exc:
        # Decode-side "we don't have this": same availability
        # transition as the wire-side rejection above.
        self._availability[cache_key] = Availability.UNSUPPORTED
        self._record_error(exc)
        _log.warning(
            "command unsupported: protocol=%s cmd=%s key=%s exc=%s",
            kind.value,
            command.name,
            cache_key,
            exc,
        )
        raise
    except WatlowProtocolError as exc:
        # Decode-failure parity with the inside-lock branch above:
        # NoSuchInstance / IllegalDataValue / generic decode errors
        # don't transition availability per design §5b.
        self._record_error(exc)
        raise

    elapsed = time.monotonic() - started
    self._availability[cache_key] = Availability.SUPPORTED
    _log.debug(
        "session exec ok protocol=%s cmd=%s key=%s elapsed=%.4fs",
        kind.value,
        command.name,
        cache_key,
        elapsed,
    )
    return response

invalidate_comms_unit_label

invalidate_comms_unit_label()

Drop the cached 17050 value so the next read re-queries the device.

Source code in src/watlowlib/devices/session.py
def invalidate_comms_unit_label(self) -> None:
    """Drop the cached 17050 value so the next read re-queries the device."""
    self._comms_unit_label = None
    self._comms_unit_label_loaded = False

set_wire_temperature_unit

set_wire_temperature_unit(unit)

Set the wire temperature scale from an authoritative source.

Called by an identity strategy that has read the device's own unit register (e.g. the Series SD reg 18). Unlike the PM path — where the unit register can lie, so the value stays a user assertion — this is the device telling us its comms scale directly, so it is honest to adopt it. Resets the one-shot "trusting user-asserted unit" warning since this value is device-sourced, not user-asserted.

Source code in src/watlowlib/devices/session.py
def set_wire_temperature_unit(self, unit: Unit | None) -> None:
    """Set the wire temperature scale from an authoritative source.

    Called by an identity strategy that has read the device's own
    unit register (e.g. the Series SD reg 18). Unlike the PM path —
    where the unit register can lie, so the value stays a user
    assertion — this is the device telling us its comms scale
    directly, so it is honest to adopt it. Resets the one-shot
    "trusting user-asserted unit" warning since this value is
    device-sourced, not user-asserted.
    """
    self._wire_temperature_unit = unit
    self._wire_temperature_unit_warned = True

wire_temperature_unit

wire_temperature_unit()

Return the user-asserted scale of temperature values on the wire.

This is what :class:Reading.unit / :class:Sample.unit get tagged with for temperature parameters. None when the user did not pass assert_wire_temperature_unit= to :func:watlowlib.open_device; in that case temperature readings carry unit=None rather than guess.

Pure accessor — no I/O. Logs a one-shot WARN the first time an asserted value is consumed so the user-assertion shows up plainly in capture logs.

Source code in src/watlowlib/devices/session.py
def wire_temperature_unit(self) -> Unit | None:
    """Return the user-asserted scale of temperature values on the wire.

    This is what :class:`Reading.unit` / :class:`Sample.unit` get
    tagged with for temperature parameters. ``None`` when the user
    did not pass ``assert_wire_temperature_unit=`` to
    :func:`watlowlib.open_device`; in that case temperature
    readings carry ``unit=None`` rather than guess.

    Pure accessor — no I/O. Logs a one-shot WARN the first time
    an asserted value is consumed so the user-assertion shows up
    plainly in capture logs.
    """
    if self._wire_temperature_unit is not None and not self._wire_temperature_unit_warned:
        _log.warning(
            "trusting user-asserted wire temperature unit %s for port=%s "
            "address=%s; not independently verified by the library "
            "(parameter 17050 is label-only on at least one PM firmware)",
            self._wire_temperature_unit.value,
            self._port,
            self._address,
        )
        self._wire_temperature_unit_warned = True
    return self._wire_temperature_unit

WatlowDeviceSnapshot dataclass

WatlowDeviceSnapshot(
    name,
    model,
    firmware,
    serial,
    connected,
    last_error,
    recoverable_error_count,
    captured_at,
    family,
    capabilities,
    availability_summary=(lambda: {})(),
)

Bases: DeviceSnapshot

Watlow-specific extension of :class:DeviceSnapshot.

Attributes:

Name Type Description
family ControllerFamily | None

Decoded :class:ControllerFamily, or None before :meth:identify has run.

capabilities Capability

SKU-decoded :class:Capability flags.

availability_summary Mapping[str, Availability]

Frozen mapping of command names that the session has marked :attr:Availability.UNSUPPORTED. The mapping is bounded by the parameter registry size (typically small).

classify_family

classify_family(part_number)

Return the :class:ControllerFamily for a part-number string.

Only the leading family discriminator is parsed; per-family digit decoding is in :func:decode_part_number.

Source code in src/watlowlib/registry/families.py
def classify_family(part_number: str) -> ControllerFamily:
    """Return the :class:`ControllerFamily` for a part-number string.

    Only the leading family discriminator is parsed; per-family digit
    decoding is in :func:`decode_part_number`.
    """
    head = part_number.strip().upper()
    if head.startswith("PM"):
        return ControllerFamily.PM
    if head.startswith("RM"):
        return ControllerFamily.RM
    if head.startswith("ST"):
        return ControllerFamily.ST
    if head.startswith("F4T"):
        return ControllerFamily.F4T
    if head.startswith("SD"):
        return ControllerFamily.SD
    return ControllerFamily.UNKNOWN

find_devices async

find_devices(
    *,
    ports=None,
    addresses=None,
    baudrates=None,
    protocols=None,
    profiles=None,
    serial_template=None,
    per_probe_timeout_s=_DEFAULT_PROBE_TIMEOUT_S,
)

Probe local serial ports for Watlow controllers.

Parameters:

Name Type Description Default
ports Sequence[str] | None

Serial-port paths to scan. None enumerates every visible port via :func:anyserial.list_serial_ports. An empty sequence returns [] without enumeration.

None
addresses Sequence[int] | None

Bus addresses to probe per (port, baudrate, protocol) combination. Defaults to :data:DEFAULT_DISCOVERY_ADDRESSES ((1,)). Std Bus accepts 1..16; Modbus RTU accepts 1..247. Out-of-range addresses for a given protocol are still emitted as ok=False rows carrying a :class:WatlowConfigurationError.

None
baudrates Sequence[int] | None

Baud rates to try. Defaults to :data:DEFAULT_DISCOVERY_BAUDRATES.

None
protocols Sequence[ProtocolKind] | None

Wire protocols to probe. Defaults to :data:DEFAULT_DISCOVERY_PROTOCOLS. ProtocolKind.AUTO is not accepted here (one row per concrete probe). Ignored when profiles is given.

None
profiles Sequence[DeviceProfile] | None

Device profiles to probe. When given, discovery iterates profiles instead of protocols — each profile contributes its own default_protocol, factory serial framing (so the Series SD's 8-N-1 is used, not the PM Modbus 8-E-1), parameter registry, and identity strategy. Pass :data:~watlowlib.devices.profile.DEVICE_PROFILES to sweep for every known device type (PM over Std Bus + SD over Modbus). None (default) keeps the historical protocol-centric scan against the EZ-ZONE PM profile.

None
serial_template SerialSettings | None

Optional :class:SerialSettings whose parity / bytesize / stopbits / rtscts / xonxoff / exclusive fields override the per-protocol factory framing for every probe. port and baudrate are always overwritten per iteration.

None
per_probe_timeout_s float

Per-probe budget. Bounds the :meth:Controller.identify call (four bounded sub-reads) so a silent address bails after one round-trip rather than four. Defaults to 0.5 — a four-port × three-baud × two-protocol scan with one address per combo lands in ~12 s of wall-clock.

_DEFAULT_PROBE_TIMEOUT_S

Returns:

Name Type Description
One list[DiscoveryResult]

class:DiscoveryResult per (port × baudrate × protocol ×

list[DiscoveryResult]

address) tuple, in input order. The cartesian product is

list[DiscoveryResult]

iterated outermost-port, then baudrate, then protocol, then

list[DiscoveryResult]

address — same input → same output ordering.

Raises:

Type Description
WatlowConfigurationError

protocols contains :attr:ProtocolKind.AUTO, or per_probe_timeout_s is non-positive.

Notes
  • Read-only. Discovery never writes to the device; it only calls :meth:Controller.identify (four parameter reads). Safe to run on rigs that already have other software talking to the controller.
  • Per-port short-circuit. If a port fails to open with a :class:WatlowConnectionError, the rest of the scan for that port emits ok=False rows without re-attempting the open. This avoids hammering a port the kernel won't give us.
Source code in src/watlowlib/devices/discovery.py
async def find_devices(
    *,
    ports: Sequence[str] | None = None,
    addresses: Sequence[int] | None = None,
    baudrates: Sequence[int] | None = None,
    protocols: Sequence[ProtocolKind] | None = None,
    profiles: Sequence[DeviceProfile] | None = None,
    serial_template: SerialSettings | None = None,
    per_probe_timeout_s: float = _DEFAULT_PROBE_TIMEOUT_S,
) -> list[DiscoveryResult]:
    """Probe local serial ports for Watlow controllers.

    Args:
        ports: Serial-port paths to scan. ``None`` enumerates every
            visible port via :func:`anyserial.list_serial_ports`. An
            empty sequence returns ``[]`` without enumeration.
        addresses: Bus addresses to probe per (port, baudrate, protocol)
            combination. Defaults to :data:`DEFAULT_DISCOVERY_ADDRESSES`
            (``(1,)``). Std Bus accepts ``1..16``; Modbus RTU accepts
            ``1..247``. Out-of-range addresses for a given protocol are
            still emitted as ``ok=False`` rows carrying a
            :class:`WatlowConfigurationError`.
        baudrates: Baud rates to try. Defaults to
            :data:`DEFAULT_DISCOVERY_BAUDRATES`.
        protocols: Wire protocols to probe. Defaults to
            :data:`DEFAULT_DISCOVERY_PROTOCOLS`. ``ProtocolKind.AUTO``
            is not accepted here (one row per concrete probe). Ignored
            when ``profiles`` is given.
        profiles: Device profiles to probe. When given, discovery
            iterates profiles instead of ``protocols`` — each profile
            contributes its own ``default_protocol``, factory serial
            framing (so the Series SD's 8-N-1 is used, not the PM
            Modbus 8-E-1), parameter registry, and identity strategy.
            Pass :data:`~watlowlib.devices.profile.DEVICE_PROFILES` to
            sweep for every known device type (PM over Std Bus + SD over
            Modbus). ``None`` (default) keeps the historical
            protocol-centric scan against the EZ-ZONE PM profile.
        serial_template: Optional :class:`SerialSettings` whose
            ``parity`` / ``bytesize`` / ``stopbits`` / ``rtscts`` /
            ``xonxoff`` / ``exclusive`` fields override the
            per-protocol factory framing for every probe. ``port``
            and ``baudrate`` are always overwritten per iteration.
        per_probe_timeout_s: Per-probe budget. Bounds the
            :meth:`Controller.identify` call (four bounded sub-reads)
            so a silent address bails after one round-trip rather than
            four. Defaults to ``0.5`` — a four-port × three-baud ×
            two-protocol scan with one address per combo lands in
            ~12 s of wall-clock.

    Returns:
        One :class:`DiscoveryResult` per (port × baudrate × protocol ×
        address) tuple, in input order. The cartesian product is
        iterated outermost-port, then baudrate, then protocol, then
        address — same input → same output ordering.

    Raises:
        WatlowConfigurationError: ``protocols`` contains
            :attr:`ProtocolKind.AUTO`, or ``per_probe_timeout_s`` is
            non-positive.

    Notes:
        - **Read-only.** Discovery never writes to the device; it
          only calls :meth:`Controller.identify` (four parameter
          reads). Safe to run on rigs that already have other
          software talking to the controller.
        - **Per-port short-circuit.** If a port fails to open with a
          :class:`WatlowConnectionError`, the rest of the scan for
          that port emits ``ok=False`` rows without re-attempting the
          open. This avoids hammering a port the kernel won't give us.
    """
    if per_probe_timeout_s <= 0:
        from watlowlib.errors import WatlowConfigurationError  # noqa: PLC0415 — cold path

        raise WatlowConfigurationError(
            f"per_probe_timeout_s must be positive; got {per_probe_timeout_s!r}",
        )

    resolved_ports = await _resolve_ports(ports)
    resolved_addresses = tuple(addresses) if addresses is not None else DEFAULT_DISCOVERY_ADDRESSES
    resolved_baudrates = tuple(baudrates) if baudrates is not None else DEFAULT_DISCOVERY_BAUDRATES

    # Build the per-probe plan: one entry per (protocol, profile,
    # framing_base). ``profiles`` (if given) drives the device type,
    # protocol, and factory framing per probe; otherwise we keep the
    # historical protocol-centric scan against the EZ-ZONE PM profile,
    # taking framing from ``SerialSettings.factory_for`` (None below).
    plan: tuple[tuple[ProtocolKind, DeviceProfile, SerialSettings | None], ...]
    if profiles is not None:
        plan = tuple((p.default_protocol, p, p.default_serial) for p in profiles)
    else:
        resolved_protocols = (
            tuple(protocols) if protocols is not None else DEFAULT_DISCOVERY_PROTOCOLS
        )
        if ProtocolKind.AUTO in resolved_protocols:
            from watlowlib.errors import WatlowConfigurationError  # noqa: PLC0415 — cold path

            raise WatlowConfigurationError(
                "find_devices does not accept ProtocolKind.AUTO; pass concrete "
                "protocols (STDBUS, MODBUS_RTU). Auto-detection is a single-port "
                "API on open_device.",
            )
        plan = tuple((protocol, EZZONE_PROFILE, None) for protocol in resolved_protocols)

    results: list[DiscoveryResult] = []
    dead_ports: set[str] = set()
    for port in resolved_ports:
        for baud in resolved_baudrates:
            for protocol, profile, framing_base in plan:
                if port in dead_ports:
                    # Emit one row per planned address so callers see
                    # the same cartesian-product shape regardless of
                    # whether the port opened.
                    error = WatlowConnectionError(
                        f"port {port!r} previously failed to open in this scan",
                        context=ErrorContext(
                            port=port,
                            protocol=protocol,
                        ),
                    )
                    results.extend(
                        DiscoveryResult(
                            ok=False,
                            port=port,
                            address=address,
                            baudrate=baud,
                            protocol=protocol,
                            device_info=None,
                            error=error,
                            elapsed_s=0.0,
                        )
                        for address in resolved_addresses
                    )
                    continue

                settings = _build_settings(
                    port=port,
                    baudrate=baud,
                    protocol=protocol,
                    template=serial_template,
                    framing_base=framing_base,
                )
                rows, port_died = await _probe_combo(
                    port=port,
                    baudrate=baud,
                    protocol=protocol,
                    profile=profile,
                    addresses=resolved_addresses,
                    serial_settings=settings,
                    timeout_s=per_probe_timeout_s,
                )
                results.extend(rows)
                if port_died:
                    dead_ports.add(port)
    return results

open_device async

open_device(
    port,
    *,
    profile=EZZONE_PROFILE,
    protocol=None,
    address=1,
    serial_settings=None,
    assert_wire_temperature_unit=None,
    identify=True,
)

Open a controller on a serial port.

Parameters:

Name Type Description Default
port str

Serial-port path (/dev/ttyUSB0, COM3, ...).

required
profile DeviceProfile

The device profile to open against. Defaults to :data:~watlowlib.devices.profile.EZZONE_PROFILE (EZ-ZONE PM), which preserves all historical behaviour. Pass :data:~watlowlib.devices.profile.SERIES_SD_PROFILE for a Series SD. The profile supplies the default protocol, factory serial framing, parameter registry, identity strategy, and (for the SD) the wire temperature unit.

EZZONE_PROFILE
protocol ProtocolKind | None

Wire protocol. None (default) adopts profile.default_protocol (Std Bus for PM, Modbus RTU for SD). STDBUS / MODBUS_RTU open directly; AUTO runs the conservative detector (Std Bus → Modbus → fail) per docs/design.md §7.

None
address int

Bus address. Std Bus accepts 1..16; Modbus RTU accepts 1..247. Under AUTO the same address is tried against both probes.

1
serial_settings SerialSettings | None

Optional framing override. None (default) adopts profile.default_serial with port applied — 38400 8-N-1 for the EZ-ZONE PM Std Bus profile, 9600 8-N-1 for the Series SD profile. An explicit :class:SerialSettings still has its port forced to the positional port arg. There is no baud sweeping in the open path (cross-cutting invariant 5).

None
identify bool

When True (default), :meth:Controller.identify runs after the transport opens so :meth:Controller.snapshot renders without further wire I/O. Set False for the fast-path open scenarios where caller code drives identity itself or wants the open to return immediately.

True
assert_wire_temperature_unit Unit | str | None

User-asserted scale of temperature values on the wire. Sets :class:Reading.unit / :class:Sample.unit for temperature parameters. Accepts a :class:Unit or a case-insensitive string alias ("C", "F", "celsius", "degF", "°C", ...). :attr:Unit.PERCENT is rejected. None (the default) means temperature readings carry unit=None. The library does not infer this from parameter 17050 — on at least one PM3 firmware 17050 is a label-only register and would silently mis-tag. Verify the actual scale externally — the bundled watlow-diag probe-unit CLI automates the comparison against a known panel reading; see docs/devices.md §Units — before asserting it here.

None

Returns:

Type Description
Controller

An opened :class:Controller whose transport is ready for

Controller

meth:Controller.poll / :meth:Controller.poll_many calls.

Controller

Every protocol (STDBUS, MODBUS_RTU, AUTO) returns

Controller

an opened controller; __aenter__ is a no-op and

Controller

__aexit__ closes the transport.

Raises:

Type Description
WatlowConfigurationError

address is out of range or protocol is unsupported.

WatlowValidationError

assert_wire_temperature_unit is :attr:Unit.PERCENT or an unrecognised alias.

WatlowProtocolUnsupportedError

protocol=AUTO and both probes failed.

Source code in src/watlowlib/devices/factory.py
async def open_device(
    port: str,
    *,
    profile: DeviceProfile = EZZONE_PROFILE,
    protocol: ProtocolKind | None = None,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
    assert_wire_temperature_unit: Unit | str | None = None,
    identify: bool = True,
) -> Controller:
    """Open a controller on a serial port.

    Args:
        port: Serial-port path (``/dev/ttyUSB0``, ``COM3``, ...).
        profile: The device profile to open against. Defaults to
            :data:`~watlowlib.devices.profile.EZZONE_PROFILE` (EZ-ZONE
            PM), which preserves all historical behaviour. Pass
            :data:`~watlowlib.devices.profile.SERIES_SD_PROFILE` for a
            Series SD. The profile supplies the default protocol,
            factory serial framing, parameter registry, identity
            strategy, and (for the SD) the wire temperature unit.
        protocol: Wire protocol. ``None`` (default) adopts
            ``profile.default_protocol`` (Std Bus for PM, Modbus RTU for
            SD). ``STDBUS`` / ``MODBUS_RTU`` open directly; ``AUTO``
            runs the conservative detector (Std Bus → Modbus → fail)
            per ``docs/design.md`` §7.
        address: Bus address. Std Bus accepts ``1..16``; Modbus RTU
            accepts ``1..247``. Under ``AUTO`` the same address is
            tried against both probes.
        serial_settings: Optional framing override. ``None`` (default)
            adopts ``profile.default_serial`` with ``port`` applied —
            **38400 8-N-1** for the EZ-ZONE PM Std Bus profile, **9600
            8-N-1** for the Series SD profile. An explicit
            :class:`SerialSettings` still has its ``port`` forced to the
            positional ``port`` arg. There is no baud sweeping in the
            open path (cross-cutting invariant 5).
        identify: When ``True`` (default), :meth:`Controller.identify`
            runs after the transport opens so :meth:`Controller.snapshot`
            renders without further wire I/O. Set ``False`` for the
            fast-path open scenarios where caller code drives identity
            itself or wants the open to return immediately.
        assert_wire_temperature_unit: User-asserted scale of
            temperature values on the wire. Sets
            :class:`Reading.unit` / :class:`Sample.unit` for
            temperature parameters. Accepts a :class:`Unit` or a
            case-insensitive string alias (``"C"``, ``"F"``,
            ``"celsius"``, ``"degF"``, ``"°C"``, ...).
            :attr:`Unit.PERCENT` is rejected. ``None`` (the default)
            means temperature readings carry ``unit=None``. The
            library does **not** infer this from parameter 17050 —
            on at least one PM3 firmware 17050 is a label-only
            register and would silently mis-tag. Verify the actual
            scale externally — the bundled
            ``watlow-diag probe-unit`` CLI automates the comparison
            against a known panel reading; see ``docs/devices.md``
            §Units — before asserting it here.

    Returns:
        An *opened* :class:`Controller` whose transport is ready for
        :meth:`Controller.poll` / :meth:`Controller.poll_many` calls.
        Every protocol (``STDBUS``, ``MODBUS_RTU``, ``AUTO``) returns
        an opened controller; ``__aenter__`` is a no-op and
        ``__aexit__`` closes the transport.

    Raises:
        WatlowConfigurationError: ``address`` is out of range or
            ``protocol`` is unsupported.
        WatlowValidationError: ``assert_wire_temperature_unit`` is
            :attr:`Unit.PERCENT` or an unrecognised alias.
        WatlowProtocolUnsupportedError: ``protocol=AUTO`` and both
            probes failed.
    """
    # ``protocol=None`` adopts the profile's factory protocol (Std Bus
    # for EZ-ZONE PM, Modbus RTU for Series SD).
    resolved_protocol = protocol if protocol is not None else profile.default_protocol
    if resolved_protocol not in (
        ProtocolKind.STDBUS,
        ProtocolKind.MODBUS_RTU,
        ProtocolKind.AUTO,
    ):
        raise WatlowConfigurationError(
            f"unsupported protocol kind: {resolved_protocol!r}",
            context=ErrorContext(port=port),
        )

    # ``assert_wire_temperature_unit`` overrides the profile default
    # (the SD knows it speaks °F; the PM contract is "user must assert").
    wire_unit = (
        profile.wire_temperature_unit
        if assert_wire_temperature_unit is None
        else coerce_wire_temperature_unit(assert_wire_temperature_unit)
    )

    # ``serial_settings=None`` adopts the profile's factory framing
    # (port applied from the positional arg). An explicit override still
    # has ``port`` forced to the positional arg to avoid silent surprise.
    if serial_settings is None:
        settings = replace(profile.default_serial, port=port)
    elif serial_settings.port != port:
        settings = replace(serial_settings, port=port)
    else:
        settings = serial_settings

    if resolved_protocol is ProtocolKind.AUTO:
        # Lazy import — keep the Std-Bus-only callers off the anymodbus
        # dep graph until they actually opt in to AUTO.
        from watlowlib.protocol.detect import detect_protocol  # noqa: PLC0415

        resolved = await detect_protocol(
            port,
            address=address,
            serial_settings=settings,
        )
        # Detector returned an *open* transport already paired with the
        # right client; build the controller around them and skip
        # ``Controller.__aenter__``'s open() (it short-circuits when
        # ``transport.is_open`` is already True).
        session = Session(
            resolved.client,
            profile=profile,
            address=address,
            port=resolved.transport.label,
            wire_temperature_unit=wire_unit,
        )
        controller = Controller(session, resolved.transport, serial_settings=settings)
        if identify:
            await controller.identify()
        return controller

    transport: Transport
    if resolved_protocol is ProtocolKind.MODBUS_RTU:
        # Lazy import — keep the Std-Bus path off the anymodbus dep
        # graph for users who never reach for Modbus.
        from watlowlib.protocol.modbus.transport import (  # noqa: PLC0415
            ModbusBusTransport,
        )

        transport = ModbusBusTransport(settings)
    else:
        transport = SerialTransport(settings)
    controller = await _open_controller(
        transport,
        profile=profile,
        protocol=resolved_protocol,
        address=address,
        serial_settings=settings,
        wire_temperature_unit=wire_unit,
    )
    if identify:
        await controller.identify()
    return controller

Controller

watlowlib.devices.controller

The :class:Controller facade — public API for one device.

Single-device surface:

  • :meth:identify
  • :meth:read_pv / :meth:read_setpoint / :meth:set_setpoint
  • :meth:read_parameter / :meth:write_parameter
  • :meth:loop (multi-loop access), PID, alarms

Lifecycle is async-context-manager: async with await open_device(...) opens the transport on __aenter__ and disposes the protocol client + closes the transport on __aexit__.

Controller

Controller(session, transport, *, serial_settings)

Async facade for a single Watlow controller.

Source code in src/watlowlib/devices/controller.py
def __init__(
    self,
    session: Session,
    transport: Transport,
    *,
    serial_settings: SerialSettings,
) -> None:
    self._session = session
    self._transport = transport
    self._serial_settings = serial_settings
    # Cached loop count populated by :meth:`identify`. ``None`` means
    # "we haven't asked yet" — :meth:`loop` then defers validation
    # to the registry's per-spec ``max_instance``. Concrete count is
    # what the part-number decoder produced from the captured part
    # string (PM3 → 1, PM6/8/9 + ``U`` control → 2, etc.).
    self._loops: int | None = None
    # Cached SKU-derived capabilities populated by :meth:`identify`.
    # ``None`` until the part number has been decoded; downstream
    # operations that gate on bits (cool-side PID, etc.) treat
    # ``None`` as "no information, no gate" so calls work pre-
    # identify without surprising the user.
    self._capabilities: Capability | None = None
    # Full cached :class:`DeviceInfo` populated by :meth:`identify`.
    # Drives :meth:`snapshot` so no wire I/O is needed to render
    # the controller's identity. ``None`` until identify runs.
    self._device_info: DeviceInfo | None = None

capabilities property

capabilities

Cached SKU capabilities (set after :meth:identify).

None pre-identify so capability-gated operations behave permissively until the part number is captured. After :meth:identify, callers can branch on :attr:Capability.HAS_COOLING etc. without re-issuing identify.

loops property

loops

Cached loop count (set after :meth:identify).

None until the device's part number has been decoded; :meth:loop accepts any 1-indexed value while loops is None and falls back to per-spec validation at the first wire call. After :meth:identify, loops reflects the decoded value.

serial_settings property

serial_settings

Serial framing the controller was opened with.

Exposed so an identity strategy (see :mod:watlowlib.devices.profile) can stamp it onto the :class:DeviceInfo it builds.

session property

session

Underlying session used for command dispatch.

close async

close()

Close the underlying transport and dispose the protocol client.

Source code in src/watlowlib/devices/controller.py
async def close(self) -> None:
    """Close the underlying transport and dispose the protocol client."""
    # The session holds a reference to the protocol client; dispose
    # it so any pending caller learns the controller is gone before
    # the transport close races them.
    try:
        self._session.dispose()
    finally:
        await self._transport.close()

identify async

identify(
    *,
    timeout=None,
    strict=False,
    query_configured_protocol=False,
)

Read the identity parameters and return a :class:DeviceInfo.

Reads (in order): part number (1009), hardware id (1001), firmware id (1002), serial number. Missing secondary fields stay None and the result's :attr:DeviceInfo.health is promoted from :attr:DeviceHealth.OK to :attr:DeviceHealth.PARTIAL. If the part-number read itself fails, the result's health is :attr:DeviceHealth.FAILED and capability decoding is skipped (the family prior still applies).

Parameters:

Name Type Description Default
timeout float | None

Per-read timeout override.

None
strict bool

If True, raise the underlying error when the part-number read fails instead of returning a health=FAILED info. Use this in maintenance code paths that need to know the device actually answered before declaring success.

False
query_configured_protocol bool

If True, also read parameter 17009 (Protocol) and populate :attr:DeviceInfo.configured_protocol. Off by default because the read costs an extra round-trip; the maintenance verify pass and the discover CLI opt in.

False

Raises:

Type Description
WatlowError

When strict=True and the part-number read fails. The original transport / protocol error class is preserved.

Source code in src/watlowlib/devices/controller.py
async def identify(
    self,
    *,
    timeout: float | None = None,
    strict: bool = False,
    query_configured_protocol: bool = False,
) -> DeviceInfo:
    """Read the identity parameters and return a :class:`DeviceInfo`.

    Reads (in order): part number (1009), hardware id (1001),
    firmware id (1002), serial number. Missing secondary fields
    stay ``None`` and the result's :attr:`DeviceInfo.health` is
    promoted from :attr:`DeviceHealth.OK` to
    :attr:`DeviceHealth.PARTIAL`. If the part-number read itself
    fails, the result's health is :attr:`DeviceHealth.FAILED` and
    capability decoding is skipped (the family prior still
    applies).

    Args:
        timeout: Per-read timeout override.
        strict: If ``True``, raise the underlying error when the
            part-number read fails instead of returning a
            ``health=FAILED`` info. Use this in maintenance code
            paths that need to know the device actually answered
            before declaring success.
        query_configured_protocol: If ``True``, also read parameter
            17009 (Protocol) and populate
            :attr:`DeviceInfo.configured_protocol`. Off by default
            because the read costs an extra round-trip; the
            maintenance verify pass and the discover CLI opt in.

    Raises:
        WatlowError: When ``strict=True`` and the part-number read
            fails. The original transport / protocol error class
            is preserved.
    """
    # Device-neutral: the bound profile owns the family-specific
    # identity sequence (EZ-ZONE PM reads 1009/1001/1002/serial;
    # Series SD reads the numeric 10/11/13 + serial 7-8 + reg 18).
    info = await self._session.profile.identify(
        self,
        timeout=timeout,
        strict=strict,
        query_configured_protocol=query_configured_protocol,
    )
    # Cache for ``self.loop(n)``'s eager validator and ``snapshot``.
    # Identify is the canonical place that sets these — open()
    # doesn't have the identity yet, and a device's loop count /
    # capabilities never change mid-session.
    self._loops = info.loops
    self._capabilities = info.capabilities
    self._device_info = info
    return info

loop

loop(n)

Return a sub-facade bound to loop n (1-indexed).

n is validated eagerly when :attr:loops is known, otherwise per-spec max_instance validation kicks in at the first wire call. Multi-loop access is the public way to reach loop 2 on dual-loop devices — :meth:Controller.read_pv defaults to instance=1.

Source code in src/watlowlib/devices/controller.py
def loop(self, n: int) -> ControllerLoop:
    """Return a sub-facade bound to loop ``n`` (1-indexed).

    ``n`` is validated eagerly when :attr:`loops` is known,
    otherwise per-spec ``max_instance`` validation kicks in at the
    first wire call. Multi-loop access is the public way to reach
    loop 2 on dual-loop devices —
    :meth:`Controller.read_pv` defaults to ``instance=1``.
    """
    return ControllerLoop(self, n)

poll async

poll(*, instance=1, timeout=None)

Read the active process value — the canonical no-arg snapshot.

Equivalent to :meth:read_pv. The no-arg form aligns with the ecosystem poll() convention shared by alicatlib.Device, sartoriuslib.Balance, and nidaqlib.DaqSession: a single, default-shaped reading per call.

For multi-parameter polling use :meth:poll_many.

Source code in src/watlowlib/devices/controller.py
async def poll(self, *, instance: int = 1, timeout: float | None = None) -> Reading:
    """Read the active process value — the canonical no-arg snapshot.

    Equivalent to :meth:`read_pv`. The no-arg form aligns with the
    ecosystem ``poll()`` convention shared by ``alicatlib.Device``,
    ``sartoriuslib.Balance``, and ``nidaqlib.DaqSession``: a
    single, default-shaped reading per call.

    For multi-parameter polling use :meth:`poll_many`.
    """
    return await self.read_pv(instance=instance, timeout=timeout)

poll_many async

poll_many(parameters, *, names=None, instances=(1,))

Read every (parameter × instance) and return them as :class:Sample\ s.

Satisfies the :class:watlowlib.streaming.PollSource Protocol so a solo :class:Controller can drive :func:watlowlib.streaming.record directly without a manager. names is accepted for Protocol compatibility but ignored — a Controller has only one device.

Failed reads are dropped from the returned list and logged at WARN. The recorder treats absence as "drop this row from the batch" and continues with the next tick.

Source code in src/watlowlib/devices/controller.py
async def poll_many(
    self,
    parameters: Sequence[str | int],
    *,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
) -> list[Sample]:
    r"""Read every (parameter × instance) and return them as :class:`Sample`\ s.

    Satisfies the :class:`watlowlib.streaming.PollSource` Protocol so
    a solo :class:`Controller` can drive :func:`watlowlib.streaming.record`
    directly without a manager. ``names`` is accepted for Protocol
    compatibility but ignored — a Controller has only one device.

    Failed reads are dropped from the returned list and logged at
    WARN. The recorder treats absence as "drop this row from the
    batch" and continues with the next tick.
    """
    del names  # solo controller has no name-keyed device map
    from watlowlib.streaming._poll import poll_controller  # noqa: PLC0415 — avoid cycle

    return await poll_controller(
        self,
        name=self._transport.label,
        parameters=parameters,
        instances=instances,
    )

read_comms_unit_label async

read_comms_unit_label(*, timeout=None)

Read (and cache) the value parameter 17050 reports.

Inspection / diagnostics helper. Does not drive :class:Reading.unit: on at least one PM3 firmware revision 17050 is a label-only register that changes the enum the device reports for itself but does not affect the scale of temperature values exchanged over comms.

To tell watlowlib what scale temperatures actually travel in over the wire, pass assert_wire_temperature_unit= to :func:watlowlib.open_device. That assertion is what feeds :class:Reading.unit.

Distinct from read_parameter("units"), which targets parameter 3005 (front-panel display). The two can disagree on a real device.

Returns None if the device doesn't report a known code.

Source code in src/watlowlib/devices/controller.py
async def read_comms_unit_label(self, *, timeout: float | None = None) -> Unit | None:
    """Read (and cache) the value parameter 17050 reports.

    Inspection / diagnostics helper. **Does not** drive
    :class:`Reading.unit`: on at least one PM3 firmware revision
    17050 is a label-only register that changes the enum the
    device reports for itself but does not affect the scale of
    temperature values exchanged over comms.

    To tell watlowlib what scale temperatures actually travel in
    over the wire, pass ``assert_wire_temperature_unit=`` to
    :func:`watlowlib.open_device`. That assertion is what feeds
    :class:`Reading.unit`.

    Distinct from ``read_parameter("units")``, which targets
    parameter 3005 (front-panel display). The two can disagree on
    a real device.

    Returns ``None`` if the device doesn't report a known code.
    """
    del timeout  # comms_unit_label() is cached + uses session defaults
    return await self._session.comms_unit_label()

read_parameter async

read_parameter(name_or_id, *, instance=1, timeout=None)

Read any registry parameter.

instance=1 is the default for single-loop devices and the first loop / channel on multi-loop devices.

Source code in src/watlowlib/devices/controller.py
async def read_parameter(
    self,
    name_or_id: str | int,
    *,
    instance: int = 1,
    timeout: float | None = None,
) -> ParameterEntry:
    """Read any registry parameter.

    ``instance=1`` is the default for single-loop devices and the
    first loop / channel on multi-loop devices.
    """
    return await self._session.execute(
        READ_PARAMETER,
        ReadParameterRequest(name_or_id, instance=instance),
        timeout=timeout,
    )

read_pv async

read_pv(*, instance=1, timeout=None)

Read the process value for instance (loop number, 1-indexed).

Source code in src/watlowlib/devices/controller.py
async def read_pv(self, *, instance: int = 1, timeout: float | None = None) -> Reading:
    """Read the process value for ``instance`` (loop number, 1-indexed)."""
    entry = await self.read_parameter("process_value", instance=instance, timeout=timeout)
    return await reading_from_entry(self._session, entry)

read_setpoint async

read_setpoint(*, instance=1, timeout=None)

Read the active setpoint for instance.

Source code in src/watlowlib/devices/controller.py
async def read_setpoint(self, *, instance: int = 1, timeout: float | None = None) -> Reading:
    """Read the active setpoint for ``instance``."""
    entry = await self.read_parameter("setpoint", instance=instance, timeout=timeout)
    return await reading_from_entry(self._session, entry)

set_comms_unit_label async

set_comms_unit_label(unit, *, confirm=False, timeout=None)

Set parameter 17050 ("Communications - Display Units").

Accepts a :class:Unit or a case-insensitive string alias ("C" / "F" / "celsius" / "fahrenheit" / "degC" / "degF" / "°C" / "°F"). :attr:Unit.PERCENT is rejected pre-I/O — the register is temperature-only.

Raw enumeration codes (15 / 30) are not accepted here. Callers who want the lower-level path use write_parameter("display_units", 30).

.. warning::

On at least one PM3 firmware revision this register is
**label-only**: writing it changes the enum the device
reports when 17050 is read back, but does not change the
scale of temperature values exchanged over comms. This
setter therefore does **not** affect
:class:`Reading.unit`. To tell watlowlib what scale
temperatures are actually on, pass
``assert_wire_temperature_unit=`` to
:func:`watlowlib.open_device`.

Persistent write (parameter 17050 is RWE); pass confirm=True to acknowledge the EEPROM write. The session raises :class:WatlowConfirmationRequiredError pre-I/O if missing.

Returns the device-echoed label after the write. None if the device's echo decodes outside the known codes.

Source code in src/watlowlib/devices/controller.py
async def set_comms_unit_label(
    self,
    unit: Unit | str,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> Unit | None:
    """Set parameter 17050 ("Communications - Display Units").

    Accepts a :class:`Unit` or a case-insensitive string alias
    (``"C"`` / ``"F"`` / ``"celsius"`` / ``"fahrenheit"`` /
    ``"degC"`` / ``"degF"`` / ``"°C"`` / ``"°F"``).
    :attr:`Unit.PERCENT` is rejected pre-I/O — the register is
    temperature-only.

    Raw enumeration codes (15 / 30) are not accepted here. Callers
    who want the lower-level path use
    ``write_parameter("display_units", 30)``.

    .. warning::

        On at least one PM3 firmware revision this register is
        **label-only**: writing it changes the enum the device
        reports when 17050 is read back, but does not change the
        scale of temperature values exchanged over comms. This
        setter therefore does **not** affect
        :class:`Reading.unit`. To tell watlowlib what scale
        temperatures are actually on, pass
        ``assert_wire_temperature_unit=`` to
        :func:`watlowlib.open_device`.

    Persistent write (parameter 17050 is RWE); pass ``confirm=True``
    to acknowledge the EEPROM write. The session raises
    :class:`WatlowConfirmationRequiredError` pre-I/O if missing.

    Returns the device-echoed label after the write. ``None`` if
    the device's echo decodes outside the known codes.
    """
    resolved = coerce_unit(unit)
    code = display_code_for_unit(resolved)
    if code is None:
        raise WatlowValidationError(
            "set_comms_unit_label accepts CELSIUS / FAHRENHEIT only; "
            "PERCENT is not a valid display-unit code",
        )
    # PERSISTENT write — session enforces ``confirm=True`` pre-I/O.
    await self.write_parameter(
        "display_units",
        code,
        confirm=confirm,
        timeout=timeout,
    )
    self._session.invalidate_comms_unit_label()
    return await self._session.comms_unit_label()

set_persistent_writes async

set_persistent_writes(
    enabled, *, confirm=False, timeout=None
)

Toggle whether subsequent writes persist to non-volatile memory.

Series-SD-specific. The SD persists every register write to EEPROM by default, so a high-rate writer (ramping setpoints, a tuning loop) can wear the EEPROM out and brick the controller. Writing 0 to register 17 keeps subsequent writes in RAM only; the device resets register 17 to 1 on every power cycle, so call set_persistent_writes(False) once after each power-up before a burst of writes (see sd_manual.txt p.84).

Parameters:

Name Type Description Default
enabled bool

True → persist writes to EEPROM (the power-on default); False → keep writes in RAM only (lost on power cycle, but spares the EEPROM).

required
confirm bool

The write itself is gated like any other parameter write — pass confirm=True to acknowledge it.

False
timeout float | None

Per-write timeout override.

None

Raises:

Type Description
WatlowConfirmationRequiredError

confirm is False.

WatlowValidationError

the bound profile's registry has no eeprom_write_enable parameter (e.g. an EZ-ZONE PM, which has no such register).

Source code in src/watlowlib/devices/controller.py
async def set_persistent_writes(
    self,
    enabled: bool,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> None:
    """Toggle whether subsequent writes persist to non-volatile memory.

    Series-SD-specific. The SD persists every register write to
    EEPROM by default, so a high-rate writer (ramping setpoints, a
    tuning loop) can wear the EEPROM out and brick the controller.
    Writing ``0`` to register 17 keeps subsequent writes in RAM
    only; the device resets register 17 to ``1`` on every power
    cycle, so call ``set_persistent_writes(False)`` once after each
    power-up before a burst of writes (see ``sd_manual.txt`` p.84).

    Args:
        enabled: ``True`` → persist writes to EEPROM (the power-on
            default); ``False`` → keep writes in RAM only (lost on
            power cycle, but spares the EEPROM).
        confirm: The write itself is gated like any other parameter
            write — pass ``confirm=True`` to acknowledge it.
        timeout: Per-write timeout override.

    Raises:
        WatlowConfirmationRequiredError: ``confirm`` is ``False``.
        WatlowValidationError: the bound profile's registry has no
            ``eeprom_write_enable`` parameter (e.g. an EZ-ZONE PM,
            which has no such register).
    """
    await self.write_parameter(
        "eeprom_write_enable",
        1 if enabled else 0,
        confirm=confirm,
        timeout=timeout,
    )

set_setpoint async

set_setpoint(
    value, *, instance=1, confirm=False, timeout=None
)

Write the setpoint and return the device-echoed value as a :class:Reading.

Setpoint is RWES — pass confirm=True to acknowledge the EEPROM write. The returned reading is the device's echo of the value it accepted.

Source code in src/watlowlib/devices/controller.py
async def set_setpoint(
    self,
    value: float,
    *,
    instance: int = 1,
    confirm: bool = False,
    timeout: float | None = None,
) -> Reading:
    """Write the setpoint and return the device-echoed value as a :class:`Reading`.

    Setpoint is RWES — pass ``confirm=True`` to acknowledge the
    EEPROM write. The returned reading is the device's echo of
    the value it accepted.
    """
    entry = await self.write_parameter(
        "setpoint",
        value,
        instance=instance,
        confirm=confirm,
        timeout=timeout,
    )
    return await reading_from_entry(self._session, entry)

snapshot async

snapshot(*, name=None)

Return an I/O-free :class:WatlowDeviceSnapshot.

Built from cached identity (populated by :meth:identify, which :func:watlowlib.open_device calls by default) plus the session's last error and per-command availability cache. Does not issue any reads — safe to call from monitoring loops at high cadence.

Parameters:

Name Type Description Default
name str | None

Override the snapshot's name field. Defaults to the controller's transport label, matching the manager-assigned name surfaced into emitted samples.

None
Source code in src/watlowlib/devices/controller.py
async def snapshot(self, *, name: str | None = None) -> WatlowDeviceSnapshot:
    """Return an I/O-free :class:`WatlowDeviceSnapshot`.

    Built from cached identity (populated by :meth:`identify`,
    which :func:`watlowlib.open_device` calls by default) plus
    the session's last error and per-command availability cache.
    Does **not** issue any reads — safe to call from monitoring
    loops at high cadence.

    Args:
        name: Override the snapshot's ``name`` field. Defaults to
            the controller's transport label, matching the
            manager-assigned name surfaced into emitted samples.
    """
    info = self._device_info
    model = info.part_number.raw if info is not None else None
    firmware = (
        str(info.firmware_id) if info is not None and info.firmware_id is not None else None
    )
    serial = info.serial_number if info is not None else None
    # Snapshot is built from cached state; availability_summary is
    # a frozen view of the session's UNSUPPORTED-marked commands.
    availability = {
        key: state
        for key, state in self._session.availability_summary().items()
        if state.name == "UNSUPPORTED"
    }
    return WatlowDeviceSnapshot(
        name=name if name is not None else self._transport.label,
        model=model,
        firmware=firmware,
        serial=serial,
        connected=self._transport.is_open,
        last_error=self._session.last_error,
        recoverable_error_count=self._session.recoverable_error_count,
        captured_at=datetime.now(UTC),
        family=info.family if info is not None else None,
        capabilities=self._capabilities if self._capabilities is not None else Capability.NONE,
        availability_summary=availability,
    )

write_parameter async

write_parameter(
    name_or_id,
    value,
    *,
    instance=1,
    confirm=False,
    timeout=None,
)

Write any registry parameter.

Persistent (RWE / RWES) writes require confirm=True; the session raises :class:WatlowConfirmationRequiredError before any I/O if the gate is missing.

Source code in src/watlowlib/devices/controller.py
async def write_parameter(
    self,
    name_or_id: str | int,
    value: float | int | str,
    *,
    instance: int = 1,
    confirm: bool = False,
    timeout: float | None = None,
) -> ParameterEntry:
    """Write any registry parameter.

    Persistent (RWE / RWES) writes require ``confirm=True``;
    the session raises :class:`WatlowConfirmationRequiredError`
    before any I/O if the gate is missing.
    """
    return await self._session.execute(
        WRITE_PARAMETER,
        WriteParameterRequest(name_or_id, value, instance=instance),
        confirm=confirm,
        timeout=timeout,
    )

Session

watlowlib.devices.session

The :class:Session — single dispatch point for every command.

The session is the only place that gates, logs, and updates :class:Availability. Variants are pure (ctx, request) → response functions; protocol clients only own the wire. Per docs/design.md invariant 2, no other layer touches these concerns.

Responsibilities (in order, per execute):

  1. Resolve the protocol variant. UNSUPPORTED is sticky — short- circuit pre-I/O on a typed error.
  2. Enforce confirm=True for :attr:SafetyTier.PERSISTENT writes.
  3. Acquire the per-port lock on the protocol client.
  4. Variant encodeclient.execute → variant decode.
  5. Map success / typed errors to availability transitions and log a structured event.

Variant signatures differ across protocols (see docs/design.md §5):

  • Std Bus variants take decode(reply, ctx) — the reply already carries the parameter selector echoed by the device.
  • Modbus variants take decode(words, ctx, request) — the wire carries no echo, so the variant re-resolves the spec from the request to interpret the words.

Session

Session(
    client,
    *,
    profile,
    address,
    port,
    wire_temperature_unit=None,
)

Owns availability cache, gates, and the dispatch loop.

A :class:Session is bound to exactly one :class:ProtocolClient for its lifetime — one protocol per port (invariant 1).

Source code in src/watlowlib/devices/session.py
def __init__(
    self,
    client: ProtocolClient[Any, Any],
    *,
    profile: DeviceProfile,
    address: int,
    port: str,
    wire_temperature_unit: Unit | None = None,
) -> None:
    self._client = client
    # The profile is the device-type bundle (family + registry +
    # framing + identity). ``registry`` / ``family`` stay exposed as
    # thin delegating properties so the streaming / manager layers
    # that learned those names keep working.
    self._profile = profile
    self._registry = profile.registry
    self._family = profile.family
    self._address = address
    self._port = port
    self._availability: dict[str, Availability] = {}
    # Scale of temperature values on the wire. An explicit
    # ``wire_temperature_unit`` (from
    # ``open_device(assert_wire_temperature_unit=...)``) wins;
    # otherwise the profile's own ``wire_temperature_unit`` seeds it
    # (the Series SD knows it speaks °F; the EZ-ZONE PM profile
    # leaves it ``None`` so the user must assert). Drives
    # :class:`Reading.unit` / :class:`Sample.unit` for temperature
    # parameters; ``None`` means "do not guess". The library makes
    # **no** attempt to derive this from PM parameter 17050 — on at
    # least one PM3 firmware that register is label-only and does
    # not govern the wire scale. See ``docs/devices.md`` §Units.
    self._wire_temperature_unit: Unit | None = (
        wire_temperature_unit
        if wire_temperature_unit is not None
        else profile.wire_temperature_unit
    )
    self._wire_temperature_unit_warned: bool = False
    # Lazy cache of parameter 17050's reported value. Kept purely
    # as an inspection helper exposed via
    # :meth:`Controller.read_comms_unit_label` — **does not** feed
    # :class:`Reading.unit`. ``_comms_unit_label_loaded`` distinguishes
    # "haven't asked yet" from "asked and got nothing"; subsequent
    # calls don't repeat the wire turn-around after a rejection.
    self._comms_unit_label: Unit | None = None
    self._comms_unit_label_loaded: bool = False
    # Most recent error's :class:`ErrorContext`, captured from any
    # :class:`WatlowError` that propagates out of :meth:`execute`.
    # Drives :class:`DeviceSnapshot.last_error`. Cleared by
    # :meth:`clear_last_error` (manual reset for diagnostics).
    self._last_error: ErrorContext | None = None
    # Count of transient transport hiccups the session swallowed
    # and retried. Watlow has no current transient class (§F of
    # the unified spec — deferred), so this counter stays at zero
    # unless a future transport path increments it. Reset on
    # :meth:`open` / fresh transport.
    self.recoverable_error_count: int = 0

address property

address

Session bus address.

client property

client

The bound protocol client.

Exposed for the watlow-raw escape hatch and for diagnostics that need to issue an unframed wire op outside the registry. Callers must acquire :attr:ProtocolClient.lock before :meth:ProtocolClient.execute to honour the per-port serialization invariant, and must pass this session's :attr:address (or another concrete address for multi-drop diagnostics) to execute.

family property

family

Best-known controller family for this session (delegates to the profile).

last_error property

last_error

Most-recent error context captured by :meth:execute.

port property

port

Transport label (for logs / error context).

profile property

profile

The device profile (family + registry + framing + identity).

protocol_kind property

protocol_kind

The wire protocol this session speaks.

registry property

registry

Parameter registry bound to this session.

Exposed for the streaming layer so polling code can resolve a name / id to a :class:ParameterSpec without an extra import of the module-level :data:PARAMETERS.

availability

availability(command_name)

Cached availability for command_name.

Source code in src/watlowlib/devices/session.py
def availability(self, command_name: str) -> Availability:
    """Cached availability for ``command_name``."""
    return self._availability.get(command_name, Availability.UNKNOWN)

availability_summary

availability_summary()

Return the current command availability cache.

Includes every command the session has dispatched; the snapshot path filters to the UNSUPPORTED entries.

Source code in src/watlowlib/devices/session.py
def availability_summary(self) -> Mapping[str, Availability]:
    """Return the current command availability cache.

    Includes every command the session has dispatched; the
    snapshot path filters to the UNSUPPORTED entries.
    """
    return MappingProxyType(dict(self._availability))

clear_last_error

clear_last_error()

Reset :attr:last_error to None (manual diagnostics aid).

Source code in src/watlowlib/devices/session.py
def clear_last_error(self) -> None:
    """Reset :attr:`last_error` to ``None`` (manual diagnostics aid)."""
    self._last_error = None

comms_unit_label async

comms_unit_label()

Return the value parameter 17050 reports for this session.

Inspection helper, not a source of truth: on at least one PM3 firmware revision 17050 is a label-only register that does not govern the wire scale. :class:Reading.unit is sourced from :meth:wire_temperature_unit instead.

Reads the parameter on first call and caches the result for the lifetime of the session. Returns None when the device rejects the read or reports an unknown code; the cache distinguishes "haven't asked yet" from "asked and got nothing" so a rejection does not cost another wire turn-around.

Invalidated by :meth:invalidate_comms_unit_label (called by :meth:Controller.set_comms_unit_label after a successful write).

Source code in src/watlowlib/devices/session.py
async def comms_unit_label(self) -> Unit | None:
    """Return the value parameter 17050 reports for this session.

    Inspection helper, not a source of truth: on at least one PM3
    firmware revision 17050 is a label-only register that does not
    govern the wire scale. :class:`Reading.unit` is sourced from
    :meth:`wire_temperature_unit` instead.

    Reads the parameter on first call and caches the result for
    the lifetime of the session. Returns ``None`` when the device
    rejects the read or reports an unknown code; the cache
    distinguishes "haven't asked yet" from "asked and got nothing"
    so a rejection does not cost another wire turn-around.

    Invalidated by :meth:`invalidate_comms_unit_label` (called by
    :meth:`Controller.set_comms_unit_label` after a successful
    write).
    """
    if self._comms_unit_label_loaded:
        return self._comms_unit_label
    self._comms_unit_label = await self._fetch_comms_unit_label()
    self._comms_unit_label_loaded = True
    return self._comms_unit_label

dispose

dispose()

Dispose the bound protocol client.

Source code in src/watlowlib/devices/session.py
def dispose(self) -> None:
    """Dispose the bound protocol client."""
    self._client.dispose()

execute async

execute(command, request, *, confirm=False, timeout=None)

Dispatch command with request and return the typed response.

Source code in src/watlowlib/devices/session.py
async def execute[Req, Resp](
    self,
    command: Command[Req, Resp],
    request: Req,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> Resp:
    """Dispatch ``command`` with ``request`` and return the typed response."""
    kind = self._client.kind
    # Variant resolution. The session picks the variant matching
    # the bound protocol; one protocol per port (invariant 1).
    # Resolve to a single ``variant`` local so the rest of the
    # method is protocol-agnostic; we still branch on ``kind``
    # for the encode/decode call shapes (stdbus takes ``reply``;
    # modbus takes ``words, ctx, request``).
    if kind is ProtocolKind.STDBUS:
        stdbus_variant = command.stdbus
        if stdbus_variant is None:
            raise WatlowProtocolUnsupportedError(
                f"command {command.name!r} has no Std Bus variant",
                context=self._error_context(command, request),
            )
        modbus_variant = None
    elif kind is ProtocolKind.MODBUS_RTU:
        modbus_variant = command.modbus
        if modbus_variant is None:
            raise WatlowProtocolUnsupportedError(
                f"command {command.name!r} has no Modbus variant",
                context=self._error_context(command, request),
            )
        stdbus_variant = None
    else:
        raise WatlowProtocolUnsupportedError(
            f"session has unsupported protocol kind {kind!r}",
            context=self._error_context(command, request),
        )

    # Cache key. We key on ``command_name:parameter_id`` for
    # registry-driven commands so that one ``read_parameter("foo")``
    # rejection doesn't sticky-block every other parameter; bare
    # commands fall back to ``command.name``.
    cache_key = self._cache_key(command, request)

    cached = self._availability.get(cache_key, Availability.UNKNOWN)
    if cached is Availability.UNSUPPORTED:
        raise WatlowProtocolUnsupportedError(
            f"command {command.name!r} is unsupported on this device",
            context=self._error_context(command, request),
        )

    # Safety gate: PERSISTENT writes need explicit confirm.
    if command.safety is SafetyTier.PERSISTENT and not confirm:
        raise WatlowConfirmationRequiredError(
            f"command {command.name!r} is PERSISTENT and requires confirm=True",
            context=self._error_context(command, request),
        )

    ctx = CommandContext(
        registry=self._registry,
        family=self._family,
        address=self._address,
        port=self._port,
    )

    bound_timeout = timeout if timeout is not None else DEFAULTS.io_timeout_s

    # Encode under the variant. Errors here are pre-I/O — typically
    # validation failures — and should propagate untouched.
    # Exactly one of ``stdbus_variant`` / ``modbus_variant`` is
    # non-None per the resolution above; the type narrowing is
    # explicit so neither mypy nor pyright needs an ``assert`` it
    # can't enforce at runtime under ``-O``.
    wire_request: Any
    if stdbus_variant is not None:
        wire_request = stdbus_variant.encode(ctx, request)
    elif modbus_variant is not None:
        wire_request = modbus_variant.encode(ctx, request)
    else:  # pragma: no cover — variant resolution above guarantees one is set
        raise WatlowProtocolUnsupportedError(
            f"command {command.name!r} variant resolution lost",
            context=self._error_context(command, request),
        )

    started = time.monotonic()
    # Hold the per-port client lock only for the I/O turn-around.
    # Decode is CPU-only and does not need to block the next
    # request waiting on the same RS-485 segment; ``reply`` is
    # snapshotted before the lock releases. ``maybe_acquire``
    # reuses an existing acquisition when the caller (e.g.
    # ``poll_controller``'s tick batch, or
    # ``WatlowManager._run_group``'s port batch) already holds
    # the lock — avoids the FIFO-queue starvation that would
    # otherwise let unrelated writers interleave between a
    # batch's reads.
    async with maybe_acquire(self._client.lock):
        try:
            reply = await self._client.execute(
                wire_request,
                address=self._address,
                timeout=bound_timeout,
                command_name=command.name,
            )
        except (
            WatlowNoSuchObjectError,
            WatlowNoSuchAttributeError,
            WatlowProtocolUnsupportedError,
        ) as exc:
            self._availability[cache_key] = Availability.UNSUPPORTED
            self._record_error(exc)
            _log.warning(
                "command unsupported: protocol=%s cmd=%s key=%s exc=%s",
                kind.value,
                command.name,
                cache_key,
                exc,
            )
            raise
        except WatlowProtocolError as exc:
            self._record_error(exc)
            raise
        except WatlowError as exc:
            self._record_error(exc)
            _log.warning(
                "command error: protocol=%s cmd=%s key=%s exc=%s",
                kind.value,
                command.name,
                cache_key,
                exc,
            )
            raise

    # Decode outside the lock — pure compute on the captured reply.
    try:
        if stdbus_variant is not None:
            response = stdbus_variant.decode(reply, ctx)
        else:
            # ``modbus_variant is not None`` per the resolution above;
            # mypy/pyright follow the narrowing without an ``assert``.
            response = modbus_variant.decode(reply, ctx, request)  # type: ignore[union-attr]
    except (
        WatlowNoSuchObjectError,
        WatlowNoSuchAttributeError,
        WatlowProtocolUnsupportedError,
    ) as exc:
        # Decode-side "we don't have this": same availability
        # transition as the wire-side rejection above.
        self._availability[cache_key] = Availability.UNSUPPORTED
        self._record_error(exc)
        _log.warning(
            "command unsupported: protocol=%s cmd=%s key=%s exc=%s",
            kind.value,
            command.name,
            cache_key,
            exc,
        )
        raise
    except WatlowProtocolError as exc:
        # Decode-failure parity with the inside-lock branch above:
        # NoSuchInstance / IllegalDataValue / generic decode errors
        # don't transition availability per design §5b.
        self._record_error(exc)
        raise

    elapsed = time.monotonic() - started
    self._availability[cache_key] = Availability.SUPPORTED
    _log.debug(
        "session exec ok protocol=%s cmd=%s key=%s elapsed=%.4fs",
        kind.value,
        command.name,
        cache_key,
        elapsed,
    )
    return response

invalidate_comms_unit_label

invalidate_comms_unit_label()

Drop the cached 17050 value so the next read re-queries the device.

Source code in src/watlowlib/devices/session.py
def invalidate_comms_unit_label(self) -> None:
    """Drop the cached 17050 value so the next read re-queries the device."""
    self._comms_unit_label = None
    self._comms_unit_label_loaded = False

set_wire_temperature_unit

set_wire_temperature_unit(unit)

Set the wire temperature scale from an authoritative source.

Called by an identity strategy that has read the device's own unit register (e.g. the Series SD reg 18). Unlike the PM path — where the unit register can lie, so the value stays a user assertion — this is the device telling us its comms scale directly, so it is honest to adopt it. Resets the one-shot "trusting user-asserted unit" warning since this value is device-sourced, not user-asserted.

Source code in src/watlowlib/devices/session.py
def set_wire_temperature_unit(self, unit: Unit | None) -> None:
    """Set the wire temperature scale from an authoritative source.

    Called by an identity strategy that has read the device's own
    unit register (e.g. the Series SD reg 18). Unlike the PM path —
    where the unit register can lie, so the value stays a user
    assertion — this is the device telling us its comms scale
    directly, so it is honest to adopt it. Resets the one-shot
    "trusting user-asserted unit" warning since this value is
    device-sourced, not user-asserted.
    """
    self._wire_temperature_unit = unit
    self._wire_temperature_unit_warned = True

wire_temperature_unit

wire_temperature_unit()

Return the user-asserted scale of temperature values on the wire.

This is what :class:Reading.unit / :class:Sample.unit get tagged with for temperature parameters. None when the user did not pass assert_wire_temperature_unit= to :func:watlowlib.open_device; in that case temperature readings carry unit=None rather than guess.

Pure accessor — no I/O. Logs a one-shot WARN the first time an asserted value is consumed so the user-assertion shows up plainly in capture logs.

Source code in src/watlowlib/devices/session.py
def wire_temperature_unit(self) -> Unit | None:
    """Return the user-asserted scale of temperature values on the wire.

    This is what :class:`Reading.unit` / :class:`Sample.unit` get
    tagged with for temperature parameters. ``None`` when the user
    did not pass ``assert_wire_temperature_unit=`` to
    :func:`watlowlib.open_device`; in that case temperature
    readings carry ``unit=None`` rather than guess.

    Pure accessor — no I/O. Logs a one-shot WARN the first time
    an asserted value is consumed so the user-assertion shows up
    plainly in capture logs.
    """
    if self._wire_temperature_unit is not None and not self._wire_temperature_unit_warned:
        _log.warning(
            "trusting user-asserted wire temperature unit %s for port=%s "
            "address=%s; not independently verified by the library "
            "(parameter 17050 is label-only on at least one PM firmware)",
            self._wire_temperature_unit.value,
            self._port,
            self._address,
        )
        self._wire_temperature_unit_warned = True
    return self._wire_temperature_unit

Loops

watlowlib.devices.loop

Per-loop sub-facade returned by :meth:Controller.loop.

A :class:ControllerLoop is a thin view over a :class:Controller that pre-binds an instance argument. It validates the loop number once at construction (cross-cutting invariant 6: 1-indexed everywhere) and forwards every operation to the parent controller's session, threading the loop index as the registry instance.

The sub-facade is stateless beyond the loop number — it does not duplicate the controller's transport, lock, or availability cache. Multiple :class:ControllerLoop instances over the same controller share the underlying session safely; concurrent calls serialize on the protocol client's lock.

This module intentionally has no protocol-specific code. PID, output, and alarm helpers live in :mod:watlowlib.commands.loop and :mod:watlowlib.commands.alarms so the facade-only logic and the parameter aggregation logic stay separate.

ControllerLoop

ControllerLoop(controller, loop_number)

A view over one control loop on a :class:Controller.

Construct via :meth:Controller.loop; never instantiated directly by user code. The sub-facade lives only as long as the parent controller's session — closing the controller is the only cleanup needed.

Source code in src/watlowlib/devices/loop.py
def __init__(self, controller: Controller, loop_number: int) -> None:
    if loop_number < 1:
        raise WatlowValidationError(
            f"loop number must be 1-indexed and >= 1; got {loop_number}",
        )
    # If the controller has identified the device, validate
    # eagerly. Otherwise defer to the registry's per-spec
    # ``validate_instance`` at first call: a registered parameter
    # with ``max_instance=1`` will raise a clear
    # ``WatlowValidationError`` when ``loop(2).read_pv()`` is
    # invoked. That keeps ``Controller.loop(2)`` cheap when called
    # before identify, but still fails before I/O.
    loops = controller.loops
    if loops is not None and loop_number > loops:
        raise WatlowValidationError(
            f"loop {loop_number} out of range for this device (1..{loops})",
        )
    self._controller = controller
    self._loop = loop_number

number property

number

The 1-indexed loop number this view binds.

read_alarms async

read_alarms()

Read the alarm word for this loop.

Currently raises :class:watlowlib.errors.WatlowProtocolUnsupportedError — see :func:watlowlib.commands.alarms.read_alarms for why the decoder is not yet wired up.

Source code in src/watlowlib/devices/loop.py
async def read_alarms(self) -> AlarmState:
    """Read the alarm word for this loop.

    Currently raises :class:`watlowlib.errors.WatlowProtocolUnsupportedError` —
    see :func:`watlowlib.commands.alarms.read_alarms` for why the
    decoder is not yet wired up.
    """
    return await _read_alarms(self._controller.session, instance=self._loop)

read_output async

read_output()

Read this loop's working output (output_power).

Source code in src/watlowlib/devices/loop.py
async def read_output(self) -> Reading:
    """Read this loop's working output (``output_power``)."""
    return await _read_output(self._controller.session, instance=self._loop)

read_pid async

read_pid()

Read every PID gain for this loop. Missing gains return None.

Cool-side gains (cool_proportional_band, dead_band) are skipped when the controller's identified capabilities lack :attr:Capability.HAS_COOLING (e.g. PM output_2 == 'A'). Pre-identify, the gate is permissive.

Source code in src/watlowlib/devices/loop.py
async def read_pid(self) -> PidGains:
    """Read every PID gain for this loop. Missing gains return ``None``.

    Cool-side gains (``cool_proportional_band``, ``dead_band``)
    are skipped when the controller's identified capabilities
    lack :attr:`Capability.HAS_COOLING` (e.g. PM ``output_2 ==
    'A'``). Pre-identify, the gate is permissive.
    """
    return await _read_pid(
        self._controller.session,
        instance=self._loop,
        capabilities=self._controller.capabilities,
    )

read_pv async

read_pv(*, timeout=None)

Read this loop's process value.

Source code in src/watlowlib/devices/loop.py
async def read_pv(self, *, timeout: float | None = None) -> Reading:
    """Read this loop's process value."""
    return await self._controller.read_pv(instance=self._loop, timeout=timeout)

read_setpoint async

read_setpoint(*, timeout=None)

Read this loop's active setpoint.

Source code in src/watlowlib/devices/loop.py
async def read_setpoint(self, *, timeout: float | None = None) -> Reading:
    """Read this loop's active setpoint."""
    return await self._controller.read_setpoint(instance=self._loop, timeout=timeout)

set_setpoint async

set_setpoint(value, *, confirm=False, timeout=None)

Write this loop's setpoint (RWES → confirm=True required).

Source code in src/watlowlib/devices/loop.py
async def set_setpoint(
    self,
    value: float,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> Reading:
    """Write this loop's setpoint (RWES → ``confirm=True`` required)."""
    return await self._controller.set_setpoint(
        value,
        instance=self._loop,
        confirm=confirm,
        timeout=timeout,
    )

write_pid async

write_pid(gains, *, confirm=False)

Write the supplied gains for this loop.

Persistent — passing confirm=True is required. Fields left None on gains skip the wire entirely. Setting a cool-side field on a controller without :attr:Capability.HAS_COOLING raises :class:watlowlib.errors.WatlowConfigurationError.

Source code in src/watlowlib/devices/loop.py
async def write_pid(self, gains: PidGains, *, confirm: bool = False) -> PidGains:
    """Write the supplied gains for this loop.

    Persistent — passing ``confirm=True`` is required. Fields
    left ``None`` on ``gains`` skip the wire entirely. Setting a
    cool-side field on a controller without
    :attr:`Capability.HAS_COOLING` raises
    :class:`watlowlib.errors.WatlowConfigurationError`.
    """
    return await _write_pid(
        self._controller.session,
        gains,
        instance=self._loop,
        confirm=confirm,
        capabilities=self._controller.capabilities,
    )

Capability + safety + availability

watlowlib.devices.capability

Three small enums that set the contract between layers.

  • :class:SafetyTier — derived from RWES; gates confirm=True writes.
  • :class:Capability — coarse hardware-feature bitmap. Bits are added when a captured family needs them and existing values stay stable.
  • :class:Availability — per-command session cache state.

This module is leaf — it imports nothing from :mod:watlowlib.devices siblings, so the registry and command layers can pull these enums without an import cycle. See docs/design.md §5b.

Availability

Bases: StrEnum

Per-command session state.

Sticky for the session: once a command transitions to :attr:UNSUPPORTED, the session short-circuits subsequent invocations with a typed error pre-I/O. The transition table lives in docs/design.md §5b.

Capability

Bases: Flag

Coarse hardware capability bits.

Bits are derived from a decoded part number when one is available (see :func:watlowlib.registry.families.capabilities_for_part_number) and fall back to a per-family prior otherwise. The session widens the set at runtime when a command succeeds against a parameter that proves the capability.

The vocabulary is small on purpose — most Watlow gating is by :class:watlowlib.registry.families.ControllerFamily and by :attr:watlowlib.registry.parameters.ParameterSpec.parameter_id, not by per-feature bits. New bits are added when captured family behaviour requires them.

SafetyTier

Bases: IntEnum

How dangerous a command is to invoke.

  • READ_ONLY (R) — no state change.
  • STATEFUL — runtime state change but not EEPROM-backed. Reserved for commands like "start autotune"; no PM parameter maps here today, but the tier exists so future commands have a place to live.
  • PERSISTENT (RW / RWE / RWES) — EEPROM-backed; requires confirm=True at the facade.

capabilities_for_family

capabilities_for_family(family)

Return the capability prior for family.

The session promotes observed capabilities at runtime and the part- number decoder fills in per-SKU bits via :func:watlowlib.registry.families.capabilities_for_part_number. PM is intentionally :attr:Capability.NONE because PM SKUs vary across every dimension (cooling / modbus / profile / comms).

Source code in src/watlowlib/devices/capability.py
def capabilities_for_family(family: ControllerFamily) -> Capability:
    """Return the capability prior for ``family``.

    The session promotes observed capabilities at runtime and the part-
    number decoder fills in per-SKU bits via
    :func:`watlowlib.registry.families.capabilities_for_part_number`.
    PM is intentionally :attr:`Capability.NONE` because PM SKUs vary
    across every dimension (cooling / modbus / profile / comms).
    """
    return _FAMILY_PRIORS.get(family.value, Capability.NONE)

Family classification

watlowlib.devices.kind

Re-export :class:ControllerFamily under :mod:watlowlib.devices.

Callers can import it from either location. The canonical home is :mod:watlowlib.registry.families so the registry layer can construct family enums without depending on :mod:watlowlib.devices.

ControllerFamily

Bases: StrEnum

Watlow controller family discriminator.

Membership here is advisory — :class:watlowlib.devices.session.Session treats family hints as priors, not gates. See docs/design.md §5b.

classify_family

classify_family(part_number)

Return the :class:ControllerFamily for a part-number string.

Only the leading family discriminator is parsed; per-family digit decoding is in :func:decode_part_number.

Source code in src/watlowlib/registry/families.py
def classify_family(part_number: str) -> ControllerFamily:
    """Return the :class:`ControllerFamily` for a part-number string.

    Only the leading family discriminator is parsed; per-family digit
    decoding is in :func:`decode_part_number`.
    """
    head = part_number.strip().upper()
    if head.startswith("PM"):
        return ControllerFamily.PM
    if head.startswith("RM"):
        return ControllerFamily.RM
    if head.startswith("ST"):
        return ControllerFamily.ST
    if head.startswith("F4T"):
        return ControllerFamily.F4T
    if head.startswith("SD"):
        return ControllerFamily.SD
    return ControllerFamily.UNKNOWN

Public dataclasses

watlowlib.devices.models

Public dataclasses returned by the :class:Controller facade.

All frozen, slots=True. py.typed ships.

See docs/design.md §6a.

AlarmState dataclass

AlarmState(loop, high, low, silenced, raw_bits)

Decoded alarm bits for one loop.

DeviceHealth

Bases: StrEnum

Outcome of an :meth:Controller.identify call.

Used by callers (the maintenance verify pass, the configure CLI, discovery rows) to distinguish "the device answered every probe" from "the device answered some probes but not the load-bearing part-number read." Sentinel values stay the same enum across the public API so downstream code can branch on it.

DeviceInfo dataclass

DeviceInfo(
    part_number,
    hardware_id,
    firmware_id,
    serial_number,
    family,
    protocol,
    address,
    capabilities,
    serial_settings,
    loops,
    health=DeviceHealth.OK,
    configured_protocol=None,
)

Identity + connection metadata for an open controller.

Returned by :meth:Controller.identify. Capabilities are decoded from the part number when one is captured (see :func:watlowlib.registry.families.capabilities_for_part_number) and OR-ed with the family prior; unobserved bits stay zero rather than being guessed.

protocol is the wire protocol the host is currently talking; configured_protocol is what the device's persistent EEPROM parameter (PM 17009) reports. They normally match, but when they diverge the helper :attr:protocol_mismatch flags it — useful for catching SKU/firmware combinations where the user wrote a new protocol but the runtime stack didn't pick it up (e.g. comms position-8 = 'A', no Modbus stack present even though 17009 reads 1057).

protocol_mismatch property

protocol_mismatch

True when EEPROM says one protocol and we're talking another.

Always False when :attr:configured_protocol is None (i.e. identify did not query parameter 17009).

DiscoveryResult dataclass

DiscoveryResult(
    ok,
    port,
    address,
    baudrate,
    protocol,
    device_info,
    error,
    elapsed_s,
)

One probe attempt's outcome from :func:find_devices.

Cross-library shape (mirrors :mod:alicatlib, :mod:sartoriuslib, :mod:nidaqlib) so GUI Discover dialogs and capa-style adapters can filter responsive vs silent rows on a single attribute and consume the same field set across vendors.

A populated :attr:device_info carries the full :class:Controller.identify result, including :attr:DeviceInfo.health and (when the scan queried it) :attr:DeviceInfo.configured_protocol.

The address field is typed str | int | None to match the cross-library spec; in practice every watlow probe carries an int.

LoopState dataclass

LoopState(
    loop, pv, setpoint, output_pct, raw=(lambda: {})()
)

Snapshot of one loop. Composed from several reads.

ParameterEntry dataclass

ParameterEntry(spec, instance, value, raw)

Generic registry-driven read/write result.

Returned by :data:watlowlib.commands.READ_PARAMETER and :data:watlowlib.commands.WRITE_PARAMETER. The :class:Controller translates an entry into a :class:Reading / :class:PartNumber / etc. when the public API guarantees a richer shape.

PartNumber dataclass

PartNumber(raw, family, details=(lambda: {})())

Parsed part-number string returned by read_part_number.

Per-family digit decoding is contributed by :mod:watlowlib.registry.families. Decoded fragments live in :attr:details as a free-form mapping so each family can populate only what its ordering format defines, and so adding fragments to the PM decoder later is non-breaking.

The EZ-ZONE PM decoder populates case size, control type, power input, three output codes, and options string. Other families fall through to a stub: only :attr:family is set, and :attr:details is empty.

Reading dataclass

Reading(
    value, unit, received_at, monotonic_ns, raw, protocol
)

A single timestamped value from the controller.

protocol is set by the variant decoder, not by the facade — it reflects which wire protocol produced the value (per docs/design.md invariant 7).

Factory

watlowlib.devices.factory

open_device — single entry point for opening a controller.

Honours :attr:ProtocolKind.STDBUS, :attr:ProtocolKind.MODBUS_RTU, and :attr:ProtocolKind.AUTO (Std Bus probe → Modbus probe → fail). The detector itself lives in :mod:watlowlib.protocol.detect; the factory only orchestrates.

The factory does not sweep bauds — the user sets one. See docs/design.md §7 for why baud sweeping is opt-in via the watlow-discover CLI rather than the open path.

coerce_wire_temperature_unit

coerce_wire_temperature_unit(value)

Normalise the assert_wire_temperature_unit kwarg.

Accepts a :class:Unit, a case-insensitive alias, or None. Rejects :attr:Unit.PERCENT pre-I/O — a temperature scale must be °C or °F.

Source code in src/watlowlib/devices/factory.py
def coerce_wire_temperature_unit(value: Unit | str | None) -> Unit | None:
    """Normalise the ``assert_wire_temperature_unit`` kwarg.

    Accepts a :class:`Unit`, a case-insensitive alias, or ``None``.
    Rejects :attr:`Unit.PERCENT` pre-I/O — a temperature scale must
    be °C or °F.
    """
    from watlowlib.errors import WatlowValidationError  # noqa: PLC0415 — cold path

    if value is None:
        return None
    resolved = coerce_unit(value)
    if resolved is Unit.PERCENT:
        raise WatlowValidationError(
            "assert_wire_temperature_unit accepts CELSIUS / FAHRENHEIT only; "
            "PERCENT is not a temperature scale",
        )
    return resolved

open_device async

open_device(
    port,
    *,
    profile=EZZONE_PROFILE,
    protocol=None,
    address=1,
    serial_settings=None,
    assert_wire_temperature_unit=None,
    identify=True,
)

Open a controller on a serial port.

Parameters:

Name Type Description Default
port str

Serial-port path (/dev/ttyUSB0, COM3, ...).

required
profile DeviceProfile

The device profile to open against. Defaults to :data:~watlowlib.devices.profile.EZZONE_PROFILE (EZ-ZONE PM), which preserves all historical behaviour. Pass :data:~watlowlib.devices.profile.SERIES_SD_PROFILE for a Series SD. The profile supplies the default protocol, factory serial framing, parameter registry, identity strategy, and (for the SD) the wire temperature unit.

EZZONE_PROFILE
protocol ProtocolKind | None

Wire protocol. None (default) adopts profile.default_protocol (Std Bus for PM, Modbus RTU for SD). STDBUS / MODBUS_RTU open directly; AUTO runs the conservative detector (Std Bus → Modbus → fail) per docs/design.md §7.

None
address int

Bus address. Std Bus accepts 1..16; Modbus RTU accepts 1..247. Under AUTO the same address is tried against both probes.

1
serial_settings SerialSettings | None

Optional framing override. None (default) adopts profile.default_serial with port applied — 38400 8-N-1 for the EZ-ZONE PM Std Bus profile, 9600 8-N-1 for the Series SD profile. An explicit :class:SerialSettings still has its port forced to the positional port arg. There is no baud sweeping in the open path (cross-cutting invariant 5).

None
identify bool

When True (default), :meth:Controller.identify runs after the transport opens so :meth:Controller.snapshot renders without further wire I/O. Set False for the fast-path open scenarios where caller code drives identity itself or wants the open to return immediately.

True
assert_wire_temperature_unit Unit | str | None

User-asserted scale of temperature values on the wire. Sets :class:Reading.unit / :class:Sample.unit for temperature parameters. Accepts a :class:Unit or a case-insensitive string alias ("C", "F", "celsius", "degF", "°C", ...). :attr:Unit.PERCENT is rejected. None (the default) means temperature readings carry unit=None. The library does not infer this from parameter 17050 — on at least one PM3 firmware 17050 is a label-only register and would silently mis-tag. Verify the actual scale externally — the bundled watlow-diag probe-unit CLI automates the comparison against a known panel reading; see docs/devices.md §Units — before asserting it here.

None

Returns:

Type Description
Controller

An opened :class:Controller whose transport is ready for

Controller

meth:Controller.poll / :meth:Controller.poll_many calls.

Controller

Every protocol (STDBUS, MODBUS_RTU, AUTO) returns

Controller

an opened controller; __aenter__ is a no-op and

Controller

__aexit__ closes the transport.

Raises:

Type Description
WatlowConfigurationError

address is out of range or protocol is unsupported.

WatlowValidationError

assert_wire_temperature_unit is :attr:Unit.PERCENT or an unrecognised alias.

WatlowProtocolUnsupportedError

protocol=AUTO and both probes failed.

Source code in src/watlowlib/devices/factory.py
async def open_device(
    port: str,
    *,
    profile: DeviceProfile = EZZONE_PROFILE,
    protocol: ProtocolKind | None = None,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
    assert_wire_temperature_unit: Unit | str | None = None,
    identify: bool = True,
) -> Controller:
    """Open a controller on a serial port.

    Args:
        port: Serial-port path (``/dev/ttyUSB0``, ``COM3``, ...).
        profile: The device profile to open against. Defaults to
            :data:`~watlowlib.devices.profile.EZZONE_PROFILE` (EZ-ZONE
            PM), which preserves all historical behaviour. Pass
            :data:`~watlowlib.devices.profile.SERIES_SD_PROFILE` for a
            Series SD. The profile supplies the default protocol,
            factory serial framing, parameter registry, identity
            strategy, and (for the SD) the wire temperature unit.
        protocol: Wire protocol. ``None`` (default) adopts
            ``profile.default_protocol`` (Std Bus for PM, Modbus RTU for
            SD). ``STDBUS`` / ``MODBUS_RTU`` open directly; ``AUTO``
            runs the conservative detector (Std Bus → Modbus → fail)
            per ``docs/design.md`` §7.
        address: Bus address. Std Bus accepts ``1..16``; Modbus RTU
            accepts ``1..247``. Under ``AUTO`` the same address is
            tried against both probes.
        serial_settings: Optional framing override. ``None`` (default)
            adopts ``profile.default_serial`` with ``port`` applied —
            **38400 8-N-1** for the EZ-ZONE PM Std Bus profile, **9600
            8-N-1** for the Series SD profile. An explicit
            :class:`SerialSettings` still has its ``port`` forced to the
            positional ``port`` arg. There is no baud sweeping in the
            open path (cross-cutting invariant 5).
        identify: When ``True`` (default), :meth:`Controller.identify`
            runs after the transport opens so :meth:`Controller.snapshot`
            renders without further wire I/O. Set ``False`` for the
            fast-path open scenarios where caller code drives identity
            itself or wants the open to return immediately.
        assert_wire_temperature_unit: User-asserted scale of
            temperature values on the wire. Sets
            :class:`Reading.unit` / :class:`Sample.unit` for
            temperature parameters. Accepts a :class:`Unit` or a
            case-insensitive string alias (``"C"``, ``"F"``,
            ``"celsius"``, ``"degF"``, ``"°C"``, ...).
            :attr:`Unit.PERCENT` is rejected. ``None`` (the default)
            means temperature readings carry ``unit=None``. The
            library does **not** infer this from parameter 17050 —
            on at least one PM3 firmware 17050 is a label-only
            register and would silently mis-tag. Verify the actual
            scale externally — the bundled
            ``watlow-diag probe-unit`` CLI automates the comparison
            against a known panel reading; see ``docs/devices.md``
            §Units — before asserting it here.

    Returns:
        An *opened* :class:`Controller` whose transport is ready for
        :meth:`Controller.poll` / :meth:`Controller.poll_many` calls.
        Every protocol (``STDBUS``, ``MODBUS_RTU``, ``AUTO``) returns
        an opened controller; ``__aenter__`` is a no-op and
        ``__aexit__`` closes the transport.

    Raises:
        WatlowConfigurationError: ``address`` is out of range or
            ``protocol`` is unsupported.
        WatlowValidationError: ``assert_wire_temperature_unit`` is
            :attr:`Unit.PERCENT` or an unrecognised alias.
        WatlowProtocolUnsupportedError: ``protocol=AUTO`` and both
            probes failed.
    """
    # ``protocol=None`` adopts the profile's factory protocol (Std Bus
    # for EZ-ZONE PM, Modbus RTU for Series SD).
    resolved_protocol = protocol if protocol is not None else profile.default_protocol
    if resolved_protocol not in (
        ProtocolKind.STDBUS,
        ProtocolKind.MODBUS_RTU,
        ProtocolKind.AUTO,
    ):
        raise WatlowConfigurationError(
            f"unsupported protocol kind: {resolved_protocol!r}",
            context=ErrorContext(port=port),
        )

    # ``assert_wire_temperature_unit`` overrides the profile default
    # (the SD knows it speaks °F; the PM contract is "user must assert").
    wire_unit = (
        profile.wire_temperature_unit
        if assert_wire_temperature_unit is None
        else coerce_wire_temperature_unit(assert_wire_temperature_unit)
    )

    # ``serial_settings=None`` adopts the profile's factory framing
    # (port applied from the positional arg). An explicit override still
    # has ``port`` forced to the positional arg to avoid silent surprise.
    if serial_settings is None:
        settings = replace(profile.default_serial, port=port)
    elif serial_settings.port != port:
        settings = replace(serial_settings, port=port)
    else:
        settings = serial_settings

    if resolved_protocol is ProtocolKind.AUTO:
        # Lazy import — keep the Std-Bus-only callers off the anymodbus
        # dep graph until they actually opt in to AUTO.
        from watlowlib.protocol.detect import detect_protocol  # noqa: PLC0415

        resolved = await detect_protocol(
            port,
            address=address,
            serial_settings=settings,
        )
        # Detector returned an *open* transport already paired with the
        # right client; build the controller around them and skip
        # ``Controller.__aenter__``'s open() (it short-circuits when
        # ``transport.is_open`` is already True).
        session = Session(
            resolved.client,
            profile=profile,
            address=address,
            port=resolved.transport.label,
            wire_temperature_unit=wire_unit,
        )
        controller = Controller(session, resolved.transport, serial_settings=settings)
        if identify:
            await controller.identify()
        return controller

    transport: Transport
    if resolved_protocol is ProtocolKind.MODBUS_RTU:
        # Lazy import — keep the Std-Bus path off the anymodbus dep
        # graph for users who never reach for Modbus.
        from watlowlib.protocol.modbus.transport import (  # noqa: PLC0415
            ModbusBusTransport,
        )

        transport = ModbusBusTransport(settings)
    else:
        transport = SerialTransport(settings)
    controller = await _open_controller(
        transport,
        profile=profile,
        protocol=resolved_protocol,
        address=address,
        serial_settings=settings,
        wire_temperature_unit=wire_unit,
    )
    if identify:
        await controller.identify()
    return controller

Discovery

watlowlib.devices.discovery

Port-scan discovery for watlow-discover and capa-style adapters.

:func:find_devices is the single discovery entry point. It walks the cartesian product of ports × baudrates × protocols × addresses and returns one :class:DiscoveryResult per probe attempt — same shape the sibling libraries (alicat, sartorius, nidaq) emit, so a GUI Discover dialog can filter on a single ok flag.

The scan is read-only: every probe is a bounded :meth:Controller.identify call. No setpoint writes, no parameter writes, no comms-unit-label probes — discovery routinely runs on rigs that already have other software talking to the same controller.

Per-(port, baudrate, protocol) combination, the transport is opened once and every address is probed against the same handle. Standard Bus addresses live in the BACnet MS/TP outer-frame dst MAC byte; the Modbus bus driver multiplexes slaves over a single open serial handle. Reopening per address would add ~0.5 s of cdc_acm re-init per probe on Linux for no benefit.

If a port fails to open at all, it is marked dead for the rest of the scan: every subsequent (baudrate × protocol) combination for that port short-circuits with a single :class:DiscoveryResult per planned address carrying the open error. This avoids hammering a port that anyserial listed but the kernel won't let us touch.

The default scan is narrow on purpose:

  • addresses defaults to (1,) — Modbus RTU allows 1..247, but a multi-port × multi-baud × multi-protocol scan with a 16-address default explodes the probe count and the wall-clock for a GUI scan past what operators tolerate. Callers that need a full address sweep pass addresses=range(1, 248) explicitly.
  • baudrates defaults to (38400, 19200, 9600) — the EZ-ZONE PM ships at 38400 on Std Bus and 9600 on Modbus RTU; 19200 covers rigs that have been re-configured to the middle baud.
  • protocols defaults to (STDBUS, MODBUS_RTU) — both Watlow wire protocols on serial. AUTO is not in the default set; it would double the open count per (port, baudrate) and the detector is intentionally a single-port API.

find_devices async

find_devices(
    *,
    ports=None,
    addresses=None,
    baudrates=None,
    protocols=None,
    profiles=None,
    serial_template=None,
    per_probe_timeout_s=_DEFAULT_PROBE_TIMEOUT_S,
)

Probe local serial ports for Watlow controllers.

Parameters:

Name Type Description Default
ports Sequence[str] | None

Serial-port paths to scan. None enumerates every visible port via :func:anyserial.list_serial_ports. An empty sequence returns [] without enumeration.

None
addresses Sequence[int] | None

Bus addresses to probe per (port, baudrate, protocol) combination. Defaults to :data:DEFAULT_DISCOVERY_ADDRESSES ((1,)). Std Bus accepts 1..16; Modbus RTU accepts 1..247. Out-of-range addresses for a given protocol are still emitted as ok=False rows carrying a :class:WatlowConfigurationError.

None
baudrates Sequence[int] | None

Baud rates to try. Defaults to :data:DEFAULT_DISCOVERY_BAUDRATES.

None
protocols Sequence[ProtocolKind] | None

Wire protocols to probe. Defaults to :data:DEFAULT_DISCOVERY_PROTOCOLS. ProtocolKind.AUTO is not accepted here (one row per concrete probe). Ignored when profiles is given.

None
profiles Sequence[DeviceProfile] | None

Device profiles to probe. When given, discovery iterates profiles instead of protocols — each profile contributes its own default_protocol, factory serial framing (so the Series SD's 8-N-1 is used, not the PM Modbus 8-E-1), parameter registry, and identity strategy. Pass :data:~watlowlib.devices.profile.DEVICE_PROFILES to sweep for every known device type (PM over Std Bus + SD over Modbus). None (default) keeps the historical protocol-centric scan against the EZ-ZONE PM profile.

None
serial_template SerialSettings | None

Optional :class:SerialSettings whose parity / bytesize / stopbits / rtscts / xonxoff / exclusive fields override the per-protocol factory framing for every probe. port and baudrate are always overwritten per iteration.

None
per_probe_timeout_s float

Per-probe budget. Bounds the :meth:Controller.identify call (four bounded sub-reads) so a silent address bails after one round-trip rather than four. Defaults to 0.5 — a four-port × three-baud × two-protocol scan with one address per combo lands in ~12 s of wall-clock.

_DEFAULT_PROBE_TIMEOUT_S

Returns:

Name Type Description
One list[DiscoveryResult]

class:DiscoveryResult per (port × baudrate × protocol ×

list[DiscoveryResult]

address) tuple, in input order. The cartesian product is

list[DiscoveryResult]

iterated outermost-port, then baudrate, then protocol, then

list[DiscoveryResult]

address — same input → same output ordering.

Raises:

Type Description
WatlowConfigurationError

protocols contains :attr:ProtocolKind.AUTO, or per_probe_timeout_s is non-positive.

Notes
  • Read-only. Discovery never writes to the device; it only calls :meth:Controller.identify (four parameter reads). Safe to run on rigs that already have other software talking to the controller.
  • Per-port short-circuit. If a port fails to open with a :class:WatlowConnectionError, the rest of the scan for that port emits ok=False rows without re-attempting the open. This avoids hammering a port the kernel won't give us.
Source code in src/watlowlib/devices/discovery.py
async def find_devices(
    *,
    ports: Sequence[str] | None = None,
    addresses: Sequence[int] | None = None,
    baudrates: Sequence[int] | None = None,
    protocols: Sequence[ProtocolKind] | None = None,
    profiles: Sequence[DeviceProfile] | None = None,
    serial_template: SerialSettings | None = None,
    per_probe_timeout_s: float = _DEFAULT_PROBE_TIMEOUT_S,
) -> list[DiscoveryResult]:
    """Probe local serial ports for Watlow controllers.

    Args:
        ports: Serial-port paths to scan. ``None`` enumerates every
            visible port via :func:`anyserial.list_serial_ports`. An
            empty sequence returns ``[]`` without enumeration.
        addresses: Bus addresses to probe per (port, baudrate, protocol)
            combination. Defaults to :data:`DEFAULT_DISCOVERY_ADDRESSES`
            (``(1,)``). Std Bus accepts ``1..16``; Modbus RTU accepts
            ``1..247``. Out-of-range addresses for a given protocol are
            still emitted as ``ok=False`` rows carrying a
            :class:`WatlowConfigurationError`.
        baudrates: Baud rates to try. Defaults to
            :data:`DEFAULT_DISCOVERY_BAUDRATES`.
        protocols: Wire protocols to probe. Defaults to
            :data:`DEFAULT_DISCOVERY_PROTOCOLS`. ``ProtocolKind.AUTO``
            is not accepted here (one row per concrete probe). Ignored
            when ``profiles`` is given.
        profiles: Device profiles to probe. When given, discovery
            iterates profiles instead of ``protocols`` — each profile
            contributes its own ``default_protocol``, factory serial
            framing (so the Series SD's 8-N-1 is used, not the PM
            Modbus 8-E-1), parameter registry, and identity strategy.
            Pass :data:`~watlowlib.devices.profile.DEVICE_PROFILES` to
            sweep for every known device type (PM over Std Bus + SD over
            Modbus). ``None`` (default) keeps the historical
            protocol-centric scan against the EZ-ZONE PM profile.
        serial_template: Optional :class:`SerialSettings` whose
            ``parity`` / ``bytesize`` / ``stopbits`` / ``rtscts`` /
            ``xonxoff`` / ``exclusive`` fields override the
            per-protocol factory framing for every probe. ``port``
            and ``baudrate`` are always overwritten per iteration.
        per_probe_timeout_s: Per-probe budget. Bounds the
            :meth:`Controller.identify` call (four bounded sub-reads)
            so a silent address bails after one round-trip rather than
            four. Defaults to ``0.5`` — a four-port × three-baud ×
            two-protocol scan with one address per combo lands in
            ~12 s of wall-clock.

    Returns:
        One :class:`DiscoveryResult` per (port × baudrate × protocol ×
        address) tuple, in input order. The cartesian product is
        iterated outermost-port, then baudrate, then protocol, then
        address — same input → same output ordering.

    Raises:
        WatlowConfigurationError: ``protocols`` contains
            :attr:`ProtocolKind.AUTO`, or ``per_probe_timeout_s`` is
            non-positive.

    Notes:
        - **Read-only.** Discovery never writes to the device; it
          only calls :meth:`Controller.identify` (four parameter
          reads). Safe to run on rigs that already have other
          software talking to the controller.
        - **Per-port short-circuit.** If a port fails to open with a
          :class:`WatlowConnectionError`, the rest of the scan for
          that port emits ``ok=False`` rows without re-attempting the
          open. This avoids hammering a port the kernel won't give us.
    """
    if per_probe_timeout_s <= 0:
        from watlowlib.errors import WatlowConfigurationError  # noqa: PLC0415 — cold path

        raise WatlowConfigurationError(
            f"per_probe_timeout_s must be positive; got {per_probe_timeout_s!r}",
        )

    resolved_ports = await _resolve_ports(ports)
    resolved_addresses = tuple(addresses) if addresses is not None else DEFAULT_DISCOVERY_ADDRESSES
    resolved_baudrates = tuple(baudrates) if baudrates is not None else DEFAULT_DISCOVERY_BAUDRATES

    # Build the per-probe plan: one entry per (protocol, profile,
    # framing_base). ``profiles`` (if given) drives the device type,
    # protocol, and factory framing per probe; otherwise we keep the
    # historical protocol-centric scan against the EZ-ZONE PM profile,
    # taking framing from ``SerialSettings.factory_for`` (None below).
    plan: tuple[tuple[ProtocolKind, DeviceProfile, SerialSettings | None], ...]
    if profiles is not None:
        plan = tuple((p.default_protocol, p, p.default_serial) for p in profiles)
    else:
        resolved_protocols = (
            tuple(protocols) if protocols is not None else DEFAULT_DISCOVERY_PROTOCOLS
        )
        if ProtocolKind.AUTO in resolved_protocols:
            from watlowlib.errors import WatlowConfigurationError  # noqa: PLC0415 — cold path

            raise WatlowConfigurationError(
                "find_devices does not accept ProtocolKind.AUTO; pass concrete "
                "protocols (STDBUS, MODBUS_RTU). Auto-detection is a single-port "
                "API on open_device.",
            )
        plan = tuple((protocol, EZZONE_PROFILE, None) for protocol in resolved_protocols)

    results: list[DiscoveryResult] = []
    dead_ports: set[str] = set()
    for port in resolved_ports:
        for baud in resolved_baudrates:
            for protocol, profile, framing_base in plan:
                if port in dead_ports:
                    # Emit one row per planned address so callers see
                    # the same cartesian-product shape regardless of
                    # whether the port opened.
                    error = WatlowConnectionError(
                        f"port {port!r} previously failed to open in this scan",
                        context=ErrorContext(
                            port=port,
                            protocol=protocol,
                        ),
                    )
                    results.extend(
                        DiscoveryResult(
                            ok=False,
                            port=port,
                            address=address,
                            baudrate=baud,
                            protocol=protocol,
                            device_info=None,
                            error=error,
                            elapsed_s=0.0,
                        )
                        for address in resolved_addresses
                    )
                    continue

                settings = _build_settings(
                    port=port,
                    baudrate=baud,
                    protocol=protocol,
                    template=serial_template,
                    framing_base=framing_base,
                )
                rows, port_died = await _probe_combo(
                    port=port,
                    baudrate=baud,
                    protocol=protocol,
                    profile=profile,
                    addresses=resolved_addresses,
                    serial_settings=settings,
                    timeout_s=per_probe_timeout_s,
                )
                results.extend(rows)
                if port_died:
                    dead_ports.add(port)
    return results