"""Single-process simulation of a ring all-reduce that averages host vectors.

Each ``Host`` owns one vector of length ``dims``.  The vector is cut into
``world_size`` chunks that travel around a ring in two phases of
``world_size - 1`` steps each: an accumulate phase that sums every chunk
across all hosts, then a copy phase that hands the finished sums to every
host.  Dividing by ``world_size`` finally leaves the element-wise mean of all
input vectors on every host.
"""
import numpy as np
from queue import Queue  # NOTE(review): not referenced by the code visible here

try:
    from IPython import embed
except ImportError:
    # IPython only powers the optional interactive shell at the end of the
    # demo; the simulation itself must stay importable without it.
    embed = None

world_size = 4
dims = 4


def gen_grad(rank):
    """Return a random vector of length ``dims``.

    ``rank`` is accepted but does not influence the generated values.
    """
    return np.random.random(dims)


def chunk_bounds(index, total, parts):
    """Return ``(start, end)`` of chunk ``index`` of ``total`` items in ``parts``.

    Every chunk holds ``total // parts`` items except the last one, which
    also takes the remainder so the chunks always cover ``[0, total)``.
    """
    size = total // parts
    start = size * index
    end = total if index == parts - 1 else start + size
    return start, end


class Host:
    """One participant of the simulated ring.

    Attributes:
        data: the host's input vector; rebound to the averaged vector by
            ``reduce``.
        cs: nominal chunk size, ``dims // world_size``.
        rank: index of this host in the ring.
        accum: working vector holding partial, later complete, chunk sums.
        left, right: neighbouring hosts; ``None`` until the ring is wired.
        buff: inbox written by the left neighbour's ``send_*`` call.
    """

    def __init__(self, rank, data):
        self.data = data
        self.cs = dims // world_size
        self.rank = rank
        self.accum = np.zeros(dims)
        # Chunk ``rank + 1`` is never sent by this host during the accumulate
        # phase, so its own contribution is never added at send time.  Seed
        # it here; the last accumulate step adds the other hosts' sum to it.
        own = self._chunk(self.rank + 1)
        self.accum[own] = self.data[own]
        self.left, self.right = None, None
        self.buff = None

    def _chunk(self, j):
        """Return the slice of chunk ``j`` (taken modulo ``world_size``)."""
        return slice(*chunk_bounds(j % world_size, dims, world_size))

    def send_accum(self, i):
        """Accumulate step ``i``: send chunk ``rank - i`` to the right host.

        The payload is the partial sum collected so far plus this host's own
        values for that chunk.
        """
        part = self._chunk(self.rank - i)
        self.right.buff = self.accum[part] + self.data[part]

    def recv_accum(self, i):
        """Accumulate step ``i``: add the inbox to chunk ``rank - i - 1``."""
        self.accum[self._chunk(self.rank - i - 1)] += self.buff

    def send_copy(self, i):
        """Copy step ``i``: send chunk ``rank - i + 1`` to the right host.

        A copy is sent rather than a view so the receiver's inbox cannot
        change when this host later writes to ``accum``.
        """
        self.right.buff = self.accum[self._chunk(self.rank - i + 1)].copy()

    def recv_copy(self, i):
        """Copy step ``i``: overwrite chunk ``rank - i`` with the inbox."""
        self.accum[self._chunk(self.rank - i)] = self.buff

    def reduce(self):
        """Replace ``data`` with the accumulated sums divided by ``world_size``."""
        self.data = self.accum / world_size


def build_ring(vectors):
    """Create one ``Host`` per row of ``vectors`` and link them into a ring.

    Raises:
        ValueError: if the number of rows differs from ``world_size``.
    """
    if len(vectors) != world_size:
        raise ValueError(
            f"expected {world_size} vectors, got {len(vectors)}"
        )
    hosts = [Host(rank, vec) for rank, vec in enumerate(vectors)]
    for host in hosts:
        host.left = hosts[(host.rank - 1) % world_size]
        host.right = hosts[(host.rank + 1) % world_size]
    return hosts


def ring_allreduce(hosts):
    """Run the accumulate phase, the copy phase and the averaging in place.

    ``hosts`` must be a fully linked ring of ``world_size`` hosts.  Within a
    step every host sends before any host receives, because each host has a
    single inbox that the next send would overwrite.
    """
    # accumulate chunks
    for i in range(world_size - 1):
        for host in hosts:
            host.send_accum(i)
        for host in hosts:
            host.recv_accum(i)

    # copy finished chunks
    for i in range(world_size - 1):
        for host in hosts:
            host.send_copy(i)
        for host in hosts:
            host.recv_copy(i)

    # turn sums into means
    for host in hosts:
        host.reduce()


if __name__ == '__main__':
    # Input data: fixed values so the run is reproducible
    # (gen_grad can supply random vectors instead).
    data = np.array([
        [1.1, 1.2, 1.3, 1.4],
        [2.01, 2.02, 2.03, 2.04],
        [3.001, 3.002, 3.003, 3.004],
        [4.0001, 4.0002, 4.0003, 4.0004],
    ])

    # Expected answer
    mean = np.mean(data, axis=0)

    # Create the hosts and connect the ring
    hosts = build_ring(data)

    ring_allreduce(hosts)

    print(hosts[0].data)
    print(np.allclose(hosts[0].data, hosts[1].data))
    print(np.allclose(hosts[0].data, mean))

    # Drop into an interactive shell for inspection when IPython is installed.
    if embed is not None:
        embed()