diff --git a/test/npu/test_stream.py b/test/npu/test_stream.py index 343ce4052f1f5dab5ff1a5c709df177db82c3d10..7cdfd748be83bc8f7e88fba35560bdf6d0be98f3 100644 --- a/test/npu/test_stream.py +++ b/test/npu/test_stream.py @@ -32,6 +32,18 @@ class TestNpuStream(TestCase): self.assertTrue(current_stream.npu_stream == current_raw_stream) self.assertTrue(current_stream.npu_stream == interface_raw_stream) + def test_priority(self): + s = torch.npu.Stream() + self.assertTrue((s.stream_id >> 5) == 3) + s = torch.npu.Stream(priority=0) + self.assertTrue((s.stream_id >> 5) == 3) + s = torch.npu.Stream(priority=1) + self.assertTrue((s.stream_id >> 5) == 3) + s = torch.npu.Stream(priority=-1) + self.assertTrue((s.stream_id >> 5) == 4) + s = torch.npu.Stream(priority=-2) + self.assertTrue((s.stream_id >> 5) == 4) + if __name__ == "__main__": run_tests() diff --git a/torch_npu/csrc/core/npu/NPUStream.cpp b/torch_npu/csrc/core/npu/NPUStream.cpp index e499474a6fdd7d87a858aca12bd8b35b9f67d4c3..699461b2bad29b9d059a1cda20f93666202d9e0e 100644 --- a/torch_npu/csrc/core/npu/NPUStream.cpp +++ b/torch_npu/csrc/core/npu/NPUStream.cpp @@ -52,6 +52,7 @@ struct LeakyStreamInternals { static c10::DeviceIndex num_npus = -1; static constexpr int kStreamsPerPoolBits = 5; static constexpr int kStreamsPerPool = 1 << kStreamsPerPoolBits; +static constexpr int kMaxStreamPriorities = 2; static constexpr int kSyncLaunchStreamsPerPool = 4; // Default streams init flags static bool initialize_flag[C10_COMPILE_TIME_MAX_NPUS] = {false}; @@ -66,20 +67,29 @@ static LeakyStreamInternals secondary_streams[C10_COMPILE_TIME_MAX_NPUS]; static std::once_flag device_flags[C10_COMPILE_TIME_MAX_NPUS]; // SyncLaunch streams pool init flags static std::once_flag device_sync_launch_flags[C10_COMPILE_TIME_MAX_NPUS]; -static std::atomic npu_counters[C10_COMPILE_TIME_MAX_NPUS]; +static std::array< + std::array, C10_COMPILE_TIME_MAX_NPUS>, + kMaxStreamPriorities> + npu_counters; static std::atomic sync_stream_counters[C10_COMPILE_TIME_MAX_NPUS]; // npu_streams is a stream pool, each device has a stream pool, // and 8 streams are created in each pool. -static std::array npu_streams[C10_COMPILE_TIME_MAX_NPUS]; +static std::array< + std::array< + std::array, + C10_COMPILE_TIME_MAX_NPUS>, + kMaxStreamPriorities> + npu_streams; static thread_local std::unique_ptr current_streams = nullptr; static std::array sync_launch_streams[C10_COMPILE_TIME_MAX_NPUS]; enum class StreamIdType : uint8_t { DEFAULT = 0x0, - HCCL = 0x1, - SECONDARY = 0x2, - SYNCLAUNCH = 0x3, + SECONDARY = 0x1, + SYNCLAUNCH = 0x2, + NORMAL = 0x3, + HIGH = 0x4, }; std::ostream& operator<<(std::ostream& stream, StreamIdType s) @@ -88,8 +98,11 @@ std::ostream& operator<<(std::ostream& stream, StreamIdType s) case StreamIdType::DEFAULT: stream << "DEFAULT"; break; - case StreamIdType::HCCL: - stream << "HCCL"; + case StreamIdType::NORMAL: + stream << "NORMAL"; + break; + case StreamIdType::HIGH: + stream << "HIGH"; break; case StreamIdType::SECONDARY: stream << "SECONDARY"; @@ -157,9 +170,11 @@ static c10::StreamId NPUStream_getStreamId(const LeakyStreamInternals* ptr) if (ptr == &default_streams[device_index]) { return makeStreamId(StreamIdType::DEFAULT, 0); } - if (pointer_within(ptr, npu_streams[device_index])) { - return makeStreamId( - StreamIdType::HCCL, ptr - npu_streams[device_index].data()); + for (const auto p : c10::irange(kMaxStreamPriorities)) { + if (pointer_within(ptr, npu_streams[p][device_index])) { + return makeStreamId(StreamIdType(static_cast(StreamIdType::NORMAL) + p), + ptr - npu_streams[p][device_index].data()); + } } if (pointer_within(ptr, sync_launch_streams[device_index])) { return makeStreamId( @@ -196,7 +211,9 @@ static void initGlobalStreamState() } // Initializes default streams default_streams[device_id].device_index = device_id; - npu_counters[device_id] = 0; + for (const auto p : c10::irange(kMaxStreamPriorities)) { + npu_counters[p][device_id] = 0; + } auto& default_streamsi = default_streams[device_id]; NPU_CHECK_ERROR( acl::AclrtCreateStreamWithConfig(&default_streamsi.stream, 0, (ACL_STREAM_FAST_LAUNCH | ACL_STREAM_FAST_SYNC))); @@ -217,12 +234,14 @@ static void initDeviceStreamState(c10::DeviceIndex device_index) NPUGuard device_guard{device_index}; static int StreamsPerPool = GetStreamsPerPool(); for (auto i = decltype(StreamsPerPool){0}; i < StreamsPerPool; ++i) { - auto& npu_streami = npu_streams[device_index][i]; + for (const auto p : c10::irange(kMaxStreamPriorities)) { + auto& npu_streami = npu_streams[p][device_index][i]; - npu_streami.device_index = device_index; + npu_streami.device_index = device_index; - NPU_CHECK_ERROR( - acl::AclrtCreateStreamWithConfig(&npu_streami.stream, 0, (ACL_STREAM_FAST_LAUNCH | ACL_STREAM_FAST_SYNC))); + NPU_CHECK_ERROR(acl::AclrtCreateStreamWithConfig( + &npu_streami.stream, 0, (ACL_STREAM_FAST_LAUNCH | ACL_STREAM_FAST_SYNC))); + } } } @@ -290,8 +309,9 @@ LeakyStreamInternals* NPUStream_internals(NPUStream s) " Did you manufacture the StreamId yourself? Don't do that; use the", " official API like c10::cuda::getStreamFromPool() to get a new stream.", PTA_ERROR(ErrCode::PARAM)); return &default_streams[device_index]; - case StreamIdType::HCCL: - return &npu_streams[device_index][si]; + case StreamIdType::NORMAL: + case StreamIdType::HIGH: + return &npu_streams[static_cast(st) - static_cast(StreamIdType::NORMAL)][device_index][si]; case StreamIdType::SECONDARY: return &secondary_streams[device_index]; case StreamIdType::SYNCLAUNCH: @@ -351,7 +371,7 @@ aclrtStream NPUStream::stream() const return cur_ptr->stream; } -NPUStream getNPUStreamFromPool(c10::DeviceIndex device_index) +NPUStream getStreamFromPool(const int priority, c10::DeviceIndex device_index) { initNPUStreamsOnce(); if (device_index == -1) { @@ -362,30 +382,21 @@ NPUStream getNPUStreamFromPool(c10::DeviceIndex device_index) // Initializes the stream pools (once) std::call_once( device_flags[device_index], initDeviceStreamState, device_index); + auto pri_idx = std::clamp(-priority, 0, kMaxStreamPriorities - 1); + const auto idx = get_idx(npu_counters[pri_idx][device_index]); + return NPUStream_fromInternals(&npu_streams[pri_idx][device_index][idx]); +} - const auto idx = get_idx(npu_counters[device_index]); - return NPUStream_fromInternals(&npu_streams[device_index][idx]); +NPUStream getNPUStreamFromPool(c10::DeviceIndex device_index) +{ + return getStreamFromPool(0, device_index); } NPUStream getStreamFromPool(const bool isHighPriority, c10::DeviceIndex device_index) { initNPUStreamsOnce(); - if (device_index == -1) { - device_index = current_device(); - } - check_npu(device_index); - - // Initializes the stream pools (once) - std::call_once( - device_flags[device_index], initDeviceStreamState, device_index); - - if (isHighPriority) { - const auto idx = get_idx(npu_counters[device_index]); - return NPUStream_fromInternals(&npu_streams[device_index][idx]); - } - - const auto idx = get_idx(npu_counters[device_index]); - return NPUStream_fromInternals(&npu_streams[device_index][idx]); + int priority = isHighPriority ? -kMaxStreamPriorities + 1 : 0; + return getStreamFromPool(priority, device_index); } NPUStream getDefaultNPUStream(c10::DeviceIndex device_index) @@ -627,12 +638,14 @@ void recovery_all_npu_streams(c10::DeviceIndex device_index) acl::AclrtCreateStreamWithConfig(&secondary_streamsi.stream, 0, (ACL_STREAM_FAST_LAUNCH | ACL_STREAM_FAST_SYNC))); static int StreamsPerPool = GetStreamsPerPool(); for (auto i = decltype(StreamsPerPool){0}; i < StreamsPerPool; ++i) { - auto& npu_streami = npu_streams[device_index][i]; - if (npu_streami.stream == nullptr) { - continue; + for (const auto p : c10::irange(kMaxStreamPriorities)) { + auto& npu_streami = npu_streams[p][device_index][i]; + if (npu_streami.stream == nullptr) { + continue; + } + NPU_CHECK_ERROR(acl::AclrtCreateStreamWithConfig( + &npu_streami.stream, 0, (ACL_STREAM_FAST_LAUNCH | ACL_STREAM_FAST_SYNC))); } - NPU_CHECK_ERROR( - acl::AclrtCreateStreamWithConfig(&npu_streami.stream, 0, (ACL_STREAM_FAST_LAUNCH | ACL_STREAM_FAST_SYNC))); } } diff --git a/torch_npu/csrc/core/npu/NPUStream.h b/torch_npu/csrc/core/npu/NPUStream.h index bea6e28650ff24e8efe06be43b0754d344f7b472..9b7d488f13369246ac880f65c5312b39b2201147 100644 --- a/torch_npu/csrc/core/npu/NPUStream.h +++ b/torch_npu/csrc/core/npu/NPUStream.h @@ -112,6 +112,7 @@ public: private: c10::Stream stream_; }; +NPUStream getStreamFromPool(const int priority, c10::DeviceIndex device = -1); C10_NPU_API NPUStream getNPUStreamFromPool(c10::DeviceIndex device = -1); diff --git a/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp b/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp index f68b8404d680150b57e48458b8d8ac837d55f70b..de6c3d3e43827ecff15a0e76d5596fd48bee1d8b 100644 --- a/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp +++ b/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp @@ -94,7 +94,10 @@ inline c10_npu::NPUStream getNPUStreamByCurrentType(c10::DeviceIndex device = -1 { auto current_Stream = c10_npu::getCurrentNPUStream(device); if (!current_Stream.isSyncLaunchStream()) { - return c10_npu::getNPUStreamFromPool(device); + bool force_high = c10d::getCvarBool(TORCH_HCCL_HIGH_PRIORITY, false); + auto s = c10_npu::getStreamFromPool(force_high, device); + ASCEND_LOGD("Get stream, stream id: %zu", static_cast(s.id())) + return s; } return c10_npu::getNPUStreamFromSyncLaunchPool(device); } diff --git a/torch_npu/csrc/distributed/ProcessGroupHCCL.hpp b/torch_npu/csrc/distributed/ProcessGroupHCCL.hpp index 2be4adcb3ed7782d94eb21e06ed02fa445428a98..039493d1a955b4393f49fb0bd52034e47929e62d 100644 --- a/torch_npu/csrc/distributed/ProcessGroupHCCL.hpp +++ b/torch_npu/csrc/distributed/ProcessGroupHCCL.hpp @@ -69,6 +69,10 @@ static std::vector TORCH_HCCL_HEARTBEAT_TIMEOUT_SEC = { static std::vector TORCH_HCCL_COORD_CHECK_MILSEC = { "TORCH_HCCL_COORD_CHECK_MILSEC"}; +// Control whether to always use high priority streams +static std::vector TORCH_HCCL_HIGH_PRIORITY = { + "TORCH_HCCL_HIGH_PRIORITY"}; + // A struct to hold the latest status of the process group. struct ProcessGroupStatus { // the sequential number of the last collective enqueued into workMetaList_ diff --git a/torch_npu/csrc/npu/Stream.cpp b/torch_npu/csrc/npu/Stream.cpp index 180fede5ec3acb34785a91e2fa21bb48640938bf..23ebaf7b99ebd2ef9b993ab494420506dd67a8bc 100644 --- a/torch_npu/csrc/npu/Stream.cpp +++ b/torch_npu/csrc/npu/Stream.cpp @@ -61,7 +61,7 @@ static PyObject *THNPStream_pynew( c10_npu::NPUStream::unpack3( stream_id, device_index, static_cast(device_type)) : (is_sync_launch ? c10_npu::getNPUStreamFromSyncLaunchPool() : - c10_npu::getNPUStreamFromPool()); + c10_npu::getStreamFromPool(priority)); THNPStream *self = (THNPStream *)ptr.get(); self->stream_id = static_cast(stream.id());