From 814bbe9810d1193317b08022cfd8bb66f75ae0d1 Mon Sep 17 00:00:00 2001 From: y00841556 Date: Fri, 3 Nov 2023 18:25:00 +0800 Subject: [PATCH 1/3] =?UTF-8?q?k8s-mpam-controller=E6=8F=92=E4=BB=B6?= =?UTF-8?q?=E6=BA=90=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- k8s-mpam-controller/cmd/agent/main.go | 9 + k8s-mpam-controller/pkg/agent/label.go | 43 ++++ k8s-mpam-controller/pkg/agent/main.go | 99 +++++++++ k8s-mpam-controller/pkg/agent/watcher.go | 256 +++++++++++++++++++++++ 4 files changed, 407 insertions(+) create mode 100644 k8s-mpam-controller/cmd/agent/main.go create mode 100644 k8s-mpam-controller/pkg/agent/label.go create mode 100644 k8s-mpam-controller/pkg/agent/main.go create mode 100644 k8s-mpam-controller/pkg/agent/watcher.go diff --git a/k8s-mpam-controller/cmd/agent/main.go b/k8s-mpam-controller/cmd/agent/main.go new file mode 100644 index 0000000..ddce56d --- /dev/null +++ b/k8s-mpam-controller/cmd/agent/main.go @@ -0,0 +1,9 @@ +package main + +import ( + "k8s-mpam-controller/pkg/agent" +) + +func main() { + agent.Main() +} diff --git a/k8s-mpam-controller/pkg/agent/label.go b/k8s-mpam-controller/pkg/agent/label.go new file mode 100644 index 0000000..6dbb57f --- /dev/null +++ b/k8s-mpam-controller/pkg/agent/label.go @@ -0,0 +1,43 @@ +package agent + +import ( + "context" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sclient "k8s.io/client-go/kubernetes" + "os" + + "k8s.io/klog" +) + +const mpamLabel = "MPAM" + +// labelNodeMPAM label a node to indicate if it supports MPAM +func labelNodeMPAM(k8sCli *k8sclient.Clientset) bool { + support := true + + // label the node + node, err := k8sCli.CoreV1().Nodes().Get(context.TODO(), nodeName, meta_v1.GetOptions{}) + if err != nil || node == nil { + klog.Errorf("Failed to get node: %v", err) + klog.Warning("please ensure environment variable NODE_NAME has been set!") + return false + } + + // check if resctrl is supported + if _, err := os.Stat("/sys/fs/resctrl"); err != nil { + node.Labels[mpamLabel] = "no" + support = false + } else { + // check if resctrl fs has been mounted + if _, err := os.Stat("/sys/fs/resctrl/schemata"); err != nil { + node.Labels[mpamLabel] = "disabled" + support = false + } else { + node.Labels[mpamLabel] = "enabled" + } + } + + k8sCli.CoreV1().Nodes().Update(context.TODO(), node, meta_v1.UpdateOptions{}) + + return support +} diff --git a/k8s-mpam-controller/pkg/agent/main.go b/k8s-mpam-controller/pkg/agent/main.go new file mode 100644 index 0000000..8b91315 --- /dev/null +++ b/k8s-mpam-controller/pkg/agent/main.go @@ -0,0 +1,99 @@ +package agent + +import ( + "context" + "flag" + "io/ioutil" + "math" + "os" + "strings" + "time" + + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog" +) + +var direct bool +var clientset *kubernetes.Clientset + +func Main() { + var config *rest.Config + var err error + + kubeconfig := flag.String("kubeconfig", "", "absolute path to the kubeconfig file (optional, if run in cluster)") + server := flag.String("server", "", "the relay server address") + caFile := flag.String("ca-file", "", "absolute path to the root certificate file") + certFile := flag.String("cert-file", "", "absolute path to the certificate file") + keyFile := flag.String("key-file", "", "absolute path to the private key file") + serverName := flag.String("cn", "", "the common name (CN) of the certificate") + flag.BoolVar(&direct, "direct", false, "direct mode, default false. if true, it doesn't depends on a server to relay pod information") + + flag.Parse() + + if !direct { + if *server == "" || *caFile == "" || *certFile == "" || *keyFile == "" || *serverName == "" { + klog.Error("Arguments: server / ca-file / cert-file / key-file and cn should be set") + return + } + } + + if *kubeconfig == "" { + klog.Info("using in-cluster config") + config, err = rest.InClusterConfig() + } else { + config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig) + } + + if err != nil { + klog.Errorf("Failed to build config: %v", err) + return + } + + // creates the clientset + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + klog.Errorf("Failed creates clientset: %v", err) + return + } + + klog.Info("Node name: " + nodeName) + node, err := clientset.CoreV1().Nodes().Get(context.TODO(), nodeName, meta_v1.GetOptions{}) + if err != nil || node == nil { + klog.Errorf("Failed to get node: %v", err) + klog.Warning("please ensure environment variable NODE_NAME has been set!") + return + } + + if !labelNodeMPAM(clientset) { + klog.Info("Seems this node doesn't support MPAM. Please ensure resctrl fs is mounted") + time.Sleep(math.MaxInt64) + } + + getWatcher(clientset).start() + + if direct { + for { + time.Sleep(10 * time.Second) + } + } else { + for { + if err := startClient(*server, *caFile, *certFile, *keyFile, *serverName); err != nil { + klog.Errorf("Client error: %v", err) + } + } + } +} + +func init() { + // Node name is expected to be set in environment variable "NODE_NAME" + nodeName = os.Getenv("NODE_NAME") + + if nodeName == "" { + if hostname, err := ioutil.ReadFile("/etc/hostname"); err == nil { + nodeName = strings.ToLower(strings.TrimSpace(string(hostname))) + } + } +} diff --git a/k8s-mpam-controller/pkg/agent/watcher.go b/k8s-mpam-controller/pkg/agent/watcher.go new file mode 100644 index 0000000..939735f --- /dev/null +++ b/k8s-mpam-controller/pkg/agent/watcher.go @@ -0,0 +1,256 @@ +package agent + +import ( + "context" + "sync" + + core_v1 "k8s.io/api/core/v1" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8swatch "k8s.io/apimachinery/pkg/watch" + k8sclient "k8s.io/client-go/kubernetes" + + "k8s.io/klog" +) + +// the namespace of the ConfigMaps +const configMapNamespace = "rc-config" + +// resource control configuration +type configData map[string]string + +// nodeName contains the name of the k8s node we're running on +var nodeName string + +type watcher struct { + k8sCli *k8sclient.Clientset // k8s client interface + + groupConfigMapWatch k8swatch.Interface + + sync.RWMutex + + // node-specific configuration. it's 'Data' field value of the ConfigMap + // named rc-config.node.{NODE_NAME} + nodeCfg *configData + + // group-specific configuration. If a node belong to a node group, it is + // the 'Data' field value of the ConfigMap named rc-config.group.{GROUP_NAME}, + // otherwise it's the 'Data' field value of the ConfigMap named rc-config.default + groupCfg *configData +} + +var singleton_watcher *watcher + +// getWatcher returns singleton k8s watcher instance. +func getWatcher(k8sCli *k8sclient.Clientset) *watcher { + if singleton_watcher == nil { + singleton_watcher = &watcher{ + k8sCli: k8sCli, + } + } + + return singleton_watcher +} + +func (w *watcher) watchNode() { + // watch this Node + selector := meta_v1.ListOptions{FieldSelector: "metadata.name=" + nodeName} + k8w, err := w.k8sCli.CoreV1().Nodes().Watch(context.TODO(), selector) + if err != nil { + klog.Errorf("Failed to watch node (%q): %v", nodeName, err) + return + } + + go func(ev <-chan k8swatch.Event, group string) { + for e := range ev { + switch e.Type { + case k8swatch.Added, k8swatch.Modified: + klog.Infof("node (%s) is updated", nodeName) + label, _ := e.Object.(*core_v1.Node).Labels["ngroup"] + + // if the node group is changed, we start to watch the config of the new node group + if group != label { + group = label + klog.Infof("node group is set to %s", group) + w.watchGroupConfigMap(group) + } + case k8swatch.Deleted: + klog.Warning("our node is removed...") + default: + klog.Info("other event type") + } + } + + klog.Warning("seems node watcher is closed, going to restart ...") + w.watchNode() + klog.Warning("node configMap watcher restarted") + }(k8w.ResultChan(), "") +} + +func (w *watcher) watchNodeConfigMap() { + // watch "rc-config.node.{NODE_NAME}" ConfigMap + selector := meta_v1.ListOptions{FieldSelector: "metadata.name=" + "rc-config.node." + nodeName} + k8w, err := w.k8sCli.CoreV1().ConfigMaps(configMapNamespace).Watch(context.TODO(), selector) + if err != nil { + klog.Errorf("Failed to watch ConfigMap rc-config.node.%q: %v", nodeName, err) + return + } + + go func(ev <-chan k8swatch.Event) { + for e := range ev { + switch e.Type { + case k8swatch.Added, k8swatch.Modified: + klog.Info("ConfigMap rc-config.node." + nodeName + " is updated") + cm := e.Object.(*core_v1.ConfigMap) + w.setNodeConfig(&cm.Data) + case k8swatch.Deleted: + klog.Info("ConfigMap rc-config.node." + nodeName + " is deleted") + w.setNodeConfig(nil) + default: + klog.Info("other event type") + } + } + + klog.Warning("seems node configMap watcher is closed, going to restart ...") + w.watchNodeConfigMap() + klog.Warning("node configMap watcher restarted") + }(k8w.ResultChan()) +} + +func (w *watcher) watchGroupConfigMap(group string) { + if w.groupConfigMapWatch != nil { + w.groupConfigMapWatch.Stop() + } + + // watch group ConfigMap + cmName := "rc-config.default" + if group != "" { + cmName = "rc-config.group." + group + } + selector := meta_v1.ListOptions{FieldSelector: "metadata.name=" + cmName} + k8w, err := w.k8sCli.CoreV1().ConfigMaps(configMapNamespace).Watch(context.TODO(), selector) + if err != nil { + klog.Errorf("Failed to watch group ConfigMap (%q): %v", cmName, err) + return + } + + w.groupConfigMapWatch = k8w + klog.Info("start watching ConfigMap " + cmName) + + go func(ev <-chan k8swatch.Event, group string) { + for e := range ev { + switch e.Type { + case k8swatch.Added, k8swatch.Modified: + cm := e.Object.(*core_v1.ConfigMap) + klog.Infof("group ConfigMap (%s) is updated", cm.Name) + w.setGroupConfig(&cm.Data) + case k8swatch.Deleted: + cm := e.Object.(*core_v1.ConfigMap) + klog.Infof("group ConfigMap (%s) is deleted", cm.Name) + w.setGroupConfig(nil) + default: + klog.Info("other event type") + } + } + + klog.Warning("seems group configMap watcher is closed, going to restart ...") + w.watchGroupConfigMap(group) + klog.Warning("group configMap watcher is restarted") + + }(k8w.ResultChan(), group) +} + +func (w *watcher) watchPods() { + // watch Pods in all namespace + k8w, err := w.k8sCli.CoreV1().Pods(meta_v1.NamespaceAll).Watch(context.TODO(), meta_v1.ListOptions{}) + if err != nil { + klog.Errorf("watch Pods in all namespace failed") + return + } + + go func(ev <-chan k8swatch.Event) { + for e := range ev { + switch e.Type { + case k8swatch.Added, k8swatch.Modified: + klog.Infof("pod (%s) is updated", e.Object.(*core_v1.Pod).Name) + if rcgroup, ok := e.Object.(*core_v1.Pod).Labels["rcgroup"]; ok { + klog.Infof("Pod: %s; rcgroup: %s", string(e.Object.(*core_v1.Pod).UID), rcgroup) + assignControlGroup(string(e.Object.(*core_v1.Pod).UID), rcgroup) + } + + case k8swatch.Deleted: + klog.Info("a pod is deleted " + e.Object.(*core_v1.Pod).UID) + default: + klog.Info("other event type") + } + } + + klog.Warning("seems pod watcher is closed, going to restart ...") + w.watchPods() + klog.Warning("pod watcher is restarted") + + }(k8w.ResultChan()) +} + +func (w *watcher) start() { + klog.Info("starting agent watcher ...") + if nodeName == "" { + klog.Warning("node name not set, NODE_NAME env variable should be set to match the name of this k8s Node") + return + } + + if !getNumclosids() { + klog.Errorf("get num_closids failed, please ensure resctrl fs is mounted") + return + } + + if !getCbmmask() { + klog.Errorf("get cbm_mask failed, please ensure resctrl fs is mounted") + return + } + + w.watchNodeConfigMap() + w.watchGroupConfigMap("") + w.watchNode() + + if direct { + w.watchPods() + } +} + +// applyConfig applies the current configuration. +func (w *watcher) applyConfig() { + klog.Info("apply configuration") + + config := w.groupCfg + + if w.nodeCfg != nil { + config = w.nodeCfg + } + + if config == nil { + klog.Warning("There is no configuration") + } + + applyConfig(config) +} + +// set node-specific configuration +func (w *watcher) setNodeConfig(data *map[string]string) { + w.Lock() + defer w.Unlock() + + w.nodeCfg = (*configData)(data) + w.applyConfig() +} + +// set group-specific or default configuration +func (w *watcher) setGroupConfig(data *map[string]string) { + w.Lock() + defer w.Unlock() + + w.groupCfg = (*configData)(data) + + if w.nodeCfg == nil { + w.applyConfig() + } +} -- Gitee From 106343a3c406e0c50555637809a1b3a7cf33fe51 Mon Sep 17 00:00:00 2001 From: y00841556 Date: Mon, 6 Nov 2023 14:45:49 +0800 Subject: [PATCH 2/3] =?UTF-8?q?k8s-mpam-controller=E6=8F=92=E4=BB=B6?= =?UTF-8?q?=E6=BA=90=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- k8s-mpam-controller/cmd/agent/main.go | 16 +++++ k8s-mpam-controller/pkg/agent/label.go | 29 ++++++-- k8s-mpam-controller/pkg/agent/main.go | 77 +++++++++++++------- k8s-mpam-controller/pkg/agent/watcher.go | 92 +++++++++++++++--------- 4 files changed, 151 insertions(+), 63 deletions(-) diff --git a/k8s-mpam-controller/cmd/agent/main.go b/k8s-mpam-controller/cmd/agent/main.go index ddce56d..7dd07b8 100644 --- a/k8s-mpam-controller/cmd/agent/main.go +++ b/k8s-mpam-controller/cmd/agent/main.go @@ -1,3 +1,19 @@ +/* +Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package main import ( diff --git a/k8s-mpam-controller/pkg/agent/label.go b/k8s-mpam-controller/pkg/agent/label.go index 6dbb57f..7620824 100644 --- a/k8s-mpam-controller/pkg/agent/label.go +++ b/k8s-mpam-controller/pkg/agent/label.go @@ -1,22 +1,39 @@ +/* +Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package agent import ( "context" - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - k8sclient "k8s.io/client-go/kubernetes" "os" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/klog" ) const mpamLabel = "MPAM" // labelNodeMPAM label a node to indicate if it supports MPAM -func labelNodeMPAM(k8sCli *k8sclient.Clientset) bool { - support := true +func labelNodeMPAM(k8sCli *kubernetes.Clientset) bool { + support = true // label the node - node, err := k8sCli.CoreV1().Nodes().Get(context.TODO(), nodeName, meta_v1.GetOptions{}) + node, err := k8sCli.CoreV1().Nodes().Get(context.TODO(), nodeName, meta.GetOptions{}) if err != nil || node == nil { klog.Errorf("Failed to get node: %v", err) klog.Warning("please ensure environment variable NODE_NAME has been set!") @@ -37,7 +54,7 @@ func labelNodeMPAM(k8sCli *k8sclient.Clientset) bool { } } - k8sCli.CoreV1().Nodes().Update(context.TODO(), node, meta_v1.UpdateOptions{}) + k8sCli.CoreV1().Nodes().Update(context.TODO(), node, meta.UpdateOptions{}) return support } diff --git a/k8s-mpam-controller/pkg/agent/main.go b/k8s-mpam-controller/pkg/agent/main.go index 8b91315..179933e 100644 --- a/k8s-mpam-controller/pkg/agent/main.go +++ b/k8s-mpam-controller/pkg/agent/main.go @@ -1,28 +1,44 @@ +/* +Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package agent import ( "context" "flag" "io/ioutil" - "math" "os" "strings" "time" - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog" ) +const sleepTime = 10 + var direct bool var clientset *kubernetes.Clientset +var support bool func Main() { - var config *rest.Config - var err error - kubeconfig := flag.String("kubeconfig", "", "absolute path to the kubeconfig file (optional, if run in cluster)") server := flag.String("server", "", "the relay server address") caFile := flag.String("ca-file", "", "absolute path to the root certificate file") @@ -40,46 +56,38 @@ func Main() { } } - if *kubeconfig == "" { - klog.Info("using in-cluster config") - config, err = rest.InClusterConfig() - } else { - config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig) - } - - if err != nil { - klog.Errorf("Failed to build config: %v", err) - return - } - // creates the clientset - clientset, err := kubernetes.NewForConfig(config) + clientset, err := createClientSet(kubeconfig) if err != nil { klog.Errorf("Failed creates clientset: %v", err) return } + getNodeName() klog.Info("Node name: " + nodeName) - node, err := clientset.CoreV1().Nodes().Get(context.TODO(), nodeName, meta_v1.GetOptions{}) + node, err := clientset.CoreV1().Nodes().Get(context.TODO(), nodeName, meta.GetOptions{}) if err != nil || node == nil { klog.Errorf("Failed to get node: %v", err) klog.Warning("please ensure environment variable NODE_NAME has been set!") return } - if !labelNodeMPAM(clientset) { + for !labelNodeMPAM(clientset) { klog.Info("Seems this node doesn't support MPAM. Please ensure resctrl fs is mounted") - time.Sleep(math.MaxInt64) + time.Sleep(sleepTime * time.Second) + if support { + break + } } getWatcher(clientset).start() if direct { - for { - time.Sleep(10 * time.Second) + for support { + time.Sleep(sleepTime * time.Second) } } else { - for { + for support { if err := startClient(*server, *caFile, *certFile, *keyFile, *serverName); err != nil { klog.Errorf("Client error: %v", err) } @@ -87,7 +95,26 @@ func Main() { } } -func init() { +func createClientSet(kubeconfig *string) (*kubernetes.Clientset, error) { + var config *rest.Config + var err error + + if *kubeconfig == "" { + klog.Info("using in-cluster config") + config, err = rest.InClusterConfig() + } else { + config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig) + } + + if err != nil { + klog.Errorf("Failed to build config: %v", err) + return nil, err + } + + return kubernetes.NewForConfig(config) +} + +func getNodeName() { // Node name is expected to be set in environment variable "NODE_NAME" nodeName = os.Getenv("NODE_NAME") diff --git a/k8s-mpam-controller/pkg/agent/watcher.go b/k8s-mpam-controller/pkg/agent/watcher.go index 939735f..b7bf23d 100644 --- a/k8s-mpam-controller/pkg/agent/watcher.go +++ b/k8s-mpam-controller/pkg/agent/watcher.go @@ -1,13 +1,29 @@ +/* +Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package agent import ( "context" "sync" - core_v1 "k8s.io/api/core/v1" - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - k8swatch "k8s.io/apimachinery/pkg/watch" - k8sclient "k8s.io/client-go/kubernetes" + core "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" "k8s.io/klog" ) @@ -22,9 +38,9 @@ type configData map[string]string var nodeName string type watcher struct { - k8sCli *k8sclient.Clientset // k8s client interface + k8sCli *kubernetes.Clientset // k8s client interface - groupConfigMapWatch k8swatch.Interface + groupConfigMapWatch watch.Interface sync.RWMutex @@ -41,7 +57,7 @@ type watcher struct { var singleton_watcher *watcher // getWatcher returns singleton k8s watcher instance. -func getWatcher(k8sCli *k8sclient.Clientset) *watcher { +func getWatcher(k8sCli *kubernetes.Clientset) *watcher { if singleton_watcher == nil { singleton_watcher = &watcher{ k8sCli: k8sCli, @@ -53,19 +69,19 @@ func getWatcher(k8sCli *k8sclient.Clientset) *watcher { func (w *watcher) watchNode() { // watch this Node - selector := meta_v1.ListOptions{FieldSelector: "metadata.name=" + nodeName} + selector := meta.ListOptions{FieldSelector: "metadata.name=" + nodeName} k8w, err := w.k8sCli.CoreV1().Nodes().Watch(context.TODO(), selector) if err != nil { klog.Errorf("Failed to watch node (%q): %v", nodeName, err) return } - go func(ev <-chan k8swatch.Event, group string) { + go func(ev <-chan watch.Event, group string) { for e := range ev { switch e.Type { - case k8swatch.Added, k8swatch.Modified: + case watch.Added, watch.Modified: klog.Infof("node (%s) is updated", nodeName) - label, _ := e.Object.(*core_v1.Node).Labels["ngroup"] + label, _ := e.Object.(*core.Node).Labels["ngroup"] // if the node group is changed, we start to watch the config of the new node group if group != label { @@ -73,7 +89,7 @@ func (w *watcher) watchNode() { klog.Infof("node group is set to %s", group) w.watchGroupConfigMap(group) } - case k8swatch.Deleted: + case watch.Deleted: klog.Warning("our node is removed...") default: klog.Info("other event type") @@ -88,21 +104,25 @@ func (w *watcher) watchNode() { func (w *watcher) watchNodeConfigMap() { // watch "rc-config.node.{NODE_NAME}" ConfigMap - selector := meta_v1.ListOptions{FieldSelector: "metadata.name=" + "rc-config.node." + nodeName} + selector := meta.ListOptions{FieldSelector: "metadata.name=" + "rc-config.node." + nodeName} k8w, err := w.k8sCli.CoreV1().ConfigMaps(configMapNamespace).Watch(context.TODO(), selector) if err != nil { klog.Errorf("Failed to watch ConfigMap rc-config.node.%q: %v", nodeName, err) return } - go func(ev <-chan k8swatch.Event) { + go func(ev <-chan watch.Event) { for e := range ev { switch e.Type { - case k8swatch.Added, k8swatch.Modified: + case watch.Added, watch.Modified: klog.Info("ConfigMap rc-config.node." + nodeName + " is updated") - cm := e.Object.(*core_v1.ConfigMap) + cm, ok := e.Object.(*core.ConfigMap) + if !ok { + klog.Warning("It's not ok for type *core.ConfigMap") + continue + } w.setNodeConfig(&cm.Data) - case k8swatch.Deleted: + case watch.Deleted: klog.Info("ConfigMap rc-config.node." + nodeName + " is deleted") w.setNodeConfig(nil) default: @@ -126,7 +146,7 @@ func (w *watcher) watchGroupConfigMap(group string) { if group != "" { cmName = "rc-config.group." + group } - selector := meta_v1.ListOptions{FieldSelector: "metadata.name=" + cmName} + selector := meta.ListOptions{FieldSelector: "metadata.name=" + cmName} k8w, err := w.k8sCli.CoreV1().ConfigMaps(configMapNamespace).Watch(context.TODO(), selector) if err != nil { klog.Errorf("Failed to watch group ConfigMap (%q): %v", cmName, err) @@ -136,15 +156,23 @@ func (w *watcher) watchGroupConfigMap(group string) { w.groupConfigMapWatch = k8w klog.Info("start watching ConfigMap " + cmName) - go func(ev <-chan k8swatch.Event, group string) { + go func(ev <-chan watch.Event, group string) { for e := range ev { switch e.Type { - case k8swatch.Added, k8swatch.Modified: - cm := e.Object.(*core_v1.ConfigMap) + case watch.Added, watch.Modified: + cm, ok := e.Object.(*core.ConfigMap) + if !ok { + klog.Warning("It's not ok for type *core.ConfigMap") + continue + } klog.Infof("group ConfigMap (%s) is updated", cm.Name) w.setGroupConfig(&cm.Data) - case k8swatch.Deleted: - cm := e.Object.(*core_v1.ConfigMap) + case watch.Deleted: + cm, ok := e.Object.(*core.ConfigMap) + if !ok { + klog.Warning("It's not ok for type *core.ConfigMap") + continue + } klog.Infof("group ConfigMap (%s) is deleted", cm.Name) w.setGroupConfig(nil) default: @@ -161,24 +189,24 @@ func (w *watcher) watchGroupConfigMap(group string) { func (w *watcher) watchPods() { // watch Pods in all namespace - k8w, err := w.k8sCli.CoreV1().Pods(meta_v1.NamespaceAll).Watch(context.TODO(), meta_v1.ListOptions{}) + k8w, err := w.k8sCli.CoreV1().Pods(meta.NamespaceAll).Watch(context.TODO(), meta.ListOptions{}) if err != nil { klog.Errorf("watch Pods in all namespace failed") return } - go func(ev <-chan k8swatch.Event) { + go func(ev <-chan watch.Event) { for e := range ev { switch e.Type { - case k8swatch.Added, k8swatch.Modified: - klog.Infof("pod (%s) is updated", e.Object.(*core_v1.Pod).Name) - if rcgroup, ok := e.Object.(*core_v1.Pod).Labels["rcgroup"]; ok { - klog.Infof("Pod: %s; rcgroup: %s", string(e.Object.(*core_v1.Pod).UID), rcgroup) - assignControlGroup(string(e.Object.(*core_v1.Pod).UID), rcgroup) + case watch.Added, watch.Modified: + klog.Infof("pod (%s) is updated", e.Object.(*core.Pod).Name) + if rcgroup, ok := e.Object.(*core.Pod).Labels["rcgroup"]; ok { + klog.Infof("Pod: %s; rcgroup: %s", string(e.Object.(*core.Pod).UID), rcgroup) + assignControlGroup(string(e.Object.(*core.Pod).UID), rcgroup) } - case k8swatch.Deleted: - klog.Info("a pod is deleted " + e.Object.(*core_v1.Pod).UID) + case watch.Deleted: + klog.Info("a pod is deleted " + e.Object.(*core.Pod).UID) default: klog.Info("other event type") } -- Gitee From e834b5bd5be04b6ac45eb6c9a6e7712e49af42f7 Mon Sep 17 00:00:00 2001 From: y00841556 Date: Mon, 6 Nov 2023 21:42:31 +0800 Subject: [PATCH 3/3] =?UTF-8?q?k8s-mpam-controller=E6=BA=90=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- k8s-mpam-controller/pkg/agent/client.go | 16 ++++++++++++++++ k8s-mpam-controller/pkg/agent/label.go | 4 ++-- k8s-mpam-controller/pkg/agent/main.go | 23 +++++++++++++++-------- k8s-mpam-controller/pkg/agent/mpam.go | 18 +++++++++++++++++- k8s-mpam-controller/pkg/agent/watcher.go | 6 +++--- 5 files changed, 53 insertions(+), 14 deletions(-) diff --git a/k8s-mpam-controller/pkg/agent/client.go b/k8s-mpam-controller/pkg/agent/client.go index 312a1bd..a5901b6 100644 --- a/k8s-mpam-controller/pkg/agent/client.go +++ b/k8s-mpam-controller/pkg/agent/client.go @@ -1,3 +1,19 @@ +/* +Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package agent import ( diff --git a/k8s-mpam-controller/pkg/agent/label.go b/k8s-mpam-controller/pkg/agent/label.go index 7620824..c83b9dd 100644 --- a/k8s-mpam-controller/pkg/agent/label.go +++ b/k8s-mpam-controller/pkg/agent/label.go @@ -20,10 +20,10 @@ import ( "context" "os" - meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - "k8s.io/klog" + + meta "k8s.io/apimachinery/pkg/apis/meta/v1" ) const mpamLabel = "MPAM" diff --git a/k8s-mpam-controller/pkg/agent/main.go b/k8s-mpam-controller/pkg/agent/main.go index 179933e..61646a1 100644 --- a/k8s-mpam-controller/pkg/agent/main.go +++ b/k8s-mpam-controller/pkg/agent/main.go @@ -24,12 +24,12 @@ import ( "strings" "time" - meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "k8s.io/klog" + + meta "k8s.io/apimachinery/pkg/apis/meta/v1" ) const sleepTime = 10 @@ -39,6 +39,8 @@ var clientset *kubernetes.Clientset var support bool func Main() { + var err error + kubeconfig := flag.String("kubeconfig", "", "absolute path to the kubeconfig file (optional, if run in cluster)") server := flag.String("server", "", "the relay server address") caFile := flag.String("ca-file", "", "absolute path to the root certificate file") @@ -57,7 +59,7 @@ func Main() { } // creates the clientset - clientset, err := createClientSet(kubeconfig) + clientset, err = createClientSet(kubeconfig) if err != nil { klog.Errorf("Failed creates clientset: %v", err) return @@ -82,12 +84,17 @@ func Main() { getWatcher(clientset).start() - if direct { - for support { - time.Sleep(sleepTime * time.Second) + agentRun(server, caFile, certFile, keyFile, serverName) +} + +func agentRun(server, caFile, certFile, keyFile, serverName *string) { + for { + if !support { + break } - } else { - for support { + if direct { + time.Sleep(sleepTime * time.Second) + } else { if err := startClient(*server, *caFile, *certFile, *keyFile, *serverName); err != nil { klog.Errorf("Client error: %v", err) } diff --git a/k8s-mpam-controller/pkg/agent/mpam.go b/k8s-mpam-controller/pkg/agent/mpam.go index c1075e4..0727857 100644 --- a/k8s-mpam-controller/pkg/agent/mpam.go +++ b/k8s-mpam-controller/pkg/agent/mpam.go @@ -1,3 +1,19 @@ +/* +Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package agent import ( @@ -89,7 +105,7 @@ func cleanResctrlGroup(groups []string) { } for _, fi := range fis { if !fi.IsDir() { - klog.Warning("fi ia not a dir") + klog.Warning("fi is not a dir") continue } found := false diff --git a/k8s-mpam-controller/pkg/agent/watcher.go b/k8s-mpam-controller/pkg/agent/watcher.go index b7bf23d..c4ad504 100644 --- a/k8s-mpam-controller/pkg/agent/watcher.go +++ b/k8s-mpam-controller/pkg/agent/watcher.go @@ -20,12 +20,12 @@ import ( "context" "sync" - core "k8s.io/api/core/v1" - meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" - "k8s.io/klog" + + core "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" ) // the namespace of the ConfigMaps -- Gitee