diff --git a/apps/dokploy/__test__/queues/global-state.test.ts b/apps/dokploy/__test__/queues/global-state.test.ts new file mode 100644 index 0000000000..a0ad191c31 --- /dev/null +++ b/apps/dokploy/__test__/queues/global-state.test.ts @@ -0,0 +1,114 @@ +import { afterEach, describe, expect, it } from "vitest"; + +/** + * Regression test for the Next.js double-load concurrency bug. + * + * Symptom: with `deploymentConcurrency=2` set, peak active jobs in BullMQ was 4. + * Root cause: `workers`/`inflight`/`queues` Maps in `deployments-queue.ts` and + * `queueSetup.ts` were module-scoped, not pinned to `globalThis`. Next.js + * loaded each file from both the workspace package AND its own runtime, + * producing two `workers` Maps in the same Node process. Each constructed an + * independent BullMQ Worker against the same queue, doubling effective + * concurrency. + * + * Fix: pin queue runtime state behind `Symbol.for(...)` keys on globalThis, + * matching the pattern already used by `packages/server/src/utils/process/ + * job-context.ts` for the AsyncLocalStorage child registry. + * + * These tests verify the *pattern itself* — the bare module imports trigger + * heavy side-effects (BullMQ, DB barrel) that are hard to mock cleanly in unit + * tests, but the symbol-keyed pinning is a pure pattern we can lock in here. + */ + +const QUEUE_STATE_KEY = Symbol.for("dokploy.deploymentQueue.state"); +const QUEUES_KEY = Symbol.for("dokploy.deploymentQueue.queues"); + +type GlobalAny = typeof globalThis & Record; + +describe("queue runtime state pinning pattern", () => { + afterEach(() => { + delete (globalThis as GlobalAny)[QUEUE_STATE_KEY]; + delete (globalThis as GlobalAny)[QUEUES_KEY]; + }); + + it("Symbol.for() returns the same key across calls (cross-realm sharing primitive)", () => { + const a = Symbol.for("dokploy.deploymentQueue.state"); + const b = Symbol.for("dokploy.deploymentQueue.state"); + expect(a).toBe(b); + // Sanity: a fresh local symbol is NOT shared. + const local = Symbol("dokploy.deploymentQueue.state"); + expect(local).not.toBe(a); + }); + + it("nullish-coalesce assignment (??=) preserves the first writer's object", () => { + // Simulate the deployments-queue.ts pattern: + // const queueState = g[KEY] ?? (g[KEY] = { workers: new Map(), ... }); + const g = globalThis as GlobalAny; + const initial = { workers: new Map(), inflight: new Map() }; + const a = + (g[QUEUE_STATE_KEY] as typeof initial | undefined) ?? + ((g[QUEUE_STATE_KEY] = initial), initial); + const b = + (g[QUEUE_STATE_KEY] as typeof initial | undefined) ?? + ((g[QUEUE_STATE_KEY] = { workers: new Map(), inflight: new Map() }), + g[QUEUE_STATE_KEY] as typeof initial); + expect(a).toBe(b); + expect(a.workers).toBe(b.workers); + }); + + it("a second module load (with cleared cache) reuses globalThis-pinned Maps", () => { + // First "module load" puts state on globalThis. + const moduleA = (() => { + const g = globalThis as GlobalAny; + return ((g[QUEUE_STATE_KEY] as { workers: Map }) ?? + (g[QUEUE_STATE_KEY] = { + workers: new Map(), + inflight: new Map(), + cancelSubscriber: null, + started: false, + })) as { workers: Map }; + })(); + moduleA.workers.set("srv-1", "worker-instance-A"); + + // Second "module load" — different lexical scope, same globalThis. + const moduleB = (() => { + const g = globalThis as GlobalAny; + return ((g[QUEUE_STATE_KEY] as { workers: Map }) ?? + (g[QUEUE_STATE_KEY] = { + workers: new Map(), + inflight: new Map(), + cancelSubscriber: null, + started: false, + })) as { workers: Map }; + })(); + + // CRITICAL: same workers Map across both "loads". + expect(moduleB.workers).toBe(moduleA.workers); + expect(moduleB.workers.get("srv-1")).toBe("worker-instance-A"); + }); + + it("without pinning, two module instances would each have their own Map", () => { + // Sanity: prove the bug shape this fix prevents. + const moduleA = { workers: new Map() }; + const moduleB = { workers: new Map() }; + moduleA.workers.set("srv-1", "worker-A"); + moduleB.workers.set("srv-1", "worker-B"); + // Two separate Maps. BullMQ would construct two Workers for the same + // queue, doubling effective concurrency. + expect(moduleA.workers).not.toBe(moduleB.workers); + expect(moduleA.workers.get("srv-1")).not.toEqual( + moduleB.workers.get("srv-1"), + ); + }); + + it("symbol keys are stable (cross-package Realm-safe)", () => { + // Symbol.for() keys are interned in the global symbol registry, which + // is shared across realms. This is what makes the fix work for + // Next.js's workspace-package double-load: both copies of the module + // resolve to the SAME symbol via Symbol.for(), so they hit the same + // globalThis property. + expect(QUEUE_STATE_KEY).toBe(Symbol.for("dokploy.deploymentQueue.state")); + expect(QUEUES_KEY).toBe(Symbol.for("dokploy.deploymentQueue.queues")); + expect(QUEUE_STATE_KEY).not.toBe(QUEUES_KEY); + }); +}); diff --git a/apps/dokploy/__test__/queues/routing.test.ts b/apps/dokploy/__test__/queues/routing.test.ts new file mode 100644 index 0000000000..cb0e837987 --- /dev/null +++ b/apps/dokploy/__test__/queues/routing.test.ts @@ -0,0 +1,43 @@ +import { describe, expect, it } from "vitest"; +import { + CANCEL_CHANNEL, + getQueueName, + getTargetKey, + LOCAL_TARGET, +} from "../../server/queues/queue-routing"; + +describe("queue routing", () => { + it("routes jobs without a serverId to LOCAL_TARGET", () => { + expect(getTargetKey({ serverId: undefined })).toBe(LOCAL_TARGET); + expect(getTargetKey({} as { serverId?: string })).toBe(LOCAL_TARGET); + }); + + it("routes jobs with a serverId to that server's key", () => { + expect(getTargetKey({ serverId: "srv-abc" })).toBe("srv-abc"); + }); + + it("preserves canary's 'deployments' queue name for LOCAL_TARGET (upgrade-safe)", () => { + expect(getQueueName(LOCAL_TARGET)).toBe("deployments"); + }); + + it("namespaces remote queues per server (uses __ since BullMQ rejects :)", () => { + expect(getQueueName("srv-abc")).toBe("deployments__srv-abc"); + expect(getQueueName("srv-xyz")).toBe("deployments__srv-xyz"); + }); + + it("never produces a queue name containing : (BullMQ requirement)", () => { + expect(getQueueName(LOCAL_TARGET)).not.toContain(":"); + expect(getQueueName("srv-with-colons-disallowed")).not.toContain(":"); + }); + + it("uses a stable cross-process cancel channel name", () => { + expect(CANCEL_CHANNEL).toBe("dokploy:deployments:cancel"); + }); + + it("treats null/undefined serverId identically (no NaN keys)", () => { + expect(getTargetKey({ serverId: undefined })).toBe(LOCAL_TARGET); + expect(getTargetKey({ serverId: null as unknown as undefined })).toBe( + LOCAL_TARGET, + ); + }); +}); diff --git a/apps/dokploy/__test__/traefik/server/update-server-config.test.ts b/apps/dokploy/__test__/traefik/server/update-server-config.test.ts index e07f34ade9..6e87a86ff9 100644 --- a/apps/dokploy/__test__/traefik/server/update-server-config.test.ts +++ b/apps/dokploy/__test__/traefik/server/update-server-config.test.ts @@ -65,6 +65,7 @@ const baseSettings: WebServerSettings = { cleanupCacheApplications: false, cleanupCacheOnCompose: false, cleanupCacheOnPreviews: false, + deploymentConcurrency: 1, createdAt: null, updatedAt: new Date(), }; diff --git a/apps/dokploy/components/dashboard/application/deployments/show-deployments.tsx b/apps/dokploy/components/dashboard/application/deployments/show-deployments.tsx index ccf2564b06..b8f3aab4f8 100644 --- a/apps/dokploy/components/dashboard/application/deployments/show-deployments.tsx +++ b/apps/dokploy/components/dashboard/application/deployments/show-deployments.tsx @@ -75,8 +75,6 @@ export const ShowDeployments = ({ }, ); - const { data: isCloud } = api.settings.isCloud.useQuery(); - const { mutateAsync: rollback, isPending: isRollingBack } = api.rollback.rollback.useMutation(); const { mutateAsync: killProcess, isPending: isKillingProcess } = @@ -121,7 +119,7 @@ export const ShowDeployments = ({ // Check for stuck deployment (more than 9 minutes) - only for the most recent deployment const stuckDeployment = useMemo(() => { - if (!isCloud || !deployments || deployments.length === 0) return null; + if (!deployments || deployments.length === 0) return null; const now = Date.now(); const NINE_MINUTES = 10 * 60 * 1000; // 9 minutes in milliseconds @@ -141,7 +139,7 @@ export const ShowDeployments = ({ const elapsed = now - startTime; return elapsed > NINE_MINUTES ? mostRecentDeployment : null; - }, [isCloud, deployments]); + }, [deployments]); useEffect(() => { setUrl(document.location.origin); }, []); diff --git a/apps/dokploy/components/dashboard/deployments/show-queue-table.tsx b/apps/dokploy/components/dashboard/deployments/show-queue-table.tsx index 22b132f166..150636143c 100644 --- a/apps/dokploy/components/dashboard/deployments/show-queue-table.tsx +++ b/apps/dokploy/components/dashboard/deployments/show-queue-table.tsx @@ -71,7 +71,6 @@ export function ShowQueueTable(props: { embedded?: boolean }) { undefined, { refetchInterval: 3000 }, ); - const { data: isCloud } = api.settings.isCloud.useQuery(); const utils = api.useUtils(); const { mutateAsync: cancelApplicationDeployment, @@ -157,8 +156,7 @@ export function ShowQueueTable(props: { embedded?: boolean }) { — )} - {isCloud && - row.state === "active" && + {row.state === "active" && (d?.applicationId != null || d?.composeId != null) && ( + + + + + ); +}; diff --git a/apps/dokploy/components/dashboard/settings/servers/actions/show-dokploy-actions.tsx b/apps/dokploy/components/dashboard/settings/servers/actions/show-dokploy-actions.tsx index 7d63de210d..3937a4a0d2 100644 --- a/apps/dokploy/components/dashboard/settings/servers/actions/show-dokploy-actions.tsx +++ b/apps/dokploy/components/dashboard/settings/servers/actions/show-dokploy-actions.tsx @@ -14,6 +14,7 @@ import { api } from "@/utils/api"; import { ShowModalLogs } from "../../web-server/show-modal-logs"; import { TerminalModal } from "../../web-server/terminal-modal"; import { GPUSupportModal } from "../gpu-support-modal"; +import { ChangeConcurrencyModal } from "./change-concurrency-modal"; export const ShowDokployActions = () => { const { mutateAsync: reloadServer, isPending } = @@ -61,6 +62,7 @@ export const ShowDokployActions = () => { + { + const serverQuery = api.server.one.useQuery( + { serverId }, + { enabled: !!serverId }, + ); + const update = api.server.updateDeploymentConcurrency.useMutation(); + + const currentConcurrency = serverQuery.data?.deploymentConcurrency; + // Only poll while the section is actually mounted; stops the moment a parent unmounts the tab. + const [isMounted, setIsMounted] = useState(false); + useEffect(() => { + setIsMounted(true); + return () => setIsMounted(false); + }, []); + const queueQuery = api.deployment.queueList.useQuery(undefined, { + enabled: isMounted, + refetchInterval: isMounted ? 3000 : false, + }); + const liveCounts = useMemo(() => { + const snapshots = queueQuery.data ?? []; + let active = 0; + let pending = 0; + for (const s of snapshots) { + const target = s.data.serverId ?? null; + if (target !== serverId) continue; + if (s.state === "active") active++; + else pending++; + } + return { active, pending }; + }, [queueQuery.data, serverId]); + + const [value, setValue] = useState(currentConcurrency ?? 1); + + useEffect(() => { + if (typeof currentConcurrency === "number") { + setValue(currentConcurrency); + } + }, [currentConcurrency]); + + const isDirty = + typeof currentConcurrency === "number" && value !== currentConcurrency; + const isValid = Number.isInteger(value) && value >= MIN && value <= MAX; + + const save = async () => { + if (!isValid) { + toast.error(`Concurrency must be an integer between ${MIN} and ${MAX}`); + return; + } + try { + await update.mutateAsync({ + serverId, + deploymentConcurrency: value, + }); + await serverQuery.refetch(); + toast.success("Deployment concurrency updated"); + } catch (error) { + toast.error( + error instanceof Error ? error.message : "Failed to update concurrency", + ); + } + }; + + return ( + + + Deployment Concurrency + + Number of deployments this server may run in parallel. Other servers + are unaffected. + + + +
+ + + Live: {liveCounts.active} active · {liveCounts.pending} queued + +
+ setValue(Number(event.target.value))} + className="max-w-xs" + /> +

+ 1 = one deploy at a time + (safest). 2–{MAX} = run that many + side-by-side; only raise if the host has spare CPU/RAM. +

+ + + + Each concurrent build uses roughly one CPU core and 2 GB of RAM + while active. Saved value applies immediately; in-flight deployments + are not interrupted. + + +
+ +
+
+
+ ); +}; diff --git a/apps/dokploy/components/dashboard/settings/servers/setup-server.tsx b/apps/dokploy/components/dashboard/settings/servers/setup-server.tsx index 570955fc8e..a5ce326505 100644 --- a/apps/dokploy/components/dashboard/settings/servers/setup-server.tsx +++ b/apps/dokploy/components/dashboard/settings/servers/setup-server.tsx @@ -27,6 +27,7 @@ import { cn } from "@/lib/utils"; import { api } from "@/utils/api"; import { ShowDeployment } from "../../application/deployments/show-deployment"; import { type LogLine, parseLogs } from "../../docker/logs/utils"; +import { DeploymentConcurrencySection } from "./deployment-concurrency-section"; import { EditScript } from "./edit-script"; import { GPUSupport } from "./gpu-support"; import { SecurityAudit } from "./security-audit"; @@ -320,6 +321,10 @@ export const SetupServer = ({ serverId, asButton = false }: Props) => { /> + + {!isBuildServer && ( + + )} diff --git a/apps/dokploy/drizzle/0167_concurrent_deployments.sql b/apps/dokploy/drizzle/0167_concurrent_deployments.sql new file mode 100644 index 0000000000..23815932d1 --- /dev/null +++ b/apps/dokploy/drizzle/0167_concurrent_deployments.sql @@ -0,0 +1 @@ +ALTER TABLE "server" ADD COLUMN "deploymentConcurrency" integer DEFAULT 1 NOT NULL; \ No newline at end of file diff --git a/apps/dokploy/drizzle/0168_local_deployment_concurrency.sql b/apps/dokploy/drizzle/0168_local_deployment_concurrency.sql new file mode 100644 index 0000000000..d0839dbda9 --- /dev/null +++ b/apps/dokploy/drizzle/0168_local_deployment_concurrency.sql @@ -0,0 +1 @@ +ALTER TABLE "webServerSettings" ADD COLUMN "deploymentConcurrency" integer DEFAULT 1 NOT NULL; \ No newline at end of file diff --git a/apps/dokploy/drizzle/meta/0167_snapshot.json b/apps/dokploy/drizzle/meta/0167_snapshot.json new file mode 100644 index 0000000000..60b41a8a8d Binary files /dev/null and b/apps/dokploy/drizzle/meta/0167_snapshot.json differ diff --git a/apps/dokploy/drizzle/meta/_journal.json b/apps/dokploy/drizzle/meta/_journal.json index 6fa26fd795..95d2b2ca0c 100644 --- a/apps/dokploy/drizzle/meta/_journal.json +++ b/apps/dokploy/drizzle/meta/_journal.json @@ -1170,6 +1170,20 @@ "when": 1778303519111, "tag": "0166_nosy_slapstick", "breakpoints": true + }, + { + "idx": 167, + "version": "6", + "when": 1746100000000, + "tag": "0167_concurrent_deployments", + "breakpoints": true + }, + { + "idx": 168, + "version": "6", + "when": 1746100100000, + "tag": "0168_local_deployment_concurrency", + "breakpoints": true } ] -} \ No newline at end of file +} diff --git a/apps/dokploy/esbuild.config.ts b/apps/dokploy/esbuild.config.ts index fa053d726c..bee31c5c61 100644 --- a/apps/dokploy/esbuild.config.ts +++ b/apps/dokploy/esbuild.config.ts @@ -3,12 +3,20 @@ import esbuild from "esbuild"; const result = dotenv.config({ path: ".env.production" }); +// Keys whose value must be read from process.env at runtime, not baked in at +// build time. Add a key here when an operator should be able to tune it from +// docker-compose env / systemd unit / shell without rebuilding the bundle. +const RUNTIME_ONLY_ENV_KEYS = new Set([ + "DATABASE_URL", + "DEPLOYMENT_QUEUE_CONCURRENCY", + "DEPLOYMENT_SHUTDOWN_GRACE_MS", +]); + function prepareDefine(config: DotenvParseOutput | undefined) { const define = {}; // @ts-ignore for (const [key, value] of Object.entries(config)) { - // Skip DATABASE_URL to allow runtime environment variable override - if (key === "DATABASE_URL") { + if (RUNTIME_ONLY_ENV_KEYS.has(key)) { continue; } // @ts-ignore diff --git a/apps/dokploy/server/api/routers/application.ts b/apps/dokploy/server/api/routers/application.ts index 228c986567..ea8b53dc40 100644 --- a/apps/dokploy/server/api/routers/application.ts +++ b/apps/dokploy/server/api/routers/application.ts @@ -70,9 +70,9 @@ import { import { deploymentWorker } from "@/server/queues/deployments-queue"; import type { DeploymentJob } from "@/server/queues/queue-types"; import { + cancelDeploymentsByApplication, cleanQueuesByApplication, getJobsByApplicationId, - killDockerBuild, myQueue, } from "@/server/queues/queueSetup"; import { cancelDeployment, deploy } from "@/server/utils/deploy"; @@ -338,8 +338,11 @@ export const applicationRouter = createTRPCRouter({ server: !!application.serverId, }; - if (IS_CLOUD && application.serverId) { + if (application.serverId) { jobData.serverId = application.serverId; + } + + if (IS_CLOUD && application.serverId) { deploy(jobData).catch((error) => { console.error("Background deployment failed:", error); }); @@ -705,8 +708,11 @@ export const applicationRouter = createTRPCRouter({ applicationType: "application", server: !!application.serverId, }; - if (IS_CLOUD && application.serverId) { + if (application.serverId) { jobData.serverId = application.serverId; + } + + if (IS_CLOUD && application.serverId) { deploy(jobData).catch((error) => { console.error("Background deployment failed:", error); }); @@ -765,7 +771,7 @@ export const applicationRouter = createTRPCRouter({ deployment: ["cancel"], }); const application = await findApplicationById(input.applicationId); - await killDockerBuild("application", application.serverId); + await cancelDeploymentsByApplication(input.applicationId); await audit(ctx, { action: "stop", resourceType: "application", @@ -824,8 +830,11 @@ export const applicationRouter = createTRPCRouter({ applicationType: "application", server: !!app.serverId, }; - if (IS_CLOUD && app.serverId) { + if (app.serverId) { jobData.serverId = app.serverId; + } + + if (IS_CLOUD && app.serverId) { deploy(jobData).catch((error) => { console.error("Background deployment failed:", error); }); @@ -929,46 +938,43 @@ export const applicationRouter = createTRPCRouter({ }); const application = await findApplicationById(input.applicationId); - if (IS_CLOUD && application.serverId) { - try { - await updateApplicationStatus(input.applicationId, "idle"); - - if (application.deployments[0]) { - await updateDeploymentStatus( - application.deployments[0].deploymentId, - "done", - ); - } + try { + await updateApplicationStatus(input.applicationId, "idle"); + if (application.deployments[0]) { + await updateDeploymentStatus( + application.deployments[0].deploymentId, + "error", + ); + } + if (IS_CLOUD && application.serverId) { await cancelDeployment({ applicationId: input.applicationId, applicationType: "application", }); - await audit(ctx, { - action: "stop", - resourceType: "application", - resourceId: application.applicationId, - resourceName: application.appName, - }); - return { - success: true, - message: "Deployment cancellation requested", - }; - } catch (error) { - throw new TRPCError({ - code: "INTERNAL_SERVER_ERROR", - message: - error instanceof Error - ? error.message - : "Failed to cancel deployment", - }); + } else { + await cancelDeploymentsByApplication(input.applicationId); } - } - throw new TRPCError({ - code: "BAD_REQUEST", - message: "Deployment cancellation only available in cloud version", - }); + await audit(ctx, { + action: "stop", + resourceType: "application", + resourceId: application.applicationId, + resourceName: application.appName, + }); + return { + success: true, + message: "Deployment cancellation requested", + }; + } catch (error) { + throw new TRPCError({ + code: "INTERNAL_SERVER_ERROR", + message: + error instanceof Error + ? error.message + : "Failed to cancel deployment", + }); + } }), search: protectedProcedure diff --git a/apps/dokploy/server/api/routers/compose.ts b/apps/dokploy/server/api/routers/compose.ts index d1fbb14d3a..a8f1fb98de 100644 --- a/apps/dokploy/server/api/routers/compose.ts +++ b/apps/dokploy/server/api/routers/compose.ts @@ -71,9 +71,9 @@ import { import { deploymentWorker } from "@/server/queues/deployments-queue"; import type { DeploymentJob } from "@/server/queues/queue-types"; import { + cancelDeploymentsByCompose, cleanQueuesByCompose, getJobsByComposeId, - killDockerBuild, myQueue, } from "@/server/queues/queueSetup"; import { cancelDeployment, deploy } from "@/server/utils/deploy"; @@ -309,8 +309,8 @@ export const composeRouter = createTRPCRouter({ await checkServicePermissionAndAccess(ctx, input.composeId, { deployment: ["cancel"], }); - const compose = await findComposeById(input.composeId); - await killDockerBuild("compose", compose.serverId); + await findComposeById(input.composeId); + await cancelDeploymentsByCompose(input.composeId); }), loadServices: protectedProcedure @@ -430,8 +430,11 @@ export const composeRouter = createTRPCRouter({ server: !!compose.serverId, }; - if (IS_CLOUD && compose.serverId) { + if (compose.serverId) { jobData.serverId = compose.serverId; + } + + if (IS_CLOUD && compose.serverId) { deploy(jobData).catch((error) => { console.error("Background deployment failed:", error); }); @@ -478,8 +481,11 @@ export const composeRouter = createTRPCRouter({ descriptionLog: input.description || "", server: !!compose.serverId, }; - if (IS_CLOUD && compose.serverId) { + if (compose.serverId) { jobData.serverId = compose.serverId; + } + + if (IS_CLOUD && compose.serverId) { deploy(jobData).catch((error) => { console.error("Background deployment failed:", error); }); @@ -1052,49 +1058,45 @@ export const composeRouter = createTRPCRouter({ }); const compose = await findComposeById(input.composeId); - if (IS_CLOUD && compose.serverId) { - try { - await updateCompose(input.composeId, { - composeStatus: "idle", - }); - - if (compose.deployments[0]) { - await updateDeploymentStatus( - compose.deployments[0].deploymentId, - "done", - ); - } + try { + await updateCompose(input.composeId, { + composeStatus: "idle", + }); + if (compose.deployments[0]) { + await updateDeploymentStatus( + compose.deployments[0].deploymentId, + "error", + ); + } + if (IS_CLOUD && compose.serverId) { await cancelDeployment({ composeId: input.composeId, applicationType: "compose", }); - - await audit(ctx, { - action: "stop", - resourceType: "compose", - resourceId: input.composeId, - resourceName: compose.name, - }); - return { - success: true, - message: "Deployment cancellation requested", - }; - } catch (error) { - throw new TRPCError({ - code: "INTERNAL_SERVER_ERROR", - message: - error instanceof Error - ? error.message - : "Failed to cancel deployment", - }); + } else { + await cancelDeploymentsByCompose(input.composeId); } - } - throw new TRPCError({ - code: "BAD_REQUEST", - message: "Deployment cancellation only available in cloud version", - }); + await audit(ctx, { + action: "stop", + resourceType: "compose", + resourceId: input.composeId, + resourceName: compose.name, + }); + return { + success: true, + message: "Deployment cancellation requested", + }; + } catch (error) { + throw new TRPCError({ + code: "INTERNAL_SERVER_ERROR", + message: + error instanceof Error + ? error.message + : "Failed to cancel deployment", + }); + } }), search: protectedProcedure diff --git a/apps/dokploy/server/api/routers/preview-deployment.ts b/apps/dokploy/server/api/routers/preview-deployment.ts index a45ef80c53..fbe7a03a06 100644 --- a/apps/dokploy/server/api/routers/preview-deployment.ts +++ b/apps/dokploy/server/api/routers/preview-deployment.ts @@ -88,8 +88,11 @@ export const previewDeploymentRouter = createTRPCRouter({ server: !!application.serverId, }; - if (IS_CLOUD && application.serverId) { + if (application.serverId) { jobData.serverId = application.serverId; + } + + if (IS_CLOUD && application.serverId) { deploy(jobData).catch((error) => { console.error("Background deployment failed:", error); }); diff --git a/apps/dokploy/server/api/routers/server.ts b/apps/dokploy/server/api/routers/server.ts index 310363efdd..409ed93b7a 100644 --- a/apps/dokploy/server/api/routers/server.ts +++ b/apps/dokploy/server/api/routers/server.ts @@ -34,6 +34,7 @@ import { apiFindOneServer, apiRemoveServer, apiUpdateServer, + apiUpdateServerDeploymentConcurrency, apiUpdateServerMonitoring, applications, compose, @@ -45,6 +46,7 @@ import { redis, server, } from "@/server/db/schema"; +import { deploymentQueueManager } from "@/server/queues/queueSetup"; export const serverRouter = createTRPCRouter({ create: withPermission("server", "create") @@ -467,6 +469,45 @@ export const serverRouter = createTRPCRouter({ throw error; } }), + updateDeploymentConcurrency: withPermission("server", "create") + .input(apiUpdateServerDeploymentConcurrency) + .mutation(async ({ input, ctx }) => { + const target = await findServerById(input.serverId); + if (target.organizationId !== ctx.session.activeOrganizationId) { + throw new TRPCError({ + code: "UNAUTHORIZED", + message: "You are not authorized to update this server", + }); + } + + const updated = await updateServerById(input.serverId, { + deploymentConcurrency: input.deploymentConcurrency, + }); + + // Apply immediately to the in-process queue. In-flight jobs keep + // running; pending jobs stay queued. + if (!IS_CLOUD) { + try { + await deploymentQueueManager.updateConcurrency( + input.serverId, + input.deploymentConcurrency, + ); + } catch (error) { + console.error( + "Failed to propagate concurrency change to queue manager", + error, + ); + } + } + + await audit(ctx, { + action: "update", + resourceType: "server", + resourceId: input.serverId, + resourceName: target.name, + }); + return updated; + }), publicIp: protectedProcedure.query(async () => { if (IS_CLOUD) { return ""; diff --git a/apps/dokploy/server/api/routers/settings.ts b/apps/dokploy/server/api/routers/settings.ts index 9965643f72..0d1b37ca22 100644 --- a/apps/dokploy/server/api/routers/settings.ts +++ b/apps/dokploy/server/api/routers/settings.ts @@ -67,10 +67,15 @@ import { apiServerSchema, apiTraefikConfig, apiUpdateDockerCleanup, + apiUpdateWebServerDeploymentConcurrency, projects, server, } from "@/server/db/schema"; -import { cleanAllDeploymentQueue } from "@/server/queues/queueSetup"; +import { + cleanAllDeploymentQueue, + deploymentQueueManager, + LOCAL_TARGET, +} from "@/server/queues/queueSetup"; import { removeJob, schedule } from "@/server/utils/backup"; import packageInfo from "../../../package.json"; import { appRouter } from "../root"; @@ -360,6 +365,39 @@ export const settingsRouter = createTRPCRouter({ }); return true; }), + /** + * Local counterpart of `server.updateDeploymentConcurrency`: sets how many + * deployments can run in parallel on the Dokploy host itself. Persisted + * on `webServerSettings` and applied to the in-process queue on the same + * tick — no rebuild or service restart needed. + */ + updateDeploymentConcurrency: adminProcedure + .input(apiUpdateWebServerDeploymentConcurrency) + .mutation(async ({ input, ctx }) => { + if (IS_CLOUD) { + return true; + } + await updateWebServerSettings({ + deploymentConcurrency: input.deploymentConcurrency, + }); + try { + await deploymentQueueManager.updateConcurrency( + LOCAL_TARGET, + input.deploymentConcurrency, + ); + } catch (error) { + console.error( + "Failed to propagate local concurrency change to queue manager", + error, + ); + } + await audit(ctx, { + action: "update", + resourceType: "settings", + resourceName: "deployment-concurrency", + }); + return true; + }), updateDockerCleanup: adminProcedure .input(apiUpdateDockerCleanup) .mutation(async ({ input, ctx }) => { diff --git a/apps/dokploy/server/queues/deployments-queue.ts b/apps/dokploy/server/queues/deployments-queue.ts index e92f4c1921..dd68d8821b 100644 --- a/apps/dokploy/server/queues/deployments-queue.ts +++ b/apps/dokploy/server/queues/deployments-queue.ts @@ -10,87 +10,319 @@ import { updateCompose, updatePreviewDeployment, } from "@dokploy/server"; +import { + dokployJobContext, + killJobProcesses, +} from "@dokploy/server/utils/process/job-context"; import { type Job, Worker } from "bullmq"; +import { + CANCEL_CHANNEL, + getQueueName, + getTargetKey, + LOCAL_TARGET, +} from "./queue-routing"; import type { DeploymentJob } from "./queue-types"; import { redisConfig } from "./redis-connection"; -const createDeploymentWorker = () => - new Worker( - "deployments", - async (job: Job) => { - try { - if (job.data.applicationType === "application") { - await updateApplicationStatus(job.data.applicationId, "running"); +export { CANCEL_CHANNEL, getQueueName, getTargetKey, LOCAL_TARGET }; + +const MAX_CONCURRENCY = 10; +const DEFAULT_CONCURRENCY = 1; + +const clampConcurrency = (n: number): number => + Math.max(1, Math.min(Math.floor(n) || DEFAULT_CONCURRENCY, MAX_CONCURRENCY)); + +// Pin queue runtime state to globalThis. Without this, Next.js loading this +// module from both the workspace package and its own runtime produces two +// `workers` Maps, each spinning up its own BullMQ Worker for the same queue. +// Effective concurrency then becomes (concurrency × moduleCopies), and cancel +// can target a job tracked in the other copy's `inflight`. Same reasoning as +// `job-context.ts`'s child-registry pinning. +const QUEUE_STATE_KEY = Symbol.for("dokploy.deploymentQueue.state"); +type QueueState = { + workers: Map>; + inflight: Map; + cancelSubscriber: { quit: () => Promise } | null; + started: boolean; +}; +type GlobalShared = typeof globalThis & { + [QUEUE_STATE_KEY]?: QueueState; +}; +const g = globalThis as GlobalShared; +const queueState: QueueState = + g[QUEUE_STATE_KEY] ?? + (g[QUEUE_STATE_KEY] = { + workers: new Map>(), + inflight: new Map(), + cancelSubscriber: null, + started: false, + }); +const workers = queueState.workers; +const inflight = queueState.inflight; + +const killJobOnAbort = (jobId: string): void => { + if (!jobId) return; + try { + killJobProcesses(jobId); + } catch (error) { + console.error("[deployments] failed to kill job", jobId, error); + } +}; + +const handleJob = async ( + job: Job, + signal: AbortSignal, +): Promise => { + const data = job.data; + const jobId = job.id ?? ""; + const serverId = data.serverId ?? null; + + // On abort, kill *only* this job's process tree — never a global + // `pkill -f "docker build"` (which would kill unrelated concurrent + // builds on the same host). + const onAbort = () => { + killJobOnAbort(jobId); + }; + signal.addEventListener("abort", onAbort, { once: true }); - if (job.data.type === "redeploy") { - await rebuildApplication({ - applicationId: job.data.applicationId, - titleLog: job.data.titleLog, - descriptionLog: job.data.descriptionLog, - }); - } else if (job.data.type === "deploy") { - await deployApplication({ - applicationId: job.data.applicationId, - titleLog: job.data.titleLog, - descriptionLog: job.data.descriptionLog, - }); - } - } else if (job.data.applicationType === "compose") { - await updateCompose(job.data.composeId, { - composeStatus: "running", + try { + await dokployJobContext.run({ jobId, serverId }, async () => { + if (data.applicationType === "application") { + await updateApplicationStatus(data.applicationId, "running"); + if (data.type === "redeploy") { + await rebuildApplication({ + applicationId: data.applicationId, + titleLog: data.titleLog, + descriptionLog: data.descriptionLog, }); - if (job.data.type === "deploy") { - await deployCompose({ - composeId: job.data.composeId, - titleLog: job.data.titleLog, - descriptionLog: job.data.descriptionLog, - }); - } else if (job.data.type === "redeploy") { - await rebuildCompose({ - composeId: job.data.composeId, - titleLog: job.data.titleLog, - descriptionLog: job.data.descriptionLog, - }); - } - } else if (job.data.applicationType === "application-preview") { - await updatePreviewDeployment(job.data.previewDeploymentId, { - previewStatus: "running", + } else if (data.type === "deploy") { + await deployApplication({ + applicationId: data.applicationId, + titleLog: data.titleLog, + descriptionLog: data.descriptionLog, + }); + } + } else if (data.applicationType === "compose") { + await updateCompose(data.composeId, { composeStatus: "running" }); + if (data.type === "deploy") { + await deployCompose({ + composeId: data.composeId, + titleLog: data.titleLog, + descriptionLog: data.descriptionLog, + }); + } else if (data.type === "redeploy") { + await rebuildCompose({ + composeId: data.composeId, + titleLog: data.titleLog, + descriptionLog: data.descriptionLog, + }); + } + } else if (data.applicationType === "application-preview") { + await updatePreviewDeployment(data.previewDeploymentId, { + previewStatus: "running", + }); + if (data.type === "redeploy") { + await rebuildPreviewApplication({ + applicationId: data.applicationId, + titleLog: data.titleLog, + descriptionLog: data.descriptionLog, + previewDeploymentId: data.previewDeploymentId, + }); + } else if (data.type === "deploy") { + await deployPreviewApplication({ + applicationId: data.applicationId, + titleLog: data.titleLog, + descriptionLog: data.descriptionLog, + previewDeploymentId: data.previewDeploymentId, }); - - if (job.data.type === "redeploy") { - await rebuildPreviewApplication({ - applicationId: job.data.applicationId, - titleLog: job.data.titleLog, - descriptionLog: job.data.descriptionLog, - previewDeploymentId: job.data.previewDeploymentId, - }); - } else if (job.data.type === "deploy") { - await deployPreviewApplication({ - applicationId: job.data.applicationId, - titleLog: job.data.titleLog, - descriptionLog: job.data.descriptionLog, - previewDeploymentId: job.data.previewDeploymentId, - }); - } } - } catch (error) { - console.log("Error", error); + } + }); + } catch (error) { + if (signal.aborted) { + try { + if (data.applicationType === "application") { + await updateApplicationStatus(data.applicationId, "error"); + } else if (data.applicationType === "compose") { + await updateCompose(data.composeId, { composeStatus: "error" }); + } else if (data.applicationType === "application-preview") { + await updatePreviewDeployment(data.previewDeploymentId, { + previewStatus: "error", + }); + } + } catch {} + throw new Error("Deployment aborted by user"); + } + throw error; + } finally { + signal.removeEventListener("abort", onAbort); + } + if (signal.aborted) { + throw new Error("Deployment aborted by user"); + } +}; + +const createWorker = ( + targetKey: string, + concurrency: number, +): Worker => { + const worker = new Worker( + getQueueName(targetKey), + async (job) => { + const ac = new AbortController(); + const key = job.id ?? `${job.queueQualifiedName}:${job.timestamp}`; + inflight.set(key, ac); + try { + await handleJob(job, ac.signal); + } finally { + inflight.delete(key); } }, { autorun: false, + concurrency: clampConcurrency(concurrency), connection: redisConfig, }, ); + worker.on("error", (err) => { + if ((err as { code?: string })?.code === "ECONNREFUSED") { + console.error( + "Make sure you have installed Redis and it is running.", + err, + ); + } + }); + workers.set(targetKey, worker); + return worker; +}; + +/** + * BullMQ's `Worker.run()` only resolves when the worker is closed (it awaits + * the internal main loop). We can't `await` it during boot or every subsequent + * step (bootstrap, shutdown handlers) hangs forever. Fire-and-forget here. + */ +const startIfStopped = (w: Worker): void => { + const running = (w as { isRunning?: () => boolean }).isRunning?.(); + if (running) return; + void w.run().catch((err) => { + console.error("[deployments] worker.run() error", err); + }); +}; + +const ensureWorker = ( + targetKey: string, + concurrency: number = DEFAULT_CONCURRENCY, +): Worker => { + const existing = workers.get(targetKey); + if (existing) return existing; + const worker = createWorker(targetKey, concurrency); + if (queueState.started) startIfStopped(worker); + return worker; +}; + +const startCancelSubscriber = async (): Promise => { + if (queueState.cancelSubscriber) return; + const seedWorker = workers.get(LOCAL_TARGET) ?? createWorker(LOCAL_TARGET, 1); + const client = await seedWorker.client; + const sub = ( + client as unknown as { duplicate: () => unknown } + ).duplicate() as { + subscribe: (channel: string) => Promise; + on: ( + event: "message", + cb: (channel: string, payload: string) => void, + ) => void; + quit: () => Promise; + }; + await sub.subscribe(CANCEL_CHANNEL); + sub.on("message", (_chan, payload) => { + try { + const { jobId } = JSON.parse(payload) as { jobId: string }; + const ac = inflight.get(jobId); + if (ac) ac.abort(); + } catch (err) { + console.error("Cancel subscriber: bad payload", err); + } + }); + queueState.cancelSubscriber = sub; +}; + +const realDeploymentWorker = { + run: async (): Promise => { + queueState.started = true; + ensureWorker(LOCAL_TARGET); + for (const w of workers.values()) startIfStopped(w); + void startCancelSubscriber().catch((err) => { + console.error("[deployments] cancel subscriber failed to start", err); + }); + }, + close: async (_reason?: string): Promise => { + queueState.started = false; + // Stop pulling new jobs and drain in-flight ones gracefully. BullMQ's + // `worker.close()` waits for active handlers to resolve. We deliberately + // do NOT call `ac.abort()` here — that would kill every running deploy + // on a routine Dokploy upgrade (swarm rolling restart sends SIGTERM, + // which calls this path). If the orchestrator follows up with SIGKILL + // after its grace period, the OS handles termination; the spawned + // builds run with `detached:true` so they have their own process group + // and can complete on the host even if our process is gone. + await Promise.all(Array.from(workers.values()).map((w) => w.close())); + workers.clear(); + if (queueState.cancelSubscriber) { + await queueState.cancelSubscriber.quit().catch(() => {}); + queueState.cancelSubscriber = null; + } + }, + cancelJob: async (jobId: string, _reason?: string): Promise => { + const ac = inflight.get(jobId); + if (ac) ac.abort(); + }, + cancelAllJobs: async (_reason?: string): Promise => { + for (const ac of inflight.values()) ac.abort(); + }, + /** + * Live-update a worker's concurrency. The new value applies to the next + * job-fetch decision: BullMQ honours `worker.concurrency` mutation on its + * main loop, so raising the limit lets queued jobs become active + * immediately, and lowering it stops new fetches until in-flight jobs + * drain. **In-flight jobs are NOT interrupted** by this call — they keep + * running to completion. To stop a running job, use `cancelJob`. + */ + setConcurrency: (targetKey: string, concurrency: number): void => { + const safe = clampConcurrency(concurrency); + const worker = ensureWorker(targetKey, safe); + worker.concurrency = safe; + }, + ensureWorker: ( + targetKey: string, + concurrency: number = DEFAULT_CONCURRENCY, + ): void => { + ensureWorker(targetKey, concurrency); + }, + removeWorker: async (targetKey: string): Promise => { + const w = workers.get(targetKey); + if (!w) return; + workers.delete(targetKey); + try { + await w.close(); + } catch (err) { + console.error("[deployments] worker.close error", targetKey, err); + } + }, + hasInflight: (jobId: string): boolean => inflight.has(jobId), +}; -/** No-op worker when Redis is disabled (e.g. IS_CLOUD). Avoids BullMQ connection errors. */ -const noopWorker = { +const noopDeploymentWorker = { run: () => Promise.resolve(), - close: () => Promise.resolve(), - cancelJob: () => Promise.resolve(), - cancelAllJobs: () => Promise.resolve(), + close: (_reason?: string) => Promise.resolve(), + cancelJob: (_jobId: string, _reason?: string) => Promise.resolve(), + cancelAllJobs: (_reason?: string) => Promise.resolve(), + setConcurrency: (_targetKey: string, _concurrency: number) => {}, + ensureWorker: (_targetKey: string, _concurrency?: number) => {}, + removeWorker: (_targetKey: string) => Promise.resolve(), + hasInflight: (_jobId: string) => false, }; export const deploymentWorker = !IS_CLOUD - ? createDeploymentWorker() - : (noopWorker as unknown as Worker); + ? realDeploymentWorker + : noopDeploymentWorker; diff --git a/apps/dokploy/server/queues/queue-routing.ts b/apps/dokploy/server/queues/queue-routing.ts new file mode 100644 index 0000000000..0ee047666f --- /dev/null +++ b/apps/dokploy/server/queues/queue-routing.ts @@ -0,0 +1,16 @@ +import type { DeploymentJob } from "./queue-types"; + +export const LOCAL_TARGET = "__local__"; +export const CANCEL_CHANNEL = "dokploy:deployments:cancel"; + +export const getTargetKey = (data: Pick): string => + data.serverId ?? LOCAL_TARGET; + +/** + * LOCAL_TARGET keeps canary's queue name `deployments` so an upgrade from a + * BullMQ-only canary doesn't strand pending jobs in an orphaned queue. + * Per-server queues use `__` as the separator because BullMQ rejects queue + * names containing `:`. + */ +export const getQueueName = (targetKey: string): string => + targetKey === LOCAL_TARGET ? "deployments" : `deployments__${targetKey}`; diff --git a/apps/dokploy/server/queues/queueSetup.ts b/apps/dokploy/server/queues/queueSetup.ts index 75d59e0795..a1a3761e6f 100644 --- a/apps/dokploy/server/queues/queueSetup.ts +++ b/apps/dokploy/server/queues/queueSetup.ts @@ -1,104 +1,317 @@ -import { IS_CLOUD } from "@dokploy/server"; import { - execAsync, - execAsyncRemote, -} from "@dokploy/server/utils/process/execAsync"; -import type { Job } from "bullmq"; -import { Queue } from "bullmq"; -import { deploymentWorker } from "./deployments-queue"; + findServerById, + getAllServers, + getWebServerSettings, + IS_CLOUD, +} from "@dokploy/server"; +import { type Job, type JobsOptions, type JobType, Queue } from "bullmq"; +import { + CANCEL_CHANNEL, + deploymentWorker, + getQueueName, + getTargetKey, + LOCAL_TARGET, +} from "./deployments-queue"; +import type { DeploymentJob } from "./queue-types"; import { redisConfig } from "./redis-connection"; -/** No-op queue when Redis is disabled (e.g. IS_CLOUD). Avoids BullMQ connection errors. */ -const createNoopQueue = () => ({ - getJobs: () => Promise.resolve([] as Job[]), - add: () => - Promise.resolve({ id: "noop", remove: () => Promise.resolve() } as Job), - close: () => Promise.resolve(), - on: () => {}, -}); +export { LOCAL_TARGET, getTargetKey }; -const myQueue = !IS_CLOUD - ? new Queue("deployments", { connection: redisConfig }) - : (createNoopQueue() as unknown as Queue); +// Pin to globalThis: see deployments-queue.ts QUEUE_STATE_KEY for rationale. +// Without pinning, Next.js loading this module twice produces two `queues` +// Maps + double the Redis connections per target. +const QUEUES_KEY = Symbol.for("dokploy.deploymentQueue.queues"); +const queues: Map> = ( + globalThis as { [QUEUES_KEY]?: Map> } +)[QUEUES_KEY] ?? +((globalThis as { [QUEUES_KEY]?: Map> })[ + QUEUES_KEY +] = new Map>()); -export const getJobsByApplicationId = async (applicationId: string) => { - const jobs = await myQueue.getJobs(); - return jobs.filter((job) => job?.data?.applicationId === applicationId); +const createNoopQueue = () => + ({ + getJobs: () => Promise.resolve([] as Job[]), + getJob: () => Promise.resolve(undefined), + add: () => + Promise.resolve({ id: "noop", remove: () => Promise.resolve() } as Job), + client: Promise.resolve(undefined as unknown as never), + close: () => Promise.resolve(), + on: () => {}, + }) as unknown as Queue; + +const getOrCreateQueue = (targetKey: string): Queue => { + let q = queues.get(targetKey); + if (q) return q; + if (IS_CLOUD) { + q = createNoopQueue(); + } else { + q = new Queue(getQueueName(targetKey), { + connection: redisConfig, + }); + q.on("error", (err) => { + if ((err as { code?: string })?.code === "ECONNREFUSED") { + console.error( + "Make sure you have installed Redis and it is running.", + err, + ); + } + }); + } + queues.set(targetKey, q); + return q; }; -export const getJobsByComposeId = async (composeId: string) => { - const jobs = await myQueue.getJobs(); - return jobs.filter((job) => job?.data?.composeId === composeId); +const getAllQueues = (): Queue[] => Array.from(queues.values()); + +const aggregateJobs = async (states?: JobType[]): Promise => { + const all: Job[] = []; + for (const q of getAllQueues()) { + all.push(...((await q.getJobs(states)) as Job[])); + } + return all; +}; + +/** + * Look up the persisted deploymentConcurrency for a target. Falls back to 1 + * when the row is missing (e.g. a server was deleted between enqueue and + * worker spin-up). Errors are swallowed — concurrency is a tuning knob, not + * a correctness boundary. + */ +const resolveConcurrencyForTarget = async ( + targetKey: string, +): Promise => { + if (IS_CLOUD) return 1; + try { + if (targetKey === LOCAL_TARGET) { + const settings = await getWebServerSettings(); + return settings?.deploymentConcurrency ?? 1; + } + const server = await findServerById(targetKey); + return server?.deploymentConcurrency ?? 1; + } catch { + return 1; + } +}; + +const ensureWorkerForTarget = async (targetKey: string): Promise => { + if (IS_CLOUD) return; + const concurrency = await resolveConcurrencyForTarget(targetKey); + deploymentWorker.ensureWorker(targetKey, concurrency); + deploymentWorker.setConcurrency(targetKey, concurrency); }; -if (!IS_CLOUD) { - process.on("SIGTERM", () => { - myQueue.close(); - process.exit(0); - }); +/** + * Legacy facade preserved for webhook callers (pages/api/deploy/*) that still + * `myQueue.add(...)`. Routes by `serverId` to the correct per-target queue; + * `getJobs` aggregates across all known per-target queues. + */ +export const myQueue = { + add: async (name: string, data: DeploymentJob, opts?: JobsOptions) => { + const targetKey = getTargetKey(data); + const q = getOrCreateQueue(targetKey); + await ensureWorkerForTarget(targetKey); + return q.add(name, data, opts); + }, + getJobs: (states?: JobType[]) => aggregateJobs(states), + close: async () => { + await Promise.all(getAllQueues().map((q) => q.close())); + queues.clear(); + }, + on: () => {}, +} as unknown as Queue; + +const publishCancel = async ( + jobId: string, + targetKey: string, +): Promise => { + const q = getOrCreateQueue(targetKey); + const client = await (q as unknown as { client: Promise }).client; + const publisher = client as { + publish: (channel: string, payload: string) => Promise; + }; + await publisher.publish(CANCEL_CHANNEL, JSON.stringify({ jobId, targetKey })); +}; - myQueue.on("error", (error) => { - if ((error as any).code === "ECONNREFUSED") { +export const deploymentQueueManager = { + enqueue: async (data: DeploymentJob, opts?: JobsOptions) => { + const targetKey = getTargetKey(data); + const q = getOrCreateQueue(targetKey); + await ensureWorkerForTarget(targetKey); + return q.add("deploy", data, opts); + }, + updateConcurrency: async ( + targetKey: string, + concurrency: number, + ): Promise => { + if (IS_CLOUD) return; + deploymentWorker.setConcurrency(targetKey, concurrency); + }, + cancel: async ( + jobId: string, + targetKey: string, + ): Promise<{ canceled: boolean; wasActive: boolean }> => { + if (IS_CLOUD) return { canceled: false, wasActive: false }; + const q = queues.get(targetKey); + if (q) { + const job = await q.getJob(jobId); + if (job) { + const state = await job.getState(); + if (state === "waiting" || state === "delayed") { + await job.remove(); + return { canceled: true, wasActive: false }; + } + } + } + await deploymentWorker.cancelJob(jobId); + await publishCancel(jobId, targetKey); + return { canceled: true, wasActive: true }; + }, + /** + * Drop the in-process Worker + Queue for `targetKey` (called when a + * server is deleted). Cancels any active/pending jobs so cleanup work + * stops, then closes and removes the BullMQ handles. Without this the + * Worker keeps running forever pointed at an orphaned Redis queue. + */ + removeTarget: async (targetKey: string): Promise => { + if (IS_CLOUD) return; + if (targetKey === LOCAL_TARGET) return; + const q = queues.get(targetKey); + if (q) { + try { + const jobs = await q.getJobs([ + "active", + "waiting", + "delayed", + "prioritized", + ]); + for (const job of jobs) { + if (!job?.id) continue; + try { + await deploymentQueueManager.cancel(job.id, targetKey); + } catch {} + } + await q.close(); + } catch (error) { + console.error( + "[deployments] failed to drain queue for removed target", + targetKey, + error, + ); + } + queues.delete(targetKey); + } + try { + await deploymentWorker.removeWorker(targetKey); + } catch (error) { console.error( - "Make sure you have installed Redis and it is running.", + "[deployments] failed to close worker for removed target", + targetKey, error, ); } - }); -} + }, + /** + * Pre-warm queues + workers at boot. LOCAL is always warmed. Each server + * is also warmed so that any jobs persisted in `deployments:` + * Redis queues from a previous process get picked up by their target's + * worker — without this, per-server queues stay orphaned until the user + * triggers a fresh deploy on that server. + */ + bootstrap: async (): Promise => { + if (IS_CLOUD) return; + getOrCreateQueue(LOCAL_TARGET); + await ensureWorkerForTarget(LOCAL_TARGET); + try { + const servers = await getAllServers(); + for (const s of servers) { + if (!s?.serverId) continue; + getOrCreateQueue(s.serverId); + await ensureWorkerForTarget(s.serverId); + } + console.log( + `Deployment queue ready (LOCAL + ${servers.length} server queue(s))`, + ); + } catch (error) { + console.error("Failed to pre-warm per-server deployment workers", error); + } + }, +}; -export const cleanQueuesByApplication = async (applicationId: string) => { - const jobs = await myQueue.getJobs(["waiting", "delayed"]); +export const getJobsByApplicationId = async (applicationId: string) => { + const jobs = await aggregateJobs(); + return jobs.filter((job) => job?.data?.applicationId === applicationId); +}; + +export const getJobsByComposeId = async (composeId: string) => { + const jobs = await aggregateJobs(); + return jobs.filter((job) => job?.data?.composeId === composeId); +}; +export const cleanQueuesByApplication = async (applicationId: string) => { + const jobs = await aggregateJobs(["waiting", "delayed"]); for (const job of jobs) { if (job?.data?.applicationId === applicationId) { await job.remove(); - console.log(`Removed job ${job.id} for application ${applicationId}`); } } }; -export const cleanAllDeploymentQueue = async () => { - deploymentWorker.cancelAllJobs("User requested cancellation"); - return true; -}; - export const cleanQueuesByCompose = async (composeId: string) => { - const jobs = await myQueue.getJobs(["waiting", "delayed"]); - + const jobs = await aggregateJobs(["waiting", "delayed"]); for (const job of jobs) { if (job?.data?.composeId === composeId) { await job.remove(); - console.log(`Removed job ${job.id} for compose ${composeId}`); } } }; -export const killDockerBuild = async ( - type: "application" | "compose", - serverId: string | null, -) => { - try { - if (type === "application") { - const command = `pkill -2 -f "docker build"`; - - if (serverId) { - await execAsyncRemote(serverId, command); - } else { - await execAsync(command); - } - } else if (type === "compose") { - const command = `pkill -2 -f "docker compose"`; +/** + * Self-hosted cancel: abort any in-flight job for this application AND drop + * pending ones. The worker's abort listener tears down only this job's + * process group (LOCAL) or SSH session (REMOTE), so concurrent builds for + * other resources on the same host are untouched. + */ +export const cancelDeploymentsByApplication = async (applicationId: string) => { + const jobs = await aggregateJobs([ + "active", + "waiting", + "delayed", + "prioritized", + ]); + let canceled = 0; + for (const job of jobs) { + if (job?.data?.applicationId !== applicationId) continue; + if (!job.id) continue; + const targetKey = getTargetKey(job.data); + await deploymentQueueManager.cancel(job.id, targetKey); + canceled++; + } + return canceled; +}; - if (serverId) { - await execAsyncRemote(serverId, command); - } else { - await execAsync(command); - } - } - } catch (error) { - console.error(error); +export const cancelDeploymentsByCompose = async (composeId: string) => { + const jobs = await aggregateJobs([ + "active", + "waiting", + "delayed", + "prioritized", + ]); + let canceled = 0; + for (const job of jobs) { + if (job?.data?.composeId !== composeId) continue; + if (!job.id) continue; + const targetKey = getTargetKey(job.data); + await deploymentQueueManager.cancel(job.id, targetKey); + canceled++; } + return canceled; }; -export { myQueue }; +export const cleanAllDeploymentQueue = async () => { + await deploymentWorker.cancelAllJobs("User requested cancellation"); + for (const q of getAllQueues()) { + const jobs = await q.getJobs(["waiting", "delayed"]); + for (const job of jobs) await job.remove(); + } + return true; +}; diff --git a/apps/dokploy/server/server.ts b/apps/dokploy/server/server.ts index 4de4d76897..efa60e1754 100644 --- a/apps/dokploy/server/server.ts +++ b/apps/dokploy/server/server.ts @@ -16,6 +16,7 @@ import { import { config } from "dotenv"; import next from "next"; import packageInfo from "../package.json"; +import { deploymentWorker } from "./queues/deployments-queue"; import { setupDockerContainerLogsWebSocketServer } from "./wss/docker-container-logs"; import { setupDockerContainerTerminalWebSocketServer } from "./wss/docker-container-terminal"; import { setupDockerStatsMonitoringSocketServer } from "./wss/docker-stats"; @@ -71,8 +72,51 @@ void app.prepare().then(async () => { if (!IS_CLOUD) { console.log("Starting Deployment Worker"); - const { deploymentWorker } = await import("./queues/deployments-queue"); await deploymentWorker.run(); + const { deploymentQueueManager } = await import("./queues/queueSetup"); + await deploymentQueueManager.bootstrap(); + + // Graceful shutdown: abort in-flight deployments so no rows are left + // stuck in "running". `initCancelDeployments` sweeps on the *next* + // boot — this closes the current process's window cleanly. + // + // We bound the drain to DEPLOYMENT_SHUTDOWN_GRACE_MS so a build that + // ignores its abort signal can't prevent the process from exiting + // (systemd / Docker will SIGKILL us after their own grace window; + // we want to exit before that and let initCancelDeployments pick up + // anything still marked running). + const DRAIN_GRACE_MS = Number.parseInt( + process.env.DEPLOYMENT_SHUTDOWN_GRACE_MS ?? "8000", + 10, + ); + const shutdown = async (signal: string) => { + console.log(`Received ${signal}, draining deployment queue…`); + try { + await Promise.race([ + deploymentWorker.close(`${signal} received`), + new Promise((_, reject) => + setTimeout( + () => reject(new Error("drain timeout")), + Number.isFinite(DRAIN_GRACE_MS) + ? Math.max(1000, DRAIN_GRACE_MS) + : 8000, + ).unref?.(), + ), + ]); + } catch (error) { + console.error( + "Deployment drain exceeded grace window or failed; exiting anyway.", + error, + ); + } + process.exit(0); + }; + process.once("SIGTERM", () => { + void shutdown("SIGTERM"); + }); + process.once("SIGINT", () => { + void shutdown("SIGINT"); + }); } } catch (e) { console.error("Main Server Error", e); diff --git a/packages/server/src/db/schema/server.ts b/packages/server/src/db/schema/server.ts index 4c8f1fc948..7ad54b9b09 100644 --- a/packages/server/src/db/schema/server.ts +++ b/packages/server/src/db/schema/server.ts @@ -41,6 +41,7 @@ export const server = pgTable("server", { .notNull() .$defaultFn(() => generateAppName("server")), enableDockerCleanup: boolean("enableDockerCleanup").notNull().default(false), + deploymentConcurrency: integer("deploymentConcurrency").notNull().default(1), createdAt: text("createdAt").notNull(), organizationId: text("organizationId") .notNull() @@ -176,6 +177,11 @@ export const apiUpdateServer = createSchema command: z.string().optional(), }); +export const apiUpdateServerDeploymentConcurrency = z.object({ + serverId: z.string().min(1), + deploymentConcurrency: z.number().int().min(1).max(10), +}); + export const apiUpdateServerMonitoring = createSchema .pick({ serverId: true, diff --git a/packages/server/src/db/schema/web-server-settings.ts b/packages/server/src/db/schema/web-server-settings.ts index f36f75660a..db880eba78 100644 --- a/packages/server/src/db/schema/web-server-settings.ts +++ b/packages/server/src/db/schema/web-server-settings.ts @@ -1,5 +1,12 @@ import { relations } from "drizzle-orm"; -import { boolean, jsonb, pgTable, text, timestamp } from "drizzle-orm/pg-core"; +import { + boolean, + integer, + jsonb, + pgTable, + text, + timestamp, +} from "drizzle-orm/pg-core"; import { createInsertSchema } from "drizzle-zod"; import { nanoid } from "nanoid"; import { z } from "zod"; @@ -106,6 +113,11 @@ export const webServerSettings = pgTable("webServerSettings", { cleanupCacheOnCompose: boolean("cleanupCacheOnCompose") .notNull() .default(false), + // How many deployments can run in parallel on the local Dokploy host. + // Kept in the DB so operators can tune it from the Web Server settings + // UI without a rebuild or service restart. The DEPLOYMENT_QUEUE_CONCURRENCY + // env var still acts as a cold-boot default on a fresh install. + deploymentConcurrency: integer("deploymentConcurrency").notNull().default(1), createdAt: timestamp("created_at").defaultNow(), updatedAt: timestamp("updated_at").notNull().defaultNow(), }); @@ -128,6 +140,7 @@ export const apiUpdateWebServerSettings = createSchema.partial().extend({ sshPrivateKey: z.string().optional(), enableDockerCleanup: z.boolean().optional(), logCleanupCron: z.string().optional().nullable(), + deploymentConcurrency: z.number().int().min(1).max(10).optional(), metricsConfig: z .object({ server: z.object({ @@ -211,6 +224,10 @@ export const apiUpdateWhitelabeling = z.object({ whitelabelingConfig: whitelabelingConfigSchema, }); +export const apiUpdateWebServerDeploymentConcurrency = z.object({ + deploymentConcurrency: z.number().int().min(1).max(10), +}); + export const apiUpdateWebServerMonitoring = z.object({ metricsConfig: z .object({ diff --git a/packages/server/src/utils/process/execAsync.ts b/packages/server/src/utils/process/execAsync.ts index d44e3ccaf8..39de6c9f2c 100644 --- a/packages/server/src/utils/process/execAsync.ts +++ b/packages/server/src/utils/process/execAsync.ts @@ -1,43 +1,66 @@ import { exec, execFile } from "node:child_process"; -import util from "node:util"; import { findServerById } from "@dokploy/server/services/server"; import { Client } from "ssh2"; import { ExecError } from "./ExecError"; +import { + getCurrentJob, + jobMarker, + trackLocalChild, + trackSshClient, +} from "./job-context"; // Re-export ExecError for easier imports export { ExecError } from "./ExecError"; -const execAsyncBase = util.promisify(exec); +/** + * When we're inside a deployment job, prepend a marker (`: DOKPLOY_JOB_ID=…`) + * so the spawned shell's argv carries an identifier we can grep for, and + * spawn the child detached (own process group) so cancel can `kill -PGID` + * the entire build tree — shell, docker compose, docker build — in one shot. + */ +const tagCommand = (command: string): string => { + const ctx = getCurrentJob(); + if (!ctx) return command; + return `: ${jobMarker(ctx.jobId)};\n${command}`; +}; -export const execAsync = async ( +export const execAsync = ( command: string, options?: { cwd?: string; env?: NodeJS.ProcessEnv; shell?: string }, ): Promise<{ stdout: string; stderr: string }> => { - try { - const result = await execAsyncBase(command, options); - return { - stdout: result.stdout.toString(), - stderr: result.stderr.toString(), - }; - } catch (error) { - if (error instanceof Error) { - // @ts-ignore - exec error has these properties - const exitCode = error.code; - // @ts-ignore - const stdout = error.stdout?.toString() || ""; - // @ts-ignore - const stderr = error.stderr?.toString() || ""; - - throw new ExecError(`Command execution failed: ${error.message}`, { - command, - stdout, - stderr, - exitCode, - originalError: error, - }); - } - throw error; - } + const ctx = getCurrentJob(); + const tagged = tagCommand(command); + return new Promise((resolve, reject) => { + const child = exec( + tagged, + { + ...options, + // Own process group so cancel can `kill -PGID` the whole tree. + detached: !!ctx, + } as Parameters[1], + (error, stdout, stderr) => { + if (error) { + const codeRaw = (error as { code?: unknown }).code; + const exitCode = typeof codeRaw === "number" ? codeRaw : undefined; + reject( + new ExecError(`Command execution failed: ${error.message}`, { + command, + stdout: typeof stdout === "string" ? stdout : stdout.toString(), + stderr: typeof stderr === "string" ? stderr : stderr.toString(), + exitCode, + originalError: error, + }), + ); + return; + } + resolve({ + stdout: typeof stdout === "string" ? stdout : stdout.toString(), + stderr: typeof stderr === "string" ? stderr : stderr.toString(), + }); + }, + ); + if (ctx) trackLocalChild(ctx.jobId, child); + }); }; interface ExecOptions { @@ -61,8 +84,7 @@ export const execAsyncStream = ( command, stdout: stdoutComplete, stderr: stderrComplete, - // @ts-ignore - exitCode: error.code, + exitCode: (error as { code?: number }).code, originalError: error, }), ); @@ -88,7 +110,7 @@ export const execAsyncStream = ( }); childProcess.on("error", (error) => { - console.log(error); + console.error("execAsyncStream error", error); reject( new ExecError(`Command execution error: ${error.message}`, { command, @@ -150,13 +172,15 @@ export const execAsyncRemote = async ( let stdout = ""; let stderr = ""; + const tagged = tagCommand(command); + const ctx = getCurrentJob(); return new Promise((resolve, reject) => { const conn = new Client(); + if (ctx) trackSshClient(ctx.jobId, conn); - sleep(1000); conn .once("ready", () => { - conn.exec(command, (err, stream) => { + conn.exec(tagged, (err, stream) => { if (err) { onData?.(err.message); reject( @@ -188,6 +212,14 @@ export const execAsyncRemote = async ( ); } }) + .on("error", (err: Error) => { + reject( + new ExecError( + `Remote stream error: ${err.message}`, + { command, stdout, stderr, serverId, originalError: err }, + ), + ); + }) .on("data", (data: string) => { stdout += data.toString(); onData?.(data.toString()); diff --git a/packages/server/src/utils/process/job-context.ts b/packages/server/src/utils/process/job-context.ts new file mode 100644 index 0000000000..758f104f18 --- /dev/null +++ b/packages/server/src/utils/process/job-context.ts @@ -0,0 +1,138 @@ +import { AsyncLocalStorage } from "node:async_hooks"; +import type { ChildProcess } from "node:child_process"; +import type { Client } from "ssh2"; + +/** + * Per-deployment job context that flows through every async call + * (deployApplication → getBuildCommand → execAsync / execAsyncRemote) + * via Node's AsyncLocalStorage. + * + * NOTE on the `Symbol.for(...)` singleton dance below: the dokploy app + * bundles `@dokploy/server` as an *external* dependency, so this file + * gets loaded twice in production — once for the app's own code, once + * for the workspace package. Without pinning the instances on + * `globalThis`, the AsyncLocalStorage / child registries would be two + * separate copies, and context set in the worker handler would be + * invisible to `execAsync`. Cancel would then have nothing to kill. + */ +export interface JobContext { + jobId: string; + /** null = LOCAL (Dokploy host); otherwise the remote serverId. */ + serverId: string | null; +} + +const STORE_KEY = Symbol.for("dokploy.jobContext.store"); +const LOCAL_KEY = Symbol.for("dokploy.jobContext.localChildren"); +const REMOTE_KEY = Symbol.for("dokploy.jobContext.remoteSshClients"); + +type GlobalShared = typeof globalThis & { + [STORE_KEY]?: AsyncLocalStorage; + [LOCAL_KEY]?: Map>; + [REMOTE_KEY]?: Map>; +}; + +const g = globalThis as GlobalShared; + +export const dokployJobContext: AsyncLocalStorage = + g[STORE_KEY] ?? (g[STORE_KEY] = new AsyncLocalStorage()); + +const localChildren: Map> = g[LOCAL_KEY] ?? +(g[LOCAL_KEY] = new Map()); + +const remoteSshClients: Map> = g[REMOTE_KEY] ?? +(g[REMOTE_KEY] = new Map()); + +export const getCurrentJob = (): JobContext | undefined => + dokployJobContext.getStore(); + +/** Marker injected into the deploy command line for `pkill -f` matching. */ +export const jobMarker = (jobId: string): string => `DOKPLOY_JOB_ID=${jobId}`; + +export const trackLocalChild = (jobId: string, child: ChildProcess): void => { + let set = localChildren.get(jobId); + if (!set) { + set = new Set(); + localChildren.set(jobId, set); + } + set.add(child); + const cleanup = () => { + const s = localChildren.get(jobId); + s?.delete(child); + if (s && s.size === 0) localChildren.delete(jobId); + }; + child.once("exit", cleanup); + child.once("close", cleanup); +}; + +export const trackSshClient = (jobId: string, client: Client): void => { + let set = remoteSshClients.get(jobId); + if (!set) { + set = new Set(); + remoteSshClients.set(jobId, set); + } + set.add(client); + const cleanup = () => { + const s = remoteSshClients.get(jobId); + s?.delete(client); + if (s && s.size === 0) remoteSshClients.delete(jobId); + }; + client.once("end", cleanup); + client.once("close", cleanup); +}; + +/** + * Kill every process this job spawned. + * + * LOCAL: SIGKILL the process group (PGID = child.pid; the shell was + * spawned with `detached: true`), tearing down sh, `docker compose` + * and `docker build` in one shot. + * + * REMOTE: destroy the ssh client (force-close the socket); the + * stream error triggers promise rejection in execAsyncRemote, + * propagating cancellation through the handler. + * + * Limitation: BuildKit RUN-step containers are owned by dockerd and + * live outside our process group, so an in-flight RUN step runs to + * natural completion before the build aborts. Sub-second RUN-step + * cancel requires BuildKit's gRPC Cancel API with session-ID + * tracking, which is out of scope. + */ +export const killJobProcesses = ( + jobId: string, +): { local: number; remote: number } => { + let local = 0; + let remote = 0; + const localSet = localChildren.get(jobId); + const sshSet = remoteSshClients.get(jobId); + if (localSet) { + for (const child of localSet) { + if (!child.pid) continue; + try { + // Try the process group first (works when the child was + // spawned with detached:true so PGID = child.pid). If the + // group doesn't exist (ESRCH), fall back to killing the + // shell's PID — that closes its stdio, which docker + // compose / docker build CLIs treat as a cancel signal. + try { + process.kill(-child.pid, "SIGKILL"); + } catch { + child.kill("SIGKILL"); + } + local++; + } catch { + // Process already gone — nothing more to do. + } + } + localChildren.delete(jobId); + } + if (sshSet) { + for (const client of sshSet) { + try { + client.destroy(); + remote++; + } catch {} + } + remoteSshClients.delete(jobId); + } + return { local, remote }; +};