diff --git a/Makefile b/Makefile
index 24bbb3a23..c39c704c6 100644
--- a/Makefile
+++ b/Makefile
@@ -34,7 +34,7 @@ gateway:
 	docker push localhost:5001/beta9-gateway:$(tag)

 worker:
-	docker build . --target final --build-arg BASE_STAGE=dev -f ./docker/Dockerfile.worker -t localhost:5001/beta9-worker:$(workerTag)
+	docker build . --target final --build-arg BASE_STAGE=dev -f ./docker/Dockerfile.worker -t localhost:5001/beta9-worker:$(workerTag) --build-arg CEDANA_BASE_URL=$(CEDANA_URL) --build-arg CEDANA_TOKEN=$(CEDANA_AUTH_TOKEN)
 	docker push localhost:5001/beta9-worker:$(workerTag)
 	bin/delete_workers.sh
diff --git a/docker/Dockerfile.gateway b/docker/Dockerfile.gateway
index 515fe8389..43efc7df5 100644
--- a/docker/Dockerfile.gateway
+++ b/docker/Dockerfile.gateway
@@ -1,4 +1,4 @@
-FROM golang:1.22.10-bullseye AS base
+FROM golang:1.23.5-bullseye AS base

 RUN apt-get update && \
     apt-get install -y --no-install-recommends fuse3
diff --git a/docker/Dockerfile.worker b/docker/Dockerfile.worker
index e15c76f9d..907fa8700 100644
--- a/docker/Dockerfile.worker
+++ b/docker/Dockerfile.worker
@@ -1,7 +1,7 @@
 # syntax=docker/dockerfile:1.7-labs
 ARG BASE_STAGE=dev

-FROM golang:1.22.10-bookworm AS golang
+FROM golang:1.23.5-bookworm AS golang

 RUN apt-get update && apt-get install -y curl git

@@ -120,38 +120,21 @@ RUN mkdir -p /etc/containers/registries.conf.d
 RUN curl -L https://raw.githubusercontent.com/containers/shortnames/refs/heads/main/shortnames.conf \
     -o /etc/containers/registries.conf.d/shortnames.conf

-# XXX: Remove once cedana starts shipping with a compatible binary
+ARG CEDANA_VERSION=0.9.238-pre
+ARG CEDANA_TOKEN
+ARG CEDANA_BASE_URL
+RUN < 0 {
-		defer daemon.Process.Kill()
-		defer taskConn.Close()
-
-		if err != nil {
-			return nil, fmt.Errorf("cedana health check failed: %v", err)
-		}
-
-		if len(details.UnhealthyReasons) > 0 {
-			return nil, fmt.Errorf(
-				"cedana health failed with reasons: %v",
-				details.UnhealthyReasons,
-			)
+	resp, err := client.HealthCheck(ctx, &cedanadaemon.HealthCheckReq{Full: false}, grpc.WaitForReady(true))
+	if err != nil {
+		return nil, fmt.Errorf("cedana health check failed: %w", err)
+	}
+
+	errorsFound := false
+	for _, result := range resp.Results {
+		for _, component := range result.Components {
+			for _, errs := range component.Errors {
+				log.Error().Str("name", component.Name).Str("data", component.Data).Msgf("cedana health check error: %v", errs)
+				errorsFound = true
+			}
+			for _, warning := range component.Warnings {
+				log.Warn().Str("name", component.Name).Str("data", component.Data).Msgf("cedana health check warning: %v", warning)
+			}
 		}
 	}
+	if errorsFound {
+		return nil, fmt.Errorf("cedana health check failed")
+	}

-	return client, nil
-}
+	log.Info().Msg("cedana client initialized")

-func (c *CedanaClient) Close() {
-	c.conn.Close()
-	c.daemon.Process.Kill()
+	return &CedanaClient{client: client}, nil
 }

-// Updates the runc container spec to make the shared library available
-// as well as the shared memory that is used for communication
-func (c *CedanaClient) PrepareContainerSpec(spec *specs.Spec, containerId string, containerHostname string, gpuEnabled bool) error {
-	os.MkdirAll(checkpointSignalDir(containerId), os.ModePerm) // Add a mount point for the checkpoint signal file
-
-	spec.Mounts = append(spec.Mounts, specs.Mount{
-		Type:        "bind",
-		Source:      checkpointSignalDir(containerId),
-		Destination: "/cedana",
-		Options: []string{
-			"rbind",
-			"rprivate",
-			"nosuid",
-			"nodev",
+// Spawn a runc container using cedana, creating a 'job' in cedana
+func (c *CedanaClient) Run(ctx context.Context, containerId string, bundle string, gpuEnabled bool, runcOpts *runc.CreateOpts) (chan int, error) {
+	// If config path provided directly, derive bundle from it
+	if runcOpts.ConfigPath != "" {
+		bundle = strings.TrimRight(runcOpts.ConfigPath, filepath.Base(runcOpts.ConfigPath))
+	}
+
+	args := &cedanadaemon.RunReq{
+		Action:     cedanadaemon.RunAction_START_NEW,
+		JID:        containerId, // just use containerId for convenience
+		GPUEnabled: gpuEnabled,
+		Attachable: true,
+		Type:       "runc",
+		Details: &cedanadaemon.Details{
+			Runc: &cedanarunc.Runc{
+				ID:     containerId,
+				Bundle: bundle,
+				Root:   runcRoot,
+			},
 		},
-	})
-
-	containerIdPath := filepath.Join(checkpointSignalDir(containerId), checkpointContainerIdFileName)
-	err := os.WriteFile(containerIdPath, []byte(containerId), 0644)
-	if err != nil {
-		return err
 	}

-	containerHostnamePath := filepath.Join(checkpointSignalDir(containerId), checkpointContainerHostnameFileName)
-	err = os.WriteFile(containerHostnamePath, []byte(containerHostname), 0644)
+	resp, profilingData, err := c.client.Run(ctx, args)
 	if err != nil {
-		return err
-	}
-
-	if !gpuEnabled {
-		return nil // No need to do anything else if GPU is not enabled
-	}
-
-	// First check if shared library is on worker
-	if _, err := os.Stat(cedanaSharedLibPath); os.IsNotExist(err) {
-		return fmt.Errorf(
-			"%s not found on worker. Was the daemon started with GPU enabled?",
-			cedanaSharedLibPath,
-		)
+		return nil, fmt.Errorf("failed to run runc container: %w", err)
 	}

-	// Remove nvidia prestart hook as we don't need actual device mounts
-	spec.Hooks.Prestart = nil
-
-	// Add shared memory mount from worker instead, remove existing /dev/shm mount
-	for i, m := range spec.Mounts {
-		if m.Destination == "/dev/shm" {
-			spec.Mounts = append(spec.Mounts[:i], spec.Mounts[i+1:]...)
-			break
-		}
+	if runcOpts.Started != nil {
+		runcOpts.Started <- int(resp.PID)
 	}

-	// Add shared memory mount from worker
-	spec.Mounts = append(spec.Mounts, specs.Mount{
-		Destination: "/dev/shm",
-		Source:      "/dev/shm",
-		Type:        "bind",
-		Options: []string{
-			"rbind",
-			"rprivate",
-			"nosuid",
-			"nodev",
-			"rw",
-		},
-	})
-
-	// Add the shared library to the container
-	spec.Mounts = append(spec.Mounts, specs.Mount{
-		Destination: cedanaSharedLibPath,
-		Source:      cedanaSharedLibPath,
-		Type:        "bind",
-		Options: []string{
-			"rbind",
-			"rprivate",
-			"nosuid",
-			"nodev",
-			"rw",
-		},
-	})
+	_ = profilingData

-	// XXX: Remove /usr/lib/worker/x86_64-linux-gnu from mounts
-	for i, m := range spec.Mounts {
-		if m.Destination == "/usr/lib/worker/x86_64-linux-gnu" {
-			spec.Mounts = append(spec.Mounts[:i], spec.Mounts[i+1:]...)
-			break
-		}
+	_, stdout, stderr, exitCode, _, err := c.client.AttachIO(ctx, &cedanadaemon.AttachReq{PID: resp.PID})
+	if err != nil {
+		return nil, fmt.Errorf("failed to attach to runc container: %w", err)
 	}

-	spec.Process.Env = append(spec.Process.Env, "CEDANA_JID="+containerId, "LD_PRELOAD="+cedanaSharedLibPath)
-	return nil
-}
-
-// Start managing a runc container
-func (c *CedanaClient) Manage(ctx context.Context, containerId string, gpuEnabled bool) error {
-	ctx, cancel := context.WithTimeout(ctx, defaultManageDeadline)
-	defer cancel()
+	go io.Copy(runcOpts.OutputWriter, stdout)
+	go io.Copy(runcOpts.OutputWriter, stderr)

-	args := &cedanaproto.RuncManageArgs{
-		ContainerID: containerId,
-		GPU:         gpuEnabled,
-		Root:        runcRoot,
-	}
-	_, err := c.service.RuncManage(ctx, args)
-	if err != nil {
-		return err
-	}
-	return nil
+	return exitCode, nil
 }

-// Checkpoint a runc container, returns the path to the checkpoint
 func (c *CedanaClient) Checkpoint(ctx context.Context, containerId string) (string, error) {
-	ctx, cancel := context.WithTimeout(ctx, defaultCheckpointDeadline)
-	defer cancel()
-
-	args := cedanaproto.JobDumpArgs{
-		JID: containerId,
-		CriuOpts: &cedanaproto.CriuOpts{
-			TcpClose:        true,
-			TcpEstablished:  true,
-			LeaveRunning:    true,
-			TcpSkipInFlight: true,
+	args := &cedanadaemon.DumpReq{
+		Name: containerId,
+		Type: "job",
+		Criu: &criu.CriuOpts{
+			TcpSkipInFlight: proto.Bool(true),
+			TcpEstablished:  proto.Bool(true),
+			LeaveRunning:    proto.Bool(true),
+			LinkRemap:       proto.Bool(true),
 		},
-		Dir: fmt.Sprintf("%s/%s", checkpointPathBase, containerId),
+		Details: &cedanadaemon.Details{JID: &containerId},
 	}
-	res, err := c.service.JobDump(ctx, &args)
+
+	resp, profilingData, err := c.client.Dump(ctx, args)
 	if err != nil {
-		return "", err
+		return "", fmt.Errorf("failed to dump runc container: %w", err)
 	}
+	_ = profilingData

-	log.Info().Str("container_id", containerId).Interface("dump_stats", res.GetDumpStats()).Msg("dump stats")
-	return res.GetState().GetCheckpointPath(), nil
+	return resp.Path, nil
 }

 type cedanaRestoreOpts struct {
@@ -267,17 +167,7 @@ type cedanaRestoreOpts struct {
 	cacheFunc      func(string, string) (string, error)
 }

-// Restore a runc container. If a checkpoint path is provided, it will be used as the checkpoint.
-// If empty path is provided, the latest checkpoint path from DB will be used.
-func (c *CedanaClient) Restore(
-	ctx context.Context,
-	restoreOpts cedanaRestoreOpts,
-	runcOpts *runc.CreateOpts,
-) (*cedanaproto.ProcessState, error) {
-	ctx, cancel := context.WithTimeout(ctx, defaultCheckpointDeadline)
-	defer cancel()
-
-	// NOTE: Cedana uses bundle path to find the config.json
+func (c *CedanaClient) Restore(ctx context.Context, restoreOpts cedanaRestoreOpts, runcOpts *runc.CreateOpts) (chan int, error) {
 	bundle := strings.TrimRight(runcOpts.ConfigPath, filepath.Base(runcOpts.ConfigPath))

 	// If a cache function is provided, attempt to cache the checkpoint nearby
@@ -291,46 +181,42 @@ func (c *CedanaClient) Restore(
 		}
 	}

-	args := &cedanaproto.JobRestoreArgs{
-		JID: restoreOpts.jobId,
-		RuncOpts: &cedanaproto.RuncOpts{
-			Root:          runcRoot,
-			Bundle:        bundle,
-			Detach:        true,
-			ConsoleSocket: runcOpts.ConsoleSocket.Path(),
-			ContainerID:   restoreOpts.containerId,
+	args := &cedanadaemon.RestoreReq{
+		Path:       restoreOpts.checkpointPath,
+		Type:       "job",
+		Attachable: true,
+		Criu: &criu.CriuOpts{
+			TcpClose:       proto.Bool(true),
+			TcpEstablished: proto.Bool(true),
+		},
+		Details: &cedanadaemon.Details{
+			JID: &restoreOpts.jobId,
+			Runc: &cedanarunc.Runc{
+				ID:     restoreOpts.containerId,
+				Bundle: bundle,
+				Root:   runcRoot,
+			},
 		},
-		CriuOpts:       &cedanaproto.CriuOpts{TcpClose: true, TcpEstablished: true},
-		CheckpointPath: restoreOpts.checkpointPath,
 	}
-	res, err := c.service.JobRestore(ctx, args)
+
+	resp, profilingData, err := c.client.Restore(ctx, args)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("failed to restore runc container: %w", err)
 	}

-	log.Info().Str("container_id", restoreOpts.containerId).Interface("restore_stats", res.GetRestoreStats()).Msg("restore stats")
-
 	if runcOpts.Started != nil {
-		runcOpts.Started <- int(res.GetState().GetPID())
+		runcOpts.Started <- int(resp.PID)
 	}

-	return res.State, nil
-}
-
-// Perform a detailed health check of cedana C/R capabilities
-func (c *CedanaClient) DetailedHealthCheckWait(
-	ctx context.Context,
-) (*cedanaproto.DetailedHealthCheckResponse, error) {
-	ctx, cancel := context.WithTimeout(ctx, defaultHealthCheckDeadline)
-	defer cancel()
+	_ = profilingData

-	opts := []grpc.CallOption{}
-	opts = append(opts, grpc.WaitForReady(true))
-
-	res, err := c.service.DetailedHealthCheck(ctx, &cedanaproto.DetailedHealthCheckRequest{}, opts...)
+	_, stdout, stderr, exitCode, _, err := c.client.AttachIO(ctx, &cedanadaemon.AttachReq{PID: resp.PID})
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("failed to attach to runc container: %w", err)
 	}

-	return res, nil
+	go io.Copy(runcOpts.OutputWriter, stdout)
+	go io.Copy(runcOpts.OutputWriter, stderr)
+
+	return exitCode, nil
 }
diff --git a/pkg/worker/cr.go b/pkg/worker/cr.go
index 24722cc47..224c57a19 100644
--- a/pkg/worker/cr.go
+++ b/pkg/worker/cr.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	_ "embed"
 	"fmt"
+	"io"
 	"log/slog"
 	"os"
 	"path/filepath"
@@ -15,7 +16,9 @@ import (
 	"github.com/rs/zerolog/log"
 )

-func (s *Worker) attemptCheckpointOrRestore(ctx context.Context, request *types.ContainerRequest, consoleWriter *ConsoleWriter, startedChan chan int, configPath string) (bool, string, error) {
+const defaultCheckpointDeadline = 10 * time.Minute
+
+func (s *Worker) attemptCheckpointOrRestore(ctx context.Context, request *types.ContainerRequest, outputWriter io.Writer, startedChan chan int, configPath string) (chan int, string, error) {
 	state, createCheckpoint := s.shouldCreateCheckpoint(request)

 	// If checkpointing is enabled, attempt to create a checkpoint
@@ -31,14 +34,14 @@ func (s *Worker) attemptCheckpointOrRestore(ctx context.Context, request *types.
 		os.Create(filepath.Join(checkpointSignalDir(request.ContainerId), checkpointCompleteFileName))

-		_, err := s.cedanaClient.Restore(ctx, cedanaRestoreOpts{
+		exitCodeChan, err := s.cedanaClient.Restore(ctx, cedanaRestoreOpts{
 			checkpointPath: checkpointPath,
 			jobId:          state.ContainerId,
 			containerId:    request.ContainerId,
 			cacheFunc:      s.cacheCheckpoint,
 		}, &runc.CreateOpts{
 			Detach:        true,
-			ConsoleSocket: consoleWriter,
+			OutputWriter:  outputWriter,
 			ConfigPath:    configPath,
 			Started:       startedChan,
 		})
@@ -52,14 +55,14 @@ func (s *Worker) attemptCheckpointOrRestore(ctx context.Context, request *types.
 				StubId:      request.StubId,
 			})

-			return false, "", err
+			return nil, "", err
 		} else {
 			log.Info().Str("container_id", request.ContainerId).Msg("checkpoint found and restored")
-			return true, state.ContainerId, nil
+			return exitCodeChan, request.ContainerId, nil
 		}
 	}

-	return false, "", nil
+	return nil, "", nil
 }

 // Waits for the container to be ready to checkpoint at the desired point in execution, ie.
@@ -68,8 +71,6 @@ func (s *Worker) createCheckpoint(ctx context.Context, request *types.ContainerR
 	os.MkdirAll(filepath.Join(s.config.Worker.CRIU.Storage.MountPath, request.Workspace.Name), os.ModePerm)

 	timeout := defaultCheckpointDeadline
-	managing := false
-	gpuEnabled := request.RequiresGPU()

 	ctx, cancel := context.WithTimeout(ctx, timeout)
 	defer cancel()
@@ -89,18 +90,6 @@ waitForReady:
 			continue
 		}

-		// Start managing the container with Cedana
-		if !managing {
-			err := s.cedanaClient.Manage(ctx, instance.Id, gpuEnabled)
-			if err == nil {
-				managing = true
-			} else {
-				log.Error().Str("container_id", instance.Id).Msgf("cedana manage failed, container may not be started yet: %+v", err)
-			}
-
-			continue
-		}
-
 		// Check if the container is ready for checkpoint by verifying the existence of a signal file
 		readyFilePath := filepath.Join(checkpointSignalDir(instance.Id), checkpointSignalFileName)
 		if _, err := os.Stat(readyFilePath); err == nil {
diff --git a/pkg/worker/lifecycle.go b/pkg/worker/lifecycle.go
index 4cd325cd3..dfce3a8e4 100644
--- a/pkg/worker/lifecycle.go
+++ b/pkg/worker/lifecycle.go
@@ -209,7 +209,7 @@ func (s *Worker) RunContainer(ctx context.Context, request *types.ContainerReque
 	err = s.containerMountManager.SetupContainerMounts(request)
 	if err != nil {
-		s.containerLogger.Log(request.ContainerId, request.StubId, fmt.Sprintf("failed to setup container mounts: %v", err))
+		s.containerLogger.Log(request.ContainerId, request.StubId, "failed to setup container mounts: %v", err)
 	}

 	// Generate dynamic runc spec for this container
@@ -297,7 +297,7 @@ func (s *Worker) specFromRequest(request *types.ContainerRequest, options *Conta
 	spec.Process.Cwd = defaultContainerDirectory
 	spec.Process.Args = request.EntryPoint
-	spec.Process.Terminal = true // NOTE: This is since we are using a console writer for logging
+	spec.Process.Terminal = false

 	if s.config.Worker.RunCResourcesEnforced {
 		spec.Linux.Resources.CPU = getLinuxCPU(request)
@@ -328,10 +328,46 @@ func (s *Worker) specFromRequest(request *types.ContainerRequest, options *Conta
 		spec.Hooks.Prestart = nil
 	}

-	// We need to modify the spec to support Cedana C/R if enabled
+	// We need to include the checkpoint signal files in the container spec
 	if s.IsCRIUAvailable() && request.CheckpointEnabled {
+		err = os.MkdirAll(checkpointSignalDir(request.ContainerId), os.ModePerm) // Add a mount point for the checkpoint signal file
+		if err != nil {
+			return nil, err
+		}
+
+		spec.Mounts = append(spec.Mounts, specs.Mount{
+			Type:        "bind",
+			Source:      checkpointSignalDir(request.ContainerId),
+			Destination: "/cedana",
+			Options: []string{
+				"rbind",
+				"rprivate",
+				"nosuid",
+				"nodev",
+			},
+		})
+
+		// XXX: Remove /usr/lib/worker/x86_64-linux-gnu from mounts
+		// as CRIU is unable to find its root mount
+		for i, m := range spec.Mounts {
+			if m.Destination == "/usr/lib/worker/x86_64-linux-gnu" {
+				spec.Mounts = append(spec.Mounts[:i], spec.Mounts[i+1:]...)
+				break
+			}
+		}
+
+		containerIdPath := filepath.Join(checkpointSignalDir(request.ContainerId), checkpointContainerIdFileName)
+		err := os.WriteFile(containerIdPath, []byte(request.ContainerId), 0644)
+		if err != nil {
+			return nil, err
+		}
+
 		containerHostname := fmt.Sprintf("%s:%d", s.podAddr, options.BindPort)
-		s.cedanaClient.PrepareContainerSpec(spec, request.ContainerId, containerHostname, request.RequiresGPU())
+		containerHostnamePath := filepath.Join(checkpointSignalDir(request.ContainerId), checkpointContainerHostnameFileName)
+		err = os.WriteFile(containerHostnamePath, []byte(containerHostname), 0644)
+		if err != nil {
+			return nil, err
+		}
 	}

 	spec.Process.Env = append(spec.Process.Env, env...)
@@ -538,11 +574,7 @@ func (s *Worker) spawn(request *types.ContainerRequest, spec *specs.Spec, output
 		return
 	}

-	consoleWriter, err := NewConsoleWriter(containerInstance.OutputWriter)
-	if err != nil {
-		log.Error().Str("container_id", containerId).Msgf("failed to create console writer: %v", err)
-		return
-	}
+	outputWriter := containerInstance.OutputWriter

 	// Log metrics
 	go s.workerMetrics.EmitContainerUsage(ctx, request)
@@ -550,46 +582,44 @@ func (s *Worker) spawn(request *types.ContainerRequest, spec *specs.Spec, output
 	defer func() { go s.eventRepo.PushContainerStoppedEvent(containerId, s.workerId, request) }()

 	startedChan := make(chan int, 1)
+	var exitCodeChan chan int

 	// Handle checkpoint creation & restore if applicable
 	if s.IsCRIUAvailable() && request.CheckpointEnabled {
-		restored, restoredContainerId, err := s.attemptCheckpointOrRestore(ctx, request, consoleWriter, startedChan, configPath)
+		restoredExitCodeChan, restoredContainerId, err := s.attemptCheckpointOrRestore(ctx, request, outputWriter, startedChan, configPath)
 		if err != nil {
 			log.Error().Str("container_id", containerId).Msgf("C/R failed: %v", err)
 		}

-		if restored {
-			// HOTFIX: If we restored from a checkpoint, we need to use the container ID of the restored container
-			// instead of the original container ID
-			containerInstance, exists := s.containerInstances.Get(request.ContainerId)
-			if exists {
-				containerInstance.Id = restoredContainerId
-				s.containerInstances.Set(containerId, containerInstance)
-				containerId = restoredContainerId
-			}
-
-			exitCode = s.waitForRestoredContainer(ctx, containerId, startedChan, outputLogger, request, spec)
-			return
+		if restoredExitCodeChan != nil {
+			containerId = restoredContainerId
+			exitCodeChan = restoredExitCodeChan
+		} else {
+			exitCodeChan, err = s.cedanaClient.Run(ctx, containerId, opts.BundlePath, request.RequiresGPU(), &runc.CreateOpts{
+				Detach:       true,
+				OutputWriter: outputWriter,
+				ConfigPath:   configPath,
+				Started:      startedChan,
+			})
 		}
+	} else {
+		_, err = s.runcHandle.Run(s.ctx, containerId, opts.BundlePath, &runc.CreateOpts{
+			Detach:       true,
+			OutputWriter: outputWriter,
+			ConfigPath:   configPath,
+			Started:      startedChan,
+		})
 	}
-
-	// Invoke runc process (launch the container)
-	_, err = s.runcHandle.Run(s.ctx, containerId, opts.BundlePath, &runc.CreateOpts{
-		Detach:        true,
-		ConsoleSocket: consoleWriter,
-		ConfigPath:    configPath,
-		Started:       startedChan,
-	})
 	if err != nil {
 		log.Error().Str("container_id", containerId).Msgf("failed to run container: %v", err)
 		return
 	}

-	exitCode = s.wait(ctx, containerId, startedChan, outputLogger, request, spec)
+	exitCode = s.wait(ctx, containerId, startedChan, exitCodeChan, outputLogger, request, spec)
 }

 // Wait for a container to exit and return the exit code
-func (s *Worker) wait(ctx context.Context, containerId string, startedChan chan int, outputLogger *slog.Logger, request *types.ContainerRequest, spec *specs.Spec) int {
+func (s *Worker) wait(ctx context.Context, containerId string, startedChan chan int, exitCodeChan chan int, outputLogger *slog.Logger, request *types.ContainerRequest, spec *specs.Spec) int {
 	<-startedChan

 	// Clean up runc container state and send final output message
@@ -623,12 +653,18 @@ func (s *Worker) wait(ctx context.Context, containerId string, startedChan chan
 	}

 	// Wait for the container to exit
-	processState, err := process.Wait()
-	if err != nil {
-		return cleanup(-1, err)
+	var exitCode int
+	if exitCodeChan != nil {
+		exitCode = <-exitCodeChan
+	} else {
+		processState, err := process.Wait()
+		if err != nil {
+			return cleanup(-1, err)
+		}
+		exitCode = processState.ExitCode()
 	}

-	return cleanup(processState.ExitCode(), nil)
+	return cleanup(exitCode, nil)
 }

 func (s *Worker) createOverlay(request *types.ContainerRequest, bundlePath string) *common.ContainerOverlay {
diff --git a/pkg/worker/runc_console.go b/pkg/worker/runc_console.go
index 91ffbaa19..eb7e64c51 100644
--- a/pkg/worker/runc_console.go
+++ b/pkg/worker/runc_console.go
@@ -51,7 +51,7 @@ func NewConsoleWriter(writer io.Writer) (*ConsoleWriter, error) {
 		defer socket.Close()

 		// Get the master file descriptor from runC.
-		master, err := utils.RecvFd(socket)
+		master, err := utils.RecvFile(socket)
 		if err != nil {
 			return
 		}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 6c0f709c7..3fad83f3e 100644
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -161,7 +161,7 @@ func NewWorker() (*Worker, error) {
 	var cedanaClient *CedanaClient = nil
 	var checkpointStorage storage.Storage = nil
 	if pool, ok := config.Worker.Pools[workerPoolName]; ok && pool.CRIUEnabled {
-		cedanaClient, err = NewCedanaClient(context.Background(), config.Worker.CRIU.Cedana, gpuType != "")
+		cedanaClient, err = InitializeCedana(context.Background(), config.Worker.CRIU.Cedana)
 		if err != nil {
 			log.Warn().Str("worker_id", workerId).Msgf("C/R unavailable, failed to create cedana client: %v", err)
 		}