diff --git a/cli/injector/main.go b/cli/injector/main.go
index 69d91f603b..b2bed3daf1 100644
--- a/cli/injector/main.go
+++ b/cli/injector/main.go
@@ -602,7 +602,7 @@ func injectAndWait(cmd *cobra.Command, args []string) {
 		break
 	// those disruptions should not watch target to re-inject on container restart
 	case v1beta1.DisruptionIsNotReinjectable((chaostypes.DisruptionKindName)(cmd.Name())):
-	case disruptionArgs.Level == chaostypes.DisruptionLevelNode:
+	case parentPID == 0 && disruptionArgs.Level == chaostypes.DisruptionLevelNode:
 		if disruptionArgs.PulseActiveDuration > 0 && disruptionArgs.PulseDormantDuration > 0 {
 			var (
 				err error
diff --git a/command/background_cmd_mock.go b/command/background_cmd_mock.go
index a1ebe625cd..e081b52d2e 100644
--- a/command/background_cmd_mock.go
+++ b/command/background_cmd_mock.go
@@ -22,6 +22,53 @@ func (_m *BackgroundCmdMock) EXPECT() *BackgroundCmdMock_Expecter {
 	return &BackgroundCmdMock_Expecter{mock: &_m.Mock}
 }
 
+// Done provides a mock function with no fields
+func (_m *BackgroundCmdMock) Done() <-chan struct{} {
+	ret := _m.Called()
+
+	if len(ret) == 0 {
+		panic("no return value specified for Done")
+	}
+
+	var r0 <-chan struct{}
+	if rf, ok := ret.Get(0).(func() <-chan struct{}); ok {
+		r0 = rf()
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(<-chan struct{})
+		}
+	}
+
+	return r0
+}
+
+// BackgroundCmdMock_Done_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Done'
+type BackgroundCmdMock_Done_Call struct {
+	*mock.Call
+}
+
+// Done is a helper method to define mock.On call
+func (_e *BackgroundCmdMock_Expecter) Done() *BackgroundCmdMock_Done_Call {
+	return &BackgroundCmdMock_Done_Call{Call: _e.mock.On("Done")}
+}
+
+func (_c *BackgroundCmdMock_Done_Call) Run(run func()) *BackgroundCmdMock_Done_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *BackgroundCmdMock_Done_Call) Return(_a0 <-chan struct{}) *BackgroundCmdMock_Done_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *BackgroundCmdMock_Done_Call) RunAndReturn(run func() <-chan struct{}) *BackgroundCmdMock_Done_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // DryRun provides a mock function with no fields
 func (_m *BackgroundCmdMock) DryRun() bool {
 	ret := _m.Called()
diff --git a/command/command.go b/command/command.go
index 690e6e6db8..2d746c2be2 100644
--- a/command/command.go
+++ b/command/command.go
@@ -51,6 +51,8 @@ type BackgroundCmd interface {
 	KeepAlive()
 	Stop() error
 	DryRun() bool
+	// Done returns a channel that is closed when the background process exits.
+	Done() <-chan struct{}
 }
 
 type cmd struct {
@@ -88,6 +90,8 @@ type backgroundCmd struct {
 	err           chan error // Used to monitor the exit of the command
 	keepAliveQuit chan int   // Used to kill the keepAlive goroutine
 	pid           int        // PID of the process
+	done          chan struct{} // Closed when the process exits
+	doneOnce      sync.Once     // Ensures done is closed at most once, so a repeated Start cannot panic
 }
 
 type factory struct {
@@ -114,23 +118,26 @@ func (f factory) NewCmd(ctx context.Context, name string, args []string) Cmd {
 
 func NewBackgroundCmd(cmd Cmd, log *zap.SugaredLogger, processManager process.Manager) BackgroundCmd {
 	return &backgroundCmd{
-		cmd,
-		sync.Mutex{},
-		log,
-		processManager,
-		nil,
-		nil,
-		nil,
-		process.NotFoundProcessPID,
+		Cmd:            cmd,
+		log:            log,
+		processManager: processManager,
+		pid:            process.NotFoundProcessPID,
+		done:           make(chan struct{}),
 	}
 }
 
+// Done returns a channel that is closed when the background process exits.
+func (w *backgroundCmd) Done() <-chan struct{} {
+	return w.done
+}
+
 func (w *backgroundCmd) DryRun() bool {
 	return w.Cmd == nil || w.Cmd.DryRun()
 }
 
 func (w *backgroundCmd) Start() error {
 	if w.DryRun() {
+		w.doneOnce.Do(func() { close(w.done) })
 		return nil
 	}
 
@@ -171,6 +178,8 @@ func (w *backgroundCmd) Start() error {
 		} else {
 			w.log.Info("background command exited successfully")
 		}
+
+		w.doneOnce.Do(func() { close(w.done) })
 	}()
 
 	return nil
diff --git a/controllers/cpu_pressure_test.go b/controllers/cpu_pressure_test.go
index 31ed16f78a..e4fe2208ad 100644
--- a/controllers/cpu_pressure_test.go
+++ b/controllers/cpu_pressure_test.go
@@ -6,6 +6,8 @@
 package controllers
 
 import (
+	"strings"
+
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 
@@ -46,6 +48,61 @@ var _ = Describe("CPU Pressure", func() {
 		cpuStress, targetPod, _ = InjectPodsAndDisruption(ctx, cpuStress, true)
 	})
 
+	Context("pulse mode", func() {
+		BeforeEach(func() {
+			cpuStress.Spec.Duration = "2m"
+			cpuStress.Spec.Pulse = &chaosv1beta1.DisruptionPulse{
+				ActiveDuration:  chaosv1beta1.DisruptionDuration("15s"),
+				DormantDuration: chaosv1beta1.DisruptionDuration("10s"),
+			}
+		})
+
+		It("should inject with pulse arguments and cycle through active/dormant phases", func(ctx SpecContext) {
+			ExpectDisruptionStatus(ctx, cpuStress, chaostypes.DisruptionInjectionStatusInjected)
+
+			By("Ensuring chaos pod is created")
+			ExpectChaosPods(ctx, cpuStress, 1)
+
+			By("Verifying chaos pod has pulse arguments")
+			Eventually(func(g Gomega, ctx SpecContext) {
+				chaosPods, err := listChaosPods(ctx, cpuStress)
+				g.Expect(err).ToNot(HaveOccurred())
+				g.Expect(chaosPods.Items).To(HaveLen(1))
+
+				args := strings.Join(chaosPods.Items[0].Spec.Containers[0].Args, " ")
+				g.Expect(args).To(ContainSubstring("cpu-pressure"))
+				g.Expect(args).To(ContainSubstring("--pulse-active-duration"))
+				g.Expect(args).To(ContainSubstring("--pulse-dormant-duration"))
+			}).WithContext(ctx).Within(calcDisruptionGoneTimeout(cpuStress)).ProbeEvery(disruptionPotentialChangesEvery).Should(Succeed())
+
+			By("Ensuring disruption stays healthy throughout pulse cycle")
+			ExpectDisruptionStatusUntilExpired(ctx, cpuStress, chaostypes.DisruptionInjectionStatusInjected)
+		})
+
+		When("initial delay is configured", func() {
+			BeforeEach(func() {
+				cpuStress.Spec.Pulse.InitialDelay = chaosv1beta1.DisruptionDuration("5s")
+			})
+
+			It("should inject with initial delay argument and remain healthy", func(ctx SpecContext) {
+				ExpectDisruptionStatus(ctx, cpuStress, chaostypes.DisruptionInjectionStatusInjected)
+
+				By("Verifying chaos pod has initial delay argument")
+				Eventually(func(g Gomega, ctx SpecContext) {
+					chaosPods, err := listChaosPods(ctx, cpuStress)
+					g.Expect(err).ToNot(HaveOccurred())
+					g.Expect(chaosPods.Items).To(HaveLen(1))
+
+					args := strings.Join(chaosPods.Items[0].Spec.Containers[0].Args, " ")
+					g.Expect(args).To(ContainSubstring("--pulse-initial-delay"))
+				}).WithContext(ctx).Within(calcDisruptionGoneTimeout(cpuStress)).ProbeEvery(disruptionPotentialChangesEvery).Should(Succeed())
+
+				By("Ensuring disruption stays healthy throughout pulse cycle with initial delay")
+				ExpectDisruptionStatusUntilExpired(ctx, cpuStress, chaostypes.DisruptionInjectionStatusInjected)
+			})
+		})
+	})
+
 	DescribeTable("targeted container is stopped", func(ctx SpecContext, forced bool) {
 		ExpectDisruptionStatus(ctx, cpuStress, chaostypes.DisruptionInjectionStatusInjected)
 
diff --git a/controllers/memory_pressure_test.go b/controllers/memory_pressure_test.go
index 9471915ee8..6be8f2edf2 100644
--- a/controllers/memory_pressure_test.go
+++ b/controllers/memory_pressure_test.go
@@ -181,6 +181,29 @@ var _ = Describe("Memory Pressure", func() {
 			By("Ensuring disruption stays healthy throughout pulse cycle")
 			ExpectDisruptionStatusUntilExpired(ctx, memoryStress, chaostypes.DisruptionInjectionStatusInjected)
 		})
+
+		When("initial delay is configured", func() {
+			BeforeEach(func() {
+				memoryStress.Spec.Pulse.InitialDelay = chaosv1beta1.DisruptionDuration("5s")
+			})
+
+			It("should inject with initial delay argument and remain healthy", func(ctx SpecContext) {
+				ExpectDisruptionStatus(ctx, memoryStress, chaostypes.DisruptionInjectionStatusInjected)
+
+				By("Verifying chaos pod has initial delay argument")
+				Eventually(func(g Gomega, ctx SpecContext) {
+					chaosPods, err := listChaosPods(ctx, memoryStress)
+					g.Expect(err).ToNot(HaveOccurred())
+					g.Expect(chaosPods.Items).To(HaveLen(1))
+
+					args := strings.Join(chaosPods.Items[0].Spec.Containers[0].Args, " ")
+					g.Expect(args).To(ContainSubstring("--pulse-initial-delay"))
+				}).WithContext(ctx).Within(calcDisruptionGoneTimeout(memoryStress)).ProbeEvery(disruptionPotentialChangesEvery).Should(Succeed())
+
+				By("Ensuring disruption stays healthy throughout pulse cycle with initial delay")
+				ExpectDisruptionStatusUntilExpired(ctx, memoryStress, chaostypes.DisruptionInjectionStatusInjected)
+			})
+		})
 	})
 
 	DescribeTable("targeted container is stopped", func(ctx SpecContext, forced bool) {
diff --git a/injector/cpu_pressure.go b/injector/cpu_pressure.go
index 989726ddb3..72ffcee394 100644
--- a/injector/cpu_pressure.go
+++ b/injector/cpu_pressure.go
@@ -7,10 +7,8 @@ package injector
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"math"
-	"os"
 
 	"github.com/DataDog/chaos-controller/api/v1beta1"
 	"github.com/DataDog/chaos-controller/command"
@@ -113,15 +111,5 @@ func (i *cpuPressureInjector) UpdateConfig(config Config) {
 }
 
 func (i *cpuPressureInjector) Clean() error {
-	if i.backgroundCmd == nil {
-		return nil
-	}
-
-	defer i.cancel()
-
-	if err := i.backgroundCmd.Stop(); err != nil && !errors.Is(err, os.ErrProcessDone) {
-		return fmt.Errorf("unable to stop background process: %w", err)
-	}
-
-	return nil
+	return stopAndWaitForBackgroundCmd(i.config.Log, i.backgroundCmd, i.cancel)
 }
diff --git a/injector/cpu_pressure_test.go b/injector/cpu_pressure_test.go
index 953c3286dc..aeac0a7690 100644
--- a/injector/cpu_pressure_test.go
+++ b/injector/cpu_pressure_test.go
@@ -120,9 +120,13 @@ var _ = Describe("CPU pressure", func() {
 			})
 
 			It("succeed and call stop after proper inject", func() {
+				doneCh := make(chan struct{})
+				close(doneCh)
+
 				background.EXPECT().Start().Return(nil).Once()
 				background.EXPECT().KeepAlive().Once()
 				background.EXPECT().Stop().Return(nil).Once()
+				background.EXPECT().Done().Return((<-chan struct{})(doneCh)).Once()
 
 				inj := NewCPUPressureInjector(config, "100%", factory, args)
 
diff --git a/injector/factory.go b/injector/factory.go
index 84f91d07b5..f0e271607f 100644
--- a/injector/factory.go
+++ b/injector/factory.go
@@ -65,12 +65,21 @@ func (i injectorCmdFactory) NewInjectorBackgroundCmd(deadline time.Time, disrupt
 		disruptionArgs.TargetContainers = map[string]string{target: targetContainerID}
 	}
 
+	// Child subprocesses should not inherit pulse args — the parent process handles pulsing.
+	// Passing pulse args to children causes: (1) double-pulsing at node level where both
+	// parent and child independently pulse, and (2) re-spawned children unnecessarily
+	// waiting for pulse initial delay on every re-inject cycle.
+	childArgs := disruptionArgs
+	childArgs.PulseActiveDuration = 0
+	childArgs.PulseDormantDuration = 0
+	childArgs.PulseInitialDelay = 0
+
 	args = append(
 		args,
 		ParentPIDFlag.String(), strconv.Itoa(i.processManager.ProcessID()),
 		DeadlineFlag.String(), deadline.Format(time.RFC3339),
 	)
-	args = disruptionArgs.CreateCmdArgs(args)
+	args = childArgs.CreateCmdArgs(args)
 
 	ctx, cancel := context.WithDeadline(context.Background(), deadline)
 	cmd := i.cmdBuilder.NewCmd(ctx, ChaosInjectorBinaryLocation, args)
diff --git a/injector/injector.go b/injector/injector.go
index 01976dcc7e..3e4f7b7430 100644
--- a/injector/injector.go
+++ b/injector/injector.go
@@ -7,10 +7,14 @@ package injector
 
 import (
 	"context"
+	"errors"
+	"fmt"
+	"os"
 	"time"
 
 	chaosapi "github.com/DataDog/chaos-controller/api"
 	"github.com/DataDog/chaos-controller/cgroup"
+	"github.com/DataDog/chaos-controller/command"
 	"github.com/DataDog/chaos-controller/container"
 	"github.com/DataDog/chaos-controller/netns"
 	"github.com/DataDog/chaos-controller/o11y/metrics"
@@ -65,3 +69,38 @@ func (c Config) TargetName() string {
 
 	return UnknownTargetName
 }
+
+// backgroundCmdExitTimeout bounds how long stopAndWaitForBackgroundCmd waits for a stopped
+// background process to signal its exit through Done.
+const backgroundCmdExitTimeout = 5 * time.Second
+
+// stopAndWaitForBackgroundCmd stops a background command and waits for the process to fully exit
+// before returning. This prevents cgroup race conditions during pulse mode re-injection.
+//
+// A nil backgroundCmd is a no-op. Otherwise cancel is always called before returning. An error
+// from Stop other than os.ErrProcessDone is returned immediately, without waiting. The wait itself
+// is best-effort: if Done is not closed within backgroundCmdExitTimeout, a warning is logged and
+// nil is returned.
+func stopAndWaitForBackgroundCmd(log *zap.SugaredLogger, backgroundCmd command.BackgroundCmd, cancel context.CancelFunc) error {
+	if backgroundCmd == nil {
+		return nil
+	}
+
+	defer cancel()
+
+	if err := backgroundCmd.Stop(); err != nil && !errors.Is(err, os.ErrProcessDone) {
+		return fmt.Errorf("unable to stop background process: %w", err)
+	}
+
+	// Use an explicit timer so it is released as soon as the process exits, not when it fires.
+	timer := time.NewTimer(backgroundCmdExitTimeout)
+	defer timer.Stop()
+
+	select {
+	case <-backgroundCmd.Done():
+	case <-timer.C:
+		log.Warnw("timed out waiting for background process to exit", "timeout", backgroundCmdExitTimeout)
+	}
+
+	return nil
+}
diff --git a/injector/memory_pressure.go b/injector/memory_pressure.go
index 1ec434d924..672923bfa8 100644
--- a/injector/memory_pressure.go
+++ b/injector/memory_pressure.go
@@ -7,9 +7,7 @@ package injector
 
 import (
 	"context"
-	"errors"
 	"fmt"
-	"os"
 	"time"
 
 	"github.com/DataDog/chaos-controller/api/v1beta1"
@@ -97,15 +95,5 @@ func (i *memoryPressureInjector) UpdateConfig(config Config) {
 }
 
 func (i *memoryPressureInjector) Clean() error {
-	if i.backgroundCmd == nil {
-		return nil
-	}
-
-	defer i.cancel()
-
-	if err := i.backgroundCmd.Stop(); err != nil && !errors.Is(err, os.ErrProcessDone) {
-		return fmt.Errorf("unable to stop background process: %w", err)
-	}
-
-	return nil
+	return stopAndWaitForBackgroundCmd(i.config.Log, i.backgroundCmd, i.cancel)
 }
diff --git a/injector/memory_pressure_test.go b/injector/memory_pressure_test.go
index 17f6deedfb..6f67afd4e2 100644
--- a/injector/memory_pressure_test.go
+++ b/injector/memory_pressure_test.go
@@ -85,9 +85,13 @@ var _ = Describe("Memory pressure", func() {
 			})
 
 			It("succeeds and calls stop after proper inject", func() {
+				doneCh := make(chan struct{})
+				close(doneCh)
+
 				background.EXPECT().Start().Return(nil).Once()
 				background.EXPECT().KeepAlive().Once()
 				background.EXPECT().Stop().Return(nil).Once()
+				background.EXPECT().Done().Return((<-chan struct{})(doneCh)).Once()
 
 				inj := NewMemoryPressureInjector(config, "50%", time.Duration(0), factory, args)
 