Architecture

This page describes the architecture of the Rhesis tracing system in detail.

Component Architecture

SDK Components

OpenTelemetry Integration

The SDK wraps standard OpenTelemetry components:

SDK Tracer Setup
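from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# RhesisOTLPExporter is the SDK's custom exporter (see "OTLP Exporter")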

# Create tracer provider
provider = TracerProvider(resource=Resource.create({
    "service.name": "your-service",
    "deployment.environment": environment,
}))

# Add batch processor with OTLP exporter
span_processor = BatchSpanProcessor(
    RhesisOTLPExporter(api_key, base_url, project_id),
    max_queue_size=2048,        # Max spans in memory
    max_export_batch_size=512,  # Spans per HTTP request
    schedule_delay_millis=5000, # Export every 5 seconds
)
provider.add_span_processor(span_processor)
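
To make the three batching parameters concrete, here is a simplified, standard-library-only model of a bounded span queue that is drained in batches. It is not the OpenTelemetry implementation: the class name `BatchBuffer`, its methods, and the policy of dropping the newest span when the queue is full are all assumptions made for illustration.

```python
from collections import deque


class BatchBuffer:
    """Toy model of a bounded span queue that is drained in batches."""

    def __init__(self, max_queue_size=2048, max_export_batch_size=512):
        self.max_queue_size = max_queue_size
        self.max_export_batch_size = max_export_batch_size
        self.queue = deque()
        self.dropped = 0

    def on_end(self, span):
        # Assumption: a full queue drops the incoming span instead of blocking.
        if len(self.queue) >= self.max_queue_size:
            self.dropped += 1
            return
        self.queue.append(span)

    def next_batch(self):
        # Would be called once per export cycle (every schedule_delay_millis).
        size = min(len(self.queue), self.max_export_batch_size)
        return [self.queue.popleft() for _ in range(size)]


# Small limits so the effect is visible: 7 spans into a queue of 5.
buffer = BatchBuffer(max_queue_size=5, max_export_batch_size=2)
for i in range(7):
    buffer.on_end(f"span-{i}")

first_batch = buffer.next_batch()
```

With these small limits, two of the seven spans do not fit in the queue, and the first export cycle carries only two spans while three wait for later cycles.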

Decorator Flow

When a decorated function executes:

observe Decorator Flow
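from functools import wraps

from opentelemetry import trace
from opentelemetry.trace import SpanKind, Status, StatusCode

# func, func_name, final_span_name and attributes come from the enclosing decorator
@wraps(func)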
def wrapper(*args, **kwargs):
    tracer = trace.get_tracer(__name__)
    
    with tracer.start_as_current_span(
        name=final_span_name,
        kind=SpanKind.INTERNAL,
    ) as span:
        # Set attributes from decorator parameters
        span.set_attribute("function.name", func_name)
        for key, value in attributes.items():
            span.set_attribute(key, value)
        
        try:
            result = func(*args, **kwargs)
            span.set_status(Status(StatusCode.OK))
            return result
        except Exception as e:
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.record_exception(e)
            raise
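
The wrapper above relies on names supplied by an enclosing decorator factory (`final_span_name`, `func_name`, `attributes`). The following sketch shows one way that outer layer could look. It uses a stand-in `FakeSpan` recorder instead of OpenTelemetry so that it runs with the standard library alone; the `observe` signature, the fallback to `func.__qualname__`, and the `start_span` helper are assumptions, not the SDK's actual code.

```python
from contextlib import contextmanager
from functools import wraps

RECORDED = []  # finished stand-in spans, kept for inspection


class FakeSpan:
    """Stand-in for an OpenTelemetry span (illustration only)."""

    def __init__(self, name):
        self.name = name
        self.attributes = {}
        self.status = "UNSET"

    def set_attribute(self, key, value):
        self.attributes[key] = value


@contextmanager
def start_span(name):
    span = FakeSpan(name)
    try:
        yield span
    finally:
        RECORDED.append(span)  # the span is recorded even if the body raised


def observe(name=None, **attributes):
    """Hypothetical decorator factory mirroring the flow described above."""

    def decorator(func):
        final_span_name = name or func.__qualname__

        @wraps(func)
        def wrapper(*args, **kwargs):
            with start_span(final_span_name) as span:
                span.set_attribute("function.name", func.__name__)
                for key, value in attributes.items():
                    span.set_attribute(key, value)
                try:
                    result = func(*args, **kwargs)
                    span.status = "OK"
                    return result
                except Exception:
                    span.status = "ERROR"
                    raise  # the caller still sees the original exception

        return wrapper

    return decorator


@observe(name="math.divide", component="demo")
def divide(a, b):
    return a / b
```

Calling `divide(6, 3)` records a span with status `OK`, while `divide(1, 0)` records one with status `ERROR` and still raises `ZeroDivisionError` to the caller.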

OTLP Exporter

The custom exporter sends spans via HTTP POST:

OTLP Export
class RhesisOTLPExporter(SpanExporter):
    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
        # Convert OTEL spans to SDK schemas
        sdk_spans = [self._convert_span(span) for span in spans]
        
        # Create batch
        batch = SpanBatch(spans=sdk_spans)
        
        # Send via HTTP POST
        response = self._session.post(
            f"{self.base_url}/telemetry/traces",
            json=batch.model_dump(mode="json"),
            headers={"Authorization": f"Bearer {self.api_key}"},
            timeout=self._timeout,
        )
        
        return SpanExportResult.SUCCESS if response.ok else SpanExportResult.FAILURE
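
`_convert_span` is not shown in this section. As a rough sketch of what such a conversion involves, the snippet below turns integer IDs and nanosecond timestamps, which is how OpenTelemetry represents them, into JSON-friendly strings. The `RawSpan` stand-in, the helper names, and the output field names are assumptions and may not match the SDK schema.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class RawSpan:
    """Stand-in for the fields read from an OpenTelemetry ReadableSpan."""
    name: str
    trace_id: int       # 128-bit integer
    span_id: int        # 64-bit integer
    start_time_ns: int  # nanoseconds since the Unix epoch
    end_time_ns: int
    attributes: dict = field(default_factory=dict)


def ns_to_iso(ns: int) -> str:
    """Convert epoch nanoseconds to an ISO 8601 UTC string (microsecond precision)."""
    seconds, remainder_ns = divmod(ns, 1_000_000_000)
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return moment.replace(microsecond=remainder_ns // 1000).isoformat()


def convert_span(span: RawSpan) -> dict:
    return {
        "span_name": span.name,
        "trace_id": format(span.trace_id, "032x"),  # 32 hex characters
        "span_id": format(span.span_id, "016x"),    # 16 hex characters
        "start_time": ns_to_iso(span.start_time_ns),
        "end_time": ns_to_iso(span.end_time_ns),
        "duration_ms": (span.end_time_ns - span.start_time_ns) / 1_000_000,
        "attributes": dict(span.attributes),
    }


payload = convert_span(
    RawSpan(
        name="ai.llm.invoke",
        trace_id=0xABC,
        span_id=0x1,
        start_time_ns=1_700_000_000_000_000_000,
        end_time_ns=1_700_000_000_250_000_000,
        attributes={"ai.model.name": "example-model"},
    )
)
```

Formatting IDs as fixed-width hex keeps them stable across JSON serializers, which may otherwise lose precision on 128-bit integers.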

Backend Components

Ingestion Endpoint

The /telemetry/traces endpoint:

  1. Validates the OTLP JSON payload
  2. Extracts the organization and project from the API key
  3. Stores spans in PostgreSQL (bulk insert)
  4. Enqueues enrichment for background workers, or runs it synchronously if none are available
  5. Triggers trace linking

Ingestion Flow
@router.post("/telemetry/traces")
async def ingest_traces(batch: SpanBatch, db: Session):
    # organization_id and project_id are resolved from the API key
    # (resolution omitted here for brevity)

    # 1. Validate spans
    validated_spans = validate_spans(batch.spans)

    # 2. Store in database (bulk insert)
    stored_spans = crud.create_trace_spans(db, validated_spans)

    # 3. Enrich every trace in the batch (async or sync)
    for trace_id in {span.trace_id for span in stored_spans}:
        if workers_available():
            enrich_trace_async.delay(trace_id, project_id)
        else:
            enricher.enrich_trace(trace_id, project_id)

    # 4. Link traces (if test context present)
    linking_service.link_traces_for_incoming_batch(stored_spans, organization_id)

    return {"status": "ok", "count": len(stored_spans)}

Linking Service

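The linking service connects traces to test results using a hybrid strategy. It links once when a test result is created, which covers slow tests whose spans were ingested before the result existed, and once when a span batch is ingested, which covers fast tests whose result already exists when the spans arrive: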

Linking Service
class TraceLinkingService:
    def link_traces_for_test_result(
        self,
        test_run_id: str,
        test_id: str,
        test_result_id: str,
        organization_id: str,
    ):
        """Called after test result creation (catches slow tests)."""
        crud.update_traces_with_test_result_id(
            db=self.db,
            test_run_id=test_run_id,
            test_id=test_id,
            test_result_id=test_result_id,
            organization_id=organization_id,
        )
    
    def link_traces_for_incoming_batch(
        self,
        spans: List[Span],
        organization_id: str,
    ):
        """Called after span ingestion (catches fast tests)."""
        # Extract test context from span attributes
        for span in spans:
            test_run_id = span.attributes.get("rhesis.test.run_id")
            if test_run_id:
                # Find corresponding test_result
                test_result = find_test_result(test_run_id, span.trace_id)
                if test_result:
                    crud.update_traces_with_test_result_id(...)
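
To see why both paths are needed, the toy model below replays the two arrival orders in memory. It is a sketch only: `InMemoryLinker`, the dict-based spans, and matching on `(test_run_id, test_id)` are assumptions made for illustration and do not reflect the real database queries.

```python
class InMemoryLinker:
    """Toy model of the two linking paths; not the real service."""

    def __init__(self):
        self.spans = []         # each span is a plain dict
        self.test_results = {}  # (test_run_id, test_id) -> test_result_id

    def ingest_span(self, span):
        """Ingestion path: link immediately if the test result already exists."""
        key = (span["test_run_id"], span["test_id"])
        span["test_result_id"] = self.test_results.get(key)
        self.spans.append(span)

    def create_test_result(self, test_run_id, test_id, test_result_id):
        """Result path: link spans that arrived before the result was created."""
        self.test_results[(test_run_id, test_id)] = test_result_id
        for span in self.spans:
            if (span["test_run_id"], span["test_id"]) == (test_run_id, test_id):
                span["test_result_id"] = test_result_id


linker = InMemoryLinker()

# Slow test: spans arrive first, the result is created later.
linker.ingest_span({"span_id": "a", "test_run_id": "run-1", "test_id": "slow"})
linker.create_test_result("run-1", "slow", "result-1")

# Fast test: the result exists before the batched spans arrive.
linker.create_test_result("run-1", "fast", "result-2")
linker.ingest_span({"span_id": "b", "test_run_id": "run-1", "test_id": "fast"})

links = {span["span_id"]: span["test_result_id"] for span in linker.spans}
```

Both spans end up linked regardless of arrival order; with only one of the two paths, one of them would keep `test_result_id` set to `None`.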

Enrichment Service

Enrichment calculates costs and detects anomalies:

Enrichment
class TraceEnricher:
    def enrich_trace(self, trace_id: str, project_id: str):
        # Fetch all spans for trace
        spans = crud.get_spans_by_trace_id(trace_id)
        
        enriched_data = {
            "costs": self._calculate_costs(spans),
            "anomalies": self._detect_anomalies(spans),
            "metadata": self._extract_metadata(spans),
            "enriched_at": datetime.utcnow().isoformat(),
        }
        
        # Cache in enriched_data column
        crud.update_trace_enrichment(trace_id, enriched_data)
    
    def _calculate_costs(self, spans):
        """Use LiteLLM for token cost calculation."""
        total_cost = 0.0
        for span in spans:
            if span.span_name == "ai.llm.invoke":
                model = span.attributes.get("ai.model.name")
                tokens_in = span.attributes.get("ai.llm.tokens.input", 0)
                tokens_out = span.attributes.get("ai.llm.tokens.output", 0)
                # cost_per_token returns (prompt_cost_usd, completion_cost_usd)
                prompt_cost, completion_cost = litellm.cost_per_token(
                    model=model,
                    prompt_tokens=tokens_in,
                    completion_tokens=tokens_out,
                )
                total_cost += prompt_cost + completion_cost
        return {"total_cost_usd": total_cost}
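
The cost calculation boils down to multiplying token counts by per-token prices and summing over the LLM spans of a trace. The sketch below shows that arithmetic with a hard-coded price table in place of LiteLLM; the `PRICES` values and the model name `example-model` are invented for illustration, and the spans are plain dicts rather than database rows.

```python
# Hypothetical per-token prices in USD; in the real system these come from LiteLLM.
PRICES = {"example-model": {"input": 0.000001, "output": 0.000002}}


def calculate_costs(spans):
    """Sum the token costs of all LLM invocation spans in a trace."""
    total = 0.0
    for span in spans:
        if span["span_name"] != "ai.llm.invoke":
            continue  # only LLM invocation spans carry token counts
        attrs = span["attributes"]
        price = PRICES.get(attrs.get("ai.model.name"))
        if price is None:
            continue  # assumption: unknown models contribute no cost
        total += attrs.get("ai.llm.tokens.input", 0) * price["input"]
        total += attrs.get("ai.llm.tokens.output", 0) * price["output"]
    return {"total_cost_usd": total}


costs = calculate_costs([
    {
        "span_name": "ai.llm.invoke",
        "attributes": {
            "ai.model.name": "example-model",
            "ai.llm.tokens.input": 1000,
            "ai.llm.tokens.output": 500,
        },
    },
    {"span_name": "ai.tool.invoke", "attributes": {}},  # ignored: not an LLM span
])
```

Here 1000 input tokens and 500 output tokens each cost 0.001 USD at the assumed prices, for a total of about 0.002 USD.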

Enrichment Strategy

Async-First with Graceful Fallback

Enrichment Decision
if workers_available():
    # Production path: Async enrichment
    enrich_trace_async.delay(trace_id, project_id)
    # → Ingestion returns immediately (~10ms)
    # → Enrichment happens in background (~50-100ms)
else:
    # Development path: Sync enrichment
    enricher.enrich_trace(trace_id, project_id)
    # → Enrichment happens inline (~50-100ms)
    # → Ingestion takes longer but still works

Benefits:

  • ✅ Production optimal (async)
  • ✅ Development simple (no workers needed)
  • ✅ Automatic detection (no config)
  • ✅ Graceful degradation (workers crash → sync fallback)
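
The decision logic and its fallback can be sketched as a small function with the three collaborators injected as callables. How `workers_available()` actually detects workers is not covered here, so the sketch treats it as an opaque probe; the function name `enrich_with_fallback` and the choice to fall back when enqueueing raises are assumptions.

```python
def enrich_with_fallback(trace_id, project_id, *, workers_available, enqueue, enrich_sync):
    """Prefer background enrichment; fall back to inline enrichment."""
    if workers_available():
        try:
            enqueue(trace_id, project_id)
            return "async"
        except Exception:
            # Assumption: a broker failure should also trigger the sync fallback.
            pass
    enrich_sync(trace_id, project_id)
    return "sync"


calls = []

mode_with_workers = enrich_with_fallback(
    "trace-1", "proj-1",
    workers_available=lambda: True,
    enqueue=lambda t, p: calls.append(("async", t)),
    enrich_sync=lambda t, p: calls.append(("sync", t)),
)
mode_without_workers = enrich_with_fallback(
    "trace-2", "proj-1",
    workers_available=lambda: False,
    enqueue=lambda t, p: calls.append(("async", t)),
    enrich_sync=lambda t, p: calls.append(("sync", t)),
)
```

Injecting the probe and both enrichment paths keeps the decision testable without a running broker or worker pool.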

Enrichment Timing

Mode                 Ingestion    Enrichment               Query
Async (production)   ~10ms        Background (~50-100ms)   ~10ms
Sync (development)   ~60-110ms    Inline                   ~10ms

Both modes produce the same enriched data; only the timing differs.

Query API

Trace Retrieval

Query Endpoint
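@router.get("/traces/{trace_id}")
async def get_trace(trace_id: str, db: Session):
    # 1. Fetch all spans for trace_id
    spans = crud.get_spans_by_trace_id(db, trace_id)
    if not spans:
        # Unknown trace: return 404 instead of failing on spans[0]
        # (HTTPException here is fastapi.HTTPException)
        raise HTTPException(status_code=404, detail="Trace not found")

    # 2. Build span tree (parent-child relationships)
    span_tree = build_span_tree(spans)

    # 3. Return with cached enrichment (assumes the root span is returned first)
    root_span = spans[0]
    return {
        "trace_id": trace_id,
        "spans": span_tree,
        "enriched_data": root_span.enriched_data,
        "test_result_id": root_span.test_result_id,
    }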
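
`build_span_tree` is referenced above but not shown. A minimal sketch of the usual approach follows: index spans by ID, then attach each span to its parent. It assumes spans are plain dicts with `span_id` and `parent_span_id` keys, which may differ from the actual model.

```python
def build_span_tree(spans):
    """Nest flat span dicts under their parents and return the root nodes."""
    nodes = {s["span_id"]: {**s, "children": []} for s in spans}
    roots = []
    for node in nodes.values():
        parent = nodes.get(node.get("parent_span_id"))
        if parent is None:
            roots.append(node)  # no parent, or the parent is not in this trace
        else:
            parent["children"].append(node)
    return roots


tree = build_span_tree([
    {"span_id": "root", "parent_span_id": None, "span_name": "request"},
    {"span_id": "llm", "parent_span_id": "root", "span_name": "ai.llm.invoke"},
    {"span_id": "tool", "parent_span_id": "root", "span_name": "ai.tool.invoke"},
])
```

Spans whose parent is missing from the result set are treated as roots, so a partially ingested trace still renders rather than losing spans.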

Configuration

BatchSpanProcessor

SDK Configuration
BatchSpanProcessor(
    RhesisOTLPExporter(api_key, base_url, project_id),
    schedule_delay_millis=5000,    # Export every 5 seconds
    max_export_batch_size=512,     # Max spans per batch
    max_queue_size=2048,           # Max spans buffered in memory; spans are dropped once full
)

Environment Variables

.env
# Backend
DATABASE_URL=postgresql://user:pass@localhost:5432/rhesis
CELERY_BROKER_URL=redis://localhost:6379/0

# Enrichment
USD_TO_EUR_RATE=0.92

# SDK
RHESIS_API_KEY=your_api_key
RHESIS_BACKEND_URL=http://localhost:8080
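
As an illustration of how the SDK-side variables might be consumed, the sketch below reads them into a small settings object. The `SdkSettings` class, the `load_sdk_settings` helper, and the choice of `http://localhost:8080` as a default are assumptions; the actual SDK may load configuration differently.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class SdkSettings:
    api_key: str
    backend_url: str


def load_sdk_settings(env=None):
    """Read SDK settings from a mapping (defaults to the process environment)."""
    env = os.environ if env is None else env
    api_key = env.get("RHESIS_API_KEY")
    if not api_key:
        raise RuntimeError("RHESIS_API_KEY is not set")
    # Assumption: fall back to the local backend URL used in the .env example.
    backend_url = env.get("RHESIS_BACKEND_URL", "http://localhost:8080")
    return SdkSettings(api_key=api_key, backend_url=backend_url.rstrip("/"))


settings = load_sdk_settings({
    "RHESIS_API_KEY": "your_api_key",
    "RHESIS_BACKEND_URL": "http://localhost:8080/",
})
```

Passing the environment as a mapping makes the loader easy to test, and stripping the trailing slash avoids double slashes when paths such as `/telemetry/traces` are appended.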

Celery Workers

Start Workers
# Start workers
celery -A rhesis.backend.app.main worker --concurrency=4

# Autoscale between 2 and 10 worker processes (--autoscale=max,min)
celery -A rhesis.backend.app.main worker --autoscale=10,2