diff --git a/pkg/common/fault_code.go b/pkg/common/fault_code.go index a1438165635aba44b3376eb899151ef28afc3e11..66b98ea3a0567050fc2fed619462b917f8b9298c 100644 --- a/pkg/common/fault_code.go +++ b/pkg/common/fault_code.go @@ -97,6 +97,8 @@ var ( devFaultInfoMapLock sync.Mutex // SubscribeFailed subscribe failed flag SubscribeFailed bool + // Synchronize used for synchronizing the fault cache between the main process and the grace tolerance coroutines + Synchronize bool // manuallySeparateNpuMapLock operate manuallySeparateNpuMap lock manuallySeparateNpuMapLock sync.Mutex // manuallySeparateNpuMap manually separate npu info cache diff --git a/pkg/device/ascend910.go b/pkg/device/ascend910.go index b71ab231a56cb24c4d09d06be324aa53f635751b..534168bbe2f5f7e0527a18451bdef96ec0fafe94 100644 --- a/pkg/device/ascend910.go +++ b/pkg/device/ascend910.go @@ -31,12 +31,13 @@ import ( ) const ( - networkDetectOK = uint32(0) - networkDetectInit = uint32(6) - podDevStatusAnnotation = "podDevStatus" - updateResetCMFailedPattern = "failed to update reset cm to recovered status, err: %v" - unsetTaskFailedPattern = "failed to unset task in reset, err: %v" - failedToUpdateCmPattern = "failed to update reset cm to recover failed status, err: %v" + networkDetectOK = uint32(0) + networkDetectInit = uint32(6) + synchronizeWaitMagnification = 3 + podDevStatusAnnotation = "podDevStatus" + updateResetCMFailedPattern = "failed to update reset cm to recovered status, err: %v" + unsetTaskFailedPattern = "failed to unset task in reset, err: %v" + failedToUpdateCmPattern = "failed to update reset cm to recover failed status, err: %v" ) var ( @@ -129,7 +130,7 @@ func (hnm *HwAscend910Manager) GraceTolerance(classifyDevs map[string][]*common. return } // performs graceful fault tolerance for tasks to be processed based on the device information in the cache - if err := hnm.processAllTask(); err != nil { + if err := hnm.processAllTask(classifyDevs); err != nil { hwlog.RunLog.Errorf("failed to process task, err: %#v", err) } // handling hot reset without task @@ -818,7 +819,7 @@ func (hnm *HwAscend910Manager) refreshNormalPodAnnotation(taskName string) { hwlog.RunLog.Info("normal pod refresh annotation success") } -func (hnm *HwAscend910Manager) processAllTask() error { +func (hnm *HwAscend910Manager) processAllTask(classifyDevs map[string][]*common.NpuDevice) error { taskDevFaultInfoList := hnm.hotResetManager.GetAllTaskDevFaultInfoList() for taskName := range taskDevFaultInfoList { policy, policyLevel, err := hnm.hotResetManager.GetTaskProcessPolicy(taskName) @@ -826,52 +827,71 @@ func (hnm *HwAscend910Manager) processAllTask() error { hwlog.RunLog.Errorf("failed to get task %s process policy, err: %#v", taskName, err) continue } - switch policyLevel { - case common.RestartRequestErrorLevel, common.RestartErrorLevel: - hwlog.RunLog.Debugf("start handling fault: %s - %d, task name: %s", policy, policyLevel, taskName) - case common.ResetErrorLevel: - hwlog.RunLog.Debugf("start handling fault: %s - %d, task name: %s", policy, policyLevel, taskName) - isHotResetOn = true - case common.FreeResetErrorLevel: - continue - default: + if policyLevelHandle(policy, taskName, policyLevel) { continue } - if resetFlag, err := hnm.isTaskInReset(taskName); err != nil || resetFlag { - if resetFlag && !hnm.hotResetManager.IsCurNodeTaskInReset(taskName) && - hnm.hotResetManager.IsExistFaultyDevInTask(taskName) { - hwlog.RunLog.Infof("task %s is in reset process which is not performed on current node. NPU"+ - " faults occurred on current node, the task's process policy will be marked as isolate", taskName) - go hnm.tryWriteIsolationInfo(taskName) - } + if hnm.isolateSceneHandle(taskName) { continue } resetInfo, err := hnm.preProcess(taskName, policy) if err != nil { return err } - if err = hnm.runProcessTask(taskName, policyLevel, resetInfo); err != nil { + if err = hnm.runProcessTask(taskName, policyLevel, resetInfo, classifyDevs); err != nil { return err } } return nil } -func (hnm *HwAscend910Manager) runProcessTask(taskName string, policyLevel int, resetInfo *common.TaskResetInfo) error { +func policyLevelHandle(handlePolicy, handleTaskName string, handlePolicyLevel int) bool { + switch handlePolicyLevel { + case common.RestartRequestErrorLevel, common.RestartErrorLevel: + hwlog.RunLog.Debugf("start handling fault: %s - %d, task name: %s", handlePolicy, + handlePolicyLevel, handleTaskName) + case common.ResetErrorLevel: + hwlog.RunLog.Debugf("start handling fault: %s - %d, task name: %s", handlePolicy, + handlePolicyLevel, handleTaskName) + isHotResetOn = true + case common.FreeResetErrorLevel: + return true + default: + return true + } + + return false +} + +func (hnm *HwAscend910Manager) isolateSceneHandle(handleTaskName string) bool { + if resetFlag, err := hnm.isTaskInReset(handleTaskName); err != nil || resetFlag { + if resetFlag && !hnm.hotResetManager.IsCurNodeTaskInReset(handleTaskName) && + hnm.hotResetManager.IsExistFaultyDevInTask(handleTaskName) { + hwlog.RunLog.Infof("task %s is in reset process which is not performed on current node. NPU"+ + " faults occurred on current node, the task's process policy will be marked as isolate", handleTaskName) + go hnm.tryWriteIsolationInfo(handleTaskName) + } + return true + } + return false +} + +func (hnm *HwAscend910Manager) runProcessTask(taskName string, policyLevel int, resetInfo *common.TaskResetInfo, + classifyDevs map[string][]*common.NpuDevice) error { switch policyLevel { case common.RestartRequestErrorLevel: - go hnm.restartRequestProcess(taskName, resetInfo) + go hnm.restartRequestProcess(taskName, resetInfo, classifyDevs) case common.RestartErrorLevel: - go hnm.restartProcess(taskName, resetInfo) + go hnm.restartProcess(taskName, resetInfo, classifyDevs) case common.ResetErrorLevel: - go hnm.resetProcess(taskName, resetInfo) + go hnm.resetProcess(taskName, resetInfo, classifyDevs) default: return fmt.Errorf("invalid processing policy") } return nil } -func (hnm *HwAscend910Manager) restartRequestProcess(taskName string, resetInfo *common.TaskResetInfo) { +func (hnm *HwAscend910Manager) restartRequestProcess(taskName string, resetInfo *common.TaskResetInfo, + classifyDevs map[string][]*common.NpuDevice) { defer func() { if err := hnm.postProcess(taskName, resetInfo); err != nil { hwlog.RunLog.Errorf("failed to unset device in reset, err %v", err) @@ -886,7 +906,7 @@ func (hnm *HwAscend910Manager) restartRequestProcess(taskName string, resetInfo common.RecordFaultInfoList(devFaultInfoList) devFaultInfoListInReset := hnm.hotResetManager.DeepCopyDevFaultInfoList(devFaultInfoList) - currentPolicy, needUpgrade, resetErr := hnm.checkDevErrorCode(taskName, devFaultInfoList) + currentPolicy, needUpgrade, resetErr := hnm.checkDevErrorCode(taskName, devFaultInfoList, classifyDevs) if resetErr == nil { hnm.handleSucceedRestartRequest(taskName, currentPolicy, devFaultInfoList, devFaultInfoListInReset) return @@ -895,7 +915,7 @@ func (hnm *HwAscend910Manager) restartRequestProcess(taskName string, resetInfo if !needUpgrade { return } - currentPolicy, resetErr = hnm.upgradeRestartRequestProcess(taskName, devFaultInfoList) + currentPolicy, resetErr = hnm.upgradeRestartRequestProcess(taskName, devFaultInfoList, classifyDevs) if resetErr != nil { hwlog.RunLog.Errorf("failed to exec upgrade reset process, err: %v", err) if err := hnm.updateResetCMStatus(taskName, common.IsolateError, common.RestartRequestError, @@ -923,7 +943,8 @@ func (hnm *HwAscend910Manager) handleSucceedRestartRequest(taskName, currentPoli } } -func (hnm *HwAscend910Manager) checkDevErrorCode(taskName string, devFaultInfo []*common.TaskDevInfo) (string, +func (hnm *HwAscend910Manager) checkDevErrorCode(taskName string, devFaultInfo []*common.TaskDevInfo, + classifyDevs map[string][]*common.NpuDevice) (string, bool, error) { timeOut := time.After(common.WaitErrorCodeCleanTime * time.Second) timeCost := 0 @@ -932,7 +953,7 @@ func (hnm *HwAscend910Manager) checkDevErrorCode(taskName string, devFaultInfo [ case <-timeOut: return "", true, fmt.Errorf("after %d second, there still has error code on device", common.WaitErrorCodeCleanTime) default: - if err := hnm.refreshDevFaultInfo(devFaultInfo); err != nil { + if err := hnm.refreshDevFaultInfo(devFaultInfo, classifyDevs); err != nil { return "", false, err } faultInfoList, err := hnm.hotResetManager.GetDevListByPolicyLevel(devFaultInfo, @@ -951,7 +972,8 @@ func (hnm *HwAscend910Manager) checkDevErrorCode(taskName string, devFaultInfo [ } } -func (hnm *HwAscend910Manager) restartProcess(taskName string, resetInfo *common.TaskResetInfo) { +func (hnm *HwAscend910Manager) restartProcess(taskName string, resetInfo *common.TaskResetInfo, + classifyDevs map[string][]*common.NpuDevice) { defer func() { if err := hnm.postProcess(taskName, resetInfo); err != nil { hwlog.RunLog.Errorf("failed to unset device in reset, err %v", err) @@ -974,11 +996,11 @@ func (hnm *HwAscend910Manager) restartProcess(taskName string, resetInfo *common return } time.Sleep(common.WaitFaultSelfHealingTime * time.Second) - if err := hnm.refreshDevFaultInfo(devFaultInfoList); err != nil { + if err := hnm.refreshDevFaultInfo(devFaultInfoList, classifyDevs); err != nil { hwlog.RunLog.Errorf("failed to refresh device fault info, err %v", err) return } - currentPolicy, err := hnm.upgradeRestartProcess(taskName, devFaultInfoList) + currentPolicy, err := hnm.upgradeRestartProcess(taskName, devFaultInfoList, classifyDevs) if err != nil { hwlog.RunLog.Errorf("failed to exec upgrade restart process, err: %v", err) if err := hnm.updateResetCMStatus(taskName, common.IsolateError, common.RestartError, common.RecoverFailedStatus, @@ -1000,7 +1022,8 @@ func (hnm *HwAscend910Manager) restartProcess(taskName string, resetInfo *common } // upgradeRestartProcess upgrade the device restart processing to the device reset processing -func (hnm *HwAscend910Manager) upgradeRestartProcess(taskName string, devFaultInfoList []*common.TaskDevInfo) (string, +func (hnm *HwAscend910Manager) upgradeRestartProcess(taskName string, devFaultInfoList []*common.TaskDevInfo, + classifyDevs map[string][]*common.NpuDevice) (string, error) { restartFaultInfoList, err := hnm.hotResetManager.GetDevListByPolicyLevel(devFaultInfoList, common.RestartErrorLevel) if err != nil { @@ -1017,7 +1040,7 @@ func (hnm *HwAscend910Manager) upgradeRestartProcess(taskName string, devFaultIn hwlog.RunLog.Errorf(failedToUpdateCmPattern, err) return "", err } - if err := hnm.resetDeviceOnce(devFaultInfoList); err != nil { + if err := hnm.resetDeviceOnce(devFaultInfoList, classifyDevs); err != nil { return "", err } resultFaultInfoList, err := hnm.hotResetManager.GetDevListByPolicyLevel(devFaultInfoList, common.RestartErrorLevel) @@ -1039,7 +1062,7 @@ func (hnm *HwAscend910Manager) upgradeRestartProcess(taskName string, devFaultIn // upgradeRestartProcess upgrade the device restart processing to the device reset processing func (hnm *HwAscend910Manager) upgradeRestartRequestProcess(taskName string, - devFaultInfoList []*common.TaskDevInfo) (string, error) { + devFaultInfoList []*common.TaskDevInfo, classifyDevs map[string][]*common.NpuDevice) (string, error) { hwlog.RunLog.Warnf("L2 fault self-healing failed, upgrade fault, task name: %s", taskName) if err := hnm.updateResetCMStatusWithoutWait(taskName, common.ResetError, common.RestartRequestError, common.UnrecoveredStatus, devFaultInfoList); err != nil { @@ -1050,7 +1073,7 @@ func (hnm *HwAscend910Manager) upgradeRestartRequestProcess(taskName string, hwlog.RunLog.Errorf("failed to wait for all faulty devices having no process, err: %v", err) return "", err } - if err := hnm.resetDeviceOnce(devFaultInfoList); err != nil { + if err := hnm.resetDeviceOnce(devFaultInfoList, classifyDevs); err != nil { hwlog.RunLog.Warnf("L2 upgrade reset failed, err:%v", err) return "", err } @@ -1127,7 +1150,8 @@ func (hnm *HwAscend910Manager) updateResetCMStatusWithoutWait(taskName, policy, return nil } -func (hnm *HwAscend910Manager) resetProcess(taskName string, resetInfo *common.TaskResetInfo) { +func (hnm *HwAscend910Manager) resetProcess(taskName string, resetInfo *common.TaskResetInfo, + classifyDevs map[string][]*common.NpuDevice) { defer func() { isHotResetOn = false if err := hnm.postProcess(taskName, resetInfo); err != nil { @@ -1150,7 +1174,7 @@ func (hnm *HwAscend910Manager) resetProcess(taskName string, resetInfo *common.T } return } - if err := hnm.resetDeviceOnce(devFaultInfoList); err != nil { + if err := hnm.resetDeviceOnce(devFaultInfoList, classifyDevs); err != nil { hwlog.RunLog.Errorf("failed to reset device, err: %v", err) if err := hnm.updateResetCMStatus(taskName, common.IsolateError, common.ResetError, common.RecoverFailedStatus, devFaultInfoList); err != nil { @@ -1383,22 +1407,39 @@ func (hnm *HwAscend910Manager) postProcess(taskName string, resetInfo *common.Ta return nil } -func (hnm *HwAscend910Manager) refreshDevFaultInfo(devFaultInfo []*common.TaskDevInfo) error { - for _, devInfo := range devFaultInfo { - _, errorCode, err := hnm.GetDmgr().GetDeviceAllErrorCode(devInfo.LogicId) - if err != nil { - hwlog.RunLog.Errorf("failed to get err code of device %d", devInfo.LogicId) - return err +func (hnm *HwAscend910Manager) refreshDevFaultInfo(devFaultInfo []*common.TaskDevInfo, + classifyDevs map[string][]*common.NpuDevice) error { + devStatusList, ok := classifyDevs[common.Ascend910] + if !ok { + return fmt.Errorf("not found %s device type in %v", common.Ascend910, devStatusList) + } + + common.Synchronize = false + // wait for main process to update the fault cache info. The coroutine performs grace tolerance processing based on + // the updated fault cache info of the main process + if err := wait.PollImmediate(time.Second, time.Duration( + synchronizeWaitMagnification*common.ParamOption.ListAndWatchPeriod)*time.Second, func() (bool, error) { + if !common.Synchronize { + hwlog.RunLog.Debug("failed to synchronize the fault cache of the main process, will retry again") + return false, nil + } + for _, npuDevice := range devStatusList { + devFaultInfo[npuDevice.LogicID].ErrorCode = npuDevice.FaultCodes + devFaultInfo[npuDevice.LogicID].Policy = hnm.hotResetManager. + GetDevProcessPolicy(common.GetFaultType(npuDevice.FaultCodes, npuDevice.LogicID)) + hwlog.RunLog.Infof("refresh device fault info, device %d, policy %s, err code: %v", npuDevice.LogicID, + devFaultInfo[npuDevice.LogicID].Policy, devFaultInfo[npuDevice.LogicID].ErrorCode) } - devInfo.Policy = hnm.hotResetManager.GetDevProcessPolicy(common.GetFaultType(errorCode, devInfo.LogicId)) - devInfo.ErrorCode = errorCode - hwlog.RunLog.Infof("refresh device fault info, device %d, policy %s, err code: %v", devInfo.LogicId, - devInfo.Policy, errorCode) + return true, nil + }); err != nil { + return fmt.Errorf("synchronize the fault cache of the main process timeout: %v", err) } + return nil } -func (hnm *HwAscend910Manager) resetDeviceOnce(devFaultInfoList []*common.TaskDevInfo) error { +func (hnm *HwAscend910Manager) resetDeviceOnce(devFaultInfoList []*common.TaskDevInfo, + classifyDevs map[string][]*common.NpuDevice) error { resetFaultInfoMap, err := hnm.hotResetManager.GetNeedResetDevMap(devFaultInfoList) if err != nil { hwlog.RunLog.Errorf("failed to get need reset device list, err: %v", err) @@ -1411,7 +1452,7 @@ func (hnm *HwAscend910Manager) resetDeviceOnce(devFaultInfoList []*common.TaskDe for _, devInfo := range devFaultInfoList { common.SetDeviceInit(devInfo.LogicId) } - if err := hnm.refreshDevFaultInfo(devFaultInfoList); err != nil { + if err := hnm.refreshDevFaultInfo(devFaultInfoList, classifyDevs); err != nil { hwlog.RunLog.Errorf("failed to refresh device fault info, err: %v", err) return err } diff --git a/pkg/device/ascend910_test.go b/pkg/device/ascend910_test.go index 459e349635e6a480ec714aa362d9d0c68091fa2a..d6e24b4374beba8cf8192ce43047d97fdba99d9e 100644 --- a/pkg/device/ascend910_test.go +++ b/pkg/device/ascend910_test.go @@ -388,7 +388,7 @@ func TestProcessAllTask(t *testing.T) { common.IsolateError: common.IsolateErrorLevel, }, } - err := manager.processAllTask() + err := manager.processAllTask(mockGroupDevice()) convey.So(err, convey.ShouldBeNil) }) } diff --git a/pkg/server/manager.go b/pkg/server/manager.go index 061d6505dd5001b726afb290195f0e92cdd1b6a0..338dafa3f86b4bd60383fb3a7fcd154a63b94ebf 100644 --- a/pkg/server/manager.go +++ b/pkg/server/manager.go @@ -375,6 +375,7 @@ func (hdm *HwDevManager) ListenDevice(ctx context.Context) { common.DelOnceRecoverFault(hdm.groupDevice) common.DelOnceFrequencyFault() common.UnlockAllDeviceInfo() + common.Synchronize = true } } }