사용자 도구

사이트 도구


all_reduce

All Reduce

Simple Ring Reduce Example

simple_ring_reduce_example.py
import numpy as np
from IPython import embed
 
 
world_size = 4
dims = 4
 
 
def gen_grad(rank):
    return np.random.random(dims)
 
 
class Host:
    def __init__(self, rank, data):
        self.data = data
 
        self.rank = rank
        self.accum = np.zeros(dims)
        self.accum[:] = self.data[:]
        # self.accum = np.zeros_like(self.data)
        self.left, self.right = None, None
        self.buff = None
 
    def send(self, i):
        self.right.buff = self.data[:]
 
    def recv(self):
        self.accum += self.buff
        self.data = self.buff
 
    def reduce(self, op):
        self.data = self.accum / world_size
 
 
if __name__ == '__main__':
 
    # 데이터 생성
    data = np.array([gen_grad(rank) for rank in range(world_size)])
    # data = np.array([
    #     [1., 1., 1., 1.], 
    #     [2., 2., 2., 2.],
    #     [3., 3., 3., 3.],
    #     [4., 4., 4., 4.],
    # ])
 
    # 정답
    mean = np.mean(data, axis=0)
 
    # Host 생성
    hosts = [Host(rank, data[:]) for rank, data in enumerate(data)]
 
    # 링 연결
    for host in hosts:
        host.left = hosts[((host.rank - 1) + world_size) % world_size]
        host.right = hosts[(host.rank + 1) % world_size]
 
    for i in range(world_size - 1):
        # chunk 전달
        for host in hosts:
            host.send(i)
 
        # chunk 수신
        for host in hosts:
            host.recv()
 
    # # reduce
    for host in hosts:
        host.reduce('mean')
 
    print(hosts[0].data)
    print(np.allclose(hosts[0].data, hosts[1].data))
    print(np.allclose(hosts[0].data, mean))
    embed()
 

Ring Reduce (Chunk) Example

ring_reduce_chunk_example.py
 
import numpy as np
from IPython import embed
from queue import Queue
 
 
world_size = 4
dims = 4
 
 
def gen_grad(rank):
    return np.random.random(dims)
 
 
class Host:
    def __init__(self, rank, data):
        self.data = data
        self.cs = dims // world_size
 
        self.rank = rank
        self.accum = np.zeros(dims)
 
        j = (self.rank + 1) % world_size
        start = self.cs * j
        end = self.cs * (j + 1)
        self.accum[start: end] = self.data[start: end]
 
        self.left, self.right = None, None
        self.buff = None
 
    def send_accum(self, i):
        j = (self.rank - i) % world_size
        start = self.cs * j
        end = self.cs * (j + 1)
        self.right.buff = self.accum[start: end] + self.data[start: end]
 
    def recv_accum(self, i):
        j = (self.rank - i - 1) % world_size
        start = self.cs * j
        end = self.cs * (j + 1)
        self.accum[start: end] += self.buff
 
    def send_copy(self, i):
        j = (self.rank - i + 1) % world_size
        start = self.cs * j
        end = self.cs * (j + 1)
        self.right.buff = self.accum[start: end]
 
    def recv_copy(self, i):
        j = (self.rank - i) % world_size
        start = self.cs * j
        end = self.cs * (j + 1)
        self.accum[start: end] = self.buff
 
    def reduce(self):
        self.data = self.accum / world_size
 
 
if __name__ == '__main__':
 
    # 데이터 생성
    data = np.array([gen_grad(rank) for rank in range(world_size)])
    data = np.array([
        [1.1, 1.2, 1.3, 1.4], 
        [2.01, 2.02, 2.03, 2.04],
        [3.001, 3.002, 3.003, 3.004],
        [4.0001, 4.0002, 4.0003, 4.0004],
    ])
 
    # 정답
    mean = np.mean(data, axis=0)
 
    # Host 생성
    hosts = [Host(rank, data[:]) for rank, data in enumerate(data)]
 
    # 링 연결
    for host in hosts:
        host.left = hosts[((host.rank - 1) + world_size) % world_size]
        host.right = hosts[(host.rank + 1) % world_size]
 
    # accum chunk
    for i in range(world_size - 1):
        # chunk 전달
        for host in hosts:
            host.send_accum(i)
 
        # chunk 수신
        for host in hosts:
            host.recv_accum(i)
 
    # copy chunk
    for i in range(world_size - 1):
        # chunk 전달
        for host in hosts:
            host.send_copy(i)
 
        # chunk 수신
        for host in hosts:
            host.recv_copy(i)
 
    # # reduce
    for host in hosts:
        host.reduce()
 
    print(hosts[0].data)
    print(np.allclose(hosts[0].data, hosts[1].data))
    print(np.allclose(hosts[0].data, mean))
    embed()
 

참고

all_reduce.txt · 마지막으로 수정됨: 2024/03/23 02:38 저자 127.0.0.1