How to Configure Python Logging for Production
Production logging fails in three predictable ways: blocking I/O that spikes request latency, unstructured text that aggregators silently drop, and missing correlation IDs that make an incident untraceable. This guide gives backend engineers a complete, copy-ready configuration that closes all three gaps. It is a focused task within Log Levels and Severity Mapping and part of the broader Python Logging Fundamentals and Structured Data reference.
Prerequisites
The configuration below uses only the standard library, so no third-party formatter is strictly required. For teams that prefer a maintained JSON formatter over a hand-rolled one, pin it explicitly:
pip install "python-json-logger>=2.0.0,<3.0.0"
Set the deployment-time environment variables your config will read. Keeping the level and log directory in the environment lets the same image run unchanged across staging and production:
export LOG_LEVEL="INFO"
export LOG_DIR="/var/log/app"
export OTEL_SERVICE_NAME="payment-service"
This guide assumes Python 3.11 or newer; the taskName attribute referenced in the formatter exists from 3.12 and is handled defensively for earlier versions.
Implementation
Build the configuration in five steps, each one closing a specific production failure mode.
1. Emit structured JSON. Replace the default human-readable formatter with one that produces a single JSON object per line. Standardize field names across services and map Python severity to the OpenTelemetry severity_number so a log line from any service is comparable. Timestamps must be UTC and ISO 8601 so cross-region correlation never depends on a server's local clock.
2. Resolve context at emission time. A logging.Filter reads the active request identifiers from contextvars and copies them onto the LogRecord. Resolving inside filter() rather than caching on the handler guarantees the value reflects the coroutine or thread that is actually emitting. The mechanics of safe context propagation are covered in using contextvars for request tracing.
3. Isolate I/O behind a queue. A QueueHandler accepts records on the hot path and returns immediately. A QueueListener running on a background thread drains the queue into the real sinks, so a slow disk or a stalled collector never propagates back into request latency.
4. Express the graph declaratively. Encode the handler topology as a dictConfig dictionary. This is idempotent: re-running it on a worker restart rebuilds the same graph rather than stacking duplicate handlers. For the full schema and per-environment overrides, see logging configuration and dictConfig.
5. Allow runtime level changes. Verbosity must be adjustable during an incident without a redeploy. A small validated wrapper around setLevel plus an audit logger gives you that control without inviting accidental log storms.
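Step 4 can be sketched as follows. This is a minimal illustration under stated assumptions, not the guide's full topology: it wires a single stdout sink, and the names LineJSONFormatter, LOGGING, and apply_config are hypothetical stand-ins. The point it shows is that applying the same dictionary twice leaves exactly one handler on the root logger.

```python
import json
import logging
import logging.config


class LineJSONFormatter(logging.Formatter):
    """Hypothetical stand-in formatter: one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps(
            {"level": record.levelname, "message": record.getMessage()}
        )


LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        # "()" tells dictConfig to call this factory instead of logging.Formatter.
        "json": {"()": LineJSONFormatter},
    },
    "handlers": {
        "stdout": {
            "class": "logging.StreamHandler",
            "formatter": "json",
            "stream": "ext://sys.stdout",
        },
    },
    "root": {"level": "INFO", "handlers": ["stdout"]},
}


def apply_config() -> int:
    """Apply the config and return how many handlers the root logger has."""
    logging.config.dictConfig(LOGGING)
    return len(logging.getLogger().handlers)


if __name__ == "__main__":
    apply_config()
    apply_config()  # a second run replaces the graph instead of stacking
    logging.getLogger("demo").info("configured once")
```

Because a non-incremental dictConfig call removes the root logger's existing handlers before attaching the configured ones, repeated calls converge on the same graph.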
The formatter and filter together produce the structured, correlated payload:
import json
import logging
import sys
from contextvars import ContextVar
from datetime import datetime, timezone

# Async-safe identifiers populated by request middleware.
trace_id_ctx: ContextVar[str] = ContextVar("trace_id", default="0" * 32)
span_id_ctx: ContextVar[str] = ContextVar("span_id", default="0" * 16)

# Standard LogRecord attributes we never want duplicated into the JSON body.
# "name" and "levelname" are already emitted as "logger" and "severity_text".
_RESERVED = {
    "name", "levelname",
    "msg", "args", "exc_info", "exc_text", "stack_info", "levelno",
    "pathname", "filename", "module", "funcName", "created", "msecs",
    "relativeCreated", "thread", "threadName", "processName", "process",
    "lineno", "taskName",
}


class OTelContextFilter(logging.Filter):
    """Copy request-scoped identifiers onto each record at emission time."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.trace_id = trace_id_ctx.get()
        record.span_id = span_id_ctx.get()
        # Python's numeric level (INFO=20). OpenTelemetry's own SeverityNumber
        # scale runs 1-24 (INFO=9); translate here if your collector needs it.
        record.severity_number = record.levelno
        return True


class OTelJSONFormatter(logging.Formatter):
    """Emit one JSON object per line with stable, OTel-aligned field names."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": datetime.fromtimestamp(
                record.created, tz=timezone.utc
            ).isoformat(),
            "severity_text": record.levelname,
            "severity_number": getattr(record, "severity_number", record.levelno),
            "logger": record.name,
            "message": record.getMessage(),
            "trace_id": getattr(record, "trace_id", ""),
            "span_id": getattr(record, "span_id", ""),
        }
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        # Promote anything passed via logger.info(..., extra={...}).
        for key, value in record.__dict__.items():
            if key not in payload and key not in _RESERVED and not key.startswith("_"):
                payload[key] = value
        return json.dumps(payload, default=str)
With the formatter defined, the queue pipeline keeps I/O off the request thread:
import logging
import os
import queue
import sys
from logging.handlers import QueueHandler, QueueListener, RotatingFileHandler


def build_listener() -> tuple[QueueListener, QueueHandler]:
    """Return a started listener and the handler the loggers should attach."""
    log_queue: queue.Queue = queue.Queue(maxsize=10_000)
    queue_handler = QueueHandler(log_queue)
    # Attach the context filter here so it runs on the emitting thread or task.
    # The listener thread does not run in the request's context, so a filter
    # on the sinks would not see the request's trace_id and span_id.
    queue_handler.addFilter(OTelContextFilter())

    stream = logging.StreamHandler(sys.stdout)
    stream.setFormatter(OTelJSONFormatter())

    rotating = RotatingFileHandler(
        os.path.join(os.environ.get("LOG_DIR", "."), "app.log"),
        maxBytes=10_000_000,
        backupCount=5,
        encoding="utf-8",
    )
    rotating.setFormatter(OTelJSONFormatter())

    # respect_handler_level lets each sink keep its own threshold.
    listener = QueueListener(
        log_queue, stream, rotating, respect_handler_level=True
    )
    listener.start()
    return listener, queue_handler


def configure() -> QueueListener:
    listener, queue_handler = build_listener()
    root = logging.getLogger()
    root.handlers.clear()  # idempotent: avoid stacking on worker restart
    root.setLevel(os.environ.get("LOG_LEVEL", "INFO"))
    root.addHandler(queue_handler)
    return listener


if __name__ == "__main__":
    listener = configure()
    trace_id_ctx.set("4bf92f3577b34da6a3ce929d0e0e4736")
    span_id_ctx.set("00f067aa0ba902b7")
    logging.getLogger("payment.service").info(
        "Transaction processed", extra={"amount": 150.0}
    )
    listener.stop()  # flush the queue before exit
Expected Output:
{"timestamp": "2026-06-19T08:14:22.105312+00:00", "severity_text": "INFO", "severity_number": 20, "logger": "payment.service", "message": "Transaction processed", "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736", "span_id": "00f067aa0ba902b7", "amount": 150.0}
Unhandled exceptions must reach the same JSON stream rather than escaping to a raw stderr trace that the aggregator cannot parse. Register an excepthook once at startup:
import logging
import sys


def log_uncaught(exc_type, exc_value, exc_tb):
    # Let Ctrl-C keep its default behavior instead of logging it as a crash.
    if issubclass(exc_type, KeyboardInterrupt):
        sys.__excepthook__(exc_type, exc_value, exc_tb)
        return
    logging.getLogger("uncaught").critical(
        "Unhandled exception", exc_info=(exc_type, exc_value, exc_tb)
    )


sys.excepthook = log_uncaught
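One gap worth noting: sys.excepthook is not invoked for exceptions that escape a threading.Thread. Those go to threading.excepthook (Python 3.8+), whose default prints a raw traceback to stderr. A minimal sketch of the thread-side counterpart follows; the function name log_thread_uncaught is hypothetical, and reusing the "uncaught" logger is an assumption carried over from the hook above.

```python
import logging
import threading


def log_thread_uncaught(args: threading.ExceptHookArgs) -> None:
    """Route uncaught exceptions from worker threads into logging."""
    if issubclass(args.exc_type, SystemExit):
        return  # the default thread hook also ignores SystemExit silently
    thread_name = args.thread.name if args.thread else "unknown"
    logging.getLogger("uncaught").critical(
        "Unhandled exception in thread %s",
        thread_name,
        exc_info=(args.exc_type, args.exc_value, args.exc_traceback),
    )


# Register once at startup, next to sys.excepthook.
threading.excepthook = log_thread_uncaught
```

Registering both hooks means a crash in a worker thread takes the same path into the JSON stream as a crash on the main thread.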
Finally, expose runtime verbosity control. The wrapper validates the level name, applies a cooldown, and records the change to a dedicated audit logger so the adjustment is never silent:
import logging
import time

_VALID = {"CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"}
_audit = logging.getLogger("logging.audit")
_last_change = 0.0


def set_runtime_level(name: str, level: str, actor: str) -> None:
    global _last_change
    level = level.upper()
    if level not in _VALID:
        raise ValueError(f"rejected invalid level {level!r}")
    if time.monotonic() - _last_change < 5.0:
        raise RuntimeError("level change rejected: cooldown active")
    logging.getLogger(name).setLevel(level)
    _last_change = time.monotonic()
    _audit.warning(
        "log level changed", extra={"target": name, "level": level, "actor": actor}
    )
Configuration Options
| Setting | Where | Recommended production value |
|---|---|---|
| Root level | root.setLevel / LOG_LEVEL env | INFO; raise to DEBUG only via the runtime endpoint |
| Queue capacity | queue.Queue(maxsize=...) | 10_000; tune to memory budget and burst size |
| File rotation | RotatingFileHandler(maxBytes, backupCount) | 10_000_000 bytes, 5 backups |
| Handler level isolation | QueueListener(respect_handler_level=True) | True so each sink keeps its own threshold |
| Timestamp format | formatter | UTC ISO 8601, never local time |
| Severity mapping | filter | record.levelno as severity_number |
| Listener shutdown | listener.stop() | call once at process teardown, after all logging |
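The Severity mapping row forwards Python's levelno unchanged. The OpenTelemetry log data model defines its own SeverityNumber scale from 1 to 24, where INFO starts at 9 rather than 20, so a collector that interprets the field strictly may need a translation. A minimal sketch, assuming you want the lowest number of each OpenTelemetry range; the helper name to_otel_severity is hypothetical:

```python
import logging

# Lowest SeverityNumber of each OpenTelemetry range:
# TRACE 1-4, DEBUG 5-8, INFO 9-12, WARN 13-16, ERROR 17-20, FATAL 21-24.
_OTEL_FLOOR = [
    (logging.CRITICAL, 21),
    (logging.ERROR, 17),
    (logging.WARNING, 13),
    (logging.INFO, 9),
    (logging.DEBUG, 5),
]


def to_otel_severity(levelno: int) -> int:
    """Map a Python level number to an OTel SeverityNumber (hypothetical helper)."""
    for threshold, severity in _OTEL_FLOOR:
        if levelno >= threshold:
            return severity
    return 1  # anything below DEBUG is treated as TRACE


if __name__ == "__main__":
    for name in ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"):
        print(name, to_otel_severity(getattr(logging, name)))
```

If you adopt a translation like this, apply it in the filter and update any contract checks that expect the Python value.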
Verification
Run the module and confirm the record is a single valid JSON line carrying the trace context. A quick assertion catches regressions in the field contract:
import json
line = '{"timestamp": "2026-06-19T08:14:22.105312+00:00", "severity_text": "INFO", "severity_number": 20, "logger": "payment.service", "message": "Transaction processed", "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736", "span_id": "00f067aa0ba902b7", "amount": 150.0}'
record = json.loads(line)
assert record["severity_number"] == 20
assert len(record["trace_id"]) == 32
assert record["amount"] == 150.0
print("contract OK")
Expected Output:
contract OK
Under load, watch the queue depth and the file rotation count. A queue that stays near capacity means the listener cannot keep up with the sinks and you should either add a faster sink or shed DEBUG records.
Common Mistakes
logging.basicConfig() is a silent no-op after handlers exist. Once the root logger has any handler, basicConfig does nothing and returns without error, so your carefully tuned formatter never attaches. Build the graph with dictConfig or explicit addHandler calls, and clear existing handlers first to stay idempotent.
Stopping the listener inside the logging loop loses records. listener.stop() drains what is already queued and then ends the listener thread, so every record logged after that point sits in the queue and never reaches a sink. Stop it exactly once, during process teardown, after all logging is complete.
Plain print() interleaved with JSON breaks the aggregator. A single stray print writes an unstructured line onto the same stream, and strict JSON parsers drop the malformed record and sometimes the surrounding batch. Route everything through logging and send tracebacks via logger.exception so they stay inside the JSON envelope.
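The basicConfig mistake above is easy to reproduce. The following sketch simulates a library that attached a root handler before your setup ran, then shows the ignored call and the force=True variant (available since Python 3.8), which removes existing root handlers first. The variable names are illustrative, and the snippet restores the root logger afterwards.

```python
import logging

root = logging.getLogger()
# Set aside whatever is configured so the demo leaves no side effects.
saved_handlers, saved_level = root.handlers[:], root.level
for handler in saved_handlers:
    root.removeHandler(handler)

# Simulate a library that attached a handler before our setup ran.
existing = logging.NullHandler()
root.addHandler(existing)
root.setLevel(logging.WARNING)

# Ignored: the root logger already has a handler.
logging.basicConfig(level=logging.DEBUG, format="%(message)s")
unchanged = root.handlers == [existing] and root.level == logging.WARNING

# force=True removes existing root handlers before configuring.
logging.basicConfig(level=logging.DEBUG, format="%(message)s", force=True)
replaced = existing not in root.handlers and root.level == logging.DEBUG

# Restore the original root configuration.
for handler in root.handlers[:]:
    root.removeHandler(handler)
for handler in saved_handlers:
    root.addHandler(handler)
root.setLevel(saved_level)

print(unchanged, replaced)
```

Both flags come out True: the first call changed nothing, and only the forced call replaced the handler and level.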
Frequently Asked Questions
How do I prevent log storms during incident response?
Combine a rate-limited level-control endpoint with a queue depth guard. When the queue exceeds about 80 percent capacity, drop DEBUG and INFO records and keep WARNING and above. Record every verbosity change in a separate audit logger so the change is reconstructable later.
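The queue depth guard described above could look like the following. This is a sketch under assumptions, not part of the standard library: the class name SheddingQueueHandler, the high_water parameter, and the dropped counter are all hypothetical. It overrides QueueHandler.enqueue, which is the documented hook for changing how records are placed on the queue.

```python
import logging
import queue
from logging.handlers import QueueHandler


class SheddingQueueHandler(QueueHandler):
    """Hypothetical QueueHandler that sheds low-severity records under pressure."""

    def __init__(self, log_queue: queue.Queue, high_water: float = 0.8) -> None:
        super().__init__(log_queue)
        self.high_water = high_water
        self.dropped = 0  # expose as a metric so shedding is never silent

    def enqueue(self, record: logging.LogRecord) -> None:
        q = self.queue
        # qsize() is approximate under concurrency, which is fine for a guard.
        over = q.maxsize > 0 and q.qsize() >= q.maxsize * self.high_water
        if over and record.levelno < logging.WARNING:
            self.dropped += 1
            return
        try:
            q.put_nowait(record)
        except queue.Full:
            self.dropped += 1  # completely full: drop rather than block


if __name__ == "__main__":
    handler = SheddingQueueHandler(queue.Queue(maxsize=10))
    log = logging.getLogger("shed.demo")
    log.setLevel(logging.DEBUG)
    log.propagate = False
    log.addHandler(handler)
    for _ in range(12):
        log.info("burst")
    print(handler.queue.qsize(), handler.dropped)
```

Counting drops rather than discarding silently keeps the shedding itself observable, which matters when you reconstruct an incident afterwards.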
Should I use dictConfig or programmatic setup?
Prefer dictConfig for the static handler graph because it is declarative and idempotent across container restarts. Use a small amount of programmatic code only for the dynamic parts, such as starting a QueueListener or registering an excepthook, that dictConfig cannot express cleanly.
How do I log safely from async code?
Resolve request metadata from contextvars rather than passing it through call signatures, and wrap blocking handlers in a QueueHandler so the event loop never waits on I/O. Never perform synchronous network calls inside a Filter or Formatter.
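To illustrate the contextvars point, here is a small self-contained sketch. It assumes a hypothetical request_id_ctx variable and uses an in-memory ListHandler as a stand-in for the QueueHandler; neither name comes from the configuration above. Each asyncio task sets its own identifier, and the filter picks up the right one even though the tasks interleave.

```python
import asyncio
import logging
from contextvars import ContextVar

request_id_ctx: ContextVar[str] = ContextVar("request_id", default="-")


class RequestIdFilter(logging.Filter):
    """Copy the current task's request id onto the record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_ctx.get()
        return True


class ListHandler(logging.Handler):
    """Collects records in memory; stands in for a QueueHandler here."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


async def handle_request(request_id: str, logger: logging.Logger) -> None:
    request_id_ctx.set(request_id)  # visible only inside this task's context
    await asyncio.sleep(0)  # yield so the tasks interleave
    logger.info("handled")


async def main() -> list[str]:
    handler = ListHandler()
    handler.addFilter(RequestIdFilter())
    logger = logging.getLogger("async.demo")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.addHandler(handler)
    # gather() wraps each coroutine in a task with its own copy of the context.
    await asyncio.gather(*(handle_request(f"req-{i}", logger) for i in range(3)))
    return sorted(r.request_id for r in handler.records)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

No identifier is passed through the logging call itself; the filter reads it from the context of whichever task is emitting.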
Why are my JSON logs being dropped by the aggregator?
Almost always because plain print output or a non-JSON handler is interleaved on the same stream. Route everything through logging, give every handler the JSON formatter, and send tracebacks through logger.exception so they stay inside the JSON envelope.