From a7379185b3f6601660edc1d491ac06fc19ac3c2d Mon Sep 17 00:00:00 2001 From: Light-Alex <245212467@qq.com> Date: Thu, 25 Jul 2024 17:44:24 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E4=BF=AE=E6=94=B9=E8=AF=B4=E6=98=8E?= =?UTF-8?q?=20Modification=E3=80=91=E4=BC=98=E9=9B=85=E5=AE=B9=E9=94=99?= =?UTF-8?q?=E5=8D=8F=E7=A8=8B=E5=92=8C=E4=B8=BB=E8=BF=9B=E7=A8=8B=E4=B9=8B?= =?UTF-8?q?=E9=97=B4=E6=96=B0=E5=A2=9E=E6=95=85=E9=9A=9C=E7=BC=93=E5=AD=98?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E6=9C=BA=E5=88=B6=E4=BB=A5=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E4=B8=8D=E4=B8=80=E8=87=B4=E5=AF=BC=E8=87=B4=E7=9A=84=E6=96=AD?= =?UTF-8?q?=E7=82=B9=E7=BB=AD=E8=AE=AD=E5=A4=B1=E8=B4=A5=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98=20=E3=80=90=E4=BF=AE=E6=94=B9=E4=BA=BA=20Modifier?= =?UTF-8?q?=E3=80=91yanchuanxiang?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/common/fault_code.go | 2 + pkg/device/ascend910.go | 131 +++++++++++++++++++++++------------ pkg/device/ascend910_test.go | 2 +- pkg/server/manager.go | 1 + 4 files changed, 91 insertions(+), 45 deletions(-) diff --git a/pkg/common/fault_code.go b/pkg/common/fault_code.go index 6d599772..a6bd6007 100644 --- a/pkg/common/fault_code.go +++ b/pkg/common/fault_code.go @@ -89,6 +89,8 @@ var ( devFaultInfoMapLock sync.Mutex // SubscribeFailed subscribe failed flag SubscribeFailed bool + // Synchronize used for synchronizing the fault cache between the main process and the grace tolerance coroutines + Synchronize bool // manuallySeparateNpuMapLock operate manuallySeparateNpuMap lock manuallySeparateNpuMapLock sync.Mutex // manuallySeparateNpuMap manually separate npu info cache diff --git a/pkg/device/ascend910.go b/pkg/device/ascend910.go index 8c474522..e119de84 100644 --- a/pkg/device/ascend910.go +++ b/pkg/device/ascend910.go @@ -32,9 +32,10 @@ import ( ) const ( - networkDetectOK = uint32(0) - networkDetectInit = uint32(6) - podDevStatusAnnotation = "podDevStatus" + networkDetectOK = uint32(0) + networkDetectInit = uint32(6) + synchronizeWaitMagnification = 3 + podDevStatusAnnotation = "podDevStatus" ) var ( @@ -117,7 +118,7 @@ func (hnm *HwAscend910Manager) GraceTolerance(classifyDevs map[string][]*common. return } // 2. performs graceful fault tolerance for tasks to be processed based on the device information in the cache - if err := hnm.processAllTask(); err != nil { + if err := hnm.processAllTask(classifyDevs); err != nil { hwlog.RunLog.Errorf("failed to process task, err: %#v", err) return } @@ -497,7 +498,7 @@ func (hnm *HwAscend910Manager) refreshNormalPodAnnotation(taskName string) { hwlog.RunLog.Info("normal pod refresh annotation success") } -func (hnm *HwAscend910Manager) processAllTask() error { +func (hnm *HwAscend910Manager) processAllTask(classifyDevs map[string][]*common.NpuDevice) error { taskDevFaultInfoList := hnm.hotResetManager.GetAllTaskDevFaultInfoList() for taskName := range taskDevFaultInfoList { policy, policyLevel, err := hnm.hotResetManager.GetTaskProcessPolicy(taskName) @@ -505,48 +506,67 @@ func (hnm *HwAscend910Manager) processAllTask() error { hwlog.RunLog.Errorf("failed to get task %s process policy, err: %#v", taskName, err) continue } - switch policyLevel { - case common.RestartErrorLevel, common.ResetErrorLevel, common.RestartRequestErrorLevel: - hwlog.RunLog.Debugf("start handle fault: %s - %d, task name: %s", policy, policyLevel, taskName) - default: - hnm.refreshNormalPodAnnotation(taskName) + if hnm.policyLevelHandle(policy, taskName, policyLevel) { continue } - if resetFlag, err := hnm.isTaskInReset(taskName); err != nil || resetFlag { - if resetFlag && !hnm.hotResetManager.IsCurNodeTaskInReset(taskName) && - hnm.hotResetManager.IsExistFaultyDevInTask(taskName) { - hwlog.RunLog.Infof("task %s is in reset process which is not performed on current node. NPU"+ - " faults occurred on current node, the task's process policy will be marked as isolate", taskName) - go hnm.tryWriteIsolationInfo(taskName) - } + if hnm.isolateSceneHandle(taskName) { continue } + resetInfo, err := hnm.preProcess(taskName, policy) if err != nil { return err } - if err = hnm.runProcessTask(taskName, policyLevel, resetInfo); err != nil { + if err = hnm.runProcessTask(taskName, policyLevel, resetInfo, classifyDevs); err != nil { return err } } return nil } -func (hnm *HwAscend910Manager) runProcessTask(taskName string, policyLevel int, resetInfo *common.TaskResetInfo) error { +func (hnm *HwAscend910Manager) policyLevelHandle(handlePolicy, handleTaskName string, handlePolicyLevel int) bool { + switch handlePolicyLevel { + case common.RestartErrorLevel, common.ResetErrorLevel, common.RestartRequestErrorLevel: + hwlog.RunLog.Debugf("start handle fault: %s - %d, task name: %s", handlePolicy, + handlePolicyLevel, handleTaskName) + default: + hnm.refreshNormalPodAnnotation(handleTaskName) + return true + } + + return false +} + +func (hnm *HwAscend910Manager) isolateSceneHandle(handleTaskName string) bool { + if resetFlag, err := hnm.isTaskInReset(handleTaskName); err != nil || resetFlag { + if resetFlag && !hnm.hotResetManager.IsCurNodeTaskInReset(handleTaskName) && + hnm.hotResetManager.IsExistFaultyDevInTask(handleTaskName) { + hwlog.RunLog.Infof("task %s is in reset process which is not performed on current node. NPU"+ + " faults occurred on current node, the task's process policy will be marked as isolate", handleTaskName) + go hnm.tryWriteIsolationInfo(handleTaskName) + } + return true + } + return false +} + +func (hnm *HwAscend910Manager) runProcessTask(taskName string, policyLevel int, resetInfo *common.TaskResetInfo, + classifyDevs map[string][]*common.NpuDevice) error { switch policyLevel { case common.RestartRequestErrorLevel: - go hnm.restartRequestProcess(taskName, resetInfo) + go hnm.restartRequestProcess(taskName, resetInfo, classifyDevs) case common.RestartErrorLevel: - go hnm.restartProcess(taskName, resetInfo) + go hnm.restartProcess(taskName, resetInfo, classifyDevs) case common.ResetErrorLevel: - go hnm.resetProcess(taskName, resetInfo) + go hnm.resetProcess(taskName, resetInfo, classifyDevs) default: return fmt.Errorf("invalid processing policy") } return nil } -func (hnm *HwAscend910Manager) restartRequestProcess(taskName string, resetInfo *common.TaskResetInfo) { +func (hnm *HwAscend910Manager) restartRequestProcess(taskName string, resetInfo *common.TaskResetInfo, + classifyDevs map[string][]*common.NpuDevice) { defer func() { if err := hnm.postProcess(taskName, resetInfo); err != nil { hwlog.RunLog.Errorf("failed to unset device in reset, err %v", err) @@ -562,11 +582,11 @@ func (hnm *HwAscend910Manager) restartRequestProcess(taskName string, resetInfo devFaultInfoListInReset := hnm.hotResetManager.DeepCopyDevFaultInfoList(devFaultInfoList) // wait L2 fault to self-healing time.Sleep(common.WaitFlushingCMTime * time.Second) - if err := hnm.refreshDevFaultInfo(devFaultInfoList); err != nil { + if err := hnm.refreshDevFaultInfo(devFaultInfoList, classifyDevs); err != nil { hwlog.RunLog.Errorf("failed to refresh device fault info, err %v", err) return } - currentPolicy, err := hnm.upgradeRestartRequestProcess(taskName, devFaultInfoList) + currentPolicy, err := hnm.upgradeRestartRequestProcess(taskName, devFaultInfoList, classifyDevs) if err != nil { hwlog.RunLog.Errorf("failed to exec upgrade reset process, err: %v", err) return @@ -586,7 +606,8 @@ func (hnm *HwAscend910Manager) restartRequestProcess(taskName string, resetInfo return } -func (hnm *HwAscend910Manager) restartProcess(taskName string, resetInfo *common.TaskResetInfo) { +func (hnm *HwAscend910Manager) restartProcess(taskName string, resetInfo *common.TaskResetInfo, + classifyDevs map[string][]*common.NpuDevice) { defer func() { if err := hnm.postProcess(taskName, resetInfo); err != nil { hwlog.RunLog.Errorf("failed to unset device in reset, err %v", err) @@ -601,11 +622,11 @@ func (hnm *HwAscend910Manager) restartProcess(taskName string, resetInfo *common common.RecordFaultInfoList(devFaultInfoList) devFaultInfoListInReset := hnm.hotResetManager.DeepCopyDevFaultInfoList(devFaultInfoList) time.Sleep(common.WaitFlushingCMTime * time.Second) - if err := hnm.refreshDevFaultInfo(devFaultInfoList); err != nil { + if err := hnm.refreshDevFaultInfo(devFaultInfoList, classifyDevs); err != nil { hwlog.RunLog.Errorf("failed to refresh device fault info, err %v", err) return } - currentPolicy, err := hnm.upgradeRestartProcess(taskName, devFaultInfoList) + currentPolicy, err := hnm.upgradeRestartProcess(taskName, devFaultInfoList, classifyDevs) if err != nil { hwlog.RunLog.Errorf("failed to exec upgrade restart process, err: %v", err) return @@ -623,7 +644,8 @@ func (hnm *HwAscend910Manager) restartProcess(taskName string, resetInfo *common } // upgradeRestartProcess upgrade the device restart processing to the device reset processing -func (hnm *HwAscend910Manager) upgradeRestartProcess(taskName string, devFaultInfoList []*common.TaskDevInfo) (string, +func (hnm *HwAscend910Manager) upgradeRestartProcess(taskName string, devFaultInfoList []*common.TaskDevInfo, + classifyDevs map[string][]*common.NpuDevice) (string, error) { restartFaultInfoList, err := hnm.hotResetManager.GetDevListByPolicyLevel(devFaultInfoList, common.RestartErrorLevel) if err != nil { @@ -640,7 +662,7 @@ func (hnm *HwAscend910Manager) upgradeRestartProcess(taskName string, devFaultIn hwlog.RunLog.Errorf("failed to update reset cm to recover failed status, err: %v", err) return "", err } - if err := hnm.resetDeviceOnce(devFaultInfoList); err != nil { + if err := hnm.resetDeviceOnce(devFaultInfoList, classifyDevs); err != nil { return "", err } resultFaultInfoList, err := hnm.hotResetManager.GetDevListByPolicyLevel(devFaultInfoList, common.RestartErrorLevel) @@ -662,7 +684,7 @@ func (hnm *HwAscend910Manager) upgradeRestartProcess(taskName string, devFaultIn // upgradeRestartProcess upgrade the device restart processing to the device reset processing func (hnm *HwAscend910Manager) upgradeRestartRequestProcess(taskName string, - devFaultInfoList []*common.TaskDevInfo) (string, error) { + devFaultInfoList []*common.TaskDevInfo, classifyDevs map[string][]*common.NpuDevice) (string, error) { faultInfoList, err := hnm.hotResetManager.GetDevListByPolicyLevel(devFaultInfoList, common.RestartRequestErrorLevel) if err != nil { @@ -679,7 +701,7 @@ func (hnm *HwAscend910Manager) upgradeRestartRequestProcess(taskName string, hwlog.RunLog.Errorf("failed to update reset cm to ResetError, err: %v", err) return "", err } - if err := hnm.resetDeviceOnce(devFaultInfoList); err != nil { + if err := hnm.resetDeviceOnce(devFaultInfoList, classifyDevs); err != nil { return "", err } resultFaultInfoList, err := hnm.hotResetManager.GetDevListByPolicyLevel(devFaultInfoList, @@ -726,7 +748,8 @@ func (hnm *HwAscend910Manager) updateResetCMStatus(taskName, policy, initPolicy, return nil } -func (hnm *HwAscend910Manager) resetProcess(taskName string, resetInfo *common.TaskResetInfo) { +func (hnm *HwAscend910Manager) resetProcess(taskName string, resetInfo *common.TaskResetInfo, + classifyDevs map[string][]*common.NpuDevice) { defer func() { if err := hnm.postProcess(taskName, resetInfo); err != nil { hwlog.RunLog.Errorf("failed to exec post process, err: %v", err) @@ -741,7 +764,7 @@ func (hnm *HwAscend910Manager) resetProcess(taskName string, resetInfo *common.T common.RecordFaultInfoList(devFaultInfoList) devFaultInfoListInReset := hnm.hotResetManager.DeepCopyDevFaultInfoList(devFaultInfoList) time.Sleep(common.WaitFlushingCMTime * time.Second) - if err := hnm.resetDeviceOnce(devFaultInfoList); err != nil { + if err := hnm.resetDeviceOnce(devFaultInfoList, classifyDevs); err != nil { hwlog.RunLog.Errorf("failed to reset device, err: %v", err) return } @@ -839,20 +862,40 @@ func (hnm *HwAscend910Manager) postProcess(taskName string, resetInfo *common.Ta hwlog.RunLog.Infof("grace tolerance process complete, task name: %s", taskName) return nil } -func (hnm *HwAscend910Manager) refreshDevFaultInfo(devFaultInfo []*common.TaskDevInfo) error { - for _, devInfo := range devFaultInfo { - _, errorCode, err := hnm.GetDmgr().GetDeviceAllErrorCode(devInfo.LogicId) - if err != nil { - hwlog.RunLog.Errorf("failed to get err code of device %d", devInfo.LogicId) - return err + +func (hnm *HwAscend910Manager) refreshDevFaultInfo(devFaultInfo []*common.TaskDevInfo, + classifyDevs map[string][]*common.NpuDevice) error { + devStatusList, ok := classifyDevs[common.Ascend910] + if !ok { + return fmt.Errorf("not found %s device type in %v", common.Ascend910, devStatusList) + } + + common.Synchronize = false + // wait for main process to update the fault cache info. The coroutine performs grace tolerance processing based on + // the updated fault cache info of the main process + if err := wait.PollImmediate(time.Second, time.Duration( + synchronizeWaitMagnification*common.ParamOption.ListAndWatchPeriod)*time.Second, func() (bool, error) { + if !common.Synchronize { + hwlog.RunLog.Debug("failed to synchronize the fault cache of the main process, will retry again") + return false, nil + } + for _, npuDevice := range devStatusList { + devFaultInfo[npuDevice.LogicID].ErrorCode = npuDevice.FaultCodes + devFaultInfo[npuDevice.LogicID].Policy = hnm.hotResetManager. + GetDevProcessPolicy(common.GetFaultType(npuDevice.FaultCodes, npuDevice.LogicID)) + hwlog.RunLog.Infof("refresh device fault info, device %d, policy %s, err code: %v", npuDevice.LogicID, + devFaultInfo[npuDevice.LogicID].Policy, devFaultInfo[npuDevice.LogicID].ErrorCode) } - devInfo.Policy = hnm.hotResetManager.GetDevProcessPolicy(common.GetFaultType(errorCode, devInfo.LogicId)) - devInfo.ErrorCode = errorCode + return true, nil + }); err != nil { + return fmt.Errorf("synchronize the fault cache of the main process timeout: %v", err) } + return nil } -func (hnm *HwAscend910Manager) resetDeviceOnce(devFaultInfoList []*common.TaskDevInfo) error { +func (hnm *HwAscend910Manager) resetDeviceOnce(devFaultInfoList []*common.TaskDevInfo, + classifyDevs map[string][]*common.NpuDevice) error { resetFaultInfoMap, err := hnm.hotResetManager.GetNeedResetDevMap(devFaultInfoList) if err != nil { hwlog.RunLog.Errorf("failed to get need reset device list, err: %v", err) @@ -865,7 +908,7 @@ func (hnm *HwAscend910Manager) resetDeviceOnce(devFaultInfoList []*common.TaskDe for _, devInfo := range devFaultInfoList { common.SetDeviceInit(devInfo.LogicId) } - if err := hnm.refreshDevFaultInfo(devFaultInfoList); err != nil { + if err := hnm.refreshDevFaultInfo(devFaultInfoList, classifyDevs); err != nil { hwlog.RunLog.Errorf("failed to refresh device fault info, err: %v", err) return err } diff --git a/pkg/device/ascend910_test.go b/pkg/device/ascend910_test.go index 8735d1aa..b01bb25e 100644 --- a/pkg/device/ascend910_test.go +++ b/pkg/device/ascend910_test.go @@ -175,7 +175,7 @@ func TestProcessAllTask(t *testing.T) { common.IsolateError: common.IsolateErrorLevel, }, } - err := manager.processAllTask() + err := manager.processAllTask(mockGroupDevice()) convey.So(err, convey.ShouldBeNil) }) } diff --git a/pkg/server/manager.go b/pkg/server/manager.go index 09c5ece9..9b5ca34c 100644 --- a/pkg/server/manager.go +++ b/pkg/server/manager.go @@ -314,6 +314,7 @@ func (hdm *HwDevManager) ListenDevice(ctx context.Context) { common.DelOnceRecoverFault(hdm.groupDevice) common.DelOnceFrequencyFault() common.UnlockAllDeviceInfo() + common.Synchronize = true } } } -- Gitee