diff --git a/test/torch_npu_schema.json b/test/torch_npu_schema.json
index f73cc70ee945c2661bb08bacd4b3ad444b97ac25..f048d8ed2505fdbd5537f00d1e4a71fa535f1698 100644
--- a/test/torch_npu_schema.json
+++ b/test/torch_npu_schema.json
@@ -1233,7 +1233,7 @@
         "signature": "(device_id)"
     },
     "torch_npu.npu.stress_detect": {
-        "signature": "()"
+        "signature": "(mode=0)"
     },
     "torch_npu.npu.seed": {
         "signature": "()"
@@ -1677,7 +1677,7 @@
         "signature": "(stream)"
     },
     "torch_npu.npu.utils.stress_detect": {
-        "signature": "()"
+        "signature": "(mode=0)"
     },
     "torch_npu.npu.utils.synchronize": {
         "signature": "(device=None)"
diff --git a/third_party/acl/inc/aml/aml_fwk_detect.h b/third_party/acl/inc/aml/aml_fwk_detect.h
index 2eb355b338156eaec2c65bb00375afe3c7c82ac6..26dbde4549643e80f7b8c675936dc8576feb85d1 100644
--- a/third_party/acl/inc/aml/aml_fwk_detect.h
+++ b/third_party/acl/inc/aml/aml_fwk_detect.h
@@ -32,8 +32,16 @@ typedef struct AmlAicoreDetectAttr {
     uint8_t reserve[64];
 } AmlAicoreDetectAttr;
 
+struct AmlP2PDetectAttr {
+    void *workspace;
+    uint64_t workspaceSize;
+    uint8_t reserve[64];
+};
+
 AmlStatus AmlAicoreDetectOnline(int32_t deviceId, const AmlAicoreDetectAttr *attr);
 
+AmlStatus AmlP2PDetectOnline(int32_t devId, void *comm, const AmlP2PDetectAttr *attr);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/torch_npu/csrc/core/npu/interface/MlInterface.cpp b/torch_npu/csrc/core/npu/interface/MlInterface.cpp
index b992b4a1880bdc4c0f27b3000c8685ba6730c230..4008c8eb27ac669a9f7e230444b1e3454726accc 100644
--- a/torch_npu/csrc/core/npu/interface/MlInterface.cpp
+++ b/torch_npu/csrc/core/npu/interface/MlInterface.cpp
@@ -14,6 +14,7 @@ namespace amlapi {
 
 REGISTER_LIBRARY(libascend_ml)
 LOAD_FUNCTION(AmlAicoreDetectOnline)
+LOAD_FUNCTION(AmlP2PDetectOnline)
 
 bool IsExistAmlAicoreDetectOnline()
 {
@@ -24,6 +25,15 @@ bool IsExistAmlAicoreDetectOnline()
     return isExist;
 }
 
+bool IsExistAmlP2PDetectOnline()
+{
+    const static bool isExist = []() -> bool {
+        static auto func = GET_FUNC(AmlP2PDetectOnline);
+        return func != nullptr;
+    }();
+    return isExist;
+}
+
 AmlStatus AmlAicoreDetectOnlineFace(int32_t deviceId, const AmlAicoreDetectAttr *attr)
 {
     typedef AmlStatus (*amlAicoreDetectOnline)(int32_t, const AmlAicoreDetectAttr *);
@@ -35,5 +45,16 @@ AmlStatus AmlAicoreDetectOnlineFace(int32_t deviceId, const AmlAicoreDetectAttr
     return func(deviceId, attr);
 }
 
+AmlStatus AmlP2PDetectOnlineFace(int32_t deviceId, void *comm, const AmlP2PDetectAttr *attr)
+{
+    typedef AmlStatus (*amlP2PDetectOnline)(int32_t, void *, const AmlP2PDetectAttr *);
+    static amlP2PDetectOnline func = nullptr;
+    if (func == nullptr) {
+        func = (amlP2PDetectOnline) GET_FUNC(AmlP2PDetectOnline);
+    }
+    TORCH_CHECK(func, "Failed to find function ", "AmlP2PDetectOnline", PTA_ERROR(ErrCode::NOT_FOUND));
+    return func(deviceId, comm, attr);
+}
+
 } // namespace amlapi
 } // namespace c10_npu
diff --git a/torch_npu/csrc/core/npu/interface/MlInterface.h b/torch_npu/csrc/core/npu/interface/MlInterface.h
index 6af498629aa8b83ea2f3c571ed97a29d70882eb1..33389fcce3a53f7902c69ab7515dc7eb53a7d13b 100644
--- a/torch_npu/csrc/core/npu/interface/MlInterface.h
+++ b/torch_npu/csrc/core/npu/interface/MlInterface.h
@@ -8,10 +8,20 @@ namespace amlapi {
 */
 bool IsExistAmlAicoreDetectOnline();
 
+/**
+ * This API is used to check whether AmlP2PDetectOnline exists.
+*/
+bool IsExistAmlP2PDetectOnline();
+
 /**
  * This API is used to call AmlAicoreDetectOnline.
 */
 AmlStatus AmlAicoreDetectOnlineFace(int32_t deviceId, const AmlAicoreDetectAttr *attr);
 
+/**
+ * This API is used to call AmlP2PDetectOnline.
+*/
+AmlStatus AmlP2PDetectOnlineFace(int32_t deviceId, void *comm, const AmlP2PDetectAttr *attr);
+
 } // namespace amlapi
 } // namespace c10_npu
diff --git a/torch_npu/csrc/npu/Module.cpp b/torch_npu/csrc/npu/Module.cpp
index 8d8fb06843c02ba5d19ecd0ccadab8c733edb7d4..4c6a49ed02dd7b8ffd0b656e449b646b02ba4cd9 100644
--- a/torch_npu/csrc/npu/Module.cpp
+++ b/torch_npu/csrc/npu/Module.cpp
@@ -727,15 +727,29 @@ PyObject* THNPModule_maybeExchangeDevice_wrap(PyObject* self, PyObject* arg)
     END_HANDLE_TH_ERRORS
 }
 
-PyObject* THNPModule_stressDetect_wrap(PyObject* self, PyObject* noargs)
+PyObject* THNPModule_stressDetect_wrap(PyObject* self, PyObject* args)
 {
     HANDLE_TH_ERRORS
+    PyObject* value1 = nullptr;
+    PyObject* value2 = nullptr;
+
+    if (!PyArg_ParseTuple(args, "OO", &value1, &value2)) {
+        ASCEND_LOGE("Stress detect failed, arguments are invalid.");
+        return PyLong_FromLong(1);
+    }
+    int mode = THPUtils_unpackLong(value1);
+    int64_t comm = THPUtils_unpackLong(value2);
+
     torch_npu::utils::npu_lazy_init();
-    int device_id;
-    NPU_CHECK_ERROR_WITHOUT_UCE(c10_npu::GetDevice(&device_id));
+    int deviceId;
+    aclError err = c10_npu::GetDevice(&deviceId);
+    if (err != ACL_ERROR_NONE) {
+        ASCEND_LOGE("Stress detect failed, GetDevice returned error %d.", err);
+        return PyLong_FromLong(1);
+    }
 
-    int ret = StressDetector::perform_stress_detect(device_id);
+    int ret = StressDetector::perform_stress_detect(deviceId, mode, comm);
     return PyLong_FromLong(ret);
     END_HANDLE_TH_ERRORS
 }
 
@@ -1804,7 +1818,7 @@ static struct PyMethodDef THNPModule_methods[] = {
     {"_npu_restart_device", (PyCFunction)THNPModule_restart_device_wrap, METH_O, nullptr},
     {"_npu_check_uce_in_memory", (PyCFunction)THNPModule_check_uce_in_memory_wrap, METH_O, nullptr},
     {"_npu_get_uce_addr", (PyCFunction)THNPModule_get_uce_addr_wrap, METH_NOARGS, nullptr},
-    {"_npu_stress_detect", (PyCFunction)THNPModule_stressDetect_wrap, METH_NOARGS, nullptr},
+    {"_npu_stress_detect", (PyCFunction)THNPModule_stressDetect_wrap, METH_VARARGS, nullptr},
     {"_npu_getLocalDevice", (PyCFunction)THNPModule_getLocalDevice_wrap, METH_NOARGS, nullptr},
     {"_npu_getDeviceCount", (PyCFunction)THNPModule_getDeviceCount_wrap, METH_NOARGS, nullptr},
     {"_npu_canDeviceAccessPeer", (PyCFunction)THNPModule_npuCanDeviceAccessPeer_wrap, METH_VARARGS, nullptr},
diff --git a/torch_npu/csrc/npu/Stress_detect.cpp b/torch_npu/csrc/npu/Stress_detect.cpp
index 3fcade819bea6a6f011b04b5a5257c87b002021d..048fc2907fbe73bf2893712682e83cb418e7c964 100644
--- a/torch_npu/csrc/npu/Stress_detect.cpp
+++ b/torch_npu/csrc/npu/Stress_detect.cpp
@@ -16,6 +16,8 @@ std::mutex StressDetector::mtx;
 int StressDetector::device_id;
 void* StressDetector::workspaceAddr = nullptr;
 size_t StressDetector::workspaceSize = 0;
+int StressDetector::stressMode = 0;
+void* StressDetector::localHcclComm = nullptr;
 
 constexpr int kDetectSucceeded = 0;
 constexpr int kDetectFailed = 1;
@@ -40,16 +42,33 @@ void StressDetector::worker_thread()
 
         // Execute the task
         int ret = -1;
-        if (c10_npu::amlapi::IsExistAmlAicoreDetectOnline()) {
-            AmlAicoreDetectAttr attr;
-            attr.mode = AML_DETECT_RUN_MODE_ONLINE;
-            attr.workspace = workspaceAddr;
-            attr.workspaceSize = workspaceSize;
-            ret = c10_npu::amlapi::AmlAicoreDetectOnlineFace(device_id, &attr);
-            ASCEND_LOGI("Stress detect with AmlAicoreDetectOnline, result is %d.", ret);
-        } else {
-            ret = c10_npu::acl::AclStressDetect(device_id, workspaceAddr, workspaceSize);
-            ASCEND_LOGI("Stress detect with StressDetect, result is %d.", ret);
+        try {
+            if (stressMode == 0) {
+                if (c10_npu::amlapi::IsExistAmlAicoreDetectOnline()) {
+                    AmlAicoreDetectAttr attr;
+                    attr.mode = AML_DETECT_RUN_MODE_ONLINE;
+                    attr.workspace = workspaceAddr;
+                    attr.workspaceSize = workspaceSize;
+                    ret = c10_npu::amlapi::AmlAicoreDetectOnlineFace(device_id, &attr);
+                    ASCEND_LOGI("Stress detect with AmlAicoreDetectOnline, result is %d.", ret);
+                } else {
+                    ret = c10_npu::acl::AclStressDetect(device_id, workspaceAddr, workspaceSize);
+                    ASCEND_LOGI("Stress detect with StressDetect, result is %d.", ret);
+                }
+            } else {
+                if (c10_npu::amlapi::IsExistAmlP2PDetectOnline()) {
+                    AmlP2PDetectAttr attr;
+                    attr.workspace = workspaceAddr;
+                    attr.workspaceSize = workspaceSize;
+                    ret = c10_npu::amlapi::AmlP2PDetectOnlineFace(device_id, localHcclComm, &attr);
+                    ASCEND_LOGI("Stress detect with AmlP2PDetectOnline, result is %d.", ret);
+                } else {
+                    ASCEND_LOGE("Stress detect with AmlP2PDetectOnline failed, AmlP2PDetectOnline is not supported on CANN versions lower than 8.1.RC2.");
+                }
+            }
+        } catch (std::exception &e) {
+            ret = -1;
+            ASCEND_LOGE("Stress detect failed, mode is %d, error: %s.", stressMode, e.what());
         }
 
         // Task complete, free memory
@@ -90,7 +109,7 @@ int StressDetector::transfer_result(int detectResult)
 }
 
 // Synchronous stress detection task execution
-int StressDetector::perform_stress_detect(int deviceid)
+int StressDetector::perform_stress_detect(int deviceid, int mode, int64_t comm)
 {
     // If it's the first call, start the persistent thread
     if (!thread_initialized.load()) {
@@ -132,6 +151,8 @@ int StressDetector::perform_stress_detect(int deviceid)
     StressDetector::device_id = deviceid;
     StressDetector::workspaceAddr = workspaceAddr;
     StressDetector::workspaceSize = workspaceSize;
+    StressDetector::stressMode = mode;
+    StressDetector::localHcclComm = reinterpret_cast<void *>(static_cast<uintptr_t>(comm));
 
     // Mark new task submitted
     new_task_submitted.store(true);
diff --git a/torch_npu/csrc/npu/Stress_detect.h b/torch_npu/csrc/npu/Stress_detect.h
index 4319122be7c3024f5559730726bb2a31195c500e..47f020a8708fc42069108febdfadfc612cb492bb 100644
--- a/torch_npu/csrc/npu/Stress_detect.h
+++ b/torch_npu/csrc/npu/Stress_detect.h
@@ -9,10 +9,11 @@
 #include
 #include
 #include "torch_npu/csrc/core/npu/NPUMacros.h"
+#include "torch_npu/csrc/distributed/HCCLUtils.hpp"
 
 class StressDetector {
 public:
-    TORCH_NPU_API static int perform_stress_detect(int deviceid);
+    TORCH_NPU_API static int perform_stress_detect(int deviceid, int mode, int64_t comm);
     TORCH_NPU_API static void stop_worker_thread();
 
 private:
@@ -44,6 +45,8 @@
     static int device_id;
     static void* workspaceAddr;
    static size_t workspaceSize;
+    static int stressMode;
+    static void* localHcclComm;
 
     // Flag to indicate if the thread has been initialized
     static std::atomic<bool> thread_initialized;
diff --git a/torch_npu/npu/utils.py b/torch_npu/npu/utils.py
index 4475a9fb4f098dfe442b3ed4adb1213bcb7644d7..05cd7de5857a67348e71bc7d4fe47b5726173a46 100644
--- a/torch_npu/npu/utils.py
+++ b/torch_npu/npu/utils.py
@@ -507,9 +507,37 @@ def clear_npu_overflow_flag():
     torch_npu.npu_clear_float_status(float_status)
 
 
-def stress_detect():
+hccl_detect_group = None
+
+
+def stress_detect(mode=0):
+    if mode not in [0, 1]:
+        warnings.warn("mode should be 0 or 1: 0 for online AI Core detection, 1 for online P2P detection.")
+        return 1
     torch_npu.npu._lazy_init()
-    return torch_npu._C._npu_stress_detect()
+    comm = 0
+    if mode == 1:
+        if not torch.distributed.is_initialized():
+            warnings.warn("torch.distributed must be initialized before HCCS P2P detection.")
+            return 1
+        global hccl_detect_group
+        if hccl_detect_group is None:
+            rank = int(os.getenv('RANK', -1))
+            local_world_size = int(os.getenv('LOCAL_WORLD_SIZE', -1))
+            if rank == -1 or local_world_size == -1:
+                warnings.warn("Environment variable 'RANK' or 'LOCAL_WORLD_SIZE' is not set.")
+                return 1
+            worker_index = rank // local_world_size
+            local_rank = rank % local_world_size
+            local_ranks = []
+            for i in range(local_world_size):
+                local_ranks.append(local_world_size * worker_index + i)
+            try:
+                hccl_detect_group = torch.distributed.new_group(ranks=local_ranks)
+                comm = hccl_detect_group._get_backend(torch.device('npu')).get_hccl_comm(local_rank)
+            except Exception:
+                return 1
+    return torch_npu._C._npu_stress_detect(mode, comm)
 
 
 def current_blas_handle():
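
A minimal usage sketch of the extended stress_detect API, for reviewers who want to see how the new mode argument is driven end to end. It is not part of the patch: the script name, the torchrun launch (which sets RANK, LOCAL_RANK and LOCAL_WORLD_SIZE), and the printed result handling are illustrative assumptions; only torch_npu.npu.stress_detect(mode=...) and the 0-success / 1-failure return convention come from the diff above.

# stress_detect_example.py -- illustrative sketch, not part of the patch.
# Assumed launch: torchrun --nproc_per_node=8 stress_detect_example.py
import os

import torch
import torch.distributed as dist
import torch_npu

local_rank = int(os.environ["LOCAL_RANK"])
torch_npu.npu.set_device(local_rank)

# mode=0: online AI Core stress detection (same path as the previous default).
print("aicore detect:", torch_npu.npu.stress_detect(mode=0))

# mode=1: online P2P/HCCS detection; requires an initialized process group so the
# node-local HCCL communicator can be derived from RANK / LOCAL_WORLD_SIZE.
dist.init_process_group(backend="hccl")
print("p2p detect:", torch_npu.npu.stress_detect(mode=1))

dist.destroy_process_group()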