Skip to Content
ContributeWorkerTrace Ingestion Pipeline

Trace Ingestion Pipeline

This document explains how traces are processed after arriving via POST /telemetry/traces, covering the full pipeline from storage through enrichment and metric evaluation.

Pipeline Overview

When traces are ingested, the backend stores spans immediately, then dispatches post-ingestion work (linking, enrichment, evaluation) either asynchronously via Celery or synchronously as a fallback.

Phase 1: Span Storage

The telemetry router receives an OTELTraceBatch payload containing one or more spans. Before storage, the backend injects any pending mapped output into span attributes (for SDK endpoints where output arrives asynchronously).

Spans are then stored in the Trace table via crud.create_trace_spans(). Each span record includes the OTEL trace ID, span ID, parent span ID, timing data, attributes, and tenant context (organization and project).

Phase 2: Post-Ingestion Dispatch

After storage, the router checks for Celery worker availability using a TTL-cached ping (300-second cache to avoid repeated 3-second Celery inspect calls).

Async Path (Workers Available)

When workers are available, the router dispatches a single post_ingest_link Celery task that handles all post-ingestion work:

routers/telemetry.py
if check_workers_available():
    post_ingest_link.delay(
        stored_span_ids=stored_span_ids,
        unique_trace_ids=unique_trace_ids,
        organization_id=organization_id,
        project_id=str(project_id),
        test_run_id=...,
        test_id=...,
        test_configuration_id=...,
    )

Sync Fallback (No Workers)

Without workers, the router runs linking and enrichment synchronously in the same request. Metric evaluation is skipped because it involves LLM calls that should not block API responses.

Phase 3: Linking

The post_ingest_link task performs three types of linking:

  1. Test-result linking: Associates trace spans with test results when spans carry test execution context attributes (rhesis.test.test_run_id, rhesis.test.test_result_id, etc.).

  2. Conversation-id linking: Patches first-turn spans with conversation IDs that were not known at the time the span was stored. This happens when a stateful endpoint generates the conversation ID during invocation.

  3. Input file linking: Attaches pending file records (images, documents) to their corresponding trace spans.

Phase 4: Enrichment

After linking, the pipeline dispatches an enrichment chain per unique trace ID. The first task in the chain is enrich_trace_async, which runs the TraceEnricher processor:

tasks/telemetry/enrich.py
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def enrich_trace_async(self, trace_id, project_id, organization_id):
    db = SessionLocal()
    try:
        enricher = TraceEnricher(db)
        enriched_data = enricher.enrich_trace(trace_id, project_id, organization_id)
        # ...
    finally:
        db.close()

Enrichment calculates three things from the trace’s spans:

  • Token costs: Uses LiteLLM’s pricing database to calculate USD and EUR costs for each LLM invocation span. Looks for spans with ai.operation.type = "ai.llm.invoke" and reads token counts from ai.llm.tokens.input / ai.llm.tokens.output.

  • Anomaly detection: Flags slow spans (greater than 10 seconds), high token usage (greater than 10,000 tokens), and error spans.

  • Metadata extraction: Collects unique models, tools, and operation types used across the trace.

The enriched data is stored in the enriched_data JSON column on the root span.

Phase 5: Trace Metrics Evaluation

The second task in the chain is evaluate_turn_trace_metrics, which runs LLM-based metric evaluation on the trace. This is the step that applies configured quality metrics (relevance, coherence, safety, etc.) to trace content.

Prerequisites for Evaluation

Evaluation requires all of the following:

  1. Celery workers running — evaluation never runs in the sync fallback path.

  2. Trace metrics enabled on the project — the project’s attributes.trace_metrics.enabled must not be false.

  3. Trace-scoped metrics configured — at least one metric with Trace in its metric_scope must exist for the organization.

  4. Input/output attributes on the root span — the root span (a span with no parent_span_id) must include:

    • rhesis.conversation.input — the user’s input text
    • rhesis.conversation.output — the system’s response text

If any of these conditions are missing, evaluation is skipped silently with a log message.

Evaluation Flow

Multi-Turn (Conversation) Evaluation

For traces with a conversation_id, a second evaluation phase runs on a debounce timer. The evaluate_conversation_trace_metrics task:

  1. Loads all root spans sharing the same trace_id, ordered by start_time.
  2. Reconstructs the full conversation from rhesis.conversation.input / rhesis.conversation.output attributes across all turns.
  3. Evaluates Multi-Turn scoped metrics against the full conversation history.
  4. Derives a combined Pass/Fail status from both turn-level and conversation-level results.

External Trace Ingestion

For deployments where the customer generates traces externally (e.g., with SDK tracing disabled), the same pipeline applies as long as the published spans meet the requirements.

Required Span Attributes for Evaluation

When posting traces via POST /telemetry/traces, include these attributes on the root span to enable the full pipeline:

code.txt
{
  "spans": [
    {
      "trace_id": "abc123...",
      "span_id": "def456...",
      "parent_span_id": null,
      "project_id": "your-project-uuid",
      "span_name": "your.operation.name",
      "span_kind": "SERVER",
      "start_time": "2025-01-15T10:00:00Z",
      "end_time": "2025-01-15T10:00:02Z",
      "status_code": "OK",
      "attributes": {
        "rhesis.conversation.input": "What is the return policy?",
        "rhesis.conversation.output": "Our return policy allows..."
      }
    }
  ]
}
AttributeRequired ForDescription
rhesis.conversation.inputEvaluationThe user’s input text for this turn
rhesis.conversation.outputEvaluationThe system’s response text for this turn
ai.operation.typeCost calculationSet to ai.llm.invoke for LLM spans
ai.model.nameCost calculationModel identifier (e.g., gpt-4o, claude-3-sonnet)
ai.llm.tokens.inputCost calculationNumber of input tokens
ai.llm.tokens.outputCost calculationNumber of output tokens
conversation_idMulti-turn evalShared conversation identifier across turns

What Runs Without These Attributes

  • Enrichment (Phase 4) always runs. Cost calculation skips spans that lack LLM-specific attributes; anomaly detection and metadata extraction still process all spans.
  • Evaluation (Phase 5) requires rhesis.conversation.input and/or rhesis.conversation.output. Without them, evaluation returns early with status no_io.