diff --git a/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp b/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp
index 4806baf2ef8492216ccf06d0888e44f9d6d0f3d7..4bffdb266c7a710e1c4936152b88e738cda343b9 100644
--- a/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp
+++ b/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp
@@ -1810,9 +1810,10 @@ c10::intrusive_ptr ProcessGroupHCCL::collective(
     }

     // No need to detect batch_isend_irecv inputs is incorrect, need require special treatments.
+    // Broadcast only needs to detect the src rank, which requires special treatment.
     if (c10_npu::model_state().get_model_mode() == c10_npu::ModelMode::L_TRAIN
         && c10_npu::option::OptionsManager::GetSilenceCheckFlag() != c10_npu::option::CHECK_CLOSE
-        && opType != c10d::OpType::UNKNOWN) {
+        && opType != c10d::OpType::UNKNOWN && opType != c10d::OpType::BROADCAST) {
         for (const auto i : c10::irange(inputs.size())) {
             npuGuard.set_index(devices[i].index());
             c10_npu::NPUStreamGuard guard(hcclStreams[i]);
@@ -2278,6 +2279,20 @@ c10::intrusive_ptr ProcessGroupHCCL::broadcast(
             return HCCL_SUCCESS;
         },
+        [&](std::vector& hcclStreams, c10::intrusive_ptr&) {
+            // Only the src rank needs to be detected.
+            if (c10_npu::model_state().get_model_mode() == c10_npu::ModelMode::L_TRAIN
+                && c10_npu::option::OptionsManager::GetSilenceCheckFlag() != c10_npu::option::CHECK_CLOSE) {
+                const std::vector& ranks = groupRanks();
+                if (opts.rootRank == ranks[rank_]) {
+                    for (const auto i : c10::irange(tensors.size())) {
+                        c10_npu::NPUStreamGuard guard(hcclStreams[0]);
+                        silenceCheck(tensors[i], c10d::OpType::BROADCAST);
+                    }
+                }
+            }
+        },
+        [&](std::vector& hcclStreams, c10::intrusive_ptr&) {},
         c10d::OpType::BROADCAST);
 }
diff --git a/torch_npu/csrc/profiler/profiler_mgr.cpp b/torch_npu/csrc/profiler/profiler_mgr.cpp
index 38199805210612f7f0838ce01c373ab7070ceb19..efb598f36a264f40cd27fee4f992a3e454a6609c 100644
--- a/torch_npu/csrc/profiler/profiler_mgr.cpp
+++ b/torch_npu/csrc/profiler/profiler_mgr.cpp
@@ -74,7 +74,6 @@ void ProfilerMgr::EnableMsProfiler(uint32_t *deviceIdList, uint32_t deviceNum, a

 void ProfilerMgr::Start(const NpuTraceConfig &npu_config, bool cpu_trace)
 {
-    c10_npu::npuSynchronizeDevice();
     if (npu_trace_.load() == true) {
         aclprofAicoreMetrics aic_metrics = ACL_AICORE_NONE;
         auto level_iter = trace_level_map_.find(npu_config.trace_level);
diff --git a/torch_npu/profiler/analysis/_profiling_parser.py b/torch_npu/profiler/analysis/_profiling_parser.py
index 9b8e3d8d3f7dfcb9044ee72965b4f91ef1055964..ccbdf08c5ad6337213325be67a8a09d30bed39e5 100644
--- a/torch_npu/profiler/analysis/_profiling_parser.py
+++ b/torch_npu/profiler/analysis/_profiling_parser.py
@@ -68,7 +68,7 @@ class ProfilingParser:
             return
         if not ProfilerPathManager.get_cann_path(self._profiler_path):
             return
-        if not CannPackageManager.cann_package_support_export_db():
+        if not CannPackageManager.SUPPORT_EXPORT_DB:
             raise RuntimeError("Current CANN package version does not support export db. "
" "If you want to export db, you can install supported CANN package version.") diff --git a/torch_npu/profiler/analysis/prof_common_func/_cann_package_manager.py b/torch_npu/profiler/analysis/prof_common_func/_cann_package_manager.py index 228ae69525dc7ea597079fe01c30e6b774dab469..8086cbb2e0dd9be148610025b457bbfc60ee765a 100644 --- a/torch_npu/profiler/analysis/prof_common_func/_cann_package_manager.py +++ b/torch_npu/profiler/analysis/prof_common_func/_cann_package_manager.py @@ -6,21 +6,23 @@ from ._constant import print_error_msg __all__ = [] +def check_cann_package_support_export_db() -> bool: + err_msg = "Failed to check if current CANN package version support export db!" + try: + msprof_path = shutil.which("msprof") + if not msprof_path: + print_error_msg(f"{err_msg} msprof command not found!") + return False + COMMAND_SUCCESS = 0 + completed_process = subprocess.run([msprof_path, "--help"], capture_output=True, shell=False, text=True) + if completed_process.returncode != COMMAND_SUCCESS: + print_error_msg(f"{err_msg} Failed to run command: msprof --help!") + return False + return "--type" in completed_process.stdout + except Exception: + print_error_msg(err_msg) + return False + + class CannPackageManager: - @classmethod - def cann_package_support_export_db(cls) -> bool: - err_msg = "Failed to check if current CANN package version support export db!" - try: - msprof_path = shutil.which("msprof") - if not msprof_path: - print_error_msg(f"{err_msg} msprof command not found!") - raise RuntimeError(f"{err_msg} msprof command not found!") - - COMMAND_SUCCESS = 0 - completed_process = subprocess.run([msprof_path, "--help"], capture_output=True, shell=False, text=True) - if completed_process.returncode != COMMAND_SUCCESS: - print_error_msg(f"{err_msg} Failed to run command: msprof --help!") - raise RuntimeError(f"{err_msg} Failed to run command: msprof --help!") - return "--type" in completed_process.stdout - except Exception as err: - raise RuntimeError(err_msg) from err + SUPPORT_EXPORT_DB = check_cann_package_support_export_db() diff --git a/torch_npu/profiler/analysis/prof_config/_parser_deps_config.py b/torch_npu/profiler/analysis/prof_config/_parser_deps_config.py index 5353d4c87734e1ac376c51ff2a639a47be105a4c..40d7be57945add9f5039f9de05f08a8b119c4242 100644 --- a/torch_npu/profiler/analysis/prof_config/_parser_deps_config.py +++ b/torch_npu/profiler/analysis/prof_config/_parser_deps_config.py @@ -60,14 +60,14 @@ class ParserDepsConfig: Constant.MEMORY_DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, Constant.DEPS: [Constant.DB_PARSER, Constant.MEMORY_PREPARE, Constant.FWK_API_DB_PARSER]}, Constant.STEP_INFO_DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, - Constant.DEPS: [Constant.DB_PARSER, Constant.TREE_BUILD_PARSER]}, + Constant.DEPS: [Constant.DB_PARSER, Constant.TREE_BUILD_PARSER, Constant.MEMORY_DB_PARSER]}, Constant.COMMUNICATION_DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, Constant.DEPS: [Constant.DB_PARSER, Constant.CANN_ANALYZE_PARSER, Constant.STEP_INFO_DB_PARSER]}, Constant.TRACE_STEP_TIME_DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, - Constant.DEPS: [Constant.DB_PARSER, Constant.STEP_INFO_DB_PARSER]}, + Constant.DEPS: [Constant.DB_PARSER, Constant.STEP_INFO_DB_PARSER, Constant.COMMUNICATION_DB_PARSER]}, Constant.GC_RECORD_DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, - Constant.DEPS: [Constant.DB_PARSER]}, + Constant.DEPS: [Constant.DB_PARSER, Constant.TRACE_STEP_TIME_DB_PARSER]}, Constant.MEMORY_TIMELINE_PARSER: {} } @@ -84,6 +84,6 @@ class 
         Constant.MEMORY_DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD,
                                     Constant.DEPS: [Constant.DB_PARSER, Constant.FWK_API_DB_PARSER]},
         Constant.GC_RECORD_DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD,
-                                       Constant.DEPS: [Constant.DB_PARSER]},
+                                       Constant.DEPS: [Constant.DB_PARSER, Constant.MEMORY_DB_PARSER]},
         Constant.MEMORY_TIMELINE_PARSER: {}
     }
diff --git a/torch_npu/profiler/profiler_interface.py b/torch_npu/profiler/profiler_interface.py
index 80da5197f46ba9c5f40bc3fa64faf4d988861c77..468363c347e8919ebf06c7f0057f65031596225b 100644
--- a/torch_npu/profiler/profiler_interface.py
+++ b/torch_npu/profiler/profiler_interface.py
@@ -148,7 +148,7 @@ class _ProfInterface:
             print_warn_msg("Experimental config will not be uesd while ProfilerActivity.NPU is not set.")
         if ProfilerActivity.NPU in self.activities and self.experimental_config.export_type == Constant.Db:
-            if not CannPackageManager.cann_package_support_export_db():
+            if not CannPackageManager.SUPPORT_EXPORT_DB:
                 raise RuntimeError("Current cann package does not support export db. "
                                    "If you want to export db, you can install supported CANN package version.")
diff --git a/torch_npu/utils/_step.py b/torch_npu/utils/_step.py
index 960c0afff2703c9885986627f7a6a19326679b6e..89e0efe61760772ffabd7cbb93c508e531a0e2b4 100644
--- a/torch_npu/utils/_step.py
+++ b/torch_npu/utils/_step.py
@@ -44,30 +44,43 @@ class PerfDumpState:

 perf_dump_state = PerfDumpState()
 perf_dump_enable = False

-IS_IN_BACKWARD = 0
+IS_IN_BACKWARD = False


-def input_hook(idx, asd_flag):
+def input_hook(idx, asd_flag, first_module_id, first_module_name, module_name):
     def hook(grad):
         global IS_IN_BACKWARD
         if idx != "":
-            IS_IN_BACKWARD = IS_IN_BACKWARD & 1    # 011 & 001 = 001
+            # only silent check
             _silent_fault_detector_v2.silent_fault_check(idx, asd_flag, grad)
         else:
-            IS_IN_BACKWARD = IS_IN_BACKWARD & 2    # 011 & 010 = 010
-
-        if not IS_IN_BACKWARD:
+            IS_IN_BACKWARD = False
             torch_npu._C._npu_set_call_state("forward")
+
+            rankprefix = ""
+            if torch.distributed.is_available() and torch.distributed.is_initialized():
+                rankprefix = f"[rank{torch.distributed.get_rank()}]"
+            if rankprefix == "[rank0]":
+                print(f"{rankprefix} SilentCheckv2: input_hook, idx is {idx} IS_IN_BACKWARD is {IS_IN_BACKWARD}, first_module_id is {first_module_id}, first_module_name is {first_module_name}, module_name is {module_name}")
         return
     return hook


-def output_hook(grad):
-    global IS_IN_BACKWARD
-    IS_IN_BACKWARD = 3    # 011
-    torch_npu._C._npu_set_call_state("backward")
-    return grad
+def output_hook(first_module_id, first_module_name, module_name):
+    def hook1(grad):
+        rankprefix = ""
+        if torch.distributed.is_available() and torch.distributed.is_initialized():
+            rankprefix = f"[rank{torch.distributed.get_rank()}]"
+        global IS_IN_BACKWARD
+        if IS_IN_BACKWARD:
+            print(f"{rankprefix} SilentCheckv2: output_hook, exception scene. IS_IN_BACKWARD is {IS_IN_BACKWARD} before set. first_module_id is {first_module_id}, first_module_name is {first_module_name}, module_name is {module_name}")
+        IS_IN_BACKWARD = True
+        if rankprefix == "[rank0]":
+            print(f"{rankprefix} SilentCheckv2: output_hook, IS_IN_BACKWARD is {IS_IN_BACKWARD}, first_module_id is {first_module_id}, first_module_name is {first_module_name}, module_name is {module_name}")
+        torch_npu._C._npu_set_call_state("backward")
+        return grad
+    return hook1


 def _is_inner_module(module):
@@ -87,20 +100,27 @@ class SilentCheckState:
         self.input_hook_flag = False
         self.is_training = False
         self.first_module_id = ""
+        self.first_module_name = ""
         self.first_weight = None
         self.last_weight = None
         self.last_tensor = None
         self.last_tensor_id = None
         self.first_tensor_id = None

-    def init_module_info(self, module_id, training):
+    def init_module_info(self, module_id, training, module_name):
         self.first_module_id = module_id
         self.first_forward = False
         self.is_training = training
+        self.first_module_name = module_name
         if self.is_training:
             torch_npu._C._npu_set_module_train_state("train")
         else:
             torch_npu._C._npu_set_module_train_state("infer")
+        rankprefix = ""
+        if torch.distributed.is_available() and torch.distributed.is_initialized():
+            rankprefix = f"[rank{torch.distributed.get_rank()}]"
+        if rankprefix == "[rank0]":
+            print(f"{rankprefix} SilentCheckv2: init_module_info, first_module_id is {module_id}, is_training is {training}, module_name is {module_name}")

     def check_tensor_dtype(self, tensor):
         if not self.dtype_support:
@@ -122,20 +142,20 @@ class SilentCheckState:
                 self.first_weight = param
                 break

-    def register_input_hook_before_call(self, asd_flag, *args):
+    def register_input_hook_before_call(self, asd_flag, module, *args):
         # Search the first tensor (if the first tensor is input)
         if self.is_training and not self.input_hook_flag:
             for x in args:
                 if isinstance(x, torch.Tensor) and x.requires_grad:
-                    x.register_hook(input_hook(self.first_module_id, asd_flag))
+                    x.register_hook(input_hook(self.first_module_id, asd_flag, self.first_module_id, self.first_module_name, module._get_name()))
                     self.input_hook_flag = True
                     break

-    def register_input_hook_after_call(self, output):
+    def register_input_hook_after_call(self, output, module):
         # Search the first tensor (if the first tensor is output of an inner module)
         if not self.input_hook_flag:
             if isinstance(output, torch.Tensor) and output.requires_grad:
-                output.register_hook(input_hook(self.first_module_id, asd_enable))
+                output.register_hook(input_hook(self.first_module_id, asd_enable, self.first_module_id, self.first_module_name, module._get_name()))
                 self.input_hook_flag = True
                 self.first_tensor_id = id(output)
@@ -152,20 +172,27 @@ class SilentCheckState:
         self.last_tensor_id = id(output)
         self.last_tensor = output

-    def init_all_hook(self, asd_flag):
+    def init_all_hook(self, asd_flag, module):
         if self.is_training:
             # Otherwise, there is only one weight in the outer module
             if self.first_tensor_id != self.last_tensor_id:
-                if self.last_tensor is not None:
-                    self.last_tensor.register_hook(output_hook)
-                if self.last_weight_hook_handles.get(self.first_module_id, None) is None:
-                    if self.last_weight is not None:
-                        last_weight_handle = self.last_weight.register_hook(output_hook)
+                if self.last_weight is not None and self.first_weight is not None:
+                    if not self.input_hook_flag:
+                        rankprefix = ""
+                        if torch.distributed.is_available() and torch.distributed.is_initialized():
+                            rankprefix = f"[rank{torch.distributed.get_rank()}]"
+                        print(f"{rankprefix} SilentCheckv2: init_all_hook, not find input hook. first_module_id is {self.first_module_id}, first_module_name is {self.first_module_name}, is_training is {self.is_training}, module_name is {module._get_name()}")
+                    if self.last_weight_hook_handles.get(self.first_module_id, None) is None:
+                        last_weight_handle = self.last_weight.register_hook(output_hook(self.first_module_id, self.first_module_name, module._get_name()))
                         self.last_weight_hook_handles[self.first_module_id] = last_weight_handle
-                if self.weight_hook_handles.get(self.first_module_id, None) is None:
-                    if self.first_weight is not None:
-                        first_weight_handle = self.first_weight.register_hook(input_hook("", asd_flag))
+                    if self.weight_hook_handles.get(self.first_module_id, None) is None:
+                        first_weight_handle = self.first_weight.register_hook(input_hook("", asd_flag, self.first_module_id, self.first_module_name, module._get_name()))
                         self.weight_hook_handles[self.first_module_id] = first_weight_handle
+                elif self.last_weight is not None or self.first_weight is not None:
+                    rankprefix = ""
+                    if torch.distributed.is_available() and torch.distributed.is_initialized():
+                        rankprefix = f"[rank{torch.distributed.get_rank()}]"
+                    print(f"{rankprefix} SilentCheckv2: init_all_hook, weight or last weight is None. first_module_id is {self.first_module_id}, first_module_name is {self.first_module_name}, is_training is {self.is_training}, module_name is {module._get_name()}")
             self.init_marks[self.first_module_id] = True
@@ -285,7 +312,7 @@ def _custom_call(self, *args, **kwargs):

     if asd_enable and not IS_IN_BACKWARD:
         if silent_check.first_forward:
-            silent_check.init_module_info(id(self), self.training)
+            silent_check.init_module_info(id(self), self.training, self._get_name())
             self.outer = True

         if silent_check.is_training and not silent_check.init_marks.get(silent_check.first_module_id, False):
@@ -301,7 +328,7 @@ def _custom_call(self, *args, **kwargs):
                 warnings.warn(f"Warning: Module has unsupported dtype tensor, silent check will be closed.")

         # Search the first tensor (if the first tensor is input)
-        silent_check.register_input_hook_before_call(asd_enable, *args)
+        silent_check.register_input_hook_before_call(asd_enable, self, *args)

     tmp = original_call(self, *args, **kwargs)
@@ -310,7 +337,7 @@ def _custom_call(self, *args, **kwargs):
             silent_check.search_first_weight(self)

             # Search the first tensor (if the first tensor is output of an inner module)
-            silent_check.register_input_hook_after_call(tmp)
+            silent_check.register_input_hook_after_call(tmp, self)

             # Search the last weight (only in inner module)
             silent_check.search_last_weight(self)
@@ -325,7 +352,7 @@ def _custom_call(self, *args, **kwargs):

     if asd_enable and not IS_IN_BACKWARD:
         if hasattr(self, "outer") and self.outer:
-            silent_check.init_all_hook(asd_enable)
+            silent_check.init_all_hook(asd_enable, self)
             silent_check.init_param()
             self.outer = False
@@ -345,6 +372,20 @@ def _parse_perf_config():
     return config_dict


+original_broadcast = torch.distributed.broadcast
+
+
+def _broadcast(tensor, src, group=None, async_op=False):
+    global silent_check
+    global IS_IN_BACKWARD
+    rankprefix = ""
+    if torch.distributed.is_available() and torch.distributed.is_initialized():
+        rankprefix = f"[rank{torch.distributed.get_rank()}]"
+    if rankprefix == "[rank0]":
+        print(f"{rankprefix} SilentCheckv2: broadcast, IS_IN_BACKWARD is {IS_IN_BACKWARD}, first_module_id is {silent_check.first_module_id}, is_training is {silent_check.is_training}, first_module_name is {silent_check.first_module_name}")
+    return original_broadcast(tensor, src, group, async_op)
+
+
 def add_perf_dump_patch():
     global perf_dump_enable
     global asd_enable
@@ -366,3 +407,4 @@ def add_perf_dump_patch():

     if perf_dump_enable or asd_enable:
         Module.__call__ = _custom_call
+        torch.distributed.broadcast = _broadcast