all_reduce
All Reduce
Simple Ring Reduce Example
- simple_ring_reduce_example.py
import numpy as np

# IPython is only needed for the interactive `embed()` shell at the very end;
# make it optional so the example also runs where IPython is not installed.
try:
    from IPython import embed
except ImportError:  # debugging convenience only
    embed = None

# Number of ring participants and gradient dimensionality.
world_size = 4
dims = 4


def gen_grad(rank):
    """Generate a fake per-rank gradient vector of length ``dims``.

    ``rank`` is currently unused; it is kept so the call site reads like a
    real distributed setup where gradients differ per rank.
    """
    return np.random.random(dims)


class Host:
    """One participant in a naive (un-chunked) ring all-reduce.

    Each host passes its full vector one hop to the right per round; after
    ``world_size - 1`` rounds every host's ``accum`` holds the sum of all
    ranks' vectors, and :meth:`reduce` turns it into the mean.
    """

    def __init__(self, rank, data):
        self.data = data
        self.rank = rank
        # Running sum, seeded with this host's own gradient.
        self.accum = np.zeros(dims)
        self.accum[:] = self.data[:]
        # Ring neighbours; wired up by the caller after construction.
        self.left, self.right = None, None
        # One-slot mailbox written by the left neighbour's send().
        self.buff = None

    def send(self, i):
        """Put the current vector into the right neighbour's mailbox.

        ``i`` (the round index) is unused here; it is kept so the driver
        loop matches the chunked variant's calling convention.
        """
        self.right.buff = self.data[:]

    def recv(self):
        """Fold the received vector into ``accum`` and forward it next round."""
        self.accum += self.buff
        self.data = self.buff

    def reduce(self, op):
        """Finish the all-reduce. Only the mean is implemented; ``op`` is ignored."""
        self.data = self.accum / world_size


if __name__ == '__main__':
    # Generate one gradient vector per rank.
    data = np.array([gen_grad(rank) for rank in range(world_size)])

    # Ground truth for the final check.
    mean = np.mean(data, axis=0)

    # Create the hosts, one per row of `data`.
    hosts = [Host(rank, data[:]) for rank, data in enumerate(data)]

    # Wire the hosts into a ring.
    for host in hosts:
        host.left = hosts[((host.rank - 1) + world_size) % world_size]
        host.right = hosts[(host.rank + 1) % world_size]

    # Rotate vectors world_size - 1 times. All sends complete before any
    # receive, emulating one synchronous communication step per round.
    for i in range(world_size - 1):
        for host in hosts:
            host.send(i)
        for host in hosts:
            host.recv()

    # Turn the accumulated sums into means.
    for host in hosts:
        host.reduce('mean')

    print(hosts[0].data)
    print(np.allclose(hosts[0].data, hosts[1].data))
    print(np.allclose(hosts[0].data, mean))
    if embed is not None:
        embed()
Ring Reduce (Chunk) Example
- ring_reduce_chunk_example.py
import numpy as np

# IPython is only needed for the interactive `embed()` shell at the very end;
# make it optional so the example also runs where IPython is not installed.
try:
    from IPython import embed
except ImportError:  # debugging convenience only
    embed = None

# Number of ring participants and gradient dimensionality.
world_size = 4
dims = 4


def gen_grad(rank):
    """Generate a fake per-rank gradient vector of length ``dims``.

    ``rank`` is currently unused; it is kept so the call site reads like a
    real distributed setup where gradients differ per rank.
    """
    return np.random.random(dims)


class Host:
    """One participant in a chunked ring all-reduce.

    Phase 1 (``send_accum``/``recv_accum``): scatter-reduce — each chunk
    travels around the ring, gathering one host's contribution per hop, so
    that after ``world_size - 1`` steps chunk ``rank + 1`` is fully reduced
    on this host.
    Phase 2 (``send_copy``/``recv_copy``): all-gather — the fully reduced
    chunks are rotated around the ring so every host ends with all of them.
    """

    def __init__(self, rank, data):
        self.data = data
        # Chunk size. Assumes dims is a multiple of world_size; any
        # remainder elements would be silently dropped — TODO confirm
        # inputs always satisfy this.
        self.cs = dims // world_size
        self.rank = rank
        self.accum = np.zeros(dims)
        # Seed chunk rank+1 with our own data: that is the chunk which
        # finishes fully reduced on this host after the scatter phase.
        j = (self.rank + 1) % world_size
        start = self.cs * j
        end = self.cs * (j + 1)
        self.accum[start: end] = self.data[start: end]
        # Ring neighbours; wired up by the caller after construction.
        self.left, self.right = None, None
        # One-slot mailbox written by the left neighbour's send.
        self.buff = None

    def send_accum(self, i):
        """Step ``i`` of scatter-reduce: forward the partial sum of chunk
        ``rank - i`` with our own contribution added (the `+` makes a new
        array, so the mailbox never aliases our buffers)."""
        j = (self.rank - i) % world_size
        start = self.cs * j
        end = self.cs * (j + 1)
        self.right.buff = self.accum[start: end] + self.data[start: end]

    def recv_accum(self, i):
        """Step ``i`` of scatter-reduce: add the received partial sum into
        chunk ``rank - i - 1`` (the chunk our left neighbour just sent)."""
        j = (self.rank - i - 1) % world_size
        start = self.cs * j
        end = self.cs * (j + 1)
        self.accum[start: end] += self.buff

    def send_copy(self, i):
        """Step ``i`` of all-gather: forward the already-reduced chunk
        ``rank - i + 1`` (our own reduced chunk first, then the ones we
        received) to the right neighbour."""
        j = (self.rank - i + 1) % world_size
        start = self.cs * j
        end = self.cs * (j + 1)
        self.right.buff = self.accum[start: end]

    def recv_copy(self, i):
        """Step ``i`` of all-gather: overwrite chunk ``rank - i`` with the
        fully reduced values received from the left neighbour."""
        j = (self.rank - i) % world_size
        start = self.cs * j
        end = self.cs * (j + 1)
        self.accum[start: end] = self.buff

    def reduce(self):
        """Finish the all-reduce: the accumulated sums become the mean."""
        self.data = self.accum / world_size


if __name__ == '__main__':
    # Fixed demo matrix (the digits make it easy to see which rank
    # contributed what to each reduced element).
    data = np.array([
        [1.1, 1.2, 1.3, 1.4],
        [2.01, 2.02, 2.03, 2.04],
        [3.001, 3.002, 3.003, 3.004],
        [4.0001, 4.0002, 4.0003, 4.0004],
    ])

    # Ground truth for the final check.
    mean = np.mean(data, axis=0)

    # Create the hosts, one per row of `data`.
    hosts = [Host(rank, data[:]) for rank, data in enumerate(data)]

    # Wire the hosts into a ring.
    for host in hosts:
        host.left = hosts[((host.rank - 1) + world_size) % world_size]
        host.right = hosts[(host.rank + 1) % world_size]

    # Phase 1: scatter-reduce. All sends complete before any receive,
    # emulating one synchronous communication step per round.
    for i in range(world_size - 1):
        for host in hosts:
            host.send_accum(i)
        for host in hosts:
            host.recv_accum(i)

    # Phase 2: all-gather the reduced chunks.
    for i in range(world_size - 1):
        for host in hosts:
            host.send_copy(i)
        for host in hosts:
            host.recv_copy(i)

    # Turn the accumulated sums into means.
    for host in hosts:
        host.reduce()

    print(hosts[0].data)
    print(np.allclose(hosts[0].data, hosts[1].data))
    print(np.allclose(hosts[0].data, mean))
    if embed is not None:
        embed()
참고
all_reduce.txt · 마지막으로 수정됨: 2024/03/23 02:38 저자 127.0.0.1