Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func main() {
setupLog.Info("Checking args for PE chunk size", "PEChunkSize", controllerCFG.EndpointChunkSize)
setupLog.Info("Checking args for policy batch time", "NPBatchTime", controllerCFG.PodUpdateBatchPeriodDuration)
setupLog.Info("Checking args for reconciler count", "ReconcilerCount", controllerCFG.MaxConcurrentReconciles)
setupLog.Info("Checking args for k8s list call page size", "ListPageSize", controllerCFG.ListPageSize)

if controllerCFG.EnableConfigMapCheck {
var cancelFn context.CancelFunc
Expand Down Expand Up @@ -129,7 +130,7 @@ func main() {
}

policyEndpointsManager := policyendpoints.NewPolicyEndpointsManager(mgr.GetClient(),
controllerCFG.EndpointChunkSize, ctrl.Log.WithName("endpoints-manager"))
controllerCFG.EndpointChunkSize, controllerCFG.ListPageSize, ctrl.Log.WithName("endpoints-manager"))
finalizerManager := k8s.NewDefaultFinalizerManager(mgr.GetClient(), ctrl.Log.WithName("finalizer-manager"))
policyController := controllers.NewPolicyReconciler(mgr.GetClient(), policyEndpointsManager,
controllerCFG, finalizerManager, ctrl.Log.WithName("controllers").WithName("policy"))
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/controller_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ const (
flagEnableConfigMapCheck = "enable-configmap-check"
flagEndpointChunkSize = "endpoint-chunk-size"
flagEnableGoProfiling = "enable-goprofiling"
flagListPageSize = "list-page-size"
defaultLogLevel = "info"
defaultMaxConcurrentReconciles = 3
defaultEndpointsChunkSize = 200
defaultEnableConfigMapCheck = true
defaultListPageSize = 1000
flagPodUpdateBatchPeriodDuration = "pod-update-batch-period-duration"
defaultBatchPeriodDuration = 1 * time.Second
defaultEnableGoProfiling = false
Expand All @@ -37,6 +39,8 @@ type ControllerConfig struct {
RuntimeConfig RuntimeConfig
// EnableGoProfiling enables the goprofiling for dev purpose
EnableGoProfiling bool
// ListPageSize specifies the page size for k8s list calls
ListPageSize int
}

func (cfg *ControllerConfig) BindFlags(fs *pflag.FlagSet) {
Expand All @@ -52,5 +56,7 @@ func (cfg *ControllerConfig) BindFlags(fs *pflag.FlagSet) {
"Duration between batch updates of pods")
fs.BoolVar(&cfg.EnableGoProfiling, flagEnableGoProfiling, defaultEnableGoProfiling,
"Enable goprofiling for develop purpose")
fs.IntVar(&cfg.ListPageSize, flagListPageSize, defaultListPageSize,
"Page size for k8s list calls")
cfg.RuntimeConfig.BindFlags(fs)
}
4 changes: 2 additions & 2 deletions pkg/policyendpoints/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type PolicyEndpointsManager interface {
}

// NewPolicyEndpointsManager constructs a new policyEndpointsManager
func NewPolicyEndpointsManager(k8sClient client.Client, endpointChunkSize int, logger logr.Logger) *policyEndpointsManager {
endpointsResolver := resolvers.NewEndpointsResolver(k8sClient, logger.WithName("endpoints-resolver"))
func NewPolicyEndpointsManager(k8sClient client.Client, endpointChunkSize int, listPageSize int, logger logr.Logger) *policyEndpointsManager {
endpointsResolver := resolvers.NewEndpointsResolver(k8sClient, listPageSize, logger.WithName("endpoints-resolver"))
return &policyEndpointsManager{
k8sClient: k8sClient,
endpointsResolver: endpointsResolver,
Expand Down
92 changes: 55 additions & 37 deletions pkg/resolvers/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,20 @@ type EndpointsResolver interface {
}

// NewEndpointsResolver constructs a new defaultEndpointsResolver
func NewEndpointsResolver(k8sClient client.Client, logger logr.Logger) *defaultEndpointsResolver {
func NewEndpointsResolver(k8sClient client.Client, listPageSize int, logger logr.Logger) *defaultEndpointsResolver {
return &defaultEndpointsResolver{
k8sClient: k8sClient,
logger: logger,
k8sClient: k8sClient,
listPageSize: listPageSize,
logger: logger,
}
}

var _ EndpointsResolver = (*defaultEndpointsResolver)(nil)

type defaultEndpointsResolver struct {
k8sClient client.Client
logger logr.Logger
k8sClient client.Client
listPageSize int
logger logr.Logger
}

func (r *defaultEndpointsResolver) Resolve(ctx context.Context, policy *networking.NetworkPolicy) ([]policyinfo.EndpointInfo,
Expand Down Expand Up @@ -101,21 +103,47 @@ func (r *defaultEndpointsResolver) computeEgressEndpoints(ctx context.Context, p
return egressEndpoints, nil
}

// listPodsWithPagination lists pods with pagination to avoid large memory usage and timeout issue from api server side
func (r *defaultEndpointsResolver) listPodsWithPagination(ctx context.Context, selector labels.Selector, namespace string) ([]corev1.Pod, error) {
var allPods []corev1.Pod
continueToken := ""

for {
podList := &corev1.PodList{}
if err := r.k8sClient.List(ctx, podList, &client.ListOptions{
LabelSelector: selector,
Namespace: namespace,
Limit: int64(r.listPageSize),
Continue: continueToken,
}); err != nil {
r.logger.Info("Unable to List Pods", "err", err)
return nil, err
}

allPods = append(allPods, podList.Items...)
continueToken = podList.Continue

if continueToken == "" {
break
}
}

return allPods, nil
}

func (r *defaultEndpointsResolver) computePodSelectorEndpoints(ctx context.Context, policy *networking.NetworkPolicy) ([]policyinfo.PodEndpoint, error) {
var podEndpoints []policyinfo.PodEndpoint
podSelector, err := metav1.LabelSelectorAsSelector(&policy.Spec.PodSelector)
if err != nil {
return nil, errors.Wrap(err, "unable to get pod selector")
}
podList := &corev1.PodList{}
if err := r.k8sClient.List(ctx, podList, &client.ListOptions{
LabelSelector: podSelector,
Namespace: policy.Namespace,
}); err != nil {
r.logger.Info("Unable to List Pods", "err", err)

pods, err := r.listPodsWithPagination(ctx, podSelector, policy.Namespace)
if err != nil {
return nil, err
}
for _, pod := range podList.Items {

for _, pod := range pods {
podIP := k8s.GetPodIP(&pod)
if len(podIP) > 0 {
podEndpoints = append(podEndpoints, policyinfo.PodEndpoint{
Expand Down Expand Up @@ -212,18 +240,14 @@ func (r *defaultEndpointsResolver) resolveNetworkPeers(ctx context.Context, poli
}

func (r *defaultEndpointsResolver) getIngressRulesPorts(ctx context.Context, policyNamespace string, policyPodSelector *metav1.LabelSelector, ports []networking.NetworkPolicyPort) []policyinfo.Port {
podList := &corev1.PodList{}
if err := r.k8sClient.List(ctx, podList, &client.ListOptions{
LabelSelector: r.createPodLabelSelector(policyPodSelector),
Namespace: policyNamespace,
}); err != nil {
r.logger.Info("Unable to List Pods", "err", err)
pods, err := r.listPodsWithPagination(ctx, r.createPodLabelSelector(policyPodSelector), policyNamespace)
if err != nil {
return nil
}

r.logger.V(2).Info("list pods for ingress", "podList", *podList, "namespace", policyNamespace, "selector", *policyPodSelector)
r.logger.V(2).Info("list pods for ingress", "podsCount", len(pods), "namespace", policyNamespace, "selector", *policyPodSelector)
var portList []policyinfo.Port
for _, pod := range podList.Items {
for _, pod := range pods {
portList = append(portList, r.getPortList(pod, ports)...)
r.logger.Info("Got ingress port from pod", "pod", types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}.String())
}
Expand Down Expand Up @@ -338,17 +362,13 @@ func (r *defaultEndpointsResolver) getMatchingPodAddresses(ctx context.Context,
}

// populate src pods for ingress and dst pods for egress
podList := &corev1.PodList{}
if err := r.k8sClient.List(ctx, podList, &client.ListOptions{
LabelSelector: r.createPodLabelSelector(ls),
Namespace: namespace,
}); err != nil {
r.logger.Info("Unable to List Pods", "err", err)
pods, err := r.listPodsWithPagination(ctx, r.createPodLabelSelector(ls), namespace)
if err != nil {
return nil
}
r.logger.V(1).Info("Got pods for label selector", "count", len(podList.Items), "selector", ls.String())
r.logger.V(1).Info("Got pods for label selector", "count", len(pods), "selector", ls.String())

for _, pod := range podList.Items {
for _, pod := range pods {
podIP := k8s.GetPodIP(&pod)
if len(podIP) == 0 {
continue
Expand Down Expand Up @@ -463,19 +483,17 @@ func (r *defaultEndpointsResolver) getMatchingServicePort(ctx context.Context, s
if err != nil {
return 0, err
}
podList := &corev1.PodList{}
if err := r.k8sClient.List(ctx, podList, &client.ListOptions{
LabelSelector: podSelector,
Namespace: svc.Namespace,
}); err != nil {
r.logger.Info("Unable to List Pods", "err", err)

pods, err := r.listPodsWithPagination(ctx, podSelector, svc.Namespace)
if err != nil {
return 0, err
}
for i := range podList.Items {
if portVal, err := k8s.LookupListenPortFromPodSpec(svc, &podList.Items[i], *port, protocol); err == nil {

for i := range pods {
if portVal, err := k8s.LookupListenPortFromPodSpec(svc, &pods[i], *port, protocol); err == nil {
return portVal, nil
} else {
r.logger.V(1).Info("The pod doesn't have port matched", "err", err, "pod", podList.Items[i])
r.logger.V(1).Info("The pod doesn't have port matched", "err", err, "pod", pods[i])
}
}
return 0, errors.Errorf("unable to find matching service listen port %s for service %s", port.String(), k8s.NamespacedName(svc))
Expand Down
6 changes: 3 additions & 3 deletions pkg/resolvers/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ func TestEndpointsResolver_Resolve(t *testing.T) {
defer ctrl.Finish()

mockClient := mock_client.NewMockClient(ctrl)
resolver := NewEndpointsResolver(mockClient, logr.New(&log.NullLogSink{}))
resolver := NewEndpointsResolver(mockClient, 1000, logr.New(&log.NullLogSink{}))

for _, item := range tt.args.podListCalls {
call := item
Expand Down Expand Up @@ -776,7 +776,7 @@ func TestEndpointsResolver_ResolveNetworkPeers(t *testing.T) {
defer ctrl.Finish()

mockClient := mock_client.NewMockClient(ctrl)
resolver := NewEndpointsResolver(mockClient, logr.New(&log.NullLogSink{}))
resolver := NewEndpointsResolver(mockClient, 1000, logr.New(&log.NullLogSink{}))

var ingressEndpoints []policyinfo.EndpointInfo
var egressEndpoints []policyinfo.EndpointInfo
Expand Down Expand Up @@ -972,7 +972,7 @@ func TestEndpointsResolver_ResolveNetworkPeers_NamedIngressPortsIPBlocks(t *test
defer ctrl.Finish()

mockClient := mock_client.NewMockClient(ctrl)
resolver := NewEndpointsResolver(mockClient, logr.New(&log.NullLogSink{}))
resolver := NewEndpointsResolver(mockClient, 1000, logr.New(&log.NullLogSink{}))

var ingressEndpoints []policyinfo.EndpointInfo
ctx := context.TODO()
Expand Down
Loading