Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 56 additions & 26 deletions packages/opencode/src/worktree/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { errorMessage } from "../util/error"
import { BusEvent } from "@/bus/bus-event"
import { GlobalBus } from "@/bus/global"
import { Git } from "@/git"
import { Effect, Layer, Path, Scope, Context, Stream, Semaphore } from "effect"
import { Deferred, Effect, Layer, Path, Scope, Context, Stream, Semaphore } from "effect"
import { ensureWorktreesIgnored, restoreWorktreesIgnored } from "./gitignore-guard"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import { NodePath } from "@effect/platform-node"
Expand Down Expand Up @@ -446,10 +446,7 @@ export namespace Worktree {
},
})

yield* runStartScripts(info.directory, { projectID, extra }).pipe(
Effect.catchCause((cause) => Effect.sync(() => log.error("worktree start task failed", { cause }))),
Effect.forkIn(scope),
)
yield* launchStartScripts(info.directory, { projectID, extra })
})

const createFromInfo = Effect.fn("Worktree.createFromInfo")(function* (info: Info, startCommand?: string) {
Expand Down Expand Up @@ -608,28 +605,42 @@ export namespace Worktree {
return result
})

const signalStartLaunch = (launch: Deferred.Deferred<void> | undefined) => {
if (!launch) return Effect.void
return Deferred.succeed(launch, undefined).pipe(Effect.ignore)
}

const runStartCommand = Effect.fnUntraced(
function* (directory: string, cmd: string) {
const [shell, args] = process.platform === "win32" ? ["cmd", ["/c", cmd]] : ["bash", ["-lc", cmd]]
const handle = yield* spawner.spawn(
ChildProcess.make(shell, args, { cwd: directory, extendEnv: true, stdin: "ignore" }),
function* (directory: string, cmd: string, launch?: Deferred.Deferred<void>) {
return yield* Effect.gen(function* () {
const [shell, args] = process.platform === "win32" ? ["cmd", ["/c", cmd]] : ["bash", ["-lc", cmd]]
const handle = yield* spawner.spawn(
ChildProcess.make(shell, args, { cwd: directory, extendEnv: true, stdin: "ignore" }),
)
yield* signalStartLaunch(launch)
// Drain stdout, capture stderr for error reporting
const [, stderr] = yield* Effect.all(
[Stream.runDrain(handle.stdout), Stream.mkString(Stream.decodeText(handle.stderr))],
{ concurrency: 2 },
).pipe(Effect.orDie)
const code = yield* handle.exitCode
return { code, stderr }
}).pipe(
Effect.scoped,
Effect.catch(() => signalStartLaunch(launch).pipe(Effect.as({ code: 1, stderr: "" }))),
)
// Drain stdout, capture stderr for error reporting
const [, stderr] = yield* Effect.all(
[Stream.runDrain(handle.stdout), Stream.mkString(Stream.decodeText(handle.stderr))],
{ concurrency: 2 },
).pipe(Effect.orDie)
const code = yield* handle.exitCode
return { code, stderr }
},
Effect.scoped,
Effect.catch(() => Effect.succeed({ code: 1, stderr: "" })),
)

const runStartScript = Effect.fnUntraced(function* (directory: string, cmd: string, kind: string) {
const runStartScript = Effect.fnUntraced(function* (
directory: string,
cmd: string,
kind: string,
launch?: Deferred.Deferred<void>,
) {
const text = cmd.trim()
if (!text) return true
const result = yield* runStartCommand(directory, text)
const result = yield* runStartCommand(directory, text, launch)
if (result.code === 0) return true
log.error("worktree start command failed", { kind, directory, message: result.stderr })
return false
Expand All @@ -638,18 +649,40 @@ export namespace Worktree {
const runStartScripts = Effect.fnUntraced(function* (
directory: string,
input: { projectID: ProjectID; extra?: string },
launch?: Deferred.Deferred<void>,
) {
const row = yield* Effect.sync(() =>
Database.use((db) => db.select().from(ProjectTable).where(eq(ProjectTable.id, input.projectID)).get()),
)
const project = row ? Project.fromRow(row) : undefined
const startup = project?.commands?.start?.trim() ?? ""
const ok = yield* runStartScript(directory, startup, "project")
const extra = input.extra?.trim() ?? ""
if (!startup && !extra) {
yield* signalStartLaunch(launch)
return true
}
const ok = yield* runStartScript(directory, startup, "project", startup ? launch : undefined)
if (!ok) return false
yield* runStartScript(directory, input.extra ?? "", "worktree")
yield* runStartScript(directory, extra, "worktree", startup ? undefined : launch)
return true
})

const launchStartScripts = Effect.fnUntraced(function* (
directory: string,
input: { projectID: ProjectID; extra?: string },
) {
const launch = yield* Deferred.make<void>()
yield* runStartScripts(directory, input, launch).pipe(
Effect.catchCause((cause) =>
signalStartLaunch(launch).pipe(
Effect.andThen(Effect.sync(() => log.error("worktree start task failed", { cause }))),
),
),
Effect.forkIn(scope),
)
return yield* Deferred.await(launch)
})

const prune = Effect.fnUntraced(function* (root: string, entries: string[]) {
const base = yield* canonical(root)
yield* Effect.forEach(
Expand Down Expand Up @@ -766,10 +799,7 @@ export namespace Worktree {
yield* upsertRegistry(Info.parse({ ...registered, branch }))
}

yield* runStartScripts(worktreePath, { projectID: Instance.project.id }).pipe(
Effect.catchCause((cause) => Effect.sync(() => log.error("worktree start task failed", { cause }))),
Effect.forkIn(scope),
)
yield* launchStartScripts(worktreePath, { projectID: Instance.project.id })

return true
})
Expand Down
88 changes: 75 additions & 13 deletions packages/opencode/test/project/worktree.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,81 @@ import { $ } from "bun"
import { afterEach, describe, expect, test } from "bun:test"

const wintest = process.platform !== "win32" ? test : test.skip
import { AppFileSystem } from "@opencode-ai/core/filesystem"
import * as CrossSpawnSpawner from "@opencode-ai/core/cross-spawn-spawner"
import { NodePath } from "@effect/platform-node"
import { Effect, Layer, Stream } from "effect"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import fs from "fs/promises"
import path from "path"
import { Git } from "../../src/git"
import { Instance } from "../../src/project/instance"
import { Project } from "../../src/project/project"
import { ProjectTable } from "../../src/project/project.sql"
import { Database, eq } from "../../src/storage/db"
import { Worktree } from "../../src/worktree"
import { tmpdir } from "../fixture/fixture"

const encoder = new TextEncoder()
type StartCommandProbe = { spawned: boolean; released: boolean; release?: () => void }

function withInstance(directory: string, fn: () => Promise<any>) {
return Instance.provide({ directory, fn })
}

function startCommandSpawnProbe(probe: StartCommandProbe) {
return Layer.effect(
ChildProcessSpawner.ChildProcessSpawner,
Effect.gen(function* () {
const real = yield* ChildProcessSpawner.ChildProcessSpawner
return ChildProcessSpawner.make(
Effect.fnUntraced(function* (command) {
const std = ChildProcess.isStandardCommand(command) ? command : undefined
const text = std ? [std.command, ...std.args].join(" ") : ""
if (text.includes("start-spawn-probe")) {
probe.spawned = true
let released = false
let finish: (code: ChildProcessSpawner.ExitCode) => void = () => undefined
const exit = new Promise<ChildProcessSpawner.ExitCode>((resolve) => {
finish = resolve
})
probe.release = () => {
if (released) return
released = true
probe.released = true
finish(ChildProcessSpawner.ExitCode(0))
}
return ChildProcessSpawner.makeHandle({
pid: ChildProcessSpawner.ProcessId(0),
exitCode: Effect.promise(() => exit),
isRunning: Effect.sync(() => !released),
kill: () => Effect.sync(() => probe.release?.()),
stdin: { [Symbol.for("effect/Sink/TypeId")]: Symbol.for("effect/Sink/TypeId") } as any,
stdout: Stream.empty,
stderr: Stream.make(encoder.encode("")),
all: Stream.empty,
getInputFd: () => ({ [Symbol.for("effect/Sink/TypeId")]: Symbol.for("effect/Sink/TypeId") }) as any,
getOutputFd: () => Stream.empty,
unref: Effect.succeed(Effect.void),
})
}
return yield* real.spawn(command)
}),
)
}),
).pipe(Layer.provide(CrossSpawnSpawner.defaultLayer))
}

function worktreeLayerWithStartCommandProbe(probe: StartCommandProbe) {
return Worktree.layer.pipe(
Layer.provide(Git.defaultLayer),
Layer.provide(Project.defaultLayer),
Layer.provide(startCommandSpawnProbe(probe)),
Layer.provide(AppFileSystem.defaultLayer),
Layer.provide(NodePath.layer),
)
}

function normalize(input: string) {
return input.replace(/\\/g, "/").toLowerCase()
}
Expand Down Expand Up @@ -230,36 +292,36 @@ describe("Worktree", () => {
})

describe("reset", () => {
test("starts project start command without waiting for it to exit", async () => {
test("starts project start command before returning without waiting for it to exit", async () => {
await using tmp = await tmpdir({ git: true })
const probe: StartCommandProbe = { spawned: false, released: false }

await withInstance(tmp.path, async () => {
const info = await Worktree.createReady({ name: "reset-start-command" })
const info = await Worktree.createReady({ name: "reset-start-spawn-probe" })
await Project.update({
projectID: Instance.project.id,
commands: {
start:
"bun -e \"await Bun.write('.reset-start-began', 'ready'); while (!(await Bun.file('.reset-start-release').exists())) await Bun.sleep(20)\"",
start: "start-spawn-probe",
},
})

const reset = Worktree.reset({ directory: info.directory })
const layer = worktreeLayerWithStartCommandProbe(probe)
const reset = Effect.runPromise(
Worktree.Service.use((svc) => svc.reset({ directory: info.directory })).pipe(Effect.provide(layer)),
)
const result = await Promise.race([
reset.then(() => "done" as const),
Bun.sleep(2_000).then(() => "timeout" as const),
Bun.sleep(10_000).then(() => "timeout" as const),
])
if (result === "timeout") {
await Bun.write(path.join(info.directory, ".reset-start-release"), "done")
probe.release?.()
await reset.catch(() => undefined)
throw new Error("Worktree.reset waited for the project start command to exit")
}

const deadline = Date.now() + 1_000
while (!(await Bun.file(path.join(info.directory, ".reset-start-began")).exists()) && Date.now() < deadline) {
await Bun.sleep(20)
}
expect(await Bun.file(path.join(info.directory, ".reset-start-began")).text()).toBe("ready")
await Bun.write(path.join(info.directory, ".reset-start-release"), "done")
expect(probe.spawned).toBe(true)
expect(probe.released).toBe(false)
probe.release?.()
await Worktree.remove({ directory: info.directory })
})
})
Expand Down
20 changes: 7 additions & 13 deletions packages/opencode/test/server/automation-runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import { Flock } from "../../src/util/flock"
import { Worktree } from "../../src/worktree"
import { tmpdir } from "../fixture/fixture"

const RUN_WAIT_TIMEOUT_MS = 10_000

afterEach(async () => {
await Instance.disposeAll()
})
Expand Down Expand Up @@ -46,7 +48,7 @@ function input(projectID: ProjectID, overrides: Partial<Extract<Automation.Creat
}

async function waitForRun(automationID: string, state: Automation.Run["state"]) {
const deadline = Date.now() + 2_000
const deadline = Date.now() + RUN_WAIT_TIMEOUT_MS
while (Date.now() < deadline) {
const run = Automation.runs({ automationID }).items.find((item) => item.state === state)
if (run?.state === state) return run
Expand All @@ -56,7 +58,7 @@ async function waitForRun(automationID: string, state: Automation.Run["state"])
}

async function waitForRunCount(automationID: string, count: number) {
const deadline = Date.now() + 2_000
const deadline = Date.now() + RUN_WAIT_TIMEOUT_MS
while (Date.now() < deadline) {
const items = Automation.runs({ automationID, limit: 100 }).items
if (items.length >= count) return items
Expand All @@ -66,7 +68,7 @@ async function waitForRunCount(automationID: string, count: number) {
}

async function waitForSucceededRunCount(automationID: string, count: number) {
const deadline = Date.now() + 2_000
const deadline = Date.now() + RUN_WAIT_TIMEOUT_MS
while (Date.now() < deadline) {
const items = Automation.runs({ automationID, limit: 100 }).items
const succeeded = items.filter((run) => run.state === "succeeded")
Expand All @@ -77,7 +79,7 @@ async function waitForSucceededRunCount(automationID: string, count: number) {
}

async function waitForTerminalRun(automationID: string) {
const deadline = Date.now() + 2_000
const deadline = Date.now() + RUN_WAIT_TIMEOUT_MS
while (Date.now() < deadline) {
const run = Automation.runs({ automationID }).items.find((item) =>
item.state === "succeeded" || item.state === "failed" || item.state === "stopped"
Expand Down Expand Up @@ -671,7 +673,7 @@ describe("automation runNow execution", () => {
await Automation.runNowExecuting(definition.id, { executor: sessionPromptExecutor })
const result = await Promise.race([
waitForRun(definition.id, "succeeded").then((run) => ({ state: "succeeded" as const, run })),
Bun.sleep(2_000).then(() => ({ state: "timeout" as const })),
Bun.sleep(RUN_WAIT_TIMEOUT_MS).then(() => ({ state: "timeout" as const })),
])
if (result.state === "timeout") {
const worktree = await Worktree.lookupBySlug("long-start")
Expand All @@ -684,14 +686,6 @@ describe("automation runNow execution", () => {
expect(providerCalls).toBe(1)
const worktree = await Worktree.lookupBySlug("long-start")
if (!worktree) throw new Error("expected worktree placement")
const deadline = Date.now() + 1_000
while (
!(await Bun.file(path.join(worktree.directory, ".automation-start-began")).exists()) &&
Date.now() < deadline
) {
await Bun.sleep(20)
}
expect(await Bun.file(path.join(worktree.directory, ".automation-start-began")).text()).toBe("ready")
await Bun.write(path.join(worktree.directory, ".automation-start-release"), "done")
},
})
Expand Down
Loading