====== Example: GPT ======
* References
* https://github.com/karpathy/minGPT
* https://github.com/huggingface/transformers/blob/master/src/transformers/modeling_gpt2.py
* https://github.com/openai/finetune-transformer-lm
* https://github.com/graykode/gpt-2-Pytorch
* https://github.com/Andras7/gpt2-pytorch
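Both versions below train a small decoder-only (GPT-style) Transformer on continuous-valued sequences, regressing the next frame with an MSE loss instead of a token-level cross entropy. V2 can fit either a sine wave (''BasicDataset'') or DeepMimic humanoid backflip motion frames (''MotionDataset''); V1 fits a sine wave only. At evaluation time the model is rolled out autoregressively from a short ground-truth prefix and the rollout is plotted against the target with ''gr'' and ''plotille''.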
===== V2 =====
import argparse
import math
import numpy as np
import plotille
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import tqdm
from gr.pygr import mlab
from IPython import embed
from torch.utils.data import Dataset
from torch.utils.data.dataloader import DataLoader
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('--dropout', type=float, default=0.1)
parser.add_argument('--lr', type=float, default=0.0001)
parser.add_argument('--max_epoch', type=int, default=200)
parser.add_argument('--batch_size', type=int, default=128)
parser.add_argument('--data_repeat', type=int, default=1)
parser.add_argument('--device', type=str, default='cuda')
parser.add_argument('--block_size', type=int, default=32)
parser.add_argument('--test_steps', type=int, default=512)
parser.add_argument('--n_workers', type=int, default=1)
parser.add_argument('--weight_decay', type=float, default=0.1)
parser.add_argument('--noise_scale', type=float, default=0.1)
parser.add_argument('--max_grad_norm', type=float, default=1.0)
parser.add_argument('--dataset', choices=['BasicDataset', 'MotionDataset'], default='MotionDataset')
return parser.parse_args()
args = parse_args()
class CausalSelfAttention(nn.Module):
"""
https://github.com/karpathy/minGPT/blob/master/mingpt/model.py
"""
def __init__(self, d_model, n_head, block_size, dropout):
super().__init__()
assert d_model % n_head == 0
# key, query, value projections for all heads
self.key = nn.Linear(d_model, d_model)
self.query = nn.Linear(d_model, d_model)
self.value = nn.Linear(d_model, d_model)
# regularization
self.attn_drop = nn.Dropout(dropout)
self.resid_drop = nn.Dropout(dropout)
# output projection
self.proj = nn.Linear(d_model, d_model)
# causal mask to ensure that attention is only applied to the left in the input sequence
self.register_buffer(
"mask",
torch.tril(torch.ones(block_size, block_size)).view(1, 1, block_size, block_size)
)
self.n_head = n_head
def forward(self, x, layer_past=None):
B, T, C = x.size()
# calculate query, key, values for all heads in batch and move head forward to be the batch dim
k = self.key(x).view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
q = self.query(x).view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
v = self.value(x).view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
# causal self-attention; Self-attend: (B, nh, T, hs) x (B, nh, hs, T) -> (B, nh, T, T)
att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
att = att.masked_fill(self.mask[:,:,:T,:T] == 0, -1e10) # todo: just use float('-inf') instead?
att = F.softmax(att, dim=-1)
att = self.attn_drop(att)
y = att @ v # (B, nh, T, T) x (B, nh, T, hs) -> (B, nh, T, hs)
y = y.transpose(1, 2).contiguous().view(B, T, C) # re-assemble all head outputs side by side
# output projection
y = self.resid_drop(self.proj(y))
return y
class Block(nn.Module):
""" an unassuming Transformer block """
def __init__(self, d_model, n_head, block_size, dropout):
super().__init__()
self.ln1 = nn.LayerNorm(d_model)
self.ln2 = nn.LayerNorm(d_model)
self.attn = CausalSelfAttention(d_model, n_head, block_size, dropout)
self.mlp = nn.Sequential(
nn.Linear(d_model, 4 * d_model),
nn.GELU(),
nn.Linear(4 * d_model, d_model),
nn.Dropout(dropout),
)
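    # pre-LayerNorm residual wiring: x + Attn(LN(x)) followed by x + MLP(LN(x))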
def forward(self, x):
x = x + self.attn(self.ln1(x))
x = x + self.mlp(self.ln2(x))
return x
class GPTModel(nn.Module):
def __init__(self, input_dims, output_dims, block_size):
super().__init__()
self.n_layers = 6
self.n_heads = 8
self.d_model = 512
self.block_size = block_size
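        # continuous-valued inputs: a Linear layer stands in for the token embedding, and positions
        # use a learned (1, block_size, d_model) parameter broadcast over the batch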
self.we = nn.Linear(input_dims, self.d_model, bias=True)
self.wp = nn.Parameter(torch.zeros(1, self.block_size, self.d_model))
self.blocks = nn.Sequential(*[
Block(self.d_model, self.n_heads, self.block_size, args.dropout)
for _ in range(self.n_layers)
])
self.norm = nn.LayerNorm(self.d_model)
self.wd = nn.Linear(self.d_model, output_dims, bias=True)
self.apply(self._init_weights)
print(f'n_params: {sum(p.numel() for p in self.parameters())}')
def _init_weights(self, module):
if isinstance(module, (nn.Linear, nn.Embedding)):
module.weight.data.normal_(mean=0.0, std=0.02)
if isinstance(module, nn.Linear) and module.bias is not None:
module.bias.data.zero_()
elif isinstance(module, nn.LayerNorm):
module.bias.data.zero_()
module.weight.data.fill_(1.0)
def forward(self, src):
B, T, C = src.size()
src_embed = self.we(src)
pos_embed = self.wp[:, :T, :]
hx = src_embed + pos_embed
hx = self.blocks(hx)
hx = self.norm(hx)
        out = self.wd(hx)
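        # the second return value is the input window shifted left by one with the newest prediction
        # appended; the eval loop feeds it straight back in to roll the model out autoregressively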
src = torch.cat([src[:, 1:, :], out[:, -1:, :]], dim=1).detach()
return out, src
class BasicDataset(Dataset):
def __init__(self, block_size, repeat, noise_scale):
self.block_size = block_size
self.data = np.sin(np.arange(10240) / 10.)
# self.data = np.sin(np.arange(10240) / 10.) * 0.5 + 2.5
# self.data = np.abs(np.sin(np.arange(10240) / 10.))
# data = np.sin(np.arange(10240) / 10.) * (np.sin(np.arange(10240) / 10.) > 0.0)
self.data = self.data.astype(np.float32)
self.data = self.data.reshape(-1, 1)
self.data_std = self.data.std(0)
self.repeat = repeat
self.noise_scale = noise_scale
def __len__(self):
# return math.ceil(len(self.data) / (self.block_size + 1))
return len(self.data) * self.repeat
def __getitem__(self, idx):
# we're actually going to "cheat" and pick a spot in the dataset at random
i = np.random.randint(0, len(self.data) - (self.block_size + 1))
chunk = self.data[i: i+self.block_size+1]
        # add the noise out-of-place: `chunk += ...` would write the noise back into self.data through the slice view
        chunk = chunk + np.random.normal(0, self.noise_scale, chunk.shape) * self.data_std
x = torch.tensor(chunk[:-1], dtype=torch.float32)
y = torch.tensor(chunk[1:], dtype=torch.float32)
return x, y
def get_test_data(self, test_steps, device):
i = np.random.randint(0, len(self.data) - (test_steps + 1))
idx = np.arange(i, i+test_steps)
data = self.data[idx].reshape(1, -1, 1)
tgt = torch.tensor(data, device=device)
src = tgt[:, :args.block_size]
gen = tgt[:, :args.block_size]
return tgt, src, gen
class MotionDataset(Dataset):
def __init__(self, block_size, repeat, noise_scale):
self.block_size = block_size
import urllib, json
url = "https://raw.githubusercontent.com/xbpeng/DeepMimic/master/data/motions/humanoid3d_backflip.txt"
self.data = json.loads(urllib.request.urlopen(url).read())['Frames']
self.data = np.array(self.data, dtype=np.float32)
self.data = np.hstack([self.data[:, 3:4], self.data])
self.data = np.tile(self.data, (100, 1))
self.dims = self.data.shape[-1]
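        # standardize each feature; training, rollout and plotting all stay in this normalized space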
self.data_mean = self.data.mean(0, keepdims=True)
self.data_std = self.data.std(0, keepdims=True)
self.data = (self.data - self.data_mean) / self.data_std
self.data = self.data.astype(np.float32)
self.repeat = repeat
self.noise_scale = noise_scale
def __len__(self):
# return math.ceil(len(self.data) / (self.block_size + 1))
return len(self.data) * self.repeat
def __getitem__(self, idx):
# we're actually going to "cheat" and pick a spot in the dataset at random
i = np.random.randint(0, len(self.data) - (self.block_size + 1))
chunk = self.data[i: i+self.block_size+1]
        # add the noise out-of-place: `chunk += ...` would write the noise back into self.data through the slice view
        chunk = chunk + np.random.normal(0, self.noise_scale, chunk.shape)
x = torch.tensor(chunk[:-1], dtype=torch.float32)
y = torch.tensor(chunk[1:], dtype=torch.float32)
return x, y
def get_test_data(self, test_steps, device):
i = np.random.randint(0, len(self.data) - (test_steps + 1))
idx = np.arange(i, i+test_steps)
data = self.data[idx].reshape(1, -1, self.dims)
tgt = torch.tensor(data, device=device)
src = tgt[:, :args.block_size]
gen = tgt[:, :args.block_size]
return tgt, src, gen
if __name__ == '__main__':
# create the dataloader
    dataset_cls = globals()[args.dataset]  # avoid shadowing torch.utils.data.Dataset imported above
    dataset = dataset_cls(args.block_size, args.data_repeat, args.noise_scale)
loader = DataLoader(dataset, batch_size=args.batch_size, num_workers=args.n_workers)
# create the model
dim = dataset.data.shape[-1]
model = GPTModel(dim, dim, args.block_size).to(args.device)
# create the optimizer
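    # GPT-2 / minGPT convention: no weight decay on biases or LayerNorm weights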
no_decay = ["bias", "LayerNorm.weight"]
params_decay = [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)]
params_nodecay = [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)]
optim_groups = [
{"params": params_decay, "weight_decay": args.weight_decay},
{"params": params_nodecay, "weight_decay": 0.0},
]
optimizer = optim.AdamW(optim_groups, lr=args.lr, betas=(0.9, 0.95))
def warmup_cosine(optimizer, lr_max, epoch, warmup=1.0):
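        # same formula as warmup_cosine in openai/finetune-transformer-lm, where the argument is the
        # fraction of total training in [0, 1]; with raw epoch counts passed below, the cosine factor
        # oscillates with a period of two epochs after the one-epoch linear warmup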
s = float(epoch <= warmup)
w = s*(epoch / warmup) + (1-s)*(0.5 * (1 + np.cos(np.pi * epoch)))
for param_group in optimizer.param_groups:
param_group['lr'] = w * lr_max
step = 0
train_loss_list = list()
test_score_list = list()
for epoch in tqdm.trange(args.max_epoch):
# fitting
model.train()
for i, (src, tgt) in tqdm.tqdm(enumerate(loader), total=len(loader), leave=False):
src, tgt = src.to(args.device), tgt.to(args.device)
gen, _ = model(src)
optimizer.zero_grad()
loss = (0.5 * (tgt - gen) ** 2).mean()
loss.backward()
nn.utils.clip_grad_norm_(model.parameters(), args.max_grad_norm)
optimizer.step()
warmup_cosine(optimizer, args.lr, epoch + i / len(loader))
step += 1 / len(loader)
train_loss_list.append((step, loss.item()))
tqdm.tqdm.write(plotille.scatter(*zip(*train_loss_list[-1000:]), height=25))
# eval
model.eval()
tgt, src, gen = dataset.get_test_data(args.test_steps, args.device)
with torch.no_grad():
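            # autoregressive rollout: start from the first block_size ground-truth frames,
            # then repeatedly feed the model its own latest prediction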
for i in range(args.test_steps - args.block_size):
gen_, src = model(src)
gen = torch.cat([gen, gen_[:, -1:, :]], dim=1)
loss = (0.5 * (tgt - gen) ** 2).mean()
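        # report exp(-MSE) so the eval score lies in (0, 1] with higher = better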
score = (-loss).exp()
test_score_list.append((step, score.item()))
mlab.plot(tgt.cpu().numpy()[0, :, 0])
mlab.oplot(gen.cpu().numpy()[0, :, 0])
tqdm.tqdm.write(plotille.scatter(*zip(*test_score_list[-1000:]), height=25))
tqdm.tqdm.write(str(args))
embed()
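If the V2 script above is saved as, say, ''gpt_v2.py'' (file name assumed), a typical run looks like the following; all flags map to the ''parse_args()'' options above and the non-default values are only examples:

  python gpt_v2.py --dataset BasicDataset --block_size 32 --batch_size 128 --max_epoch 200
  python gpt_v2.py --dataset MotionDataset --noise_scale 0.05 --device cuda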
===== V1 =====
import numpy as np
import plotille
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import tqdm
from gr.pygr import mlab
from IPython import embed
import argparse
def parse_args():
    parser = argparse.ArgumentParser()
parser.add_argument('--custom_mha', type=lambda x: x in ('1', 'true'), default=False)
parser.add_argument('--custom_block', type=lambda x: x in ('1', 'true'), default=True)
parser.add_argument('--dropout', type=float, default=0.1)
parser.add_argument('--lr', type=float, default=0.00025)
return parser.parse_args()
args = parse_args()
class MultiheadAttention(nn.Module):
def __init__(self, key_dim, num_heads, drop=0.1):
super().__init__()
        # scale by sqrt(d_k) of a single head, not of the full embedding width
        self.scale = np.power(key_dim // num_heads, 0.5)
self.n_heads = num_heads
self.dropout = nn.Dropout(drop)
def forward(self, q, k, v, attn_mask):
q = self.split_heads(q)
k = self.split_heads(k, key=True)
v = self.split_heads(v)
        w = torch.matmul(q, k)
        w = w / self.scale
        # attn_mask is the additive float mask built by generate_src_mask (0 = visible, -inf = future)
        w = w + attn_mask
attn = F.softmax(w, dim=-1)
attn = self.dropout(attn)
context = torch.matmul(attn, v)
context = self.merge_heads(context)
return context, attn
def split_heads(self, x, key=False):
seq, bs, emb = x.size()
d_k = emb // self.n_heads
x = x.view(seq, bs, self.n_heads, d_k)
if key:
# bs, self.n_heads, d_k, seq
x = x.permute(1, 2, 3, 0)
else:
# bs, self.n_heads, seq, d_k
x = x.permute(1, 2, 0, 3)
return x
def merge_heads(self, x):
bs, heads, seq, d_k = x.size()
x = x.permute(2, 0, 1, 3)
x = x.reshape(seq, bs, self.n_heads * d_k)
return x
class MHA(nn.Module):
def __init__(self, embed_dim, num_heads, dropout):
super().__init__()
self.n_heads = num_heads
self.qkv = nn.Linear(embed_dim, 3 * embed_dim, bias=False)
if args.custom_mha:
self.attn = MultiheadAttention(embed_dim, num_heads)
else:
self.attn = nn.MultiheadAttention(embed_dim, num_heads, dropout)
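            # note: nn.MultiheadAttention applies its own q/k/v input projections, so on this
            # path the inputs are effectively projected twice (once by self.qkv, once internally)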
self.out = nn.Linear(embed_dim, embed_dim)
layers = (self.qkv, self.out)
for layer in layers:
torch.nn.init.xavier_uniform_(layer.weight)
self.out.bias.data.zero_()
def forward(self, x, mask):
seq, bsz, emb = x.size()
q, k, v = self.qkv(x).split(emb, dim=2)
context, weight = self.attn(q, k, v, attn_mask=mask)
return self.out(context)
class MLP(nn.Module):
def __init__(self, embed_dim, factor=4):
super(MLP, self).__init__()
self.fc = nn.Linear(embed_dim, embed_dim * factor)
self.fc2 = nn.Linear(embed_dim * factor, embed_dim)
torch.nn.init.normal_(self.fc.weight, std=0.02)
torch.nn.init.uniform_(self.fc.bias, -0.001, 0.001)
torch.nn.init.normal_(self.fc2.weight, std=0.02)
torch.nn.init.uniform_(self.fc2.bias, -0.001, 0.001)
def forward(self, x):
x = self.fc(x)
x = F.gelu(x)
x = self.fc2(x)
return x
class CustomBlock(nn.Module):
def __init__(self, embed_dim, num_heads, dropout=0.1):
super().__init__()
self.ln_1 = nn.LayerNorm(embed_dim)
self.attn = MHA(embed_dim, num_heads, dropout)
self.ln_2 = nn.LayerNorm(embed_dim)
self.mlp = MLP(embed_dim)
def forward(self, x, src_mask=None):
x = x + self.attn(self.ln_1(x), src_mask)
x = x + self.mlp(self.ln_2(x))
return x
class Block(nn.TransformerEncoderLayer):
def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1):
super().__init__(d_model, nhead, dim_feedforward, dropout)
self.activation = F.gelu
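    # forward is overridden below to use pre-LayerNorm ordering (GPT-style) instead of the
    # post-LayerNorm ordering of the stock nn.TransformerEncoderLayer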
def forward(self, src, src_mask=None, src_key_padding_mask=None):
# MHA
x = self.norm1(src)
x = self.self_attn(x, x, x, attn_mask=src_mask, key_padding_mask=src_key_padding_mask)[0]
src = src + self.dropout1(x)
# MLP
x = self.linear2(self.dropout(self.activation(self.linear1(self.norm2(src)))))
src = src + self.dropout2(x)
return src
class GPTModel(nn.Module):
def __init__(self, input_dims, output_dims, max_len):
super().__init__()
self.n_layers = 3
self.n_heads = 16
self.d_model = 512
self.max_len = max_len
self.we = nn.Linear(input_dims, self.d_model, bias=False)
        # no padding_idx: every index is a real position, so its embedding should receive gradients
        self.wp = nn.Embedding(self.max_len, self.d_model)
if args.custom_block:
self.blocks = nn.ModuleList([
CustomBlock(self.d_model, self.n_heads, dropout=args.dropout) for _ in range(self.n_layers)
])
else:
self.blocks = nn.ModuleList([
Block(self.d_model, self.n_heads, dropout=args.dropout) for _ in range(self.n_layers)
])
self.norm = nn.LayerNorm(self.d_model)
self.wd = nn.Linear(self.d_model, output_dims, bias=False)
torch.nn.init.normal_(self.we.weight, std=0.02)
torch.nn.init.uniform_(self.wp.weight, -0.01, 0.01)
torch.nn.init.normal_(self.wd.weight, std=0.02)
def forward(self, src):
src_embed = self.we(src)
pos_idx = torch.arange(len(src), device=src.device)
pos_embed = self.wp(pos_idx).unsqueeze(1)
hx = src_embed + pos_embed
src_mask = self.generate_src_mask(src.size(0), src.device)
for block in self.blocks:
hx = block(hx, src_mask=src_mask)
hx = self.norm(hx)
out = self.wd(self.norm(hx))
src = torch.cat([src[1:], out[-1:]], dim=0).detach()
return out, src
@staticmethod
def generate_src_mask(size, device):
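        # additive causal mask in the format nn.MultiheadAttention expects:
        # 0.0 on and below the diagonal (visible), -inf above it (future positions)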
mask = (torch.triu(torch.ones(size, size)) == 1).transpose(0, 1)
mask = mask.float().to(device)
mask = mask.masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
return mask
if __name__ == '__main__':
n_epochs = 2500
prev_steps = 32
next_steps = 2
test_steps = 512
bsz = 32 # 8 # 4 # 128
device = 'cuda'
dataset = np.sin(np.arange(10240) / 10.) * 0.5 + 2.5
model = GPTModel(1, 1, prev_steps + next_steps).to(device)
optimizer = optim.Adam(model.parameters(), lr=args.lr, betas=(0.9, 0.95), eps=1e-8)
# scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=2)
def warmup_cosine(optimizer, lr_max, epoch, warmup=1.0):
s = float(epoch <= warmup)
w = s*(epoch / warmup) + (1-s)*(0.5 * (1 + np.cos(np.pi * epoch)))
for param_group in optimizer.param_groups:
param_group['lr'] = w * lr_max
step = 0
train_loss_list = list()
test_loss_list = list()
for epoch in tqdm.trange(n_epochs):
# make batch id
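        # each column of a mini-batch index block is one overlapping window of length
        # prev_steps + next_steps starting at a random offset into the sine series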
bid = np.arange(len(dataset)-(prev_steps + next_steps))
np.random.shuffle(bid)
bid = bid[:len(bid) // bsz * bsz]
bid = bid.reshape((len(bid) // bsz, 1, bsz))
pos = np.arange(prev_steps + next_steps).reshape(1, -1, 1)
idxes = bid + pos # mini-batch x seq x data-index
# fitting
for i, idx in enumerate(tqdm.tqdm(idxes, leave=False)):
data = dataset[idx].reshape((prev_steps + next_steps, bsz, 1))
tgt = torch.tensor(
data + np.random.normal(0, 0.5, data.shape), # data + noise
dtype=torch.float32, device=device
)
gen, _ = model(tgt)
optimizer.zero_grad()
loss = (0.5 * (tgt[1:] - gen[:-1]) ** 2).mean()
loss.backward()
optimizer.step()
# scheduler.step(epoch + i / len(idxes))
warmup_cosine(optimizer, args.lr, epoch + i / len(idxes))
step += 1 / len(idxes)
train_loss_list.append((step, loss.item()))
# eval
idx = np.random.randint(0, len(dataset)-(prev_steps + test_steps), 1).reshape(-1, 1)
idx = idx + np.arange(prev_steps + test_steps).reshape(-1, 1)
data = dataset[idx].reshape(prev_steps + test_steps, 1, 1)
tgt = torch.tensor(
data + np.random.normal(0, 0.5, data.shape),
dtype=torch.float32, device=device
)
src = tgt[:prev_steps]
gen = tgt[:prev_steps]
with torch.no_grad():
for _ in range(test_steps):
gen_, src = model(src)
gen = torch.cat([gen, gen_[-1:]], dim=0)
mlab.plot(data.reshape(-1))
mlab.oplot(gen.squeeze_().cpu().numpy())
loss = (0.5 * (data.reshape(-1) - gen.squeeze_().cpu().numpy()) ** 2).mean()
test_loss_list.append((step, loss.item()))
tqdm.tqdm.write(plotille.scatter(*zip(*train_loss_list[-1000:]), height=25))
tqdm.tqdm.write(plotille.scatter(*zip(*test_loss_list[-1000:]), height=25))
tqdm.tqdm.write(str(args))
embed()
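If the V1 script is saved as, say, ''gpt_v1.py'' (file name assumed), the custom attention and block implementations can be toggled from the command line:

  python gpt_v1.py --custom_block true --custom_mha false --lr 0.00025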
{{tag>GPT example}}