How to Configure Python Logging for Production

Production logging fails in three predictable ways: blocking I/O that spikes request latency, unstructured text that aggregators silently drop, and missing correlation IDs that make an incident untraceable. This guide gives backend engineers a complete, copy-ready configuration that closes all three gaps. It is a focused task within Log Levels and Severity Mapping and part of the broader Python Logging Fundamentals and Structured Data reference.

Production logging pipeline A request thread emits a record that passes through a context filter and a JSON formatter into a bounded queue, which a background listener drains to a rotating file and stdout. Request thread Filter + JSON format Bounded queue Rotating file stdout / collector listener drains
The request thread hands records to a bounded queue; a background listener does all real I/O.

Prerequisites

The configuration below uses only the standard library, so no third-party formatter is strictly required. For teams that prefer a maintained JSON formatter over a hand-rolled one, pin it explicitly:

pip install "python-json-logger>=2.0.0,<3.0.0"

Set the deployment-time environment variables your config will read. Keeping the level and log directory in the environment lets the same image run unchanged across staging and production:

export LOG_LEVEL="INFO"
export LOG_DIR="/var/log/app"
export OTEL_SERVICE_NAME="payment-service"

This guide assumes Python 3.11 or newer; the taskName attribute referenced in the formatter exists from 3.12 and is handled defensively for earlier versions.

Implementation

Build the configuration in five steps, each one closing a specific production failure mode.

  1. Emit structured JSON. Replace the default human-readable formatter with one that produces a single JSON object per line. Standardize field names across services and map Python severity to the OpenTelemetry severity_number so a log line from any service is comparable. Timestamps must be UTC and ISO 8601 so cross-region correlation never depends on a server's local clock.

  2. Resolve context at emission time. A logging.Filter reads the active request identifiers from contextvars and copies them onto the LogRecord. Resolving inside filter() rather than caching on the handler guarantees the value reflects the coroutine or thread that is actually emitting. The mechanics of safe context propagation are covered in using contextvars for request tracing.

  3. Isolate I/O behind a queue. A QueueHandler accepts records on the hot path and returns immediately. A QueueListener running on a background thread drains the queue into the real sinks, so a slow disk or a stalled collector never propagates back into request latency.

  4. Express the graph declaratively. Encode the handler topology as a dictConfig dictionary. This is idempotent: re-running it on a worker restart rebuilds the same graph rather than stacking duplicate handlers. For the full schema and per-environment overrides, see logging configuration and dictConfig.

  5. Allow runtime level changes. Verbosity must be adjustable during an incident without a redeploy. A small validated wrapper around setLevel plus an audit logger gives you that control without inviting accidental log storms.

The formatter and filter together produce the structured, correlated payload:

import json
import logging
import sys
from contextvars import ContextVar
from datetime import datetime, timezone

# Async-safe identifiers populated by request middleware.
trace_id_ctx: ContextVar[str] = ContextVar("trace_id", default="0" * 32)
span_id_ctx: ContextVar[str] = ContextVar("span_id", default="0" * 16)

# Standard LogRecord attributes we never want duplicated into the JSON body.
_RESERVED = {
    "msg", "args", "exc_info", "exc_text", "stack_info", "levelno",
    "pathname", "filename", "module", "funcName", "created", "msecs",
    "relativeCreated", "thread", "threadName", "processName", "process",
    "lineno", "taskName",
}


class OTelContextFilter(logging.Filter):
    """Copy request-scoped identifiers onto each record at emission time."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.trace_id = trace_id_ctx.get()
        record.span_id = span_id_ctx.get()
        record.severity_number = record.levelno  # OTel-compatible integer
        return True


class OTelJSONFormatter(logging.Formatter):
    """Emit one JSON object per line with stable, OTel-aligned field names."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": datetime.fromtimestamp(
                record.created, tz=timezone.utc
            ).isoformat(),
            "severity_text": record.levelname,
            "severity_number": getattr(record, "severity_number", record.levelno),
            "logger": record.name,
            "message": record.getMessage(),
            "trace_id": getattr(record, "trace_id", ""),
            "span_id": getattr(record, "span_id", ""),
        }
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        # Promote anything passed via logger.info(..., extra={...}).
        for key, value in record.__dict__.items():
            if key not in payload and key not in _RESERVED and not key.startswith("_"):
                payload[key] = value
        return json.dumps(payload, default=str)

With the formatter defined, the queue pipeline keeps I/O off the request thread:

import logging
import os
import queue
from logging.handlers import QueueHandler, QueueListener, RotatingFileHandler


def build_listener() -> tuple[QueueListener, QueueHandler]:
    """Return a started listener and the handler the loggers should attach."""
    log_queue: queue.Queue = queue.Queue(maxsize=10_000)
    queue_handler = QueueHandler(log_queue)

    stream = logging.StreamHandler(sys.stdout)
    stream.setFormatter(OTelJSONFormatter())
    stream.addFilter(OTelContextFilter())

    rotating = RotatingFileHandler(
        os.path.join(os.environ.get("LOG_DIR", "."), "app.log"),
        maxBytes=10_000_000,
        backupCount=5,
        encoding="utf-8",
    )
    rotating.setFormatter(OTelJSONFormatter())
    rotating.addFilter(OTelContextFilter())

    # respect_handler_level lets each sink keep its own threshold.
    listener = QueueListener(
        log_queue, stream, rotating, respect_handler_level=True
    )
    listener.start()
    return listener, queue_handler


def configure() -> QueueListener:
    listener, queue_handler = build_listener()
    root = logging.getLogger()
    root.handlers.clear()  # idempotent: avoid stacking on worker restart
    root.setLevel(os.environ.get("LOG_LEVEL", "INFO"))
    root.addHandler(queue_handler)
    return listener


if __name__ == "__main__":
    listener = configure()
    trace_id_ctx.set("4bf92f3577b34da6a3ce929d0e0e4736")
    span_id_ctx.set("00f067aa0ba902b7")
    logging.getLogger("payment.service").info(
        "Transaction processed", extra={"amount": 150.0}
    )
    listener.stop()  # flush the queue before exit

Expected Output:

{"timestamp": "2026-06-19T08:14:22.105312+00:00", "severity_text": "INFO", "severity_number": 20, "logger": "payment.service", "message": "Transaction processed", "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736", "span_id": "00f067aa0ba902b7", "amount": 150.0}

Unhandled exceptions must reach the same JSON stream rather than escaping to a raw stderr trace that the aggregator cannot parse. Register an excepthook once at startup:

import sys


def log_uncaught(exc_type, exc_value, exc_tb):
    if issubclass(exc_type, KeyboardInterrupt):
        sys.__excepthook__(exc_type, exc_value, exc_tb)
        return
    logging.getLogger("uncaught").critical(
        "Unhandled exception", exc_info=(exc_type, exc_value, exc_tb)
    )


sys.excepthook = log_uncaught

Finally, expose runtime verbosity control. The wrapper validates the level name, applies a cooldown, and records the change to a dedicated audit logger so the adjustment is never silent:

import logging
import time

_VALID = {"CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"}
_audit = logging.getLogger("logging.audit")
_last_change = 0.0


def set_runtime_level(name: str, level: str, actor: str) -> None:
    level = level.upper()
    if level not in _VALID:
        raise ValueError(f"rejected invalid level {level!r}")
    global _last_change
    if time.monotonic() - _last_change < 5.0:
        raise RuntimeError("level change rejected: cooldown active")
    logging.getLogger(name).setLevel(level)
    _last_change = time.monotonic()
    _audit.warning(
        "log level changed", extra={"target": name, "level": level, "actor": actor}
    )

Configuration Options

Setting Where Recommended production value
Root level root.setLevel / LOG_LEVEL env INFO; raise to DEBUG only via the runtime endpoint
Queue capacity queue.Queue(maxsize=...) 10_000; tune to memory budget and burst size
File rotation RotatingFileHandler(maxBytes, backupCount) 10_000_000 bytes, 5 backups
Handler level isolation QueueListener(respect_handler_level=True) True so each sink keeps its own threshold
Timestamp format formatter UTC ISO 8601, never local time
Severity mapping filter record.levelno as severity_number
Listener shutdown listener.stop() call once at process teardown, after all logging

Verification

Run the module and confirm the record is a single valid JSON line carrying the trace context. A quick assertion catches regressions in the field contract:

import json

line = '{"timestamp": "2026-06-19T08:14:22.105312+00:00", "severity_text": "INFO", "severity_number": 20, "logger": "payment.service", "message": "Transaction processed", "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736", "span_id": "00f067aa0ba902b7", "amount": 150.0}'
record = json.loads(line)
assert record["severity_number"] == 20
assert len(record["trace_id"]) == 32
assert record["amount"] == 150.0
print("contract OK")

Expected Output:

contract OK

Under load, watch the queue depth and the file rotation count. A queue that stays near capacity means the listener cannot keep up with the sinks and you should either add a faster sink or shed DEBUG records.

Common Mistakes

logging.basicConfig() is a silent no-op after handlers exist. Once the root logger has any handler, basicConfig does nothing and returns without error, so your carefully tuned formatter never attaches. Build the graph with dictConfig or explicit addHandler calls, and clear existing handlers first to stay idempotent.

Stopping the listener inside the logging loop loses records. Calling listener.stop() before the application has finished logging drops every record still in the queue and routes later calls to a dead listener. Stop it exactly once, during process teardown, after all logging is complete.

Plain print() interleaved with JSON breaks the aggregator. A single stray print writes an unstructured line onto the same stream, and strict JSON parsers drop the malformed record and sometimes the surrounding batch. Route everything through logging and send tracebacks via logger.exception so they stay inside the JSON envelope.

Frequently Asked Questions

How do I prevent log storms during incident response?

Combine a rate-limited level-control endpoint with a queue depth guard. When the queue exceeds about 80 percent capacity, drop DEBUG and INFO records and keep WARNING and above. Record every verbosity change in a separate audit logger so the change is reconstructable later.

Should I use dictConfig or programmatic setup?

Prefer dictConfig for the static handler graph because it is declarative and idempotent across container restarts. Use a small amount of programmatic code only for the dynamic parts, such as starting a QueueListener or registering an excepthook, that dictConfig cannot express cleanly.

How do I log safely from async code?

Resolve request metadata from contextvars rather than passing it through call signatures, and wrap blocking handlers in a QueueHandler so the event loop never waits on I/O. Never perform synchronous network calls inside a Filter or Formatter.

Why are my JSON logs being dropped by the aggregator?

Almost always because plain print output or a non-JSON handler is interleaved on the same stream. Route everything through logging, give every handler the JSON formatter, and send tracebacks through logger.exception so they stay inside the JSON envelope.