From e82cd96216c0ec4ba2ee7082be06876becedffe0 Mon Sep 17 00:00:00 2001 From: Olivia Song Date: Fri, 16 May 2025 22:46:15 +0000 Subject: [PATCH] add flag to paginate list calls --- cmd/main.go | 3 +- pkg/config/controller_config.go | 6 +++ pkg/policyendpoints/manager.go | 4 +- pkg/resolvers/endpoints.go | 92 ++++++++++++++++++++------------- pkg/resolvers/endpoints_test.go | 6 +-- 5 files changed, 68 insertions(+), 43 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 643d404..a1c10e8 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -102,6 +102,7 @@ func main() { setupLog.Info("Checking args for PE chunk size", "PEChunkSize", controllerCFG.EndpointChunkSize) setupLog.Info("Checking args for policy batch time", "NPBatchTime", controllerCFG.PodUpdateBatchPeriodDuration) setupLog.Info("Checking args for reconciler count", "ReconcilerCount", controllerCFG.MaxConcurrentReconciles) + setupLog.Info("Checking args for k8s list call page size", "ListPageSize", controllerCFG.ListPageSize) if controllerCFG.EnableConfigMapCheck { var cancelFn context.CancelFunc @@ -129,7 +130,7 @@ func main() { } policyEndpointsManager := policyendpoints.NewPolicyEndpointsManager(mgr.GetClient(), - controllerCFG.EndpointChunkSize, ctrl.Log.WithName("endpoints-manager")) + controllerCFG.EndpointChunkSize, controllerCFG.ListPageSize, ctrl.Log.WithName("endpoints-manager")) finalizerManager := k8s.NewDefaultFinalizerManager(mgr.GetClient(), ctrl.Log.WithName("finalizer-manager")) policyController := controllers.NewPolicyReconciler(mgr.GetClient(), policyEndpointsManager, controllerCFG, finalizerManager, ctrl.Log.WithName("controllers").WithName("policy")) diff --git a/pkg/config/controller_config.go b/pkg/config/controller_config.go index 4c48f4a..8ed02ec 100644 --- a/pkg/config/controller_config.go +++ b/pkg/config/controller_config.go @@ -12,10 +12,12 @@ const ( flagEnableConfigMapCheck = "enable-configmap-check" flagEndpointChunkSize = "endpoint-chunk-size" flagEnableGoProfiling =
"enable-goprofiling" + flagListPageSize = "list-page-size" defaultLogLevel = "info" defaultMaxConcurrentReconciles = 3 defaultEndpointsChunkSize = 200 defaultEnableConfigMapCheck = true + defaultListPageSize = 1000 flagPodUpdateBatchPeriodDuration = "pod-update-batch-period-duration" defaultBatchPeriodDuration = 1 * time.Second defaultEnableGoProfiling = false @@ -37,6 +39,8 @@ type ControllerConfig struct { RuntimeConfig RuntimeConfig // EnableGoProfiling enables the goprofiling for dev purpose EnableGoProfiling bool + // ListPageSize specifies the page size for k8s list calls + ListPageSize int } func (cfg *ControllerConfig) BindFlags(fs *pflag.FlagSet) { @@ -52,5 +56,7 @@ func (cfg *ControllerConfig) BindFlags(fs *pflag.FlagSet) { "Duration between batch updates of pods") fs.BoolVar(&cfg.EnableGoProfiling, flagEnableGoProfiling, defaultEnableGoProfiling, "Enable goprofiling for develop purpose") + fs.IntVar(&cfg.ListPageSize, flagListPageSize, defaultListPageSize, + "Page size for k8s list calls") cfg.RuntimeConfig.BindFlags(fs) } diff --git a/pkg/policyendpoints/manager.go b/pkg/policyendpoints/manager.go index 9611f2e..f662353 100644 --- a/pkg/policyendpoints/manager.go +++ b/pkg/policyendpoints/manager.go @@ -29,8 +29,8 @@ type PolicyEndpointsManager interface { } // NewPolicyEndpointsManager constructs a new policyEndpointsManager -func NewPolicyEndpointsManager(k8sClient client.Client, endpointChunkSize int, logger logr.Logger) *policyEndpointsManager { - endpointsResolver := resolvers.NewEndpointsResolver(k8sClient, logger.WithName("endpoints-resolver")) +func NewPolicyEndpointsManager(k8sClient client.Client, endpointChunkSize int, listPageSize int, logger logr.Logger) *policyEndpointsManager { + endpointsResolver := resolvers.NewEndpointsResolver(k8sClient, listPageSize, logger.WithName("endpoints-resolver")) return &policyEndpointsManager{ k8sClient: k8sClient, endpointsResolver: endpointsResolver, diff --git a/pkg/resolvers/endpoints.go 
b/pkg/resolvers/endpoints.go index 0beca9e..9455c02 100644 --- a/pkg/resolvers/endpoints.go +++ b/pkg/resolvers/endpoints.go @@ -26,18 +26,20 @@ type EndpointsResolver interface { } // NewEndpointsResolver constructs a new defaultEndpointsResolver -func NewEndpointsResolver(k8sClient client.Client, logger logr.Logger) *defaultEndpointsResolver { +func NewEndpointsResolver(k8sClient client.Client, listPageSize int, logger logr.Logger) *defaultEndpointsResolver { return &defaultEndpointsResolver{ - k8sClient: k8sClient, - logger: logger, + k8sClient: k8sClient, + listPageSize: listPageSize, + logger: logger, } } var _ EndpointsResolver = (*defaultEndpointsResolver)(nil) type defaultEndpointsResolver struct { - k8sClient client.Client - logger logr.Logger + k8sClient client.Client + listPageSize int + logger logr.Logger } func (r *defaultEndpointsResolver) Resolve(ctx context.Context, policy *networking.NetworkPolicy) ([]policyinfo.EndpointInfo, @@ -101,21 +103,47 @@ func (r *defaultEndpointsResolver) computeEgressEndpoints(ctx context.Context, p return egressEndpoints, nil } +// listPodsWithPagination lists pods with pagination to avoid large memory usage and timeout issue from api server side +func (r *defaultEndpointsResolver) listPodsWithPagination(ctx context.Context, selector labels.Selector, namespace string) ([]corev1.Pod, error) { + var allPods []corev1.Pod + continueToken := "" + + for { + podList := &corev1.PodList{} + if err := r.k8sClient.List(ctx, podList, &client.ListOptions{ + LabelSelector: selector, + Namespace: namespace, + Limit: int64(r.listPageSize), + Continue: continueToken, + }); err != nil { + r.logger.Info("Unable to List Pods", "err", err) + return nil, err + } + + allPods = append(allPods, podList.Items...) 
+ continueToken = podList.Continue + + if continueToken == "" { + break + } + } + + return allPods, nil +} + func (r *defaultEndpointsResolver) computePodSelectorEndpoints(ctx context.Context, policy *networking.NetworkPolicy) ([]policyinfo.PodEndpoint, error) { var podEndpoints []policyinfo.PodEndpoint podSelector, err := metav1.LabelSelectorAsSelector(&policy.Spec.PodSelector) if err != nil { return nil, errors.Wrap(err, "unable to get pod selector") } - podList := &corev1.PodList{} - if err := r.k8sClient.List(ctx, podList, &client.ListOptions{ - LabelSelector: podSelector, - Namespace: policy.Namespace, - }); err != nil { - r.logger.Info("Unable to List Pods", "err", err) + + pods, err := r.listPodsWithPagination(ctx, podSelector, policy.Namespace) + if err != nil { return nil, err } - for _, pod := range podList.Items { + + for _, pod := range pods { podIP := k8s.GetPodIP(&pod) if len(podIP) > 0 { podEndpoints = append(podEndpoints, policyinfo.PodEndpoint{ @@ -212,18 +240,14 @@ func (r *defaultEndpointsResolver) resolveNetworkPeers(ctx context.Context, poli } func (r *defaultEndpointsResolver) getIngressRulesPorts(ctx context.Context, policyNamespace string, policyPodSelector *metav1.LabelSelector, ports []networking.NetworkPolicyPort) []policyinfo.Port { - podList := &corev1.PodList{} - if err := r.k8sClient.List(ctx, podList, &client.ListOptions{ - LabelSelector: r.createPodLabelSelector(policyPodSelector), - Namespace: policyNamespace, - }); err != nil { - r.logger.Info("Unable to List Pods", "err", err) + pods, err := r.listPodsWithPagination(ctx, r.createPodLabelSelector(policyPodSelector), policyNamespace) + if err != nil { return nil } - r.logger.V(2).Info("list pods for ingress", "podList", *podList, "namespace", policyNamespace, "selector", *policyPodSelector) + r.logger.V(2).Info("list pods for ingress", "podsCount", len(pods), "namespace", policyNamespace, "selector", *policyPodSelector) var portList []policyinfo.Port - for _, pod := range 
podList.Items { + for _, pod := range pods { portList = append(portList, r.getPortList(pod, ports)...) r.logger.Info("Got ingress port from pod", "pod", types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}.String()) } @@ -338,17 +362,13 @@ func (r *defaultEndpointsResolver) getMatchingPodAddresses(ctx context.Context, } // populate src pods for ingress and dst pods for egress - podList := &corev1.PodList{} - if err := r.k8sClient.List(ctx, podList, &client.ListOptions{ - LabelSelector: r.createPodLabelSelector(ls), - Namespace: namespace, - }); err != nil { - r.logger.Info("Unable to List Pods", "err", err) + pods, err := r.listPodsWithPagination(ctx, r.createPodLabelSelector(ls), namespace) + if err != nil { return nil } - r.logger.V(1).Info("Got pods for label selector", "count", len(podList.Items), "selector", ls.String()) + r.logger.V(1).Info("Got pods for label selector", "count", len(pods), "selector", ls.String()) - for _, pod := range podList.Items { + for _, pod := range pods { podIP := k8s.GetPodIP(&pod) if len(podIP) == 0 { continue @@ -463,19 +483,17 @@ func (r *defaultEndpointsResolver) getMatchingServicePort(ctx context.Context, s if err != nil { return 0, err } - podList := &corev1.PodList{} - if err := r.k8sClient.List(ctx, podList, &client.ListOptions{ - LabelSelector: podSelector, - Namespace: svc.Namespace, - }); err != nil { - r.logger.Info("Unable to List Pods", "err", err) + + pods, err := r.listPodsWithPagination(ctx, podSelector, svc.Namespace) + if err != nil { return 0, err } - for i := range podList.Items { - if portVal, err := k8s.LookupListenPortFromPodSpec(svc, &podList.Items[i], *port, protocol); err == nil { + + for i := range pods { + if portVal, err := k8s.LookupListenPortFromPodSpec(svc, &pods[i], *port, protocol); err == nil { return portVal, nil } else { - r.logger.V(1).Info("The pod doesn't have port matched", "err", err, "pod", podList.Items[i]) + r.logger.V(1).Info("The pod doesn't have port matched", "err", err, 
"pod", pods[i]) } } return 0, errors.Errorf("unable to find matching service listen port %s for service %s", port.String(), k8s.NamespacedName(svc)) diff --git a/pkg/resolvers/endpoints_test.go b/pkg/resolvers/endpoints_test.go index ccf99f2..8a56491 100644 --- a/pkg/resolvers/endpoints_test.go +++ b/pkg/resolvers/endpoints_test.go @@ -587,7 +587,7 @@ func TestEndpointsResolver_Resolve(t *testing.T) { defer ctrl.Finish() mockClient := mock_client.NewMockClient(ctrl) - resolver := NewEndpointsResolver(mockClient, logr.New(&log.NullLogSink{})) + resolver := NewEndpointsResolver(mockClient, 1000, logr.New(&log.NullLogSink{})) for _, item := range tt.args.podListCalls { call := item @@ -776,7 +776,7 @@ func TestEndpointsResolver_ResolveNetworkPeers(t *testing.T) { defer ctrl.Finish() mockClient := mock_client.NewMockClient(ctrl) - resolver := NewEndpointsResolver(mockClient, logr.New(&log.NullLogSink{})) + resolver := NewEndpointsResolver(mockClient, 1000, logr.New(&log.NullLogSink{})) var ingressEndpoints []policyinfo.EndpointInfo var egressEndpoints []policyinfo.EndpointInfo @@ -972,7 +972,7 @@ func TestEndpointsResolver_ResolveNetworkPeers_NamedIngressPortsIPBlocks(t *test defer ctrl.Finish() mockClient := mock_client.NewMockClient(ctrl) - resolver := NewEndpointsResolver(mockClient, logr.New(&log.NullLogSink{})) + resolver := NewEndpointsResolver(mockClient, 1000, logr.New(&log.NullLogSink{})) var ingressEndpoints []policyinfo.EndpointInfo ctx := context.TODO()