diff --git a/pkg/common/fault_code.go b/pkg/common/fault_code.go
index 6d5997727719e4433af670a17ffc3753bbcfe949..a6bd60074a85b113f773bbd234a51e7b68c0dd6a 100644
--- a/pkg/common/fault_code.go
+++ b/pkg/common/fault_code.go
@@ -89,6 +89,8 @@ var (
 	devFaultInfoMapLock sync.Mutex
 	// SubscribeFailed subscribe failed flag
 	SubscribeFailed bool
+	// Synchronize used for synchronizing the fault cache between the main process and the grace tolerance coroutines
+	Synchronize bool
 	// manuallySeparateNpuMapLock operate manuallySeparateNpuMap lock
 	manuallySeparateNpuMapLock sync.Mutex
 	// manuallySeparateNpuMap manually separate npu info cache
diff --git a/pkg/device/ascend910.go b/pkg/device/ascend910.go
index 8c4745228f04d239ac1c76ffe339a6bcdc7f912b..e119de84968ca3f4e329b94912b2350e732f6a71 100644
--- a/pkg/device/ascend910.go
+++ b/pkg/device/ascend910.go
@@ -32,9 +32,10 @@ import (
 )
 
 const (
-	networkDetectOK        = uint32(0)
-	networkDetectInit      = uint32(6)
-	podDevStatusAnnotation = "podDevStatus"
+	networkDetectOK              = uint32(0)
+	networkDetectInit            = uint32(6)
+	synchronizeWaitMagnification = 3
+	podDevStatusAnnotation       = "podDevStatus"
 )
 
 var (
@@ -117,7 +118,7 @@ func (hnm *HwAscend910Manager) GraceTolerance(classifyDevs map[string][]*common.
 		return
 	}
 	// 2. performs graceful fault tolerance for tasks to be processed based on the device information in the cache
-	if err := hnm.processAllTask(); err != nil {
+	if err := hnm.processAllTask(classifyDevs); err != nil {
 		hwlog.RunLog.Errorf("failed to process task, err: %#v", err)
 		return
 	}
@@ -497,7 +498,7 @@ func (hnm *HwAscend910Manager) refreshNormalPodAnnotation(taskName string) {
 	hwlog.RunLog.Info("normal pod refresh annotation success")
 }
 
-func (hnm *HwAscend910Manager) processAllTask() error {
+func (hnm *HwAscend910Manager) processAllTask(classifyDevs map[string][]*common.NpuDevice) error {
 	taskDevFaultInfoList := hnm.hotResetManager.GetAllTaskDevFaultInfoList()
 	for taskName := range taskDevFaultInfoList {
 		policy, policyLevel, err := hnm.hotResetManager.GetTaskProcessPolicy(taskName)
@@ -505,48 +506,72 @@ func (hnm *HwAscend910Manager) processAllTask() error {
 			hwlog.RunLog.Errorf("failed to get task %s process policy, err: %#v", taskName, err)
 			continue
 		}
-		switch policyLevel {
-		case common.RestartErrorLevel, common.ResetErrorLevel, common.RestartRequestErrorLevel:
-			hwlog.RunLog.Debugf("start handle fault: %s - %d, task name: %s", policy, policyLevel, taskName)
-		default:
-			hnm.refreshNormalPodAnnotation(taskName)
+		if hnm.policyLevelHandle(policy, taskName, policyLevel) {
 			continue
 		}
-		if resetFlag, err := hnm.isTaskInReset(taskName); err != nil || resetFlag {
-			if resetFlag && !hnm.hotResetManager.IsCurNodeTaskInReset(taskName) &&
-				hnm.hotResetManager.IsExistFaultyDevInTask(taskName) {
-				hwlog.RunLog.Infof("task %s is in reset process which is not performed on current node. NPU"+
-					" faults occurred on current node, the task's process policy will be marked as isolate", taskName)
-				go hnm.tryWriteIsolationInfo(taskName)
-			}
+		if hnm.isolateSceneHandle(taskName) {
 			continue
 		}
+
 		resetInfo, err := hnm.preProcess(taskName, policy)
 		if err != nil {
 			return err
 		}
-		if err = hnm.runProcessTask(taskName, policyLevel, resetInfo); err != nil {
+		if err = hnm.runProcessTask(taskName, policyLevel, resetInfo, classifyDevs); err != nil {
 			return err
 		}
 	}
 	return nil
 }
 
-func (hnm *HwAscend910Manager) runProcessTask(taskName string, policyLevel int, resetInfo *common.TaskResetInfo) error {
+// policyLevelHandle returns false for restart, reset and restart request levels, which need grace tolerance
+// processing. For any other level it refreshes the normal pod annotation and returns true so the task is skipped
+func (hnm *HwAscend910Manager) policyLevelHandle(handlePolicy, handleTaskName string, handlePolicyLevel int) bool {
+	switch handlePolicyLevel {
+	case common.RestartErrorLevel, common.ResetErrorLevel, common.RestartRequestErrorLevel:
+		hwlog.RunLog.Debugf("start handle fault: %s - %d, task name: %s", handlePolicy,
+			handlePolicyLevel, handleTaskName)
+	default:
+		hnm.refreshNormalPodAnnotation(handleTaskName)
+		return true
+	}
+
+	return false
+}
+
+// isolateSceneHandle returns true when the task must be skipped in this round: the reset state query failed or the
+// task is already in reset. If that reset is not performed on the current node while the task has faulty devices,
+// the isolation info is written in a separate coroutine
+func (hnm *HwAscend910Manager) isolateSceneHandle(handleTaskName string) bool {
+	if resetFlag, err := hnm.isTaskInReset(handleTaskName); err != nil || resetFlag {
+		if resetFlag && !hnm.hotResetManager.IsCurNodeTaskInReset(handleTaskName) &&
+			hnm.hotResetManager.IsExistFaultyDevInTask(handleTaskName) {
+			hwlog.RunLog.Infof("task %s is in reset process which is not performed on current node. NPU"+
+				" faults occurred on current node, the task's process policy will be marked as isolate", handleTaskName)
+			go hnm.tryWriteIsolationInfo(handleTaskName)
+		}
+		return true
+	}
+	return false
+}
+
+func (hnm *HwAscend910Manager) runProcessTask(taskName string, policyLevel int, resetInfo *common.TaskResetInfo,
+	classifyDevs map[string][]*common.NpuDevice) error {
 	switch policyLevel {
 	case common.RestartRequestErrorLevel:
-		go hnm.restartRequestProcess(taskName, resetInfo)
+		go hnm.restartRequestProcess(taskName, resetInfo, classifyDevs)
 	case common.RestartErrorLevel:
-		go hnm.restartProcess(taskName, resetInfo)
+		go hnm.restartProcess(taskName, resetInfo, classifyDevs)
 	case common.ResetErrorLevel:
-		go hnm.resetProcess(taskName, resetInfo)
+		go hnm.resetProcess(taskName, resetInfo, classifyDevs)
 	default:
 		return fmt.Errorf("invalid processing policy")
 	}
 	return nil
 }
 
-func (hnm *HwAscend910Manager) restartRequestProcess(taskName string, resetInfo *common.TaskResetInfo) {
+func (hnm *HwAscend910Manager) restartRequestProcess(taskName string, resetInfo *common.TaskResetInfo,
+	classifyDevs map[string][]*common.NpuDevice) {
 	defer func() {
 		if err := hnm.postProcess(taskName, resetInfo); err != nil {
 			hwlog.RunLog.Errorf("failed to unset device in reset, err %v", err)
@@ -562,11 +587,11 @@ func (hnm *HwAscend910Manager) restartRequestProcess(taskName string, resetInfo
 	devFaultInfoListInReset := hnm.hotResetManager.DeepCopyDevFaultInfoList(devFaultInfoList)
 	// wait L2 fault to self-healing
 	time.Sleep(common.WaitFlushingCMTime * time.Second)
-	if err := hnm.refreshDevFaultInfo(devFaultInfoList); err != nil {
+	if err := hnm.refreshDevFaultInfo(devFaultInfoList, classifyDevs); err != nil {
 		hwlog.RunLog.Errorf("failed to refresh device fault info, err %v", err)
 		return
 	}
-	currentPolicy, err := hnm.upgradeRestartRequestProcess(taskName, devFaultInfoList)
+	currentPolicy, err := hnm.upgradeRestartRequestProcess(taskName, devFaultInfoList, classifyDevs)
 	if err != nil {
 		hwlog.RunLog.Errorf("failed to exec upgrade reset process, err: %v", err)
 		return
@@ -586,7 +611,8 @@ func (hnm *HwAscend910Manager) restartRequestProcess(taskName string, resetInfo
 	return
 }
 
-func (hnm *HwAscend910Manager) restartProcess(taskName string, resetInfo *common.TaskResetInfo) {
+func (hnm *HwAscend910Manager) restartProcess(taskName string, resetInfo *common.TaskResetInfo,
+	classifyDevs map[string][]*common.NpuDevice) {
 	defer func() {
 		if err := hnm.postProcess(taskName, resetInfo); err != nil {
 			hwlog.RunLog.Errorf("failed to unset device in reset, err %v", err)
@@ -601,11 +627,11 @@ func (hnm *HwAscend910Manager) restartProcess(taskName string, resetInfo *common
 	common.RecordFaultInfoList(devFaultInfoList)
 	devFaultInfoListInReset := hnm.hotResetManager.DeepCopyDevFaultInfoList(devFaultInfoList)
 	time.Sleep(common.WaitFlushingCMTime * time.Second)
-	if err := hnm.refreshDevFaultInfo(devFaultInfoList); err != nil {
+	if err := hnm.refreshDevFaultInfo(devFaultInfoList, classifyDevs); err != nil {
 		hwlog.RunLog.Errorf("failed to refresh device fault info, err %v", err)
 		return
 	}
-	currentPolicy, err := hnm.upgradeRestartProcess(taskName, devFaultInfoList)
+	currentPolicy, err := hnm.upgradeRestartProcess(taskName, devFaultInfoList, classifyDevs)
 	if err != nil {
 		hwlog.RunLog.Errorf("failed to exec upgrade restart process, err: %v", err)
 		return
@@ -623,7 +649,8 @@ func (hnm *HwAscend910Manager) restartProcess(taskName string, resetInfo *common
 }
 
 // upgradeRestartProcess upgrade the device restart processing to the device reset processing
-func (hnm *HwAscend910Manager) upgradeRestartProcess(taskName string, devFaultInfoList []*common.TaskDevInfo) (string,
+func (hnm *HwAscend910Manager) upgradeRestartProcess(taskName string, devFaultInfoList []*common.TaskDevInfo,
+	classifyDevs map[string][]*common.NpuDevice) (string,
 	error) {
 	restartFaultInfoList, err := hnm.hotResetManager.GetDevListByPolicyLevel(devFaultInfoList, common.RestartErrorLevel)
 	if err != nil {
@@ -640,7 +667,7 @@ func (hnm *HwAscend910Manager) upgradeRestartProcess(taskName string, devFaultIn
 		hwlog.RunLog.Errorf("failed to update reset cm to recover failed status, err: %v", err)
 		return "", err
 	}
-	if err := hnm.resetDeviceOnce(devFaultInfoList); err != nil {
+	if err := hnm.resetDeviceOnce(devFaultInfoList, classifyDevs); err != nil {
 		return "", err
 	}
 	resultFaultInfoList, err := hnm.hotResetManager.GetDevListByPolicyLevel(devFaultInfoList, common.RestartErrorLevel)
@@ -662,7 +689,7 @@ func (hnm *HwAscend910Manager) upgradeRestartProcess(taskName string, devFaultIn
 
 // upgradeRestartProcess upgrade the device restart processing to the device reset processing
 func (hnm *HwAscend910Manager) upgradeRestartRequestProcess(taskName string,
-	devFaultInfoList []*common.TaskDevInfo) (string, error) {
+	devFaultInfoList []*common.TaskDevInfo, classifyDevs map[string][]*common.NpuDevice) (string, error) {
 	faultInfoList, err := hnm.hotResetManager.GetDevListByPolicyLevel(devFaultInfoList,
 		common.RestartRequestErrorLevel)
 	if err != nil {
@@ -679,7 +706,7 @@ func (hnm *HwAscend910Manager) upgradeRestartRequestProcess(taskName string,
 		hwlog.RunLog.Errorf("failed to update reset cm to ResetError, err: %v", err)
 		return "", err
 	}
-	if err := hnm.resetDeviceOnce(devFaultInfoList); err != nil {
+	if err := hnm.resetDeviceOnce(devFaultInfoList, classifyDevs); err != nil {
 		return "", err
 	}
 	resultFaultInfoList, err := hnm.hotResetManager.GetDevListByPolicyLevel(devFaultInfoList,
@@ -726,7 +753,8 @@ func (hnm *HwAscend910Manager) updateResetCMStatus(taskName, policy, initPolicy,
 	return nil
 }
 
-func (hnm *HwAscend910Manager) resetProcess(taskName string, resetInfo *common.TaskResetInfo) {
+func (hnm *HwAscend910Manager) resetProcess(taskName string, resetInfo *common.TaskResetInfo,
+	classifyDevs map[string][]*common.NpuDevice) {
 	defer func() {
 		if err := hnm.postProcess(taskName, resetInfo); err != nil {
 			hwlog.RunLog.Errorf("failed to exec post process, err: %v", err)
@@ -741,7 +769,7 @@ func (hnm *HwAscend910Manager) resetProcess(taskName string, resetInfo *common.T
 	common.RecordFaultInfoList(devFaultInfoList)
 	devFaultInfoListInReset := hnm.hotResetManager.DeepCopyDevFaultInfoList(devFaultInfoList)
 	time.Sleep(common.WaitFlushingCMTime * time.Second)
-	if err := hnm.resetDeviceOnce(devFaultInfoList); err != nil {
+	if err := hnm.resetDeviceOnce(devFaultInfoList, classifyDevs); err != nil {
 		hwlog.RunLog.Errorf("failed to reset device, err: %v", err)
 		return
 	}
@@ -839,20 +867,63 @@ func (hnm *HwAscend910Manager) postProcess(taskName string, resetInfo *common.Ta
 	hwlog.RunLog.Infof("grace tolerance process complete, task name: %s", taskName)
 	return nil
 }
-func (hnm *HwAscend910Manager) refreshDevFaultInfo(devFaultInfo []*common.TaskDevInfo) error {
-	for _, devInfo := range devFaultInfo {
-		_, errorCode, err := hnm.GetDmgr().GetDeviceAllErrorCode(devInfo.LogicId)
-		if err != nil {
-			hwlog.RunLog.Errorf("failed to get err code of device %d", devInfo.LogicId)
-			return err
-		}
-		devInfo.Policy = hnm.hotResetManager.GetDevProcessPolicy(common.GetFaultType(errorCode, devInfo.LogicId))
-		devInfo.ErrorCode = errorCode
-	}
-	return nil
-}
+
+// refreshDevFaultInfo waits until the main process reports a finished fault cache update via common.Synchronize,
+// then refreshes the error code and process policy of every device in devFaultInfo from the Ascend910 device
+// list in classifyDevs. It returns an error when classifyDevs has no Ascend910 entry or when the update is not
+// observed within synchronizeWaitMagnification * ListAndWatchPeriod seconds
+func (hnm *HwAscend910Manager) refreshDevFaultInfo(devFaultInfo []*common.TaskDevInfo,
+	classifyDevs map[string][]*common.NpuDevice) error {
+	devStatusList, ok := classifyDevs[common.Ascend910]
+	if !ok {
+		return fmt.Errorf("not found %s device type in classified devices", common.Ascend910)
+	}
+
+	// NOTE(review): common.Synchronize is a plain bool that this coroutine and the main process access without
+	// a lock, and every grace tolerance coroutine resets it here; consider an atomic or channel based signal
+	common.Synchronize = false
+	// wait for main process to update the fault cache info. The coroutine performs grace tolerance processing based on
+	// the updated fault cache info of the main process
+	if err := wait.PollImmediate(time.Second, time.Duration(
+		synchronizeWaitMagnification*common.ParamOption.ListAndWatchPeriod)*time.Second, func() (bool, error) {
+		if !common.Synchronize {
+			hwlog.RunLog.Debug("failed to synchronize the fault cache of the main process, will retry again")
+			return false, nil
+		}
+		hnm.updateTaskDevFaultInfo(devFaultInfo, devStatusList)
+		return true, nil
+	}); err != nil {
+		return fmt.Errorf("synchronize the fault cache of the main process timeout: %v", err)
+	}
+
+	return nil
+}
+
+// updateTaskDevFaultInfo refreshes the error code and process policy of each task device from the entry of
+// devStatusList that has the same logic id. Entries are matched by logic id because the position of a device in
+// devFaultInfo is not guaranteed to equal its logic id, so indexing the slice by logic id may go out of range
+func (hnm *HwAscend910Manager) updateTaskDevFaultInfo(devFaultInfo []*common.TaskDevInfo,
+	devStatusList []*common.NpuDevice) {
+	for _, devInfo := range devFaultInfo {
+		if devInfo == nil {
+			continue
+		}
+		for _, npuDevice := range devStatusList {
+			if npuDevice == nil || npuDevice.LogicID != devInfo.LogicId {
+				continue
+			}
+			devInfo.ErrorCode = npuDevice.FaultCodes
+			devInfo.Policy = hnm.hotResetManager.
+				GetDevProcessPolicy(common.GetFaultType(npuDevice.FaultCodes, npuDevice.LogicID))
+			hwlog.RunLog.Infof("refresh device fault info, device %d, policy %s, err code: %v", devInfo.LogicId,
+				devInfo.Policy, devInfo.ErrorCode)
+			break
+		}
+	}
+}
 
-func (hnm *HwAscend910Manager) resetDeviceOnce(devFaultInfoList []*common.TaskDevInfo) error {
+func (hnm *HwAscend910Manager) resetDeviceOnce(devFaultInfoList []*common.TaskDevInfo,
+	classifyDevs map[string][]*common.NpuDevice) error {
 	resetFaultInfoMap, err := hnm.hotResetManager.GetNeedResetDevMap(devFaultInfoList)
 	if err != nil {
 		hwlog.RunLog.Errorf("failed to get need reset device list, err: %v", err)
@@ -865,7 +936,7 @@ func (hnm *HwAscend910Manager) resetDeviceOnce(devFaultInfoList []*common.TaskDe
 	for _, devInfo := range devFaultInfoList {
 		common.SetDeviceInit(devInfo.LogicId)
 	}
-	if err := hnm.refreshDevFaultInfo(devFaultInfoList); err != nil {
+	if err := hnm.refreshDevFaultInfo(devFaultInfoList, classifyDevs); err != nil {
 		hwlog.RunLog.Errorf("failed to refresh device fault info, err: %v", err)
 		return err
 	}
diff --git a/pkg/device/ascend910_test.go b/pkg/device/ascend910_test.go
index 8735d1aaeec34bd48ad3e1bb5f8d48cbd003a836..b01bb25e1c136b3c8bf4f9532575c44e3f944d93 100644
--- a/pkg/device/ascend910_test.go
+++ b/pkg/device/ascend910_test.go
@@ -175,7 +175,7 @@ func TestProcessAllTask(t *testing.T) {
 				common.IsolateError: common.IsolateErrorLevel,
 			},
 		}
-		err := manager.processAllTask()
+		err := manager.processAllTask(mockGroupDevice())
 		convey.So(err, convey.ShouldBeNil)
 	})
 }
diff --git a/pkg/server/manager.go b/pkg/server/manager.go
index 09c5ece9dc45c0f07abe22238bab6c06e2ed1753..9b5ca34ccf7d296b0540afed7ce7e47b15466f33 100644
--- a/pkg/server/manager.go
+++ b/pkg/server/manager.go
@@ -314,6 +314,7 @@ func (hdm *HwDevManager) ListenDevice(ctx context.Context) {
 			common.DelOnceRecoverFault(hdm.groupDevice)
 			common.DelOnceFrequencyFault()
 			common.UnlockAllDeviceInfo()
+			common.Synchronize = true
 		}
 	}
 }