Skip to Content

Examples

Complete working examples demonstrating various SDK connector patterns.

Basic Chat Function

The simplest possible endpoint using auto-mapping:

basic_chat.py
from rhesis.sdk import RhesisClient, endpoint

# Create the SDK client. The string values below are placeholders; replace
# them with your own API key, project id and environment name.
client = RhesisClient(
    api_key="your-api-key",
    project_id="your-project-id",
    environment="development",
)

@endpoint()
def chat(input: str, session_id: str = None) -> dict:
    """Answer a single chat message.

    The decorator is used without explicit mappings, so this example relies
    on the SDK's auto-mapping of ``input`` / ``session_id`` / ``output``.

    Args:
        input: The user's message text.
        session_id: Identifier of an existing session, if the caller has one.

    Returns:
        A dict with the generated reply under ``"output"`` and the session
        identifier under ``"session_id"``.
    """
    reply = generate_response(input)
    # Reuse the caller's session when one was supplied; otherwise create one.
    active_session = session_id if session_id else generate_session_id()
    return {"output": reply, "session_id": active_session}

Multiple Functions

Register multiple endpoints in a single application:

multi_function.py
from rhesis.sdk import RhesisClient, endpoint

# Create the SDK client. The string values below are placeholders; replace
# them with your own API key, project id and environment name.
client = RhesisClient(
    api_key="your-api-key",
    project_id="your-project-id",
    environment="development",
)

@endpoint()
def handle_chat(input: str, session_id: str = None) -> dict:
    """Generate a reply to a chat message.

    Returns:
        A dict with the reply under ``"output"`` and the caller's
        ``session_id`` passed back unchanged under ``"session_id"``.
    """
    reply = generate_response(input)
    return {
        "output": reply,
        "session_id": session_id,
    }

@endpoint()
def search_documents(input: str, context: list = None) -> dict:
    """Run a document search for ``input``.

    Args:
        input: The search text.
        context: Optional context list forwarded to the search; a missing or
            empty value is replaced by a new empty list.

    Returns:
        A dict with the formatted results under ``"output"`` and the raw
        results under ``"context"``.
    """
    search_context = context if context else []
    hits = perform_search(input, search_context)
    formatted = format_results(hits)
    return {"output": formatted, "context": hits}

@endpoint()
def summarize(input: str) -> dict:
    """Return a summary of ``input`` under the ``"output"`` key."""
    summary = generate_summary(input)
    return {"output": summary}

Custom Field Mapping

Map custom API fields to function parameters:

custom_fields.py
@endpoint(
    # Each function parameter is filled from a template variable of the
    # incoming request.
    request_mapping={
        "question": "{{ input }}",
        "policy_id": "{{ policy_number }}",
        "customer_tier": "{{ tier }}",
    },
    # JSONPath expressions select fields from the dict returned below.
    response_mapping={
        "output": "$.answer",
        "metadata": "$.claim_info",
    },
)
def insurance_query(question: str, policy_id: str, customer_tier: str = "standard") -> dict:
    """Look up an insurance policy and answer a question about it.

    Args:
        question: The question to answer.
        policy_id: Identifier of the policy to look up.
        customer_tier: Customer tier forwarded to the lookup; defaults to
            ``"standard"``.

    Returns:
        A dict with the answer under ``"answer"`` and the claim details under
        ``"claim_info"`` (the keys the response mapping reads).
    """
    policy_result = lookup_policy(question, policy_id, customer_tier)
    answer_text, claim_details = policy_result
    return {"answer": answer_text, "claim_info": claim_details}

MLflow Agent Integration

Native integration with MLflow’s ChatAgent framework:

mlflow_integration.py
from mlflow.types.agent import (
    ChatAgentRequest, 
    ChatAgentResponse, 
    ChatAgentMessage, 
    ChatContext
)
import uuid
from rhesis.sdk import RhesisClient, endpoint

# Create the SDK client. The string values below are placeholders; replace
# them with your own API key, project id and environment name.
client = RhesisClient(
    api_key="your-api-key",
    project_id="your-project-id",
    environment="development",
)

@endpoint(
    name="mlflow_chat_agent",
    # Build the single `request` argument from the incoming input/session id.
    request_mapping={
        "request": {
            "messages": [{"role": "user", "content": "{{ input }}"}],
            "context": {"conversation_id": "{{ session_id }}"},
        },
    },
    # NOTE(review): `session_id` is read from the last message's `id`, which
    # this function sets to a fresh UUID on every call, while the
    # conversation id is returned inside `custom_outputs`. Confirm this is
    # the intended source for the session id.
    response_mapping={
        "output": "$.messages[-1].content",
        "session_id": "$.messages[-1].id",
        "metadata": "$.custom_outputs",
    },
)
def my_agent(request: ChatAgentRequest) -> ChatAgentResponse:
    """Answer the most recent message of an MLflow chat request.

    The function works directly with MLflow's ``ChatAgentRequest`` and
    ``ChatAgentResponse`` types; the decorator's mappings do the conversion,
    so no wrapper code is needed here.

    Args:
        request: Chat request; the last entry of ``request.messages`` is
            treated as the user's message.

    Returns:
        A response holding one assistant message, a ``"stop"`` finish reason
        and custom outputs carrying the conversation id and model name.
    """
    # The user's text is the content of the final message in the request.
    latest_message = request.messages[-1]
    prompt_text = latest_message.content

    # The context is optional; without it there is no conversation id.
    conversation_id = None
    if request.context:
        conversation_id = request.context.conversation_id

    # Process with your LLM
    reply_text = process_with_llm(prompt_text)

    assistant_message = ChatAgentMessage(
        id=str(uuid.uuid4()),
        role="assistant",
        content=reply_text,
    )
    extra_outputs = {
        "conversation_id": conversation_id,
        "model": "gpt-4",
    }
    return ChatAgentResponse(
        messages=[assistant_message],
        finish_reason="stop",
        custom_outputs=extra_outputs,
    )

Dataclass-Based Endpoint

Using Python dataclasses for type safety:

dataclass_endpoint.py
from dataclasses import dataclass
from rhesis.sdk import RhesisClient, endpoint

# Create the SDK client. The string values below are placeholders; replace
# them with your own API key, project id and environment name.
client = RhesisClient(
    api_key="your-api-key",
    project_id="your-project-id",
    environment="development",
)

@dataclass
class SearchRequest:
    """Input payload for a search call."""

    # Text to search for.
    query: str
    # Optional filters; defaults to None when the caller supplies none.
    filters: dict | None = None
    # Maximum number of results requested; defaults to 10.
    max_results: int = 10

@dataclass
class SearchResult:
    """A single search hit."""

    # Title of the hit.
    title: str
    # Text excerpt of the hit.
    snippet: str
    # Score of the hit (scale and ordering not shown here — confirm with the
    # search backend).
    score: float

@dataclass
class SearchResponse:
    """Output payload of a search call."""

    # The hits returned by the search.
    results: list[SearchResult]
    # Number of hits reported for the search.
    total_count: int
    # Time the search took, in milliseconds.
    query_time_ms: float

@endpoint(
    # Build the `request` argument: the query comes from the incoming input,
    # and the result limit is fixed at 5 for remote calls.
    request_mapping={
        "request": {
            "query": "{{ input }}",
            "max_results": 5,
        }
    },
    # NOTE(review): "output" reads the first result's snippet; confirm how the
    # SDK resolves this path when the result list is empty.
    response_mapping={
        "output": "$.results[0].snippet",
        "context": "$.results",
        "metadata": "{{ {'total': total_count, 'time_ms': query_time_ms} }}",
    },
)
def search(request: SearchRequest) -> SearchResponse:
    """Run a search and return typed results with timing information.

    Args:
        request: The search parameters; ``query`` and ``max_results`` are
            forwarded to the search call.

    Returns:
        A ``SearchResponse`` with one ``SearchResult`` per raw hit, the
        number of hits, and the elapsed time in milliseconds.
    """
    import time

    # perf_counter() is a monotonic, high-resolution clock, so the measured
    # duration cannot be distorted by wall-clock adjustments the way
    # time.time() differences can.
    started = time.perf_counter()

    raw_results = perform_search(request.query, request.max_results)

    # Convert each raw hit (a mapping with title/snippet/score keys) into the
    # typed result object.
    results = [
        SearchResult(
            title=r["title"],
            snippet=r["snippet"],
            score=r["score"],
        )
        for r in raw_results
    ]

    return SearchResponse(
        results=results,
        total_count=len(results),
        query_time_ms=(time.perf_counter() - started) * 1000,
    )

With Database Dependencies

Using parameter binding for database connections:

database_example.py
from rhesis.sdk import RhesisClient, endpoint
from myapp.database import get_db_session
from myapp.models import User, Document

# Create the SDK client. The string values below are placeholders; replace
# them with your own API key, project id and environment name.
client = RhesisClient(
    api_key="your-api-key",
    project_id="your-project-id",
    environment="development",
)

def get_db():
    """Yield a database session and leave its context when the generator ends.

    The session comes from ``get_db_session()`` used as a context manager, so
    its exit logic runs once the consumer is done with the generator.
    """
    session_scope = get_db_session()
    with session_scope as db_session:
        yield db_session

@endpoint(
    # `db` and `user_id` are supplied by these bindings rather than by the
    # remote caller.
    bind={
        "db": get_db,
        "user_id": lambda: get_current_user_id(),
    }
)
def query_documents(db, user_id, input: str, session_id: str = None) -> dict:
    """
    Find the documents of a user whose content contains the given text.

    Remote signature: query_documents(input: str, session_id: str = None)

    Returns a dict with the formatted documents under "output", their
    dict form under "context", and the caller's session id. When the user
    cannot be found, an error payload is returned instead.
    """
    # Verify user has access
    # NOTE(review): if `db` is a SQLAlchemy session, `Query.get()` is legacy
    # API in 1.4+; confirm whether `db.get(User, user_id)` should be used.
    owner = db.query(User).get(user_id)
    if not owner:
        return {"output": "User not found", "error": True}

    # Narrow the query step by step: owner's documents, text match, cap at 10.
    doc_query = db.query(Document)
    doc_query = doc_query.filter(Document.user_id == user_id)
    doc_query = doc_query.filter(Document.content.contains(input))
    matches = doc_query.limit(10).all()

    formatted = format_documents(matches)
    as_dicts = [match.to_dict() for match in matches]
    return {
        "output": formatted,
        "context": as_dicts,
        "session_id": session_id,
    }

Async with Error Handling

Async endpoint with comprehensive error handling:

async_error_handling.py
from rhesis.sdk import RhesisClient, endpoint
import asyncio
import logging

logger = logging.getLogger(__name__)

# Create the SDK client. The string values below are placeholders; replace
# them with your own API key, project id and environment name.
client = RhesisClient(
    api_key="your-api-key",
    project_id="your-project-id",
    environment="development",
)

@endpoint(
    # NOTE(review): `timeout` is produced by a template expression; confirm
    # the SDK delivers it as a number, since it is passed to asyncio.wait_for.
    request_mapping={
        "query": "{{ input }}",
        "timeout": "{{ timeout | default(30) }}",
    },
    response_mapping={
        "output": "$.result",
        "metadata": "$.metadata",
    },
)
async def async_query(query: str, timeout: int = 30) -> dict:
    """Run a query asynchronously with a time limit and error reporting.

    Failures are reported in the returned payload instead of being raised.

    Args:
        query: The query text to process.
        timeout: Maximum number of seconds to wait for the result.

    Returns:
        A dict with ``"result"`` and ``"metadata"`` keys. On timeout or any
        other failure, ``"result"`` is ``None``, ``"metadata"`` carries the
        status, and an ``"error"`` message is included.
    """
    # Keep the try body limited to the call that can actually fail.
    try:
        result = await asyncio.wait_for(
            process_query_async(query),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        # Only the first 50 characters of the query are logged.
        logger.warning("Query timed out after %ss: %s...", timeout, query[:50])
        return {
            "result": None,
            "metadata": {"status": "timeout", "timeout": timeout},
            "error": f"Query timed out after {timeout} seconds",
        }
    except Exception as e:
        # Endpoint boundary: log with the traceback (logger.exception) so the
        # failure can be diagnosed, then report it in the payload.
        logger.exception("Query failed: %s", e)
        return {
            "result": None,
            "metadata": {"status": "error"},
            "error": str(e),
        }

    return {
        "result": result,
        "metadata": {"status": "success", "timeout": timeout},
    }

RAG Pipeline

Complete RAG (Retrieval-Augmented Generation) example:

rag_pipeline.py
from rhesis.sdk import RhesisClient, endpoint
from myapp.vectorstore import VectorStore
from myapp.llm import LLMClient

# Create the SDK client. The string values below are placeholders; replace
# them with your own API key, project id and environment name.
client = RhesisClient(
    api_key="your-api-key",
    project_id="your-project-id",
    environment="development",
)

@endpoint(
    # `vectorstore` and `llm` are supplied by these bindings rather than by
    # the remote caller.
    bind={
        "vectorstore": lambda: VectorStore.connect(),
        "llm": lambda: LLMClient(),
    },
    request_mapping={
        "question": "{{ input }}",
        "num_docs": "{{ context_size | default(5) }}",
    },
    response_mapping={
        "output": "$.answer",
        "context": "$.sources",
        "metadata": "$.metadata",
    },
)
async def rag_query(
    vectorstore, 
    llm, 
    question: str, 
    num_docs: int = 5,
    session_id: str = None,
) -> dict:
    """
    Answer a question from retrieved documents (retrieval-augmented generation).

    Remote signature: rag_query(input: str, session_id: str = None)

    The function retrieves `num_docs` documents for the question, joins
    them into a numbered context, asks the LLM to answer from that
    context, and returns the answer together with the sources (title plus
    the first 200 characters of content), source count, model name and
    the caller's session id.
    """
    # 1. Retrieve relevant documents
    docs = await vectorstore.similarity_search(question, k=num_docs)

    # 2. Build context from documents, numbering them from 1
    numbered_sections = []
    for position, doc in enumerate(docs, start=1):
        numbered_sections.append(f"Document {position}:\n{doc.content}")
    context = "\n\n".join(numbered_sections)

    # 3. Generate answer with LLM
    prompt = f"""Based on the following context, answer the question.

Context:
{context}

Question: {question}

Answer:"""

    answer = await llm.generate(prompt)

    # 4. Assemble the payload the response mapping reads from
    sources = []
    for doc in docs:
        sources.append({"title": doc.title, "snippet": doc.content[:200]})
    metadata = {
        "num_sources": len(docs),
        "model": llm.model_name,
    }
    return {
        "answer": answer,
        "sources": sources,
        "metadata": metadata,
        "session_id": session_id,
    }

More Resources

- Overview for the quick start guide
- Mapping for input/output configuration
- Advanced Mapping for complex types
- Parameter Binding for dependency injection