diff --git a/test/distributed/rpc/test_multi_machines_multi_devices_client.py b/test/distributed/rpc/test_multi_machines_multi_devices_client.py
new file mode 100644
index 0000000000000000000000000000000000000000..2225650ace5d1d2f795888d16f365c120588d954
--- /dev/null
+++ b/test/distributed/rpc/test_multi_machines_multi_devices_client.py
@@ -0,0 +1,53 @@
+import os
+import torch
+from torch import multiprocessing as mp
+import torch.distributed.rpc as rpc
+import torch_npu
+from torch_npu.distributed.rpc.options import NPUTensorPipeRpcBackendOptions
+from torch_npu.testing.common_distributed import skipIfUnsupportMultiNPU
+from torch_npu.testing.testcase import TestCase, run_tests
+
+
+class TestRpc(TestCase):
+    world_size = 4
+    local_size = 2
+    test_case_num = 7
+
+    @classmethod
+    def worker_loop(cls, process_id, world_size):
+        npu_id_ = f'npu:{process_id}'
+        worker_name_ = f'worker{process_id}'
+        torch.npu.set_device(npu_id_)
+        rpc.init_rpc(worker_name_, rank=process_id, world_size=world_size,
+                     backend=rpc.backend_registry.BackendType.NPU_TENSORPIPE)
+        rpc.shutdown()
+
+    @classmethod
+    def echo(cls, input1, input2):
+        return (input1, input2)
+
+    def _test_case(self):
+        os.environ['GLOO_SOCKET_IFNAME'] = 'enp189s0f0'
+        os.environ['TP_SOCKET_IFNAME'] = 'enp189s0f0'
+        os.environ['HCCL_SOCKET_IFNAME'] = 'enp189s0f0'
+        os.environ['MASTER_ADDR'] = '90.90.85.191'
+        os.environ['MASTER_PORT'] = '29501'
+
+        ctx = mp.get_context('spawn')
+        ps = []
+        for i in range(self.local_size, self.world_size):
+            p = ctx.Process(target=TestRpc.worker_loop, args=(i, self.world_size))
+            p.start()
+            ps.append(p)
+
+        for p in ps:
+            p.join()
+
+    @skipIfUnsupportMultiNPU(2)
+    def test_spawn_subprocess(self):
+        for _ in range(self.test_case_num):
+            self._test_case()
+
+
+if __name__ == '__main__':
+    run_tests()
diff --git a/test/distributed/rpc/test_multi_machines_multi_devices_master.py b/test/distributed/rpc/test_multi_machines_multi_devices_master.py
new file mode 100644
index 0000000000000000000000000000000000000000..148bdcb48b120609e8c1da680f1fa3e396e31eda
--- /dev/null
+++ b/test/distributed/rpc/test_multi_machines_multi_devices_master.py
@@ -0,0 +1,249 @@
+import os
+import time
+import warnings
+import torch
+import torch.distributed as dist
+from torch import multiprocessing as mp
+import torch.distributed.rpc as rpc
+import torch_npu
+from torch_npu.distributed.rpc.options import NPUTensorPipeRpcBackendOptions
+from torch_npu.testing.common_distributed import skipIfUnsupportMultiNPU
+from torch_npu.testing.testcase import TestCase, run_tests
+
+
+class TestRpc(TestCase):
+    world_size = 4
+    local_size = 2
+
+    def setUp(self):
+        os.environ['GLOO_SOCKET_IFNAME'] = 'enp189s0f0'
+        os.environ['TP_SOCKET_IFNAME'] = 'enp189s0f0'
+        os.environ['HCCL_SOCKET_IFNAME'] = 'enp189s0f0'
+        os.environ['MASTER_ADDR'] = '90.90.85.191'
+        os.environ['MASTER_PORT'] = '29501'
+
+    @classmethod
+    @rpc.functions.async_execution
+    def async_add_chained(cls, to, x, y, z):
+        # This function runs on "worker1" and returns immediately when
+        # the callback is installed through the `then(cb)` API. In the
+        # meantime, the `rpc_async` to "worker2" can run concurrently.
+        # When the return value of that `rpc_async` arrives at
+        # "worker1", "worker1" will run the lambda function accordingly
+        # and set the value for the previously returned `Future`, which
+        # will then trigger RPC to send the result back to "worker0".
+        return rpc.rpc_async(to, TestRpc.echo, args=(x, y)).then(
+            lambda fut: fut.wait()
+        )
+
+    @classmethod
+    def echo(cls, input1, input2):
+        return (input1, input2)
+
+    @classmethod
+    def set_options(cls):
+        options = NPUTensorPipeRpcBackendOptions(num_worker_threads=8, device_maps={'worker1': {'npu:0': 'npu:1'}})
+        options.rpc_timeout = 300
+        options.num_worker_threads = 16
+        options.set_device_map('worker2', {'npu:0': 'npu:2'})
+        options.set_device_map('worker3', {'npu:0': 'npu:3'})
+
+        return options
+
+    @classmethod
+    def init_worker_info(cls, process_id):
+        npu_id_ = f'npu:{process_id}'
+        worker_name_ = f'worker{process_id}'
+        torch.npu.set_device(npu_id_)
+        warnings.filterwarnings('ignore', category=UserWarning)
+        return npu_id_, worker_name_
+
+    @classmethod
+    def _test_async_call_for_cpu(cls, pid, inputs, world_size):
+        npu_id_, worker_name_ = TestRpc.init_worker_info(pid)
+        if pid == 0:
+            options = TestRpc.set_options()
+            rpc.init_rpc(worker_name_, rank=pid, world_size=world_size,
+                         backend=rpc.backend_registry.BackendType.NPU_TENSORPIPE, rpc_backend_options=options)
+
+            times = []
+            for i in range(2):
+                start = time.time()
+                fut = rpc.rpc_async('worker2', TestRpc.echo, args=(inputs[i], inputs[i]))
+                rets = fut.wait()
+                times.append(time.time() - start)
+            print('test_async_call_for_cpu_1M cost:', times[0], 's test_async_call_for_cpu_1G cost:', times[1], 's')
+        else:
+            rpc.init_rpc(worker_name_, rank=pid, world_size=world_size,
+                         backend=rpc.backend_registry.BackendType.NPU_TENSORPIPE)
+        rpc.shutdown()
+
+    @classmethod
+    def _test_sync_call_for_cpu(cls, pid, inputs, world_size):
+        npu_id_, worker_name_ = TestRpc.init_worker_info(pid)
+        if pid == 0:
+            options = TestRpc.set_options()
+            rpc.init_rpc(worker_name_, rank=pid, world_size=world_size,
+                         backend=rpc.backend_registry.BackendType.NPU_TENSORPIPE, rpc_backend_options=options)
+
+            times = []
+            for i in range(2):
+                start = time.time()
+                rets = rpc.rpc_sync('worker2', TestRpc.echo, args=(inputs[i], inputs[i]))
+                times.append(time.time() - start)
+            print('test_sync_call_for_cpu_1M cost:', times[0], 's test_sync_call_for_cpu_1G cost:', times[1], 's')
+        else:
+            rpc.init_rpc(worker_name_, rank=pid, world_size=world_size,
+                         backend=rpc.backend_registry.BackendType.NPU_TENSORPIPE)
+        rpc.shutdown()
+
+    @classmethod
+    def _test_async_call_for_npu(cls, pid, inputs, world_size):
+        npu_id_, worker_name_ = TestRpc.init_worker_info(pid)
+        if pid == 0:
+            options = TestRpc.set_options()
+            rpc.init_rpc(worker_name_, rank=pid, world_size=world_size,
+                         backend=rpc.backend_registry.BackendType.NPU_TENSORPIPE, rpc_backend_options=options)
+
+            times = []
+            for i in range(2):
+                start = time.time()
+                fut = rpc.rpc_async('worker2', TestRpc.echo, args=(inputs[i].npu(), inputs[i].npu()))
+                rets = fut.wait()
+                times.append(time.time() - start)
+            print('test_async_call_for_npu_1M cost:', times[0], 's test_async_call_for_npu_1G cost:', times[1], 's')
+        else:
+            rpc.init_rpc(worker_name_, rank=pid, world_size=world_size,
+                         backend=rpc.backend_registry.BackendType.NPU_TENSORPIPE)
+        rpc.shutdown()
+
+    @classmethod
+    def _test_sync_call_for_npu(cls, pid, inputs, world_size):
+        npu_id_, worker_name_ = TestRpc.init_worker_info(pid)
+        if pid == 0:
+            options = TestRpc.set_options()
+            rpc.init_rpc(worker_name_, rank=pid, world_size=world_size,
+                         backend=rpc.backend_registry.BackendType.NPU_TENSORPIPE, rpc_backend_options=options)
+
+            times = []
+            for i in range(2):
+                start = time.time()
+                rets = rpc.rpc_sync('worker2', TestRpc.echo,
+                                    args=(inputs[i].npu(), inputs[i].npu()))
+                times.append(time.time() - start)
+            print('test_sync_call_for_npu_1M cost:', times[0], 's test_sync_call_for_npu_1G cost:', times[1], 's')
+        else:
+            rpc.init_rpc(worker_name_, rank=pid, world_size=world_size,
+                         backend=rpc.backend_registry.BackendType.NPU_TENSORPIPE)
+        rpc.shutdown()
+
+    @classmethod
+    def _test_remote_rref(cls, pid, inputs, world_size):
+        npu_id_, worker_name_ = TestRpc.init_worker_info(pid)
+        if pid == 0:
+            options = TestRpc.set_options()
+            rpc.init_rpc(worker_name_, rank=pid, world_size=world_size,
+                         backend=rpc.backend_registry.BackendType.NPU_TENSORPIPE, rpc_backend_options=options)
+
+            times = []
+            for i in range(2):
+                start = time.time()
+                rref1 = rpc.remote('worker1', TestRpc.echo, args=(inputs[i].npu(), inputs[2]))
+                rref2 = rpc.remote('worker2', TestRpc.echo, args=(inputs[i].npu(), inputs[3]))
+                rref3 = rpc.remote('worker3', TestRpc.echo, args=(inputs[i].npu(), inputs[4]))
+                rref = rref1.to_here() + rref2.to_here() + rref3.to_here()
+                times.append(time.time() - start)
+            print('test_remote_rref_1M cost:', times[0], 's test_remote_rref_1G cost:', times[1], 's')
+        else:
+            rpc.init_rpc(worker_name_, rank=pid, world_size=world_size,
+                         backend=rpc.backend_registry.BackendType.NPU_TENSORPIPE)
+        rpc.shutdown()
+
+    @classmethod
+    def _test_async_execution(cls, pid, inputs, world_size):
+        npu_id_, worker_name_ = TestRpc.init_worker_info(pid)
+        if pid == 0:
+            options = TestRpc.set_options()
+            rpc.init_rpc(worker_name_, rank=pid, world_size=world_size,
+                         backend=rpc.backend_registry.BackendType.NPU_TENSORPIPE, rpc_backend_options=options)
+
+            times = []
+            for i in range(2):
+                start = time.time()
+                ret = rpc.rpc_sync('worker1', TestRpc.async_add_chained,
+                                   args=('worker2', inputs[i].npu(), inputs[2], inputs[3]))
+                times.append(time.time() - start)
+            print('test_async_execution_1M cost:', times[0], 's test_async_execution_1G cost:', times[1], 's')
+        elif pid == 1:
+            options = NPUTensorPipeRpcBackendOptions(num_worker_threads=8, device_maps={'worker2': {'npu:1': 'npu:2'}})
+            options.rpc_timeout = 300
+            rpc.init_rpc(worker_name_, rank=pid, world_size=world_size,
+                         backend=rpc.backend_registry.BackendType.NPU_TENSORPIPE, rpc_backend_options=options)
+        else:
+            rpc.init_rpc(worker_name_, rank=pid, world_size=world_size,
+                         backend=rpc.backend_registry.BackendType.NPU_TENSORPIPE)
+        rpc.shutdown()
+
+    @classmethod
+    def _test_get_worker_info(cls, pid, inputs, world_size):
+        npu_id_, worker_name_ = TestRpc.init_worker_info(pid)
+        if pid == 0:
+            options = TestRpc.set_options()
+            rpc.init_rpc(worker_name_, rank=pid, world_size=world_size,
+                         backend=rpc.backend_registry.BackendType.NPU_TENSORPIPE, rpc_backend_options=options)
+
+            start = time.time()
+            worker_info = rpc.get_worker_info(worker_name_)
+            print('test_get_worker_info cost:', time.time() - start, 's')
+        else:
+            rpc.init_rpc(worker_name_, rank=pid, world_size=world_size,
+                         backend=rpc.backend_registry.BackendType.NPU_TENSORPIPE)
+        rpc.shutdown()
+
+    def _test_multiprocess(self, f, inputs):
+        ctx = mp.get_context('spawn')
+        ps = []
+        for i in range(self.local_size):
+            p = ctx.Process(target=f, args=(i, inputs, self.world_size))
+            p.start()
+            ps.append(p)
+
+        for p in ps:
+            p.join()
+
+    @skipIfUnsupportMultiNPU(2)
+    def test_async_call_for_cpu(self):
+        inputs = [torch.rand(1024, 1024).cpu(), torch.rand(1024, 1024, 1024).cpu()]
+        self._test_multiprocess(TestRpc._test_async_call_for_cpu, inputs)
+
+    @skipIfUnsupportMultiNPU(2)
+    def test_sync_call_for_cpu(self):
+        inputs = [torch.rand(1024, 1024).cpu(),
+                  torch.rand(1024, 1024, 1024).cpu()]
+        self._test_multiprocess(TestRpc._test_sync_call_for_cpu, inputs)
+
+    @skipIfUnsupportMultiNPU(2)
+    def test_async_call_for_npu(self):
+        inputs = [torch.rand(1024, 1024), torch.rand(1024, 1024, 1024)]
+        self._test_multiprocess(TestRpc._test_async_call_for_npu, inputs)
+
+    @skipIfUnsupportMultiNPU(2)
+    def test_sync_call_for_npu(self):
+        inputs = [torch.rand(1024, 1024), torch.rand(1024, 1024, 1024)]
+        self._test_multiprocess(TestRpc._test_sync_call_for_npu, inputs)
+
+    @skipIfUnsupportMultiNPU(2)
+    def test_remote_rref(self):
+        inputs = [torch.rand(1024, 1024), torch.rand(1024, 1024, 1024), 1, 2, 3]
+        self._test_multiprocess(TestRpc._test_remote_rref, inputs)
+
+    @skipIfUnsupportMultiNPU(2)
+    def test_async_execution(self):
+        inputs = [torch.rand(1024, 1024), torch.rand(1024, 1024, 1024), 1, 1]
+        self._test_multiprocess(TestRpc._test_async_execution, inputs)
+
+    @skipIfUnsupportMultiNPU(2)
+    def test_get_worker_info(self):
+        self._test_multiprocess(TestRpc._test_get_worker_info, [])
+
+
+if __name__ == '__main__':
+    run_tests()
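
Usage note: the two files above form one cross-machine test. The master script spawns ranks 0 through local_size-1 (here 0 and 1) on one machine, while the client script spawns ranks local_size through world_size-1 (here 2 and 3) on a second machine; both sides must agree on MASTER_ADDR, MASTER_PORT, and the GLOO/TP/HCCL_SOCKET_IFNAME variables, so the hardcoded 'enp189s0f0' interface and '90.90.85.191' address need adapting to the local environment. Below is a minimal single-machine sketch of the initialization pattern the tests rely on, using only the APIs that appear in the patch; it assumes two NPUs on one host, and the loopback rendezvous address and the module-level echo helper are illustrative, not part of the patch.

    import os
    import torch
    import torch.distributed.rpc as rpc
    from torch import multiprocessing as mp
    import torch_npu
    from torch_npu.distributed.rpc.options import NPUTensorPipeRpcBackendOptions


    def echo(x, y):
        # Module-level so both ranks can resolve it when the RPC arrives.
        return (x, y)


    def run(rank):
        # Single-machine rendezvous; use a real address and set the
        # *_SOCKET_IFNAME variables for an actual multi-machine run.
        os.environ['MASTER_ADDR'] = '127.0.0.1'
        os.environ['MASTER_PORT'] = '29501'
        torch.npu.set_device(f'npu:{rank}')
        options = NPUTensorPipeRpcBackendOptions(num_worker_threads=8)
        if rank == 0:
            # Map local 'npu:0' onto worker1's 'npu:1' so NPU tensor
            # arguments arrive already placed on the remote device.
            options.set_device_map('worker1', {'npu:0': 'npu:1'})
        rpc.init_rpc(f'worker{rank}', rank=rank, world_size=2,
                     backend=rpc.backend_registry.BackendType.NPU_TENSORPIPE,
                     rpc_backend_options=options)
        if rank == 0:
            ret = rpc.rpc_sync('worker1', echo, args=(torch.ones(2).npu(), 1))
        rpc.shutdown()


    if __name__ == '__main__':
        mp.spawn(run, nprocs=2)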