diff --git a/k8s-mpam-controller/cmd/agent/main.go b/k8s-mpam-controller/cmd/agent/main.go new file mode 100644 index 0000000000000000000000000000000000000000..7dd07b8c681f34662fdd8dd720906d4ec1fdb410 --- /dev/null +++ b/k8s-mpam-controller/cmd/agent/main.go @@ -0,0 +1,25 @@ +/* +Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "k8s-mpam-controller/pkg/agent" +) + +func main() { + agent.Main() +} diff --git a/k8s-mpam-controller/pkg/agent/client.go b/k8s-mpam-controller/pkg/agent/client.go index 312a1bdf646f145eeac976ebd5b513bdaf22d2d1..a5901b6ba71838a5db36f6f1a0f3eac39743ce2a 100644 --- a/k8s-mpam-controller/pkg/agent/client.go +++ b/k8s-mpam-controller/pkg/agent/client.go @@ -1,3 +1,19 @@ +/* +Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package agent import ( diff --git a/k8s-mpam-controller/pkg/agent/label.go b/k8s-mpam-controller/pkg/agent/label.go new file mode 100644 index 0000000000000000000000000000000000000000..c83b9dd1595d30aed5af96ec17323d215f334d6b --- /dev/null +++ b/k8s-mpam-controller/pkg/agent/label.go @@ -0,0 +1,60 @@ +/* +Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package agent + +import ( + "context" + "os" + + "k8s.io/client-go/kubernetes" + "k8s.io/klog" + + meta "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const mpamLabel = "MPAM" + +// labelNodeMPAM label a node to indicate if it supports MPAM +func labelNodeMPAM(k8sCli *kubernetes.Clientset) bool { + support = true + + // label the node + node, err := k8sCli.CoreV1().Nodes().Get(context.TODO(), nodeName, meta.GetOptions{}) + if err != nil || node == nil { + klog.Errorf("Failed to get node: %v", err) + klog.Warning("please ensure environment variable NODE_NAME has been set!") + return false + } + + // check if resctrl is supported + if _, err := os.Stat("/sys/fs/resctrl"); err != nil { + node.Labels[mpamLabel] = "no" + support = false + } else { + // check if resctrl fs has been mounted + if _, err := os.Stat("/sys/fs/resctrl/schemata"); err != nil { + node.Labels[mpamLabel] = "disabled" + support = false + } else { + node.Labels[mpamLabel] = "enabled" + } + } + + k8sCli.CoreV1().Nodes().Update(context.TODO(), node, meta.UpdateOptions{}) + + return support +} diff --git a/k8s-mpam-controller/pkg/agent/main.go b/k8s-mpam-controller/pkg/agent/main.go new file mode 100644 index 0000000000000000000000000000000000000000..61646a1ce140e9315691b074db25f00d9e5a78f8 --- /dev/null +++ b/k8s-mpam-controller/pkg/agent/main.go @@ -0,0 +1,133 @@ +/* +Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package agent + +import ( + "context" + "flag" + "io/ioutil" + "os" + "strings" + "time" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog" + + meta "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const sleepTime = 10 + +var direct bool +var clientset *kubernetes.Clientset +var support bool + +func Main() { + var err error + + kubeconfig := flag.String("kubeconfig", "", "absolute path to the kubeconfig file (optional, if run in cluster)") + server := flag.String("server", "", "the relay server address") + caFile := flag.String("ca-file", "", "absolute path to the root certificate file") + certFile := flag.String("cert-file", "", "absolute path to the certificate file") + keyFile := flag.String("key-file", "", "absolute path to the private key file") + serverName := flag.String("cn", "", "the common name (CN) of the certificate") + flag.BoolVar(&direct, "direct", false, "direct mode, default false. if true, it doesn't depends on a server to relay pod information") + + flag.Parse() + + if !direct { + if *server == "" || *caFile == "" || *certFile == "" || *keyFile == "" || *serverName == "" { + klog.Error("Arguments: server / ca-file / cert-file / key-file and cn should be set") + return + } + } + + // creates the clientset + clientset, err = createClientSet(kubeconfig) + if err != nil { + klog.Errorf("Failed creates clientset: %v", err) + return + } + + getNodeName() + klog.Info("Node name: " + nodeName) + node, err := clientset.CoreV1().Nodes().Get(context.TODO(), nodeName, meta.GetOptions{}) + if err != nil || node == nil { + klog.Errorf("Failed to get node: %v", err) + klog.Warning("please ensure environment variable NODE_NAME has been set!") + return + } + + for !labelNodeMPAM(clientset) { + klog.Info("Seems this node doesn't support MPAM. Please ensure resctrl fs is mounted") + time.Sleep(sleepTime * time.Second) + if support { + break + } + } + + getWatcher(clientset).start() + + agentRun(server, caFile, certFile, keyFile, serverName) +} + +func agentRun(server, caFile, certFile, keyFile, serverName *string) { + for { + if !support { + break + } + if direct { + time.Sleep(sleepTime * time.Second) + } else { + if err := startClient(*server, *caFile, *certFile, *keyFile, *serverName); err != nil { + klog.Errorf("Client error: %v", err) + } + } + } +} + +func createClientSet(kubeconfig *string) (*kubernetes.Clientset, error) { + var config *rest.Config + var err error + + if *kubeconfig == "" { + klog.Info("using in-cluster config") + config, err = rest.InClusterConfig() + } else { + config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig) + } + + if err != nil { + klog.Errorf("Failed to build config: %v", err) + return nil, err + } + + return kubernetes.NewForConfig(config) +} + +func getNodeName() { + // Node name is expected to be set in environment variable "NODE_NAME" + nodeName = os.Getenv("NODE_NAME") + + if nodeName == "" { + if hostname, err := ioutil.ReadFile("/etc/hostname"); err == nil { + nodeName = strings.ToLower(strings.TrimSpace(string(hostname))) + } + } +} diff --git a/k8s-mpam-controller/pkg/agent/mpam.go b/k8s-mpam-controller/pkg/agent/mpam.go index c1075e419799117f2e2ce831fc9659afbe7c29a9..07278578d42fdd65dce30d5ee4be359a34cc0a89 100644 --- a/k8s-mpam-controller/pkg/agent/mpam.go +++ b/k8s-mpam-controller/pkg/agent/mpam.go @@ -1,3 +1,19 @@ +/* +Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package agent import ( @@ -89,7 +105,7 @@ func cleanResctrlGroup(groups []string) { } for _, fi := range fis { if !fi.IsDir() { - klog.Warning("fi ia not a dir") + klog.Warning("fi is not a dir") continue } found := false diff --git a/k8s-mpam-controller/pkg/agent/watcher.go b/k8s-mpam-controller/pkg/agent/watcher.go new file mode 100644 index 0000000000000000000000000000000000000000..c4ad5049ac1cb15e767eeaec5952a020222a60ce --- /dev/null +++ b/k8s-mpam-controller/pkg/agent/watcher.go @@ -0,0 +1,284 @@ +/* +Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package agent + +import ( + "context" + "sync" + + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/klog" + + core "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// the namespace of the ConfigMaps +const configMapNamespace = "rc-config" + +// resource control configuration +type configData map[string]string + +// nodeName contains the name of the k8s node we're running on +var nodeName string + +type watcher struct { + k8sCli *kubernetes.Clientset // k8s client interface + + groupConfigMapWatch watch.Interface + + sync.RWMutex + + // node-specific configuration. it's 'Data' field value of the ConfigMap + // named rc-config.node.{NODE_NAME} + nodeCfg *configData + + // group-specific configuration. If a node belong to a node group, it is + // the 'Data' field value of the ConfigMap named rc-config.group.{GROUP_NAME}, + // otherwise it's the 'Data' field value of the ConfigMap named rc-config.default + groupCfg *configData +} + +var singleton_watcher *watcher + +// getWatcher returns singleton k8s watcher instance. +func getWatcher(k8sCli *kubernetes.Clientset) *watcher { + if singleton_watcher == nil { + singleton_watcher = &watcher{ + k8sCli: k8sCli, + } + } + + return singleton_watcher +} + +func (w *watcher) watchNode() { + // watch this Node + selector := meta.ListOptions{FieldSelector: "metadata.name=" + nodeName} + k8w, err := w.k8sCli.CoreV1().Nodes().Watch(context.TODO(), selector) + if err != nil { + klog.Errorf("Failed to watch node (%q): %v", nodeName, err) + return + } + + go func(ev <-chan watch.Event, group string) { + for e := range ev { + switch e.Type { + case watch.Added, watch.Modified: + klog.Infof("node (%s) is updated", nodeName) + label, _ := e.Object.(*core.Node).Labels["ngroup"] + + // if the node group is changed, we start to watch the config of the new node group + if group != label { + group = label + klog.Infof("node group is set to %s", group) + w.watchGroupConfigMap(group) + } + case watch.Deleted: + klog.Warning("our node is removed...") + default: + klog.Info("other event type") + } + } + + klog.Warning("seems node watcher is closed, going to restart ...") + w.watchNode() + klog.Warning("node configMap watcher restarted") + }(k8w.ResultChan(), "") +} + +func (w *watcher) watchNodeConfigMap() { + // watch "rc-config.node.{NODE_NAME}" ConfigMap + selector := meta.ListOptions{FieldSelector: "metadata.name=" + "rc-config.node." + nodeName} + k8w, err := w.k8sCli.CoreV1().ConfigMaps(configMapNamespace).Watch(context.TODO(), selector) + if err != nil { + klog.Errorf("Failed to watch ConfigMap rc-config.node.%q: %v", nodeName, err) + return + } + + go func(ev <-chan watch.Event) { + for e := range ev { + switch e.Type { + case watch.Added, watch.Modified: + klog.Info("ConfigMap rc-config.node." + nodeName + " is updated") + cm, ok := e.Object.(*core.ConfigMap) + if !ok { + klog.Warning("It's not ok for type *core.ConfigMap") + continue + } + w.setNodeConfig(&cm.Data) + case watch.Deleted: + klog.Info("ConfigMap rc-config.node." + nodeName + " is deleted") + w.setNodeConfig(nil) + default: + klog.Info("other event type") + } + } + + klog.Warning("seems node configMap watcher is closed, going to restart ...") + w.watchNodeConfigMap() + klog.Warning("node configMap watcher restarted") + }(k8w.ResultChan()) +} + +func (w *watcher) watchGroupConfigMap(group string) { + if w.groupConfigMapWatch != nil { + w.groupConfigMapWatch.Stop() + } + + // watch group ConfigMap + cmName := "rc-config.default" + if group != "" { + cmName = "rc-config.group." + group + } + selector := meta.ListOptions{FieldSelector: "metadata.name=" + cmName} + k8w, err := w.k8sCli.CoreV1().ConfigMaps(configMapNamespace).Watch(context.TODO(), selector) + if err != nil { + klog.Errorf("Failed to watch group ConfigMap (%q): %v", cmName, err) + return + } + + w.groupConfigMapWatch = k8w + klog.Info("start watching ConfigMap " + cmName) + + go func(ev <-chan watch.Event, group string) { + for e := range ev { + switch e.Type { + case watch.Added, watch.Modified: + cm, ok := e.Object.(*core.ConfigMap) + if !ok { + klog.Warning("It's not ok for type *core.ConfigMap") + continue + } + klog.Infof("group ConfigMap (%s) is updated", cm.Name) + w.setGroupConfig(&cm.Data) + case watch.Deleted: + cm, ok := e.Object.(*core.ConfigMap) + if !ok { + klog.Warning("It's not ok for type *core.ConfigMap") + continue + } + klog.Infof("group ConfigMap (%s) is deleted", cm.Name) + w.setGroupConfig(nil) + default: + klog.Info("other event type") + } + } + + klog.Warning("seems group configMap watcher is closed, going to restart ...") + w.watchGroupConfigMap(group) + klog.Warning("group configMap watcher is restarted") + + }(k8w.ResultChan(), group) +} + +func (w *watcher) watchPods() { + // watch Pods in all namespace + k8w, err := w.k8sCli.CoreV1().Pods(meta.NamespaceAll).Watch(context.TODO(), meta.ListOptions{}) + if err != nil { + klog.Errorf("watch Pods in all namespace failed") + return + } + + go func(ev <-chan watch.Event) { + for e := range ev { + switch e.Type { + case watch.Added, watch.Modified: + klog.Infof("pod (%s) is updated", e.Object.(*core.Pod).Name) + if rcgroup, ok := e.Object.(*core.Pod).Labels["rcgroup"]; ok { + klog.Infof("Pod: %s; rcgroup: %s", string(e.Object.(*core.Pod).UID), rcgroup) + assignControlGroup(string(e.Object.(*core.Pod).UID), rcgroup) + } + + case watch.Deleted: + klog.Info("a pod is deleted " + e.Object.(*core.Pod).UID) + default: + klog.Info("other event type") + } + } + + klog.Warning("seems pod watcher is closed, going to restart ...") + w.watchPods() + klog.Warning("pod watcher is restarted") + + }(k8w.ResultChan()) +} + +func (w *watcher) start() { + klog.Info("starting agent watcher ...") + if nodeName == "" { + klog.Warning("node name not set, NODE_NAME env variable should be set to match the name of this k8s Node") + return + } + + if !getNumclosids() { + klog.Errorf("get num_closids failed, please ensure resctrl fs is mounted") + return + } + + if !getCbmmask() { + klog.Errorf("get cbm_mask failed, please ensure resctrl fs is mounted") + return + } + + w.watchNodeConfigMap() + w.watchGroupConfigMap("") + w.watchNode() + + if direct { + w.watchPods() + } +} + +// applyConfig applies the current configuration. +func (w *watcher) applyConfig() { + klog.Info("apply configuration") + + config := w.groupCfg + + if w.nodeCfg != nil { + config = w.nodeCfg + } + + if config == nil { + klog.Warning("There is no configuration") + } + + applyConfig(config) +} + +// set node-specific configuration +func (w *watcher) setNodeConfig(data *map[string]string) { + w.Lock() + defer w.Unlock() + + w.nodeCfg = (*configData)(data) + w.applyConfig() +} + +// set group-specific or default configuration +func (w *watcher) setGroupConfig(data *map[string]string) { + w.Lock() + defer w.Unlock() + + w.groupCfg = (*configData)(data) + + if w.nodeCfg == nil { + w.applyConfig() + } +}