watlowlib.sync

Sync facade over the async core. Every async method has a sync counterpart routed through a SyncPortal (a wrapper around anyio.from_thread.BlockingPortal). See Sync quickstart.

Public surface

watlowlib.sync

Sync facade — blocking wrappers over the async core.

The sync surface targets scripts, notebooks, and REPL use. The async core remains canonical; every sync facade routes coroutines through a :class:SyncPortal (an :class:anyio.from_thread.BlockingPortal wrapper) so the event loop runs on a background thread.

What ships here:

  • :class:SyncPortal — single dispatch primitive used by the rest of the sync facade.
  • :class:Watlow / :class:SyncController — sync mirror of :class:~watlowlib.devices.controller.Controller.
  • :class:SyncWatlowManager — sync mirror of :class:~watlowlib.manager.WatlowManager.
  • :func:record / :func:pipe — sync mirrors of the streaming primitives.
  • :class:SyncSinkAdapter + per-sink wrappers (SyncCsvSink, SyncJsonlSink, SyncSqliteSink, SyncInMemorySink, SyncParquetSink, SyncPostgresSink).

Design reference: docs/design.md §6 (sync portal).
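
The portal idea described above (event loop on a background thread, blocking calls from the caller's thread) can be sketched with the standard library alone. This is a minimal illustration, not watlowlib's implementation: MiniPortal and read_value are hypothetical names, and asyncio.run_coroutine_threadsafe stands in for the anyio BlockingPortal that SyncPortal wraps.

```python
import asyncio
import threading
from collections.abc import Callable, Coroutine
from typing import Any


class MiniPortal:
    """Hypothetical stand-in for a blocking portal; not watlowlib's SyncPortal."""

    def __enter__(self) -> "MiniPortal":
        self._loop = asyncio.new_event_loop()
        # The event loop runs on a background thread for the life of the `with` block.
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Ask the loop to stop from the caller's thread, then wait for the thread.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()

    def call(
        self,
        func: Callable[..., Coroutine[Any, Any, Any]],
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        # Schedule the coroutine on the loop thread and block until it finishes.
        future = asyncio.run_coroutine_threadsafe(func(*args, **kwargs), self._loop)
        return future.result()


async def read_value(scale: float, *, offset: float = 0.0) -> float:
    await asyncio.sleep(0)  # stands in for awaited device I/O
    return 21.5 * scale + offset


with MiniPortal() as portal:
    result = portal.call(read_value, 2.0, offset=1.0)
```

The caller never touches the event loop directly; it only sees a blocking call that returns the coroutine's result or re-raises its exception.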

SyncAsyncIterator

SyncAsyncIterator(portal, async_iter)

Blocking view over an async iterator, bound to a :class:SyncPortal.

Source code in src/watlowlib/sync/portal.py
def __init__(self, portal: SyncPortal, async_iter: AsyncIterator[T]) -> None:
    self._portal = portal
    self._aiter = async_iter
    self._closed = False

close

close()

Cancel the underlying async iterator if it exposes aclose.

Source code in src/watlowlib/sync/portal.py
def close(self) -> None:
    """Cancel the underlying async iterator if it exposes ``aclose``."""
    if self._closed:
        return
    self._closed = True
    if not self._portal.running:
        return
    aclose: Callable[[], Awaitable[Any]] | None = getattr(self._aiter, "aclose", None)
    if aclose is None:
        return
    with contextlib.suppress(Exception):
        self._portal.call(aclose)
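
The iteration side of this class is not shown in the excerpt above. A minimal sketch of how a blocking view over an async iterator can work follows, assuming a plain asyncio loop on a background thread instead of a SyncPortal. BlockingIter, ticks and the _DONE sentinel are hypothetical names introduced for illustration.

```python
import asyncio
import threading
from collections.abc import AsyncIterator
from typing import Any

_DONE = object()  # sentinel: the async side is exhausted
cleanup: list[str] = []


class BlockingIter:
    """Hypothetical blocking view; the real class is bound to a SyncPortal."""

    def __init__(self, loop: asyncio.AbstractEventLoop, async_iter: AsyncIterator[Any]) -> None:
        self._loop = loop
        self._aiter = async_iter
        self._closed = False

    def _run(self, coro: Any) -> Any:
        # Block the calling thread until the coroutine finishes on the loop thread.
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    async def _step(self) -> Any:
        try:
            return await self._aiter.__anext__()
        except StopAsyncIteration:
            return _DONE

    async def _aclose(self) -> None:
        aclose = getattr(self._aiter, "aclose", None)
        if aclose is not None:
            await aclose()

    def __iter__(self) -> "BlockingIter":
        return self

    def __next__(self) -> Any:
        if self._closed:
            raise StopIteration
        item = self._run(self._step())
        if item is _DONE:
            raise StopIteration
        return item

    def close(self) -> None:
        if self._closed:
            return
        self._closed = True
        self._run(self._aclose())


async def ticks() -> AsyncIterator[int]:
    n = 0
    try:
        while True:
            await asyncio.sleep(0)
            yield n
            n += 1
    finally:
        cleanup.append("closed")  # runs when the generator is closed


loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()
try:
    it = BlockingIter(loop, ticks())
    collected = [next(it), next(it)]
    it.close()  # stops the endless generator and runs its cleanup
finally:
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
```

Closing explicitly matters when the consumer stops early: without it, an endless async generator would stay suspended on the loop thread.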

SyncController

SyncController(controller, portal)

Blocking facade over :class:watlowlib.devices.controller.Controller.

Instances are produced by :meth:Watlow.open or yielded by the sync manager; users do not call this constructor directly.

Source code in src/watlowlib/sync/controller.py
def __init__(self, controller: Controller, portal: SyncPortal) -> None:
    self._ctl = controller
    self._portal = portal

loops property

loops

Cached loop count — passes through :attr:Controller.loops.

portal property

portal

The :class:SyncPortal this controller routes coroutines through.

session property

session

Underlying async :class:Session (advanced escape-hatch).

close

close()

Blocking :meth:Controller.close. Idempotent.

Source code in src/watlowlib/sync/controller.py
def close(self) -> None:
    """Blocking :meth:`Controller.close`. Idempotent."""
    if not self._portal.running:
        return
    self._portal.call(self._ctl.close)

identify

identify(*, timeout=None)

Blocking :meth:Controller.identify.

Source code in src/watlowlib/sync/controller.py
def identify(self, *, timeout: float | None = None) -> DeviceInfo:
    """Blocking :meth:`Controller.identify`."""
    return self._portal.call(self._ctl.identify, timeout=timeout)

loop

loop(n)

Return a sync sub-facade bound to loop n (1-indexed).

Source code in src/watlowlib/sync/controller.py
def loop(self, n: int) -> SyncControllerLoop:
    """Return a sync sub-facade bound to loop ``n`` (1-indexed)."""
    return SyncControllerLoop(self._ctl.loop(n), self._portal)

poll

poll(parameters, *, names=None, instances=(1,))

Blocking :meth:Controller.poll_many.

Source code in src/watlowlib/sync/controller.py
def poll(
    self,
    parameters: Sequence[str | int],
    *,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
) -> list[Sample]:
    """Blocking :meth:`Controller.poll_many`."""
    return self._portal.call(
        self._ctl.poll_many,
        parameters,
        names=names,
        instances=instances,
    )

read_comms_unit_label

read_comms_unit_label(*, timeout=None)

Blocking :meth:Controller.read_comms_unit_label.

Source code in src/watlowlib/sync/controller.py
def read_comms_unit_label(self, *, timeout: float | None = None) -> Unit | None:
    """Blocking :meth:`Controller.read_comms_unit_label`."""
    return self._portal.call(self._ctl.read_comms_unit_label, timeout=timeout)

read_parameter

read_parameter(name_or_id, *, instance=1, timeout=None)

Blocking :meth:Controller.read_parameter.

Source code in src/watlowlib/sync/controller.py
def read_parameter(
    self,
    name_or_id: str | int,
    *,
    instance: int = 1,
    timeout: float | None = None,
) -> ParameterEntry:
    """Blocking :meth:`Controller.read_parameter`."""
    return self._portal.call(
        self._ctl.read_parameter,
        name_or_id,
        instance=instance,
        timeout=timeout,
    )

read_pv

read_pv(*, instance=1, timeout=None)

Blocking :meth:Controller.read_pv.

Source code in src/watlowlib/sync/controller.py
def read_pv(self, *, instance: int = 1, timeout: float | None = None) -> Reading:
    """Blocking :meth:`Controller.read_pv`."""
    return self._portal.call(self._ctl.read_pv, instance=instance, timeout=timeout)

read_setpoint

read_setpoint(*, instance=1, timeout=None)

Blocking :meth:Controller.read_setpoint.

Source code in src/watlowlib/sync/controller.py
def read_setpoint(self, *, instance: int = 1, timeout: float | None = None) -> Reading:
    """Blocking :meth:`Controller.read_setpoint`."""
    return self._portal.call(self._ctl.read_setpoint, instance=instance, timeout=timeout)

set_comms_unit_label

set_comms_unit_label(unit, *, confirm=False, timeout=None)

Blocking :meth:Controller.set_comms_unit_label.

Source code in src/watlowlib/sync/controller.py
def set_comms_unit_label(
    self,
    unit: Unit | str,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> Unit | None:
    """Blocking :meth:`Controller.set_comms_unit_label`."""
    return self._portal.call(
        self._ctl.set_comms_unit_label,
        unit,
        confirm=confirm,
        timeout=timeout,
    )

set_setpoint

set_setpoint(
    value, *, instance=1, confirm=False, timeout=None
)

Blocking :meth:Controller.set_setpoint.

Source code in src/watlowlib/sync/controller.py
def set_setpoint(
    self,
    value: float,
    *,
    instance: int = 1,
    confirm: bool = False,
    timeout: float | None = None,
) -> Reading:
    """Blocking :meth:`Controller.set_setpoint`."""
    return self._portal.call(
        self._ctl.set_setpoint,
        value,
        instance=instance,
        confirm=confirm,
        timeout=timeout,
    )

snapshot

snapshot(*, name=None)

Blocking :meth:Controller.snapshot.

Source code in src/watlowlib/sync/controller.py
def snapshot(self, *, name: str | None = None) -> WatlowDeviceSnapshot:
    """Blocking :meth:`Controller.snapshot`."""
    return self._portal.call(self._ctl.snapshot, name=name)

write_parameter

write_parameter(
    name_or_id,
    value,
    *,
    instance=1,
    confirm=False,
    timeout=None,
)

Blocking :meth:Controller.write_parameter.

Source code in src/watlowlib/sync/controller.py
def write_parameter(
    self,
    name_or_id: str | int,
    value: float | int | str,
    *,
    instance: int = 1,
    confirm: bool = False,
    timeout: float | None = None,
) -> ParameterEntry:
    """Blocking :meth:`Controller.write_parameter`."""
    return self._portal.call(
        self._ctl.write_parameter,
        name_or_id,
        value,
        instance=instance,
        confirm=confirm,
        timeout=timeout,
    )

SyncControllerLoop

SyncControllerLoop(async_loop, portal)

Blocking view over a single control loop (mirror of :class:ControllerLoop).

Returned by :meth:SyncController.loop; never instantiated directly. Lifetime is bound to the parent :class:SyncController and its portal — closing the controller is the only cleanup needed.

Source code in src/watlowlib/sync/controller.py
def __init__(self, async_loop: ControllerLoop, portal: SyncPortal) -> None:
    self._loop = async_loop
    self._portal = portal

number property

number

The 1-indexed loop number this view binds.

read_alarms

read_alarms()

Blocking :meth:ControllerLoop.read_alarms.

Source code in src/watlowlib/sync/controller.py
def read_alarms(self) -> AlarmState:
    """Blocking :meth:`ControllerLoop.read_alarms`."""
    return self._portal.call(self._loop.read_alarms)

read_output

read_output()

Blocking :meth:ControllerLoop.read_output.

Source code in src/watlowlib/sync/controller.py
def read_output(self) -> Reading:
    """Blocking :meth:`ControllerLoop.read_output`."""
    return self._portal.call(self._loop.read_output)

read_pid

read_pid()

Blocking :meth:ControllerLoop.read_pid.

Source code in src/watlowlib/sync/controller.py
def read_pid(self) -> PidGains:
    """Blocking :meth:`ControllerLoop.read_pid`."""
    return self._portal.call(self._loop.read_pid)

read_pv

read_pv(*, timeout=None)

Blocking :meth:ControllerLoop.read_pv.

Source code in src/watlowlib/sync/controller.py
def read_pv(self, *, timeout: float | None = None) -> Reading:
    """Blocking :meth:`ControllerLoop.read_pv`."""
    return self._portal.call(self._loop.read_pv, timeout=timeout)

read_setpoint

read_setpoint(*, timeout=None)

Blocking :meth:ControllerLoop.read_setpoint.

Source code in src/watlowlib/sync/controller.py
def read_setpoint(self, *, timeout: float | None = None) -> Reading:
    """Blocking :meth:`ControllerLoop.read_setpoint`."""
    return self._portal.call(self._loop.read_setpoint, timeout=timeout)

set_setpoint

set_setpoint(value, *, confirm=False, timeout=None)

Blocking :meth:ControllerLoop.set_setpoint.

Source code in src/watlowlib/sync/controller.py
def set_setpoint(
    self,
    value: float,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> Reading:
    """Blocking :meth:`ControllerLoop.set_setpoint`."""
    return self._portal.call(
        self._loop.set_setpoint,
        value,
        confirm=confirm,
        timeout=timeout,
    )

write_pid

write_pid(gains, *, confirm=False)

Blocking :meth:ControllerLoop.write_pid.

Source code in src/watlowlib/sync/controller.py
def write_pid(self, gains: PidGains, *, confirm: bool = False) -> PidGains:
    """Blocking :meth:`ControllerLoop.write_pid`."""
    return self._portal.call(self._loop.write_pid, gains, confirm=confirm)

SyncCsvSink

SyncCsvSink(path, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.csv.CsvSink.

Source code in src/watlowlib/sync/sinks.py
def __init__(self, path: str | Path, *, portal: SyncPortal | None = None) -> None:
    super().__init__(CsvSink(path), portal=portal)

SyncInMemorySink

SyncInMemorySink(*, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.memory.InMemorySink.

Source code in src/watlowlib/sync/sinks.py
def __init__(self, *, portal: SyncPortal | None = None) -> None:
    super().__init__(InMemorySink(), portal=portal)

samples property

samples

Captured samples — proxy for :attr:InMemorySink.samples.

SyncJsonlSink

SyncJsonlSink(path, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.jsonl.JsonlSink.

Source code in src/watlowlib/sync/sinks.py
def __init__(self, path: str | Path, *, portal: SyncPortal | None = None) -> None:
    super().__init__(JsonlSink(path), portal=portal)

SyncParquetSink

SyncParquetSink(
    path,
    *,
    compression="zstd",
    use_dictionary=True,
    row_group_size=None,
    portal=None,
)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.parquet.ParquetSink.

Requires the watlowlib[parquet] extra — the dependency check runs on :meth:open, same as the async sink.

Source code in src/watlowlib/sync/sinks.py
def __init__(
    self,
    path: str | Path,
    *,
    compression: _Compression = "zstd",
    use_dictionary: bool = True,
    row_group_size: int | None = None,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(
        ParquetSink(
            path,
            compression=compression,
            use_dictionary=use_dictionary,
            row_group_size=row_group_size,
        ),
        portal=portal,
    )

SyncPortal

SyncPortal(*, backend='asyncio')

Per-context wrapper around :class:anyio.from_thread.BlockingPortal.

Example

>>> with SyncPortal() as portal:  # doctest: +SKIP
...     result = portal.call(some_async_func, arg1, arg2)

Source code in src/watlowlib/sync/portal.py
def __init__(self, *, backend: str = "asyncio") -> None:
    self._backend = backend
    self._cm: AbstractContextManager[BlockingPortal] | None = None
    self._portal: BlockingPortal | None = None
    self._entered = False

running property

running

True between :meth:__enter__ and :meth:__exit__.

call

call(func, *args, **kwargs)

Run func(*args, **kwargs) on the portal's event loop.

Single-member :class:ExceptionGroup wrappers are stripped.

Source code in src/watlowlib/sync/portal.py
def call[**P, T](
    self,
    func: Callable[P, Awaitable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run ``func(*args, **kwargs)`` on the portal's event loop.

    Single-member :class:`ExceptionGroup` wrappers are stripped.
    """
    portal = self._require_portal()
    bound: Callable[[], Awaitable[T]] = (
        partial(func, *args, **kwargs) if kwargs else partial(func, *args)
    )
    try:
        return portal.call(bound)
    except Exception as exc:
        unwrapped = _unwrap_single_group(exc)
        if unwrapped is exc:
            raise
        raise unwrapped from None

wrap_async_context_manager

wrap_async_context_manager(acm)

Present an async context manager as a sync context manager.

Source code in src/watlowlib/sync/portal.py
def wrap_async_context_manager[T](
    self, acm: AbstractAsyncContextManager[T]
) -> AbstractContextManager[T]:
    """Present an async context manager as a sync context manager."""
    return self._require_portal().wrap_async_context_manager(acm)
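
To make the idea of presenting an async context manager as a sync one concrete, here is a simplified sketch built on the standard library. sync_view and session are hypothetical names. This version enters and exits in separate scheduled coroutines, which is enough for a simple generator-based context manager but is likely not how anyio's BlockingPortal does it.

```python
import asyncio
import threading
from collections.abc import AsyncIterator, Iterator
from contextlib import AbstractAsyncContextManager, asynccontextmanager, contextmanager
from typing import Any

log: list[str] = []


@asynccontextmanager
async def session(name: str) -> AsyncIterator[str]:
    log.append(f"open {name}")
    try:
        yield name.upper()
    finally:
        log.append(f"close {name}")


@contextmanager
def sync_view(
    loop: asyncio.AbstractEventLoop, acm: AbstractAsyncContextManager[Any]
) -> Iterator[Any]:
    """Hypothetical helper: drive __aenter__/__aexit__ from a non-async thread."""

    def run(coro: Any) -> Any:
        return asyncio.run_coroutine_threadsafe(coro, loop).result()

    async def enter() -> Any:
        return await acm.__aenter__()

    async def leave(*exc_info: Any) -> Any:
        return await acm.__aexit__(*exc_info)

    value = run(enter())
    try:
        yield value
    except BaseException as exc:
        # Give the async CM the chance to suppress the exception.
        if not run(leave(type(exc), exc, exc.__traceback__)):
            raise
    else:
        run(leave(None, None, None))


loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()
try:
    with sync_view(loop, session("rec")) as value:
        log.append(f"use {value}")
finally:
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
```

Context managers that rely on task-bound state, such as cancel scopes or task groups, need enter and exit to happen in the same task, so a production wrapper has to keep one task alive across the whole with block.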

wrap_async_iter

wrap_async_iter(async_iter)

Present an async iterator as a blocking, closeable iterator.

Source code in src/watlowlib/sync/portal.py
def wrap_async_iter[T](self, async_iter: AsyncIterator[T]) -> SyncAsyncIterator[T]:
    """Present an async iterator as a blocking, closeable iterator."""
    self._require_portal()
    return SyncAsyncIterator(self, async_iter)

SyncPostgresSink

SyncPostgresSink(config, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.postgres.PostgresSink.

Requires the watlowlib[postgres] extra — dependency check runs on :meth:open.

Source code in src/watlowlib/sync/sinks.py
def __init__(
    self,
    config: PostgresConfig,
    *,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(PostgresSink(config), portal=portal)

SyncRecording dataclass

SyncRecording(stream, summary, rate_hz)

Sync mirror of :class:watlowlib.streaming.Recording.

The async producer owns summary; reading the mutable dataclass attributes from the calling thread is safe — attribute reads on a plain Python dataclass are atomic, and the recorder is the only writer.

Attributes:

  • stream (Iterator[Sequence[Sample]]): Blocking iterator over per-tick :class:Sample batches. Iterating drives the async receive stream via the portal.
  • summary (AcquisitionSummary): Live :class:AcquisitionSummary. finished_at is None while running and set on CM exit.
  • rate_hz (float): The cadence the recorder is running at.

SyncSinkAdapter

SyncSinkAdapter(async_sink, *, portal=None)

Generic sync wrapper around any :class:SampleSink.

Subclasses typically only override :meth:__init__ to build the matching async sink with sink-specific parameters and hand it to this base class.

Source code in src/watlowlib/sync/sinks.py
def __init__(
    self,
    async_sink: SampleSink,
    *,
    portal: SyncPortal | None = None,
) -> None:
    self._async_sink = async_sink
    self._portal_override = portal
    self._portal: SyncPortal | None = None
    self._stack: ExitStack | None = None
    self._entered = False

async_sink property

async_sink

The wrapped async :class:SampleSink — escape hatch.

portal property

portal

Active :class:SyncPortal (raises if accessed outside a with block).

close

close()

Blocking :meth:SampleSink.close — idempotent.

Source code in src/watlowlib/sync/sinks.py
def close(self) -> None:
    """Blocking :meth:`SampleSink.close` — idempotent."""
    portal = self._portal
    if portal is None:
        return
    portal.call(self._async_sink.close)

open

open()

Blocking :meth:SampleSink.open.

Source code in src/watlowlib/sync/sinks.py
def open(self) -> None:
    """Blocking :meth:`SampleSink.open`."""
    self.portal.call(self._async_sink.open)

write_many

write_many(samples)

Blocking :meth:SampleSink.write_many.

Source code in src/watlowlib/sync/sinks.py
def write_many(self, samples: Sequence[Sample]) -> None:
    """Blocking :meth:`SampleSink.write_many`."""
    self.portal.call(self._async_sink.write_many, samples)

SyncSqliteSink

SyncSqliteSink(
    path,
    *,
    table="samples",
    create_table=True,
    journal_mode="WAL",
    synchronous="NORMAL",
    busy_timeout_ms=5000,
    portal=None,
)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.sqlite.SqliteSink.

Source code in src/watlowlib/sync/sinks.py
def __init__(
    self,
    path: str | Path,
    *,
    table: str = "samples",
    create_table: bool = True,
    journal_mode: _JournalMode = "WAL",
    synchronous: _Synchronous = "NORMAL",
    busy_timeout_ms: int = 5000,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(
        SqliteSink(
            path,
            table=table,
            create_table=create_table,
            journal_mode=journal_mode,
            synchronous=synchronous,
            busy_timeout_ms=busy_timeout_ms,
        ),
        portal=portal,
    )

SyncWatlowManager

SyncWatlowManager(
    *, error_policy=ErrorPolicy.RAISE, portal=None
)

Blocking facade over :class:watlowlib.manager.WatlowManager.

Source code in src/watlowlib/sync/manager.py
def __init__(
    self,
    *,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    portal: SyncPortal | None = None,
) -> None:
    self._error_policy = error_policy
    self._portal_override = portal
    self._stack: ExitStack | None = None
    self._portal: SyncPortal | None = None
    self._mgr: WatlowManager | None = None
    self._wrapped: dict[str, SyncController] = {}
    self._entered = False

closed property

closed

True once :meth:close or __exit__ has run.

error_policy property

error_policy

The :class:ErrorPolicy this manager was constructed with.

names property

names

Insertion-ordered tuple of managed controller names.

portal property

portal

The :class:SyncPortal this manager's coroutines run on.

add

add(
    name,
    source,
    *,
    protocol=ProtocolKind.STDBUS,
    address=1,
    serial_settings=None,
    profile=EZZONE_PROFILE,
    assert_wire_temperature_unit=None,
)

Blocking :meth:WatlowManager.add.

Accepts a :class:SyncController as source in addition to the async shapes — the wrapper is unwrapped to the underlying :class:Controller before delegation.

Source code in src/watlowlib/sync/manager.py
def add(
    self,
    name: str,
    source: SyncController | Controller | str | Transport,
    *,
    protocol: ProtocolKind = ProtocolKind.STDBUS,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
    profile: DeviceProfile = EZZONE_PROFILE,
    assert_wire_temperature_unit: Unit | str | None = None,
) -> SyncController:
    """Blocking :meth:`WatlowManager.add`.

    Accepts a :class:`SyncController` as ``source`` in addition to
    the async shapes — the wrapper is unwrapped to the underlying
    :class:`Controller` before delegation.
    """
    mgr = self._require_mgr()
    async_source: Controller | str | Transport = unwrap_sync_controller(source)
    async_controller = self.portal.call(
        mgr.add,
        name,
        async_source,
        protocol=protocol,
        address=address,
        serial_settings=serial_settings,
        profile=profile,
        assert_wire_temperature_unit=assert_wire_temperature_unit,
    )
    wrapped = wrap_controller(async_controller, self.portal)
    self._wrapped[name] = wrapped
    return wrapped

close

close()

Blocking :meth:WatlowManager.close — idempotent.

Source code in src/watlowlib/sync/manager.py
def close(self) -> None:
    """Blocking :meth:`WatlowManager.close` — idempotent."""
    self._wrapped.clear()
    mgr = self._mgr
    if mgr is None:
        return
    portal = self._portal
    if portal is None:
        return
    portal.call(mgr.close)

execute_each

execute_each(op, names=None)

Blocking :meth:WatlowManager.execute_each.

op receives the async :class:Controller so existing coroutines compose. If you have a sync helper, wrap it in an async stub or run it on the portal yourself.

Source code in src/watlowlib/sync/manager.py
def execute_each[T](
    self,
    op: Callable[[Controller], Awaitable[T]],
    names: Sequence[str] | None = None,
) -> dict[str, DeviceResult[T]]:
    """Blocking :meth:`WatlowManager.execute_each`.

    ``op`` receives the **async** :class:`Controller` so existing
    coroutines compose. If you have a sync helper, wrap it in an
    async stub or run it on the portal yourself.
    """
    mgr = self._require_mgr()
    return self.portal.call(mgr.execute_each, op, names)
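
The "async stub" advice can be sketched as follows. Everything here is a stand-in: FakeController and fake_execute_each are hypothetical and only mimic the shape of the call (the real method returns DeviceResult values and the real read_pv returns a Reading, not a float). The point is that the stub awaits the controller and hands the plain value to the sync helper.

```python
import asyncio
from collections.abc import Awaitable, Callable


class FakeController:
    """Hypothetical stand-in for the async Controller."""

    def __init__(self, pv_celsius: float) -> None:
        self._pv = pv_celsius

    async def read_pv(self) -> float:
        await asyncio.sleep(0)  # stands in for serial I/O
        return self._pv


def to_fahrenheit(celsius: float) -> float:
    """Existing sync helper that knows nothing about coroutines."""
    return celsius * 9 / 5 + 32


async def read_pv_fahrenheit(ctl: FakeController) -> float:
    """Async stub: await the I/O, then delegate to the sync helper."""
    return to_fahrenheit(await ctl.read_pv())


async def fake_execute_each(
    op: Callable[[FakeController], Awaitable[float]],
    controllers: dict[str, FakeController],
) -> dict[str, float]:
    # Simplified: call op once per controller and collect results by name.
    return {name: await op(ctl) for name, ctl in controllers.items()}


results = asyncio.run(
    fake_execute_each(
        read_pv_fahrenheit,
        {"oven": FakeController(100.0), "chiller": FakeController(0.0)},
    )
)
```

The sync helper should stay free of blocking portal calls; invoking the sync facade from inside the event loop would block the loop it depends on.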

get

get(name)

Return the sync wrapper for the controller registered under name.

Source code in src/watlowlib/sync/manager.py
def get(self, name: str) -> SyncController:
    """Return the sync wrapper for the controller registered under ``name``."""
    cached = self._wrapped.get(name)
    if cached is not None:
        return cached
    mgr = self._require_mgr()
    async_controller = mgr.get(name)
    wrapped = wrap_controller(async_controller, self.portal)
    self._wrapped[name] = wrapped
    return wrapped

poll

poll(parameters, *, names=None, instances=(1,))

Blocking :meth:WatlowManager.poll_many.

Source code in src/watlowlib/sync/manager.py
def poll(
    self,
    parameters: Sequence[str | int],
    *,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
) -> list[Sample]:
    """Blocking :meth:`WatlowManager.poll_many`."""
    mgr = self._require_mgr()
    return self.portal.call(
        mgr.poll_many,
        parameters,
        names=names,
        instances=instances,
    )

record_to_sink

record_to_sink(
    *,
    parameters,
    rate_hz,
    duration=None,
    sink,
    names=None,
    instances=(1,),
    overflow=None,
    buffer_size=64,
    batch_size=64,
    flush_interval=1.0,
)

Record polled samples directly into a sink — one-call convenience.

Combines :func:watlowlib.sync.record and :func:watlowlib.sync.pipe into a single blocking call. The manager's portal is reused for both legs so the recorder and the sink share an event loop. sink may be either a :class:SyncSinkAdapter (preferred, opened externally) or a bare async :class:SampleSink — in the latter case this method opens the sink against the manager's portal and closes it after the recording finishes.

Returns the :class:AcquisitionSummary from :func:watlowlib.sync.pipe.

Source code in src/watlowlib/sync/manager.py
def record_to_sink(
    self,
    *,
    parameters: Sequence[str | int],
    rate_hz: float,
    duration: float | None = None,
    sink: SyncSinkAdapter | SampleSink,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
    overflow: OverflowPolicy | None = None,
    buffer_size: int = 64,
    batch_size: int = 64,
    flush_interval: float = 1.0,
) -> AcquisitionSummary:
    """Record polled samples directly into a sink — one-call convenience.

    Combines :func:`watlowlib.sync.record` and
    :func:`watlowlib.sync.pipe` into a single blocking call. The
    manager's portal is reused for both legs so the recorder and
    the sink share an event loop. ``sink`` may be either a
    :class:`SyncSinkAdapter` (preferred, opened externally) or a
    bare async :class:`SampleSink` — in the latter case this
    method opens the sink against the manager's portal and closes
    it after the recording finishes.

    Returns the :class:`AcquisitionSummary` from
    :func:`watlowlib.sync.pipe`.
    """
    # Lazy imports — sink machinery pulls heavy deps (anyio sink
    # primitives, sqlite, etc.) and we want the surface importable
    # without that until the user reaches for streaming.
    from watlowlib.streaming.recorder import OverflowPolicy as _OverflowPolicy  # noqa: PLC0415
    from watlowlib.sync.recording import pipe, record  # noqa: PLC0415
    from watlowlib.sync.sinks import SyncSinkAdapter  # noqa: PLC0415

    self._require_mgr()
    active_overflow = overflow if overflow is not None else _OverflowPolicy.BLOCK
    portal = self.portal

    with ExitStack() as stack:
        sink_for_pipe: SyncSinkAdapter | SampleSink
        if isinstance(sink, SyncSinkAdapter):
            # Caller-owned sync wrapper — no open / close here.
            sink_for_pipe = sink
        else:
            # Bare async sink — wrap on this manager's portal so it
            # shares the recorder's event loop, and own the
            # open/close lifecycle through the ExitStack.
            wrapped = SyncSinkAdapter(sink, portal=portal)
            stack.enter_context(wrapped)
            sink_for_pipe = wrapped

        recording = stack.enter_context(
            record(
                self,
                parameters=parameters,
                rate_hz=rate_hz,
                duration=duration,
                names=names,
                instances=instances,
                overflow=active_overflow,
                buffer_size=buffer_size,
                portal=portal,
            ),
        )
        return pipe(
            recording.stream,
            sink_for_pipe,
            batch_size=batch_size,
            flush_interval=flush_interval,
            portal=portal,
        )

remove

remove(name)

Blocking :meth:WatlowManager.remove.

Source code in src/watlowlib/sync/manager.py
def remove(self, name: str) -> None:
    """Blocking :meth:`WatlowManager.remove`."""
    mgr = self._require_mgr()
    self._wrapped.pop(name, None)
    self.portal.call(mgr.remove, name)

Watlow

Namespace for the sync controller entry point.

Use :meth:Watlow.open as a context manager::

from watlowlib.sync import Watlow

with Watlow.open("/dev/ttyUSB0") as ctl:
    print(ctl.read_pv())

open staticmethod

open(
    port,
    *,
    profile=EZZONE_PROFILE,
    protocol=None,
    address=1,
    serial_settings=None,
    assert_wire_temperature_unit=None,
    identify=True,
    portal=None,
)

Open a sync :class:SyncController scoped to a with block.

Mirrors :func:watlowlib.open_device parameter-for-parameter (modulo the portal plumbing). The sync CM drives the async factory through a :class:SyncPortal; the portal is created per-call unless one is passed in via portal=.

protocol=None adopts profile.default_protocol (Std Bus for the default EZ-ZONE PM profile, Modbus RTU for the Series SD profile) — same semantics as :func:watlowlib.open_device.

Source code in src/watlowlib/sync/controller.py
@staticmethod
@contextmanager
def open(
    port: str,
    *,
    profile: DeviceProfile = EZZONE_PROFILE,
    protocol: ProtocolKind | None = None,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
    assert_wire_temperature_unit: Unit | str | None = None,
    identify: bool = True,
    portal: SyncPortal | None = None,
) -> Generator[SyncController]:
    """Open a sync :class:`SyncController` scoped to a ``with`` block.

    Mirrors :func:`watlowlib.open_device` parameter-for-parameter
    (modulo the portal plumbing). The sync CM drives the async
    factory through a :class:`SyncPortal`; the portal is created
    per-call unless one is passed in via ``portal=``.

    ``protocol=None`` adopts ``profile.default_protocol`` (Std Bus
    for the default EZ-ZONE PM profile, Modbus RTU for the Series SD
    profile) — same semantics as :func:`watlowlib.open_device`.
    """
    with ExitStack() as stack:
        active_portal = portal if portal is not None else stack.enter_context(SyncPortal())
        controller = active_portal.call(
            open_device,
            port,
            profile=profile,
            protocol=protocol,
            address=address,
            serial_settings=serial_settings,
            assert_wire_temperature_unit=assert_wire_temperature_unit,
            identify=identify,
        )
        # ``open_device`` returns a controller that may or may not
        # already be open: AUTO returned by the detector is open;
        # STDBUS / MODBUS_RTU need ``__aenter__`` to run open().
        # ``Controller.__aenter__`` short-circuits when already open
        # and returns ``self``; calling it through the portal here
        # gives us the same lifecycle as ``async with`` does.
        active_portal.call(controller.__aenter__)
        try:
            yield wrap_controller(controller, active_portal)
        finally:
            # Close the underlying transport through the portal.
            active_portal.call(controller.close)

pipe

pipe(
    stream,
    sink,
    *,
    batch_size=64,
    flush_interval=1.0,
    portal=None,
)

Sync :func:watlowlib.sinks.pipe.

Source code in src/watlowlib/sync/recording.py
def pipe(
    stream: Iterator[Sequence[Sample]],
    sink: SyncSinkAdapter | SampleSink,
    *,
    batch_size: int = 64,
    flush_interval: float = 1.0,
    portal: SyncPortal | None = None,
) -> AcquisitionSummary:
    """Sync :func:`watlowlib.sinks.pipe`."""
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    if flush_interval <= 0:
        raise ValueError(f"flush_interval must be > 0, got {flush_interval!r}")

    if isinstance(sink, SyncSinkAdapter):
        flush = sink.write_many
    else:
        resolved: SyncPortal | None = portal
        if resolved is None and isinstance(stream, SyncAsyncIterator):
            resolved = stream._portal  # pyright: ignore[reportPrivateUsage]
        if resolved is None:
            raise RuntimeError(
                "pipe: passing an async SampleSink requires a portal — "
                "wrap the sink in a SyncSinkAdapter or pass portal=.",
            )
        async_sink = sink
        active: SyncPortal = resolved

        def flush(samples: Sequence[Sample]) -> None:
            active.call(async_sink.write_many, samples)

    started_at = datetime.now(UTC)
    emitted = 0
    buffer: list[Sample] = []
    last_flush = time.monotonic()

    for batch in stream:
        buffer.extend(batch)
        now = time.monotonic()
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            flush(buffer)
            emitted += len(buffer)
            buffer.clear()
            last_flush = now

    if buffer:
        flush(buffer)
        emitted += len(buffer)
        buffer.clear()

    finished_at = datetime.now(UTC)
    return AcquisitionSummary(
        started_at=started_at,
        finished_at=finished_at,
        samples_emitted=emitted,
    )
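
The flush rule in the loop above (flush when the buffer reaches batch_size or when flush_interval seconds have passed, then flush any remainder at the end) can be tried in isolation. The sketch below is a stripped-down illustration with a hypothetical pipe_batches function and integers in place of Sample objects. Unlike the source above, it hands the sink a copy of the buffer so that a sink keeping a reference is not affected by the later clear.

```python
import time
from collections.abc import Callable, Iterable, Sequence


def pipe_batches(
    stream: Iterable[Sequence[int]],
    flush: Callable[[list[int]], None],
    *,
    batch_size: int = 64,
    flush_interval: float = 1.0,
) -> int:
    """Hypothetical reduction of the batching loop; returns the number of items flushed."""
    buffer: list[int] = []
    emitted = 0
    last_flush = time.monotonic()
    for batch in stream:
        buffer.extend(batch)
        now = time.monotonic()
        # Flush on size or on elapsed time, whichever comes first.
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            flush(list(buffer))
            emitted += len(buffer)
            buffer.clear()
            last_flush = now
    if buffer:
        # Drain whatever is left once the stream ends.
        flush(list(buffer))
        emitted += len(buffer)
    return emitted


flushed: list[list[int]] = []
# Five ticks of two samples each; the long interval means only size triggers a flush.
ticks = [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
emitted = pipe_batches(ticks, flushed.append, batch_size=4, flush_interval=60.0)
```

Note that the time check only runs when a new batch arrives, so a stalled stream does not flush on its own; the interval bounds staleness only while ticks keep coming.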

record

record(
    source,
    *,
    parameters,
    rate_hz,
    duration=None,
    names=None,
    instances=(1,),
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
    portal=None,
)

Sync :func:watlowlib.streaming.record.

Yields a :class:SyncRecording — iterate recording.stream for per-tick batches, read recording.summary for live counters, recording.rate_hz for the running rate.

If source is a :class:SyncWatlowManager, its portal is reused — the recorder and manager must share an event loop. Pass portal= to override.

Source code in src/watlowlib/sync/recording.py
@contextmanager
def record(
    source: SyncWatlowManager | PollSource,
    *,
    parameters: Sequence[str | int],
    rate_hz: float,
    duration: float | None = None,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
    portal: SyncPortal | None = None,
) -> Generator[SyncRecording]:
    """Sync :func:`watlowlib.streaming.record`.

    Yields a :class:`SyncRecording` — iterate ``recording.stream``
    for per-tick batches, read ``recording.summary`` for live
    counters, ``recording.rate_hz`` for the running rate.

    If ``source`` is a :class:`SyncWatlowManager`, its portal is
    reused — the recorder and manager must share an event loop. Pass
    ``portal=`` to override.
    """
    poll_source = _resolve_poll_source(source)
    with ExitStack() as stack:
        active_portal = _resolve_portal(portal, source, None) or stack.enter_context(SyncPortal())
        async_cm = async_record(
            poll_source,
            parameters=parameters,
            rate_hz=rate_hz,
            duration=duration,
            names=names,
            instances=instances,
            overflow=overflow,
            buffer_size=buffer_size,
        )
        async_recording = stack.enter_context(active_portal.wrap_async_context_manager(async_cm))
        sync_iter = stack.enter_context(active_portal.wrap_async_iter(async_recording.stream))
        yield SyncRecording(
            stream=sync_iter,
            summary=async_recording.summary,
            rate_hz=async_recording.rate_hz,
        )

run_sync

run_sync(func, *args, **kwargs)

Run one coroutine in a throwaway :class:SyncPortal.

Source code in src/watlowlib/sync/portal.py
def run_sync[**P, T](
    func: Callable[P, Awaitable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run one coroutine in a throwaway :class:`SyncPortal`."""
    with SyncPortal() as portal:
        return portal.call(func, *args, **kwargs)

Portal

watlowlib.sync.portal

Blocking portal primitive — sync access to the async core.

:class:SyncPortal wraps :func:anyio.from_thread.start_blocking_portal so the rest of the sync facade (controller, manager, recording, sinks) can share one dispatch primitive.

Shape:

  • Lifecycle is a plain with block. Each portal owns one background event-loop thread; the portal closes when the block exits. Portals are one-shot — re-entering after exit raises.
  • call(func, *args, **kwargs) runs a coroutine. kwargs are bound through :func:functools.partial because :meth:anyio.from_thread.BlockingPortal.call only accepts positional arguments.
  • Single-member :class:ExceptionGroup wrappers are unwrapped. The async core runs inside task groups (manager, recorder), so AnyIO occasionally rewraps a single exception into a group. Unwrapping lets callers see the concrete :class:~watlowlib.errors.WatlowError subclass they branch on. Aggregates with two or more exceptions stay as :class:ExceptionGroup, so sync callers under manager ErrorPolicy.RAISE must handle one-failure and multi-failure cases as different exception shapes.
  • wrap_async_context_manager delegates to the portal's helper.
  • wrap_async_iter bridges async iteration. The returned :class:SyncAsyncIterator is both iterable and closeable.
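The lifecycle and call rules in the list above can be sketched with the standard library alone. This is a hedged stand-in rather than the real implementation: `MiniPortal` and `add` are hypothetical names, and `asyncio.run_coroutine_threadsafe` is swapped in where the library uses AnyIO's blocking portal.

```python
import asyncio
import threading
from functools import partial


class MiniPortal:
    """Stdlib stand-in for a blocking portal (hypothetical, not SyncPortal)."""

    def __init__(self) -> None:
        self._loop = None
        self._thread = None
        self._used = False

    def __enter__(self) -> "MiniPortal":
        if self._used:
            raise RuntimeError("portal is one-shot; create a new one")
        self._used = True
        # One background thread owns one event loop for the whole block.
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()
        return self

    def __exit__(self, *exc_info) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()
        self._loop = None

    @property
    def running(self) -> bool:
        return self._loop is not None

    def call(self, func, *args, **kwargs):
        if self._loop is None:
            raise RuntimeError("portal is not running")
        # Fold positional and keyword arguments into a zero-argument callable.
        bound = partial(func, *args, **kwargs)
        future = asyncio.run_coroutine_threadsafe(bound(), self._loop)
        return future.result()  # block the calling thread until it finishes


async def add(a: int, *, b: int) -> int:
    await asyncio.sleep(0)
    return a + b


with MiniPortal() as portal:
    result = portal.call(add, 1, b=2)
```

Entering the same `MiniPortal` a second time raises, which mirrors the one-shot rule stated above.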

Design reference: docs/design.md §6.

SyncAsyncIterator

SyncAsyncIterator(portal, async_iter)

Blocking view over an async iterator, bound to a :class:SyncPortal.

Source code in src/watlowlib/sync/portal.py
def __init__(self, portal: SyncPortal, async_iter: AsyncIterator[T]) -> None:
    self._portal = portal
    self._aiter = async_iter
    self._closed = False

close

close()

Cancel the underlying async iterator if it exposes aclose.

Source code in src/watlowlib/sync/portal.py
def close(self) -> None:
    """Cancel the underlying async iterator if it exposes ``aclose``."""
    if self._closed:
        return
    self._closed = True
    if not self._portal.running:
        return
    aclose: Callable[[], Awaitable[Any]] | None = getattr(self._aiter, "aclose", None)
    if aclose is None:
        return
    with contextlib.suppress(Exception):
        self._portal.call(aclose)

SyncPortal

SyncPortal(*, backend='asyncio')

Per-context wrapper around :class:anyio.from_thread.BlockingPortal.

Example

with SyncPortal() as portal:  # doctest: +SKIP
    result = portal.call(some_async_func, arg1, arg2)

Source code in src/watlowlib/sync/portal.py
def __init__(self, *, backend: str = "asyncio") -> None:
    self._backend = backend
    self._cm: AbstractContextManager[BlockingPortal] | None = None
    self._portal: BlockingPortal | None = None
    self._entered = False

running property

running

True between :meth:__enter__ and :meth:__exit__.

call

call(func, *args, **kwargs)

Run func(*args, **kwargs) on the portal's event loop.

Single-member :class:ExceptionGroup wrappers are stripped.

Source code in src/watlowlib/sync/portal.py
def call[**P, T](
    self,
    func: Callable[P, Awaitable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run ``func(*args, **kwargs)`` on the portal's event loop.

    Single-member :class:`ExceptionGroup` wrappers are stripped.
    """
    portal = self._require_portal()
    bound: Callable[[], Awaitable[T]] = (
        partial(func, *args, **kwargs) if kwargs else partial(func, *args)
    )
    try:
        return portal.call(bound)
    except Exception as exc:
        unwrapped = _unwrap_single_group(exc)
        if unwrapped is exc:
            raise
        raise unwrapped from None

wrap_async_context_manager

wrap_async_context_manager(acm)

Present an async context manager as a sync context manager.

Source code in src/watlowlib/sync/portal.py
def wrap_async_context_manager[T](
    self, acm: AbstractAsyncContextManager[T]
) -> AbstractContextManager[T]:
    """Present an async context manager as a sync context manager."""
    return self._require_portal().wrap_async_context_manager(acm)

wrap_async_iter

wrap_async_iter(async_iter)

Present an async iterator as a blocking, closeable iterator.

Source code in src/watlowlib/sync/portal.py
def wrap_async_iter[T](self, async_iter: AsyncIterator[T]) -> SyncAsyncIterator[T]:
    """Present an async iterator as a blocking, closeable iterator."""
    self._require_portal()
    return SyncAsyncIterator(self, async_iter)
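To illustrate what a blocking, closeable view over an async iterator involves, here is a self-contained sketch built on a background asyncio loop. `BlockingIter` and `ticks` are hypothetical names; the real `SyncAsyncIterator` routes through a `SyncPortal`, and its iteration code is not shown on this page.

```python
import asyncio
import threading


class BlockingIter:
    """Blocking view over an async iterator (hypothetical stdlib sketch)."""

    _DONE = object()  # sentinel, so StopAsyncIteration never crosses threads

    def __init__(self, loop: asyncio.AbstractEventLoop, async_iter) -> None:
        self._loop = loop
        self._aiter = async_iter
        self._closed = False

    def __iter__(self) -> "BlockingIter":
        return self

    async def _step(self):
        try:
            return await self._aiter.__anext__()
        except StopAsyncIteration:
            return self._DONE

    def __next__(self):
        if self._closed:
            raise StopIteration
        item = asyncio.run_coroutine_threadsafe(self._step(), self._loop).result()
        if item is self._DONE:
            raise StopIteration
        return item

    def close(self) -> None:
        if self._closed:
            return
        self._closed = True
        aclose = getattr(self._aiter, "aclose", None)
        if aclose is None:
            return

        async def _close() -> None:
            await aclose()

        asyncio.run_coroutine_threadsafe(_close(), self._loop).result()


async def ticks(n: int):
    for i in range(n):
        await asyncio.sleep(0)
        yield i


loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()
try:
    all_items = list(BlockingIter(loop, ticks(3)))
    it = BlockingIter(loop, ticks(5))
    first_two = [next(it), next(it)]
    it.close()       # cancels the generator part-way through
    rest = list(it)  # a closed view yields nothing more
finally:
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
```

Closing twice is harmless in this sketch, matching the idempotent `close` shown for `SyncAsyncIterator`.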

run_sync

run_sync(func, *args, **kwargs)

Run one coroutine in a throwaway :class:SyncPortal.

Source code in src/watlowlib/sync/portal.py
def run_sync[**P, T](
    func: Callable[P, Awaitable[T]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Run one coroutine in a throwaway :class:`SyncPortal`."""
    with SyncPortal() as portal:
        return portal.call(func, *args, **kwargs)

Controller

watlowlib.sync.controller

Sync controller facade — portal-driven wrapper over :class:Controller.

Each :class:SyncController holds a reference to an async :class:~watlowlib.devices.controller.Controller and a :class:~watlowlib.sync.portal.SyncPortal; every public method is a one-liner that hands the underlying coroutine to the portal.

The :class:Watlow namespace exposes a Watlow.open(...) context manager that drives the async :func:~watlowlib.devices.factory.open_device through the portal.

Design reference: docs/design.md §6.

SyncController

SyncController(controller, portal)

Blocking facade over :class:watlowlib.devices.controller.Controller.

Instances are produced by :meth:Watlow.open or yielded by the sync manager; users do not call this constructor directly.

Source code in src/watlowlib/sync/controller.py
def __init__(self, controller: Controller, portal: SyncPortal) -> None:
    self._ctl = controller
    self._portal = portal

loops property

loops

Cached loop count — passes through :attr:Controller.loops.

portal property

portal

The :class:SyncPortal this controller routes coroutines through.

session property

session

Underlying async :class:Session (advanced escape-hatch).

close

close()

Blocking :meth:Controller.close. Idempotent.

Source code in src/watlowlib/sync/controller.py
def close(self) -> None:
    """Blocking :meth:`Controller.close`. Idempotent."""
    if not self._portal.running:
        return
    self._portal.call(self._ctl.close)

identify

identify(*, timeout=None)

Blocking :meth:Controller.identify.

Source code in src/watlowlib/sync/controller.py
def identify(self, *, timeout: float | None = None) -> DeviceInfo:
    """Blocking :meth:`Controller.identify`."""
    return self._portal.call(self._ctl.identify, timeout=timeout)

loop

loop(n)

Return a sync sub-facade bound to loop n (1-indexed).

Source code in src/watlowlib/sync/controller.py
def loop(self, n: int) -> SyncControllerLoop:
    """Return a sync sub-facade bound to loop ``n`` (1-indexed)."""
    return SyncControllerLoop(self._ctl.loop(n), self._portal)

poll

poll(parameters, *, names=None, instances=(1,))

Blocking :meth:Controller.poll_many.

Source code in src/watlowlib/sync/controller.py
def poll(
    self,
    parameters: Sequence[str | int],
    *,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
) -> list[Sample]:
    """Blocking :meth:`Controller.poll_many`."""
    return self._portal.call(
        self._ctl.poll_many,
        parameters,
        names=names,
        instances=instances,
    )

read_comms_unit_label

read_comms_unit_label(*, timeout=None)

Blocking :meth:Controller.read_comms_unit_label.

Source code in src/watlowlib/sync/controller.py
def read_comms_unit_label(self, *, timeout: float | None = None) -> Unit | None:
    """Blocking :meth:`Controller.read_comms_unit_label`."""
    return self._portal.call(self._ctl.read_comms_unit_label, timeout=timeout)

read_parameter

read_parameter(name_or_id, *, instance=1, timeout=None)

Blocking :meth:Controller.read_parameter.

Source code in src/watlowlib/sync/controller.py
def read_parameter(
    self,
    name_or_id: str | int,
    *,
    instance: int = 1,
    timeout: float | None = None,
) -> ParameterEntry:
    """Blocking :meth:`Controller.read_parameter`."""
    return self._portal.call(
        self._ctl.read_parameter,
        name_or_id,
        instance=instance,
        timeout=timeout,
    )

read_pv

read_pv(*, instance=1, timeout=None)

Blocking :meth:Controller.read_pv.

Source code in src/watlowlib/sync/controller.py
def read_pv(self, *, instance: int = 1, timeout: float | None = None) -> Reading:
    """Blocking :meth:`Controller.read_pv`."""
    return self._portal.call(self._ctl.read_pv, instance=instance, timeout=timeout)

read_setpoint

read_setpoint(*, instance=1, timeout=None)

Blocking :meth:Controller.read_setpoint.

Source code in src/watlowlib/sync/controller.py
def read_setpoint(self, *, instance: int = 1, timeout: float | None = None) -> Reading:
    """Blocking :meth:`Controller.read_setpoint`."""
    return self._portal.call(self._ctl.read_setpoint, instance=instance, timeout=timeout)

set_comms_unit_label

set_comms_unit_label(unit, *, confirm=False, timeout=None)

Blocking :meth:Controller.set_comms_unit_label.

Source code in src/watlowlib/sync/controller.py
def set_comms_unit_label(
    self,
    unit: Unit | str,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> Unit | None:
    """Blocking :meth:`Controller.set_comms_unit_label`."""
    return self._portal.call(
        self._ctl.set_comms_unit_label,
        unit,
        confirm=confirm,
        timeout=timeout,
    )

set_setpoint

set_setpoint(
    value, *, instance=1, confirm=False, timeout=None
)

Blocking :meth:Controller.set_setpoint.

Source code in src/watlowlib/sync/controller.py
def set_setpoint(
    self,
    value: float,
    *,
    instance: int = 1,
    confirm: bool = False,
    timeout: float | None = None,
) -> Reading:
    """Blocking :meth:`Controller.set_setpoint`."""
    return self._portal.call(
        self._ctl.set_setpoint,
        value,
        instance=instance,
        confirm=confirm,
        timeout=timeout,
    )

snapshot

snapshot(*, name=None)

Blocking :meth:Controller.snapshot.

Source code in src/watlowlib/sync/controller.py
def snapshot(self, *, name: str | None = None) -> WatlowDeviceSnapshot:
    """Blocking :meth:`Controller.snapshot`."""
    return self._portal.call(self._ctl.snapshot, name=name)

write_parameter

write_parameter(
    name_or_id,
    value,
    *,
    instance=1,
    confirm=False,
    timeout=None,
)

Blocking :meth:Controller.write_parameter.

Source code in src/watlowlib/sync/controller.py
def write_parameter(
    self,
    name_or_id: str | int,
    value: float | int | str,
    *,
    instance: int = 1,
    confirm: bool = False,
    timeout: float | None = None,
) -> ParameterEntry:
    """Blocking :meth:`Controller.write_parameter`."""
    return self._portal.call(
        self._ctl.write_parameter,
        name_or_id,
        value,
        instance=instance,
        confirm=confirm,
        timeout=timeout,
    )

SyncControllerLoop

SyncControllerLoop(async_loop, portal)

Blocking view over a single control loop (mirror of :class:ControllerLoop).

Returned by :meth:SyncController.loop; never instantiated directly. Lifetime is bound to the parent :class:SyncController and its portal — closing the controller is the only cleanup needed.

Source code in src/watlowlib/sync/controller.py
def __init__(self, async_loop: ControllerLoop, portal: SyncPortal) -> None:
    self._loop = async_loop
    self._portal = portal

number property

number

The 1-indexed loop number this view binds.

read_alarms

read_alarms()

Blocking :meth:ControllerLoop.read_alarms.

Source code in src/watlowlib/sync/controller.py
def read_alarms(self) -> AlarmState:
    """Blocking :meth:`ControllerLoop.read_alarms`."""
    return self._portal.call(self._loop.read_alarms)

read_output

read_output()

Blocking :meth:ControllerLoop.read_output.

Source code in src/watlowlib/sync/controller.py
def read_output(self) -> Reading:
    """Blocking :meth:`ControllerLoop.read_output`."""
    return self._portal.call(self._loop.read_output)

read_pid

read_pid()

Blocking :meth:ControllerLoop.read_pid.

Source code in src/watlowlib/sync/controller.py
def read_pid(self) -> PidGains:
    """Blocking :meth:`ControllerLoop.read_pid`."""
    return self._portal.call(self._loop.read_pid)

read_pv

read_pv(*, timeout=None)

Blocking :meth:ControllerLoop.read_pv.

Source code in src/watlowlib/sync/controller.py
def read_pv(self, *, timeout: float | None = None) -> Reading:
    """Blocking :meth:`ControllerLoop.read_pv`."""
    return self._portal.call(self._loop.read_pv, timeout=timeout)

read_setpoint

read_setpoint(*, timeout=None)

Blocking :meth:ControllerLoop.read_setpoint.

Source code in src/watlowlib/sync/controller.py
def read_setpoint(self, *, timeout: float | None = None) -> Reading:
    """Blocking :meth:`ControllerLoop.read_setpoint`."""
    return self._portal.call(self._loop.read_setpoint, timeout=timeout)

set_setpoint

set_setpoint(value, *, confirm=False, timeout=None)

Blocking :meth:ControllerLoop.set_setpoint.

Source code in src/watlowlib/sync/controller.py
def set_setpoint(
    self,
    value: float,
    *,
    confirm: bool = False,
    timeout: float | None = None,
) -> Reading:
    """Blocking :meth:`ControllerLoop.set_setpoint`."""
    return self._portal.call(
        self._loop.set_setpoint,
        value,
        confirm=confirm,
        timeout=timeout,
    )

write_pid

write_pid(gains, *, confirm=False)

Blocking :meth:ControllerLoop.write_pid.

Source code in src/watlowlib/sync/controller.py
def write_pid(self, gains: PidGains, *, confirm: bool = False) -> PidGains:
    """Blocking :meth:`ControllerLoop.write_pid`."""
    return self._portal.call(self._loop.write_pid, gains, confirm=confirm)

Watlow

Namespace for the sync controller entry point.

Use :meth:Watlow.open as a context manager::

from watlowlib.sync import Watlow

with Watlow.open("/dev/ttyUSB0") as ctl:
    print(ctl.read_pv())

open staticmethod

open(
    port,
    *,
    profile=EZZONE_PROFILE,
    protocol=None,
    address=1,
    serial_settings=None,
    assert_wire_temperature_unit=None,
    identify=True,
    portal=None,
)

Open a sync :class:SyncController scoped to a with block.

Mirrors :func:watlowlib.open_device parameter-for-parameter (modulo the portal plumbing). The sync CM drives the async factory through a :class:SyncPortal; the portal is created per-call unless one is passed in via portal=.

protocol=None adopts profile.default_protocol (Std Bus for the default EZ-ZONE PM profile, Modbus RTU for the Series SD profile) — same semantics as :func:watlowlib.open_device.

Source code in src/watlowlib/sync/controller.py
@staticmethod
@contextmanager
def open(
    port: str,
    *,
    profile: DeviceProfile = EZZONE_PROFILE,
    protocol: ProtocolKind | None = None,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
    assert_wire_temperature_unit: Unit | str | None = None,
    identify: bool = True,
    portal: SyncPortal | None = None,
) -> Generator[SyncController]:
    """Open a sync :class:`SyncController` scoped to a ``with`` block.

    Mirrors :func:`watlowlib.open_device` parameter-for-parameter
    (modulo the portal plumbing). The sync CM drives the async
    factory through a :class:`SyncPortal`; the portal is created
    per-call unless one is passed in via ``portal=``.

    ``protocol=None`` adopts ``profile.default_protocol`` (Std Bus
    for the default EZ-ZONE PM profile, Modbus RTU for the Series SD
    profile) — same semantics as :func:`watlowlib.open_device`.
    """
    with ExitStack() as stack:
        active_portal = portal if portal is not None else stack.enter_context(SyncPortal())
        controller = active_portal.call(
            open_device,
            port,
            profile=profile,
            protocol=protocol,
            address=address,
            serial_settings=serial_settings,
            assert_wire_temperature_unit=assert_wire_temperature_unit,
            identify=identify,
        )
        # ``open_device`` returns a controller that may or may not
        # already be open: AUTO returned by the detector is open;
        # STDBUS / MODBUS_RTU need ``__aenter__`` to run open().
        # ``Controller.__aenter__`` short-circuits when already open
        # and returns ``self``; calling it through the portal here
        # gives us the same lifecycle as ``async with`` does.
        active_portal.call(controller.__aenter__)
        try:
            yield wrap_controller(controller, active_portal)
        finally:
            # Close the underlying transport through the portal.
            active_portal.call(controller.close)

unwrap_sync_controller

unwrap_sync_controller(source)

Return the async :class:Controller inside source if wrapped.

Package-private helper used by :class:SyncWatlowManager.

Source code in src/watlowlib/sync/controller.py
def unwrap_sync_controller[T](source: T | SyncController) -> T | Controller:
    """Return the async :class:`Controller` inside ``source`` if wrapped.

    Package-private helper used by :class:`SyncWatlowManager`.
    """
    if isinstance(source, SyncController):
        return source._ctl  # pyright: ignore[reportPrivateUsage]
    return source

wrap_controller

wrap_controller(controller, portal)

Return a :class:SyncController wrapping controller on portal.

Package-private helper used by :class:SyncWatlowManager.

Source code in src/watlowlib/sync/controller.py
def wrap_controller(controller: Controller, portal: SyncPortal) -> SyncController:
    """Return a :class:`SyncController` wrapping ``controller`` on ``portal``.

    Package-private helper used by :class:`SyncWatlowManager`.
    """
    return SyncController(controller, portal)

Manager

watlowlib.sync.manager

Sync manager facade — portal-driven wrapper over :class:WatlowManager.

:class:SyncWatlowManager wraps the async :class:~watlowlib.manager.WatlowManager through a :class:~watlowlib.sync.portal.SyncPortal. Every coroutine method becomes a blocking method here; the synchronous :meth:get stays synchronous and delegates directly.

Lifecycle mirrors the async side: the class is a with context manager. By default each instance owns its own portal; callers that need several facades to share one event loop can pass portal= to reuse a long-lived :class:SyncPortal.

Design reference: docs/design.md §6.

DeviceResult dataclass

DeviceResult(value, error)

Per-device result container — value or error, never both.

The protocol that produced the failure is available via result.error.context.protocol when the error carries context; keeping it off the result keeps the success-path representation clean and aligns with the ecosystem DeviceResult shape used by :mod:alicatlib and :mod:sartoriuslib.

ok property

ok

True when the controller produced a value (error is None).

failure classmethod

failure(error)

Build a failure result wrapping error.

Source code in src/watlowlib/manager.py
@classmethod
def failure(cls, error: WatlowError) -> Self:
    """Build a failure result wrapping ``error``."""
    return cls(value=None, error=error)

success classmethod

success(value)

Build a success result wrapping value.

Source code in src/watlowlib/manager.py
@classmethod
def success(cls, value: T) -> Self:
    """Build a success result wrapping ``value``."""
    return cls(value=value, error=None)

ErrorPolicy

Bases: Enum

How the manager surfaces per-device failures.

Under :attr:RAISE, the manager collects every controller's result and — if any call failed — raises an :class:ExceptionGroup containing the per-device exceptions after the task group joins. Under :attr:RETURN, each controller produces a :class:DeviceResult and the caller inspects .error per entry.

SyncWatlowManager

SyncWatlowManager(
    *, error_policy=ErrorPolicy.RAISE, portal=None
)

Blocking facade over :class:watlowlib.manager.WatlowManager.

Source code in src/watlowlib/sync/manager.py
def __init__(
    self,
    *,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    portal: SyncPortal | None = None,
) -> None:
    self._error_policy = error_policy
    self._portal_override = portal
    self._stack: ExitStack | None = None
    self._portal: SyncPortal | None = None
    self._mgr: WatlowManager | None = None
    self._wrapped: dict[str, SyncController] = {}
    self._entered = False

closed property

closed

True once :meth:close or __exit__ has run.

error_policy property

error_policy

The :class:ErrorPolicy this manager was constructed with.

names property

names

Insertion-ordered tuple of managed controller names.

portal property

portal

The :class:SyncPortal this manager's coroutines run on.

add

add(
    name,
    source,
    *,
    protocol=ProtocolKind.STDBUS,
    address=1,
    serial_settings=None,
    profile=EZZONE_PROFILE,
    assert_wire_temperature_unit=None,
)

Blocking :meth:WatlowManager.add.

Accepts a :class:SyncController as source in addition to the async shapes — the wrapper is unwrapped to the underlying :class:Controller before delegation.

Source code in src/watlowlib/sync/manager.py
def add(
    self,
    name: str,
    source: SyncController | Controller | str | Transport,
    *,
    protocol: ProtocolKind = ProtocolKind.STDBUS,
    address: int = 1,
    serial_settings: SerialSettings | None = None,
    profile: DeviceProfile = EZZONE_PROFILE,
    assert_wire_temperature_unit: Unit | str | None = None,
) -> SyncController:
    """Blocking :meth:`WatlowManager.add`.

    Accepts a :class:`SyncController` as ``source`` in addition to
    the async shapes — the wrapper is unwrapped to the underlying
    :class:`Controller` before delegation.
    """
    mgr = self._require_mgr()
    async_source: Controller | str | Transport = unwrap_sync_controller(source)
    async_controller = self.portal.call(
        mgr.add,
        name,
        async_source,
        protocol=protocol,
        address=address,
        serial_settings=serial_settings,
        profile=profile,
        assert_wire_temperature_unit=assert_wire_temperature_unit,
    )
    wrapped = wrap_controller(async_controller, self.portal)
    self._wrapped[name] = wrapped
    return wrapped

close

close()

Blocking :meth:WatlowManager.close — idempotent.

Source code in src/watlowlib/sync/manager.py
def close(self) -> None:
    """Blocking :meth:`WatlowManager.close` — idempotent."""
    self._wrapped.clear()
    mgr = self._mgr
    if mgr is None:
        return
    portal = self._portal
    if portal is None:
        return
    portal.call(mgr.close)

execute_each

execute_each(op, names=None)

Blocking :meth:WatlowManager.execute_each.

op receives the async :class:Controller so existing coroutines compose. If you have a sync helper, wrap it in an async stub or run it on the portal yourself.

Source code in src/watlowlib/sync/manager.py
def execute_each[T](
    self,
    op: Callable[[Controller], Awaitable[T]],
    names: Sequence[str] | None = None,
) -> dict[str, DeviceResult[T]]:
    """Blocking :meth:`WatlowManager.execute_each`.

    ``op`` receives the **async** :class:`Controller` so existing
    coroutines compose. If you have a sync helper, wrap it in an
    async stub or run it on the portal yourself.
    """
    mgr = self._require_mgr()
    return self.portal.call(mgr.execute_each, op, names)
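One way to read "wrap it in an async stub" is a thin adapter that gives a sync helper an awaitable signature. The sketch below is generic and makes assumptions: `as_async` and `describe` are hypothetical, the helper runs inline on the event loop (so it must not block for long), and it can only touch synchronous attributes of whatever object it receives.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

A = TypeVar("A")
R = TypeVar("R")


def as_async(helper: Callable[[A], R]) -> Callable[[A], Awaitable[R]]:
    """Wrap a sync helper in an async stub taking the same single argument."""

    async def stub(arg: A) -> R:
        # Runs inline on the event loop; keep the helper short and non-blocking.
        return helper(arg)

    return stub


def describe(device: str) -> str:  # hypothetical sync helper
    return f"device:{device}"


label = asyncio.run(as_async(describe)("oven1"))
```

The stub has the `Callable[[Controller], Awaitable[T]]` shape that `op` expects, with a string standing in for the controller here.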

get

get(name)

Return the sync wrapper for the controller registered under name.

Source code in src/watlowlib/sync/manager.py
def get(self, name: str) -> SyncController:
    """Return the sync wrapper for the controller registered under ``name``."""
    cached = self._wrapped.get(name)
    if cached is not None:
        return cached
    mgr = self._require_mgr()
    async_controller = mgr.get(name)
    wrapped = wrap_controller(async_controller, self.portal)
    self._wrapped[name] = wrapped
    return wrapped

poll

poll(parameters, *, names=None, instances=(1,))

Blocking :meth:WatlowManager.poll_many.

Source code in src/watlowlib/sync/manager.py
def poll(
    self,
    parameters: Sequence[str | int],
    *,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
) -> list[Sample]:
    """Blocking :meth:`WatlowManager.poll_many`."""
    mgr = self._require_mgr()
    return self.portal.call(
        mgr.poll_many,
        parameters,
        names=names,
        instances=instances,
    )

record_to_sink

record_to_sink(
    *,
    parameters,
    rate_hz,
    duration=None,
    sink,
    names=None,
    instances=(1,),
    overflow=None,
    buffer_size=64,
    batch_size=64,
    flush_interval=1.0,
)

Record polled samples directly into a sink — one-call convenience.

Combines :func:watlowlib.sync.record and :func:watlowlib.sync.pipe into a single blocking call. The manager's portal is reused for both legs so the recorder and the sink share an event loop. sink may be either a :class:SyncSinkAdapter (preferred, opened externally) or a bare async :class:SampleSink — in the latter case this method opens the sink against the manager's portal and closes it after the recording finishes.

Returns the :class:AcquisitionSummary from :func:watlowlib.sync.pipe.

Source code in src/watlowlib/sync/manager.py
def record_to_sink(
    self,
    *,
    parameters: Sequence[str | int],
    rate_hz: float,
    duration: float | None = None,
    sink: SyncSinkAdapter | SampleSink,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
    overflow: OverflowPolicy | None = None,
    buffer_size: int = 64,
    batch_size: int = 64,
    flush_interval: float = 1.0,
) -> AcquisitionSummary:
    """Record polled samples directly into a sink — one-call convenience.

    Combines :func:`watlowlib.sync.record` and
    :func:`watlowlib.sync.pipe` into a single blocking call. The
    manager's portal is reused for both legs so the recorder and
    the sink share an event loop. ``sink`` may be either a
    :class:`SyncSinkAdapter` (preferred, opened externally) or a
    bare async :class:`SampleSink` — in the latter case this
    method opens the sink against the manager's portal and closes
    it after the recording finishes.

    Returns the :class:`AcquisitionSummary` from
    :func:`watlowlib.sync.pipe`.
    """
    # Lazy imports — sink machinery pulls heavy deps (anyio sink
    # primitives, sqlite, etc.) and we want the surface importable
    # without that until the user reaches for streaming.
    from watlowlib.streaming.recorder import OverflowPolicy as _OverflowPolicy  # noqa: PLC0415
    from watlowlib.sync.recording import pipe, record  # noqa: PLC0415
    from watlowlib.sync.sinks import SyncSinkAdapter  # noqa: PLC0415

    self._require_mgr()
    active_overflow = overflow if overflow is not None else _OverflowPolicy.BLOCK
    portal = self.portal

    with ExitStack() as stack:
        sink_for_pipe: SyncSinkAdapter | SampleSink
        if isinstance(sink, SyncSinkAdapter):
            # Caller-owned sync wrapper — no open / close here.
            sink_for_pipe = sink
        else:
            # Bare async sink — wrap on this manager's portal so it
            # shares the recorder's event loop, and own the
            # open/close lifecycle through the ExitStack.
            wrapped = SyncSinkAdapter(sink, portal=portal)
            stack.enter_context(wrapped)
            sink_for_pipe = wrapped

        recording = stack.enter_context(
            record(
                self,
                parameters=parameters,
                rate_hz=rate_hz,
                duration=duration,
                names=names,
                instances=instances,
                overflow=active_overflow,
                buffer_size=buffer_size,
                portal=portal,
            ),
        )
        return pipe(
            recording.stream,
            sink_for_pipe,
            batch_size=batch_size,
            flush_interval=flush_interval,
            portal=portal,
        )

remove

remove(name)

Blocking :meth:WatlowManager.remove.

Source code in src/watlowlib/sync/manager.py
def remove(self, name: str) -> None:
    """Blocking :meth:`WatlowManager.remove`."""
    mgr = self._require_mgr()
    self._wrapped.pop(name, None)
    self.portal.call(mgr.remove, name)

Recording

watlowlib.sync.recording

Sync wrappers for :func:watlowlib.streaming.record and :func:watlowlib.sinks.pipe.

:func:record — sync context manager wrapping the async recorder. Yields a :class:SyncRecording exposing .stream (blocking iterator), .summary (live :class:AcquisitionSummary mutated by the async producer; consumers treat it as read-only), and .rate_hz. On CM exit the underlying async task group is cancelled and joined by the portal, and summary.finished_at is populated.

:func:pipe — sync drain loop matching :func:watlowlib.sinks.pipe's batch / time flush semantics. Rebuilt in sync-land rather than wrapping the async driver so buffering stays under sync control and the time threshold uses :func:time.monotonic, not :func:anyio.current_time.

Design reference: docs/design.md §6.

AcquisitionSummary dataclass

AcquisitionSummary(
    started_at,
    finished_at=None,
    samples_emitted=0,
    samples_late=0,
    max_drift_ms=0.0,
    tick_duration_ms_p50=0.0,
    tick_duration_ms_p99=0.0,
    disconnects=0,
)

Per-run summary owned and mutated by the recorder.

Mutability contract (§M of UNIFIED_API_HANDOFF.md): the recorder is the only writer. Counters update in place during the run so progress-polling consumers (TUIs, dashboards) see live values. Consumers must treat this object as read-only.

finished_at is None while the recording is in flight and is set on context-manager exit. Percentile fields (tick_duration_ms_p50 / p99) are materialized at exit only because percentiles are batch-computed; the in-flight counters reflect the latest observation.

Attributes:

started_at (datetime): Wall-clock at the first scheduled tick.

finished_at (datetime | None): Wall-clock at producer shutdown, or None while running.

samples_emitted (int): Count of per-tick batches actually pushed onto the receive stream. A tick that produced zero samples (every device errored) still counts as one emitted batch.

samples_late (int): Count of ticks that missed their target slot (producer overran the previous tick, or overflow policy dropped the batch). Auto-reconnect ticks also count as late.

max_drift_ms (float): Largest observed positive drift of an emitted batch relative to its absolute target, in milliseconds. A healthy run stays well under one period; values approaching 1000 / rate_hz indicate the device or consumer is saturating the schedule.

tick_duration_ms_p50 (float): Median wall-clock duration of a single source.poll_many(...) call across the run, in milliseconds. Set on exit only. Compares directly to 1000 / rate_hz: if it approaches the period, the schedule is saturated.

tick_duration_ms_p99 (float): 99th-percentile tick duration, in milliseconds. Set on exit only. Surfaces rare-but-bad cases where a tick stalled behind a contended port lock or a slow EEPROM commit.

disconnects (int): Count of WatlowConnectionError events the producer absorbed under auto_reconnect=True. Always 0 when auto_reconnect was off.
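The comparison between tick-duration percentiles and the period 1000 / rate_hz can be sketched with the standard library. This is a minimal illustration, not the recorder's own computation: summarize_ticks is a hypothetical helper, and the choice of statistics.quantiles with the inclusive method is an assumption.

```python
import statistics


def summarize_ticks(durations_ms: list[float], rate_hz: float) -> dict[str, float]:
    """Batch-compute p50/p99 from per-tick durations and relate them to the period."""
    period_ms = 1000.0 / rate_hz
    # quantiles(n=100) returns 99 cut points: index 49 is p50, index 98 is p99.
    cuts = statistics.quantiles(durations_ms, n=100, method="inclusive")
    return {
        "period_ms": period_ms,
        "p50": cuts[49],
        "p99": cuts[98],
        # A value near 1.0 means a typical tick consumes the whole period.
        "p50_fraction_of_period": cuts[49] / period_ms,
    }


# Hypothetical run: 101 ticks lasting 1..101 ms, recorded at 5 Hz (200 ms period).
durations = [float(d) for d in range(1, 102)]
summary = summarize_ticks(durations, rate_hz=5.0)
```

Because the percentiles need the full list of durations, a computation of this kind can only run once the recording has finished, which matches the "set on exit only" note above.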

OverflowPolicy

Bases: Enum

What record() does when the receive-stream buffer is full.

The producer runs on an absolute-target schedule; the consumer drains at its own pace. Slow consumers create backpressure — this knob picks how the recorder responds.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Await the slow consumer. Default. Silent drops are surprising in a data-acquisition setting, so the recorder blocks the producer rather than quietly discarding samples.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the batch that was about to be enqueued. Counted as late.

DROP_OLDEST class-attribute instance-attribute

DROP_OLDEST = 'drop_oldest'

Evict the oldest queued batch and enqueue the newest. Useful for real-time monitoring where the latest reading matters more than historical buffer contents. Each evicted batch is counted as late.
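The two dropping policies can be sketched on a bounded queue. This is an illustration under stated assumptions: Policy, enqueue, and run are hypothetical names, a collections.deque stands in for the receive stream, and BLOCK is left out because it needs a live consumer to make progress.

```python
from collections import deque
from enum import Enum


class Policy(Enum):
    """Hypothetical stand-in for the two dropping members of OverflowPolicy."""

    DROP_NEWEST = "drop_newest"
    DROP_OLDEST = "drop_oldest"


def enqueue(queue: deque, batch: int, capacity: int, policy: Policy) -> int:
    """Enqueue one batch; return how many batches were counted late (0 or 1)."""
    if len(queue) < capacity:
        queue.append(batch)
        return 0
    if policy is Policy.DROP_NEWEST:
        return 1  # the incoming batch is discarded, the queue is untouched
    queue.popleft()  # DROP_OLDEST: evict the head, keep the newest batch
    queue.append(batch)
    return 1


def run(policy: Policy) -> tuple[list[int], int]:
    """Push batches 0..4 into a queue of capacity 2 with no consumer draining."""
    queue: deque = deque()
    late = sum(enqueue(queue, n, 2, policy) for n in range(5))
    return list(queue), late


newest_result = run(Policy.DROP_NEWEST)
oldest_result = run(Policy.DROP_OLDEST)
```

With a stalled consumer both policies count the same number of late batches; they differ in which batches survive in the buffer.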

SyncRecording dataclass

SyncRecording(stream, summary, rate_hz)

Sync mirror of :class:watlowlib.streaming.Recording.

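The async producer owns summary. Reading its attributes from the calling thread is safe: the recorder is the only writer, and a single attribute read on a plain Python dataclass is atomic in CPython. Values read from several fields at different moments are not guaranteed to form one consistent snapshot.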

Attributes:

stream (Iterator[Sequence[Sample]]): Blocking iterator over per-tick :class:Sample batches. Iterating drives the async receive stream via the portal.

summary (AcquisitionSummary): Live :class:AcquisitionSummary. finished_at is None while running and is set on context-manager exit.

rate_hz (float): The cadence the recorder is running at.
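The single-writer, polling-reader arrangement described for summary can be sketched with a plain dataclass and a thread. LiveSummary and producer are hypothetical stand-ins; the real summary is written by the async recorder on the portal's thread, not by a threading.Thread.

```python
import threading
import time
from dataclasses import dataclass


@dataclass
class LiveSummary:
    """Hypothetical stand-in for a live, recorder-owned summary."""

    samples_emitted: int = 0


def producer(summary: LiveSummary, ticks: int) -> None:
    # Single writer: only this function ever mutates the counter.
    for _ in range(ticks):
        summary.samples_emitted += 1


summary = LiveSummary()
worker = threading.Thread(target=producer, args=(summary, 1000))
worker.start()

observed: list[int] = []
while worker.is_alive():
    # Reader side: poll the counter without taking a lock, never write to it.
    observed.append(summary.samples_emitted)
    time.sleep(0.001)
worker.join()
observed.append(summary.samples_emitted)  # final value after the writer is done
```

The reader only ever sees the counter grow, which is what a progress display needs; it should not assume that several fields read in sequence belong to the same tick.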

pipe

pipe(
    stream,
    sink,
    *,
    batch_size=64,
    flush_interval=1.0,
    portal=None,
)

Sync :func:watlowlib.sinks.pipe.

Source code in src/watlowlib/sync/recording.py
def pipe(
    stream: Iterator[Sequence[Sample]],
    sink: SyncSinkAdapter | SampleSink,
    *,
    batch_size: int = 64,
    flush_interval: float = 1.0,
    portal: SyncPortal | None = None,
) -> AcquisitionSummary:
    """Sync :func:`watlowlib.sinks.pipe`."""
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    if flush_interval <= 0:
        raise ValueError(f"flush_interval must be > 0, got {flush_interval!r}")

    if isinstance(sink, SyncSinkAdapter):
        flush = sink.write_many
    else:
        resolved: SyncPortal | None = portal
        if resolved is None and isinstance(stream, SyncAsyncIterator):
            resolved = stream._portal  # pyright: ignore[reportPrivateUsage]
        if resolved is None:
            raise RuntimeError(
                "pipe: passing an async SampleSink requires a portal — "
                "wrap the sink in a SyncSinkAdapter or pass portal=.",
            )
        async_sink = sink
        active: SyncPortal = resolved

        def flush(samples: Sequence[Sample]) -> None:
            active.call(async_sink.write_many, samples)

    started_at = datetime.now(UTC)
    emitted = 0
    buffer: list[Sample] = []
    last_flush = time.monotonic()

    for batch in stream:
        buffer.extend(batch)
        now = time.monotonic()
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            flush(buffer)
            emitted += len(buffer)
            buffer.clear()
            last_flush = now

    if buffer:
        flush(buffer)
        emitted += len(buffer)
        buffer.clear()

    finished_at = datetime.now(UTC)
    return AcquisitionSummary(
        started_at=started_at,
        finished_at=finished_at,
        samples_emitted=emitted,
    )
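The flush behaviour of the listing above can be exercised without hardware using a stand-in stream and a list as the sink. This is a sketch that mirrors the loop, not the library function: drain and writes are hypothetical names, and the copy passed to flush is a deliberate difference, made so the sink never holds a reference to a list that is cleared afterwards.

```python
import time
from collections.abc import Callable, Iterable, Sequence


def drain(
    stream: Iterable[Sequence[int]],
    flush: Callable[[list[int]], None],
    *,
    batch_size: int = 64,
    flush_interval: float = 1.0,
) -> int:
    """Buffer incoming batches; flush on a size or elapsed-time threshold."""
    buffer: list[int] = []
    emitted = 0
    last_flush = time.monotonic()
    for batch in stream:
        buffer.extend(batch)
        now = time.monotonic()
        # Both thresholds are evaluated only when a new batch arrives.
        if len(buffer) >= batch_size or (now - last_flush) >= flush_interval:
            flush(list(buffer))  # copy: the sink may keep the list it receives
            emitted += len(buffer)
            buffer.clear()
            last_flush = now
    if buffer:  # final partial buffer once the stream is exhausted
        flush(list(buffer))
        emitted += len(buffer)
    return emitted


writes: list[list[int]] = []
total = drain(iter([[1, 2], [3, 4], [5]]), writes.append, batch_size=4, flush_interval=60.0)
```

Two details of the listing are visible here: batch_size is measured in samples rather than in per-tick batches, and the returned count is likewise a number of samples.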

record

record(
    source,
    *,
    parameters,
    rate_hz,
    duration=None,
    names=None,
    instances=(1,),
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
    portal=None,
)

Sync :func:watlowlib.streaming.record.

Yields a :class:SyncRecording — iterate recording.stream for per-tick batches, read recording.summary for live counters, recording.rate_hz for the running rate.

If source is a :class:SyncWatlowManager, its portal is reused — the recorder and manager must share an event loop. Pass portal= to override.

Source code in src/watlowlib/sync/recording.py
@contextmanager
def record(
    source: SyncWatlowManager | PollSource,
    *,
    parameters: Sequence[str | int],
    rate_hz: float,
    duration: float | None = None,
    names: Sequence[str] | None = None,
    instances: Sequence[int] = (1,),
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
    portal: SyncPortal | None = None,
) -> Generator[SyncRecording]:
    """Sync :func:`watlowlib.streaming.record`.

    Yields a :class:`SyncRecording` — iterate ``recording.stream``
    for per-tick batches, read ``recording.summary`` for live
    counters, ``recording.rate_hz`` for the running rate.

    If ``source`` is a :class:`SyncWatlowManager`, its portal is
    reused — the recorder and manager must share an event loop. Pass
    ``portal=`` to override.
    """
    poll_source = _resolve_poll_source(source)
    with ExitStack() as stack:
        active_portal = _resolve_portal(portal, source, None) or stack.enter_context(SyncPortal())
        async_cm = async_record(
            poll_source,
            parameters=parameters,
            rate_hz=rate_hz,
            duration=duration,
            names=names,
            instances=instances,
            overflow=overflow,
            buffer_size=buffer_size,
        )
        async_recording = stack.enter_context(active_portal.wrap_async_context_manager(async_cm))
        sync_iter = stack.enter_context(active_portal.wrap_async_iter(async_recording.stream))
        yield SyncRecording(
            stream=sync_iter,
            summary=async_recording.summary,
            rate_hz=async_recording.rate_hz,
        )

Sinks

watlowlib.sync.sinks

Sync wrappers for :mod:watlowlib.sinks.

Every in-tree sink has a one-to-one sync counterpart. All of them share :class:SyncSinkAdapter: the per-sink subclass only constructs the matching async sink with its own parameters and hands it to the adapter, which owns the portal + open/write/close plumbing.

Sinks follow the same portal-ownership pattern as the rest of the sync facade — each wrapper creates a throwaway :class:SyncPortal on __enter__ unless the caller passes one in. Pass a shared portal when the sink must share an event loop with a :class:SyncWatlowManager or :func:record, otherwise the sink's writes run on a different loop than the data producer.

Design reference: docs/design.md §6.
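The adapter idea, an async sink driven from blocking code through a background event loop, can be sketched with the standard library alone. This sketch swaps in plain asyncio and threading for the anyio-based SyncPortal; ListSink and BlockingAdapter are hypothetical names, and the adapter here always owns its loop, so the shared-portal case is not covered.

```python
import asyncio
import threading
from collections.abc import Sequence


class ListSink:
    """Hypothetical async sink that stores samples in a list."""

    def __init__(self) -> None:
        self.rows: list[int] = []
        self.is_open = False

    async def open(self) -> None:
        self.is_open = True

    async def write_many(self, samples: Sequence[int]) -> None:
        self.rows.extend(samples)

    async def close(self) -> None:
        self.is_open = False


class BlockingAdapter:
    """Runs an async sink on a background event loop and blocks the caller."""

    def __init__(self, async_sink: ListSink) -> None:
        self._sink = async_sink
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)

    def _call(self, func, *args):
        # Schedule the coroutine on the loop thread and wait for its result.
        return asyncio.run_coroutine_threadsafe(func(*args), self._loop).result()

    def __enter__(self) -> "BlockingAdapter":
        self._thread.start()
        self._call(self._sink.open)
        return self

    def write_many(self, samples: Sequence[int]) -> None:
        self._call(self._sink.write_many, samples)

    def __exit__(self, *exc_info: object) -> None:
        try:
            self._call(self._sink.close)
        finally:
            # Stop the loop from its own thread, then release it.
            self._loop.call_soon_threadsafe(self._loop.stop)
            self._thread.join()
            self._loop.close()


sink = ListSink()
with BlockingAdapter(sink) as adapter:
    adapter.write_many([1, 2, 3])
    was_open = sink.is_open
```

Every call crosses to the loop thread and back, which is why a sink that must cooperate with a recorder needs to run on the same loop as that recorder.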

PostgresConfig dataclass

PostgresConfig(
    dsn=None,
    host=None,
    port=5432,
    user=None,
    password=None,
    database=None,
    schema="public",
    table="samples",
    pool_min_size=1,
    pool_max_size=4,
    statement_timeout_ms=30000,
    command_timeout_s=10.0,
    create_table=False,
    use_copy=True,
)

Connection + target settings for :class:PostgresSink.

Either dsn or the discrete host/user/database set must be provided. Credentials are not logged.

Attributes:

dsn (str | None): Full libpq-style connection string (e.g. postgres://user:pass@host:5432/db). Mutually exclusive with the discrete fields.

host (str | None): Database host. Required if dsn is not set.

port (int): Database port. Defaults to 5432.

user (str | None): Database role.

password (str | None): Role password. Never logged.

database (str | None): Database name.

schema (str): Target schema. Validated against [A-Za-z_][A-Za-z0-9_]{0,62}.

table (str): Target table. Validated against the same pattern.

pool_min_size (int): Minimum pool size. Defaults to 1.

pool_max_size (int): Maximum pool size. Defaults to 4.

statement_timeout_ms (int): statement_timeout applied as a server setting, in milliseconds. Defaults to 30000 (30 s).

command_timeout_s (float): asyncpg's per-call command timeout, in seconds. Defaults to 10.

create_table (bool): If True, infer the schema from the first batch and run CREATE TABLE IF NOT EXISTS. If False (the safer default), require the table to exist and lock the schema from information_schema.columns.

use_copy (bool): If True (default), bulk-write via asyncpg's binary COPY path. Disable only if your environment does not grant the COPY privilege to the sink's role, in which case writes fall back to prepared executemany.
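The identifier pattern quoted for schema and table can be checked in isolation. A minimal sketch, assuming a whole-string match; is_valid_identifier is a hypothetical helper and the library's own validation routine is not shown on this page.

```python
import re

# Pattern as documented: a letter or underscore, then up to 62 more
# letters, digits, or underscores (63 characters at most).
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]{0,62}")


def is_valid_identifier(name: str) -> bool:
    """Return True if the whole string matches the documented pattern."""
    return _IDENTIFIER.fullmatch(name) is not None


checks = {
    "samples": is_valid_identifier("samples"),
    "_staging_2": is_valid_identifier("_staging_2"),
    "1samples": is_valid_identifier("1samples"),
    "samples; DROP TABLE x": is_valid_identifier("samples; DROP TABLE x"),
    "63 chars": is_valid_identifier("a" * 63),
    "64 chars": is_valid_identifier("a" * 64),
}
```

The 63-character ceiling lines up with PostgreSQL's default identifier length limit, and the restricted alphabet keeps quotes, spaces, and semicolons out of any name that ends up in generated SQL.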

target

target()

Return a log-safe description of the target: host:port/db.schema.table.

Source code in src/watlowlib/sinks/postgres.py
def target(self) -> str:
    """Return a log-safe description of the target: ``host:port/db.schema.table``."""
    if self.dsn is not None:
        parsed = urlparse(self.dsn)
        host = parsed.hostname or "?"
        port = parsed.port or self.port
        db = (parsed.path or "/?").lstrip("/") or "?"
    else:
        host = self.host or "?"
        port = self.port
        db = self.database or "?"
    return f"{host}:{port}/{db}.{self.schema}.{self.table}"
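The DSN branch of the listing above can be tried on its own to see that credentials do not reach the description. describe_target is a hypothetical free-function copy of that branch, and the DSNs are made-up examples.

```python
from urllib.parse import urlparse


def describe_target(
    dsn: str,
    schema: str = "public",
    table: str = "samples",
    default_port: int = 5432,
) -> str:
    """Build host:port/db.schema.table from a URL-form DSN, omitting credentials."""
    parsed = urlparse(dsn)
    host = parsed.hostname or "?"  # hostname excludes the user:password part
    port = parsed.port or default_port
    db = (parsed.path or "/?").lstrip("/") or "?"
    return f"{host}:{port}/{db}.{schema}.{table}"


with_credentials = describe_target("postgres://logger:s3cret@db.example.com:6543/plant")
default_port_used = describe_target("postgres://db.example.com/plant")
no_database = describe_target("postgres://db.example.com")
```

This holds for URL-form DSNs. A keyword/value connection string such as host=... password=... has no URL structure, so urlparse places the entire string in path; it is worth checking how such input is handled before relying on the description being log-safe.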

SyncCsvSink

SyncCsvSink(path, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.csv.CsvSink.

Source code in src/watlowlib/sync/sinks.py
def __init__(self, path: str | Path, *, portal: SyncPortal | None = None) -> None:
    super().__init__(CsvSink(path), portal=portal)

SyncInMemorySink

SyncInMemorySink(*, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.memory.InMemorySink.

Source code in src/watlowlib/sync/sinks.py
def __init__(self, *, portal: SyncPortal | None = None) -> None:
    super().__init__(InMemorySink(), portal=portal)

samples property

samples

Captured samples — proxy for :attr:InMemorySink.samples.

SyncJsonlSink

SyncJsonlSink(path, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.jsonl.JsonlSink.

Source code in src/watlowlib/sync/sinks.py
def __init__(self, path: str | Path, *, portal: SyncPortal | None = None) -> None:
    super().__init__(JsonlSink(path), portal=portal)

SyncParquetSink

SyncParquetSink(
    path,
    *,
    compression="zstd",
    use_dictionary=True,
    row_group_size=None,
    portal=None,
)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.parquet.ParquetSink.

Requires the watlowlib[parquet] extra — the dependency check runs on :meth:open, same as the async sink.

Source code in src/watlowlib/sync/sinks.py
def __init__(
    self,
    path: str | Path,
    *,
    compression: _Compression = "zstd",
    use_dictionary: bool = True,
    row_group_size: int | None = None,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(
        ParquetSink(
            path,
            compression=compression,
            use_dictionary=use_dictionary,
            row_group_size=row_group_size,
        ),
        portal=portal,
    )

SyncPostgresSink

SyncPostgresSink(config, *, portal=None)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.postgres.PostgresSink.

Requires the watlowlib[postgres] extra — dependency check runs on :meth:open.

Source code in src/watlowlib/sync/sinks.py
def __init__(
    self,
    config: PostgresConfig,
    *,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(PostgresSink(config), portal=portal)

SyncSampleSink

Bases: Protocol

Sync shape of an acquisition sink.

Mirrors :class:~watlowlib.sinks.base.SampleSink — same method names, no await. Every concrete wrapper in this module satisfies this Protocol.

__enter__

__enter__()

Open the sink and return self.

Source code in src/watlowlib/sync/sinks.py
def __enter__(self) -> Self:
    """Open the sink and return self."""
    ...

__exit__

__exit__(exc_type, exc, tb)

Close the sink on exit.

Source code in src/watlowlib/sync/sinks.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Close the sink on exit."""
    ...

close

close()

Flush and release the backing resource — idempotent.

Source code in src/watlowlib/sync/sinks.py
def close(self) -> None:
    """Flush and release the backing resource — idempotent."""
    ...

open

open()

Allocate the sink's backing resource.

Source code in src/watlowlib/sync/sinks.py
def open(self) -> None:
    """Allocate the sink's backing resource."""
    ...

write_many

write_many(samples)

Append samples to the sink.

Source code in src/watlowlib/sync/sinks.py
def write_many(self, samples: Sequence[Sample]) -> None:
    """Append ``samples`` to the sink."""
    ...
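Because the sink shape is a Protocol, any class with the same methods satisfies it without inheriting from anything. A minimal sketch of that structural check: BlockingSink is a hypothetical mirror of the protocol above, marked runtime_checkable only so isinstance works in the example (whether SyncSampleSink itself is runtime-checkable is not stated here), and ListBackedSink is an invented implementation.

```python
from collections.abc import Sequence
from typing import Protocol, runtime_checkable


@runtime_checkable
class BlockingSink(Protocol):
    """Hypothetical mirror of the sync sink shape: same method names, no await."""

    def open(self) -> None: ...
    def write_many(self, samples: Sequence[int]) -> None: ...
    def close(self) -> None: ...
    def __enter__(self): ...
    def __exit__(self, exc_type, exc, tb) -> None: ...


class ListBackedSink:
    """Satisfies the protocol structurally; it does not subclass it."""

    def __init__(self) -> None:
        self.rows: list[int] = []
        self.closed = False

    def open(self) -> None:
        self.closed = False

    def write_many(self, samples: Sequence[int]) -> None:
        self.rows.extend(samples)

    def close(self) -> None:
        self.closed = True  # safe to call more than once

    def __enter__(self) -> "ListBackedSink":
        self.open()
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()


with ListBackedSink() as sink:
    sink.write_many([10, 20])

conforms = isinstance(sink, BlockingSink)
plain_object_conforms = isinstance(object(), BlockingSink)
```

A runtime isinstance check against a protocol only verifies that the methods exist, not their signatures; a static type checker is what enforces the full shape.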

SyncSinkAdapter

SyncSinkAdapter(async_sink, *, portal=None)

Generic sync wrapper around any :class:SampleSink.

Subclasses typically only override :meth:__init__ to build the matching async sink with sink-specific parameters and hand it to this base class.

Source code in src/watlowlib/sync/sinks.py
def __init__(
    self,
    async_sink: SampleSink,
    *,
    portal: SyncPortal | None = None,
) -> None:
    self._async_sink = async_sink
    self._portal_override = portal
    self._portal: SyncPortal | None = None
    self._stack: ExitStack | None = None
    self._entered = False

async_sink property

async_sink

The wrapped async :class:SampleSink — escape hatch.

portal property

portal

Active :class:SyncPortal (raises if outside with block).

close

close()

Blocking :meth:SampleSink.close — idempotent.

Source code in src/watlowlib/sync/sinks.py
def close(self) -> None:
    """Blocking :meth:`SampleSink.close` — idempotent."""
    portal = self._portal
    if portal is None:
        return
    portal.call(self._async_sink.close)

open

open()

Blocking :meth:SampleSink.open.

Source code in src/watlowlib/sync/sinks.py
def open(self) -> None:
    """Blocking :meth:`SampleSink.open`."""
    self.portal.call(self._async_sink.open)

write_many

write_many(samples)

Blocking :meth:SampleSink.write_many.

Source code in src/watlowlib/sync/sinks.py
def write_many(self, samples: Sequence[Sample]) -> None:
    """Blocking :meth:`SampleSink.write_many`."""
    self.portal.call(self._async_sink.write_many, samples)

SyncSqliteSink

SyncSqliteSink(
    path,
    *,
    table="samples",
    create_table=True,
    journal_mode="WAL",
    synchronous="NORMAL",
    busy_timeout_ms=5000,
    portal=None,
)

Bases: SyncSinkAdapter

Sync wrapper over :class:~watlowlib.sinks.sqlite.SqliteSink.

Source code in src/watlowlib/sync/sinks.py
def __init__(
    self,
    path: str | Path,
    *,
    table: str = "samples",
    create_table: bool = True,
    journal_mode: _JournalMode = "WAL",
    synchronous: _Synchronous = "NORMAL",
    busy_timeout_ms: int = 5000,
    portal: SyncPortal | None = None,
) -> None:
    super().__init__(
        SqliteSink(
            path,
            table=table,
            create_table=create_table,
            journal_mode=journal_mode,
            synchronous=synchronous,
            busy_timeout_ms=busy_timeout_ms,
        ),
        portal=portal,
    )
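The journal_mode, synchronous, and busy_timeout_ms parameters correspond by name to SQLite pragmas. The sketch below applies those pragmas with the standard sqlite3 module; open_tuned is a hypothetical helper, and how SqliteSink applies its settings internally is an assumption, not something shown on this page.

```python
import sqlite3
import tempfile
from pathlib import Path

_JOURNAL_MODES = {"DELETE", "TRUNCATE", "PERSIST", "MEMORY", "WAL", "OFF"}
_SYNCHRONOUS = {"OFF", "NORMAL", "FULL", "EXTRA"}


def open_tuned(
    path: Path,
    *,
    journal_mode: str = "WAL",
    synchronous: str = "NORMAL",
    busy_timeout_ms: int = 5000,
) -> sqlite3.Connection:
    """Open a database file and apply the three pragmas."""
    # PRAGMA arguments cannot be bound as parameters, so validate before formatting.
    if journal_mode not in _JOURNAL_MODES or synchronous not in _SYNCHRONOUS:
        raise ValueError("unsupported pragma value")
    conn = sqlite3.connect(path)
    conn.execute(f"PRAGMA journal_mode={journal_mode}")
    conn.execute(f"PRAGMA synchronous={synchronous}")
    conn.execute(f"PRAGMA busy_timeout={int(busy_timeout_ms)}")
    return conn


with tempfile.TemporaryDirectory() as tmp:
    conn = open_tuned(Path(tmp) / "samples.db")
    mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
    sync_level = conn.execute("PRAGMA synchronous").fetchone()[0]  # NORMAL is 1
    timeout_ms = conn.execute("PRAGMA busy_timeout").fetchone()[0]
    conn.close()
```

WAL lets readers proceed while a writer appends, and the busy timeout makes a writer wait for a lock instead of failing immediately, which suits a logger whose database is inspected by other processes during a run.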