Implementing Custom Sinks in Loguru for Production Observability
You need to route structured Loguru records to an external backend such as an OTLP collector or a log aggregator without blocking request threads or losing events when the backend is unreachable. This guide covers the exact callable contract, schema transformation, queue-backed dispatch, and dead-letter routing. It builds on the Loguru Configuration and Sinks reference and is part of the Modern Python Logging Libraries Deep Dive.
Prerequisites
Pin Loguru and an async HTTP client for the OTLP example. The transformation example needs only the standard library.
```shell
pip install \
  "loguru>=0.7.0,<0.8.0" \
  "httpx>=0.27.0,<1.0.0"
```
Set the collector endpoint through the environment so the sink stays deployment-agnostic.
```shell
export OTEL_EXPORTER_OTLP_LOGS_ENDPOINT="http://otel-collector:4318/v1/logs"
```
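A sink typically resolves this variable once, at startup. The helper below is a sketch that assumes the OpenTelemetry exporter conventions: the signal-specific variable wins, a generic OTEL_EXPORTER_OTLP_ENDPOINT is treated as a base URL with /v1/logs appended, and otherwise the OTLP/HTTP default http://localhost:4318/v1/logs applies. The name resolve_logs_endpoint is hypothetical.

```python
import os
from collections.abc import Mapping
from urllib.parse import urlparse

# OTLP/HTTP default for the logs signal (OpenTelemetry exporter conventions).
DEFAULT_LOGS_ENDPOINT = "http://localhost:4318/v1/logs"


def resolve_logs_endpoint(env: Mapping[str, str] = os.environ) -> str:
    """Hypothetical helper: pick the OTLP logs endpoint from the environment."""
    endpoint = env.get("OTEL_EXPORTER_OTLP_LOGS_ENDPOINT")
    if not endpoint:
        base = env.get("OTEL_EXPORTER_OTLP_ENDPOINT")
        # The generic variable is a base URL; the logs path is appended to it.
        endpoint = base.rstrip("/") + "/v1/logs" if base else DEFAULT_LOGS_ENDPOINT
    parsed = urlparse(endpoint)
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise ValueError(f"invalid OTLP logs endpoint: {endpoint!r}")
    return endpoint
```

Calling it in the sink's constructor means a malformed endpoint fails at startup instead of on the first record.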
Implementation
A custom sink is the right tool whenever the destination is not a file or a stream: an HTTP endpoint, a message broker, a database, or any backend that needs a schema Loguru's built-in serializer does not produce. The contract is deliberately minimal (a callable taking one Message), but that minimalism puts every concern back on you: threading, back-pressure, schema, retries, and failure isolation. The four steps below address each in turn and are designed to compose: in production, a queue worker like the one in step two would dispatch the schema from step three and fall back to the dead-letter path from step four.
Step 1 — Match the callable contract. Loguru calls a sink with a single Message argument, a str subclass holding the formatted line, and waits for the call to return: on the logging thread by default, or on Loguru's worker thread when enqueue=True. Read the structured record through message.record and never mutate it in place; the same record object is shared with every other sink, so an in-place change corrupts their output. Extract context defensively with .get and explicit defaults.
```python
import json
import sys

from loguru import logger


def sync_otel_sink(message) -> None:
    """Synchronous sink: safe extraction, flat JSON, isolated failures."""
    try:
        record = message.record  # read only; never mutate the shared record
        extra = record["extra"]
        payload = {
            "timestamp": record["time"].isoformat(),
            "severity_number": record["level"].no,
            "severity_text": record["level"].name,
            "module": record["name"],
            "body": record["message"],
            "trace_id": extra.get("trace_id"),
            "span_id": extra.get("span_id"),
        }
        print(json.dumps(payload, default=str, separators=(",", ":")))
    except Exception as exc:  # never let the sink crash the caller
        print(f"sink failed: {exc}", file=sys.stderr)


logger.remove()
logger.add(sync_otel_sink, level="INFO", enqueue=True)
logger.bind(
    trace_id="0af7651916cd43dd8448eb211c80319c",
    span_id="b7ad6b7169203331",
).info("Service initialized")
logger.complete()  # wait until the enqueue worker has handed the record to the sink
```
Expected Output:
```text
{"timestamp":"2024-05-12T10:15:30.123456+00:00","severity_number":20,"severity_text":"INFO","module":"__main__","body":"Service initialized","trace_id":"0af7651916cd43dd8448eb211c80319c","span_id":"b7ad6b7169203331"}
```
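The no-mutation rule matters most when a sink wants to add context. A minimal sketch, assuming a hypothetical StandInMessage class that mimics Loguru's Message (a str subclass carrying .record) so the logic can run without a logger: merge the defaults into a new dict and leave record["extra"] exactly as the other sinks will see it. The helper name enriched_line and the "checkout" service name are illustrative.

```python
import json


class StandInMessage(str):
    """Hypothetical test double for loguru's Message: a str with a .record dict."""

    record: dict


def enriched_line(message, defaults: dict) -> str:
    """Build a JSON line with default context, without touching the shared record."""
    record = message.record
    # New dict: defaults first, real context overrides. record["extra"] is only read.
    context = {**defaults, **record["extra"]}
    return json.dumps({"body": record["message"], **context}, default=str, separators=(",", ":"))


msg = StandInMessage("Service initialized")
msg.record = {"message": "Service initialized", "extra": {"trace_id": "0af7"}}
line = enriched_line(msg, {"service_name": "checkout", "trace_id": None})
print(line)  # {"body":"Service initialized","service_name":"checkout","trace_id":"0af7"}
```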
Step 2 — Decouple network I/O with a queue-backed worker. A direct HTTP call inside the sink blocks the calling thread for the full round trip. Push the record onto a bounded asyncio.Queue instead and dispatch from a background task, which bounds memory use and keeps network I/O off the request path. asyncio.Queue is not thread-safe, so this design requires Loguru to call the sink on the event loop's own thread; that is why the example registers the handler without enqueue=True. Provide an explicit stop coroutine to drain pending records at shutdown; Loguru does not call it for you.
```python
import asyncio
import json
import sys

from loguru import logger


class AsyncOTLPSink:
    """Enqueues records; a background task dispatches them."""

    def __init__(self, maxsize: int = 10000) -> None:
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self._task: asyncio.Task | None = None

    async def _worker(self) -> None:
        while True:
            record = await self.queue.get()
            try:
                payload = json.dumps({
                    "timestamp": record["time"].isoformat(),
                    "severity_text": record["level"].name,
                    "body": record["message"],
                    "traceparent": record["extra"].get("traceparent"),
                }, default=str)
                # await http_client.post(endpoint, content=payload)
                print(f"[DISPATCH] {payload}")
            except Exception as exc:
                print(f"async dispatch failed: {exc}", file=sys.stderr)
            finally:
                self.queue.task_done()

    def __call__(self, message) -> None:
        """Called by Loguru on the loop thread; enqueues without blocking, drops when full."""
        try:
            self.queue.put_nowait(message.record)
        except asyncio.QueueFull:
            print("log queue full, dropping message", file=sys.stderr)

    def start(self) -> None:
        self._task = asyncio.create_task(self._worker())

    async def stop(self) -> None:
        await self.queue.join()  # drain everything already enqueued
        if self._task:
            self._task.cancel()
            await asyncio.gather(self._task, return_exceptions=True)


async def main() -> None:
    sink = AsyncOTLPSink(maxsize=5000)
    logger.remove()
    # No enqueue=True: asyncio.Queue must only be touched from the loop thread.
    handler_id = logger.add(sink, level="DEBUG")
    sink.start()
    logger.info(
        "Async pipeline active",
        traceparent="00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
    )
    await asyncio.sleep(0.1)  # yield so the worker task can dispatch
    logger.remove(handler_id)  # stop accepting records before draining
    await sink.stop()


if __name__ == "__main__":
    asyncio.run(main())
```
Expected Output:
```text
[DISPATCH] {"timestamp": "2024-05-12T10:15:30.123456+00:00", "severity_text": "INFO", "body": "Async pipeline active", "traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"}
```
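The example above is safe only because Loguru calls the sink on the event loop's own thread. If records can arrive from other threads (application worker threads, or a handler added with enqueue=True), the sink must not touch asyncio.Queue directly. A minimal sketch, assuming the loop is captured when the sink is built: hand each record to the loop with loop.call_soon_threadsafe and do the put_nowait there. ThreadSafeQueueSink is a hypothetical name; a worker like the one in AsyncOTLPSink would consume self.queue.

```python
import asyncio
import threading


class ThreadSafeQueueSink:
    """Hypothetical sink that may be called from any thread."""

    def __init__(self, loop: asyncio.AbstractEventLoop, maxsize: int = 10000) -> None:
        self.loop = loop
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self.dropped = 0  # only modified on the loop thread, so no lock is needed

    def _put(self, record: dict) -> None:
        # Runs on the loop thread, where asyncio.Queue may be used safely.
        try:
            self.queue.put_nowait(record)
        except asyncio.QueueFull:
            self.dropped += 1

    def __call__(self, message) -> None:
        # Runs on the caller's thread: schedule the put instead of doing it here.
        self.loop.call_soon_threadsafe(self._put, message.record)


class _Msg(str):
    """Test double for loguru's Message."""


async def _demo() -> tuple[int, int]:
    sink = ThreadSafeQueueSink(asyncio.get_running_loop(), maxsize=2)

    def emit(i: int) -> None:
        m = _Msg(f"msg {i}")
        m.record = {"message": f"msg {i}"}
        sink(m)

    threads = [threading.Thread(target=emit, args=(i,)) for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    await asyncio.sleep(0.05)  # let the scheduled _put callbacks run
    return sink.queue.qsize(), sink.dropped


result = asyncio.run(_demo())
print(result)  # (2, 1): two records queued, one dropped
```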
Step 3 — Transform to a vendor-agnostic schema. Aggregators expect flat JSON. Carry Loguru's severity into the OpenTelemetry severity_number field (the note after the example explains why the raw number needs translating), strip ANSI escape sequences that break strict parsers, and flatten nested context with dot notation for Elasticsearch or ClickHouse.
```python
import re

# 7-bit escape sequences, including CSI color codes such as "\x1b[31m".
ANSI = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
# Keys promoted to top-level or resource fields, so not repeated in attributes.
_RESERVED = ("trace_id", "span_id", "service_name", "env")


def flatten(d: dict, prefix: str = "", sep: str = ".") -> dict:
    """Flatten nested dicts: {"http": {"status": 200}} -> {"http.status": 200}."""
    out = {}
    for k, v in d.items():
        key = f"{prefix}{sep}{k}" if prefix else k
        out.update(flatten(v, key, sep) if isinstance(v, dict) else {key: v})
    return out


def to_otel_schema(message) -> dict:
    record = message.record
    extra = record["extra"]
    return {
        "timestamp": record["time"].isoformat(),
        "resource": {
            "service.name": extra.get("service_name", "unknown"),
            "deployment.environment": extra.get("env", "production"),
        },
        "severity_number": record["level"].no,  # Loguru number; see the note below
        "severity_text": record["level"].name,
        "body": ANSI.sub("", record["message"]),
        "trace_id": extra.get("trace_id"),
        "span_id": extra.get("span_id"),
        "attributes": flatten({k: v for k, v in extra.items() if k not in _RESERVED}),
    }
```
The severity mapping deserves a note: Loguru's numeric levels (TRACE=5, DEBUG=10, INFO=20, SUCCESS=25, WARNING=30, ERROR=40, CRITICAL=50) do not line up one-to-one with the OpenTelemetry severity_number range of 1–24, where INFO is 9 and ERROR is 17. If your backend enforces the OTel scale, add an explicit lookup table rather than passing record["level"].no through unchanged, or downstream severity-based alerting will misclassify records. The example above forwards the Loguru number for brevity; a production transform should translate it.
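One way to build that lookup table, as a sketch: bucket the Loguru number into the OpenTelemetry bands by threshold, so custom levels that sit between the built-ins land in the nearest lower band. The band starts (TRACE 1, DEBUG 5, INFO 9, WARN 13, ERROR 17, FATAL 21) follow the OpenTelemetry log data model; mapping SUCCESS to INFO2 (10) and the helper name to_otel_severity are assumptions.

```python
# (minimum Loguru level number, OpenTelemetry severity_number), highest first.
_OTEL_BANDS = (
    (50, 21),  # CRITICAL -> FATAL
    (40, 17),  # ERROR    -> ERROR
    (30, 13),  # WARNING  -> WARN
    (25, 10),  # SUCCESS  -> INFO2 (an assumed choice, not a standard mapping)
    (20, 9),   # INFO     -> INFO
    (10, 5),   # DEBUG    -> DEBUG
    (5, 1),    # TRACE    -> TRACE
)


def to_otel_severity(levelno: int) -> int:
    """Map a Loguru level number onto the OTel 1-24 scale by threshold."""
    for minimum, otel in _OTEL_BANDS:
        if levelno >= minimum:
            return otel
    return 1  # below TRACE: clamp to the lowest OTel severity


print([to_otel_severity(n) for n in (5, 10, 20, 25, 30, 40, 50)])
# [1, 5, 9, 10, 13, 17, 21]
```

In the transform, the severity line would then read "severity_number": to_otel_severity(record["level"].no).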
Step 4 — Isolate failures with a dead-letter fallback. A sink must never crash the host process. Wrap dispatch in try/except, retry transient errors with exponential backoff and jitter, and on exhaustion append the record to a local dead-letter file for later replay. The dead-letter writer must be independent of the logger that feeds this sink and write to a plain file; routing failures back through the same logger, and therefore through the network sink that just failed, produces an infinite retry storm. Opening the file lazily on the first failure, as below, also avoids holding a file handle in the common case where the backend is healthy. Because the backoff calls time.sleep, register this sink with enqueue=True so the delays stall Loguru's worker thread rather than the request thread.
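```python
import json
import random
import threading
import time

# to_otel_schema is the Step 3 transform.
_dlq_file = None  # opened lazily on the first exhausted dispatch
_dlq_lock = threading.Lock()


def _dead_letter(payload: dict) -> None:
    """Append one JSON line to a plain file, bypassing Loguru entirely."""
    global _dlq_file
    with _dlq_lock:
        if _dlq_file is None:
            _dlq_file = open("dlq.jsonl", "a", encoding="utf-8")
        _dlq_file.write(json.dumps(payload, default=str) + "\n")
        _dlq_file.flush()


def resilient_sink(message, max_retries: int = 3) -> None:
    payload = to_otel_schema(message)
    for attempt in range(max_retries):
        try:
            # Replace with the real send (e.g. an HTTP POST) and return on success.
            raise ConnectionError("backend unreachable")
        except (ConnectionError, TimeoutError):
            if attempt == max_retries - 1:
                _dead_letter(payload)
                return
            time.sleep((2 ** attempt) + random.uniform(0, 1))  # ~1-2 s, then ~2-3 s
        except Exception:
            _dead_letter(payload)  # non-transient failure: do not retry
            return
```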
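Replay is the other half of the dead-letter path. A minimal sketch, assuming the file holds one JSON object per line and is not being written while the replay runs (for example, a maintenance job that runs after the file has been rotated aside): re-send each entry through a caller-supplied send function and rewrite the file with only the entries that fail again. replay_dlq and send are hypothetical names.

```python
import json
from pathlib import Path
from typing import Callable


def replay_dlq(path: Path, send: Callable[[dict], None]) -> tuple[int, int]:
    """Re-send dead-lettered payloads; return (sent, still_failing)."""
    if not path.exists():
        return 0, 0
    remaining = []
    sent = 0
    for line in path.read_text(encoding="utf-8").splitlines():
        if not line.strip():
            continue  # tolerate blank lines
        try:
            send(json.loads(line))
            sent += 1
        except Exception:
            remaining.append(line)  # keep it for the next replay
    # Rewrite the file so it holds only the records that are still undeliverable.
    path.write_text("".join(f"{entry}\n" for entry in remaining), encoding="utf-8")
    return sent, len(remaining)
```

Keeping the still-failing lines in place makes the replay safe to run repeatedly: each pass only shrinks the file.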
Configuration options
| Concern | Mechanism | Recommended setting |
|---|---|---|
| Dispatch threading | enqueue on logger.add | True for any blocking network sink |
| Back-pressure | bounded asyncio.Queue(maxsize=...) | size to a few seconds of peak volume |
| Drop policy | put_nowait plus QueueFull handler | drop and count, never block |
| Severity mapping | record["level"].no | map to OTel severity_number 1–24 |
| Failure isolation | try/except around all I/O | always; add a fallback sink |
| Retry | exponential backoff with jitter | cap at 3 attempts, then DLQ |
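For the back-pressure row, size the bound from throughput: at an illustrative peak of 2,000 records per second, five seconds of headroom is 2,000 × 5 = 10,000 slots, which happens to be the AsyncOTLPSink default. The drop-policy row can be sketched with the standard library's thread-safe queue.Queue and a counter you could export as a metric; queue_slots and DropCountingBuffer are hypothetical names.

```python
import queue
import threading


def queue_slots(peak_records_per_second: int, headroom_seconds: float) -> int:
    """Bound = peak rate x seconds of backlog you are willing to absorb."""
    return int(peak_records_per_second * headroom_seconds)


class DropCountingBuffer:
    """Bounded buffer that never blocks the producer and counts what it drops."""

    def __init__(self, maxsize: int) -> None:
        self._q: queue.Queue = queue.Queue(maxsize=maxsize)
        self._lock = threading.Lock()
        self.dropped = 0

    def offer(self, item) -> bool:
        try:
            self._q.put_nowait(item)
            return True
        except queue.Full:
            with self._lock:  # producers may be on several threads
                self.dropped += 1
            return False

    def drain(self) -> list:
        items = []
        while True:
            try:
                items.append(self._q.get_nowait())
            except queue.Empty:
                return items


buf = DropCountingBuffer(maxsize=2)
accepted = [buf.offer(i) for i in range(5)]
print(queue_slots(2000, 5), accepted, buf.dropped, buf.drain())
# 10000 [True, True, False, False, False] 3 [0, 1]
```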
If you would rather not write the queue plumbing yourself, see how Loguru's own background worker handles dispatch threading and graceful drain in async and non-blocking logging with Loguru enqueue. For where these sinks sit in the broader configuration surface, the Loguru Configuration and Sinks reference covers rotation, retention, and the level-routing topology a callable sink slots into.
Verification
Run the queue-backed example and confirm the dispatch line is printed by the worker task rather than inside logger.info: it appears only once main yields to the event loop at await asyncio.sleep(0.1). To assert the schema in CI, build a stand-in message carrying the fields to_otel_schema reads and check the OTel fields it produces.
```python
from datetime import datetime, timezone
from types import SimpleNamespace

# Assumes to_otel_schema from Step 3 is defined or imported in this module.
# SimpleNamespace stands in for loguru's Message; only .record is read.
msg = SimpleNamespace(record={
    "time": datetime(2024, 5, 12, tzinfo=timezone.utc),
    "level": SimpleNamespace(no=20, name="INFO"),
    "name": "__main__",
    "message": "ok",
    "extra": {"trace_id": "0af7", "span_id": "b7ad", "user": "u1"},
})
out = to_otel_schema(msg)
assert out["severity_number"] == 20
assert out["trace_id"] == "0af7"
assert out["attributes"] == {"user": "u1"}
print("schema assertions passed")
```
Expected Output:
```text
schema assertions passed
```
Common mistakes
- Blocking the worker thread with synchronous HTTP inside the sink. Even with enqueue=True the callable runs on Loguru's queue thread, so a synchronous request delays every record queued behind it. Remediation: dispatch from your own async worker or a concurrent.futures.ThreadPoolExecutor.
- Mutating the shared record dict. In-place edits corrupt the output of every other sink reading the same object. Remediation: build a new payload; copy with dict(record["extra"]) before transforming.
- Letting an exception escape the callable. Loguru catches it, prints the traceback to stderr, and drops the record, so the failure is easy to miss. Remediation: wrap the whole body in try/except Exception and route to a fallback or dead-letter sink.
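The first remediation can be sketched with the standard library: the sink builds its payload on the calling thread, submits the blocking send to a small concurrent.futures.ThreadPoolExecutor, and returns immediately. PooledSink and fake_send are hypothetical names. Note that the executor's internal work queue is unbounded, so pair this with a bounded buffer if back-pressure matters.

```python
import sys
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable


class PooledSink:
    """Hypothetical sink that moves blocking I/O onto a thread pool."""

    def __init__(self, send: Callable[[dict], None], max_workers: int = 4) -> None:
        self._send = send
        self._pool = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="log-dispatch")

    def __call__(self, message) -> None:
        record = message.record
        # Build a new dict now; the shared record is never mutated.
        payload = {"body": record["message"], **record["extra"]}
        self._pool.submit(self._send, payload).add_done_callback(self._report)

    @staticmethod
    def _report(future: Future) -> None:
        if not future.cancelled() and future.exception() is not None:
            print(f"dispatch failed: {future.exception()}", file=sys.stderr)

    def close(self) -> None:
        self._pool.shutdown(wait=True)  # finish queued sends before exit


class _Msg(str):
    """Test double for loguru's Message."""


sent, lock = [], threading.Lock()


def fake_send(payload: dict) -> None:
    with lock:
        sent.append(payload["body"])


sink = PooledSink(fake_send, max_workers=2)
for i in range(3):
    m = _Msg(f"msg {i}")
    m.record = {"message": f"msg {i}", "extra": {"i": i}}
    sink(m)
sink.close()
print(sorted(sent))  # ['msg 0', 'msg 1', 'msg 2']
```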
Frequently Asked Questions
How do I handle sink failures without crashing the application?
Wrap all I/O in the sink callable with explicit try and except blocks, log failures to a secondary fallback sink, and never allow an exception to bubble past the sink boundary. Loguru does catch exceptions that escape a sink (with the default catch=True), but it only prints the traceback to stderr and the record is lost.
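That boundary can be enforced once with a small wrapper instead of repeating try/except in every sink. A minimal sketch, assuming a hypothetical guarded helper and a fallback callable that receives the message and the exception (for instance, a local file writer): any exception from the primary sink goes to the fallback, and a failing fallback is reported to stderr rather than raised.

```python
import functools
import sys
from typing import Callable


def guarded(primary: Callable, fallback: Callable) -> Callable:
    """Wrap a sink so no exception escapes to the logging caller."""

    @functools.wraps(primary)
    def sink(message) -> None:
        try:
            primary(message)
        except Exception as exc:
            try:
                fallback(message, exc)
            except Exception as fallback_exc:
                print(f"sink and fallback both failed: {exc!r}, {fallback_exc!r}", file=sys.stderr)

    return sink


def flaky_sink(message) -> None:
    raise ConnectionError("backend unreachable")


dead_letters = []
safe_sink = guarded(flaky_sink, lambda message, exc: dead_letters.append((str(message), repr(exc))))
safe_sink("Service initialized")  # returns normally; nothing propagates
print(dead_letters)
```

The wrapped callable is registered like any other sink, for example logger.add(guarded(my_sink, my_fallback)), where both names stand for your own callables.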
Can I route logs to multiple custom sinks simultaneously?
Yes. Call logger.add once per sink callable and give each its own level and filter. Each sink runs independently, so a slow or failing sink does not block the others when enqueue is enabled.
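Loguru's filter argument accepts a callable that receives the record dict and returns a bool, which is enough to split traffic between sinks. A sketch with hypothetical extra_flag and without_flag factories; audit_sink and network_sink in the comments are placeholders for your own callables, so those lines are not executed here.

```python
def extra_flag(name: str):
    """Build a filter that keeps records whose extra carries a truthy `name`."""

    def _filter(record: dict) -> bool:
        return bool(record["extra"].get(name))

    return _filter


def without_flag(name: str):
    """Complementary filter: keep everything the flagged sink does not take."""
    keep = extra_flag(name)
    return lambda record: not keep(record)


# Registration with Loguru (audit_sink and network_sink are placeholders):
# logger.add(audit_sink, level="INFO", filter=extra_flag("audit"))
# logger.add(network_sink, level="WARNING", filter=without_flag("audit"))
# logger.bind(audit=True).info("Refund approved")  # reaches audit_sink only

records = [{"extra": {"audit": True}}, {"extra": {}}]
print([extra_flag("audit")(r) for r in records], [without_flag("audit")(r) for r in records])
# [True, False] [False, True]
```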
What is the performance impact of a custom sink versus a built-in one?
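A custom sink adds little overhead on the calling thread when its I/O is decoupled through a queue or thread pool, because the caller pays only for building the record and enqueueing it. A synchronous sink that performs network calls adds a full round trip to every logging call, so its cost grows with backend latency and it will throttle the whole application under load.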
Does Loguru call my sink from a background thread when enqueue is true?
Yes. With enqueue set to true Loguru dispatches records to your callable from its internal queue thread, so the callable still must avoid long blocking calls or it becomes the bottleneck for every other record.