Skip to main content

Overview

The @tinyclaw/queue package provides per-session task serialization for Tiny Claw. It ensures messages from the same user are processed sequentially while allowing different users to run in parallel.

Design

  • Per-session locking — Messages for the same sessionKey run sequentially
  • Parallel sessions — Different sessionKey values run in parallel
  • Promise-chain pattern — No external dependencies (no BullMQ, no Redis)
  • Automatic cleanup — Chains are removed when queue empties
  • Error resilience — Task failures don’t block the queue

Installation

bun add @tinyclaw/queue

Usage

Basic Setup

import { createSessionQueue } from '@tinyclaw/queue';

const queue = createSessionQueue();

// Enqueue a task for user123
const result = await queue.enqueue('user123', async () => {
  // This runs sequentially with other user123 tasks
  return await processMessage('Hello!');
});

console.log(result);

Multiple Users in Parallel

const queue = createSessionQueue();

// These run in parallel (different users)
const [result1, result2] = await Promise.all([
  queue.enqueue('user123', async () => {
    await new Promise(resolve => setTimeout(resolve, 1000));
    return 'User 123 done';
  }),
  queue.enqueue('user456', async () => {
    await new Promise(resolve => setTimeout(resolve, 1000));
    return 'User 456 done';
  }),
]);

// Both complete in ~1 second (parallel execution)

Same User Sequential Processing

const queue = createSessionQueue();

// These run sequentially (same user)
const start = Date.now();

const promises = [
  queue.enqueue('user123', async () => {
    await new Promise(resolve => setTimeout(resolve, 1000));
    return 'Task 1';
  }),
  queue.enqueue('user123', async () => {
    await new Promise(resolve => setTimeout(resolve, 1000));
    return 'Task 2';
  }),
];

const results = await Promise.all(promises);

const duration = Date.now() - start;
console.log(duration); // ~2000ms (sequential execution)

Check Pending Tasks

const queue = createSessionQueue();

// Queue some tasks
queue.enqueue('user123', async () => {
  await new Promise(resolve => setTimeout(resolve, 1000));
});

queue.enqueue('user123', async () => {
  await new Promise(resolve => setTimeout(resolve, 1000));
});

// Check how many are pending
console.log(queue.pending('user123')); // 2
console.log(queue.pending('user456')); // 0

Error Handling

const queue = createSessionQueue();

// Task 1 fails, but task 2 still runs
try {
  await queue.enqueue('user123', async () => {
    throw new Error('Task 1 failed');
  });
} catch (error) {
  console.error(error); // Task 1 failed
}

// Task 2 runs despite task 1 failing
const result = await queue.enqueue('user123', async () => {
  return 'Task 2 success';
});

console.log(result); // 'Task 2 success'

Cleanup

const queue = createSessionQueue();

// Stop accepting new tasks
queue.stop();

// Trying to enqueue after stop throws an error
try {
  await queue.enqueue('user123', async () => {
    return 'This will fail';
  });
} catch (error) {
  console.error(error.message); // 'Queue has been stopped'
}

API Reference

createSessionQueue()

Create a new session queue instance. Returns: SessionQueue

SessionQueue

enqueue(sessionKey, task)

Enqueue a task for a specific session.
sessionKey
string
required
Session identifier (typically user ID). Tasks with the same sessionKey run sequentially.
task
() => Promise<T>
required
Async function to execute. Must return a Promise.
Returns: Promise<T> - Resolves with the task’s return value The promise resolves/rejects when the task completes. Tasks for the same sessionKey are guaranteed to run sequentially in the order they were enqueued.

pending(sessionKey)

Get the number of pending tasks for a session.
sessionKey
string
required
Session identifier to query.
Returns: number - Count of pending tasks (including currently running task)

stop()

Stop the queue and reject all new tasks. After calling stop(), any new enqueue() calls will reject with an error. Already-queued tasks are not affected.

How It Works

Promise Chain Pattern

The queue uses a promise chain for each session:
// Session "user123" chain:
Promise.resolve()
  .then(() => task1())
  .then(() => task2())
  .then(() => task3())
  // ...
Each new task extends the chain for its session, ensuring sequential execution.

Automatic Cleanup

When a session’s queue empties (pending count reaches 0), the promise chain is removed from memory:
// After the last task completes:
chains.delete(sessionKey);
counts.delete(sessionKey);

Error Resilience

Task failures don’t break the chain:
.then(
  () => task(),  // Success path
  () => task(),  // Error path (still runs task)
)
This ensures subsequent tasks run even if previous tasks throw.

Examples

Message Processing

import { createSessionQueue } from '@tinyclaw/queue';

const queue = createSessionQueue();

async function handleMessage(userId: string, message: string) {
  return queue.enqueue(userId, async () => {
    // Load user context
    const context = await loadContext(userId);
    
    // Process message
    const response = await agent.chat(context, message);
    
    // Save updated context
    await saveContext(userId, context);
    
    return response;
  });
}

// Messages from same user are processed sequentially
await handleMessage('user123', 'Hello');
await handleMessage('user123', 'How are you?');

// Messages from different users run in parallel
Promise.all([
  handleMessage('user123', 'Message 1'),
  handleMessage('user456', 'Message 2'),
]);

Rate Limiting

const queue = createSessionQueue();

async function rateLimitedOperation(userId: string, operation: () => Promise<void>) {
  const pending = queue.pending(userId);
  
  if (pending > 10) {
    throw new Error('Too many pending operations');
  }
  
  return queue.enqueue(userId, operation);
}

Task Monitoring

const queue = createSessionQueue();
const stats = { completed: 0, failed: 0 };

async function monitoredTask<T>(sessionKey: string, task: () => Promise<T>): Promise<T> {
  try {
    const result = await queue.enqueue(sessionKey, task);
    stats.completed++;
    return result;
  } catch (error) {
    stats.failed++;
    throw error;
  }
}

// Use it
await monitoredTask('user123', async () => {
  return await processMessage();
});

console.log(stats); // { completed: 1, failed: 0 }

Graceful Shutdown

const queue = createSessionQueue();

process.on('SIGTERM', () => {
  console.log('Stopping queue...');
  queue.stop();
  
  // Wait for pending tasks to complete
  const checkInterval = setInterval(() => {
    const totalPending = /* sum of all pending */;
    if (totalPending === 0) {
      clearInterval(checkInterval);
      console.log('All tasks completed');
      process.exit(0);
    }
  }, 100);
});

Best Practices

  1. Use consistent session keys:
    // Good: consistent user ID
    queue.enqueue(userId, task);
    
    // Avoid: inconsistent keys for same user
    queue.enqueue(`user-${userId}`, task);
    queue.enqueue(userId, task);
    
  2. Keep tasks focused:
    // Good: single responsibility
    await queue.enqueue(userId, async () => {
      return await processMessage(message);
    });
    
    // Avoid: multiple unrelated operations
    await queue.enqueue(userId, async () => {
      await processMessage(message);
      await updateProfile(userId);
      await sendEmail(userId);
    });
    
  3. Handle errors in tasks:
    queue.enqueue(userId, async () => {
      try {
        return await operation();
      } catch (error) {
        logger.error('Task failed', { userId, error });
        throw error; // Re-throw to propagate
      }
    });
    
  4. Don’t await enqueue inside tasks:
    // Bad: creates nested chains (deadlock risk)
    queue.enqueue('user123', async () => {
      await queue.enqueue('user123', async () => {
        // This waits for itself to complete!
      });
    });
    
  5. Monitor queue depth:
    const MAX_PENDING = 50;
    
    const pending = queue.pending(userId);
    if (pending > MAX_PENDING) {
      throw new Error('Queue overflow');
    }
    

Performance Characteristics

  • Memory: O(1) per active session + O(N) for queued tasks
  • Enqueue: O(1) — constant time to add to chain
  • Execution: Sequential per session, parallel across sessions
  • Cleanup: Automatic when session queue empties

When to Use

Use @tinyclaw/queue when you need:
  • ✅ Sequential processing per user/session
  • ✅ Parallel processing across users
  • ✅ In-memory, lightweight queue
  • ✅ No external dependencies (Redis, etc.)
Consider alternatives when you need:
  • ❌ Persistent queue (survives process restart)
  • ❌ Distributed queue (across multiple servers)
  • ❌ Advanced features (retries, delays, priorities)
  • ❌ Visibility into queue state (dashboard, metrics)
For these cases, consider BullMQ or similar solutions.