Overview
The @tinyclaw/queue package provides per-session task serialization for Tiny Claw. It ensures messages from the same user are processed sequentially while allowing different users to run in parallel.
Design
- Per-session locking: Messages for the same sessionKey run sequentially
- Parallel sessions: Different sessionKey values run in parallel
- Promise-chain pattern: No external dependencies (no BullMQ, no Redis)
- Automatic cleanup: Chains are removed when the queue empties
- Error resilience: Task failures don’t block the queue
Installation
Usage
Basic Setup
import { createSessionQueue } from '@tinyclaw/queue';
const queue = createSessionQueue();
// Enqueue a task for user123
const result = await queue.enqueue('user123', async () => {
  // This runs sequentially with other user123 tasks
  return await processMessage('Hello!');
});
console.log(result);
Multiple Users in Parallel
const queue = createSessionQueue();
// These run in parallel (different users)
const [result1, result2] = await Promise.all([
  queue.enqueue('user123', async () => {
    await new Promise(resolve => setTimeout(resolve, 1000));
    return 'User 123 done';
  }),
  queue.enqueue('user456', async () => {
    await new Promise(resolve => setTimeout(resolve, 1000));
    return 'User 456 done';
  }),
]);
// Both complete in ~1 second (parallel execution)
Same User Sequential Processing
const queue = createSessionQueue();
// These run sequentially (same user)
const start = Date.now();
const promises = [
  queue.enqueue('user123', async () => {
    await new Promise(resolve => setTimeout(resolve, 1000));
    return 'Task 1';
  }),
  queue.enqueue('user123', async () => {
    await new Promise(resolve => setTimeout(resolve, 1000));
    return 'Task 2';
  }),
];
const results = await Promise.all(promises);
const duration = Date.now() - start;
console.log(duration); // ~2000ms (sequential execution)
Check Pending Tasks
const queue = createSessionQueue();
// Queue some tasks
queue.enqueue('user123', async () => {
  await new Promise(resolve => setTimeout(resolve, 1000));
});
queue.enqueue('user123', async () => {
  await new Promise(resolve => setTimeout(resolve, 1000));
});
// Check how many are pending
console.log(queue.pending('user123')); // 2
console.log(queue.pending('user456')); // 0
Error Handling
const queue = createSessionQueue();
// Task 1 fails, but task 2 still runs
try {
  await queue.enqueue('user123', async () => {
    throw new Error('Task 1 failed');
  });
} catch (error) {
  console.error(error); // Task 1 failed
}
// Task 2 runs despite task 1 failing
const result = await queue.enqueue('user123', async () => {
  return 'Task 2 success';
});
console.log(result); // 'Task 2 success'
Cleanup
const queue = createSessionQueue();
// Stop accepting new tasks
queue.stop();
// Trying to enqueue after stop throws an error
try {
  await queue.enqueue('user123', async () => {
    return 'This will fail';
  });
} catch (error) {
  console.error(error.message); // 'Queue has been stopped'
}
API Reference
createSessionQueue()
Create a new session queue instance.
Returns: SessionQueue
SessionQueue
enqueue(sessionKey, task)
Enqueue a task for a specific session.
sessionKey (string): Session identifier (typically the user ID). Tasks with the same sessionKey run sequentially.
task (() => Promise<T>): Async function to execute. Must return a Promise.
Returns: Promise<T> - Resolves with the task’s return value
The promise resolves/rejects when the task completes. Tasks for the same sessionKey are guaranteed to run sequentially in the order they were enqueued.
pending(sessionKey)
Get the number of pending tasks for a session.
sessionKey (string): Session identifier to query.
Returns: number - Count of pending tasks, including the currently running task
stop()
Stop the queue and reject all new tasks.
After calling stop(), any new enqueue() calls will reject with an error. Already-queued tasks are not affected.
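For TypeScript readers, the API described above can be summarized as a set of types. This is a sketch inferred from the descriptions in this section, not a copy of the package's declarations; the names SessionQueueShape and CreateSessionQueue are hypothetical.

```typescript
// Hypothetical type summary inferred from the API Reference text;
// the package's real declarations may differ.
interface SessionQueueShape {
  /** Runs `task` after earlier tasks for `sessionKey`; settles with the task's outcome. */
  enqueue<T>(sessionKey: string, task: () => Promise<T>): Promise<T>;
  /** Number of tasks queued or currently running for `sessionKey`. */
  pending(sessionKey: string): number;
  /** Makes later enqueue() calls reject; already-queued tasks are not affected. */
  stop(): void;
}

// Shape of the factory function described above.
type CreateSessionQueue = () => SessionQueueShape;
```

Because enqueue is generic in T, the awaited result keeps the task's return type, so no casts are needed at the call site.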
How It Works
Promise Chain Pattern
The queue uses a promise chain for each session:
// Session "user123" chain:
Promise.resolve()
  .then(() => task1())
  .then(() => task2())
  .then(() => task3())
// ...
Each new task extends the chain for its session, ensuring sequential execution.
Automatic Cleanup
When a session’s queue empties (pending count reaches 0), the promise chain is removed from memory:
// After the last task completes:
chains.delete(sessionKey);
counts.delete(sessionKey);
Error Resilience
Task failures don’t break the chain:
.then(
  () => task(), // Success path
  () => task(), // Error path (still runs task)
)
This ensures subsequent tasks run even if previous tasks throw.
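The three mechanisms in this section (chain extension, cleanup, error resilience) can be combined in one small sketch. It is an illustration under stated assumptions, not the package's source: createSketchQueue and SketchQueue are hypothetical names, and instead of passing the task as both handlers it keeps the chain alive by discarding each task's outcome in the stored tail, which has the same effect.

```typescript
// Illustrative sketch only: createSketchQueue and its internals are assumptions,
// not the source of @tinyclaw/queue.
interface SketchQueue {
  enqueue<T>(sessionKey: string, task: () => Promise<T>): Promise<T>;
  pending(sessionKey: string): number;
  stop(): void;
}

function createSketchQueue(): SketchQueue {
  const chains = new Map<string, Promise<void>>(); // tail of each session's chain
  const counts = new Map<string, number>(); // queued + running tasks per session
  let stopped = false;

  return {
    enqueue<T>(sessionKey: string, task: () => Promise<T>): Promise<T> {
      if (stopped) {
        return Promise.reject(new Error('Queue has been stopped'));
      }
      counts.set(sessionKey, (counts.get(sessionKey) ?? 0) + 1);

      // Promise chain: start this task only after the session's current tail settles.
      const tail = chains.get(sessionKey) ?? Promise.resolve();
      const result = tail.then(() => task());

      // Error resilience: the stored tail discards the outcome, so it never rejects
      // and a failed task cannot prevent later tasks from starting.
      const nextTail = result
        .then(() => undefined, () => undefined)
        .then(() => {
          // Automatic cleanup: drop the session's entries once nothing is left.
          const remaining = (counts.get(sessionKey) ?? 1) - 1;
          if (remaining === 0) {
            chains.delete(sessionKey);
            counts.delete(sessionKey);
          } else {
            counts.set(sessionKey, remaining);
          }
        });
      chains.set(sessionKey, nextTail);

      // The caller still sees the task's own result or error.
      return result;
    },
    pending(sessionKey: string): number {
      return counts.get(sessionKey) ?? 0;
    },
    stop(): void {
      stopped = true;
    },
  };
}
```

The caller receives the promise for its own task, while the map only ever holds a tail that cannot reject. That separation is what lets one failed task surface its error to its caller without affecting the tasks queued behind it.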
Examples
Message Processing
import { createSessionQueue } from '@tinyclaw/queue';
const queue = createSessionQueue();
async function handleMessage(userId: string, message: string) {
  return queue.enqueue(userId, async () => {
    // Load user context
    const context = await loadContext(userId);
    // Process message
    const response = await agent.chat(context, message);
    // Save updated context
    await saveContext(userId, context);
    return response;
  });
}
// Messages from the same user are processed sequentially, even when submitted together
await Promise.all([
  handleMessage('user123', 'Hello'),
  handleMessage('user123', 'How are you?'),
]);
// Messages from different users run in parallel
await Promise.all([
  handleMessage('user123', 'Message 1'),
  handleMessage('user456', 'Message 2'),
]);
Rate Limiting
const queue = createSessionQueue();
async function rateLimitedOperation(userId: string, operation: () => Promise<void>) {
  const pending = queue.pending(userId);
  if (pending > 10) {
    throw new Error('Too many pending operations');
  }
  return queue.enqueue(userId, operation);
}
Task Monitoring
const queue = createSessionQueue();
const stats = { completed: 0, failed: 0 };
async function monitoredTask<T>(sessionKey: string, task: () => Promise<T>): Promise<T> {
  try {
    const result = await queue.enqueue(sessionKey, task);
    stats.completed++;
    return result;
  } catch (error) {
    stats.failed++;
    throw error;
  }
}
// Use it
await monitoredTask('user123', async () => {
  return await processMessage();
});
console.log(stats); // { completed: 1, failed: 0 }
Graceful Shutdown
const queue = createSessionQueue();
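// Hypothetical: session keys the application has enqueued, tracked by the caller
// because the queue only exposes pending(sessionKey)
const knownSessionKeys = new Set<string>();
process.on('SIGTERM', () => {
  console.log('Stopping queue...');
  queue.stop();
  // Wait for pending tasks to complete
  const checkInterval = setInterval(() => {
    let totalPending = 0;
    for (const key of knownSessionKeys) totalPending += queue.pending(key);
    if (totalPending === 0) {
      clearInterval(checkInterval);
      console.log('All tasks completed');
      process.exit(0);
    }
  }, 100);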
});
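A polling loop like the one above can wait forever if a task never settles. As one possible variation, the helper below wraps the same polling in a promise and adds a deadline. It is a sketch: waitForIdle, PendingSource, and the option names are hypothetical, and it relies only on the documented pending(sessionKey) method plus a caller-supplied list of session keys.

```typescript
// Structural type: anything with the documented pending() method will do.
interface PendingSource {
  pending(sessionKey: string): number;
}

// Hypothetical helper: resolves true once every listed session is idle,
// or false if the deadline passes first.
function waitForIdle(
  queue: PendingSource,
  sessionKeys: Iterable<string>, // must be re-iterable, e.g. a Set or an array
  { intervalMs = 100, timeoutMs = 10_000 } = {},
): Promise<boolean> {
  return new Promise<boolean>((resolve) => {
    const deadline = Date.now() + timeoutMs;

    const check = (): void => {
      let total = 0;
      for (const key of sessionKeys) {
        total += queue.pending(key);
      }
      if (total === 0) {
        resolve(true); // everything drained
      } else if (Date.now() >= deadline) {
        resolve(false); // gave up waiting
      } else {
        setTimeout(check, intervalMs); // poll again later
      }
    };

    check();
  });
}
```

In a SIGTERM handler this could be used after queue.stop(), passing the set of keys your application tracks and exiting with a non-zero code when the helper resolves to false.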
Best Practices
- Use consistent session keys:
// Good: consistent user ID
queue.enqueue(userId, task);
// Avoid: inconsistent keys for same user
queue.enqueue(`user-${userId}`, task);
queue.enqueue(userId, task);
- Keep tasks focused:
// Good: single responsibility
await queue.enqueue(userId, async () => {
  return await processMessage(message);
});
// Avoid: multiple unrelated operations
await queue.enqueue(userId, async () => {
  await processMessage(message);
  await updateProfile(userId);
  await sendEmail(userId);
});
- Handle errors in tasks:
queue.enqueue(userId, async () => {
  try {
    return await operation();
  } catch (error) {
    logger.error('Task failed', { userId, error });
    throw error; // Re-throw to propagate
  }
});
- Don’t await enqueue inside tasks:
// Bad: creates nested chains (deadlock risk)
queue.enqueue('user123', async () => {
  await queue.enqueue('user123', async () => {
    // This waits for itself to complete!
  });
});
- Monitor queue depth:
const MAX_PENDING = 50;
const pending = queue.pending(userId);
if (pending > MAX_PENDING) {
  throw new Error('Queue overflow');
}
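For the nested-enqueue item above, one way to schedule follow-up work for the same session is to enqueue it from inside the task without awaiting it there, and wait for it outside the task instead. The sketch below assumes only the documented enqueue signature; EnqueueOnly and enqueueWithFollowUp are hypothetical names.

```typescript
// Structural type for the one method this sketch needs (inferred from the docs).
interface EnqueueOnly {
  enqueue<T>(sessionKey: string, task: () => Promise<T>): Promise<T>;
}

// Hypothetical helper: runs a main step, then a follow-up for the same session.
function enqueueWithFollowUp(
  queue: EnqueueOnly,
  sessionKey: string,
  log: string[],
): Promise<void> {
  let followUp: Promise<void> = Promise.resolve();

  const main = queue.enqueue(sessionKey, async () => {
    log.push('main:start');
    // Schedule the follow-up but do NOT await it here: it is queued behind this
    // task, so awaiting it from inside would wait on a task that cannot start.
    followUp = queue.enqueue(sessionKey, async () => {
      log.push('follow-up');
    });
    log.push('main:end');
  });

  // Outside the task it is safe to wait for both.
  return main.then(() => followUp);
}
```

If the follow-up does not need its own slot in the queue, calling the inner function directly inside the outer task is simpler, since the outer task already holds the session's turn.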
Performance
- Memory: O(1) per active session + O(N) for N queued tasks
- Enqueue: O(1), constant time to add to the chain
- Execution: Sequential per session, parallel across sessions
- Cleanup: Automatic when session queue empties
When to Use
Use @tinyclaw/queue when you need:
- ✅ Sequential processing per user/session
- ✅ Parallel processing across users
- ✅ In-memory, lightweight queue
- ✅ No external dependencies (Redis, etc.)
Consider alternatives when you need:
- ❌ Persistent queue (survives process restart)
- ❌ Distributed queue (across multiple servers)
- ❌ Advanced features (retries, delays, priorities)
- ❌ Visibility into queue state (dashboard, metrics)
For these cases, consider BullMQ or similar solutions.