
Multi-Worker RPC Coordination

When running multiple backend workers (e.g., --workers 4), RPC requests need to be routed to the specific worker that holds the WebSocket connection to the SDK. This is achieved through direct worker routing using Redis.

Architecture Overview

The system uses a routing registry in Redis to track which worker owns each SDK connection, then routes RPC requests directly to that worker via dedicated channels.

Implementation

Worker Registration

When a backend worker establishes a WebSocket connection with an SDK, it registers itself as the handler:

manager.py
async def _register_worker_for_connection(self, connection_id: str):
    """Register this worker as handler for a connection."""
    routing_key = f"ws:routing:{connection_id}"
    
    # Register with 30s TTL (refreshed every 10s by heartbeat)
    await redis_manager.client.setex(
        routing_key,
        30,
        self.worker_id  # e.g., "backend@server1-a1b2c3d4"
    )

Key points:

  • Each backend worker has a unique ID: backend@{hostname}-{uuid}
  • Registration key format: ws:routing:{connection_id}, where connection_id is {project_id}:{environment}
  • 30-second TTL prevents stale registrations; refreshed by heartbeat loop
  • On disconnect, worker unregisters by deleting the routing key
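
The naming scheme above can be sketched as a few small helpers. This is only an illustration: the function names are hypothetical, and the 8-character UUID suffix is an assumption taken from the example ID backend@server1-a1b2c3d4, not something the implementation is confirmed to do.

```python
import socket
import uuid


def make_worker_id() -> str:
    """Build an ID shaped like backend@{hostname}-{uuid}.

    Assumption: the UUID part is shortened to 8 hex characters, which
    matches the example ID in this document but is not confirmed.
    """
    return f"backend@{socket.gethostname()}-{uuid.uuid4().hex[:8]}"


def make_connection_id(project_id: str, environment: str) -> str:
    """Assumed shape of connection_id: '{project_id}:{environment}'."""
    return f"{project_id}:{environment}"


def make_routing_key(project_id: str, environment: str) -> str:
    """Routing registry key: ws:routing:{project_id}:{environment}."""
    return f"ws:routing:{make_connection_id(project_id, environment)}"
```

With these helpers, make_routing_key("project123", "prod") yields the key ws:routing:project123:prod used in the examples on this page.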

Direct Routing

RPC clients (Celery workers) use the routing registry to send requests directly to the correct worker:

rpc_client.py
# STEP 1: Look up which worker has the connection
routing_key = f"ws:routing:{project_id}:{environment}"
worker_id = await redis.get(routing_key)

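if not worker_id:
    # No worker registered - SDK is disconnected
    return {"error": "sdk_disconnected"}

# redis-py returns bytes unless the client uses decode_responses=True
if isinstance(worker_id, bytes):
    worker_id = worker_id.decode()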

# STEP 2: Route request directly to that worker's queue
worker_channel = f"ws:rpc:{worker_id}"
await redis.rpush(worker_channel, json.dumps(request))

# STEP 3: Wait for response on dedicated channel
response_channel = f"ws:rpc:response:{test_run_id}"
result = await wait_for_response(response_channel, timeout=30)

Benefits of direct routing:

  • No broadcast to all workers - only the correct worker receives the request
  • Fail-fast behavior when SDK is disconnected (no worker registered)
  • No spurious errors from workers that do not hold the connection
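
To make the fail-fast and single-recipient behavior concrete, here is a minimal sketch that runs without a Redis server. InMemoryRedis is a hypothetical stand-in that mimics only GET, SET and RPUSH, and the routed_to return value is invented for the demonstration; the real client waits for a response instead.

```python
import asyncio
import json
from collections import defaultdict, deque


class InMemoryRedis:
    """Hypothetical stand-in that mimics only GET, SET and RPUSH."""

    def __init__(self):
        self.strings = {}
        self.lists = defaultdict(deque)

    async def get(self, key):
        return self.strings.get(key)

    async def set(self, key, value):
        self.strings[key] = value

    async def rpush(self, key, value):
        self.lists[key].append(value)
        return len(self.lists[key])


async def route_request(redis, project_id, environment, request):
    """Look up the owning worker, then push to that worker's queue only."""
    worker_id = await redis.get(f"ws:routing:{project_id}:{environment}")
    if not worker_id:
        # Fail fast: nothing is queued when no worker is registered
        return {"error": "sdk_disconnected"}
    await redis.rpush(f"ws:rpc:{worker_id}", json.dumps(request))
    return {"routed_to": worker_id}  # hypothetical return shape
```

Calling route_request before any registration returns the sdk_disconnected error without touching any queue; after a routing key is set, the request lands only in that worker's list.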

Worker Request Processing

Each backend worker runs a listener loop for its dedicated channel:

manager.py
async def _listen_for_rpc_requests(self):
    """Listen for RPC requests on worker-specific channel."""
    worker_channel = f"ws:rpc:{self.worker_id}"
    
    while True:
        # BLPOP: blocking list pop with 1s timeout
        result = await redis.blpop(worker_channel, timeout=1)
        
        if result:
            _, message = result
            request = json.loads(message)
            
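            # Direct routing means this worker should own the connection;
            # _handle_rpc_request covers the rare stale-routing case
            await self._handle_rpc_request(request)

async def _handle_rpc_request(self, request: Dict[str, Any]):
    """Handle an RPC request that was routed to this worker."""
    # Field names below are assumed; the request schema is not shown here
    request_id = request["request_id"]
    project_id = request["project_id"]
    environment = request["environment"]
    function_name = request["function_name"]
    inputs = request["inputs"]

    key = self.get_connection_key(project_id, environment)
    
    if key not in self._connections:
        # Rare race condition: routing stale but connection closed
        await self._publish_error_response(
            request_id, key, "Worker routing mismatch"
        )
        return
    
    # Forward to SDK via WebSocket
    await self._forward_to_sdk(request_id, key, function_name, inputs)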

Why this works:

  • Each worker only listens to its own channel ws:rpc:{worker_id}
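  • Requests are queued in a Redis list, so a request that arrives while the worker is busy waits in the queue instead of being dropped
  • BLPOP blocks without busy-polling; the 1s timeout wakes the loop regularly, which allows graceful shutdown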
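
The role of the short timeout is easier to see in a loop that checks a stop flag. The sketch below is an assumption-laden illustration: blpop_like is a hypothetical stand-in for BLPOP built on asyncio.Queue, and the stop event is not part of the listener shown above.

```python
import asyncio
import json


async def blpop_like(queue: asyncio.Queue, timeout: float):
    """Hypothetical stand-in for BLPOP: return an item, or None on timeout."""
    try:
        return await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return None


async def listen(queue, stop: asyncio.Event, handled: list, timeout: float = 1.0):
    """Pop requests until `stop` is set.

    The flag is re-checked after every pop or timeout, so shutdown is
    delayed by at most one timeout period.
    """
    while not stop.is_set():
        message = await blpop_like(queue, timeout)
        if message is not None:
            handled.append(json.loads(message))
```

With timeout=1.0, setting the stop event ends the loop within about one second even when the queue stays empty.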

Heartbeat Mechanism

Worker registrations expire after 30 seconds to prevent stale entries. A heartbeat loop refreshes the registration every 10 seconds:

manager.py
async def _heartbeat_loop(self):
    """Refresh worker registration every 10s to maintain routing."""
    while True:
        await asyncio.sleep(10)
        
        # Iterate over a snapshot: connections may be added or removed during the awaits
        for connection_id in list(self._connections):
            routing_key = f"ws:routing:{connection_id}"
            await redis.setex(routing_key, 30, self.worker_id)

This ensures:

  • Crashed workers drop out of the registry within 30s, once their routing keys expire
  • Active workers maintain their routing entries
  • Clients don’t route to dead workers
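
The interaction between the 30s TTL and the 10s refresh can be checked with a small simulation. TtlStore is a hypothetical in-memory model of SETEX and GET with a manually advanced clock; it is not the real Redis client. It shows that a stale entry outlives a crash by roughly 20 to 30 seconds, depending on how long before the crash the last heartbeat ran.

```python
class TtlStore:
    """Hypothetical in-memory model of SETEX/GET with a manual clock."""

    def __init__(self):
        self.now = 0.0
        self._data = {}  # key -> (value, expires_at)

    def setex(self, key, ttl, value):
        self._data[key] = (value, self.now + ttl)

    def get(self, key):
        entry = self._data.get(key)
        if entry is None or entry[1] <= self.now:
            return None
        return entry[0]

    def advance(self, seconds):
        self.now += seconds


def stale_seconds_after_crash(crash_offset, ttl=30):
    """Seconds a routing key outlives a worker that crashes
    `crash_offset` seconds after its last heartbeat."""
    key = "ws:routing:project123:prod"
    store = TtlStore()
    store.setex(key, ttl, "backend@srv1-abc")  # last successful heartbeat
    store.advance(crash_offset)                # crash: no further refreshes
    stale = 0
    while store.get(key) is not None:
        store.advance(1)
        stale += 1
    return stale
```

A crash right after a heartbeat (crash_offset=0) leaves the key for the full 30 seconds; a crash 9 seconds after the last heartbeat leaves it for 21 seconds.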

Monitoring

Expected Log Patterns

RPC Client (Celery worker):

DEBUG - Routed RPC request invoke_abc123 to worker backend@server1-a1b2c3d4 (chat)

Backend Worker (the one with connection):

INFO - 🎧 RPC LISTENER STARTED - Listening on 'ws:rpc:backend@server1-a1b2c3d4' channel
DEBUG - RPC request received: invoke_abc123 - chat
INFO - Forwarding RPC request invoke_abc123 (chat) to SDK

Other backend workers:

These workers receive no requests for this connection - their listeners are idle. They only process requests for connections they own.

Problematic Patterns

SDK connection not available (no routing key registered):

ERROR - SDK connection project123:prod is not available (no worker registered)

If this happens frequently, check:

  • Worker heartbeat is running (should refresh routing every 10s)
  • Redis connectivity between workers and Redis
  • Worker registration on connection establishment

Worker routing mismatch:

ERROR - Worker routing mismatch: received RPC for project123:prod but connection not found

This indicates a race condition where:

  • Routing key points to this worker
  • But WebSocket connection was closed/cleaned up
  • Usually resolves when routing key expires (30s) or client retries
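
Since a client retry is one of the ways this condition clears, a retry wrapper might look like the following. This is a sketch under assumptions: call_with_retry and its parameters are hypothetical, and it assumes the error reaches the client as a dict with an "error" field carrying the message text.

```python
import asyncio


async def call_with_retry(send, attempts=3, delay=0.5):
    """Retry `send()` while it reports a routing mismatch.

    `send` is any coroutine function that performs one RPC attempt and
    returns a dict; the error shape checked here is an assumption.
    """
    result = None
    for attempt in range(attempts):
        result = await send()
        if result.get("error") != "Worker routing mismatch":
            return result
        if attempt < attempts - 1:
            await asyncio.sleep(delay)  # give the routing entry time to change
    return result
```

Other errors, such as sdk_disconnected, are returned immediately so that the fail-fast behavior is preserved.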

Testing

To verify multi-worker routing:

  1. Start multiple workers: --workers 4
  2. Enable DEBUG logging: Set log level to DEBUG
  3. Connect SDK: Establish WebSocket connection
  4. Check routing registration:
    redis-cli GET "ws:routing:{project_id}:{environment}"  # Should return: backend@{hostname}-{uuid}
  5. Execute test: Trigger SDK function invocation
  6. Verify logs:
    • RPC client logs show routing to specific worker
    • Only that worker logs Forwarding RPC request ... to SDK
    • Other workers have no logs for this request
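
The log check in the last step can be automated once the logs of each worker are collected. The helper below is a hypothetical sketch: it assumes the logs are available as a mapping from worker ID to log lines and relies only on the Forwarding RPC request message format shown under Expected Log Patterns.

```python
import re

# Matches e.g. "Forwarding RPC request invoke_abc123 (chat) to SDK"
FORWARD_RE = re.compile(r"Forwarding RPC request (\S+) \(([^)]*)\) to SDK")


def workers_that_forwarded(logs_by_worker, request_id):
    """Return the IDs of workers whose logs show they forwarded `request_id`."""
    owners = []
    for worker_id, lines in logs_by_worker.items():
        for line in lines:
            match = FORWARD_RE.search(line)
            if match and match.group(1) == request_id:
                owners.append(worker_id)
                break  # one hit per worker is enough
    return owners
```

For a correctly routed request the returned list should contain exactly one worker ID.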

Key Implementation Files

  • manager.py: Worker registration, RPC listener loop, request handling, heartbeat
  • rpc_client.py: Routing lookup, direct request routing, response handling
  • redis_client.py: Redis connection management and pub/sub infrastructure

Architecture Diagram

┌─────────────────────────────────────────────────────┐
│                    Redis                            │
│                                                     │
│  Routing Registry:                                  │
│  ws:routing:project123:prod = "backend@srv1-abc"   │
│                                                     │
│  Worker Queues (Lists):                            │
│  ws:rpc:backend@srv1-abc = [request1, request2...] │
│  ws:rpc:backend@srv2-def = [request3, request4...] │
│                                                     │
│  Response Channels (Pub/Sub):                      │
│  ws:rpc:response:{test_run_id}                     │
└─────────────────────────────────────────────────────┘
       ▲                           ▲
       │                           │
  ┌────┴──────┐               ┌───┴─────┐
  │ 1. SETEX  │               │ 2. GET  │
  │ Register  │               │ Lookup  │
  │           │               │         │
  └───────────┘               └─────────┘
       │                           │
┌────────▼──────────┐       ┌────────▼──────────┐
│ Backend Worker 1  │       │  Celery Worker    │
│ (Has WebSocket)   │       │                   │
│                   │       │ 3. RPUSH request  │
│ ws:rpc:backend@   │◄──────┤    to worker's    │
│   srv1-abc        │       │    queue          │
│                   │       │                   │
│ 4. BLPOP (poll)   │       │ 5. Subscribe to   │
│                   │       │    response       │
│ 6. Forward to SDK │       │    channel        │
└─────────┬─────────┘       └────────┬──────────┘
        │                          ▲
        │                          │
        ▼                          │
   ┌─────────┐              ┌──────┴───────┐
   │   SDK   │──────────────│ 7. Publish   │
   │ (Client)│   Result     │    response  │
   └─────────┘              └──────────────┘

┌────────────────────────────────────────────────┐
│ Backend Worker 2 (Idle for this connection)   │
│                                                │
│ ws:rpc:backend@srv2-def                       │
│   - No requests for project123:prod           │
│   - Handles connections for other projects    │
└────────────────────────────────────────────────┘

Flow:

  1. Backend worker registers itself when SDK connects
  2. Celery worker looks up which backend worker has the connection
  3. Request is pushed directly to that worker’s queue
  4. Worker polls its queue and receives the request
  5. Celery worker subscribes to response channel
  6. Backend worker forwards request to SDK via WebSocket
  7. SDK result is published to response channel
  8. Celery worker receives result and returns it
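
One detail of the flow is worth illustrating: Redis pub/sub delivers a message only to clients that are subscribed at the moment it is published. A client therefore presumably needs its response subscription in place before the worker can reply, or a fast response could be missed. The sketch below models this with a hypothetical InMemoryPubSub class; it is not the actual wait_for_response implementation, and request_response is an invented name.

```python
import asyncio
from collections import defaultdict


class InMemoryPubSub:
    """Hypothetical model of pub/sub: only current subscribers get a message."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, channel):
        queue = asyncio.Queue()
        self._subscribers[channel].append(queue)
        return queue

    def publish(self, channel, message):
        receivers = self._subscribers[channel]
        for queue in receivers:
            queue.put_nowait(message)
        return len(receivers)  # like PUBLISH: number of receivers


async def request_response(bus, channel, send_request, timeout=30):
    """Subscribe first, then send, so an immediate reply is not lost."""
    inbox = bus.subscribe(channel)
    await send_request()
    return await asyncio.wait_for(inbox.get(), timeout)
```

In this model a publish with no subscriber returns 0 and the message is gone, which is why the subscription is set up before the request is sent.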

Benefits of Direct Routing

Compared to broadcast-based approaches:

  • Efficiency: Only the relevant worker receives requests (no wasted processing)
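  • Fail-fast: Immediate error when no worker is registered for the SDK connection (no waiting for a timeout)
  • Fewer race conditions: The routing key is the single source of truth for which worker has the connection; a short stale-key window can remain after a connection closes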
  • Scalability: O(1) routing lookup regardless of number of workers
  • Simplicity: No complex coordination logic or connection location checking

Reliability:

  • Heartbeat keeps routing fresh (30s TTL, 10s refresh)
  • Stale entries expire automatically
  • List-based queuing holds each request until the owning worker pops it with BLPOP
  • Worker-specific queues prevent message interference