Chord Management and Monitoring
This document provides comprehensive information about managing Celery chords in the Rhesis worker system, including monitoring, troubleshooting, and best practices.
What are Chords?
A chord in Celery is a pattern that allows you to execute a group of tasks in parallel and then run a callback function once all tasks in the group have completed. This is particularly useful for scenarios like:
- Running multiple test executions in parallel and then collecting the results
- Processing multiple files concurrently and then aggregating the output
- Performing parallel computations and combining the results
Chord Structure
from celery import chord
# Create a group of parallel tasks
tasks = [
    execute_single_test.s(test_id_1, ...),
    execute_single_test.s(test_id_2, ...),
    execute_single_test.s(test_id_3, ...),
]
# Define a callback to run after all tasks complete
callback = collect_results.s(start_time, config_id, ...)
# Execute the chord
job = chord(tasks)(callback)
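To build intuition without a broker or worker, the sketch below mimics chord semantics with the standard library's `concurrent.futures`. It is only an analogy, not how Celery implements chords; `run_test`, `summarize`, and `chord_like` are hypothetical stand-ins introduced for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def run_test(test_id):
    """Hypothetical stand-in for execute_single_test."""
    return {"test_id": test_id, "status": "passed"}


def summarize(results, label):
    """Hypothetical stand-in for collect_results.

    As with a chord callback, the list of header results arrives as the
    first argument, followed by the arguments bound when the callback
    was created.
    """
    return {"label": label, "total": len(results)}


def chord_like(func, items, callback, *callback_args):
    """Run func over items in parallel, then call callback once with all results."""
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(func, items))  # map preserves input order
    return callback(results, *callback_args)


summary = chord_like(run_test, ["t1", "t2", "t3"], summarize, "demo")
print(summary)
```

The point to take away is the callback signature: the collected results are prepended to whatever arguments were bound with `.s(...)`.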
How Rhesis Uses Chords
In the Rhesis system, chords are primarily used in test execution:
- Parallel Test Execution: Individual tests are executed in parallel using execute_single_test tasks
- Result Collection: Once all tests complete, collect_results is called to aggregate the results
- Status Updates: The test run status is updated based on the aggregated results
Example from orchestration.py:
# Create tasks for parallel execution
tasks = []
for test in tests:
    task = execute_single_test.s(
        test_config_id=str(test_config.id),
        test_run_id=str(test_run.id),
        test_id=str(test.id),
        # ... other parameters
    )
    tasks.append(task)
# Collect results callback
callback = collect_results.s(
    datetime.utcnow().isoformat(),
    str(test_config.id),
    str(test_run.id),
    # ... other parameters
)
# Execute chord
job = chord(tasks)(callback)
Common Chord Issues
1. chord_unlock MaxRetriesExceededError
Symptoms:
MaxRetriesExceededError: Can't retry celery.chord_unlock[task-id] args:(...) kwargs:{...}
Causes:
- Individual tasks returning None instead of proper results
- Tasks failing without proper error handling
- Network interruptions during chord execution
- Worker processes being terminated unexpectedly
Solutions:
- Ensure all tasks return valid results (even on failure)
- Configure maximum retries for chord_unlock tasks
- Implement proper error handling in callback functions
- Use monitoring tools to detect and resolve stuck chords
2. Chord Never Completing
Symptoms:
- Callback function (collect_results) never executes
- Test runs remain in “IN_PROGRESS” status indefinitely
- Tasks appear to complete but no final status update
Causes:
- One or more subtasks in the chord failed silently
- Result backend issues preventing result storage
- Incorrect chord setup or callback configuration
Chord Monitoring Tools
Built-in Monitoring Script
The system includes a monitoring script at src/rhesis/backend/tasks/execution/chord_monitor.py that provides several utilities:
1. Check Chord Status
# Check for stuck chords running longer than 2 hours
python -m rhesis.backend.tasks.execution.chord_monitor check --max-hours 2
# Check with JSON output
python -m rhesis.backend.tasks.execution.chord_monitor check --max-hours 1 --json
2. Show Current Status
# Display summary of active chord_unlock tasks
python -m rhesis.backend.tasks.execution.chord_monitor status
3. Revoke Stuck Chords
# Dry run - show what would be revoked
python -m rhesis.backend.tasks.execution.chord_monitor revoke --max-hours 1 --dry-run
# Actually revoke stuck chords
python -m rhesis.backend.tasks.execution.chord_monitor revoke --max-hours 1
4. Inspect Specific Chord
# Get detailed information about a specific chord
python -m rhesis.backend.tasks.execution.chord_monitor inspect <chord-id> --verbose
5. Clean All Tasks (Emergency)
# Purge all tasks from all queues (use with extreme caution)
python -m rhesis.backend.tasks.execution.chord_monitor clean --force
Quick Fix Script
A simplified monitoring script is available at the root level:
# Check and interactively fix chord issues
python fix_chords.py
This script:
- Shows current chord status
- Detects stuck chords (>30 minutes)
- Offers to revoke stuck chords interactively
- Auto-revokes very stuck chords (>2 hours)
- Provides recommendations for next steps
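The two thresholds above amount to a three-way decision per chord. The following is a minimal sketch of that decision, not the actual code of fix_chords.py; the function name `classify_chord` and the returned labels are assumptions made for illustration.

```python
from datetime import timedelta

# Thresholds taken from the description above.
STUCK_AFTER = timedelta(minutes=30)
AUTO_REVOKE_AFTER = timedelta(hours=2)


def classify_chord(runtime):
    """Return the action for a chord_unlock task that has run for `runtime`."""
    if runtime > AUTO_REVOKE_AFTER:
        return "auto_revoke"  # very stuck: revoke without asking
    if runtime > STUCK_AFTER:
        return "ask_user"  # stuck: offer to revoke interactively
    return "ok"


for minutes in (10, 45, 180):
    print(minutes, classify_chord(timedelta(minutes=minutes)))
```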
Monitoring Best Practices
1. Regular Monitoring
Set up periodic monitoring to catch chord issues early:
# Add to crontab to run every 15 minutes
*/15 * * * * cd /path/to/backend && python fix_chords.py
2. Automated Cleanup
Use the built-in periodic monitoring function:
from rhesis.backend.tasks.execution.chord_monitor import setup_periodic_monitoring
# This can be called from a scheduled task
result = setup_periodic_monitoring()
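One way to schedule such a call is Celery beat. The fragment below is a sketch only: it assumes a small wrapper task that calls setup_periodic_monitoring() has been registered under the hypothetical name `monitoring.check_chords`, and that beat is running alongside the workers.

```python
# Sketch of a Celery beat entry. In worker.py it would be applied with
#     app.conf.beat_schedule = BEAT_SCHEDULE
BEAT_SCHEDULE = {
    "monitor-chords": {
        # Hypothetical task name; the task itself is not shown in this document.
        "task": "monitoring.check_chords",
        # A plain number is interpreted by beat as an interval in seconds.
        "schedule": 15 * 60.0,
    },
}

print(BEAT_SCHEDULE["monitor-chords"]["schedule"])
```

The 15-minute interval simply mirrors the crontab example; pick whatever matches your stuck-chord thresholds.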
3. Logging and Alerting
Monitor your logs for chord-related errors:
# Monitor for chord_unlock errors
tail -f celery_worker.log | grep "chord_unlock"
# Monitor for MaxRetriesExceededError
tail -f celery_worker.log | grep "MaxRetriesExceededError"
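For alerting, counting matches is often more useful than tailing them. The sketch below counts the same two markers in a batch of log lines; the sample lines are made up for the example, and `count_chord_errors` is a hypothetical helper, not part of chord_monitor.

```python
import re
from collections import Counter

# The same markers the grep commands above look for.
PATTERN = re.compile(r"chord_unlock|MaxRetriesExceededError")


def count_chord_errors(lines):
    """Count occurrences of each chord-related marker in the given log lines."""
    counts = Counter()
    for line in lines:
        for match in PATTERN.findall(line):
            counts[match] += 1
    return counts


# Made-up sample lines, for illustration only.
sample = [
    "[ERROR] MaxRetriesExceededError: Can't retry celery.chord_unlock[abc]",
    "[INFO] Task execute_single_test succeeded",
]
counts = count_chord_errors(sample)
print(dict(counts))
```

A non-zero count for MaxRetriesExceededError within a monitoring window would be a reasonable trigger for an alert.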
4. Health Checks
Include chord status in your health check endpoints:
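# check_stuck_chords is assumed to live in the same module as get_active_chord_unlocks
from rhesis.backend.tasks.execution.chord_monitor import (
    check_stuck_chords,
    get_active_chord_unlocks,
)


def health_check():
    # Active chord_unlock tasks, and those running longer than 1 hour
    active_chords = get_active_chord_unlocks()
    stuck_chords = check_stuck_chords(max_runtime_hours=1)
    return {
        "active_chord_unlocks": len(active_chords),
        "stuck_chords": len(stuck_chords),
        "status": "unhealthy" if stuck_chords else "healthy",
    }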
Configuration for Chord Stability
Worker Configuration
In worker.py, ensure proper chord configuration:
app.conf.update(
    # Chord configuration - prevent infinite retry loops
    chord_unlock_max_retries=3,
    chord_unlock_retry_delay=1.0,
    # Result backend configuration
    result_persistent=True,
    result_expires=3600,  # Results expire after 1 hour
    # Task tracking for better chord monitoring
    task_track_started=True,
    task_send_sent_event=True,
    worker_send_task_events=True,
    # Task-specific configurations
    task_annotations={
        'celery.chord_unlock': {
            'max_retries': 3,
            'retry_backoff': True,
            'retry_backoff_max': 60,
            'retry_jitter': True,
            'soft_time_limit': 300,  # 5 minutes soft limit
            'time_limit': 600,  # 10 minutes hard limit
        },
        'rhesis.backend.tasks.execution.results.collect_results': {
            'max_retries': 3,
            'retry_backoff': True,
            'retry_backoff_max': 60,
            'soft_time_limit': 600,  # 10 minutes soft limit
            'time_limit': 900,  # 15 minutes hard limit
        }
    }
)
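To see what retry_backoff, retry_backoff_max, and retry_jitter imply in seconds, the sketch below models exponential backoff with a cap and optional full jitter. It follows the behavior Celery documents for these options (which it describes in the context of automatic retries); treat the exact formula as an assumption and verify it against your Celery version. `backoff_delay` is a hypothetical helper, not a Celery API.

```python
import random


def backoff_delay(retries, factor=1, maximum=60, jitter=False):
    """Delay in seconds before the retry with 0-based index `retries`."""
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter: pick a random whole-second delay between 0 and `delay`.
        delay = random.randrange(delay + 1)
    return max(0, delay)


# With max_retries=3 and the cap of 60 used above:
print([backoff_delay(n) for n in range(3)])  # [1, 2, 4]
# The cap only matters once many retries are allowed:
print([backoff_delay(n) for n in range(8)])  # [1, 2, 4, 8, 16, 32, 60, 60]
```

With only three retries the cap of 60 seconds is never reached, so under this model the total wait before giving up is a few seconds at most.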
Task Implementation Best Practices
Always Return Valid Results
@app.task(bind=True)
def execute_single_test(self, ...):
    try:
        result = perform_test_execution(...)
        # Ensure we always return a valid result
        if result is None:
            result = {
                "test_id": test_id,
                "status": "failed",
                "error": "Test execution returned None",
                "execution_time": 0
            }
        return result
    except Exception as e:
        # Return failure result instead of raising exception
        failure_result = {
            "test_id": test_id,
            "status": "failed",
            "error": str(e),
            "execution_time": 0
        }
        if self.request.retries < self.max_retries:
            try:
                self.retry(exc=e, kwargs=original_kwargs)
            except self.MaxRetriesExceededError:
                return failure_result
        return failure_result
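The "never return None, never raise" rule can also be factored into a reusable wrapper. The following is a minimal sketch in plain Python, leaving out Celery and its retry handling; `always_returns_result`, `failure`, and the two example functions are hypothetical names introduced here.

```python
import functools


def failure(test_id, error):
    """Build a failure result in the same shape as the task above."""
    return {
        "test_id": test_id,
        "status": "failed",
        "error": error,
        "execution_time": 0,
    }


def always_returns_result(func):
    """Wrap func so callers get a result dict even on None or an exception."""
    @functools.wraps(func)
    def wrapper(test_id, *args, **kwargs):
        try:
            result = func(test_id, *args, **kwargs)
        except Exception as exc:
            return failure(test_id, str(exc))
        if result is None:
            return failure(test_id, "Test execution returned None")
        return result
    return wrapper


@always_returns_result
def broken_test(test_id):
    raise ValueError("endpoint unreachable")


@always_returns_result
def silent_test(test_id):
    return None


print(broken_test("t1"))
print(silent_test("t2"))
```

Note that this wrapper swallows the exception, so it would have to sit outside any retry logic; otherwise the task would never retry.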
Handle Malformed Results in Callbacks
@app.task(bind=True)
def collect_results(self, results, ...):
    # Handle different result formats from chord execution
    processed_results = []
    if results:
        for result in results:
            if result is None:
                processed_results.append(None)
            elif isinstance(result, list) and len(result) == 2:
                # Handle [[task_id, result], error] format
                task_result = result[1]
                processed_results.append(task_result)
            else:
                processed_results.append(result)
    # Process results and handle failures gracefully
    failed_tasks = sum(
        1 for result in processed_results
        if result is None
        or (isinstance(result, dict) and result.get("status") == "failed")
    )
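The normalization and failure count above can be exercised without Celery. The sketch below is a standalone version of the same logic; the function names and the sample payload are assumptions for illustration, and the two-element list in the sample only imitates the wrapped format mentioned in the comment.

```python
def normalize_results(results):
    """Unwrap two-element list entries; pass everything else through."""
    processed = []
    for result in results or []:
        if isinstance(result, list) and len(result) == 2:
            processed.append(result[1])
        else:
            processed.append(result)
    return processed


def count_failed(processed):
    """Count entries that are None or explicitly marked as failed."""
    return sum(
        1 for result in processed
        if result is None
        or (isinstance(result, dict) and result.get("status") == "failed")
    )


# Made-up payload: one pass, one missing result, one wrapped failure.
raw = [
    {"test_id": "t1", "status": "passed"},
    None,
    ["task-id-3", {"test_id": "t3", "status": "failed"}],
]
processed = normalize_results(raw)
print(len(processed), count_failed(processed))  # 3 2
```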
Troubleshooting Workflows
When You Encounter Chord Issues
1. Immediate Assessment
python fix_chords.py
2. Check Active Tasks
python -m rhesis.backend.tasks.execution.chord_monitor status
3. Look for Stuck Chords
python -m rhesis.backend.tasks.execution.chord_monitor check --max-hours 1
4. Review Logs
tail -50 celery_worker.log | grep -E "(chord_unlock|MaxRetries|ERROR)"
5. Clean Up if Necessary
# Revoke stuck chords
python -m rhesis.backend.tasks.execution.chord_monitor revoke --max-hours 0.5
# Restart workers to pick up new configuration
pkill -f celery
celery -A rhesis.backend.worker.app worker --loglevel=INFO &
Emergency Recovery
If the system is completely stuck with many chord_unlock tasks:
1. Stop All Workers
pkill -f celery
2. Purge All Tasks (use with caution)
python -m rhesis.backend.tasks.execution.chord_monitor clean --force
3. Restart Workers
celery -A rhesis.backend.worker.app worker --loglevel=INFO &
4. Monitor Recovery
python fix_chords.py
Monitoring Script Reference
Command Line Options
Command | Description | Example |
---|---|---|
status | Show current chord status | python -m ...chord_monitor status |
check | Check for stuck chords | python -m ...chord_monitor check --max-hours 2 |
revoke | Revoke stuck chords | python -m ...chord_monitor revoke --max-hours 1 |
inspect | Inspect specific chord | python -m ...chord_monitor inspect <chord-id> |
clean | Purge all tasks | python -m ...chord_monitor clean --force |
Common Options
- --max-hours N: Consider chords stuck after N hours
- --dry-run: Show what would be done without executing
- --json: Output results in JSON format
- --verbose: Show detailed information
- --force: Required for destructive operations
Return Codes
- 0: Success, no issues found
- 1: Issues found or errors occurred
- 130: Operation cancelled by user
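When the monitor is driven from another script, the return codes can be mapped back to these meanings. A minimal sketch follows; `describe_exit_code` is a hypothetical helper, and the fallback message for other codes is an assumption.

```python
# Meanings copied from the list above.
EXIT_MEANINGS = {
    0: "Success, no issues found",
    1: "Issues found or errors occurred",
    130: "Operation cancelled by user",
}


def describe_exit_code(code):
    """Translate a chord_monitor return code into a readable message."""
    return EXIT_MEANINGS.get(code, f"Unexpected return code {code}")


for code in (0, 1, 130, 2):
    print(code, describe_exit_code(code))
```

In practice the code would come from something like `subprocess.run([...]).returncode`, with the command list built from one of the invocations shown earlier.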
Prevention Strategies
- Proper Task Design: Always return valid results, handle exceptions gracefully
- Configuration: Set appropriate timeouts and retry limits
- Monitoring: Regular checks for stuck chords
- Testing: Test chord behavior in development with various failure scenarios
- Logging: Comprehensive logging to diagnose issues quickly
- Documentation: Keep this documentation updated with new patterns and solutions
Related Files
- src/rhesis/backend/worker.py - Celery configuration
- src/rhesis/backend/tasks/execution/orchestration.py - Chord implementation
- src/rhesis/backend/tasks/execution/test.py - Individual task implementation
- src/rhesis/backend/tasks/execution/results.py - Chord callback implementation
- src/rhesis/backend/tasks/execution/chord_monitor.py - Monitoring utilities
- fix_chords.py - Quick monitoring script