Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions apps/dokploy/__test__/queues/global-state.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { afterEach, describe, expect, it } from "vitest";

/**
* Regression test for the Next.js double-load concurrency bug.
*
* Symptom: with `deploymentConcurrency=2` set, peak active jobs in BullMQ was 4.
* Root cause: `workers`/`inflight`/`queues` Maps in `deployments-queue.ts` and
* `queueSetup.ts` were module-scoped, not pinned to `globalThis`. Next.js
* loaded each file from both the workspace package AND its own runtime,
* producing two `workers` Maps in the same Node process. Each constructed an
* independent BullMQ Worker against the same queue, doubling effective
* concurrency.
*
* Fix: pin queue runtime state behind `Symbol.for(...)` keys on globalThis,
* matching the pattern already used by `packages/server/src/utils/process/
* job-context.ts` for the AsyncLocalStorage child registry.
*
* These tests verify the *pattern itself* — the bare module imports trigger
* heavy side-effects (BullMQ, DB barrel) that are hard to mock cleanly in unit
* tests, but the symbol-keyed pinning is a pure pattern we can lock in here.
*/

// Global-registry symbols naming the two globalThis slots exercised by the
// tests below. Created with Symbol.for(), so any other Symbol.for() call with
// the same string yields the very same symbol.
// NOTE(review): presumably identical to the keys used by the production queue
// modules — confirm against deployments-queue.ts / queueSetup.ts.
const QUEUE_STATE_KEY = Symbol.for("dokploy.deploymentQueue.state");
const QUEUES_KEY = Symbol.for("dokploy.deploymentQueue.queues");

// View of globalThis that additionally permits reading and writing arbitrary
// symbol-keyed properties (values are `unknown` and must be cast on read).
type GlobalAny = typeof globalThis & Record<symbol, unknown>;

/**
 * Pattern-level checks for pinning queue runtime state on `globalThis` behind
 * `Symbol.for(...)` keys. Nothing here imports the real queue modules; each
 * case re-enacts the initialisation idiom inline and asserts object identity.
 */
describe("queue runtime state pinning pattern", () => {
  // Clear both pinned slots after every case so no state leaks between tests.
  afterEach(() => {
    const store = globalThis as GlobalAny;
    for (const key of [QUEUE_STATE_KEY, QUEUES_KEY]) {
      delete store[key];
    }
  });

  it("Symbol.for() returns the same key across calls (cross-realm sharing primitive)", () => {
    const registryName = "dokploy.deploymentQueue.state";
    const first = Symbol.for(registryName);
    const second = Symbol.for(registryName);
    expect(first).toBe(second);

    // Contrast: Symbol() with the same description is always a fresh value.
    const unregistered = Symbol(registryName);
    expect(unregistered).not.toBe(first);
  });

  it("nullish-coalesce assignment (??=) preserves the first writer's object", () => {
    // Re-enacts "read the slot, and only write when it is empty" twice in a
    // row; the second pass must observe the object stored by the first.
    const store = globalThis as GlobalAny;
    const seed = { workers: new Map(), inflight: new Map() };
    type Pinned = typeof seed;

    let firstWriter = store[QUEUE_STATE_KEY] as Pinned | undefined;
    if (firstWriter == null) {
      store[QUEUE_STATE_KEY] = seed;
      firstWriter = seed;
    }

    let secondWriter = store[QUEUE_STATE_KEY] as Pinned | undefined;
    if (secondWriter == null) {
      // Only reached if the first write was lost; would produce a new object.
      store[QUEUE_STATE_KEY] = { workers: new Map(), inflight: new Map() };
      secondWriter = store[QUEUE_STATE_KEY] as Pinned;
    }

    expect(firstWriter).toBe(secondWriter);
    expect(firstWriter.workers).toBe(secondWriter.workers);
  });

  it("a second module load (with cleared cache) reuses globalThis-pinned Maps", () => {
    type LoadedModule = { workers: Map<string, string> };

    // Stand-in for evaluating the module body once: reuse the pinned state if
    // present, otherwise create it and pin it.
    const simulateModuleLoad = (): LoadedModule => {
      const store = globalThis as GlobalAny;
      const alreadyPinned = store[QUEUE_STATE_KEY];
      if (alreadyPinned != null) {
        return alreadyPinned as LoadedModule;
      }
      const created = {
        workers: new Map(),
        inflight: new Map(),
        cancelSubscriber: null,
        started: false,
      };
      store[QUEUE_STATE_KEY] = created;
      return created as LoadedModule;
    };

    // First "load" creates the state and registers a worker on it.
    const firstLoad = simulateModuleLoad();
    firstLoad.workers.set("srv-1", "worker-instance-A");

    // Second "load" runs in a separate invocation but shares globalThis.
    const secondLoad = simulateModuleLoad();

    // The whole point: both loads see one and the same workers Map.
    expect(secondLoad.workers).toBe(firstLoad.workers);
    expect(secondLoad.workers.get("srv-1")).toBe("worker-instance-A");
  });

  it("without pinning, two module instances would each have their own Map", () => {
    // Demonstrates the failure shape: module-scoped state is duplicated per
    // module instance, so each instance would register its own worker.
    const makeUnpinnedModule = () => ({ workers: new Map<string, string>() });
    const firstInstance = makeUnpinnedModule();
    const secondInstance = makeUnpinnedModule();

    firstInstance.workers.set("srv-1", "worker-A");
    secondInstance.workers.set("srv-1", "worker-B");

    expect(firstInstance.workers).not.toBe(secondInstance.workers);
    expect(firstInstance.workers.get("srv-1")).not.toEqual(
      secondInstance.workers.get("srv-1"),
    );
  });

  it("symbol keys are stable (cross-package Realm-safe)", () => {
    // Each module-level key must equal a fresh Symbol.for() lookup of its
    // registry name, and the two keys must not collide with each other.
    const expectedNames: Array<[symbol, string]> = [
      [QUEUE_STATE_KEY, "dokploy.deploymentQueue.state"],
      [QUEUES_KEY, "dokploy.deploymentQueue.queues"],
    ];
    for (const [key, registryName] of expectedNames) {
      expect(key).toBe(Symbol.for(registryName));
    }
    expect(QUEUE_STATE_KEY).not.toBe(QUEUES_KEY);
  });
});
43 changes: 43 additions & 0 deletions apps/dokploy/__test__/queues/routing.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { describe, expect, it } from "vitest";
import {
CANCEL_CHANNEL,
getQueueName,
getTargetKey,
LOCAL_TARGET,
} from "../../server/queues/queue-routing";

/**
 * Contract checks for the queue-routing helpers: how a job maps to a target
 * key, how a target key maps to a queue name, and the cancel channel name.
 */
describe("queue routing", () => {
  it("routes jobs without a serverId to LOCAL_TARGET", () => {
    const explicitlyUndefined = { serverId: undefined };
    const fieldAbsent = {} as { serverId?: string };

    expect(getTargetKey(explicitlyUndefined)).toBe(LOCAL_TARGET);
    expect(getTargetKey(fieldAbsent)).toBe(LOCAL_TARGET);
  });

  it("routes jobs with a serverId to that server's key", () => {
    const targetKey = getTargetKey({ serverId: "srv-abc" });
    expect(targetKey).toBe("srv-abc");
  });

  it("preserves canary's 'deployments' queue name for LOCAL_TARGET (upgrade-safe)", () => {
    const localQueueName = getQueueName(LOCAL_TARGET);
    expect(localQueueName).toBe("deployments");
  });

  it("namespaces remote queues per server (uses __ since BullMQ rejects :)", () => {
    // [target key, expected queue name]
    const expectations: Array<[string, string]> = [
      ["srv-abc", "deployments__srv-abc"],
      ["srv-xyz", "deployments__srv-xyz"],
    ];
    for (const [targetKey, queueName] of expectations) {
      expect(getQueueName(targetKey)).toBe(queueName);
    }
  });

  it("never produces a queue name containing : (BullMQ requirement)", () => {
    // NOTE(review): neither input contains a colon itself, so this only
    // proves the helper does not introduce one.
    for (const targetKey of [LOCAL_TARGET, "srv-with-colons-disallowed"]) {
      expect(getQueueName(targetKey)).not.toContain(":");
    }
  });

  it("uses a stable cross-process cancel channel name", () => {
    expect(CANCEL_CHANNEL).toBe("dokploy:deployments:cancel");
  });

  it("treats null/undefined serverId identically (no NaN keys)", () => {
    // The null is forced through a cast so it can be passed where the
    // parameter type only admits undefined.
    const absentIds = [undefined, null as unknown as undefined];
    for (const serverId of absentIds) {
      expect(getTargetKey({ serverId })).toBe(LOCAL_TARGET);
    }
  });
});
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ const baseSettings: WebServerSettings = {
cleanupCacheApplications: false,
cleanupCacheOnCompose: false,
cleanupCacheOnPreviews: false,
deploymentConcurrency: 1,
createdAt: null,
updatedAt: new Date(),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ export const ShowDeployments = ({
},
);

const { data: isCloud } = api.settings.isCloud.useQuery();

const { mutateAsync: rollback, isPending: isRollingBack } =
api.rollback.rollback.useMutation();
const { mutateAsync: killProcess, isPending: isKillingProcess } =
Expand Down Expand Up @@ -121,7 +119,7 @@ export const ShowDeployments = ({

// Check for stuck deployment (more than 9 minutes) - only for the most recent deployment
const stuckDeployment = useMemo(() => {
if (!isCloud || !deployments || deployments.length === 0) return null;
if (!deployments || deployments.length === 0) return null;

const now = Date.now();
const NINE_MINUTES = 10 * 60 * 1000; // 9 minutes in milliseconds
Expand All @@ -141,7 +139,7 @@ export const ShowDeployments = ({
const elapsed = now - startTime;

return elapsed > NINE_MINUTES ? mostRecentDeployment : null;
}, [isCloud, deployments]);
}, [deployments]);
useEffect(() => {
setUrl(document.location.origin);
}, []);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ export function ShowQueueTable(props: { embedded?: boolean }) {
undefined,
{ refetchInterval: 3000 },
);
const { data: isCloud } = api.settings.isCloud.useQuery();
const utils = api.useUtils();
const {
mutateAsync: cancelApplicationDeployment,
Expand Down Expand Up @@ -157,8 +156,7 @@ export function ShowQueueTable(props: { embedded?: boolean }) {
</span>
)}
{isCloud &&
row.state === "active" &&
{row.state === "active" &&
(d?.applicationId != null ||
d?.composeId != null) && (
<Button
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import { Info, Loader2 } from "lucide-react";
import { useEffect, useMemo, useState } from "react";
import { toast } from "sonner";
import { Alert, AlertDescription } from "@/components/ui/alert";
import { Button } from "@/components/ui/button";
import {
Dialog,
DialogContent,
DialogDescription,
DialogFooter,
DialogHeader,
DialogTitle,
DialogTrigger,
} from "@/components/ui/dialog";
import { DropdownMenuItem } from "@/components/ui/dropdown-menu";
import { Input } from "@/components/ui/input";
import { Label } from "@/components/ui/label";
import { api } from "@/utils/api";

// Inclusive bounds for the concurrency value. Used both as the number input's
// min/max attributes and for the validation performed before saving.
const MIN = 1;
const MAX = 10;

/** Props accepted by ChangeConcurrencyModal. */
interface Props {
// `null` targets the local Dokploy host (persisted on webServerSettings).
// A string is the id of the remote server whose setting is edited.
serverId: string | null;
}

/**
 * Dropdown-menu entry that opens a dialog for editing how many deployments
 * may run in parallel on one target.
 *
 * The target is the local Dokploy host when `serverId` is `null`, otherwise
 * the remote server with that id. The dialog shows the current value, a live
 * active/queued job count for the target, and saves through the mutation that
 * matches the target.
 */
export const ChangeConcurrencyModal = ({ serverId }: Props) => {
const isLocal = serverId === null;
const [open, setOpen] = useState(false);

// Only the query matching the target is enabled. The "" passed for the local
// case is a placeholder input for a query that is disabled in that case.
const serverQuery = api.server.one.useQuery(
{ serverId: serverId ?? "" },
{ enabled: !isLocal },
);
const webServerQuery = api.settings.getWebServerSettings.useQuery(undefined, {
enabled: isLocal,
});

// Both mutation hooks are always created (hooks cannot be conditional);
// `update` is the one for the current target and drives the pending UI state.
const updateRemote = api.server.updateDeploymentConcurrency.useMutation();
const updateLocal = api.settings.updateDeploymentConcurrency.useMutation();
const update = isLocal ? updateLocal : updateRemote;

// Persisted value for the target; undefined until the matching query has data.
const currentConcurrency = isLocal
? webServerQuery.data?.deploymentConcurrency
: serverQuery.data?.deploymentConcurrency;

// Queue snapshot is fetched and re-polled every 3000 ms only while the dialog
// is open.
const queueQuery = api.deployment.queueList.useQuery(undefined, {
enabled: open,
refetchInterval: open ? 3000 : false,
});

// Tally of jobs belonging to this target. A job with a missing/undefined
// serverId is treated as null, i.e. as belonging to the local host.
// NOTE(review): every state other than "active" is counted as pending; if
// queueList can return finished jobs they would be shown as queued — confirm.
const liveCounts = useMemo(() => {
const snapshots = queueQuery.data ?? [];
const matchesTarget = (job: {
serverId?: string | null;
}) => {
const target = job.serverId ?? null;
return target === (serverId ?? null);
};
let active = 0;
let pending = 0;
for (const s of snapshots) {
if (!matchesTarget(s.data)) continue;
if (s.state === "active") active++;
else pending++;
}
return { active, pending };
}, [queueQuery.data, serverId]);

// Draft value bound to the input; starts at 1 when nothing is loaded yet.
const [value, setValue] = useState<number>(currentConcurrency ?? 1);

// Reset the draft to the persisted value whenever the dialog is open and the
// persisted value is (or becomes) known, discarding any unsaved edit.
useEffect(() => {
if (open && typeof currentConcurrency === "number") {
setValue(currentConcurrency);
}
}, [open, currentConcurrency]);

// Validates the draft, runs the mutation for the current target, refetches
// that target's settings, then closes the dialog. Validation failures and
// mutation errors are reported through toasts and leave the dialog open.
const save = async () => {
if (!Number.isInteger(value) || value < MIN || value > MAX) {
toast.error(`Concurrency must be an integer between ${MIN} and ${MAX}`);
return;
}
try {
if (isLocal) {
await updateLocal.mutateAsync({ deploymentConcurrency: value });
await webServerQuery.refetch();
} else {
await updateRemote.mutateAsync({
// Non-null here: isLocal is false exactly when serverId is not null.
serverId: serverId as string,
deploymentConcurrency: value,
});
await serverQuery.refetch();
}
toast.success("Deployment concurrency updated");
setOpen(false);
} catch (error) {
toast.error(
error instanceof Error ? error.message : "Failed to update concurrency",
);
}
};

const description = isLocal
? "Number of deployments the Dokploy host can run in parallel. Remote servers are unaffected."
: "Number of deployments this server may run in parallel. Other servers are unaffected.";

// Notes on the markup below:
// - The trigger's onSelect calls preventDefault; presumably this stops the
//   dropdown's default select handling from interfering with opening the
//   dialog — TODO confirm against the dropdown-menu component.
// - The input's onChange stores Number(event.target.value); an empty field
//   therefore becomes 0, which is rejected by the range check in `save`.
// - Both footer buttons are disabled while the mutation is pending.
return (
<Dialog open={open} onOpenChange={setOpen}>
<DialogTrigger asChild>
<DropdownMenuItem
className="w-full cursor-pointer"
onSelect={(e) => e.preventDefault()}
>
<span>Deployment Concurrency</span>
</DropdownMenuItem>
</DialogTrigger>
<DialogContent className="sm:max-w-md">
<DialogHeader>
<DialogTitle>Deployment concurrency</DialogTitle>
<DialogDescription>{description}</DialogDescription>
</DialogHeader>

<div className="flex flex-col gap-3 py-2">
<div className="flex items-center justify-between">
<Label htmlFor="deployment-concurrency">
Concurrent deployments
</Label>
<span className="text-xs text-muted-foreground">
Live: {liveCounts.active} active · {liveCounts.pending} queued
</span>
</div>
<Input
id="deployment-concurrency"
type="number"
min={MIN}
max={MAX}
step={1}
value={value}
onChange={(event) => setValue(Number(event.target.value))}
/>
<p className="text-xs text-muted-foreground">
<span className="font-medium">1</span> = one deploy at a time
(safest). <span className="font-medium">2–{MAX}</span> = run that
many side-by-side; only raise if the host has spare CPU/RAM.
</p>
<Alert>
<Info className="h-4 w-4" />
<AlertDescription>
Each concurrent build uses roughly one CPU core and 2&nbsp;GB of
RAM while active. Saved value applies immediately; in-flight
deployments are not interrupted.
</AlertDescription>
</Alert>
</div>

<DialogFooter>
<Button
variant="ghost"
onClick={() => setOpen(false)}
disabled={update.isPending}
>
Cancel
</Button>
<Button onClick={save} disabled={update.isPending}>
{update.isPending ? (
<>
<Loader2 className="mr-2 h-4 w-4 animate-spin" />
Saving…
</>
) : (
"Save"
)}
</Button>
</DialogFooter>
</DialogContent>
</Dialog>
);
};
Loading
Loading