Implementing Custom Sinks in Loguru for Production Observability

You need to route structured Loguru records to an external backend such as an OTLP collector or a log aggregator without blocking request threads or losing events when the backend is unreachable. This guide covers the exact callable contract, schema transformation, queue-backed dispatch, and dead-letter routing. It builds on the Loguru Configuration and Sinks reference and is part of the Modern Python Logging Libraries Deep Dive.

[Figure: Custom sink dispatch path. sink() → transform → bounded queue → worker with retry → collector (v1/logs); on repeated failure → DLQ file → replay later.]
A custom sink transforms and enqueues; a worker dispatches and falls back to a dead-letter file on repeated failure.

Prerequisites

Pin Loguru and an async HTTP client for the OTLP example. The transformation example needs only the standard library.

pip install \
  "loguru>=0.7.0,<0.8.0" \
  "httpx>=0.27.0,<1.0.0"

Set the collector endpoint through the environment so the sink stays deployment-agnostic.

export OTEL_EXPORTER_OTLP_LOGS_ENDPOINT="http://otel-collector:4318/v1/logs"
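A small helper can resolve that endpoint when the sink is constructed. The sketch below assumes the usual OpenTelemetry exporter convention for OTLP/HTTP: the signal-specific OTEL_EXPORTER_OTLP_LOGS_ENDPOINT is used verbatim, otherwise /v1/logs is appended to OTEL_EXPORTER_OTLP_ENDPOINT, which defaults to http://localhost:4318. resolve_logs_endpoint is a hypothetical name, not part of Loguru or any OTel SDK.

```python
import os
from typing import Mapping


def resolve_logs_endpoint(env: Mapping[str, str] = os.environ) -> str:
    """Hypothetical helper: pick the OTLP/HTTP logs URL from the environment."""
    # A signal-specific endpoint is taken verbatim, path included.
    signal_endpoint = env.get("OTEL_EXPORTER_OTLP_LOGS_ENDPOINT")
    if signal_endpoint:
        return signal_endpoint
    # Otherwise fall back to the base endpoint and append the logs path.
    base = env.get("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318")
    return base.rstrip("/") + "/v1/logs"
```

Passing a plain dict instead of os.environ keeps the helper easy to test.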

Implementation

A custom sink is the right tool whenever the destination is not a file or a stream: an HTTP endpoint, a message broker, a database, or any backend that needs a schema Loguru's built-in serializer does not produce. The contract is deliberately minimal (a callable taking one Message), but that minimalism puts every concern back on you: threading, back-pressure, schema, retries, and failure isolation. The four steps below address each in turn. Each example stands alone for clarity, but the pieces are meant to compose: in production, the queue worker from step two would dispatch the schema from step three and fall back to the dead-letter path from step four.

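Step 1 — Match the callable contract. Loguru calls a function sink synchronously with a single argument, a Message: a str subclass holding the formatted line, with the structured record attached as message.record. Read the record through message.record and never mutate it in place; the same record object is shared with every other sink, so an in-place change corrupts their output. Extract bound context defensively with .get so a missing key yields None (or a default you pass) instead of raising KeyError.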

import json
import sys

from loguru import logger


def sync_otel_sink(message) -> None:
    """Synchronous sink: safe extraction, flat JSON, isolated failures."""
    try:  # never let the sink crash the caller, including during extraction
        record = message.record
        extra = record["extra"]
        payload = {
            "timestamp": record["time"].isoformat(),
            "severity_number": record["level"].no,  # Loguru scale; see Step 3
            "severity_text": record["level"].name,
            "module": record["name"],
            "body": record["message"],
            "trace_id": extra.get("trace_id"),
            "span_id": extra.get("span_id"),
        }
        print(json.dumps(payload, default=str, separators=(",", ":")))
    except Exception as exc:
        print(f"sink failed: {exc}", file=sys.stderr)


logger.remove()
logger.add(sync_otel_sink, level="INFO", enqueue=True)
logger.bind(
    trace_id="0af7651916cd43dd8448eb211c80319c",
    span_id="b7ad6b7169203331",
).info("Service initialized")

Expected Output:

{"timestamp":"2024-05-12T10:15:30.123456+00:00","severity_number":20,"severity_text":"INFO","module":"__main__","body":"Service initialized","trace_id":"0af7651916cd43dd8448eb211c80319c","span_id":"b7ad6b7169203331"}
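When a sink does need to alter context, for example to mask secrets before they leave the process, it should change a copy rather than the shared record. A minimal sketch, assuming a hypothetical SENSITIVE_KEYS set and output to stdout:

```python
import json
import sys

SENSITIVE_KEYS = {"password", "api_token"}  # hypothetical keys to mask


def redacting_sink(message) -> None:
    """Mask sensitive extra fields in a copy, leaving the shared record intact."""
    try:
        record = message.record
        extra = dict(record["extra"])  # shallow copy; other sinks keep the original
        for key in extra.keys() & SENSITIVE_KEYS:
            extra[key] = "***"
        line = {"body": record["message"], **extra}
        sys.stdout.write(json.dumps(line, default=str) + "\n")
    except Exception as exc:  # keep failures inside the sink
        sys.stderr.write(f"redacting sink failed: {exc}\n")
```

It is registered like any other callable, e.g. logger.add(redacting_sink, level="INFO"); a second sink added alongside it still sees the unmasked values.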

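Step 2 — Decouple network I/O with a queue-backed worker. A direct HTTP call inside the sink blocks the calling thread for the full round trip. Instead, push the record onto a bounded asyncio.Queue and dispatch from a background worker task; the bound caps memory under load (excess records are dropped rather than blocking the caller) and keeps network latency off the request path. asyncio.Queue is not thread-safe, so this version assumes every log call happens on the event-loop thread, which is also why the sink is added without enqueue=True. Provide an explicit stop coroutine to drain pending records at shutdown; Loguru does not call it for you.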

import asyncio
import json
from loguru import logger


class AsyncOTLPSink:
    """Enqueues records; a background worker dispatches them."""

    def __init__(self, maxsize: int = 10000) -> None:
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self._task: asyncio.Task | None = None

    async def _worker(self) -> None:
        while True:
            record = await self.queue.get()
            try:
                payload = json.dumps({
                    "timestamp": record["time"].isoformat(),
                    "severity_text": record["level"].name,
                    "body": record["message"],
                    "traceparent": record["extra"].get("traceparent"),
                }, default=str)
                # await http_client.post(endpoint, content=payload)
                print(f"[DISPATCH] {payload}")
            except Exception as exc:
                import sys
                print(f"async dispatch failed: {exc}", file=sys.stderr)
            finally:
                self.queue.task_done()

    def __call__(self, message) -> None:
        """Called by Loguru; enqueues without blocking, drops when full."""
        try:
            self.queue.put_nowait(message.record)
        except asyncio.QueueFull:
            import sys
            print("log queue full, dropping message", file=sys.stderr)

    def start(self) -> None:
        self._task = asyncio.create_task(self._worker())

    async def stop(self) -> None:
        await self.queue.join()
        if self._task:
            self._task.cancel()
            await asyncio.gather(self._task, return_exceptions=True)


async def main() -> None:
    sink = AsyncOTLPSink(maxsize=5000)
    logger.remove()
    logger.add(sink, level="DEBUG")
    sink.start()
    logger.info(
        "Async pipeline active",
        traceparent="00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
    )
    await asyncio.sleep(0.1)
    await sink.stop()


if __name__ == "__main__":
    asyncio.run(main())

Expected Output:

[DISPATCH] {"timestamp": "2024-05-12T10:15:30.123456+00:00", "severity_text": "INFO", "body": "Async pipeline active", "traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"}
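The Step 2 sink calls put_nowait directly, which is only safe while every log call runs on the event-loop thread. If worker threads also log, or the sink is added with enqueue=True (so Loguru calls it from its own thread), one option is to schedule the put on the loop with loop.call_soon_threadsafe. The sketch below does that and counts drops instead of printing one line per dropped record; ThreadSafeQueueSink and its dropped counter are hypothetical names.

```python
import asyncio


class ThreadSafeQueueSink:
    """Hypothetical variant of AsyncOTLPSink.__call__ that any thread may invoke."""

    def __init__(self, loop: asyncio.AbstractEventLoop, maxsize: int = 10000) -> None:
        self.loop = loop
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self.dropped = 0  # only touched on the loop thread, so no lock is needed

    def _put(self, record) -> None:
        # Runs on the event-loop thread, where asyncio.Queue may be used safely.
        try:
            self.queue.put_nowait(record)
        except asyncio.QueueFull:
            self.dropped += 1  # drop and count; export this as a metric

    def __call__(self, message) -> None:
        # May run on any thread: hand the record to the loop instead of touching the queue.
        self.loop.call_soon_threadsafe(self._put, message.record)
```

Construct it inside the running loop, e.g. ThreadSafeQueueSink(asyncio.get_running_loop()) in main(); the worker and stop() from AsyncOTLPSink carry over unchanged. call_soon_threadsafe raises RuntimeError once the loop is closed, so remove the sink with logger.remove(handler_id) before shutting the loop down.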

Step 3 — Transform to a vendor-agnostic schema. Aggregators want flat JSON. Map Loguru's level to the OpenTelemetry severity_number range, strip ANSI escape sequences that break strict parsers, and flatten nested context with dot notation for Elasticsearch or ClickHouse.

import re
from loguru import logger

ANSI = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")


def flatten(d: dict, prefix: str = "", sep: str = ".") -> dict:
    out = {}
    for k, v in d.items():
        key = f"{prefix}{sep}{k}" if prefix else k
        out.update(flatten(v, key, sep) if isinstance(v, dict) else {key: v})
    return out


def to_otel_schema(message) -> dict:
    record = message.record
    extra = record["extra"]
    return {
        "resource": {
            "service.name": extra.get("service_name", "unknown"),
            "deployment.environment": extra.get("env", "production"),
        },
        "severity_number": record["level"].no,
        "severity_text": record["level"].name,
        "body": ANSI.sub("", record["message"]),
        "trace_id": extra.get("trace_id"),
        "span_id": extra.get("span_id"),
        "attributes": flatten(
            {k: v for k, v in extra.items() if k not in ("trace_id", "span_id")}
        ),
    }

The severity mapping deserves a note: Loguru's numeric levels (DEBUG=10, INFO=20, WARNING=30, ERROR=40, CRITICAL=50) do not line up one-to-one with the OpenTelemetry severity_number range of 1–24, where INFO is 9 and ERROR is 17. If your backend enforces the OTel scale, add an explicit lookup table rather than passing record["level"].no through unchanged, or downstream severity-based alerting will misclassify records. The example above forwards the Loguru number for brevity; a production transform should translate it.
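One way to write that lookup table is sketched below. The anchor values follow the OTel ranges quoted above (TRACE 1, DEBUG 5, INFO 9, WARN 13, ERROR 17, FATAL 21); placing Loguru's SUCCESS level at 10 (INFO2) and bucketing custom levels by numeric threshold are assumptions, not a standard. to_otel_severity is a hypothetical name.

```python
# Hypothetical lookup: Loguru level name -> OTel SeverityNumber (1-24).
OTEL_SEVERITY_BY_NAME = {
    "TRACE": 1,      # TRACE
    "DEBUG": 5,      # DEBUG
    "INFO": 9,       # INFO
    "SUCCESS": 10,   # INFO2; Loguru-specific level, placement is a judgment call
    "WARNING": 13,   # WARN
    "ERROR": 17,     # ERROR
    "CRITICAL": 21,  # FATAL
}

# Fallback for custom levels: (lowest Loguru number, OTel number), highest first.
_OTEL_THRESHOLDS = ((50, 21), (40, 17), (30, 13), (20, 9), (10, 5))


def to_otel_severity(level_name: str, level_no: int) -> int:
    """Translate a Loguru level to the OpenTelemetry severity_number scale."""
    if level_name in OTEL_SEVERITY_BY_NAME:
        return OTEL_SEVERITY_BY_NAME[level_name]
    for floor, otel_number in _OTEL_THRESHOLDS:
        if level_no >= floor:
            return otel_number
    return 1  # anything below DEBUG is treated as TRACE
```

In to_otel_schema this would replace the pass-through as "severity_number": to_otel_severity(record["level"].name, record["level"].no); the Verification assertion on severity_number would then expect 9 instead of 20.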

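Step 4 — Isolate failures with a dead-letter fallback. A sink must never crash the host process. Wrap dispatch in try/except, retry transient errors with exponential backoff and jitter, and on exhaustion route the record to a local dead-letter file sink for later replay. Dead-lettered records must never flow back through the network sink that just failed; if they do, each fallback re-enters the sink, fails again, and produces an infinite retry storm. Simply adding a file sink to the global logger is not enough, because the fallback record would still reach the network sink. With a single Loguru logger, one way to enforce the separation is to tag dead-lettered records with bind(dlq=True), filter them out of the network sink, and route them only to a plain JSON-lines file sink. Passing delay=True to that file sink postpones creating the file until the first failure, so a healthy backend never pays for the file handle. Because the sink sleeps between retries, add it with enqueue=True so the backoff runs on Loguru's worker thread rather than the request thread.

import random
import sys
import time

from loguru import logger


def resilient_sink(message, max_retries: int = 3) -> None:
    """Retry dispatch with backoff; dead-letter the payload on exhaustion."""
    try:
        payload = to_otel_schema(message)  # defined in Step 3
        for attempt in range(max_retries):
            try:
                raise ConnectionError("backend unreachable")  # simulate dispatch
            except ConnectionError:
                if attempt == max_retries - 1:
                    # dlq=True keeps this record out of resilient_sink (see filters).
                    logger.bind(dlq=True, **payload).warning("dispatch exhausted, routed to DLQ")
                    return
                time.sleep((2 ** attempt) + random.uniform(0, 1))
    except Exception as exc:  # never let the sink crash the caller
        print(f"resilient sink failed: {exc}", file=sys.stderr)


logger.remove()
logger.add(
    resilient_sink, level="INFO", enqueue=True,  # backoff sleeps run off the request thread
    filter=lambda r: not r["extra"].get("dlq", False),
)
logger.add(
    "dlq.jsonl", mode="a", serialize=True, delay=True,  # file created on first DLQ record
    filter=lambda r: r["extra"].get("dlq", False),
)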
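Replaying the dead-letter file can be a separate maintenance step. The sketch below assumes the file was written by a Loguru file sink with serialize=True, so each line is a JSON object whose record.extra holds the bound payload (plus a dlq flag, if one was bound); replay_dlq and the dispatch callable are hypothetical stand-ins for your own HTTP client code.

```python
import json
from pathlib import Path
from typing import Callable


def replay_dlq(path: Path, dispatch: Callable[[dict], None]) -> int:
    """Hypothetical helper: re-send dead-lettered payloads, keep the ones that fail again."""
    if not path.exists():
        return 0  # nothing was ever dead-lettered
    still_failing = []
    replayed = 0
    for line in path.read_text(encoding="utf-8").splitlines():
        if not line.strip():
            continue
        # serialize=True writes {"text": ..., "record": {...}}; the payload was bound into extra.
        extra = json.loads(line)["record"]["extra"]
        payload = {k: v for k, v in extra.items() if k != "dlq"}
        try:
            dispatch(payload)
            replayed += 1
        except Exception:
            still_failing.append(line)
    # Rewrite the file with only the lines that failed again.
    path.write_text("".join(f"{ln}\n" for ln in still_failing), encoding="utf-8")
    return replayed
```

Run it only while nothing is appending to the file, for example after rotating it; rewriting a file that a live Loguru sink is still writing to can lose records.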

Configuration options

Concern | Mechanism | Recommended setting
Dispatch threading | enqueue on logger.add | True for synchronous network sinks; False for the asyncio.Queue sink in Step 2, which must run on the loop thread
Back-pressure | bounded asyncio.Queue(maxsize=...) | size to a few seconds of peak volume
Drop policy | put_nowait plus a QueueFull handler | drop and count, never block
Severity mapping | record["level"].no and record["level"].name | map to OTel severity_number 1–24 with a lookup table
Failure isolation | try/except around all I/O | always; add a fallback sink
Retry | exponential backoff with jitter | cap at 3 attempts, then DLQ

If you would rather not write the queue plumbing yourself, see how Loguru's own background worker dispatches and drains records in async and non-blocking logging with Loguru enqueue. For where these sinks sit in the broader configuration surface, the Loguru Configuration and Sinks reference covers rotation, retention, and the level-routing topology a callable sink slots into.

Verification

Run the queue-backed example and confirm that the dispatch line comes from the worker rather than the caller: logger.info only enqueues the record, and the [DISPATCH] line appears only after main() yields control at await asyncio.sleep. To assert the schema in CI, pass a stand-in message to to_otel_schema and check the OTel fields.

from types import SimpleNamespace
from datetime import datetime, timezone

msg = SimpleNamespace(record={
    "time": datetime(2024, 5, 12, tzinfo=timezone.utc),
    "level": SimpleNamespace(no=20, name="INFO"),
    "name": "__main__",
    "message": "ok",
    "extra": {"trace_id": "0af7", "span_id": "b7ad", "user": "u1"},
})
out = to_otel_schema(msg)
assert out["severity_number"] == 20
assert out["trace_id"] == "0af7"
assert out["attributes"] == {"user": "u1"}
print("schema assertions passed")

Expected Output:

schema assertions passed

Common mistakes

  • Blocking the worker thread with synchronous HTTP inside the sink. Even with enqueue=True the callable runs on Loguru's queue thread, so a synchronous request throttles every record. Remediation: dispatch from your own async worker or a concurrent.futures.ThreadPoolExecutor.
  • Mutating the shared record dict. In-place edits corrupt the output of every other sink reading the same object. Remediation: build a new payload; if you need a modified copy of the context, start from dict(record["extra"]), which is only a shallow copy, so copy nested values too (for example with copy.deepcopy) before changing them.
  • Letting an exception escape the callable. With the default catch=True, Loguru intercepts it and prints a traceback to stderr, but the record is lost and the failure is easy to miss in production. Remediation: wrap the whole body in try/except Exception and route to a fallback or dead-letter sink.
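The last remediation can be factored into a reusable wrapper so every sink gets the same guarantee. A minimal sketch; isolated is a hypothetical decorator, and the fallback callable (for example, a function that appends to a dead-letter file) is whatever you supply.

```python
import functools
import sys
from typing import Callable, Optional


def isolated(fallback: Optional[Callable[[object, Exception], None]] = None):
    """Hypothetical decorator: no exception ever escapes the wrapped sink."""

    def decorate(sink):
        @functools.wraps(sink)
        def wrapper(message) -> None:
            try:
                sink(message)
            except Exception as exc:
                try:
                    if fallback is not None:
                        fallback(message, exc)  # e.g. write to a dead-letter file
                    else:
                        sys.stderr.write(f"{sink.__name__} failed: {exc!r}\n")
                except Exception as fallback_exc:
                    # A broken fallback must not escape either; stderr is the last resort.
                    sys.stderr.write(f"fallback failed: {fallback_exc!r}\n")

        return wrapper

    return decorate
```

Usage: decorate a sink with @isolated(fallback=...) before passing it to logger.add; the wrapper keeps the original function name, which makes handler listings easier to read.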

Frequently Asked Questions

How do I handle sink failures without crashing the application?

Wrap all I/O in the sink callable in explicit try/except blocks, send failures to a secondary fallback sink, and never let an exception escape the sink boundary. Loguru's default catch=True does intercept escaped exceptions, but it only prints them to stderr and the record is lost.

Can I route logs to multiple custom sinks simultaneously?

Yes. Call logger.add once per sink callable and give each its own level and filter. Each sink runs independently, so a slow or failing sink does not block the others when enqueue is enabled.

What is the performance impact of a custom sink versus a built-in one?

A custom sink whose I/O is decoupled through a queue or thread pool costs the calling thread little more than building the payload and one queue insertion per record. A synchronous sink that makes network calls adds a full network round trip to every log call and will throttle the whole application under load.

Does Loguru call my sink from a background thread when enqueue is true?

Yes. With enqueue set to true Loguru dispatches records to your callable from its internal queue thread, so the callable still must avoid long blocking calls or it becomes the bottleneck for every other record.