Got it: this code explains why you're seeing "one after another."

### What's causing the serialization

1. **Per-message `await` in a loop**

   ```ts
   for (const msg of batch.messages) {
     // ...
     await agent.run(); // <- serial
   }
   ```

   Even if your queue allows high concurrency, this loop processes **one message at a time** within the same invocation.

2. **`max_batch_size = 1`**

   With batches of 1, a single invocation has nothing to parallelize internally. You're relying solely on the platform to spin up parallel invocations, which may scale gradually.

3. **Dynamic import inside the loop**

   ```ts
   const { EngageAgentRunner } = await import('../dist/jobs/worker/engage-runner');
   ```

   Doing this **per message** adds latency and can serialize the flow. Import once.

---

### Fix: run messages concurrently within an invocation

* Increase the batch size (e.g., 10–50) and **process the batch with `Promise.all`**.
* Import once at the top (or once per handler, before the map).

```ts
// Import at top level so it's cached once per isolate.
import { EngageAgentRunner } from '../dist/jobs/worker/engage-runner';

export default {
  async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext) {
    switch (event.cron) {
      case '*/30 * * * *':
        await handleEngageJobs(env);
        break;
    }
  },

  async queue(
    batch: MessageBatch<{ projectId: string; type: string }>,
    env: Env,
    ctx: ExecutionContext
  ): Promise<void> {
    console.log(`[${new Date().toISOString()}] Received batch of ${batch.messages.length}`);

    // Optional: cap in-flight concurrency to avoid exhausting downstreams.
    const limit = pLimit(10); // run up to 10 jobs at once inside this invocation

    await Promise.all(
      batch.messages.map((msg) =>
        limit(async () => {
          const { projectId, type } = msg.body;
          console.log(`[${new Date().toISOString()}] Start message for ${type}: ${projectId}`);
          try {
            switch (type) {
              case 'ENGAGE': {
                const agent = new EngageAgentRunner(projectId);
                await agent.run();
                break;
              }
              default:
                console.warn(`[${new Date().toISOString()}] Unknown job type: ${type}`);
            }
          } catch (err) {
            console.error(`[${new Date().toISOString()}] Failed to process ${type} for ${projectId}:`, err);
            // Rethrow if you want the entire batch retried; otherwise swallow and optionally DLQ here.
          } finally {
            console.log(`[${new Date().toISOString()}] End message for ${type}: ${projectId}`);
          }
        })
      )
    );
  },
};

// Tiny in-file p-limit to avoid an extra dependency.
function pLimit(n: number) {
  const queue: Array<() => void> = [];
  let active = 0;

  const next = () => {
    if (active >= n || queue.length === 0) return;
    active++;
    const run = queue.shift()!;
    run();
  };

  return <T>(fn: () => Promise<T>): Promise<T> =>
    new Promise<T>((resolve, reject) => {
      queue.push(() => {
        fn()
          .then(resolve, reject)
          .finally(() => {
            active--;
            next();
          });
      });
      next();
    });
}

async function handleEngageJobs(env: Env) {
  const { getAllRunnableProjects } = await import('../dist/services/db');
  const projects = await getAllRunnableProjects();
  if (!projects || projects.length === 0) {
    console.log('No projects for ENGAGE jobs.');
    return;
  }

  const messages = projects.map((projectId) => ({ body: { projectId, type: 'ENGAGE' } }));
  console.log('messages', messages);
  await env.ENGAGE_QUEUE.sendBatch(messages);
  console.log(`Enqueued ${messages.length} ENGAGE jobs`);
}
```

**Config tips**

* Set `max_batch_size` to something >1 (e.g., 25 or 50) so each invocation has enough work to parallelize.
* Keep `max_concurrency` high enough (e.g., 50) that multiple invocations can run in parallel *across batches* when the backlog is high.
* Verify the consumer binding actually has those values in `wrangler.toml` (it's easy to set them on the producer by mistake); a sample consumer config is sketched at the end of this answer.

**Operational guidance**

* Your 90-second jobs are fine if they're mostly I/O. If they're CPU-bound, Workers won't "multithread" them; break the job into smaller async steps, or offload heavy compute elsewhere.
* If you need **per-project isolation** plus automatic ordering for the same `projectId`, consider dispatching to a **Durable Object per project** from the queue handler: different projects run concurrently, while jobs for the same project serialize (see the sketch at the end of this answer).

Turn both knobs together, **in-invocation parallelism** (`Promise.all` + batch size) and **platform concurrency** (`max_concurrency` + enough backlog), and you'll see true concurrency instead of the current single-file line.
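For reference, the consumer-side settings might look roughly like this in `wrangler.toml`. The queue name `engage-queue` and the exact numbers are assumptions, so adapt them to your project; the point is that `max_batch_size` and `max_concurrency` live on the `[[queues.consumers]]` entry, not on the producer binding.

```toml
# Producer binding used by env.ENGAGE_QUEUE.sendBatch(...) in the scheduled handler.
[[queues.producers]]
binding = "ENGAGE_QUEUE"
queue = "engage-queue"    # assumed queue name

# Consumer settings: these control batching and how many invocations run in parallel.
[[queues.consumers]]
queue = "engage-queue"    # assumed queue name
max_batch_size = 25       # enough messages per invocation for Promise.all to matter
max_batch_timeout = 5     # seconds to wait while filling a batch
max_concurrency = 50      # upper bound on concurrent consumer invocations
max_retries = 3
```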
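And if you try the Durable Object route, here is a minimal sketch of the idea, not a drop-in implementation: the class name `ProjectRunner`, the binding `PROJECT_RUNNER` (which you'd declare on `Env` as a `DurableObjectNamespace` and register in `wrangler.toml` with a migration), and the request shape are all hypothetical. The key point is that `idFromName(projectId)` routes every job for a project to the same object, and chaining work inside that object keeps one project's jobs sequential while different projects proceed in parallel.

```ts
import { EngageAgentRunner } from '../dist/jobs/worker/engage-runner';

// Hypothetical Durable Object that serializes ENGAGE jobs per project.
export class ProjectRunner {
  // Pending work for this project; each new job appends to the chain.
  private chain: Promise<unknown> = Promise.resolve();

  constructor(private state: DurableObjectState, private env: Env) {}

  async fetch(request: Request): Promise<Response> {
    const { projectId } = (await request.json()) as { projectId: string };

    // Append the job so runs for the same project happen one at a time.
    this.chain = this.chain
      .then(() => new EngageAgentRunner(projectId).run())
      .catch((err) => console.error(`ENGAGE failed for ${projectId}:`, err));

    await this.chain;
    return new Response('ok');
  }
}

// In the queue handler, route each message to its project's object.
async function dispatchToProject(env: Env, projectId: string) {
  const id = env.PROJECT_RUNNER.idFromName(projectId); // same projectId -> same object
  const stub = env.PROJECT_RUNNER.get(id);
  await stub.fetch('https://do/engage', {
    method: 'POST',
    body: JSON.stringify({ projectId }),
  });
}
```

The explicit promise chain is there because a Durable Object can interleave concurrent requests at `await` points; chaining makes the per-project ordering guarantee explicit instead of assuming it.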