From 02a028da2f9d1f586f49d6e8e5f462be154c53a7 Mon Sep 17 00:00:00 2001 From: wangchao Date: Fri, 18 Jul 2025 17:56:24 +0800 Subject: [PATCH 1/2] Support p2p detect --- test/torch_npu_schema.json | 4 +- third_party/acl/inc/aml/aml_fwk_detect.h | 8 +++ .../csrc/core/npu/interface/MlInterface.cpp | 21 ++++++++ .../csrc/core/npu/interface/MlInterface.h | 10 ++++ torch_npu/csrc/distributed/HCCLUtils.cpp | 10 ++++ torch_npu/csrc/distributed/HCCLUtils.hpp | 3 ++ torch_npu/csrc/distributed/Init.cpp | 3 +- .../csrc/distributed/ProcessGroupHCCL.cpp | 9 ++++ .../csrc/distributed/ProcessGroupHCCL.hpp | 2 + torch_npu/csrc/npu/Module.cpp | 11 +++-- torch_npu/csrc/npu/Stress_detect.cpp | 49 ++++++++++++++----- torch_npu/csrc/npu/Stress_detect.h | 7 ++- torch_npu/npu/utils.py | 27 +++++++++- 13 files changed, 142 insertions(+), 22 deletions(-) diff --git a/test/torch_npu_schema.json b/test/torch_npu_schema.json index f73cc70ee9..927804a594 100644 --- a/test/torch_npu_schema.json +++ b/test/torch_npu_schema.json @@ -1233,7 +1233,7 @@ "signature": "(device_id)" }, "torch_npu.npu.stress_detect": { - "signature": "()" + "signature": "(detecct_type=0)" }, "torch_npu.npu.seed": { "signature": "()" @@ -1677,7 +1677,7 @@ "signature": "(stream)" }, "torch_npu.npu.utils.stress_detect": { - "signature": "()" + "signature": "(detecct_type=0)" }, "torch_npu.npu.utils.synchronize": { "signature": "(device=None)" diff --git a/third_party/acl/inc/aml/aml_fwk_detect.h b/third_party/acl/inc/aml/aml_fwk_detect.h index 2eb355b338..26dbde4549 100644 --- a/third_party/acl/inc/aml/aml_fwk_detect.h +++ b/third_party/acl/inc/aml/aml_fwk_detect.h @@ -32,8 +32,16 @@ typedef struct AmlAicoreDetectAttr { uint8_t reserve[64]; } AmlAicoreDetectAttr; +struct AmlP2PDetectAttr { + void *workspace; + uint64_t workspaceSize; + uint8_t reserve[64]; +}; + AmlStatus AmlAicoreDetectOnline(int32_t deviceId, const AmlAicoreDetectAttr *attr); +AmlStatus AmlP2PDetectOnline(int32_t devId, void *comm, const AmlP2PDetectAttr *attr); + #ifdef __cplusplus } #endif diff --git a/torch_npu/csrc/core/npu/interface/MlInterface.cpp b/torch_npu/csrc/core/npu/interface/MlInterface.cpp index b992b4a188..4008c8eb27 100644 --- a/torch_npu/csrc/core/npu/interface/MlInterface.cpp +++ b/torch_npu/csrc/core/npu/interface/MlInterface.cpp @@ -14,6 +14,7 @@ namespace amlapi { REGISTER_LIBRARY(libascend_ml) LOAD_FUNCTION(AmlAicoreDetectOnline) +LOAD_FUNCTION(AmlP2PDetectOnline) bool IsExistAmlAicoreDetectOnline() { @@ -24,6 +25,15 @@ bool IsExistAmlAicoreDetectOnline() return isExist; } +bool IsExistAmlP2PDetectOnline() +{ + const static bool isExist = []() -> bool { + static auto func = GET_FUNC(AmlP2PDetectOnline); + return func != nullptr; + }(); + return isExist; +} + AmlStatus AmlAicoreDetectOnlineFace(int32_t deviceId, const AmlAicoreDetectAttr *attr) { typedef AmlStatus (*amlAicoreDetectOnline)(int32_t, const AmlAicoreDetectAttr *); @@ -35,5 +45,16 @@ AmlStatus AmlAicoreDetectOnlineFace(int32_t deviceId, const AmlAicoreDetectAttr return func(deviceId, attr); } +AmlStatus AmlP2PDetectOnlineFace(int32_t deviceId, void *comm, const AmlP2PDetectAttr *attr) +{ + typedef AmlStatus (*amlP2PDetectOnline)(int32_t, void *, const AmlP2PDetectAttr *); + static amlP2PDetectOnline func = nullptr; + if (func == nullptr) { + func = (amlP2PDetectOnline) GET_FUNC(AmlP2PDetectOnline); + } + TORCH_CHECK(func, "Failed to find function ", "AmlP2PDetectOnline", PTA_ERROR(ErrCode::NOT_FOUND)); + return func(deviceId, comm, attr); +} + } // namespace amlapi } // namespace c10_npu diff --git a/torch_npu/csrc/core/npu/interface/MlInterface.h b/torch_npu/csrc/core/npu/interface/MlInterface.h index 6af498629a..33389fcce3 100644 --- a/torch_npu/csrc/core/npu/interface/MlInterface.h +++ b/torch_npu/csrc/core/npu/interface/MlInterface.h @@ -8,10 +8,20 @@ namespace amlapi { */ bool IsExistAmlAicoreDetectOnline(); +/** + * This API is used to check whether AmlP2PDetectOnline exist. +*/ +bool IsExistAmlP2PDetectOnline(); + /** * This API is used to call AmlAicoreDetectOnline. */ AmlStatus AmlAicoreDetectOnlineFace(int32_t deviceId, const AmlAicoreDetectAttr *attr); +/** + * This API is used to call AmlP2PDetectOnline. +*/ +AmlStatus AmlP2PDetectOnlineFace(int32_t deviceId, void *comm, const AmlP2PDetectAttr *attr); + } // namespace amlapi } // namespace c10_npu diff --git a/torch_npu/csrc/distributed/HCCLUtils.cpp b/torch_npu/csrc/distributed/HCCLUtils.cpp index 74c2334ade..f398481c48 100644 --- a/torch_npu/csrc/distributed/HCCLUtils.cpp +++ b/torch_npu/csrc/distributed/HCCLUtils.cpp @@ -222,6 +222,16 @@ HcclResult HCCLComm::checkForHcclError() #endif } +int HCCLComm::amlP2PDetectOnline(int32_t deviceId, const AmlP2PDetectAttr *attr) +{ + std::unique_lock lock(mutex_); + int ret = -1; + if (hcclComm_ != nullptr) { + ret = c10_npu::amlapi::AmlP2PDetectOnlineFace(deviceId, hcclComm_, attr); + } + return ret; +} + void DebugInfoWriter::write(const std::string &hcclTrace) { // Open a file for writing. The ios::binary flag is used to write data as diff --git a/torch_npu/csrc/distributed/HCCLUtils.hpp b/torch_npu/csrc/distributed/HCCLUtils.hpp index b4662c1e49..3ebc331551 100644 --- a/torch_npu/csrc/distributed/HCCLUtils.hpp +++ b/torch_npu/csrc/distributed/HCCLUtils.hpp @@ -6,6 +6,7 @@ #include "torch_npu/csrc/core/npu/npu_log.h" #include "torch_npu/csrc/core/npu/sys_ctrl/npu_sys_ctrl.h" #include "torch_npu/csrc/core/npu/NPUException.h" +#include "torch_npu/csrc/core/npu/interface/MlInterface.h" #include #include @@ -233,6 +234,8 @@ public: HcclResult checkForHcclError(); + int amlP2PDetectOnline(int32_t deviceId, const AmlP2PDetectAttr *attr); + protected: HcclComm hcclComm_; mutable std::mutex mutex_; diff --git a/torch_npu/csrc/distributed/Init.cpp b/torch_npu/csrc/distributed/Init.cpp index 30e0dce7cd..8e0d7d224e 100644 --- a/torch_npu/csrc/distributed/Init.cpp +++ b/torch_npu/csrc/distributed/Init.cpp @@ -440,7 +440,8 @@ PyObject* c10d_npu_init(PyObject* _unused, PyObject* noargs) py::arg("input"), py::arg("output_split_sizes") = std::vector{}, py::arg("opts") = ::c10d::AllgatherOptions(), - py::call_guard()); + py::call_guard()) + .def("_set_hccl_comm_for_hccs_detect", &::c10d_npu::ProcessGroupHCCL::SetHcclCommForHccsDetect); intrusive_ptr_class_<::c10d_npu::ProcessGroupHCCL::Options>( processGroupHCCL, diff --git a/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp b/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp index 7ec4974387..4f4492a8b8 100644 --- a/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp +++ b/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp @@ -52,6 +52,7 @@ #include "torch_npu/csrc/framework/utils/OpPreparation.h" #include "torch_npu/csrc/profiler/npu_profiler.h" #include "torch_npu/csrc/logging/LogContext.h" +#include "torch_npu/csrc/npu/Stress_detect.h" #include "torch_npu/csrc/distributed/ProcessGroupHCCL.hpp" namespace py = pybind11; @@ -2476,6 +2477,14 @@ std::vector>& ProcessGroupHCCL::createHCCLComm( return devHCCLCommMap_[devicesKey]; } +void ProcessGroupHCCL::SetHcclCommForHccsDetect() +{ + at::Device device = getDeviceForRank(rank_); + std::vector devices = {device}; + std::shared_ptr hcclComm = getHcclCommByDevices(devices); + StressDetector::SetLocalHcclComm(hcclComm); +} + int64_t ProcessGroupHCCL::getStreamId(bool p2p, int peer) { int device = -1; diff --git a/torch_npu/csrc/distributed/ProcessGroupHCCL.hpp b/torch_npu/csrc/distributed/ProcessGroupHCCL.hpp index 940075105b..7fc1a64573 100644 --- a/torch_npu/csrc/distributed/ProcessGroupHCCL.hpp +++ b/torch_npu/csrc/distributed/ProcessGroupHCCL.hpp @@ -607,6 +607,8 @@ public: int64_t getStreamId(bool p2p, int peer); + void SetHcclCommForHccsDetect(); + protected: // Helper that broadcasts HCCL Master ID to all ranks through the store void broadcastMasterID( diff --git a/torch_npu/csrc/npu/Module.cpp b/torch_npu/csrc/npu/Module.cpp index 8d8fb06843..dd970f0a6e 100644 --- a/torch_npu/csrc/npu/Module.cpp +++ b/torch_npu/csrc/npu/Module.cpp @@ -727,15 +727,16 @@ PyObject* THNPModule_maybeExchangeDevice_wrap(PyObject* self, PyObject* arg) END_HANDLE_TH_ERRORS } -PyObject* THNPModule_stressDetect_wrap(PyObject* self, PyObject* noargs) +PyObject* THNPModule_stressDetect_wrap(PyObject* self, PyObject* arg) { HANDLE_TH_ERRORS + int detectType = THPUtils_unpackLong(arg); torch_npu::utils::npu_lazy_init(); - int device_id; - NPU_CHECK_ERROR_WITHOUT_UCE(c10_npu::GetDevice(&device_id)); + int deviceId; + NPU_CHECK_ERROR_WITHOUT_UCE(c10_npu::GetDevice(&deviceId)); - int ret = StressDetector::perform_stress_detect(device_id); + int ret = StressDetector::perform_stress_detect(deviceId, detectType); return PyLong_FromLong(ret); END_HANDLE_TH_ERRORS } @@ -1804,7 +1805,7 @@ static struct PyMethodDef THNPModule_methods[] = { {"_npu_restart_device", (PyCFunction)THNPModule_restart_device_wrap, METH_O, nullptr}, {"_npu_check_uce_in_memory", (PyCFunction)THNPModule_check_uce_in_memory_wrap, METH_O, nullptr}, {"_npu_get_uce_addr", (PyCFunction)THNPModule_get_uce_addr_wrap, METH_NOARGS, nullptr}, - {"_npu_stress_detect", (PyCFunction)THNPModule_stressDetect_wrap, METH_NOARGS, nullptr}, + {"_npu_stress_detect", (PyCFunction)THNPModule_stressDetect_wrap, METH_O, nullptr}, {"_npu_getLocalDevice", (PyCFunction)THNPModule_getLocalDevice_wrap, METH_NOARGS, nullptr}, {"_npu_getDeviceCount", (PyCFunction)THNPModule_getDeviceCount_wrap, METH_NOARGS, nullptr}, {"_npu_canDeviceAccessPeer", (PyCFunction)THNPModule_npuCanDeviceAccessPeer_wrap, METH_VARARGS, nullptr}, diff --git a/torch_npu/csrc/npu/Stress_detect.cpp b/torch_npu/csrc/npu/Stress_detect.cpp index 3fcade819b..ae5ff058dd 100644 --- a/torch_npu/csrc/npu/Stress_detect.cpp +++ b/torch_npu/csrc/npu/Stress_detect.cpp @@ -16,6 +16,8 @@ std::mutex StressDetector::mtx; int StressDetector::device_id; void* StressDetector::workspaceAddr = nullptr; size_t StressDetector::workspaceSize = 0; +int StressDetector::stressDetectType = 0; +std::shared_ptr StressDetector::localHcclComm = nullptr; constexpr int kDetectSucceeded = 0; constexpr int kDetectFailed = 1; @@ -40,16 +42,35 @@ void StressDetector::worker_thread() // Execute the task int ret = -1; - if (c10_npu::amlapi::IsExistAmlAicoreDetectOnline()) { - AmlAicoreDetectAttr attr; - attr.mode = AML_DETECT_RUN_MODE_ONLINE; - attr.workspace = workspaceAddr; - attr.workspaceSize = workspaceSize; - ret = c10_npu::amlapi::AmlAicoreDetectOnlineFace(device_id, &attr); - ASCEND_LOGI("Stress detect with AmlAicoreDetectOnline, result is %d.", ret); - } else { - ret = c10_npu::acl::AclStressDetect(device_id, workspaceAddr, workspaceSize); - ASCEND_LOGI("Stress detect with StressDetect, result is %d.", ret); + try { + if (stressDetectType == 0) { + if (c10_npu::amlapi::IsExistAmlAicoreDetectOnline()) { + AmlAicoreDetectAttr attr; + attr.mode = AML_DETECT_RUN_MODE_ONLINE; + attr.workspace = workspaceAddr; + attr.workspaceSize = workspaceSize; + ret = c10_npu::amlapi::AmlAicoreDetectOnlineFace(device_id, &attr); + ASCEND_LOGI("Stress detect with AmlAicoreDetectOnline, result is %d.", ret); + } else { + ret = c10_npu::acl::AclStressDetect(device_id, workspaceAddr, workspaceSize); + ASCEND_LOGI("Stress detect with StressDetect, result is %d.", ret); + } + } else { + if (localHcclComm == nullptr) { + ASCEND_LOGE("Stress detect with AmlP2PDetectOnline failed, localHcclComm is nullptr."); + } else if (c10_npu::amlapi::IsExistAmlP2PDetectOnline()) { + AmlP2PDetectAttr attr; + attr.workspace = workspaceAddr; + attr.workspaceSize = workspaceSize; + ret = localHcclComm->amlP2PDetectOnline(device_id, &attr); + ASCEND_LOGI("Stress detect with AmlP2PDetectOnline, result is %d.", ret); + } else { + ASCEND_LOGE("Stress detect with AmlP2PDetectOnline failed, CANN version lower than 8.1.RC2 and currently does not support AmlP2PDetectOnline."); + } + } + } catch (std::exception &e) { + ret = -1; + ASCEND_LOGE("Stress detect failed. type is %d, error:%s", stressDetectType, e.what()); } // Task complete, free memory @@ -89,8 +110,13 @@ int StressDetector::transfer_result(int detectResult) return ret; } +void StressDetector::SetLocalHcclComm(std::shared_ptr hcclComm) +{ + localHcclComm = hcclComm; +} + // Synchronous stress detection task execution -int StressDetector::perform_stress_detect(int deviceid) +int StressDetector::perform_stress_detect(int deviceid, int detectType) { // If it's the first call, start the persistent thread if (!thread_initialized.load()) { @@ -132,6 +158,7 @@ int StressDetector::perform_stress_detect(int deviceid) StressDetector::device_id = deviceid; StressDetector::workspaceAddr = workspaceAddr; StressDetector::workspaceSize = workspaceSize; + StressDetector::stressDetectType = detectType; // Mark new task submitted new_task_submitted.store(true); diff --git a/torch_npu/csrc/npu/Stress_detect.h b/torch_npu/csrc/npu/Stress_detect.h index 4319122be7..831fe1a2aa 100644 --- a/torch_npu/csrc/npu/Stress_detect.h +++ b/torch_npu/csrc/npu/Stress_detect.h @@ -9,12 +9,15 @@ #include #include #include "torch_npu/csrc/core/npu/NPUMacros.h" +#include "torch_npu/csrc/distributed/HCCLUtils.hpp" class StressDetector { public: - TORCH_NPU_API static int perform_stress_detect(int deviceid); + TORCH_NPU_API static int perform_stress_detect(int deviceid, int detectType); TORCH_NPU_API static void stop_worker_thread(); + static void SetLocalHcclComm(std::shared_ptr hcclComm); + private: static void worker_thread(); @@ -44,6 +47,8 @@ private: static int device_id; static void* workspaceAddr; static size_t workspaceSize; + static int stressDetectType; + static std::shared_ptr localHcclComm; // Flag to indicate if the thread has been initialized static std::atomic thread_initialized; diff --git a/torch_npu/npu/utils.py b/torch_npu/npu/utils.py index 4475a9fb4f..f2e2f65ade 100644 --- a/torch_npu/npu/utils.py +++ b/torch_npu/npu/utils.py @@ -507,9 +507,32 @@ def clear_npu_overflow_flag(): torch_npu.npu_clear_float_status(float_status) -def stress_detect(): +hccl_detect_group = None + + +def stress_detect(detecct_type=0): + if detecct_type not in [0, 1]: + raise ValueError("Detecct_type should be 0 or 1. For details, 0 as `Online aicore detect`, 1 as `Online p2p detect`." + pta_error(ErrCode.VALUE)) torch_npu.npu._lazy_init() - return torch_npu._C._npu_stress_detect() + if detecct_type == 1: + if not torch.distributed.is_initialized(): + raise ValueError("The torch.distributed should to be initialized for hccs detection" + pta_error(ErrCode.UNAVAIL)) + global hccl_detect_group + if hccl_detect_group is None: + rank = int(os.getenv('RANK', -1)) + local_world_size = int(os.getenv('LOCAL_WORLD_SIZE', -1)) + if rank == -1 or local_world_size == -1: + raise ValueError("Environment variable 'RANK' or 'LOCAL_WORLD_SIZE' is not set." + pta_error(ErrCode.UNAVAIL)) + worker_index = rank // local_world_size + local_ranks = [] + for i in range(local_world_size): + local_ranks.append(local_world_size * worker_index + i) + try: + hccl_detect_group = torch.distributed.new_group(ranks=local_ranks) + hccl_detect_group._get_backend(torch.device('npu'))._set_hccl_comm_for_hccs_detect() + except Exception as err: + return 1 + return torch_npu._C._npu_stress_detect(detecct_type) def current_blas_handle(): -- Gitee From c67b3c940ee1cd0e9d36d8108918fc418ea607a9 Mon Sep 17 00:00:00 2001 From: wangchao Date: Tue, 29 Jul 2025 11:16:47 +0800 Subject: [PATCH 2/2] 1 --- test/torch_npu_schema.json | 4 ++-- torch_npu/csrc/distributed/HCCLUtils.cpp | 10 -------- torch_npu/csrc/distributed/HCCLUtils.hpp | 3 --- torch_npu/csrc/distributed/Init.cpp | 3 +-- .../csrc/distributed/ProcessGroupHCCL.cpp | 9 ------- .../csrc/distributed/ProcessGroupHCCL.hpp | 2 -- torch_npu/csrc/npu/Module.cpp | 23 ++++++++++++++---- torch_npu/csrc/npu/Stress_detect.cpp | 24 +++++++------------ torch_npu/csrc/npu/Stress_detect.h | 8 +++---- torch_npu/npu/utils.py | 21 +++++++++------- 10 files changed, 46 insertions(+), 61 deletions(-) diff --git a/test/torch_npu_schema.json b/test/torch_npu_schema.json index 927804a594..f048d8ed25 100644 --- a/test/torch_npu_schema.json +++ b/test/torch_npu_schema.json @@ -1233,7 +1233,7 @@ "signature": "(device_id)" }, "torch_npu.npu.stress_detect": { - "signature": "(detecct_type=0)" + "signature": "(mode=0)" }, "torch_npu.npu.seed": { "signature": "()" @@ -1677,7 +1677,7 @@ "signature": "(stream)" }, "torch_npu.npu.utils.stress_detect": { - "signature": "(detecct_type=0)" + "signature": "(mode=0)" }, "torch_npu.npu.utils.synchronize": { "signature": "(device=None)" diff --git a/torch_npu/csrc/distributed/HCCLUtils.cpp b/torch_npu/csrc/distributed/HCCLUtils.cpp index f398481c48..74c2334ade 100644 --- a/torch_npu/csrc/distributed/HCCLUtils.cpp +++ b/torch_npu/csrc/distributed/HCCLUtils.cpp @@ -222,16 +222,6 @@ HcclResult HCCLComm::checkForHcclError() #endif } -int HCCLComm::amlP2PDetectOnline(int32_t deviceId, const AmlP2PDetectAttr *attr) -{ - std::unique_lock lock(mutex_); - int ret = -1; - if (hcclComm_ != nullptr) { - ret = c10_npu::amlapi::AmlP2PDetectOnlineFace(deviceId, hcclComm_, attr); - } - return ret; -} - void DebugInfoWriter::write(const std::string &hcclTrace) { // Open a file for writing. The ios::binary flag is used to write data as diff --git a/torch_npu/csrc/distributed/HCCLUtils.hpp b/torch_npu/csrc/distributed/HCCLUtils.hpp index 3ebc331551..b4662c1e49 100644 --- a/torch_npu/csrc/distributed/HCCLUtils.hpp +++ b/torch_npu/csrc/distributed/HCCLUtils.hpp @@ -6,7 +6,6 @@ #include "torch_npu/csrc/core/npu/npu_log.h" #include "torch_npu/csrc/core/npu/sys_ctrl/npu_sys_ctrl.h" #include "torch_npu/csrc/core/npu/NPUException.h" -#include "torch_npu/csrc/core/npu/interface/MlInterface.h" #include #include @@ -234,8 +233,6 @@ public: HcclResult checkForHcclError(); - int amlP2PDetectOnline(int32_t deviceId, const AmlP2PDetectAttr *attr); - protected: HcclComm hcclComm_; mutable std::mutex mutex_; diff --git a/torch_npu/csrc/distributed/Init.cpp b/torch_npu/csrc/distributed/Init.cpp index 8e0d7d224e..30e0dce7cd 100644 --- a/torch_npu/csrc/distributed/Init.cpp +++ b/torch_npu/csrc/distributed/Init.cpp @@ -440,8 +440,7 @@ PyObject* c10d_npu_init(PyObject* _unused, PyObject* noargs) py::arg("input"), py::arg("output_split_sizes") = std::vector{}, py::arg("opts") = ::c10d::AllgatherOptions(), - py::call_guard()) - .def("_set_hccl_comm_for_hccs_detect", &::c10d_npu::ProcessGroupHCCL::SetHcclCommForHccsDetect); + py::call_guard()); intrusive_ptr_class_<::c10d_npu::ProcessGroupHCCL::Options>( processGroupHCCL, diff --git a/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp b/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp index 4f4492a8b8..7ec4974387 100644 --- a/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp +++ b/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp @@ -52,7 +52,6 @@ #include "torch_npu/csrc/framework/utils/OpPreparation.h" #include "torch_npu/csrc/profiler/npu_profiler.h" #include "torch_npu/csrc/logging/LogContext.h" -#include "torch_npu/csrc/npu/Stress_detect.h" #include "torch_npu/csrc/distributed/ProcessGroupHCCL.hpp" namespace py = pybind11; @@ -2477,14 +2476,6 @@ std::vector>& ProcessGroupHCCL::createHCCLComm( return devHCCLCommMap_[devicesKey]; } -void ProcessGroupHCCL::SetHcclCommForHccsDetect() -{ - at::Device device = getDeviceForRank(rank_); - std::vector devices = {device}; - std::shared_ptr hcclComm = getHcclCommByDevices(devices); - StressDetector::SetLocalHcclComm(hcclComm); -} - int64_t ProcessGroupHCCL::getStreamId(bool p2p, int peer) { int device = -1; diff --git a/torch_npu/csrc/distributed/ProcessGroupHCCL.hpp b/torch_npu/csrc/distributed/ProcessGroupHCCL.hpp index 7fc1a64573..940075105b 100644 --- a/torch_npu/csrc/distributed/ProcessGroupHCCL.hpp +++ b/torch_npu/csrc/distributed/ProcessGroupHCCL.hpp @@ -607,8 +607,6 @@ public: int64_t getStreamId(bool p2p, int peer); - void SetHcclCommForHccsDetect(); - protected: // Helper that broadcasts HCCL Master ID to all ranks through the store void broadcastMasterID( diff --git a/torch_npu/csrc/npu/Module.cpp b/torch_npu/csrc/npu/Module.cpp index dd970f0a6e..4c6a49ed02 100644 --- a/torch_npu/csrc/npu/Module.cpp +++ b/torch_npu/csrc/npu/Module.cpp @@ -727,16 +727,29 @@ PyObject* THNPModule_maybeExchangeDevice_wrap(PyObject* self, PyObject* arg) END_HANDLE_TH_ERRORS } -PyObject* THNPModule_stressDetect_wrap(PyObject* self, PyObject* arg) +PyObject* THNPModule_stressDetect_wrap(PyObject* self, PyObject* args) { HANDLE_TH_ERRORS - int detectType = THPUtils_unpackLong(arg); + PyObject* value1 = nullptr; + PyObject* value2 = nullptr; + + if (!PyArg_ParseTuple(args, "OO", &value1, &value2)) { + ASCEND_LOGE("Stress detect failed, argument is invalid."); + return PyLong_FromLong(1); + } + int mode = THPUtils_unpackLong(value1); + int64_t comm = THPUtils_unpackLong(value2); + torch_npu::utils::npu_lazy_init(); int deviceId; - NPU_CHECK_ERROR_WITHOUT_UCE(c10_npu::GetDevice(&deviceId)); + aclError err = c10_npu::GetDevice(&deviceId); + if (err != ACL_ERROR_NONE) { + ASCEND_LOGE("Stress detect failed, error happened in GetDevice, err is %d.", err); + return PyLong_FromLong(1); + } - int ret = StressDetector::perform_stress_detect(deviceId, detectType); + int ret = StressDetector::perform_stress_detect(deviceId, mode, comm); return PyLong_FromLong(ret); END_HANDLE_TH_ERRORS } @@ -1805,7 +1818,7 @@ static struct PyMethodDef THNPModule_methods[] = { {"_npu_restart_device", (PyCFunction)THNPModule_restart_device_wrap, METH_O, nullptr}, {"_npu_check_uce_in_memory", (PyCFunction)THNPModule_check_uce_in_memory_wrap, METH_O, nullptr}, {"_npu_get_uce_addr", (PyCFunction)THNPModule_get_uce_addr_wrap, METH_NOARGS, nullptr}, - {"_npu_stress_detect", (PyCFunction)THNPModule_stressDetect_wrap, METH_O, nullptr}, + {"_npu_stress_detect", (PyCFunction)THNPModule_stressDetect_wrap, METH_VARARGS, nullptr}, {"_npu_getLocalDevice", (PyCFunction)THNPModule_getLocalDevice_wrap, METH_NOARGS, nullptr}, {"_npu_getDeviceCount", (PyCFunction)THNPModule_getDeviceCount_wrap, METH_NOARGS, nullptr}, {"_npu_canDeviceAccessPeer", (PyCFunction)THNPModule_npuCanDeviceAccessPeer_wrap, METH_VARARGS, nullptr}, diff --git a/torch_npu/csrc/npu/Stress_detect.cpp b/torch_npu/csrc/npu/Stress_detect.cpp index ae5ff058dd..048fc2907f 100644 --- a/torch_npu/csrc/npu/Stress_detect.cpp +++ b/torch_npu/csrc/npu/Stress_detect.cpp @@ -16,8 +16,8 @@ std::mutex StressDetector::mtx; int StressDetector::device_id; void* StressDetector::workspaceAddr = nullptr; size_t StressDetector::workspaceSize = 0; -int StressDetector::stressDetectType = 0; -std::shared_ptr StressDetector::localHcclComm = nullptr; +int StressDetector::stressMode = 0; +void* StressDetector::localHcclComm = nullptr; constexpr int kDetectSucceeded = 0; constexpr int kDetectFailed = 1; @@ -43,7 +43,7 @@ void StressDetector::worker_thread() // Execute the task int ret = -1; try { - if (stressDetectType == 0) { + if (stressMode == 0) { if (c10_npu::amlapi::IsExistAmlAicoreDetectOnline()) { AmlAicoreDetectAttr attr; attr.mode = AML_DETECT_RUN_MODE_ONLINE; @@ -56,13 +56,11 @@ void StressDetector::worker_thread() ASCEND_LOGI("Stress detect with StressDetect, result is %d.", ret); } } else { - if (localHcclComm == nullptr) { - ASCEND_LOGE("Stress detect with AmlP2PDetectOnline failed, localHcclComm is nullptr."); - } else if (c10_npu::amlapi::IsExistAmlP2PDetectOnline()) { + if (c10_npu::amlapi::IsExistAmlP2PDetectOnline()) { AmlP2PDetectAttr attr; attr.workspace = workspaceAddr; attr.workspaceSize = workspaceSize; - ret = localHcclComm->amlP2PDetectOnline(device_id, &attr); + ret = c10_npu::amlapi::AmlP2PDetectOnlineFace(device_id, localHcclComm, &attr); ASCEND_LOGI("Stress detect with AmlP2PDetectOnline, result is %d.", ret); } else { ASCEND_LOGE("Stress detect with AmlP2PDetectOnline failed, CANN version lower than 8.1.RC2 and currently does not support AmlP2PDetectOnline."); @@ -70,7 +68,7 @@ void StressDetector::worker_thread() } } catch (std::exception &e) { ret = -1; - ASCEND_LOGE("Stress detect failed. type is %d, error:%s", stressDetectType, e.what()); + ASCEND_LOGE("Stress detect failed. type is %d, error:%s", stressMode, e.what()); } // Task complete, free memory @@ -110,13 +108,8 @@ int StressDetector::transfer_result(int detectResult) return ret; } -void StressDetector::SetLocalHcclComm(std::shared_ptr hcclComm) -{ - localHcclComm = hcclComm; -} - // Synchronous stress detection task execution -int StressDetector::perform_stress_detect(int deviceid, int detectType) +int StressDetector::perform_stress_detect(int deviceid, int mode, int64_t comm) { // If it's the first call, start the persistent thread if (!thread_initialized.load()) { @@ -158,7 +151,8 @@ int StressDetector::perform_stress_detect(int deviceid, int detectType) StressDetector::device_id = deviceid; StressDetector::workspaceAddr = workspaceAddr; StressDetector::workspaceSize = workspaceSize; - StressDetector::stressDetectType = detectType; + StressDetector::stressMode = mode; + StressDetector::localHcclComm = reinterpret_cast(static_cast(comm)); // Mark new task submitted new_task_submitted.store(true); diff --git a/torch_npu/csrc/npu/Stress_detect.h b/torch_npu/csrc/npu/Stress_detect.h index 831fe1a2aa..47f020a870 100644 --- a/torch_npu/csrc/npu/Stress_detect.h +++ b/torch_npu/csrc/npu/Stress_detect.h @@ -13,11 +13,9 @@ class StressDetector { public: - TORCH_NPU_API static int perform_stress_detect(int deviceid, int detectType); + TORCH_NPU_API static int perform_stress_detect(int deviceid, int mode, int64_t comm); TORCH_NPU_API static void stop_worker_thread(); - static void SetLocalHcclComm(std::shared_ptr hcclComm); - private: static void worker_thread(); @@ -47,8 +45,8 @@ private: static int device_id; static void* workspaceAddr; static size_t workspaceSize; - static int stressDetectType; - static std::shared_ptr localHcclComm; + static int stressMode; + static void* localHcclComm; // Flag to indicate if the thread has been initialized static std::atomic thread_initialized; diff --git a/torch_npu/npu/utils.py b/torch_npu/npu/utils.py index f2e2f65ade..05cd7de585 100644 --- a/torch_npu/npu/utils.py +++ b/torch_npu/npu/utils.py @@ -510,29 +510,34 @@ def clear_npu_overflow_flag(): hccl_detect_group = None -def stress_detect(detecct_type=0): - if detecct_type not in [0, 1]: - raise ValueError("Detecct_type should be 0 or 1. For details, 0 as `Online aicore detect`, 1 as `Online p2p detect`." + pta_error(ErrCode.VALUE)) +def stress_detect(mode=0): + if mode not in [0, 1]: + warnings.warn("Detecct_type should be 0 or 1. For details, 0 as `Online aicore detect`, 1 as `Online p2p detect`.") + return 1 torch_npu.npu._lazy_init() - if detecct_type == 1: + comm = 0 + if mode == 1: if not torch.distributed.is_initialized(): - raise ValueError("The torch.distributed should to be initialized for hccs detection" + pta_error(ErrCode.UNAVAIL)) + warnings.warn("The torch.distributed should to be initialized for hccs detection") + return 1 global hccl_detect_group if hccl_detect_group is None: rank = int(os.getenv('RANK', -1)) local_world_size = int(os.getenv('LOCAL_WORLD_SIZE', -1)) if rank == -1 or local_world_size == -1: - raise ValueError("Environment variable 'RANK' or 'LOCAL_WORLD_SIZE' is not set." + pta_error(ErrCode.UNAVAIL)) + warnings.warn("Environment variable 'RANK' or 'LOCAL_WORLD_SIZE' is not set.") + return 1 worker_index = rank // local_world_size + local_rank = rank % local_world_size local_ranks = [] for i in range(local_world_size): local_ranks.append(local_world_size * worker_index + i) try: hccl_detect_group = torch.distributed.new_group(ranks=local_ranks) - hccl_detect_group._get_backend(torch.device('npu'))._set_hccl_comm_for_hccs_detect() + comm = hccl_detect_group._get_backend(torch.device('npu')).get_hccl_comm(local_rank) except Exception as err: return 1 - return torch_npu._C._npu_stress_detect(detecct_type) + return torch_npu._C._npu_stress_detect(mode, comm) def current_blas_handle(): -- Gitee