diff --git a/k8s-mpam-controller/cmd/agent/main.go b/k8s-mpam-controller/cmd/agent/main.go
new file mode 100644
index 0000000000000000000000000000000000000000..a0be1aa32b56236aa69fa4a39be56d6667c5bf6c
--- /dev/null
+++ b/k8s-mpam-controller/cmd/agent/main.go
@@ -0,0 +1,16 @@
+/*
+ * Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
+ * Description: program entry point
+ * Author: 于建智 y00841556
+ * Create: 2023-11-3
+ */
+
+package main
+
+import (
+	"k8s-mpam-controller/pkg/agent"
+)
+
+func main() {
+	agent.Main()
+}
diff --git a/k8s-mpam-controller/pkg/agent/label.go b/k8s-mpam-controller/pkg/agent/label.go
new file mode 100644
index 0000000000000000000000000000000000000000..c71f4f871b245d0ed558d3a6e22f2b40e28cbc64
--- /dev/null
+++ b/k8s-mpam-controller/pkg/agent/label.go
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
+ * Description: label nodes in the cluster to indicate whether they support the MPAM feature
+ * Author: 于建智 y00841556
+ * Create: 2023-11-3
+ */
+
+package agent
+
+import (
+	"context"
+	"os"
+
+	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	k8sclient "k8s.io/client-go/kubernetes"
+	"k8s.io/klog"
+)
+
+const mpamLabel = "MPAM"
+
+// labelNodeMPAM labels the node to indicate whether it supports MPAM
+func labelNodeMPAM(k8sCli *k8sclient.Clientset) bool {
+	support := true
+
+	// fetch the node to label
+	node, err := k8sCli.CoreV1().Nodes().Get(context.TODO(), nodeName, meta_v1.GetOptions{})
+	if err != nil || node == nil {
+		klog.Errorf("Failed to get node: %v", err)
+		klog.Warning("please ensure environment variable NODE_NAME has been set!")
+		return false
+	}
+
+	// check if resctrl is supported
+	if _, err := os.Stat("/sys/fs/resctrl"); err != nil {
+		node.Labels[mpamLabel] = "no"
+		support = false
+	} else {
+		// check if resctrl fs has been mounted
+		if _, err := os.Stat("/sys/fs/resctrl/schemata"); err != nil {
+			node.Labels[mpamLabel] = "disabled"
+			support = false
+		} else {
+			node.Labels[mpamLabel] = "enabled"
+		}
+	}
+
+	if _, err := k8sCli.CoreV1().Nodes().Update(context.TODO(), node, meta_v1.UpdateOptions{}); err != nil {
+		klog.Errorf("Failed to update node labels: %v", err)
+	}
+
+	return support
+}
diff --git a/k8s-mpam-controller/pkg/agent/main.go b/k8s-mpam-controller/pkg/agent/main.go
new file mode 100644
index 0000000000000000000000000000000000000000..bdd06bf27b056c5df7200bba16ccf972f5686bdd
--- /dev/null
+++ b/k8s-mpam-controller/pkg/agent/main.go
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
+ * Description: main program that enables the MPAM feature for k8s
+ * Author: 于建智 y00841556
+ * Create: 2023-11-3
+ */
+
+package agent
+
+import (
+	"context"
+	"flag"
+	"math"
+	"os"
+	"strings"
+	"time"
+
+	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
+	"k8s.io/klog"
+)
+
+var direct bool
+var clientset *kubernetes.Clientset
+
+func Main() {
+	var config *rest.Config
+	var err error
+
+	kubeconfig := flag.String("kubeconfig", "", "absolute path to the kubeconfig file (optional, if run in cluster)")
+	server := flag.String("server", "", "the relay server address")
+	caFile := flag.String("ca-file", "", "absolute path to the root certificate file")
+	certFile := flag.String("cert-file", "", "absolute path to the certificate file")
+	keyFile := flag.String("key-file", "", "absolute path to the private key file")
+	serverName := flag.String("cn", "", "the common name (CN) of the certificate")
+	flag.BoolVar(&direct, "direct", false, "direct mode, default false. If true, the agent doesn't depend on a server to relay pod information")
+
+	flag.Parse()
+
+	if !direct {
+		if *server == "" || *caFile == "" || *certFile == "" || *keyFile == "" || *serverName == "" {
+			klog.Error("Arguments server, ca-file, cert-file, key-file and cn must be set")
+			return
+		}
+	}
+
+	if *kubeconfig == "" {
+		klog.Info("using in-cluster config")
+		config, err = rest.InClusterConfig()
+	} else {
+		config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
+	}
+
+	if err != nil {
+		klog.Errorf("Failed to build config: %v", err)
+		return
+	}
+
+	// create the clientset and assign it to the package-level variable
+	// (plain "=" here: ":=" would shadow it and leave the global nil)
+	clientset, err = kubernetes.NewForConfig(config)
+	if err != nil {
+		klog.Errorf("Failed to create clientset: %v", err)
+		return
+	}
+
+	klog.Info("Node name: " + nodeName)
+	node, err := clientset.CoreV1().Nodes().Get(context.TODO(), nodeName, meta_v1.GetOptions{})
+	if err != nil || node == nil {
+		klog.Errorf("Failed to get node: %v", err)
+		klog.Warning("please ensure environment variable NODE_NAME has been set!")
+		return
+	}
+
+	if !labelNodeMPAM(clientset) {
+		klog.Info("Seems this node doesn't support MPAM. Please ensure resctrl fs is mounted")
+		// nothing more to do on this node; sleep (practically) forever
+		time.Sleep(math.MaxInt64)
+	}
+
+	getWatcher(clientset).start()
+
+	if direct {
+		for {
+			time.Sleep(10 * time.Second)
+		}
+	} else {
+		for {
+			if err := startClient(*server, *caFile, *certFile, *keyFile, *serverName); err != nil {
+				klog.Errorf("Client error: %v", err)
+			}
+			// back off briefly before reconnecting to avoid a hot loop
+			time.Sleep(time.Second)
+		}
+	}
+}
+
+func init() {
+	// Node name is expected to be set in environment variable "NODE_NAME"
+	nodeName = os.Getenv("NODE_NAME")
+
+	if nodeName == "" {
+		if hostname, err := os.ReadFile("/etc/hostname"); err == nil {
+			nodeName = strings.ToLower(strings.TrimSpace(string(hostname)))
+		}
+	}
+}
diff --git a/k8s-mpam-controller/pkg/agent/watcher.go b/k8s-mpam-controller/pkg/agent/watcher.go
new file mode 100644
index 0000000000000000000000000000000000000000..bd4b97e515545fbd69cf550e9ebf00954772c54d
--- /dev/null
+++ b/k8s-mpam-controller/pkg/agent/watcher.go
@@ -0,0 +1,265 @@
+/*
+ * Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved.
+ * Description: watch ConfigMaps in the cluster via the list-watch mechanism and automatically apply changes to the corresponding node
+ * Author: 于建智 y00841556
+ * Create: 2023-11-3
+ */
+
+package agent
+
+import (
+	"context"
+	"sync"
+
+	core_v1 "k8s.io/api/core/v1"
+	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	k8swatch "k8s.io/apimachinery/pkg/watch"
+	k8sclient "k8s.io/client-go/kubernetes"
+
+	"k8s.io/klog"
+)
+
+// the namespace of the ConfigMaps
+const configMapNamespace = "rc-config"
+
+// resource control configuration
+type configData map[string]string
+
+// nodeName contains the name of the k8s node we're running on
+var nodeName string
+
+type watcher struct {
+	k8sCli *k8sclient.Clientset // k8s client interface
+
+	groupConfigMapWatch k8swatch.Interface
+
+	sync.RWMutex
+
+	// node-specific configuration. It is the 'Data' field value of the
+	// ConfigMap named rc-config.node.{NODE_NAME}
+	nodeCfg *configData
+
+	// group-specific configuration. If the node belongs to a node group, it is
+	// the 'Data' field value of the ConfigMap named rc-config.group.{GROUP_NAME},
+	// otherwise it is the 'Data' field value of the ConfigMap named rc-config.default
+	groupCfg *configData
+}
+
+var singletonWatcher *watcher
+
+// getWatcher returns the singleton k8s watcher instance.
+func getWatcher(k8sCli *k8sclient.Clientset) *watcher {
+	if singletonWatcher == nil {
+		singletonWatcher = &watcher{
+			k8sCli: k8sCli,
+		}
+	}
+
+	return singletonWatcher
+}
+
+func (w *watcher) watchNode() {
+	// watch this Node
+	selector := meta_v1.ListOptions{FieldSelector: "metadata.name=" + nodeName}
+	k8w, err := w.k8sCli.CoreV1().Nodes().Watch(context.TODO(), selector)
+	if err != nil {
+		klog.Errorf("Failed to watch node (%q): %v", nodeName, err)
+		return
+	}
+
+	go func(ev <-chan k8swatch.Event, group string) {
+		for e := range ev {
+			switch e.Type {
+			case k8swatch.Added, k8swatch.Modified:
+				klog.Infof("node (%s) is updated", nodeName)
+				label := e.Object.(*core_v1.Node).Labels["ngroup"]
+
+				// if the node group has changed, start watching the config of the new node group
+				if group != label {
+					group = label
+					klog.Infof("node group is set to %s", group)
+					w.watchGroupConfigMap(group)
+				}
+			case k8swatch.Deleted:
+				klog.Warning("our node is removed...")
+			default:
+				klog.Info("other event type")
+			}
+		}
+
+		klog.Warning("seems node watcher is closed, going to restart ...")
+		w.watchNode()
+		klog.Warning("node watcher restarted")
+	}(k8w.ResultChan(), "")
+}
+
+func (w *watcher) watchNodeConfigMap() {
+	// watch "rc-config.node.{NODE_NAME}" ConfigMap
+	selector := meta_v1.ListOptions{FieldSelector: "metadata.name=" + "rc-config.node." + nodeName}
+	k8w, err := w.k8sCli.CoreV1().ConfigMaps(configMapNamespace).Watch(context.TODO(), selector)
+	if err != nil {
+		klog.Errorf("Failed to watch ConfigMap rc-config.node.%q: %v", nodeName, err)
+		return
+	}
+
+	go func(ev <-chan k8swatch.Event) {
+		for e := range ev {
+			switch e.Type {
+			case k8swatch.Added, k8swatch.Modified:
+				klog.Info("ConfigMap rc-config.node." + nodeName + " is updated")
+				cm := e.Object.(*core_v1.ConfigMap)
+				w.setNodeConfig(&cm.Data)
+			case k8swatch.Deleted:
+				klog.Info("ConfigMap rc-config.node." + nodeName + " is deleted")
+				w.setNodeConfig(nil)
+			default:
+				klog.Info("other event type")
+			}
+		}
+
+		klog.Warning("seems node configMap watcher is closed, going to restart ...")
+		w.watchNodeConfigMap()
+		klog.Warning("node configMap watcher restarted")
+	}(k8w.ResultChan())
+}
+
+func (w *watcher) watchGroupConfigMap(group string) {
+	if w.groupConfigMapWatch != nil {
+		w.groupConfigMapWatch.Stop()
+	}
+
+	// watch group ConfigMap
+	cmName := "rc-config.default"
+	if group != "" {
+		cmName = "rc-config.group." + group
+	}
+	selector := meta_v1.ListOptions{FieldSelector: "metadata.name=" + cmName}
+	k8w, err := w.k8sCli.CoreV1().ConfigMaps(configMapNamespace).Watch(context.TODO(), selector)
+	if err != nil {
+		klog.Errorf("Failed to watch group ConfigMap (%q): %v", cmName, err)
+		return
+	}
+
+	w.groupConfigMapWatch = k8w
+	klog.Info("start watching ConfigMap " + cmName)
+
+	go func(ev <-chan k8swatch.Event, group string) {
+		for e := range ev {
+			switch e.Type {
+			case k8swatch.Added, k8swatch.Modified:
+				cm := e.Object.(*core_v1.ConfigMap)
+				klog.Infof("group ConfigMap (%s) is updated", cm.Name)
+				w.setGroupConfig(&cm.Data)
+			case k8swatch.Deleted:
+				cm := e.Object.(*core_v1.ConfigMap)
+				klog.Infof("group ConfigMap (%s) is deleted", cm.Name)
+				w.setGroupConfig(nil)
+			default:
+				klog.Info("other event type")
+			}
+		}
+
+		klog.Warning("seems group configMap watcher is closed, going to restart ...")
+		w.watchGroupConfigMap(group)
+		klog.Warning("group configMap watcher is restarted")
+
+	}(k8w.ResultChan(), group)
+}
+
+func (w *watcher) watchPods() {
+	// watch Pods in all namespaces
+	k8w, err := w.k8sCli.CoreV1().Pods(meta_v1.NamespaceAll).Watch(context.TODO(), meta_v1.ListOptions{})
+	if err != nil {
+		klog.Errorf("Failed to watch Pods in all namespaces: %v", err)
+		return
+	}
+
+	go func(ev <-chan k8swatch.Event) {
+		for e := range ev {
+			switch e.Type {
+			case k8swatch.Added, k8swatch.Modified:
+				pod := e.Object.(*core_v1.Pod)
+				klog.Infof("pod (%s) is updated", pod.Name)
+				if rcgroup, ok := pod.Labels["rcgroup"]; ok {
+					klog.Infof("Pod: %s; rcgroup: %s", string(pod.UID), rcgroup)
+					assignControlGroup(string(pod.UID), rcgroup)
+				}
+
+			case k8swatch.Deleted:
+				klog.Info("a pod is deleted: " + string(e.Object.(*core_v1.Pod).UID))
+			default:
+				klog.Info("other event type")
+			}
+		}
+
+		klog.Warning("seems pod watcher is closed, going to restart ...")
+		w.watchPods()
+		klog.Warning("pod watcher is restarted")
+
+	}(k8w.ResultChan())
+}
+
+func (w *watcher) start() {
+	klog.Info("starting agent watcher ...")
+	if nodeName == "" {
+		klog.Warning("node name not set, NODE_NAME env variable should be set to match the name of this k8s Node")
+		return
+	}
+
+	if !getNumclosids() {
+		klog.Errorf("get num_closids failed, please ensure resctrl fs is mounted")
+		return
+	}
+
+	if !getCbmmask() {
+		klog.Errorf("get cbm_mask failed, please ensure resctrl fs is mounted")
+		return
+	}
+
+	w.watchNodeConfigMap()
+	w.watchGroupConfigMap("")
+	w.watchNode()
+
+	if direct {
+		w.watchPods()
+	}
+}
+
+// applyConfig applies the current configuration.
+func (w *watcher) applyConfig() {
+	klog.Info("apply configuration")
+
+	// node-specific configuration takes precedence over the group-level one
+	config := w.groupCfg
+
+	if w.nodeCfg != nil {
+		config = w.nodeCfg
+	}
+
+	if config == nil {
+		klog.Warning("There is no configuration")
+	}
+
+	applyConfig(config)
+}
+
+// set node-specific configuration
+func (w *watcher) setNodeConfig(data *map[string]string) {
+	w.Lock()
+	defer w.Unlock()
+
+	w.nodeCfg = (*configData)(data)
+	w.applyConfig()
+}
+
+// set group-specific or default configuration
+func (w *watcher) setGroupConfig(data *map[string]string) {
+	w.Lock()
+	defer w.Unlock()
+
+	w.groupCfg = (*configData)(data)
+
+	if w.nodeCfg == nil {
+		w.applyConfig()
+	}
+}
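
Note: for context on how these ConfigMaps are expected to be produced, the watcher above selects, in the `rc-config` namespace, `rc-config.node.{NODE_NAME}` for node-specific config, `rc-config.group.{GROUP_NAME}` for group config, and `rc-config.default` as the fallback. Below is a minimal client-go sketch of publishing a node-specific ConfigMap that the agent would pick up; the node name `worker-1`, the kubeconfig path, and the `Data` payload are illustrative assumptions, since the actual key/value schema is whatever `applyConfig` (defined elsewhere in the repo) consumes.

```go
package main

import (
	"context"

	core_v1 "k8s.io/api/core/v1"
	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// build a client from a local kubeconfig (path is an assumption)
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	cli, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// node-specific config: the name must be rc-config.node.{NODE_NAME} and the
	// object must live in the rc-config namespace, or the agent's field
	// selector won't match it
	cm := &core_v1.ConfigMap{
		ObjectMeta: meta_v1.ObjectMeta{
			Name:      "rc-config.node.worker-1",
			Namespace: "rc-config",
		},
		// hypothetical payload; the real schema is defined by applyConfig
		Data: map[string]string{
			"example-group": "L3:0=7fff",
		},
	}

	if _, err := cli.CoreV1().ConfigMaps("rc-config").Create(
		context.TODO(), cm, meta_v1.CreateOptions{}); err != nil {
		panic(err)
	}
}
```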
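
Note: `assignControlGroup` is referenced by `watchPods` but is not part of this patch. For readers following the pod path, here is a hedged sketch of what such a helper typically does on a resctrl system: collect the PIDs of the pod's cgroup and write them one by one into the target resctrl group's `tasks` file. The cgroupfs layout assumed below varies across container runtimes and cgroup versions, so treat every path and the helper's name as assumptions, not the repo's actual implementation.

```go
package main

// Hypothetical sketch of a resctrl assignment helper, NOT the repo's actual
// assignControlGroup. Assumes a cgroup v1 layout under
// /sys/fs/cgroup/cpu/kubepods and a pre-created resctrl group.

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// assignControlGroupSketch moves every task of the pod's cgroup into the
// resctrl group /sys/fs/resctrl/{rcgroup}, so the group's schemata
// (cache / memory-bandwidth limits) apply to the pod's processes.
func assignControlGroupSketch(podUID, rcgroup string) error {
	// locate the pod's cgroup.procs files (path layout is an assumption)
	pattern := filepath.Join("/sys/fs/cgroup/cpu/kubepods", "*pod"+podUID+"*", "cgroup.procs")
	matches, err := filepath.Glob(pattern)
	if err != nil || len(matches) == 0 {
		return fmt.Errorf("no cgroup found for pod %s: %v", podUID, err)
	}

	tasksFile := filepath.Join("/sys/fs/resctrl", rcgroup, "tasks")
	for _, procs := range matches {
		data, err := os.ReadFile(procs)
		if err != nil {
			return err
		}
		// resctrl expects one PID per write to the tasks file
		for _, pid := range strings.Fields(string(data)) {
			if err := os.WriteFile(tasksFile, []byte(pid), 0600); err != nil {
				return fmt.Errorf("failed to move pid %s: %v", pid, err)
			}
		}
	}
	return nil
}

func main() {
	if err := assignControlGroupSketch("1234-abcd", "example-group"); err != nil {
		fmt.Println(err)
	}
}
```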