Implementing Custom Sinks in Loguru for Production Observability

This guide provides the exact configuration patterns for implementing custom sinks in Loguru to route structured telemetry data to external observability backends. It covers synchronous and asynchronous dispatch contracts, payload transformation, and fault-tolerant routing. For foundational architecture context, review the Modern Python Logging Libraries Deep Dive before deploying production sinks. Detailed sink lifecycle management and configuration parameters are documented in the Loguru Configuration and Sinks reference.

Key implementation objectives: honor the sink interface contract without mutating shared records, decouple dispatch from application latency with bounded queues, transform payloads into flat vendor-agnostic schemas, and isolate failures behind retries and dead-letter routing.

Sink Interface Contract & Message Dict Structure

Loguru invokes sinks synchronously by passing a single loguru.Message object. You must access the underlying record via message.record without mutating the shared reference. In-place modifications corrupt downstream sinks and violate thread-safety guarantees.

Extract context safely using .get() with explicit defaults, and build a new payload dict rather than transforming the record in place. This keeps one sink's transformations from leaking into other sinks or threads that receive the same record.

import json
import sys
from loguru import logger

def sync_otel_sink(message: object) -> None:
    """Synchronous sink with safe dict extraction and JSON serialization."""
    record = message.record
    extra = record.get("extra", {})
    payload = {
        "timestamp": record["time"].isoformat(),
        "severity_number": record["level"].no,
        "severity_text": record["level"].name,
        "module": record["name"],
        "message": str(message).rstrip("\n"),  # drop the terminator Loguru appends
        "context": extra,
        "trace_id": extra.get("trace_id"),
        "span_id": extra.get("span_id"),
    }
    try:
        serialized = json.dumps(payload, default=str, separators=(",", ":"))
        # Replace with actual synchronous dispatch (e.g., file write, local buffer)
        print(serialized)
    except Exception as e:
        # Isolate sink failure from the main application. Report on stderr instead of
        # re-entering the same logger, which would recurse back into this sink.
        print(f"Sink serialization failed: {e}", file=sys.stderr)

# Configuration
logger.add(sync_otel_sink, format="{message}", level="INFO")
logger.info("Service initialized", trace_id="0af7651916cd43dd8448eb211c80319c", span_id="b7ad6b7169203331")

Expected Output:

{"timestamp":"2024-05-12T10:15:30.123456+00:00","severity_number":20,"severity_text":"INFO","module":"__main__","message":"Service initialized","context":{"trace_id":"0af7651916cd43dd8448eb211c80319c","span_id":"b7ad6b7169203331"},"trace_id":"0af7651916cd43dd8448eb211c80319c","span_id":"b7ad6b7169203331"}

Async Dispatch & Non-Blocking I/O

Direct network calls inside a sink block the calling thread. High-throughput services require queue-backed dispatch to maintain baseline latency. Use asyncio.Queue with a bounded maxsize to enforce backpressure.

Implement a stop() method to drain pending records during graceful shutdown, and invoke it explicitly before the event loop exits, as main() does below. Loguru calls stop() automatically only for stream-like sinks that expose a write() method; a plain callable sink such as this one must be drained manually. This prevents data loss during pod termination or process restarts.

import asyncio
import json
import sys
from loguru import logger

class AsyncOTLPSink:
    def __init__(self, maxsize: int = 10000) -> None:
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self._task: asyncio.Task | None = None

    async def _worker(self) -> None:
        while True:
            record = await self.queue.get()
            try:
                # Simulate async HTTP/gRPC dispatch to OTLP collector
                payload = json.dumps({
                    "timestamp": record["time"].isoformat(),
                    "severity_text": record["level"].name,
                    "message": str(record["message"]),
                    "traceparent": record.get("extra", {}).get("traceparent"),
                }, default=str)
                # await http_client.post("http://otel-collector:4318/v1/logs", data=payload)
                print(f"[ASYNC_DISPATCH] {payload}")
            except Exception as e:
                # Report on stderr: logging the failure through the same logger would
                # feed new records back into this queue.
                print(f"Async dispatch failed: {e}", file=sys.stderr)
            finally:
                self.queue.task_done()

    def __call__(self, message: object) -> None:
        try:
            self.queue.put_nowait(message.record)
        except asyncio.QueueFull:
            # Drop the message to preserve application latency. Avoid re-entering the
            # logger here: it would recurse while the queue is still full.
            sys.stderr.write("Log queue full. Dropping message.\n")

    async def stop(self) -> None:
        await self.queue.join()
        if self._task:
            self._task.cancel()
            await asyncio.gather(self._task, return_exceptions=True)

    def start(self) -> None:
        self._task = asyncio.create_task(self._worker())

# Initialize and attach sink
async_sink = AsyncOTLPSink(maxsize=5000)
logger.add(async_sink, format="{message}", level="DEBUG")

# Execution context
async def main() -> None:
    async_sink.start()
    logger.info("Async pipeline active", traceparent="00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
    await asyncio.sleep(0.1)
    await async_sink.stop()

if __name__ == "__main__":
    asyncio.run(main())

Expected Output:

[ASYNC_DISPATCH] {"timestamp": "2024-05-12T10:15:30.123456+00:00", "severity_text": "INFO", "message": "Async pipeline active", "traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"}

Structured Payload Transformation

Observability backends require flat, vendor-agnostic JSON schemas. Map Loguru's internal severity levels to OpenTelemetry severity_number values (1-24). Extract W3C Trace Context headers from record["extra"] for distributed tracing correlation.

Strip ANSI escape sequences before serialization. Terminal formatting breaks JSON parsers and corrupts downstream ingestion pipelines. Flatten nested dictionaries using dot-notation for Elasticsearch or ClickHouse compatibility.

import re

ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")

# Map Loguru level names onto OpenTelemetry severity_number values (1-24)
OTEL_SEVERITY = {
    "TRACE": 1, "DEBUG": 5, "INFO": 9, "SUCCESS": 10,
    "WARNING": 13, "ERROR": 17, "CRITICAL": 21,
}

def flatten_dict(d: dict, parent_key: str = "", sep: str = ".") -> dict:
    items = []
    for k, v in d.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)

def transform_to_otel_schema(message: object) -> dict:
    record = message.record
    extra = record.get("extra", {})

    return {
        "resource": {
            "service.name": extra.get("service_name", "unknown"),
            "deployment.environment": extra.get("env", "production"),
        },
        "severity_number": OTEL_SEVERITY.get(record["level"].name, record["level"].no),
        "severity_text": record["level"].name,
        "body": ANSI_ESCAPE.sub("", str(message)).rstrip("\n"),
        "trace_id": extra.get("trace_id"),
        "span_id": extra.get("span_id"),
        "attributes": flatten_dict({k: v for k, v in extra.items() if k not in ("trace_id", "span_id")}),
    }
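
The transform above leaves trace_id and span_id wherever the application bound them. When only a raw W3C traceparent header is available, it can be split into those fields before binding. The helper below is a minimal sketch of that parsing under the standard 00-{trace_id}-{span_id}-{flags} layout; parse_traceparent and json_line_sink are illustrative names, not part of Loguru or OpenTelemetry.

import json
from loguru import logger

def parse_traceparent(traceparent: str) -> dict:
    """Split a W3C traceparent header (version-trace_id-span_id-flags) into IDs."""
    parts = traceparent.split("-")
    if len(parts) != 4:
        return {}
    return {"trace_id": parts[1], "span_id": parts[2]}

def json_line_sink(message: object) -> None:
    # Reuse the shared transformer, then emit one JSON document per line
    print(json.dumps(transform_to_otel_schema(message), default=str))

logger.add(json_line_sink, format="{message}", level="INFO")
ctx = parse_traceparent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
logger.bind(**ctx).info("Checkout completed", service_name="payments")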

Error Isolation & Dead-Letter Routing

Sink failures must never crash the host process. Wrap all external dispatch calls in explicit try/except blocks, catching ConnectionRefusedError and asyncio.TimeoutError, and treat HTTP 5xx responses as retryable failures.

Implement a fallback file sink when the primary endpoint becomes unreachable. Route dropped records to a local dead-letter queue (DLQ) for asynchronous replay. Apply exponential backoff with jitter for retryable network errors.

import json
import random
import sys
import time

def resilient_sink(message: object, max_retries: int = 3) -> None:
    payload = transform_to_otel_schema(message)
    for attempt in range(max_retries):
        try:
            # Simulate network dispatch
            raise ConnectionError("Backend unreachable")
        except ConnectionError:
            if attempt == max_retries - 1:
                # Route the serialized payload to a local dead-letter file for later replay
                with open("dlq_logs.jsonl", "a") as dlq:
                    dlq.write(json.dumps(payload, default=str) + "\n")
                sys.stderr.write("Max retries exceeded. Payload routed to DLQ.\n")
                return
            # Exponential backoff with jitter; in production, run the retry loop in a
            # background worker so the sink never blocks the caller for this long
            delay = (2 ** attempt) + random.uniform(0, 1)
            time.sleep(delay)

Common Mistakes & Immediate Remediation

Blocking the main thread with synchronous HTTP calls inside the sink. Loguru invokes sinks synchronously, so direct network requests halt application execution under load. Remediation: offload I/O to asyncio.to_thread() or a concurrent.futures.ThreadPoolExecutor, and use queue-backed dispatch to decouple logging from network latency, as sketched below.
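
A minimal sketch of the thread-pool remediation, assuming a single worker is acceptable for ordering; send_to_backend and threaded_sink are illustrative names, and the placeholder print stands in for whatever blocking HTTP client the deployment actually uses.

import json
from concurrent.futures import ThreadPoolExecutor
from loguru import logger

# One worker preserves ordering; increase only if the backend tolerates reordering
_executor = ThreadPoolExecutor(max_workers=1)

def send_to_backend(payload: str) -> None:
    # Placeholder for a blocking HTTP/gRPC call to the collector
    print(f"[THREADPOOL_DISPATCH] {payload}")

def threaded_sink(message: object) -> None:
    record = message.record
    # Serialize in the caller so the shared record is never touched from the worker thread
    payload = json.dumps(
        {"ts": record["time"].isoformat(), "level": record["level"].name,
         "message": record["message"], "extra": record.get("extra", {})},
        default=str,
    )
    # Hand the blocking I/O to the pool so the logging call returns immediately
    _executor.submit(send_to_backend, payload)

logger.add(threaded_sink, level="INFO")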

Mutating the incoming message record dictionary. In-place modifications to the record dict corrupt downstream sinks sharing the same reference. Remediation: always copy or reconstruct the payload, using record.copy() or dictionary unpacking before transformation, as in the snippet below.
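
A short sketch of the safe pattern, working on a copy of extra instead of the shared reference; redacting_sink and the password redaction rule are illustrative only.

from loguru import logger

def redacting_sink(message: object) -> None:
    record = message.record
    # Shallow-copy extra so other sinks still see the original, unmodified dict
    extra = dict(record["extra"])
    extra.pop("password", None)  # illustrative redaction rule
    payload = {**{"message": record["message"], "level": record["level"].name}, **extra}
    print(payload)

logger.add(redacting_sink, level="INFO")
logger.info("User login", user="alice", password="hunter2")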

Uncaught exceptions propagating out of the sink callable. Loguru catches sink exceptions internally and prints them to stderr, so the offending record is dropped without ever reaching a backend. Remediation: wrap the entire sink body in try/except Exception, log to a secondary fallback sink, and implement circuit breakers for external endpoints, as sketched below.
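
A compact sketch of the fallback pattern under those assumptions: primary_dispatch is a placeholder for the real network call, guarded_sink and fallback_logs.txt are illustrative, and CircuitBreaker is a deliberately simplified counter rather than a full implementation.

import time
from loguru import logger

class CircuitBreaker:
    """Simplified breaker: opens after N consecutive failures, re-probes after a cooldown."""

    def __init__(self, threshold: int = 5, cooldown: float = 30.0) -> None:
        self.failures = 0
        self.threshold = threshold
        self.cooldown = cooldown
        self.opened_at = 0.0

    def allow(self) -> bool:
        if self.failures < self.threshold:
            return True
        return (time.monotonic() - self.opened_at) > self.cooldown

    def record(self, ok: bool) -> None:
        if ok:
            self.failures = 0
            return
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = time.monotonic()

_breaker = CircuitBreaker()

def primary_dispatch(line: str) -> None:
    # Placeholder for the real network call to the primary backend
    raise ConnectionError("primary backend unreachable")

def guarded_sink(message: object) -> None:
    try:
        if not _breaker.allow():
            raise ConnectionError("circuit open")
        primary_dispatch(str(message))
        _breaker.record(ok=True)
    except Exception:
        _breaker.record(ok=False)
        # Secondary fallback: append to a local file so the record is not lost
        with open("fallback_logs.txt", "a") as fh:
            fh.write(str(message))

logger.add(guarded_sink, level="WARNING")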

FAQ

How do I handle sink failures without crashing the application? Wrap all I/O operations in the sink callable with explicit try/except blocks. Log failures to a secondary fallback sink and implement retry logic with circuit breakers for external endpoints. Never allow exceptions to bubble past the sink boundary.

Can I route logs to multiple custom sinks simultaneously? Yes. Call logger.add() multiple times with different sink callables. Apply level-specific filters (e.g., logger.add(sink_a, level="DEBUG"), logger.add(sink_b, level="ERROR")). Each handler applies its own level and filter independently, as in the example below.
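
For example, with illustrative sink names (console_sink, alert_sink); level and filter are Loguru's standard logger.add() parameters.

from loguru import logger

def console_sink(message: object) -> None:
    print(f"[CONSOLE] {message}", end="")

def alert_sink(message: object) -> None:
    # Only ERROR and above reach this handler; forward to a paging system here
    print(f"[ALERT] {message}", end="")

logger.remove()  # drop the default stderr handler so output is not duplicated
logger.add(console_sink, level="DEBUG")
logger.add(alert_sink, level="ERROR")
logger.add(lambda m: print(f"[AUDIT] {m}", end=""),
           level="INFO", filter=lambda record: record["extra"].get("audit", False))

logger.debug("Verbose detail")                      # console only
logger.error("Payment failed")                      # console and alert
logger.bind(audit=True).info("User role changed")   # console and audit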

What is the performance impact of custom sinks versus built-in ones? Custom sinks introduce minimal overhead if I/O is decoupled via queues or thread pools. Synchronous sinks add the full I/O latency to every logging call. Async dispatch maintains baseline application throughput by offloading serialization and network calls to background workers.