diff --git a/pkg/common/constants.go b/pkg/common/constants.go index 64dcf10e09ab96f1445d032fd2249c7dedbf0ec3..3e8e0570c578f3b1c7b1df9e4b7fe6df8daf3037 100644 --- a/pkg/common/constants.go +++ b/pkg/common/constants.go @@ -765,4 +765,8 @@ const ( const ( NPUNormalStatus = "normal" NPUResettingStatus = "resetting" + // UpdateAnnotationRetryTimes update annotation retry times + UpdateAnnotationRetryTimes = 3 + // SubHealthyAnnotationKey sub-healthy annotation key on node + SubHealthyAnnotationKey = "subHealthy" ) diff --git a/pkg/device/ascend310_test.go b/pkg/device/ascend310_test.go index cf372604af5457078fcac7dd969dbdfc70071af0..8eaeb2e70340010b75cf1702c78c3b3cfabc2528 100644 --- a/pkg/device/ascend310_test.go +++ b/pkg/device/ascend310_test.go @@ -79,7 +79,10 @@ func TestDoWithVolcanoListAndWatch310(t *testing.T) { allInfo, err := manager.GetNPUs() convey.So(err, convey.ShouldBeNil) groupDevice := ClassifyDevices(allInfo.AllDevs, allInfo.AllDevTypes) - + mockAnnotation := gomonkey.ApplyPrivateMethod(reflect.TypeOf(new(AscendTools)), + "annotateWithSubHealthy", func(_ common.DevStatusSet) { + return + }) mockGetPodsUsedNpu := gomonkey.ApplyMethod(reflect.TypeOf(new(kubeclient.ClientK8s)), "GetPodsUsedNpu", func(_ *kubeclient.ClientK8s) sets.String { return nil @@ -106,6 +109,7 @@ func TestDoWithVolcanoListAndWatch310(t *testing.T) { mockGetPodsUsedNpu.Reset() mockGetConfigMap.Reset() mockCreateConfigMap.Reset() + mockAnnotation.Reset() }() manager.client.SetNodeDeviceInfoCache(createFakeDeviceInfo()) manager.DoWithVolcanoListAndWatch(groupDevice) diff --git a/pkg/device/ascend310p_test.go b/pkg/device/ascend310p_test.go index bab53b2ec4ccebb435da82517d381a2e29acf3ce..683658db08597a710988ba813032552e09fc81d9 100644 --- a/pkg/device/ascend310p_test.go +++ b/pkg/device/ascend310p_test.go @@ -55,7 +55,10 @@ func TestDoWithVolcanoListAndWatch310p(t *testing.T) { allInfo, err := manager.GetNPUs() convey.So(err, convey.ShouldBeNil) groupDevice := ClassifyDevices(allInfo.AllDevs, allInfo.AllDevTypes) - + mockAnnotation := gomonkey.ApplyPrivateMethod(reflect.TypeOf(new(AscendTools)), + "annotateWithSubHealthy", func(_ common.DevStatusSet) { + return + }) mockGetPodsUsedNpu := gomonkey.ApplyMethod(reflect.TypeOf(new(kubeclient.ClientK8s)), "GetPodsUsedNpu", func(_ *kubeclient.ClientK8s) sets.String { return nil @@ -81,6 +84,7 @@ func TestDoWithVolcanoListAndWatch310p(t *testing.T) { mockGetPodsUsedNpu.Reset() mockGetConfigMap.Reset() mockCreateConfigMap.Reset() + mockAnnotation.Reset() }() manager.client.SetNodeDeviceInfoCache(createFakeDeviceInfo()) manager.DoWithVolcanoListAndWatch(groupDevice) diff --git a/pkg/device/ascend910_test.go b/pkg/device/ascend910_test.go index 9241d89ae0da36c6818ddabe56fd0ab2929258af..7799a6f6fc707677654aeaf11fe1ea961dec148a 100644 --- a/pkg/device/ascend910_test.go +++ b/pkg/device/ascend910_test.go @@ -89,7 +89,10 @@ func TestDoWithVolcanoListAndWatch910(t *testing.T) { allInfo, err := manager.GetNPUs() convey.So(err, convey.ShouldBeNil) groupDevice := ClassifyDevices(allInfo.AllDevs, allInfo.AllDevTypes) - + mockAnnotation := gomonkey.ApplyPrivateMethod(reflect.TypeOf(new(AscendTools)), + "annotateWithSubHealthy", func(_ common.DevStatusSet) { + return + }) mockGetPodsUsedNpu := gomonkey.ApplyMethod(reflect.TypeOf(new(kubeclient.ClientK8s)), "GetPodsUsedNpu", func(_ *kubeclient.ClientK8s) sets.String { return nil @@ -125,6 +128,7 @@ func TestDoWithVolcanoListAndWatch910(t *testing.T) { mockPatchNodeState.Reset() mockCreateConfigMap.Reset() mockNodeBack.Reset() + mockAnnotation.Reset() }() manager.client.SetNodeDeviceInfoCache(createFakeDeviceInfo()) manager.DoWithVolcanoListAndWatch(groupDevice) diff --git a/pkg/device/ascendcommon.go b/pkg/device/ascendcommon.go index 25132bd0b74f0bc532c4be94829ff63a6da701f8..86d4eb14d426e52a297aaf0fd637934c61a47c12 100644 --- a/pkg/device/ascendcommon.go +++ b/pkg/device/ascendcommon.go @@ -46,6 +46,8 @@ var ( faultMode = make(map[int32]string, common.GeneralMapSize) lastCheckNodeLabel int64 useIpv4 = true + preSubHealthy = false + firstUpdate = true ) const ( @@ -229,6 +231,8 @@ func (tool *AscendTools) UpdateNodeDeviceInfo(devStatusSet common.DevStatusSet, } tool.delVirDevInfo(newDeviceList) + tool.annotateWithSubHealthy(devStatusSet) + manuallySeparateNPU := tool.handleManuallySeparateNPUFaultInfo() // if subscribe failed, will use get interface if common.SwitchSubscribeFailed && common.ParamOption.EnableSwitchFault { @@ -252,6 +256,26 @@ func (tool *AscendTools) UpdateNodeDeviceInfo(devStatusSet common.DevStatusSet, return waitErr } +func (tool *AscendTools) annotateWithSubHealthy(devStatusSet common.DevStatusSet) { + curSubHealthyStatus := false + for _, df := range devStatusSet.DeviceFault { + if df.FaultLevel == common.PreSeparateNPU { + curSubHealthyStatus = true + break + } + } + if firstUpdate || preSubHealthy != curSubHealthyStatus { + preSubHealthy = curSubHealthyStatus + err := tool.client.UpdateNodeAnnotation(common.SubHealthyAnnotationKey, + strconv.FormatBool(curSubHealthyStatus), common.UpdateAnnotationRetryTimes) + if err == nil { + firstUpdate = false + } else { + hwlog.RunLog.Warnf("update node annotation failed, err: %v", err) + } + } +} + func (tool *AscendTools) delVirDevInfo(newDeviceList map[string]string) { for annotationTag := range common.GetAllDeviceInfoTypeList() { if _, ok := newDeviceList[annotationTag]; !ok { @@ -729,8 +753,8 @@ func (tool *AscendTools) isHealthy(device *common.NpuDevice) string { tool.npuIsUsedNow(device.DeviceName) && common.ParamOption.GraceToleranceOn == true) { return v1beta1.Healthy } - if faultType == common.PreSeparateNPU && tool.npuIsUsedNow(device.DeviceName) { - hwlog.RunLog.Infof("detect %s but device is in use, device name: %s", faultType, device.DeviceName) + if faultType == common.PreSeparateNPU { + hwlog.RunLog.Infof("detect %s, device name: %s", faultType, device.DeviceName) return v1beta1.Healthy } return v1beta1.Unhealthy diff --git a/pkg/kubeclient/kubeclient.go b/pkg/kubeclient/kubeclient.go index 53ee9d64d5cdaee47bc9265f45b15460d7d67f2d..e367d77b6bb4efe5211b5ba44c5fa559736515d5 100644 --- a/pkg/kubeclient/kubeclient.go +++ b/pkg/kubeclient/kubeclient.go @@ -17,10 +17,12 @@ package kubeclient import ( "context" + "errors" "fmt" "os" "reflect" "strings" + "time" "huawei.com/npu-exporter/v6/common-utils/hwlog" "k8s.io/api/core/v1" @@ -335,3 +337,31 @@ func (ki *ClientK8s) ResourceEventHandler(res ResourceType, filter func(obj inte func (ki *ClientK8s) FlushPodCacheNextQuerying() { ki.IsApiErr = true } + +func (ki *ClientK8s) UpdateNodeAnnotation(key, value string, retryTimes int) error { + if ki == nil { + return errors.New("ClientK8s is nil") + } + node, err := ki.GetNode() + retry := 0 + for err != nil && retry < retryTimes { + node, err = ki.GetNode() + retry++ + time.Sleep(time.Duration(retry) * time.Second) + } + if err != nil { + return err + } + if node.Annotations == nil { + node.Annotations = make(map[string]string) + } + node.Annotations[key] = value + _, err = ki.Clientset.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{}) + retry = 0 + for err != nil && retry < retryTimes { + _, err = ki.Clientset.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{}) + retry++ + time.Sleep(time.Duration(retry) * time.Second) + } + return err +}