Skip to content
3 changes: 3 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ func main() {

cpc := containerprofilecache.NewContainerProfileCache(cfg, storageClient, k8sObjectCache, prometheusExporter)
cpc.Start(ctx)
if cpm, ok := containerProfileManager.(*containerprofilemanagerv1.ContainerProfileManager); ok {
cpm.SetCompletionNotifier(cpc)
}
logger.L().Info("ContainerProfileCache active; legacy AP/NN caches removed")

dc := dnscache.NewDnsCache(dnsResolver)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ require (
github.com/joncrlsn/dque v0.0.0-20241024143830-7723fd131a64
github.com/kubescape/backend v0.0.39
github.com/kubescape/go-logger v0.0.28
github.com/kubescape/k8s-interface v0.0.207
github.com/kubescape/k8s-interface v0.0.210
github.com/kubescape/storage v0.0.258
github.com/kubescape/workerpool v0.0.0-20250526074519-0e4a4e7f44cf
github.com/moby/sys/mountinfo v0.7.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -883,8 +883,8 @@ github.com/kubescape/backend v0.0.39 h1:B1QRfKCSFlzuE+jWOnk/l7EpH71/Q3n14KKq0QSn
github.com/kubescape/backend v0.0.39/go.mod h1:cMEGP8cXUZgY89YU4GRBGIla9HZW7grZsUtlCwvZgAE=
github.com/kubescape/go-logger v0.0.28 h1:xulKTp9kOg3rD98sopFELQ6yZCHQoQXMDzteoSHDFKI=
github.com/kubescape/go-logger v0.0.28/go.mod h1:YZHFjwGCDar1hP9OyBLE46oR7a0Y/Z/0FperDo8+9D0=
github.com/kubescape/k8s-interface v0.0.207 h1:jX+EqZLjSArw4xa+XMvjnnoK0Q8IxdD2tvihwLa/WGg=
github.com/kubescape/k8s-interface v0.0.207/go.mod h1:WNYUG93aZ5kDmuaRKFLtVhp18Yc6EfaHdD1gLYtVTN4=
github.com/kubescape/k8s-interface v0.0.210 h1:3TiO3lYxdIHncoBRAMAMFdwanHmllUpYKFy5cG0h97o=
github.com/kubescape/k8s-interface v0.0.210/go.mod h1:WNYUG93aZ5kDmuaRKFLtVhp18Yc6EfaHdD1gLYtVTN4=
github.com/kubescape/storage v0.0.258 h1:0mL0z3dAmtP1qup7VgoEgwLgbBSROu5oOusBAPeMmus=
github.com/kubescape/storage v0.0.258/go.mod h1:VHs+xQzvZKE2lJDN8rR1sFmTa43N6XJAcatZ249gviU=
github.com/kubescape/syft v1.32.0-ks.2 h1:xdUksUmKEyyVKsTfJDYW8Z5HawVJtelsUolPOsWtDx0=
Expand Down
12 changes: 12 additions & 0 deletions pkg/containerprofilemanager/v1/containerprofile_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,18 @@ type ContainerProfileManager struct {
hostProfile *v1beta1.ContainerProfile
hostProfileMu sync.RWMutex
hostID string

completionNotifier objectcache.CompletionNotifier
}

func (cpm *ContainerProfileManager) SetCompletionNotifier(n objectcache.CompletionNotifier) {
cpm.completionNotifier = n
}

func (cpm *ContainerProfileManager) notifyCompleted(containerID string) {
if cpm.completionNotifier != nil {
cpm.completionNotifier.NotifyContainerCompleted(containerID)
}
}

// NewContainerProfileManager creates a new container profile manager
Expand Down
6 changes: 6 additions & 0 deletions pkg/containerprofilemanager/v1/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func (cpm *ContainerProfileManager) monitorContainer(container *containercollect
helpers.String("status", string(watchedContainer.GetStatus())),
helpers.String("completionStatus", string(watchedContainer.GetCompletionStatus())))
}
if watchedContainer.GetStatus() == objectcache.WatchedContainerStatusCompleted {
cpm.notifyCompleted(watchedContainer.ContainerID)
}
// Signal ack to lifecycle goroutine
if watchedContainer.AckChan != nil {
watchedContainer.AckChan <- struct{}{}
Expand All @@ -63,6 +66,7 @@ func (cpm *ContainerProfileManager) monitorContainer(container *containercollect
helpers.String("status", string(watchedContainer.GetStatus())),
helpers.String("completionStatus", string(watchedContainer.GetCompletionStatus())))
}
cpm.notifyCompleted(watchedContainer.ContainerID)
// Signal ack to lifecycle goroutine
if watchedContainer.AckChan != nil {
watchedContainer.AckChan <- struct{}{}
Expand Down Expand Up @@ -92,11 +96,13 @@ func (cpm *ContainerProfileManager) handleSaveProfileError(err error, watchedCon
watchedContainer.SetStatus(objectcache.WatchedContainerStatusTooLarge)
cpm.deleteContainer(container)
cpm.notifyContainerEndOfLife(container)
cpm.notifyCompleted(watchedContainer.ContainerID)
return file.ObjectTooLargeError
} else if err.Error() == file.ObjectCompletedError.Error() {
watchedContainer.SetStatus(objectcache.WatchedContainerStatusCompleted)
cpm.deleteContainer(container)
cpm.notifyContainerEndOfLife(container)
cpm.notifyCompleted(watchedContainer.ContainerID)
return file.ObjectCompletedError
} else {
logger.L().Error("failed to save container profile", helpers.Error(err),
Expand Down
9 changes: 9 additions & 0 deletions pkg/objectcache/completion_notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package objectcache

// CompletionNotifier is implemented by ContainerProfileCacheImpl. The
// containerprofilemanager calls NotifyContainerCompleted when it writes a
// container profile with status="completed" to storage, allowing the CP cache
// to promote any pending entry without waiting for the next reconciler tick.
type CompletionNotifier interface {
NotifyContainerCompleted(containerID string)
}
79 changes: 66 additions & 13 deletions pkg/objectcache/containerprofilecache/containerprofilecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ type ContainerProfileCacheImpl struct {
k8sObjectCache objectcache.K8sObjectCache
metricsManager metricsmanager.MetricsManager

reconcileEvery time.Duration
rpcBudget time.Duration
reconcileEvery time.Duration
rpcBudget time.Duration
refreshInProgress atomic.Bool

// deprecationDedup tracks (kind|ns/name@rv) keys to emit one WARN log
Expand Down Expand Up @@ -139,7 +139,7 @@ func NewContainerProfileCache(cfg config.Config, storageClient storage.ProfileCl
if rpcBudget <= 0 {
rpcBudget = defaultStorageRPCBudget
}
return &ContainerProfileCacheImpl{
c := &ContainerProfileCacheImpl{
cfg: cfg,
containerLocks: resourcelocks.New(),
storageClient: storageClient,
Expand All @@ -149,6 +149,14 @@ func NewContainerProfileCache(cfg config.Config, storageClient storage.ProfileCl
rpcBudget: rpcBudget,
nudge: make(chan struct{}, 1),
}
// Pre-initialize SafeMap internal maps: Load() reads m.items == nil without
// a lock while Set() writes m.items under a write lock, causing a data race
// on the first concurrent access to a zero-value SafeMap.
c.entries.Set("", nil)
c.entries.Delete("")
c.pending.Set("", nil)
c.pending.Delete("")
return c
}

func shouldLogOptionalUserManagedFetchError(err error) bool {
Expand Down Expand Up @@ -362,16 +370,17 @@ func (c *ContainerProfileCacheImpl) tryPopulateEntry(
}
}

// Fix (reviewer #3): if the consolidated CP is still Partial and this
// container is not PreRunning (i.e. we saw it start fresh after the
// agent was already up), the partial view belongs to a PREVIOUS container
// incarnation. Legacy caches explicitly deleted such partials on restart
// so rule evaluation fell through to "no profile" until a new Full
// profile arrived. Mirror that: keep pending, retry each tick.
if !sharedData.PreRunningContainer {
if cp != nil && cp.Annotations[helpersv1.CompletionMetadataKey] == helpersv1.Partial {
cp = nil
}
// Only cache profiles whose status is terminal (Completed or TooLarge).
// Learning/ready profiles are still being written; caching them would let
// rules fire against incomplete data. TooLarge is terminal: the manager
// stopped collecting but the truncated data is still valid for detection.
// Return false so the synthetic-CP fallback below does not bypass the gate.
if cp != nil && !isTerminalCPStatus(cp.Annotations[helpersv1.StatusMetadataKey]) {
logger.L().Debug("tryPopulateEntry: CP status not terminal; keeping pending",
helpers.String("containerID", containerID),
helpers.String("namespace", ns),
helpers.String("status", cp.Annotations[helpersv1.StatusMetadataKey]))
return false
}

// Fetch user-authored legacy CRDs when the pod carries the
Expand Down Expand Up @@ -659,5 +668,49 @@ func (c *ContainerProfileCacheImpl) waitForSharedContainerData(containerID strin
}, backoff.WithBackOff(backoff.NewExponentialBackOff()))
}

// NotifyContainerCompleted is called by containerprofilemanager when it writes a
// CP with status="completed". If the container is still pending it launches a
// bounded retry goroutine (up to 5 attempts × 3 s) so the cache entry is
// promoted within seconds of the consolidation cycle completing, without waiting
// for the next 30 s reconciler tick.
func (c *ContainerProfileCacheImpl) NotifyContainerCompleted(containerID string) {
p, pending := c.pending.Load(containerID)
if !pending {
return
}
go func() {
for i := 0; i < 20; i++ {
if i > 0 {
time.Sleep(3 * time.Second)
}
if _, still := c.pending.Load(containerID); !still {
return
}
ctx, cancel := context.WithTimeout(context.Background(), c.rpcBudget)
var promoted bool
c.containerLocks.WithLock(containerID, func() {
if _, still := c.pending.Load(containerID); still {
promoted = c.tryPopulateEntry(ctx, containerID, p.container, p.sharedData, p.cpName, p.workloadName)
}
})
cancel()
if promoted {
return
}
}
}()
}

// isTerminalCPStatus reports whether the CP status annotation value represents
// a terminal state that the cache should accept. Terminal states are:
// - Completed: learning period finished normally.
// - TooLarge: manager stopped collecting because the profile grew too large;
// the truncated data is still valid for rule evaluation.
//
// Learning ("ready") is not terminal — the CP is still being written.
func isTerminalCPStatus(status string) bool {
return status == helpersv1.Completed || status == helpersv1.TooLarge
}

// Ensure ContainerProfileCacheImpl implements the ContainerProfileCache interface.
var _ objectcache.ContainerProfileCache = (*ContainerProfileCacheImpl)(nil)
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,11 @@ func TestSharedFastPath_NoOverlay(t *testing.T) {
// is merged into the projected profile.
func TestOverlayPath_DeepCopies(t *testing.T) {
cp := &v1beta1.ContainerProfile{
ObjectMeta: metav1.ObjectMeta{Name: "cp-1", Namespace: "default", ResourceVersion: "1"},
Spec: v1beta1.ContainerProfileSpec{Capabilities: []string{"SYS_PTRACE"}},
ObjectMeta: metav1.ObjectMeta{
Name: "cp-1", Namespace: "default", ResourceVersion: "1",
Annotations: map[string]string{helpersv1.StatusMetadataKey: helpersv1.Completed},
},
Spec: v1beta1.ContainerProfileSpec{Capabilities: []string{"SYS_PTRACE"}},
}
userAP := &v1beta1.ApplicationProfile{
ObjectMeta: metav1.ObjectMeta{Name: "override", Namespace: "default", ResourceVersion: "u1"},
Expand Down Expand Up @@ -207,7 +210,10 @@ func TestOverlayPath_DeepCopies(t *testing.T) {
// fresh mutex.
func TestDeleteContainer_LockAndCleanup(t *testing.T) {
cp := &v1beta1.ContainerProfile{
ObjectMeta: metav1.ObjectMeta{Name: "cp-delete", Namespace: "default", ResourceVersion: "1"},
ObjectMeta: metav1.ObjectMeta{
Name: "cp-delete", Namespace: "default", ResourceVersion: "1",
Annotations: map[string]string{helpersv1.StatusMetadataKey: helpersv1.Completed},
},
}
client := &fakeProfileClient{cp: cp}
c, k8s := newTestCache(t, client)
Expand Down Expand Up @@ -283,7 +289,10 @@ func TestContainerCallback_HostContainer(t *testing.T) {
// GetCallStackSearchTree.
func TestCallStackIndexBuiltFromProfile(t *testing.T) {
cp := &v1beta1.ContainerProfile{
ObjectMeta: metav1.ObjectMeta{Name: "cp-stack", Namespace: "default", ResourceVersion: "1"},
ObjectMeta: metav1.ObjectMeta{
Name: "cp-stack", Namespace: "default", ResourceVersion: "1",
Annotations: map[string]string{helpersv1.StatusMetadataKey: helpersv1.Completed},
},
Spec: v1beta1.ContainerProfileSpec{
IdentifiedCallStacks: []v1beta1.IdentifiedCallStack{
{
Expand Down
44 changes: 26 additions & 18 deletions pkg/objectcache/containerprofilecache/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
//
// The reconciler is the safety-net eviction path AND the freshness refresh
// loop. Each tick it:
// 1. reconcileOnce: evicts cache entries whose pod is gone or whose
// container is no longer Running.
// 2. refreshAllEntries (single-flight via atomic flag): re-fetches the
// consolidated CP, the workload-level AP+NN, the user-managed
// "ug-<workload>" AP+NN, and any label-referenced user AP/NN overlay,
// then rebuilds the projection iff any resourceVersion changed. Fast-skip
// when every RV matches what's already cached.
// 1. reconcileOnce: evicts cache entries whose pod is gone or whose
// container is no longer Running.
// 2. refreshAllEntries (single-flight via atomic flag): re-fetches the
// consolidated CP, the workload-level AP+NN, the user-managed
// "ug-<workload>" AP+NN, and any label-referenced user AP/NN overlay,
// then rebuilds the projection iff any resourceVersion changed. Fast-skip
// when every RV matches what's already cached.
//
// RPC cost @ 300 containers / 30s cadence steady-state: up to 7 gets per
// entry per tick (CP + 3×AP + 3×NN). At 300 entries that's 70 RPC/s in the
Expand All @@ -32,8 +32,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// tickLoop drives the reconciler. Evict runs synchronously on the tick;
// refresh runs on a single-flight goroutine guarded by refreshInProgress so a
// tickLoop drives the reconciler. Each tick it evicts terminated containers,
// retries pending entries, and refreshes all cached entries. Pending-entry
// retries are also triggered immediately via NotifyContainerCompleted when the
// containerprofilemanager writes a CP with status="completed".
//
// Refresh runs on a single-flight goroutine guarded by refreshInProgress so a
// slow refresh never stacks.
func (c *ContainerProfileCacheImpl) tickLoop(ctx context.Context) {
if c.reconcileEvery == 0 {
Expand Down Expand Up @@ -273,12 +277,9 @@ func (c *ContainerProfileCacheImpl) refreshAllEntries(ctx context.Context) {
//
// base CP → workload AP+NN → user-managed (ug-) AP+NN → user overlay AP+NN.
//
// We intentionally DO NOT re-apply the partial-on-non-PreRunning gate here:
// any entry that survived addContainer already passed that gate (or was
// PreRunning), so refresh can accept partial profiles freely. (Fix B for
// Test_17 / Test_19: the workload AP/NN must be re-fetched each tick so a
// "ready" -> "completed" transition propagates to ProfileState.Status, which
// in turn promotes fail_on_profile from false to true.)
// The completed-only gate is re-applied here: if the CP regresses to a
// non-Completed status we keep the existing cached entry rather than
// projecting stale/incomplete data.
func (c *ContainerProfileCacheImpl) refreshOneEntry(ctx context.Context, id string, e *CachedContainerProfile) {
// Resurrection guard (reviewer #1): refreshAllEntries snapshots entries
// without holding containerLocks, so a concurrent deleteContainer /
Expand Down Expand Up @@ -322,6 +323,13 @@ func (c *ContainerProfileCacheImpl) refreshOneEntry(ctx context.Context, id stri
helpers.Error(cpErr))
cp = nil
}
if cp != nil && !isTerminalCPStatus(cp.Annotations[helpersv1.StatusMetadataKey]) {
logger.L().Debug("refreshOneEntry: CP status not terminal; keeping cached entry",
helpers.String("containerID", id),
helpers.String("cpName", e.CPName),
helpers.String("status", cp.Annotations[helpersv1.StatusMetadataKey]))
return
}
var userManagedAP *v1beta1.ApplicationProfile
var userManagedNN *v1beta1.NetworkNeighborhood
if e.WorkloadName != "" {
Expand Down Expand Up @@ -519,9 +527,9 @@ func (c *ContainerProfileCacheImpl) rebuildEntryFromSources(
}

newEntry := &CachedContainerProfile{
Projected: projectedCP,
SpecHash: projectedCP.SpecHash,
State: &objectcache.ProfileState{Completion: effectiveCP.Annotations[helpersv1.CompletionMetadataKey], Status: effectiveCP.Annotations[helpersv1.StatusMetadataKey], Name: effectiveCP.Name},
Projected: projectedCP,
SpecHash: projectedCP.SpecHash,
State: &objectcache.ProfileState{Completion: effectiveCP.Annotations[helpersv1.CompletionMetadataKey], Status: effectiveCP.Annotations[helpersv1.StatusMetadataKey], Name: effectiveCP.Name},
CallStackTree: tree,
ContainerName: prev.ContainerName,
PodName: prev.PodName,
Expand Down
Loading
Loading