import numpy as np
from IPython import embed
from queue import Queue
# Simulation size: the number of hosts in the ring, and the length of the
# vector each host holds.
world_size, dims = 4, 4
def gen_grad(rank):
    """Return a random stand-in gradient vector of length ``dims``.

    Values are drawn uniformly from [0, 1) using NumPy's global (legacy)
    random state. ``rank`` is currently unused by the body.
    """
    return np.random.random_sample(size=dims)
class Host:
    """One participant in a simulated ring all-reduce.

    The host's vector is split into ``world_size`` contiguous chunks. Running
    ``world_size - 1`` reduce-scatter steps (``send_accum`` / ``recv_accum``)
    followed by ``world_size - 1`` all-gather steps (``send_copy`` /
    ``recv_copy``) leaves the element-wise sum over all hosts in ``accum``;
    ``reduce`` then stores the mean in ``data``.

    "Sending" is modelled by writing straight into ``self.right.buff``, so the
    driver must run every host's send for a step before any host's receive.
    ``left`` and ``right`` are assigned by the driver; only ``right`` is read
    by the methods here.

    Args:
        rank: Position of this host in the ring, ``0 <= rank < world_size``.
        data: This host's 1-D input vector.
        ring_size: Number of hosts in the ring. Defaults to the module-level
            ``world_size``.
    """

    def __init__(self, rank, data, ring_size=None):
        self.data = data
        self.rank = rank
        self.world_size = world_size if ring_size is None else ring_size
        n = len(data)
        # Nominal (floor) chunk size, kept for backward compatibility. The
        # real chunk edges come from ``bounds`` so that a length that is not
        # a multiple of the ring size (or is smaller than it) is still fully
        # covered instead of silently dropping the tail elements.
        self.cs = n // self.world_size
        self.bounds = [k * n // self.world_size for k in range(self.world_size + 1)]
        self.accum = np.zeros(n)
        # Pre-load our own contribution to chunk (rank + 1). That is the chunk
        # this host receives on the final reduce-scatter step, and it is never
        # sent onward from here during reduce-scatter, so our share must
        # already be present when the partial sum from the left arrives.
        own = self._chunk(self.rank + 1)
        self.accum[own] = self.data[own]
        self.left, self.right = None, None
        self.buff = None

    def _chunk(self, j):
        """Return the slice of chunk ``j`` (index taken modulo the ring size)."""
        j %= self.world_size
        return slice(self.bounds[j], self.bounds[j + 1])

    def send_accum(self, i):
        """Reduce-scatter step ``i``: send chunk ``rank - i`` to the right.

        The outgoing value is the partial sum received so far for that chunk
        plus this host's own contribution to it.
        """
        s = self._chunk(self.rank - i)
        self.right.buff = self.accum[s] + self.data[s]

    def recv_accum(self, i):
        """Reduce-scatter step ``i``: add the incoming partial sum into chunk ``rank - i - 1``."""
        s = self._chunk(self.rank - i - 1)
        self.accum[s] += self.buff

    def send_copy(self, i):
        """All-gather step ``i``: send the fully reduced chunk ``rank - i + 1`` to the right."""
        s = self._chunk(self.rank - i + 1)
        # Copy so the message is a snapshot rather than a view into our live
        # accumulator that later writes could change under the receiver.
        self.right.buff = self.accum[s].copy()

    def recv_copy(self, i):
        """All-gather step ``i``: overwrite chunk ``rank - i`` with the incoming reduced values."""
        s = self._chunk(self.rank - i)
        self.accum[s] = self.buff

    def reduce(self):
        """Replace ``data`` with the element-wise mean (the summed ``accum`` over the ring size)."""
        self.data = self.accum / self.world_size
if __name__ == '__main__':
    # Build one input vector per host. The fixed values make each rank's
    # contribution identifiable by its decimal place; to test on random
    # inputs use ``np.array([gen_grad(rank) for rank in range(world_size)])``.
    data = np.array([
        [1.1, 1.2, 1.3, 1.4],
        [2.01, 2.02, 2.03, 2.04],
        [3.001, 3.002, 3.003, 3.004],
        [4.0001, 4.0002, 4.0003, 4.0004],
    ])
    # Ground truth: element-wise mean over hosts.
    mean = np.mean(data, axis=0)
    # Create the hosts. Each gets its own copy of its row (``row[:]`` on a
    # NumPy array would only be a view into ``data``).
    hosts = [Host(rank, row.copy()) for rank, row in enumerate(data)]
    # Wire the ring: rank r's left is r - 1 and its right is r + 1 (mod size).
    for host in hosts:
        host.left = hosts[(host.rank - 1) % world_size]
        host.right = hosts[(host.rank + 1) % world_size]
    # Reduce-scatter. Sends write directly into the neighbour's buffer, so
    # every host must send for a step before any host receives.
    for i in range(world_size - 1):
        for host in hosts:
            host.send_accum(i)
        for host in hosts:
            host.recv_accum(i)
    # All-gather: circulate the fully reduced chunks around the ring.
    for i in range(world_size - 1):
        for host in hosts:
            host.send_copy(i)
        for host in hosts:
            host.recv_copy(i)
    # Turn the sums into means.
    for host in hosts:
        host.reduce()
    print(hosts[0].data)
    # Every host should hold the same result...
    print(all(np.allclose(hosts[0].data, host.data) for host in hosts))
    # ...and it should match the directly computed mean.
    print(np.allclose(hosts[0].data, mean))
    # Drop into an interactive IPython shell to inspect the hosts.
    embed()