Skip to content

Commit 64a3174

Browse files
committed
Switch node list to watch
Signed-off-by: Markus Blaschke <[email protected]>
1 parent 34b3e7f commit 64a3174

File tree

4 files changed

+66
-38
lines changed

4 files changed

+66
-38
lines changed

autopilot/lib.k8s.go

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,66 @@
11
package autopilot
22

33
import (
4-
"context"
54
"encoding/json"
65
"fmt"
76
log "github.com/sirupsen/logrus"
87
"github.com/webdevopos/azure-k8s-autopilot/k8s"
8+
corev1 "k8s.io/api/core/v1"
99
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1010
"k8s.io/apimachinery/pkg/types"
11+
"k8s.io/apimachinery/pkg/watch"
1112
"time"
1213
)
1314

14-
func (r *AzureK8sAutopilot) getK8sNodeList() (nodeList *k8s.NodeList, err error) {
15-
ctx := context.Background()
15+
func (r *AzureK8sAutopilot) startNodeWatch() error {
16+
// init list
17+
r.nodeList.lock.Lock()
18+
r.nodeList.list = map[string]k8s.Node{}
19+
r.nodeList.lock.Unlock()
1620

17-
opts := metav1.ListOptions{}
18-
opts.LabelSelector = r.Config.K8S.NodeLabelSelector
19-
list, k8sError := r.k8sClient.CoreV1().Nodes().List(ctx, opts)
20-
if k8sError != nil {
21-
err = k8sError
22-
return
21+
timeout := int64(60 * 60 * 1)
22+
nodeWatcher, err := r.k8sClient.CoreV1().Nodes().Watch(r.ctx, metav1.ListOptions{TimeoutSeconds: &timeout, Watch: true})
23+
if err != nil {
24+
log.Panic(err)
2325
}
24-
25-
nodeList = &k8s.NodeList{NodeList: list}
26-
27-
// fetch all nodes
28-
for {
29-
if list.RemainingItemCount == nil || *list.RemainingItemCount == 0 {
30-
break
26+
defer nodeWatcher.Stop()
27+
28+
for res := range nodeWatcher.ResultChan() {
29+
switch res.Type {
30+
case watch.Added:
31+
r.nodeList.lock.Lock()
32+
if node, ok := res.Object.(*corev1.Node); ok {
33+
r.nodeList.list[node.Name] = k8s.Node{Node: node}
34+
}
35+
r.nodeList.lock.Unlock()
36+
case watch.Deleted:
37+
r.nodeList.lock.Lock()
38+
if node, ok := res.Object.(*corev1.Node); ok {
39+
delete(r.nodeList.list, node.Name)
40+
}
41+
r.nodeList.lock.Unlock()
42+
case watch.Modified:
43+
r.nodeList.lock.Lock()
44+
if node, ok := res.Object.(*corev1.Node); ok {
45+
r.nodeList.list[node.Name] = k8s.Node{Node: node}
46+
}
47+
r.nodeList.lock.Unlock()
48+
case watch.Error:
49+
return fmt.Errorf("unable to understand watch event %v", res.Type)
3150
}
51+
}
3252

33-
opts.Continue = list.Continue
53+
return fmt.Errorf("terminated")
54+
}
3455

35-
remainList, k8sError := r.k8sClient.CoreV1().Nodes().List(ctx, opts)
36-
if k8sError != nil {
37-
err = k8sError
38-
return
39-
}
56+
func (r *AzureK8sAutopilot) getK8sNodeList() (nodeList *k8s.NodeList, err error) {
57+
nodeList = &k8s.NodeList{}
4058

41-
list.Continue = remainList.Continue
42-
list.RemainingItemCount = remainList.RemainingItemCount
43-
nodeList.Items = append(nodeList.Items, remainList.Items...)
59+
r.nodeList.lock.Lock()
60+
for _, node := range r.nodeList.list {
61+
nodeList.List = append(nodeList.List, &node)
4462
}
45-
63+
r.nodeList.lock.Unlock()
4664
return
4765
}
4866

autopilot/main.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,14 @@ import (
1414
"github.com/webdevopos/azure-k8s-autopilot/k8s"
1515
"golang.org/x/net/context"
1616
"k8s.io/api/policy/v1beta1"
17-
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1818
"k8s.io/client-go/kubernetes"
1919
_ "k8s.io/client-go/plugin/pkg/client/auth"
2020
"k8s.io/client-go/rest"
2121
"k8s.io/client-go/tools/clientcmd"
2222
"os"
2323
"strings"
24+
"sync"
2425
"time"
2526
)
2627

@@ -59,6 +60,11 @@ type (
5960
cache *cache.Cache
6061
nodeRepairLock *cache.Cache
6162
nodeUpdateLock *cache.Cache
63+
64+
nodeList struct {
65+
list map[string]k8s.Node
66+
lock sync.Mutex
67+
}
6268
}
6369
)
6470

@@ -183,6 +189,15 @@ func (r *AzureK8sAutopilot) Start() {
183189
r.leaderElect()
184190
log.Infof("starting autopilot")
185191

192+
go func() {
193+
for {
194+
log.Info("(re)starting node watch")
195+
if err := r.startNodeWatch(); err != nil {
196+
log.Errorf("node watcher stopped: %v", err)
197+
}
198+
}
199+
}()
200+
186201
if r.Config.Repair.Crontab != "" {
187202
r.startAutopilotRepair()
188203
}
@@ -294,7 +309,7 @@ func (r *AzureK8sAutopilot) checkSelfEviction(node *k8s.Node) bool {
294309
}
295310

296311
eviction := v1beta1.Eviction{
297-
ObjectMeta: v1.ObjectMeta{
312+
ObjectMeta: metav1.ObjectMeta{
298313
Name: *r.Config.Instance.Pod,
299314
Namespace: *r.Config.Instance.Namespace,
300315
},

autopilot/task.repair.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func (r *AzureK8sAutopilot) repairRun(contextLogger *log.Entry) {
1919

2020
r.nodeRepairLock.DeleteExpired()
2121

22-
contextLogger.Debugf("found %v nodes in cluster (%v in locked state)", len(nodeList.Items), r.nodeRepairLock.ItemCount())
22+
contextLogger.Debugf("found %v nodes in cluster (%v in locked state)", len(nodeList.GetNodes()), r.nodeRepairLock.ItemCount())
2323

2424
nodeLoop:
2525
for _, node := range nodeList.GetNodes() {

k8s/nodelist.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,16 @@ package k8s
22

33
import (
44
"fmt"
5-
v1 "k8s.io/api/core/v1"
65
)
76

87
type (
98
NodeList struct {
10-
*v1.NodeList
9+
List []*Node
1110
}
1211
)
1312

1413
func (n *NodeList) GetNodes() (list []*Node) {
15-
for _, value := range n.Items {
16-
node := value
17-
list = append(list, &Node{Node: &node})
18-
}
19-
return
14+
return n.List
2015
}
2116

2217
func (n *NodeList) GetAzureVmssList() (vmssList map[string]*NodeInfo, err error) {
@@ -47,9 +42,9 @@ func (n *NodeList) GetAzureVmssList() (vmssList map[string]*NodeInfo, err error)
4742
}
4843

4944
func (n *NodeList) FindNodeByProviderId(providerId string) (ret *Node) {
50-
for _, node := range n.Items {
45+
for _, node := range n.List {
5146
if node.Spec.ProviderID == providerId {
52-
ret = &Node{&node}
47+
ret = node
5348
break
5449
}
5550
}

0 commit comments

Comments
 (0)