e2e/cluster.go
 	if errors.As(err, &azErr) && azErr.StatusCode == 404 {
-		return createNewMaintenanceConfiguration(ctx, cluster)
+		_, err = createNewMaintenanceConfiguration(ctx, cluster)
+		return err
In the 404 path, ensureMaintenanceConfiguration returns the raw error from createNewMaintenanceConfiguration without adding cluster context, while the non-404 path wraps errors with the cluster name. Wrap the 404/create error similarly so failures are attributable to the cluster being prepared.
Suggested change:
-	return err
+	if err != nil {
+		return fmt.Errorf("failed to create maintenance configuration 'default' for cluster %q: %w", *cluster.Name, err)
+	}
+	return nil
func (g *Group) recordError(err error) {
	g.mu.Lock()
	g.errs = append(g.errs, err)
	g.mu.Unlock()
	g.cancel()
}
Group.recordError records every non-nil error, including context.Canceled/context.DeadlineExceeded from tasks that are only exiting because another task failed and canceled the group. In failure cases this can flood DAGError with cancellation noise and obscure the root cause. Consider suppressing context cancellation errors once the group has already observed a “real” failure (or only recording the first error).
// launch runs fn in a new goroutine after all deps complete.
// If any dep failed or ctx is cancelled, onSkip is called instead of fn.
func (g *Group) launch(deps []Dep, fn func(), onSkip func()) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()

		for _, d := range deps {
			d.wait()
		}

		for _, d := range deps {
			if d.failed() {
				onSkip()
				return
			}
		}

		if g.ctx.Err() != nil {
			onSkip()
			return
		}

		fn()
	}()
}
The executor blocks on Dep.wait() with no cycle detection. If a cycle is accidentally introduced (e.g., via wiring mistakes), Wait() will hang indefinitely. Consider adding a lightweight cycle check (even just in debug/tests), or documenting this sharp edge prominently so future uses don’t create non-obvious deadlocks.
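A lightweight cycle check of the kind suggested could look like the following standalone sketch; `hasCycle` and the adjacency-list representation are illustrative, not part of the PR's dag API:

```go
package main

import "fmt"

// hasCycle reports whether the dependency graph contains a cycle, using
// three-color DFS (0 = unvisited, 1 = on the current path, 2 = done).
// Input maps each task name to the names of its dependencies.
func hasCycle(deps map[string][]string) bool {
	color := map[string]int{}
	var visit func(n string) bool
	visit = func(n string) bool {
		switch color[n] {
		case 1:
			return true // back edge to a node on the current path: cycle
		case 2:
			return false
		}
		color[n] = 1
		for _, d := range deps[n] {
			if visit(d) {
				return true
			}
		}
		color[n] = 2
		return false
	}
	for n := range deps {
		if visit(n) {
			return true
		}
	}
	return false
}

func main() {
	ok := map[string][]string{"kube": {"cluster"}, "cluster": {"rg"}, "rg": nil}
	bad := map[string][]string{"a": {"b"}, "b": {"a"}}
	fmt.Println(hasCycle(ok), hasCycle(bad)) // false true
}
```

Run only in tests or a debug build before `Wait()`, this turns a silent deadlock into an immediate, diagnosable failure.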
)

-func getClusterKubeClient(ctx context.Context, resourceGroupName, clusterName string) (*Kubeclient, error) {
+func getClusterKubeClient(ctx context.Context, cluster *armcontainerservice.ManagedCluster) (*Kubeclient, error) {
getClusterKubeClient now dereferences cluster.Location and cluster.Name without validating the input. Since the signature changed from (resourceGroupName, clusterName string) to (*ManagedCluster), it would be safer to add nil checks for cluster, cluster.Location, and cluster.Name and return a descriptive error instead of panicking.
Suggested change:
 	func getClusterKubeClient(ctx context.Context, cluster *armcontainerservice.ManagedCluster) (*Kubeclient, error) {
+		if cluster == nil {
+			return nil, fmt.Errorf("managed cluster is nil")
+		}
+		if cluster.Location == nil || *cluster.Location == "" {
+			return nil, fmt.Errorf("managed cluster location is nil or empty")
+		}
+		if cluster.Name == nil || *cluster.Name == "" {
+			return nil, fmt.Errorf("managed cluster name is nil or empty")
+		}
}

-func getClusterSubnetID(ctx context.Context, mcResourceGroupName string) (string, error) {
+func getClusterSubnetID(ctx context.Context, cluster *armcontainerservice.ManagedCluster) (string, error) {
getClusterSubnetID now relies on cluster.Properties.NodeResourceGroup and dereferences it directly. With the new *ManagedCluster signature, add guards for cluster == nil, cluster.Properties == nil, and cluster.Properties.NodeResourceGroup == nil to avoid panics and return a clearer error when cluster metadata is incomplete.
Suggested change:
 	func getClusterSubnetID(ctx context.Context, cluster *armcontainerservice.ManagedCluster) (string, error) {
+		if cluster == nil {
+			return "", fmt.Errorf("managed cluster is nil")
+		}
+		if cluster.Properties == nil {
+			return "", fmt.Errorf("managed cluster properties are nil")
+		}
+		if cluster.Properties.NodeResourceGroup == nil || *cluster.Properties.NodeResourceGroup == "" {
+			return "", fmt.Errorf("managed cluster NodeResourceGroup is nil or empty")
+		}
-	if isNetworkIsolated {
-		if err := addNetworkIsolatedSettings(ctx, cluster, *cluster.Location); err != nil {
-			return nil, fmt.Errorf("add network isolated settings: %w", err)
-		}
-	}
-	if !isNetworkIsolated { // network isolated cluster blocks all egress via NSG
-		if err := addFirewallRules(ctx, cluster, *cluster.Location); err != nil {
-			return nil, fmt.Errorf("add firewall rules: %w", err)
-		}
-	}
+	if !isNetworkIsolated {
+		dag.Run(g, func(ctx context.Context) error { return addFirewallRules(ctx, cluster) })
+	}
+	if isNetworkIsolated {
+		dag.Run(g, func(ctx context.Context) error { return addNetworkIsolatedSettings(ctx, cluster) })
+	}
+	needACR := isNetworkIsolated || attachPrivateAcr
+	acrNonAnon := dag.Run2(g, kube, identity, addACR(cluster, needACR, true))
+	acrAnon := dag.Run2(g, kube, identity, addACR(cluster, needACR, false))
+	dag.Run1(g, kube, ensureDebugDaemonsets(cluster, isNetworkIsolated), acrNonAnon, acrAnon)
+	extract := dag.Go1(g, kube, extractClusterParams(cluster))
prepareCluster no longer enforces the previous ordering where network changes (firewall rules / network-isolated subnet+NSG updates) completed before EnsureDebugDaemonsets runs. Because the daemonset creation triggers pod scheduling/image pulls, running it concurrently with route table/NSG updates can introduce e2e flakiness. Consider capturing the firewall/NSG task(s) as Effect values and adding them as explicit dependencies to the Run1(... ensureDebugDaemonsets ...) task (and possibly any other k8s API tasks that assume steady node egress).
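The suggested ordering fix amounts to giving the daemonset task an explicit dependency on the network tasks. A minimal standalone sketch, using a closed channel as a stand-in for the dag package's Effect barrier (`runPipeline` and the task names are illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

// runPipeline wires the daemonset task to wait on the network task and
// returns the observed execution order.
func runPipeline() []string {
	var (
		mu    sync.Mutex
		order []string
		wg    sync.WaitGroup
	)
	step := func(name string) {
		mu.Lock()
		order = append(order, name)
		mu.Unlock()
	}

	networkReady := make(chan struct{}) // stands in for the firewall/NSG Effect

	wg.Add(2)
	go func() { // firewall / network-isolated settings task
		defer wg.Done()
		step("addFirewallRules")
		close(networkReady) // signal steady node egress
	}()
	go func() { // daemonset creation must not race route table/NSG updates
		defer wg.Done()
		<-networkReady
		step("ensureDebugDaemonsets")
	}()
	wg.Wait()
	return order
}

func main() {
	fmt.Println(runPipeline()) // [addFirewallRules ensureDebugDaemonsets]
}
```

The same shape applies to any other k8s API task that assumes nodes already have working egress: make it wait on the network barrier rather than merely on cluster creation.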
Implements Execute() which discovers the DAG, validates for cycles, and runs tasks concurrently respecting dependency order. Supports CancelDependents (skip downstream on failure) and CancelAll (cancel context) error strategies, plus MaxConcurrency semaphore limiting.
Adds integration tests mirroring the complete spec example (CreateRG → CreateVNet → CreateSubnet → CreateCluster → RunTests → Teardown), plus tests for transitive dependency access, mid-pipeline failure with CancelDependents, shared tasks across independent subgraphs, and empty graph. Removes unused dead code that triggered go vet copylock warning.
- Replace 9-param notifyDependents with runState struct methods
- Eliminate redundant `failed` map — derive from results directly
- Collapse 3 near-identical early-exit blocks into abort() helper
- Launch dependents outside the mutex (collect-then-launch)
- Remove spurious mu.Lock after wg.Wait (all goroutines done)
- Sort DAGError.Error() output for deterministic error messages
- Add ErrorStrategy.String() for consistency with TaskStatus
- Remove dead test code (unused shared var, dead twoDepTask type)
- Extract buildSpecDAG() helper to reduce copy-paste in tests
- Reduce pre-canceled context test timeout from 2s to 100ms
Add e2e/dag package — a lightweight, type-safe DAG executor using Go generics. Two verbs: Go (returns value) and Run (side-effect), with numbered variants (Go1-Go3, Run1-Run3) for typed dependency injection. Replace the sequential prepareCluster implementation with a DAG version that runs independent tasks (bastion, subnet, kube, identity, firewall, garbage collection, etc.) concurrently after cluster creation completes. Also fix pre-existing fmt.Sprintf %%w usage in config/config.go. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Replace the clusterSetup struct and its 12 one-liner methods with inline closures. Each closure is 1-2 lines binding the cluster local to the real function call. Everything reads top-to-bottom in one place. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Move cluster creation back into the DAG so all tasks use typed dependency injection. Where function signatures match exactly (getOrCreateBastion, getClusterKubeletIdentity, collectGarbageVMSS), pass them directly without closures. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Extract named helpers for conditional tasks (configureFirewall, configureNetworkIsolation, setupACR, ensureDebugDaemonsets) and update function signatures (getClusterSubnetID, getClusterKubeClient, extractClusterParameters) to accept *ManagedCluster directly so they can be passed to the DAG without closures. prepareCluster now reads as a pure declarative DAG with no inline anonymous functions. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- addFirewallRules/addNetworkIsolatedSettings: derive location from cluster instead of taking it as a param. Use conditional DAG registration instead of runtime checks.
- addPrivateAzureContainerRegistry: derive resourceGroupName from cluster.Location internally.
- ensureMaintenanceConfiguration: replaces getOrCreate wrapper, returns error only (value was never used).
- getClusterSubnetID/getClusterKubeClient: take *ManagedCluster instead of extracted strings.
- GoN/RunN: accept optional extra ...Dep barrier deps so typed dep tasks can also wait on untyped barriers (e.g. ACR effects).

Remaining helpers (newClusterTask, addACRTask, addDebugDaemonsets) exist because they genuinely need parameter binding that can't be absorbed into the underlying functions.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Pull cluster creation out of the DAG. Use generic bind/bindRun helpers to pass functions directly for tasks that only need cluster. Remaining factory helpers (addACR, ensureDebugDaemonsets, extractClusterParams) bind cluster to functions that also receive DAG-provided values (kube, identity) — these can't be eliminated without putting cluster back in the DAG. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Fix package doc to reference correct API names (Go/Run not Spawn/Do)
- Fix section comments in tests to match current API names
- Wait() now surfaces ctx.Err() when parent context is cancelled
- TestParentContextCancelled asserts error is context.Canceled
- ensureMaintenanceConfiguration 404 path wraps error with cluster context
- Firewall/NSG tasks are now dependencies of ensureDebugDaemonsets
Add comprehensive test coverage including:

- Go1-Go3/Run1-Run3 success and skip-on-dep-failure paths
- Diamond topology, transitive skip, cancellation noise
- Parent context cancellation and deadline exceeded
- DAGError formatting (single and multiple errors)
- Effect as dependency, empty group, Result.Get/MustGet
- Cycle behavior documentation (typed API compile-time safety, untyped API deadlock, self-dependency deadlock)

Clean up test comments to keep tests concise.
- Add panic recovery in launch() so panics in task functions are captured as errors instead of crashing the process.
- Unify onSkip/onPanic into a single onFail(error) callback.
- Update MustGet docs to reference actual API names.
- Document cycle-deadlock limitation in package comment.
- Add tests for panic in Go, Run, and downstream skip after panic.
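The panic-recovery change can be sketched in isolation like this (`runRecovered` is an illustrative name; the PR applies the same pattern inside launch()):

```go
package main

import "fmt"

// runRecovered invokes fn, converting any panic into a returned error so a
// single misbehaving task cannot take down the whole process.
func runRecovered(fn func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("task panicked: %v", r)
		}
	}()
	return fn()
}

func main() {
	err := runRecovered(func() error { panic("nil pointer in task") })
	fmt.Println(err) // task panicked: nil pointer in task

	fmt.Println(runRecovered(func() error { return nil })) // <nil>
}
```

Because the recovered panic becomes an ordinary error, it flows through the same recordError/DAGError path as any other failure, and downstream tasks see the failing dependency and skip normally.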
	identity := dag.Go(g, func(ctx context.Context) (*armcontainerservice.UserAssignedIdentity, error) {
		return getClusterKubeletIdentity(ctx, cluster)
getClusterKubeletIdentity now takes a context.Context but doesn't use it. Since this is just to fit the dag.Go signature, consider keeping the original signature and wrapping it in an inline closure in prepareCluster (avoids unnecessary API churn and keeps the function's contract minimal).
Suggested change:
-	identity := dag.Go(g, func(ctx context.Context) (*armcontainerservice.UserAssignedIdentity, error) {
-		return getClusterKubeletIdentity(ctx, cluster)
+	identity := dag.Go(g, func(context.Context) (*armcontainerservice.UserAssignedIdentity, error) {
+		return getClusterKubeletIdentity(cluster)
timmy-wright left a comment:
LGTM. Worthwhile looking at the copilot comments tho. I'm surprised there's not already a DAG library we can use.
addFirewallRules and collectGarbageVMSS both trigger Azure subnet/route table mutations. Running them concurrently (introduced in #8149) can cause the AKS-managed pod route table to be detached from the subnet:

1. addFirewallRules associates abe2e-fw-rt with aks-subnet (displacing the AKS route table)
2. collectGarbageVMSS deletes stale VMSS, triggering cloud-controller route reconciliation that races with the subnet association
3. getOrCreateBastion creates AzureBastionSubnet (VNet-level mutation)

The race leaves aks-agentpool-*-routetable with 0 subnet associations, so CCM-programmed pod routes are unreachable. Nodes stay stuck on NetworkUnavailable: 'Waiting for cloud routes' forever.

Fix: add DAG edges so that:

- addFirewallRules / addNetworkIsolatedSettings wait for bastion (serialize VNet subnet mutations)
- collectGarbageVMSS waits for network setup (prevent VMSS deletion from racing with route table association)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
What this PR does / why we need it:
Adds a new e2e/dag package that provides a lightweight, type-safe DAG execution library using closures and generics, then refactors prepareCluster to use it for concurrent cluster setup.