diff --git a/main.go b/main.go
index b0cbc2c8b11c48cc1c485b5d934231c9b3d112ef..03230a10d4b18ed183ef9946249d6b17d77bada1 100644
--- a/main.go
+++ b/main.go
@@ -81,6 +81,7 @@ var (
 		", range [1, 30]")
 	dealWatchHandler = flag.Bool("dealWatchHandler", false,
 		"update pod cache when receiving pod informer watch errors")
+	checkCachedPods = flag.Bool("checkCachedPods", true, "check pods in cache periodically, default true")
 )
 
 var (
@@ -234,6 +235,7 @@ func setParameters() {
 		ShareCount:       *shareDevCount,
 		LinkdownTimeout:  *linkdownTimeout,
 		DealWatchHandler: *dealWatchHandler,
+		CheckCachedPods:  *checkCachedPods,
 	}
 }
 
diff --git a/pkg/common/proto.go b/pkg/common/proto.go
index 56b85ef515f8c5b5ad880b8fe9345032293d7ae0..15653d78fd60150b5f3247756093b1276de1a15e 100644
--- a/pkg/common/proto.go
+++ b/pkg/common/proto.go
@@ -121,6 +121,7 @@ type Option struct {
 	LinkdownTimeout   int64 // linkdown timeout duration
 	DealWatchHandler  bool  // update pod cache when receiving pod informer watch errors
 	EnableSwitchFault bool  // if enable switch fault
+	CheckCachedPods   bool  // check cached pods periodically
 }
 
 // GetAllDeviceInfoTypeList Get All Device Info Type List
diff --git a/pkg/kubeclient/client_server.go b/pkg/kubeclient/client_server.go
index 85da291d20e3e2a58c24707e950cad7ab0309bb7..40cf479f02a6c2e5d4c62f0addb03d1a2655f4eb 100644
--- a/pkg/kubeclient/client_server.go
+++ b/pkg/kubeclient/client_server.go
@@ -86,10 +86,10 @@ func (ki *ClientK8s) TryUpdatePodCacheAnnotation(pod *v1.Pod, annotation map[str
 	// update cache
 	lock.Lock()
 	defer lock.Unlock()
-	for i, podInCache := range podList {
+	for i, podInCache := range podCache {
 		if podInCache.Namespace == pod.Namespace && podInCache.Name == pod.Name {
 			for k, v := range annotation {
-				podList[i].Annotations[k] = v
+				podCache[i].Annotations[k] = v
 			}
 			hwlog.RunLog.Debugf("update annotation in pod cache success, name: %s, namespace: %s", pod.Name, pod.Namespace)
 			return nil
diff --git a/pkg/kubeclient/client_server_test.go b/pkg/kubeclient/client_server_test.go
index 8e2dc861d6a6278ce502dace9f432d6f23b5f6a6..7f0cfc37b66fa5a39d8bb459f913df3b1924c33d 100644
--- a/pkg/kubeclient/client_server_test.go
+++ b/pkg/kubeclient/client_server_test.go
@@ -185,14 +185,10 @@ func TestTryUpdatePodAnnotation(t *testing.T) {
 	})
 	defer mockPatchPod.Reset()
 	convey.Convey("try update pod annotation when get pod is nil", t, func() {
-		mockGetPod := mockGetPodOpr(v1.Pod{})
-		defer mockGetPod.Reset()
 		err := utKubeClient.TryUpdatePodAnnotation(testPod, getDeviceInfo(common.HuaweiAscend310P, npuChip310PPhyID0))
 		convey.So(err.Error(), convey.ShouldEqual, "patch pod annotation failed, exceeded max number of retries")
 	})
 	convey.Convey("try update pod annotation when get pod is not nil", t, func() {
-		mockGetPod := mockGetPodOpr(*testPod)
-		defer mockGetPod.Reset()
 		err := utKubeClient.TryUpdatePodAnnotation(testPod, getDeviceInfo(common.HuaweiAscend310P, npuChip310PPhyID0))
 		convey.So(err.Error(), convey.ShouldEqual, "patch pod annotation failed, exceeded max number of retries")
@@ -357,14 +353,6 @@ func mockCMOpr(updateCM *v1.ConfigMap) (*gomonkey.Patches, *gomonkey.Patches) {
 	return mockCreateCM, mockUpdateCM
 }
 
-func mockGetPodOpr(mockPod v1.Pod) *gomonkey.Patches {
-	mockGetPod := gomonkey.ApplyMethod(reflect.TypeOf(new(ClientK8s)), "GetPodCache",
-		func(_ *ClientK8s, _, _ string) v1.Pod {
-			return mockPod
-		})
-	return mockGetPod
-}
-
 func resetMock(resetMockList ...*gomonkey.Patches) {
 	for _, resetMock := range resetMockList {
 		resetMock.Reset()
diff --git a/pkg/kubeclient/kube_cache.go b/pkg/kubeclient/kube_cache.go
index 7bf6fa04520022af2f0c1e3ff303026ca126764d..d1059c188e131030bd54001b9e3896c64b7400aa 100644
--- a/pkg/kubeclient/kube_cache.go
+++ b/pkg/kubeclient/kube_cache.go
@@ -16,22 +16,94 @@ package kubeclient
 
 import (
+	"context"
+	"hash/fnv"
 	"sync"
+	"time"
 
 	"huawei.com/npu-exporter/v6/common-utils/hwlog"
 	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/wait"
 
 	"Ascend-device-plugin/pkg/common"
 )
 
 var (
-	podList             []v1.Pod
-	lock                sync.Mutex
+	podCache            = map[types.UID]*podInfo{}
+	lock                = sync.Mutex{}
 	nodeServerIp        string
 	serverUsageLabel    string
 	nodeDeviceInfoCache *common.NodeDeviceInfoCache
)
 
+type podInfo struct {
+	*v1.Pod
+	updateTime time.Time
+}
+
+const (
+	timeIntervalForCheckPod = 10 * time.Minute
+	periodicForStartCheck   = 600
+	podCacheTimeout         = time.Hour
+)
+
+// PodInformerInspector periodically checks the pods in cache
+func (ki *ClientK8s) PodInformerInspector(ctx context.Context) {
+	hashVal := fnv.New32()
+	if _, err := hashVal.Write([]byte(ki.NodeName)); err != nil {
+		hwlog.RunLog.Errorf("failed to write nodeName to hash, err: %v", err)
+		return
+	}
+
+	val := hashVal.Sum32() % periodicForStartCheck
+	hwlog.RunLog.Infof("after %d seconds, pod informer inspector will start", val)
+	time.Sleep(time.Duration(val) * time.Second)
+	wait.Until(func() { ki.checkPodInCache(ctx) }, timeIntervalForCheckPod, ctx.Done())
+}
+
+func (ki *ClientK8s) checkPodInCache(ctx context.Context) {
+	lock.Lock()
+	defer lock.Unlock()
+	needDelete := make([]types.UID, 0)
+	needRefresh := make([]types.UID, 0)
+	for uid, pi := range podCache {
+		hwlog.RunLog.Debugf("check pod(%s/%s) in cache, updateTime: %v, now: %v", pi.Namespace, pi.Name,
+			pi.updateTime.Format(time.DateTime), time.Now().Format(time.DateTime))
+		if time.Since(pi.updateTime) < podCacheTimeout {
+			continue
+		}
+		pod, err := ki.getPod(ctx, pi.Namespace, pi.Name)
+		if err != nil {
+			if errors.IsNotFound(err) {
+				hwlog.RunLog.Infof("delete pod(%s/%s) from cache", pi.Namespace, pi.Name)
+				needDelete = append(needDelete, uid)
+				continue
+			}
+			hwlog.RunLog.Errorf("failed to get pod %s/%s, err: %v", pi.Pod.Namespace, pi.Pod.Name, err)
+			continue
+		}
+		if pod.Spec.NodeName != ki.NodeName || pod.UID != uid {
+			hwlog.RunLog.Infof("delete pod(%s/%s) from cache", pod.Namespace, pod.Name)
+			needDelete = append(needDelete, uid)
+			continue
+		}
+		needRefresh = append(needRefresh, uid)
+	}
+	for _, uid := range needDelete {
+		delete(podCache, uid)
+	}
+	for _, uid := range needRefresh {
+		podCache[uid].updateTime = time.Now()
+	}
+}
+
+func (ki *ClientK8s) getPod(ctx context.Context, namespace, name string) (*v1.Pod, error) {
+	return ki.Clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
+}
+
 // UpdatePodList update pod list by informer
 func UpdatePodList(oldObj, newObj interface{}, operator EventType) {
 	newPod, ok := newObj.(*v1.Pod)
@@ -41,111 +113,81 @@ func UpdatePodList(oldObj, newObj interface{}, operator EventType) {
 	lock.Lock()
 	defer lock.Unlock()
 	switch operator {
-	case EventTypeAdd:
-		podList = append(podList, *newPod)
-	case EventTypeDelete:
-		deletePod(newPod)
-	case EventTypeUpdate:
-		oldPod, ok := oldObj.(*v1.Pod)
-		if !ok {
-			return
-		}
-		for i, localPod := range podList {
-			if localPod.UID == oldPod.UID {
-				podList[i] = *newPod
-				return
-			}
+	case EventTypeAdd, EventTypeUpdate:
+		hwlog.RunLog.Infof("pod(%s/%s) is %s to cache", newPod.Namespace, newPod.Name, operator)
+		podCache[newPod.UID] = &podInfo{
+			Pod:        newPod,
+			updateTime: time.Now(),
 		}
-		podList = append(podList, *newPod)
-		hwlog.RunLog.Infof("pod %s update failed, use add", newPod.Name)
+	case EventTypeDelete:
+		hwlog.RunLog.Infof("pod(%s/%s) is deleted from cache", newPod.Namespace, newPod.Name)
+		delete(podCache, newPod.UID)
 	default:
 		hwlog.RunLog.Errorf("operator is undefined, find operater: %s", operator)
 	}
 }
 
-func deletePod(newPod *v1.Pod) {
-	index := -1
-	for i, localPod := range podList {
-		if localPod.Namespace == newPod.Namespace && localPod.Name == newPod.Name {
-			index = i
-			break
-		}
-	}
-	if index != -1 {
-		podList = append(podList[:index], podList[index+1:]...)
+func (ki *ClientK8s) refreshPodList() {
+	newV1PodList, err := ki.GetAllPodList()
+	if err != nil {
+		hwlog.RunLog.Errorf("get pod list from api-server failed: %v", err)
 		return
 	}
-	hwlog.RunLog.Infof("pod delete failed, pod %s already delete", newPod.Name)
+	newPodCache := map[types.UID]*podInfo{}
+	for i, pod := range newV1PodList.Items {
+		newPodCache[pod.UID] = &podInfo{
+			Pod:        &newV1PodList.Items[i],
+			updateTime: time.Now(),
+		}
+	}
+	lock.Lock()
+	podCache = newPodCache
+	lock.Unlock()
+	ki.IsApiErr = false
+	hwlog.RunLog.Info("get new pod list success")
 }
 
 // GetAllPodListCache get pod list by field selector with cache,
 func (ki *ClientK8s) GetAllPodListCache() []v1.Pod {
 	if ki.IsApiErr {
-		newV1PodList, err := ki.GetAllPodList()
-		if err != nil {
-			hwlog.RunLog.Errorf("get pod list from api-server failed: %v", err)
-			return podList
-		}
-		podList = newV1PodList.Items
-		ki.IsApiErr = false
-		hwlog.RunLog.Info("get new pod list success")
+		ki.refreshPodList()
 	}
-	return podList
+	lock.Lock()
+	defer lock.Unlock()
+	pods := make([]v1.Pod, 0, len(podCache))
+	for _, pi := range podCache {
+		pods = append(pods, *pi.Pod)
+	}
+	return pods
 }
 
 // GetActivePodListCache is to get active pod list with cache
 func (ki *ClientK8s) GetActivePodListCache() []v1.Pod {
-	if len(podList) == 0 {
-		return []v1.Pod{}
-	}
-	newPodList := make([]v1.Pod, 0, common.GeneralMapSize)
-	lock.Lock()
-	defer lock.Unlock()
-
 	if ki.IsApiErr {
-		newV1PodList, err := ki.GetAllPodList()
-		if err != nil {
-			hwlog.RunLog.Errorf("get pod list from api-server failed: %v", err)
-		} else {
-			podList = newV1PodList.Items
-			ki.IsApiErr = false
-			hwlog.RunLog.Info("get new pod list success")
-		}
+		ki.refreshPodList()
 	}
-	for _, pod := range podList {
-		if err := common.CheckPodNameAndSpace(pod.GetName(), common.PodNameMaxLength); err != nil {
+	lock.Lock()
+	defer lock.Unlock()
+	newPodList := make([]v1.Pod, 0, common.GeneralMapSize)
+	for _, pi := range podCache {
+		if err := common.CheckPodNameAndSpace(pi.GetName(), common.PodNameMaxLength); err != nil {
 			hwlog.RunLog.Warnf("pod name syntax illegal, err: %v", err)
 			continue
 		}
-		if err := common.CheckPodNameAndSpace(pod.GetNamespace(), common.PodNameSpaceMaxLength); err != nil {
+		if err := common.CheckPodNameAndSpace(pi.GetNamespace(), common.PodNameSpaceMaxLength); err != nil {
 			hwlog.RunLog.Warnf("pod namespace syntax illegal, err: %v", err)
 			continue
 		}
-		if pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded {
+		if pi.Status.Phase == v1.PodFailed || pi.Status.Phase == v1.PodSucceeded {
 			continue
 		}
-		newPodList = append(newPodList, pod)
+		newPodList = append(newPodList, *pi.Pod)
 	}
 	return newPodList
 }
 
-// GetPodCache get pod by namespace and name with cache
-func (ki *ClientK8s) GetPodCache(namespace, name string) v1.Pod {
-	if len(podList) == 0 {
-		return v1.Pod{}
-	}
-	lock.Lock()
-	defer lock.Unlock()
-	for _, pod := range podList {
-		if pod.Namespace == namespace && pod.Name == name {
-			return pod
-		}
-	}
-	return v1.Pod{}
-}
-
 // GetNodeServerIDCache Get Node Server ID with cache
 func (ki *ClientK8s) GetNodeServerIDCache() (string, error) {
 	if nodeServerIp != "" {
diff --git a/pkg/kubeclient/kube_cache_test.go b/pkg/kubeclient/kube_cache_test.go
index f1445214472013e438c298eda6d97027f412c12c..16ed9a5cac538904588d5804894f079a6f65a4ce 100644
--- a/pkg/kubeclient/kube_cache_test.go
+++ b/pkg/kubeclient/kube_cache_test.go
@@ -15,13 +15,16 @@ package kubeclient
 
 import (
+	"context"
 	"fmt"
 	"testing"
+	"time"
 
 	"github.com/agiledragon/gomonkey/v2"
 	"github.com/smartystreets/goconvey/convey"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/kubernetes"
 
 	"Ascend-device-plugin/pkg/common"
@@ -73,3 +76,84 @@ func TestGetA800IA2Label(t *testing.T) {
 		convey.So(err, convey.ShouldBeNil)
 	})
 }
+
+// TestCheckPodInCache01 test case for a cached pod whose entry has timed out
+func TestCheckPodInCache01(t *testing.T) {
+	pod1 := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			UID:       "xxxxxxxxx1",
+			Namespace: "default",
+			Name:      "pod1",
+		},
+	}
+	patch := gomonkey.
+		ApplyFuncReturn(NewClientK8s, &ClientK8s{
+			Clientset:      &kubernetes.Clientset{},
+			NodeName:       "node",
+			DeviceInfoName: common.DeviceInfoCMNamePrefix + "node",
+			IsApiErr:       false,
+		}, nil)
+	defer patch.Reset()
+
+	patch1 := gomonkey.ApplyPrivateMethod(&ClientK8s{}, "getPod", func(_ *ClientK8s,
+		_ context.Context, _, _ string) (*v1.Pod, error) {
+		return pod1, nil
+	})
+	defer patch1.Reset()
+	convey.Convey("test check pod in cache", t, func() {
+		client, _ := NewClientK8s()
+		pod1UpdateTime := time.Now().Add(-time.Hour).Add(-time.Minute)
+		expectNewPodCache := map[types.UID]*podInfo{}
+		podCache = map[types.UID]*podInfo{
+			"xxxxxxxxx1": {
+				Pod:        &v1.Pod{},
+				updateTime: pod1UpdateTime,
+			},
+		}
+		client.checkPodInCache(context.TODO())
+		convey.So(podCache, convey.ShouldResemble, expectNewPodCache)
+	})
+}
+
+// TestCheckPodInCache02 test case for a cached pod whose entry has not timed out
+func TestCheckPodInCache02(t *testing.T) {
+	pod2 := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			UID:       "xxxxxxxxx2",
+			Namespace: "default",
+			Name:      "pod2",
+		},
+	}
+	patch := gomonkey.
+		ApplyFuncReturn(NewClientK8s, &ClientK8s{
+			Clientset:      &kubernetes.Clientset{},
+			NodeName:       "node",
+			DeviceInfoName: common.DeviceInfoCMNamePrefix + "node",
+			IsApiErr:       false,
+		}, nil)
+	defer patch.Reset()
+
+	patch1 := gomonkey.ApplyPrivateMethod(&ClientK8s{}, "getPod", func(_ *ClientK8s,
+		_ context.Context, _, _ string) (*v1.Pod, error) {
+		return pod2, nil
+	})
+	defer patch1.Reset()
+	convey.Convey("test check pod in cache", t, func() {
+		client, _ := NewClientK8s()
+		pod2UpdateTime := time.Now().Add(-time.Minute)
+		expectNewPodCache := map[types.UID]*podInfo{
+			"xxxxxxxxx2": {
+				Pod:        &v1.Pod{},
+				updateTime: pod2UpdateTime,
+			},
+		}
+		podCache = map[types.UID]*podInfo{
+			"xxxxxxxxx2": {
+				Pod:        &v1.Pod{},
+				updateTime: pod2UpdateTime,
+			},
+		}
+		client.checkPodInCache(context.TODO())
+		convey.So(podCache, convey.ShouldResemble, expectNewPodCache)
+	})
+}
diff --git a/pkg/server/manager.go b/pkg/server/manager.go
index f5b262c277a234201736cf2685c8949ef76394b2..b72f8808630d5eb69a241a415b4152c7aea99832 100644
--- a/pkg/server/manager.go
+++ b/pkg/server/manager.go
@@ -353,6 +353,9 @@ func (hdm *HwDevManager) ListenDevice(ctx context.Context) {
 	hdm.separateNPUIDFromDeviceInfoIntoCache()
 	go hdm.pollFaultCodeCM(ctx)
 	go hdm.Serve(ctx)
+	if common.ParamOption.CheckCachedPods {
+		go hdm.manager.GetKubeClient().PodInformerInspector(ctx)
+	}
 	initTime := time.Now()
 	for {
 		select {