Worker Tasks and Background Processing
Overview
The Rhesis backend uses Celery to handle asynchronous background tasks. This allows the API to offload time-consuming operations and improve responsiveness. The task processing system is designed to be scalable, fault-tolerant, and context-aware.
Celery Configuration
The Celery application is configured in `worker.py`:
import os

from celery import Celery
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Create the Celery app
app = Celery("rhesis")

# Configure Celery
app.conf.update(
    broker_url=os.getenv("BROKER_URL"),
    result_backend=os.getenv("CELERY_RESULT_BACKEND"),
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    enable_utc=True,
)

# Auto-discover tasks without loading config files
app.autodiscover_tasks(["rhesis.backend.tasks"], force=True)
The application uses Redis as both the broker and result backend with TLS support:
# Development (local Redis)
BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/1
# Production (Redis with TLS)
BROKER_URL=rediss://:password@redis-host:6378/0?ssl_cert_reqs=CERT_NONE
CELERY_RESULT_BACKEND=rediss://:password@redis-host:6378/1?ssl_cert_reqs=CERT_NONE
# Model API Keys (required for evaluation tasks)
GEMINI_API_KEY=your_gemini_api_key
GEMINI_MODEL_NAME=gemini-1.5-pro
AZURE_OPENAI_ENDPOINT=https://your-endpoint.openai.azure.com/
AZURE_OPENAI_API_KEY=your_azure_openai_key
AZURE_OPENAI_DEPLOYMENT_NAME=your-deployment-name
AZURE_OPENAI_API_VERSION=2024-02-01
Note: The `rediss://` protocol indicates Redis with TLS/SSL encryption. The `ssl_cert_reqs=CERT_NONE` parameter is used when connecting to managed Redis services that use self-signed certificates.
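If you would rather keep TLS parameters out of the connection URL, Celery also accepts them as settings. A minimal sketch using the standard `broker_use_ssl` and `redis_backend_use_ssl` options (the `CERT_NONE` strictness here simply mirrors the URLs above; prefer `ssl.CERT_REQUIRED` with a CA bundle when the certificate can be verified):
import ssl

# TLS options applied via settings instead of rediss:// URL parameters.
# CERT_NONE mirrors ssl_cert_reqs=CERT_NONE above; prefer CERT_REQUIRED
# together with ssl_ca_certs when the server certificate can be verified.
app.conf.broker_use_ssl = {"ssl_cert_reqs": ssl.CERT_NONE}
app.conf.redis_backend_use_ssl = {"ssl_cert_reqs": ssl.CERT_NONE}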
Base Task Class
All tasks inherit from a `BaseTask` class that provides retry logic, error handling, and, most importantly, context awareness for multi-tenant operations:
from contextlib import contextmanager

from celery import Task

from rhesis.backend.app.database import SessionLocal, set_tenant


class BaseTask(Task):
    """Base task class with retry settings and tenant context management."""

    # Retry settings
    autoretry_for = (Exception,)
    max_retries = 3
    retry_backoff = True
    retry_backoff_max = 600  # 10 minutes max delay
    track_started = True

    @contextmanager
    def get_db_session(self):
        """Get a database session with the proper tenant context."""
        db = SessionLocal()
        try:
            # Start with a clean session
            db.expire_all()

            # Get task context
            request = getattr(self, 'request', None)
            org_id = getattr(request, 'organization_id', None)
            user_id = getattr(request, 'user_id', None)

            # Set tenant context if available
            if org_id or user_id:
                set_tenant(db, org_id, user_id)

            yield db
        finally:
            db.close()

    def apply_async(self, args=None, kwargs=None, **options):
        """Store org_id and user_id in task context when the task is queued."""
        args = args or ()
        kwargs = kwargs or {}

        # Extract context from kwargs and store it in message headers
        if kwargs and ('organization_id' in kwargs or 'user_id' in kwargs):
            headers = options.get('headers', {})
            if 'organization_id' in kwargs:
                headers['organization_id'] = kwargs.pop('organization_id')
            if 'user_id' in kwargs:
                headers['user_id'] = kwargs.pop('user_id')
            options['headers'] = headers

        return super().apply_async(args, kwargs, **options)

    def before_start(self, task_id, args, kwargs):
        """Add organization_id and user_id to the task request context."""
        # Try to get from kwargs first, then headers (which take precedence)
        if 'organization_id' in kwargs:
            self.request.organization_id = kwargs.pop('organization_id')
        if 'user_id' in kwargs:
            self.request.user_id = kwargs.pop('user_id')

        headers = getattr(self.request, 'headers', {}) or {}
        if headers:
            if 'organization_id' in headers:
                self.request.organization_id = headers['organization_id']
            if 'user_id' in headers:
                self.request.user_id = headers['user_id']
This enhanced `BaseTask` ensures that:
- Tasks have access to organization_id and user_id for proper multi-tenant operations
- Context is automatically propagated through the task execution
- Error handling and retry logic are standardized
- Logging and monitoring include context information
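For example, a caller can queue a task with tenant context simply by passing the IDs as keyword arguments; `BaseTask.apply_async` moves them into message headers. A minimal sketch using the `count_test_sets` task shown later in this document (the IDs are placeholder values):
# Queue with tenant context; apply_async pops these kwargs into headers,
# and before_start restores them onto self.request on the worker side.
result = count_test_sets.apply_async(
    kwargs={
        "organization_id": "org-123",  # placeholder value
        "user_id": "user-456",         # placeholder value
    }
)
print(result.id)  # task ID for later status checks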
Tenant Context Decorator
A task decorator is provided to automatically handle database sessions with proper tenant context:
from functools import wraps


def with_tenant_context(func):
    """
    Decorator to automatically maintain tenant context in task functions.
    This ensures all database operations use the proper tenant context.
    """
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        # Get task request for context
        request = getattr(self, 'request', None)
        organization_id = getattr(request, 'organization_id', None)
        user_id = getattr(request, 'user_id', None)

        # Get a database session
        with self.get_db_session() as db:
            # Set tenant for this session
            if organization_id or user_id:
                set_tenant(db, organization_id, user_id)

            # Add db to kwargs and execute the task function
            kwargs['db'] = db
            return func(self, *args, **kwargs)

    return wrapper
Using this decorator simplifies working with database operations in tasks:
@app.task(base=BaseTask, name="rhesis.backend.tasks.count_test_sets")
@with_tenant_context
def count_test_sets(self, db=None):
"""
Task that counts total test sets.
The db parameter is automatically provided with tenant context.
"""
# Access context from task request
org_id = getattr(self.request, 'organization_id', 'unknown')
user_id = getattr(self.request, 'user_id', 'unknown')
# The db session already has tenant context set
test_sets = crud.get_test_sets(db)
count = len(test_sets)
return {
"count": count,
"organization_id": org_id,
"user_id": user_id
}
Task Launcher Utility
A `task_launcher` utility function is provided to easily launch tasks with the proper context from FastAPI routes:
from typing import Any, Callable


def task_launcher(task: Callable, *args: Any, current_user=None, **kwargs: Any):
    """
    Launch a task with proper context from a FastAPI route.

    This helper automatically adds organization_id and user_id from current_user
    to the task context, removing the need to pass them explicitly.
    """
    # Add user context if available and not already specified
    if current_user is not None:
        if hasattr(current_user, 'id') and current_user.id is not None:
            kwargs.setdefault('user_id', str(current_user.id))
        if hasattr(current_user, 'organization_id') and current_user.organization_id is not None:
            kwargs.setdefault('organization_id', str(current_user.organization_id))

    # Launch the task
    return task.delay(*args, **kwargs)
Task Organization
Tasks are organized in the `tasks/` directory:
tasks/
├── __init__.py
├── base.py
├── example_task.py
├── test_configuration.py
└── test_set.py
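For autodiscovery and clean imports, the package typically re-exports its public tasks and helpers. A plausible `tasks/__init__.py`, inferred from the import statements used elsewhere in this document (the exact module layout is an assumption):
# tasks/__init__.py -- hypothetical sketch of the package surface implied by
# `from rhesis.backend.tasks import task_launcher, execute_test_configuration`
# later in this document; the actual file may differ.
from .base import BaseTask, task_launcher, with_tenant_context
from .test_configuration import execute_test_configuration, get_test_configuration
from .test_set import count_test_sets

__all__ = [
    "BaseTask",
    "task_launcher",
    "with_tenant_context",
    "execute_test_configuration",
    "get_test_configuration",
    "count_test_sets",
]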
Creating Tasks
When creating a task, you no longer need to declare organization_id and user_id as explicit parameters; the context system handles them automatically:
Simple Task without Database Access
@app.task(base=BaseTask, name="rhesis.backend.tasks.echo")
def echo(message: str):
"""Echo task for testing context."""
task = echo.request
org_id = getattr(task, 'organization_id', 'unknown')
user_id = getattr(task, 'user_id', 'unknown')
print(f"Task executed for organization: {org_id}, by user: {user_id}")
return f"Message: {message}, Organization: {org_id}, User: {user_id}"
Task with Automatic Database Context
@app.task(base=BaseTask, name="rhesis.backend.tasks.get_test_configuration")
@with_tenant_context
def get_test_configuration(test_configuration_id: str, db=None):
"""
Get a test configuration with proper tenant context.
The @with_tenant_context decorator automatically:
1. Creates a database session
2. Sets the tenant context from task headers
3. Passes the session to the function
4. Closes the session when done
"""
# Convert string ID to UUID
config_id = UUID(test_configuration_id)
# Use existing crud functions with proper tenant context
test_config = crud.get_test_configuration(db, test_configuration_id=config_id)
return {
"found": test_config is not None,
"id": str(test_config.id) if test_config else None
}
Task with Manual Database Session Control
@app.task(base=BaseTask, name="rhesis.backend.tasks.manual_db_example")
def manual_db_example():
"""Example of manually managing database sessions."""
results = {}
# Use the context manager to get a properly configured session
with manual_db_example.get_db_session() as db:
# The session already has tenant context set
test_sets = crud.get_test_sets(db)
results["test_set_count"] = len(test_sets)
return results
Using Tasks in FastAPI Routes
The most common way to launch tasks is from FastAPI route handlers:
from rhesis.backend.tasks import task_launcher, execute_test_configuration


@router.post("/{test_configuration_id}/execute")
def execute_test_configuration_endpoint(
    test_configuration_id: UUID,
    current_user: schemas.User = Depends(require_current_user_or_token)
):
    # Schedule the task with automatic context handling
    result = task_launcher(
        execute_test_configuration,
        str(test_configuration_id),
        current_user=current_user
    )

    # Return the task ID for checking status later
    return {"task_id": result.id}
Worker Configuration
Celery workers are configured with Redis-optimized performance settings:
# Default Celery configuration optimized for Redis
ENV CELERY_WORKER_CONCURRENCY=8 \
    CELERY_WORKER_PREFETCH_MULTIPLIER=4 \
    CELERY_WORKER_MAX_TASKS_PER_CHILD=1000 \
    CELERY_WORKER_LOGLEVEL=INFO \
    CELERY_WORKER_OPTS=""
Redis-Specific Configuration
The Celery app includes Redis-optimized settings:
app.conf.update(
    # Redis configuration with TLS support
    broker_url=os.getenv("BROKER_URL", "redis://localhost:6379/0"),
    result_backend=os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/1"),

    # Redis-optimized settings
    result_expires=3600,  # 1 hour - shorter for Redis efficiency
    result_compression="gzip",

    # Connection settings for Redis reliability
    broker_connection_retry_on_startup=True,
    broker_connection_retry=True,
    broker_connection_max_retries=10,

    # Redis transport options for TLS connections
    broker_transport_options={
        'retry_on_timeout': True,
        'connection_pool_kwargs': {
            'retry_on_timeout': True,
            'socket_connect_timeout': 30,
            'socket_timeout': 30,
        }
    },
    result_backend_transport_options={
        'retry_on_timeout': True,
        'connection_pool_kwargs': {
            'retry_on_timeout': True,
            'socket_connect_timeout': 30,
            'socket_timeout': 30,
        }
    },
)
The worker startup script applies these configurations:
# Start Celery worker with Redis-optimized settings
celery -A rhesis.backend.worker.app worker \
    --queues=celery,execution \
    --loglevel=${CELERY_WORKER_LOGLEVEL:-INFO} \
    --concurrency=${CELERY_WORKER_CONCURRENCY:-8} \
    --prefetch-multiplier=${CELERY_WORKER_PREFETCH_MULTIPLIER:-4} \
    --max-tasks-per-child=${CELERY_WORKER_MAX_TASKS_PER_CHILD:-1000} \
    ${CELERY_WORKER_OPTS}
Optional monitoring is available through Flower:
# Enable Flower monitoring
docker run -e ENABLE_FLOWER=yes -p 5555:5555 your-worker-image
Task Monitoring
Task status can be monitored through several interfaces:
API Endpoint
@router.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
"""Get the status of a task."""
result = AsyncResult(task_id, app=celery_app)
return {
"task_id": task_id,
"status": result.status,
"result": result.result if result.ready() else None,
"error": str(result.error) if result.failed() else None,
}
Flower Dashboard
Access the Flower web UI at http://localhost:5555 when enabled.
Error Handling
The enhanced `BaseTask` provides improved error handling:
- Exceptions are logged with tenant context information
- Failed tasks are automatically retried with exponential backoff
- After maximum retries, the error is recorded in the result backend
- Both success and failure callbacks include context information
- Task execution time and other metrics are tracked automatically
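The first of these points can be made concrete by overriding Celery's `on_failure` hook on the base class. A minimal sketch, assuming a standard module-level logger (the log format is illustrative, not the project's actual handler):
import logging

from celery import Task

logger = logging.getLogger(__name__)


class BaseTask(Task):
    # ... retry settings and context management as shown above ...

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Log failures together with the tenant context of the request."""
        org_id = getattr(self.request, 'organization_id', 'unknown')
        user_id = getattr(self.request, 'user_id', 'unknown')
        logger.error(
            "Task %s failed for organization=%s user=%s: %s",
            task_id, org_id, user_id, exc,
        )
        super().on_failure(exc, task_id, args, kwargs, einfo)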
Troubleshooting
For detailed guidance on troubleshooting common worker issues, including:
- Dealing with stuck tasks and chord_unlock zombies
- Fixing tenant context errors
- Resolving connection problems with the broker
- Debugging task execution failures
please refer to the Worker Troubleshooting Guide.