Background Tasks
Overview
The Rhesis backend uses Celery to handle asynchronous background tasks. This allows the API to offload time-consuming operations and improve responsiveness. The task processing system is designed to be scalable, fault-tolerant, and context-aware.
Celery Configuration
The Celery application is configured in worker.py:
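A minimal sketch of what this setup can look like (module names, option values, and environment variables here are illustrative, not the actual worker.py):

```python
# worker.py -- illustrative sketch; the real configuration may differ
import os

from celery import Celery

app = Celery(
    "rhesis-backend",
    broker=os.environ.get("BROKER_URL"),
    backend=os.environ.get("CELERY_RESULT_BACKEND"),
    include=["rhesis.backend.tasks"],  # assumed task package path
)

app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_track_started=True,
)
```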
The application uses PostgreSQL as both the broker and result backend:
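For example, using Kombu's SQLAlchemy broker transport and the database result backend (the connection string below is a placeholder that would normally be assembled from environment variables):

```python
# Placeholder connection string -- the real values come from the environment
DATABASE_URL = "postgresql://user:password@localhost:5432/rhesis"

app.conf.broker_url = f"sqla+{DATABASE_URL}"    # SQLAlchemy broker transport
app.conf.result_backend = f"db+{DATABASE_URL}"  # database result backend
```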
Base Task Class
All tasks inherit from a BaseTask class that provides retry logic, error handling, and most importantly, context awareness for multi-tenant operations:
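A simplified sketch of the idea follows; the real BaseTask differs in detail, and the exact mechanism for carrying the context (here, kwargs injected by task_launcher) is an assumption:

```python
# Sketch of a context-aware base task -- not the actual implementation
from celery import Task


class BaseTask(Task):
    """Base class for all tasks: retry defaults plus tenant-context propagation."""

    # Retry behaviour is shown in more detail under "Error Handling" below
    max_retries = 3

    organization_id = None
    user_id = None

    def __call__(self, *args, **kwargs):
        # task_launcher injects the context into kwargs; stash it on the task
        # instance so the task body and callbacks can use it.
        self.organization_id = kwargs.pop("organization_id", None)
        self.user_id = kwargs.pop("user_id", None)
        return super().__call__(*args, **kwargs)
```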
This enhanced BaseTask ensures that:
- Tasks have access to organization_id and user_id for proper multi-tenant operations
- Context is automatically propagated through the task execution
- Error handling and retry logic are standardized
- Logging and monitoring include context information
Tenant Context Decorator
A task decorator is provided to automatically handle database sessions with proper tenant context:
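The sketch below illustrates the pattern; the decorator name with_tenant_context, the session factory, the environment variable, and the PostgreSQL settings used for the tenant context are all assumptions, not the project's actual API:

```python
# Illustrative sketch of a tenant-context decorator
import functools
import os
from contextlib import contextmanager

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# Assumption: the backend exposes an engine/session factory; shown inline here
engine = create_engine(os.environ["SQLALCHEMY_DATABASE_URL"])
SessionLocal = sessionmaker(bind=engine)


@contextmanager
def tenant_session(organization_id, user_id):
    """Yield a database session with the tenant context applied."""
    session = SessionLocal()
    try:
        # e.g. custom settings read by row-level-security policies (names assumed)
        session.execute(text("SELECT set_config('app.current_organization', :org, false)"),
                        {"org": str(organization_id)})
        session.execute(text("SELECT set_config('app.current_user', :usr, false)"),
                        {"usr": str(user_id)})
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


def with_tenant_context(func):
    """Open a session scoped to the task's tenant and pass it to the task body as `db`."""
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        with tenant_session(self.organization_id, self.user_id) as db:
            return func(self, *args, db=db, **kwargs)
    return wrapper
```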
Using this decorator simplifies working with database operations in tasks:
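For instance (the task and table names are illustrative):

```python
from sqlalchemy import text


@app.task(base=BaseTask, bind=True)
@with_tenant_context
def count_projects(self, db=None):
    # `db` is already scoped to the calling tenant; no manual session handling needed
    return db.execute(text("SELECT count(*) FROM project")).scalar()
```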
Task Launcher Utility
A task_launcher utility method is provided to easily launch tasks with proper context from FastAPI routes:
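Conceptually it is a thin wrapper around apply_async that injects the caller's context; a sketch with an assumed signature:

```python
def task_launcher(task, *args, current_user=None, **kwargs):
    """Launch `task` asynchronously, forwarding the caller's tenant context."""
    kwargs.setdefault("organization_id", getattr(current_user, "organization_id", None))
    kwargs.setdefault("user_id", getattr(current_user, "id", None))
    return task.apply_async(args=args, kwargs=kwargs)
```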
Task Organization
Tasks are organized in the tasks/ directory:
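An illustrative layout; the actual module names may differ:

```
tasks/
├── __init__.py
├── example.py          # simple demonstration tasks (assumed)
└── execution/
    ├── __init__.py
    ├── test.py          # execute_single_test and related tasks
    └── results.py       # result collection and chord callbacks
```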
Creating Tasks
When creating a task, you no longer need to declare organization_id and user_id as explicit parameters; the context system handles this automatically:
Simple Task without Database Access
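A sketch (the task itself is made up for illustration):

```python
@app.task(base=BaseTask, bind=True)
def send_notification(self, recipient: str, message: str):
    # Context is still available via self.organization_id / self.user_id,
    # but no database session is opened.
    print(f"[org={self.organization_id}] notifying {recipient}: {message}")
```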
Task with Automatic Database Context
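A sketch building on the with_tenant_context decorator above (table and column names are illustrative):

```python
from sqlalchemy import text


@app.task(base=BaseTask, bind=True)
@with_tenant_context
def summarize_test_run(self, test_run_id: str, db=None):
    # The decorator opened `db` with this task's organization/user context applied
    count = db.execute(
        text("SELECT count(*) FROM test_result WHERE test_run_id = :id"),
        {"id": test_run_id},
    ).scalar()
    return {"test_run_id": test_run_id, "result_count": count}
```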
Task with Manual Database Session Control
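A sketch using the tenant_session helper from the decorator example (again, names are illustrative):

```python
from sqlalchemy import text


@app.task(base=BaseTask, bind=True)
def archive_old_results(self, days: int = 30):
    # Open and scope the session explicitly instead of relying on the decorator
    with tenant_session(self.organization_id, self.user_id) as db:
        db.execute(
            text("DELETE FROM test_result "
                 "WHERE created_at < now() - make_interval(days => :days)"),
            {"days": days},
        )
```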
Using Tasks in FastAPI Routes
The most common way to launch tasks is from FastAPI route handlers:
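For example (the route, the get_current_user dependency, and the task are illustrative; task_launcher and send_notification are the sketches from above):

```python
from fastapi import APIRouter, Depends

router = APIRouter()


@router.post("/notifications")
def create_notification(recipient: str, message: str,
                        current_user=Depends(get_current_user)):
    # task_launcher forwards the caller's organization_id and user_id to the task
    result = task_launcher(send_notification, recipient, message,
                           current_user=current_user)
    return {"task_id": result.id}
```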
Worker Configuration
Celery workers are configured with performance optimizations:
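Typical settings of this kind look as follows; the concrete values in worker.py may differ:

```python
app.conf.update(
    worker_prefetch_multiplier=1,     # don't hoard tasks; fetch one at a time
    worker_max_tasks_per_child=100,   # recycle worker processes to contain memory growth
    task_acks_late=True,              # acknowledge only after the task finishes
    task_time_limit=600,              # hard per-task limit, in seconds
    task_soft_time_limit=540,         # raises SoftTimeLimitExceeded before the hard limit
)
```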
The worker startup script applies these configurations:
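A startup script along these lines is typical (the application module path, concurrency, and log level are assumptions):

```bash
#!/usr/bin/env bash
# Illustrative worker startup -- adjust the app module and options to your deployment
celery -A rhesis.backend.worker worker \
  --loglevel=INFO \
  --concurrency=4 \
  --max-tasks-per-child=100
```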
Optional monitoring is available through Flower:
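With the flower package installed, it can be launched next to the workers (module path again assumed):

```bash
celery -A rhesis.backend.worker flower --port=5555
```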
Task Monitoring
Task status can be monitored through several interfaces:
API Endpoint
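A status endpoint of this kind typically wraps Celery's AsyncResult; a sketch with an assumed route path:

```python
from celery.result import AsyncResult
from fastapi import APIRouter

from worker import app  # the Celery app from worker.py (import path may differ)

router = APIRouter()


@router.get("/tasks/{task_id}")
def get_task_status(task_id: str):
    result = AsyncResult(task_id, app=app)
    return {
        "task_id": task_id,
        "status": result.status,  # PENDING, STARTED, RETRY, SUCCESS, FAILURE
        "result": result.result if result.successful() else None,
    }
```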
Flower Dashboard
Access the Flower web UI at http://localhost:5555 when enabled.
Error Handling
The enhanced BaseTask provides improved error handling, as sketched after this list:
- Exceptions are logged with tenant context information
- Failed tasks are automatically retried with exponential backoff
- After maximum retries, the error is recorded in the result backend
- Both success and failure callbacks include context information
- Task execution time and other metrics are tracked automatically
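Expanding the BaseTask sketch from above, the retry and callback behaviour can be expressed roughly like this (the values and log format are illustrative):

```python
import logging

from celery import Task

logger = logging.getLogger(__name__)


class BaseTask(Task):
    # Automatic retries with exponential backoff and jitter
    autoretry_for = (Exception,)
    max_retries = 3
    retry_backoff = True        # 1s, 2s, 4s, ... between attempts
    retry_backoff_max = 300     # cap the delay at five minutes
    retry_jitter = True

    def on_success(self, retval, task_id, args, kwargs):
        logger.info("task %s succeeded (org=%s, user=%s)",
                    task_id, self.organization_id, self.user_id)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Once max_retries is exhausted, the exception is recorded in the result backend
        logger.error("task %s failed (org=%s, user=%s): %s",
                     task_id, self.organization_id, self.user_id, exc)
```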
Troubleshooting
Dealing with Stuck Tasks
Sometimes tasks can get stuck in an infinite retry loop, especially chord tasks (chord_unlock) when subtasks fail. This can happen if:
- One or more subtasks in a chord fail permanently
- The broker connection is interrupted during a chord execution
- The worker processes are killed unexpectedly
Symptoms of Stuck Tasks
The most obvious symptom is thousands of repeated chord_unlock retry entries in the worker logs. These messages indicate that there are “zombie” tasks that keep retrying indefinitely.
Configuration to Prevent Stuck Tasks
The worker.py file includes configuration to limit chord retries:
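The relevant knobs look roughly like this; the exact mechanism and values used in worker.py may differ:

```python
app.conf.update(
    # How often the built-in celery.chord_unlock task polls for subtask completion
    result_chord_retry_interval=5.0,
    # Stop chord_unlock from retrying forever
    task_annotations={"celery.chord_unlock": {"max_retries": 60}},
)
```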
Additionally, the results handling in tasks/execution/results.py includes logic to detect and handle failed subtasks:
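Conceptually, the chord callback inspects each subtask result and records failures instead of letting the chord retry forever. A much simplified sketch follows; the callback name and the convention of failed subtasks returning a dict with an "error" key are assumptions:

```python
import logging

logger = logging.getLogger(__name__)


@app.task(base=BaseTask, bind=True)
def collect_results(self, results, test_run_id=None):
    # `results` is the list returned by the chord's subtasks
    failed = [r for r in results if isinstance(r, dict) and r.get("error")]
    if failed:
        logger.warning("test run %s: %d subtasks failed", test_run_id, len(failed))
    return {"total": len(results), "failed": len(failed)}
```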
Purging Stuck Tasks
If you encounter stuck tasks, you can purge all tasks from the queue:
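For example (the -A application path is an assumption about this project's layout):

```bash
celery -A rhesis.backend.worker purge -f
```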
This command removes all pending tasks from all queues. Use it with caution in production as it will delete all tasks, including those that are legitimately waiting to be processed.
For a more targeted approach, you can inspect and revoke specific tasks:
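For example, from a Python shell with the backend environment loaded (the import path and task id are placeholders):

```python
from worker import app  # the Celery app defined in worker.py (import path may differ)

inspector = app.control.inspect()
print(inspector.active())     # tasks currently executing on each worker
print(inspector.reserved())   # tasks prefetched but not yet started

# Revoke a specific stuck task; terminate=True also stops it if it is already running
app.control.revoke("the-task-id-to-revoke", terminate=True)
```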
Troubleshooting Tenant Context Issues
If tasks fail with errors related to the tenant context, ensure that:
- Your database has the proper configuration parameters set
- The organization_id and user_id are correctly passed to the task
- The tenant context is explicitly set at the beginning of database operations
The execute_single_test task in tasks/execution/test.py includes defensive coding to handle such issues:
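A much simplified sketch of that kind of defensive check (the real task body is more involved, and tenant_session is the helper assumed earlier):

```python
@app.task(base=BaseTask, bind=True)
def execute_single_test(self, test_id: str, **kwargs):
    # Fail fast with a clear message instead of producing confusing downstream errors
    if not self.organization_id or not self.user_id:
        raise ValueError(
            f"execute_single_test {test_id}: missing tenant context "
            f"(organization_id={self.organization_id}, user_id={self.user_id})"
        )
    with tenant_session(self.organization_id, self.user_id) as db:
        ...  # run the test against a correctly scoped session
```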