Examples
Complete working examples demonstrating various SDK connector patterns.
Basic Chat Function
The simplest possible endpoint using auto-mapping:
basic_chat.py
from rhesis.sdk import RhesisClient, endpoint
client = RhesisClient(
api_key="your-api-key",
project_id="your-project-id",
environment="development",
)
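# NOTE: generate_response() and generate_session_id() are hypothetical
# stand-ins so this example runs as-is; swap in your real model call and
# session ID scheme.
import uuid

def generate_response(message: str) -> str:
    # Placeholder: echo the input instead of calling an LLM.
    return f"You said: {message}"

def generate_session_id() -> str:
    return str(uuid.uuid4())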
@endpoint()
def chat(input: str, session_id: str | None = None) -> dict:
"""Basic chat endpoint with auto-mapping."""
response = generate_response(input)
return {
"output": response,
"session_id": session_id or generate_session_id(),
}

Multiple Functions
Register multiple endpoints in a single application:
multi_function.py
from rhesis.sdk import RhesisClient, endpoint
client = RhesisClient(
api_key="your-api-key",
project_id="your-project-id",
environment="development",
)
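# generate_response, perform_search, format_results, and generate_summary are
# assumed application helpers (see the stand-ins sketched in basic_chat.py).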
@endpoint()
def handle_chat(input: str, session_id: str | None = None) -> dict:
"""Process chat messages."""
return {"output": generate_response(input), "session_id": session_id}
@endpoint()
def search_documents(input: str, context: list | None = None) -> dict:
"""Search documents."""
results = perform_search(input, context or [])
return {"output": format_results(results), "context": results}
@endpoint()
def summarize(input: str) -> dict:
"""Summarize text."""
return {"output": generate_summary(input)}

Custom Field Mapping
Map custom API fields to function parameters:
custom_fields.py
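# How the mappings below behave: request fields go through Jinja-style
# templates, so an incoming payload such as
#   {"input": "Is hail damage covered?", "policy_number": "P-123", "tier": "gold"}
# (illustrative values) reaches the function as question=..., policy_id=...,
# and customer_tier=... . Response fields are extracted with JSONPath, so
# "output" is read from the returned dict's "answer" key.
# lookup_policy() is an assumed helper returning an (answer, claim_info) tuple.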
from rhesis.sdk import endpoint

@endpoint(
request_mapping={
"question": "{{ input }}",
"policy_id": "{{ policy_number }}",
"customer_tier": "{{ tier }}",
},
response_mapping={
"output": "$.answer",
"metadata": "$.claim_info",
},
)
def insurance_query(question: str, policy_id: str, customer_tier: str = "standard") -> dict:
"""Query insurance policy with custom fields."""
answer, claim_info = lookup_policy(question, policy_id, customer_tier)
return {
"answer": answer,
"claim_info": claim_info,
}

MLflow Agent Integration
Native integration with MLflow's ChatAgent framework:
mlflow_integration.py
from mlflow.types.agent import (
ChatAgentRequest,
ChatAgentResponse,
ChatAgentMessage,
ChatContext
)
import uuid
from rhesis.sdk import RhesisClient, endpoint
client = RhesisClient(
api_key="your-api-key",
project_id="your-project-id",
environment="development",
)
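def process_with_llm(prompt: str) -> str:
    # Hypothetical stand-in for your real model call; replace with your
    # LLM client of choice.
    return f"Assistant reply to: {prompt}"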
@endpoint(
name="mlflow_chat_agent",
request_mapping={
"request": {
"messages": [{"role": "user", "content": "{{ input }}"}],
"context": {"conversation_id": "{{ session_id }}"},
},
},
response_mapping={
"output": "$.messages[-1].content",
"session_id": "$.messages[-1].id",
"metadata": "$.custom_outputs",
},
)
def my_agent(request: ChatAgentRequest) -> ChatAgentResponse:
"""
Native MLflow agent - uses standard ChatAgentRequest/Response.
No wrappers or manual conversion needed.
"""
# Access the user message
user_content = request.messages[-1].content
conv_id = request.context.conversation_id if request.context else None
# Process with your LLM
response_text = process_with_llm(user_content)
return ChatAgentResponse(
messages=[
ChatAgentMessage(
id=str(uuid.uuid4()),
role="assistant",
content=response_text,
)
],
finish_reason="stop",
custom_outputs={
"conversation_id": conv_id,
"model": "gpt-4",
},
)

Dataclass-Based Endpoint
Using Python dataclasses for type safety:
dataclass_endpoint.py
from dataclasses import dataclass
from rhesis.sdk import RhesisClient, endpoint
client = RhesisClient(
api_key="your-api-key",
project_id="your-project-id",
environment="development",
)
@dataclass
class SearchRequest:
query: str
filters: dict | None = None
max_results: int = 10
@dataclass
class SearchResult:
title: str
snippet: str
score: float
@dataclass
class SearchResponse:
results: list[SearchResult]
total_count: int
query_time_ms: float
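# Hypothetical retrieval backend so the example is self-contained; replace
# with your real search implementation.
def perform_search(query: str, max_results: int) -> list[dict]:
    hits = [
        {"title": "Example doc", "snippet": f"Passage matching {query!r}", "score": 0.92},
    ]
    return hits[:max_results]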
@endpoint(
request_mapping={
"request": {
"query": "{{ input }}",
"max_results": 5,
}
},
response_mapping={
"output": "$.results[0].snippet",
"context": "$.results",
"metadata": "{{ {'total': total_count, 'time_ms': query_time_ms} }}",
},
)
def search(request: SearchRequest) -> SearchResponse:
"""Search endpoint with dataclass types."""
import time
start = time.time()
raw_results = perform_search(request.query, request.max_results)
results = [
SearchResult(
title=r["title"],
snippet=r["snippet"],
score=r["score"],
)
for r in raw_results
]
return SearchResponse(
results=results,
total_count=len(results),
query_time_ms=(time.time() - start) * 1000,
)

With Database Dependencies
Using parameter binding for database connections:
database_example.py
from rhesis.sdk import RhesisClient, endpoint
from myapp.database import get_db_session
from myapp.models import User, Document
client = RhesisClient(
api_key="your-api-key",
project_id="your-project-id",
environment="development",
)
def get_db():
"""Generator for database session with auto-cleanup."""
with get_db_session() as session:
yield session
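# get_current_user_id() and format_documents() are assumed application
# helpers. The bound parameters ("db", "user_id") are presumably resolved
# per call by the SDK, which is why they never appear in the remote
# signature shown in the docstring below.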
@endpoint(
bind={
"db": get_db,
"user_id": lambda: get_current_user_id(),
}
)
def query_documents(db, user_id, input: str, session_id: str | None = None) -> dict:
"""
Query user's documents.
Remote signature: query_documents(input: str, session_id: str | None = None)
"""
# Verify user has access
user = db.query(User).get(user_id)
if not user:
return {"output": "User not found", "error": True}
# Search documents
documents = (
db.query(Document)
.filter(Document.user_id == user_id)
.filter(Document.content.contains(input))
.limit(10)
.all()
)
return {
"output": format_documents(documents),
"context": [d.to_dict() for d in documents],
"session_id": session_id,
}

Async with Error Handling
Async endpoint with comprehensive error handling:
async_error_handling.py
from rhesis.sdk import RhesisClient, endpoint
import asyncio
import logging
logger = logging.getLogger(__name__)
client = RhesisClient(
api_key="your-api-key",
project_id="your-project-id",
environment="development",
)
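async def process_query_async(query: str) -> str:
    # Hypothetical async stand-in; replace with your real processing
    # pipeline (LLM call, retrieval, etc.).
    await asyncio.sleep(0)
    return f"Processed: {query}"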
@endpoint(
request_mapping={
"query": "{{ input }}",
"timeout": "{{ timeout | default(30) }}",
},
response_mapping={
"output": "$.result",
"metadata": "$.metadata",
},
)
async def async_query(query: str, timeout: int = 30) -> dict:
"""Async endpoint with timeout and error handling."""
try:
# Run with timeout
result = await asyncio.wait_for(
process_query_async(query),
timeout=timeout
)
return {
"result": result,
"metadata": {"status": "success", "timeout": timeout},
}
except asyncio.TimeoutError:
logger.warning(f"Query timed out after {timeout}s: {query[:50]}...")
return {
"result": None,
"metadata": {"status": "timeout", "timeout": timeout},
"error": f"Query timed out after {timeout} seconds",
}
except Exception as e:
logger.error(f"Query failed: {e}")
return {
"result": None,
"metadata": {"status": "error"},
"error": str(e),
}

RAG Pipeline
Complete RAG (Retrieval-Augmented Generation) example:
rag_pipeline.py
from rhesis.sdk import RhesisClient, endpoint
from myapp.vectorstore import VectorStore
from myapp.llm import LLMClient
client = RhesisClient(
api_key="your-api-key",
project_id="your-project-id",
environment="development",
)
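# VectorStore.connect() and LLMClient() are assumed application factories;
# wrapping them in lambdas defers construction until the SDK invokes the
# endpoint, without exposing either parameter in the remote signature.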
@endpoint(
bind={
"vectorstore": lambda: VectorStore.connect(),
"llm": lambda: LLMClient(),
},
request_mapping={
"question": "{{ input }}",
"num_docs": "{{ context_size | default(5) }}",
},
response_mapping={
"output": "$.answer",
"context": "$.sources",
"metadata": "$.metadata",
},
)
async def rag_query(
vectorstore,
llm,
question: str,
num_docs: int = 5,
session_id: str | None = None,
) -> dict:
"""
RAG pipeline: retrieve relevant documents and generate answer.
Remote signature: rag_query(input: str, session_id: str | None = None)
"""
# 1. Retrieve relevant documents
docs = await vectorstore.similarity_search(question, k=num_docs)
# 2. Build context from documents
context = "\n\n".join([
f"Document {i+1}:\n{doc.content}"
for i, doc in enumerate(docs)
])
# 3. Generate answer with LLM
prompt = f"""Based on the following context, answer the question.
Context:
{context}
Question: {question}
Answer:"""
answer = await llm.generate(prompt)
return {
"answer": answer,
"sources": [
{"title": d.title, "snippet": d.content[:200]}
for d in docs
],
"metadata": {
"num_sources": len(docs),
"model": llm.model_name,
},
"session_id": session_id,
}

More Resources

- Overview for the quick start guide
- Mapping for input/output configuration
- Advanced Mapping for complex types
- Parameter Binding for dependency injection