!wget "https://www.perseus.tufts.edu/hopper/dltext?doc=Perseus%3Atext%3A1999.02.0008" -O atticus.xml
Decoder-only Transformers: Generative Pre-trained Transformers (GPTs)#
With the release of ChatGPT by OpenAI in the autumn of 2022, many began to flock to “AI”, treating it like magic. Today, we will investigate the modeling technique at the core of this technology: the decoder-only transformer.
The original transformer, as proposed by Vaswani et al., contained two parts: the encoder and the decoder. This architecture is still used for transformer-based machine translation, but researchers have also split up these two parts and found that each has useful features by itself.
In a future lesson, we will take a close look at the encoder and how it is suited for representing the semantic meaning with word vectors. Today, though, we will explore the decoder and what it is capable of.
Learning objectives:
Understand how to run inference with GPTs and how they are trained so that this inference is possible.
Examine the internal states of models including attention heads and MLPs. Learn more about activations and how the model works in practice.
Connect sentence transformer (encoder-only) work to how GPTs work (decoder-only).
Access and utilize the free GPU resources on Colab, and learn why we need GPUs in order to do this work.
Note on terminology: Unfortunately, there are many overlapping terms in this field. For example, a “GPT” is just a “Decoder-only transformer” that has been trained on a huge amount of data. (We’ll talk about “pretraining” vs. training soon.) This confusion becomes all the more manifest when talking about “Artificial Intelligence” and “Large Language Models”. This terminology is problematic and a serious detriment to the field. I will attempt to be as consistent as possible with my use of terms.
Parts of the Decoder-only transformer#
The decoder-only transformer is made up of several parts (see the schematic below):
Embeddings: as with all of the language modeling techniques we have seen in these notebooks, the transformer relies on embeddings to internally represent token meaning. In this case we have two different types:
Input embedding: This is the embedding for the new token entering the model. The embedding of the next predicted token becomes the next input to the model, as in RNNs.
Output embeddings: This is the embedding for the next token entering the model.
Positional Encodings: These are added to the input embeddings to give the model information about the position of each token in the sequence. Like the token embeddings, this is just an embedding layer; it learns a vector for each position within block_size.
Masked Multi-Head Attention: We got acquainted with attention in our exploration of machine translation, where we used it to move between our encoder and decoder. For transformer attention, the attention weights are computed from learnable parameters of the model rather than set directly on our embeddings. This is where the title of Vaswani et al.’s paper, “Attention Is All You Need”, comes from. In addition, we will also have a “causal” mask, which hides later tokens so that the model learns to predict the next token in a sequence from the earlier ones only.
Normalization: These layers make sure that all of the data passing through the network is regularized and well behaved, not causing any gradients that would disrupt the model.
Feed forward: This layer allows the model to process the information from the attention layer through non-linear transformations, increasing the model’s capacity to learn complex patterns.
Last linear layer: This last linear layer allows the model to make its predictions for the next token in the sequence.
Softmax: As we have seen since word2vec, this function transforms the logits of a linear layer into a probability distribution from which we can sample from and get the index of the predicted next token.
It is worth noting that a “Block” is made up of the masked multi-head attention, the normalization layers and the feed forward layer. This Block can be repeated many times before a prediction is actually made. In fact, the difference between smaller and larger models often comes down largely to how many repetitions of these blocks there are.
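The last two steps in the list above (logits into softmax, then sampling an index) can be sketched without PyTorch. This is only a minimal illustration using the standard library; `toy_vocab` and `toy_logits` are made-up values, not output from any model in this notebook.

```python
import math
import random


def softmax(logits):
    """Turn a list of raw scores (logits) into probabilities that sum to 1."""
    largest = max(logits)  # subtracting the max keeps exp() numerically stable
    exps = [math.exp(z - largest) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]


def sample_index(probs, rng):
    """Draw one index at random, weighted by the given probabilities."""
    return rng.choices(range(len(probs)), weights=probs, k=1)[0]


toy_vocab = ["a", "b", "c", "d"]     # hypothetical four-token vocabulary
toy_logits = [2.0, 1.0, 0.1, -1.0]   # hypothetical output of the last linear layer

probs = softmax(toy_logits)
rng = random.Random(0)               # seeded so the draw is repeatable
next_token = toy_vocab[sample_index(probs, rng)]

print([round(p, 3) for p in probs])
print(next_token)
```

Sampling, rather than always taking the most likely index, is what lets the same context produce different continuations on different runs.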
Last note: This notebook is heavily inspired by Andrej Karpathy’s fabulous Let’s build GPT: from scratch, in code, spelled out. In fact, it’s mostly the same, besides these textual additions for explanation and what data we use. I would highly recommend that you also watch this video. Karpathy does a wonderful job explaining these concepts with code and is a treasure to the deep learning world.
Data#
Let’s start off by preparing our data. As we have seen, this process does not depend on a certain language, so I will be using Cicero’s Letters to Atticus from Perseus.
Unlike more common GPT implementations, I will not be tokenizing the text with a standard method. Instead, each of our tokens will be a single character. This keeps tokenization simple, since its details could take up an entire course and are not the focus of this notebook. It will, however, seriously hold back our performance. At the end of the notebook, I incorporate a standard tokenizer into model training.
# extracting text from XML
from bs4 import BeautifulSoup
import re
soup = BeautifulSoup(open("atticus.xml", "r").read(), features="xml")
letters = []
for d in soup.find_all("div2"):
    dateline = d.dateline.extract().get_text().strip()
    salute = d.salute.extract().get_text().strip()
    text = re.sub(r"\s+", " ", d.get_text().strip().replace("\n", ""))
    letters.append(dateline + "\n" + salute + "\n" + text)
text = "\n\n".join(letters)
print(len(text))
print(text[:1000])
# "tokenization": getting each character for simplicity
chars = sorted(list(set(text)))
vocab_size = len(chars)
print("".join(chars))
print(vocab_size)
# these are data structures that we can use to easily move between the integer representation of the text and the character representation
stoi = {ch: i for i, ch in enumerate(chars)}
itos = {i: ch for i, ch in enumerate(chars)}
encode = lambda s: [stoi[c] for c in s]
decode = lambda l: "".join([itos[i] for i in l])
print(encode("salve mundus"))
print(decode(encode("salve mundus")))
import torch
data = torch.tensor(
encode(text), dtype=torch.long
) # turning our encoded data into a tensor
print(data.shape, data.dtype)
print(data[:1000])
# reserving 10% of the data for validation
n = int(0.9 * len(data))
train_data = data[:n]
val_data = data[n:]
block_size = 8 # small block size to get started
train_data[: block_size + 1] # first block_size chunk
Language modeling task: Our goal is for the model to predict the next token given all of the tokens in our sequence thus far (causal, or autoregressive, language modeling), as we have seen in other models. Below is how we would set this up for training, a step also called collation.
x = train_data[:block_size]
y = train_data[1 : block_size + 1]
for t in range(block_size):
    context = x[: t + 1]
    target = y[t]
    print(f"when input token(s) is/are {context} the target: {target}")
    print(
        f"when input character(s) is/are *{decode([c.item() for c in context])}* the target: *{decode([target.item()])}*"
    )
    print()
print(itos[1])
# putting it all together
torch.manual_seed(1337) # seed for reproducibility
batch_size = 4 # how many starting ids we get initially
block_size = 8 # size of context as before
def get_batch(split):
    # generate a small batch of data of inputs x and targets y
    data = train_data if split == "train" else val_data # choosing the right data split
    ix = torch.randint(
        len(data) - block_size, (batch_size,)
    ) # get a random batch of ids
    x = torch.stack(
        [data[i : i + block_size] for i in ix]
    ) # create contexts for each id
    y = torch.stack(
        [data[i + 1 : i + block_size + 1] for i in ix]
    ) # create the targets for each context
    return x, y
xb, yb = get_batch("train")
print("inputs:")
print(xb.shape)
print(xb)
print("targets:")
print(yb.shape)
print(yb)
print("-" * 20)
for b in range(batch_size):
    for t in range(block_size):
        context = xb[b, : t + 1]
        target = yb[b, t]
        print(f"when input token(s) is/are {context} the target: {target}")
        print(
            f"when input character(s) is/are *{decode([c.item() for c in context])}* the target: *{decode([target.item()])}*"
        )
        print()
    if b < batch_size - 1:
        print("-" * 20)
        print("Next set of contexts/targets")
Super simple Language Model#
Before we start looking at the decoder-only transformer, let’s just see if our data is working by training a super simple model. Again this idea/code is taken from Karpathy’s video.
All this model does is use the embedding table to model token meaning, which works particularly badly here as our “tokens” are just single characters. Each token in the context looks up its row of the embedding table, and that row is used directly as the logits for the next token. Cross entropy loss (which applies softmax internally) then compares those logits with the actual next token.
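A useful sanity check for the first loss value printed by an untrained model: if the model spread its probability evenly over every token, the cross entropy would be the natural log of the vocabulary size. The sketch below works that out with the standard library only; `example_vocab_size` is a hypothetical number, since the real `vocab_size` depends on the text that was downloaded.

```python
import math


def cross_entropy(probs, target_index):
    """Cross entropy for one prediction: minus the log probability of the true token."""
    return -math.log(probs[target_index])


example_vocab_size = 100  # hypothetical; the notebook computes the real value from the data

# a model that knows nothing assigns the same probability to every token
uniform_probs = [1.0 / example_vocab_size] * example_vocab_size

uniform_loss = cross_entropy(uniform_probs, target_index=3)
expected_loss = math.log(example_vocab_size)

print(round(uniform_loss, 4), round(expected_loss, 4))
```

A randomly initialized model usually starts somewhat above this value, because its random logits are confidently wrong for some tokens; training should bring the loss below it.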
import torch
import torch.nn as nn
from torch.nn import functional as F
torch.manual_seed(1337)
class BigramLanguageModel(nn.Module):
    def __init__(self, vocab_size):
        super().__init__()
        # each token directly reads off the logits for the next token from a lookup table
        self.token_embedding_table = nn.Embedding(vocab_size, vocab_size)
    def forward(self, idx, targets=None):
        # idx and targets are both (B,T) tensor of integers
        logits = self.token_embedding_table(idx) # (B,T,C)
        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B * T, C)
            targets = targets.view(B * T)
            loss = F.cross_entropy(logits, targets)
        return logits, loss
    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # get the predictions
            logits, loss = self(idx)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx
m = BigramLanguageModel(vocab_size)
logits, loss = m(xb, yb)
print(logits.shape)
print(loss)
print(
decode(
m.generate(idx=torch.zeros((1, 1), dtype=torch.long), max_new_tokens=100)[
0
].tolist()
)
)
# optimizer for this very simple network
optimizer = torch.optim.AdamW(m.parameters(), lr=1e-3)
batch_size = 32
for steps in range(1000): # increase number of steps for "good" results...
    # sample a batch of data
    xb, yb = get_batch("train")
    # evaluate the loss
    logits, loss = m(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()
print(loss.item()) # much smaller loss
print(
decode( # from our "tokenizer"
m.generate( # from our model
idx=torch.zeros((1, 1), dtype=torch.long), # empty starting context
max_new_tokens=500,
)[0].tolist()
)
) # but these results are not very good
Attention#
Now that we know our data is working and we can use it to reduce the loss in a very simple network, we can increase the complexity by examining the attention mechanism at the core of the transformer architecture.
As Karpathy tells us, attention is really just a mathematical trick for aggregating weights in a parallelizable and easy-to-compute way. It consists of having two matrices, called \(a\) and \(b\), and taking their matrix product such that each row of the result is a weighted average of the rows of \(b\), with the weights given by the corresponding row of \(a\). In fact, every matrix product forms weighted sums like this, but this \(a\) matrix is special.
We create \(a\) as a matrix with a top right triangle of zeros. This matrix tells the result which rows of \(b\) to pay attention to, so the triangular shape tells the model to look only at certain tokens: those at or before the current position, which replicates the order of the sequence of tokens through the time dimension of our training example.
a = torch.tril(
torch.ones(3, 3)
) # tril creates a matrix with the top triangle made of zeros (masked)
a
# normalize a
a = a / torch.sum(a, 1, keepdim=True)
a # now the "weight" of each row is split up between the non-zero terms
torch.manual_seed(22091997) # for reproducibility
b = torch.randint(0, 10, (3, 2)).float() # random matrix
b
When we take the matrix product of \(a\) and \(b\), the sizes must line up:
\(a\) - 3 x 3
\(b\) - 3 x 2
\(c\) (result) - 3 x 2
c = a @ b
c # the weights from a have been distributed across b
# the first row is exactly the same because a tells c to only pay ATTENTION to the first element of b
print(a[0]) # just 1, 0, 0, refers to rows of b
print()
print(b[0])
print(c[0])
# second row is the average of the first two rows of b
print(
a[1]
) # tells c to pay ATTENTION to the first two rows of b but weight your attention by .5 (the normal average)
print()
print(b[1])
print(c[1])
# second row spelled out without `a` matrix
print(f"First row of b: {b[0]}")
print(f"Second row of b: {b[1]}")
print(f"Normal average of first two rows of b: {(b[0] + b[1])/2}")
print(f"Second row of c: {c[1]}") # same!
# third row is the average of all three rows of b
print(
a[2]
) # tells c to pay ATTENTION to all three rows of b but weight your attention by .33 (the normal average)
print()
print(b[2])
print(c[2])
# third row spelled out without `a` matrix
print(f"First row of b: {b[0]}")
print(f"Second row of b: {b[1]}")
print(f"Third row of b: {b[2]}")
print(f"Normal average of all three rows of b: {(b[0] + b[1] + b[2])/3}")
print(f"Third row of c: {c[2]}") # same!
# consider the following toy example:
torch.manual_seed(1337)
B, T, C = 4, 8, 2 # batch, time, channels
x = torch.randn(B, T, C)
x.shape, x
# We want x[b,t] = mean_{i<=t} x[b,i]
# doing this without a matrix - slow with large matrices
xbow = torch.zeros((B, T, C))
for b in range(B):
    for t in range(T):
        xprev = x[b, : t + 1] # (t+1,C)
        xbow[b, t] = torch.mean(xprev, 0)
# version 2: using matrix multiply for a weighted aggregation
wei = torch.tril(torch.ones(T, T))
wei = wei / wei.sum(1, keepdim=True)
xbow2 = wei @ x # (T, T) @ (B, T, C) ----> (B, T, C); wei is broadcast over the batch
xbow
xbow2
Self-attention#
As we can see, attention allows us to scale importance of tokens over the sequence of the training example. In our network, we want to learn how best to scale this \(a\) matrix so that it is all we need to predict the next token of the model. This is where the famous title of the paper that introduced the transformer comes from: “Attention Is All You Need”.
To do this scaling, we introduce three new matrices: a key matrix (K), a value matrix (V) and a query matrix (Q). These linear projections learn the affinities between different tokens, so that the weights in the \(a\) matrix become a data-driven, non-arbitrary aggregation rather than a simple average.
We can conceptually understand what these linear projections are doing in the schematic and description below:
Token embedding for a given token: “What I am” (x, below)
Key vector for a given token: “What do I contain” (k, below)
Query vector for a given token: “What am I looking for” (q, below)
Value vector for a given token: “What I will communicate to you” (v, below)
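To make the four roles above concrete, here is a small hand-checkable sketch of one causal attention head written with plain Python lists. It is an illustration under simplifying assumptions: the `queries`, `keys` and `values` are made-up numbers given directly, whereas in the model they come from learned linear projections of x.

```python
import math


def causal_attention(queries, keys, values):
    """Single-head causal attention over a short sequence of vectors."""
    head_size = len(keys[0])
    outputs = []
    for t, q in enumerate(queries):
        # score the query at position t against keys at positions 0..t only (causal mask)
        scores = [
            sum(qi * ki for qi, ki in zip(q, keys[s])) / math.sqrt(head_size)
            for s in range(t + 1)
        ]
        # softmax over the visible positions
        largest = max(scores)
        exps = [math.exp(sc - largest) for sc in scores]
        total = sum(exps)
        weights = [e / total for e in exps]
        # weighted average of the visible value vectors
        out = [
            sum(w * values[s][j] for s, w in enumerate(weights))
            for j in range(len(values[0]))
        ]
        outputs.append(out)
    return outputs


# hypothetical toy inputs: three positions, head_size of 2
queries = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
keys = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
values = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]

attended = causal_attention(queries, keys, values)

# with all-zero queries every visible position gets equal weight,
# which recovers the plain running average from the previous section
zero_queries = [[0.0, 0.0] for _ in queries]
averaged = causal_attention(zero_queries, keys, values)

print(attended)
print(averaged)
```

The first output row always equals the first value vector, because the first position can only attend to itself.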

Random example#
This example uses a random matrix. Next we’ll look at this with a real training example.
B, T, C = 4, 8, 32 # batch, time, channels
x = torch.randn(B, T, C) # this would be from our training examples in a real model
x.shape
# creating our keys and values for each token in the training example
head_size = 16
key = nn.Linear(C, head_size, bias=False)
query = nn.Linear(C, head_size, bias=False)
# key projection
k = key(x)
k.shape
# query projection
q = query(x)
q.shape
# determine affinities between what each token wants and what each token has
# each element is a score between one token's query and another token's key
weights = q @ k.transpose(-2, -1) # (B, T, head_size) @ (B, head_size, T) -> (B, T, T)
weights.shape
tril = torch.tril(torch.ones(T, T)) # lower-triangular matrix used as the causal mask
weights = weights.masked_fill(
    tril == 0, float("-inf")
) # future positions are set to -inf so that softmax gives them zero weight
weights.shape, weights
weights = F.softmax(weights, dim=-1) # pass through softmax to get a prob distribution
weights.shape, weights
# value projection
value = nn.Linear(C, head_size, bias=False)
v = value(x)
v.shape
# last matmul to apply the values to the affinities
out = weights @ v
out.shape, out
out[0], out[0].shape
Real training example#
Now we can see what this looks like with a real training example.
batch_size = 4
block_size = 8
x, y = get_batch("train")
x.shape, y.shape
n_embd = 64
token_embedding_table = nn.Embedding(vocab_size, n_embd)
position_embedding_table = nn.Embedding(block_size, n_embd)
tok_emb = token_embedding_table(x) # token embeddings
pos_emb = position_embedding_table(torch.arange(block_size)) # position embeddings
tok_emb.shape, pos_emb.shape
x = tok_emb + pos_emb # elementwise addition to create x
x.shape
# initialize our projections
head_size = 16
key = nn.Linear(n_embd, head_size, bias=False)
query = nn.Linear(n_embd, head_size, bias=False)
value = nn.Linear(n_embd, head_size, bias=False)
k = key(x)
q = query(x)
k.shape, q.shape
weights = q @ k.transpose(-2, -1) * head_size**-0.5 # scaling by 1/sqrt(head_size)
weights.shape
tril = torch.tril(torch.ones(block_size, block_size)) # mask sized to this example's context length
weights = weights.masked_fill(tril == 0, float("-inf"))
weights.shape
weights = F.softmax(weights, dim=-1)
weights.shape
v = value(x)
v.shape
out = weights @ v
out.shape
Notes from Karpathy:
Attention is a communication mechanism. Can be seen as nodes in a directed graph looking at each other and aggregating information with a weighted sum from all nodes that point to them, with data-dependent weights.
There is no notion of space. Attention simply acts over a set of vectors. This is why we need to positionally encode tokens.
Each example across the batch dimension is of course processed completely independently; examples never “talk” to each other.
In an “encoder” attention block, just delete the single line that does masking with tril, allowing all tokens to communicate. This block here is called a “decoder” attention block because it has triangular masking, and is usually used in autoregressive settings, like language modeling.
“Self-attention” just means that the keys and values are produced from the same source as the queries. In “cross-attention”, the queries still get produced from x, but the keys and values come from some other, external source (e.g. an encoder module).
“Scaled” attention additionally multiplies wei by 1/sqrt(head_size). This makes it so that when the inputs Q and K have unit variance, wei will have unit variance too, and softmax will stay diffuse and not saturate too much.
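The note on scaling can be checked numerically. The sketch below is a small experiment with the standard library only: it draws random unit-variance query and key vectors, then compares the variance of their dot products with and without the 1/sqrt(head_size) factor. The sample count and seed are arbitrary choices made for this illustration.

```python
import math
import random


def dot(u, v):
    return sum(a * b for a, b in zip(u, v))


def variance(samples):
    mean = sum(samples) / len(samples)
    return sum((s - mean) ** 2 for s in samples) / len(samples)


def softmax(scores):
    largest = max(scores)
    exps = [math.exp(s - largest) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


rng = random.Random(0)   # arbitrary seed, for repeatability
head_size = 16
num_samples = 5000       # arbitrary; large enough for a stable estimate

raw_scores = []
for _ in range(num_samples):
    q = [rng.gauss(0.0, 1.0) for _ in range(head_size)]
    k = [rng.gauss(0.0, 1.0) for _ in range(head_size)]
    raw_scores.append(dot(q, k))
scaled_scores = [s / math.sqrt(head_size) for s in raw_scores]

raw_variance = variance(raw_scores)        # close to head_size
scaled_variance = variance(scaled_scores)  # close to 1

# softmax over the same eight scores: the unscaled version is more peaked
peak_raw = max(softmax(raw_scores[:8]))
peak_scaled = max(softmax(scaled_scores[:8]))

print(round(raw_variance, 2), round(scaled_variance, 2))
print(round(peak_raw, 3), round(peak_scaled, 3))
```

Without the scaling, the largest score tends to dominate the softmax, so each token would attend to almost a single other token right from initialization.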
To finish this illustration, I will take this example to a calculation of loss for this training example.
class FeedFoward(nn.Module):
    """a simple linear layer followed by a non-linearity"""
    def __init__(self, n_embd, dropout=0.0):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )
    def forward(self, x):
        return self.net(x)
ffwd = FeedFoward(n_embd)
lm_head = nn.Linear(n_embd, vocab_size)
x = ffwd(x) # applied to x (n_embd wide); the single head's `out` above is only head_size wide
logits = lm_head(x)
logits.shape
B, T, C = logits.shape
logits = logits.view(B * T, C)
targets = y.view(B * T) # targets from above
loss = F.cross_entropy(logits, targets)
loss.item() # loss for this training example
Now that we can calculate a loss value for this model, we could call loss.backward() and get the gradients needed to take a step with our optimizer (optimizer.step()).
Modeling#
Seeing self-attention gave us the tools needed to fully implement the decoder-only transformer. Below, the steps that we wrote out above are wrapped into reusable modules.
Head#
A single “head” of attention is just what we saw above with some slight alterations.
class Head(nn.Module):
    """one head of self-attention"""
    def __init__(self, head_size, n_embd=64, dropout=0.0):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer("tril", torch.tril(torch.ones(block_size, block_size)))
        self.dropout = nn.Dropout(dropout) # standard dropout
    def forward(self, x):
        B, T, C = x.shape
        k = self.key(x) # (B,T,head_size)
        q = self.query(x) # (B,T,head_size)
        # compute attention scores ("affinities"), scaled by 1/sqrt(head_size)
        wei = q @ k.transpose(-2, -1) * k.shape[-1] ** -0.5 # (B, T, hs) @ (B, hs, T) -> (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float("-inf")) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        wei = self.dropout(wei)
        # perform the weighted aggregation of the values
        v = self.value(x) # (B,T,head_size)
        out = wei @ v # (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return out
ex_head = Head(head_size=16)
ex_head
B, T, C = 4, 8, 64 # pos + tok embeddings
x = torch.randn(B, T, C)
out = ex_head(x)
out.shape
Multihead#
Now we can group these heads together into multi-headed attention. This module runs the attention calculation of several heads in parallel, concatenates their outputs, and passes the result through a final linear projection. That’s all that’s different about it.
class MultiHeadAttention(nn.Module):
    """multiple heads of self-attention in parallel"""
    def __init__(self, num_heads, head_size, n_embd=64, dropout=0.0):
        super().__init__()
        # pass dropout through so that the heads do not silently fall back to 0.0
        self.heads = nn.ModuleList(
            [Head(head_size, n_embd=n_embd, dropout=dropout) for _ in range(num_heads)]
        )
        self.proj = nn.Linear(n_embd, n_embd) # assumes num_heads * head_size == n_embd
        self.dropout = nn.Dropout(dropout)
    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out
ex_multihead = MultiHeadAttention(num_heads=4, head_size=16)
ex_multihead
out = ex_multihead(x)
out.shape # 16 * 4 = 64
Feed forward and Block#
As we saw above, the feed forward layer added a nonlinearity which allows the model to learn more complex features. In a normal RNN, this layer takes as input the actual token embeddings, but in the transformer, it takes in the output of the attention heads.
The Block just wraps all of what we’ve seen so far in a single module.
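The Block combines two ideas that are easy to miss in the code: layer normalization is applied before each sublayer, and the sublayer's output is added back onto its input (a residual connection). Below is a minimal standard-library sketch of that pattern on a single vector. It assumes a layer norm at initialization (gain 1, bias 0), and `toy_sublayer` is a hypothetical stand-in for attention or the feed forward layer.

```python
import math


def layer_norm(vec, eps=1e-5):
    """Normalize one vector to zero mean and (about) unit variance."""
    mean = sum(vec) / len(vec)
    var = sum((v - mean) ** 2 for v in vec) / len(vec)
    return [(v - mean) / math.sqrt(var + eps) for v in vec]


def residual_step(vec, sublayer):
    """Pre-norm residual: normalize, apply the sublayer, add the result back."""
    update = sublayer(layer_norm(vec))
    return [v + u for v, u in zip(vec, update)]


def toy_sublayer(hidden):
    # hypothetical stand-in for self-attention or the feed forward network
    return [0.5 * h for h in hidden]


x = [1.0, 2.0, 3.0, 4.0]
normed = layer_norm(x)
y = residual_step(x, toy_sublayer)

print([round(n, 3) for n in normed])
print([round(v, 3) for v in y])
```

Because the input is added back unchanged, each block only has to learn a correction to its input, which tends to make deep stacks of blocks easier to train.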
class FeedFoward(nn.Module):
    """a simple linear layer followed by a non-linearity"""
    def __init__(self, n_embd, dropout=0.0):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )
    def forward(self, x):
        return self.net(x)
class Block(nn.Module):
    """Transformer block: communication followed by computation"""
    def __init__(self, n_embd, n_head, dropout=0.0):
        # n_embd: embedding dimension, n_head: the number of heads we'd like
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size, n_embd=n_embd, dropout=dropout)
        self.ffwd = FeedFoward(n_embd, dropout=dropout) # pass dropout through here as well
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)
    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x
Full model#
We have seen all of the foundations of the transformer, so now we can put it all together in a single model.
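Before building the model, it can help to estimate how many parameters it will have and where they sit. The function below is a back-of-the-envelope count that follows the layers described in this notebook (bias-free key/query/value projections, a biased output projection, a 4x feed forward layer, two layer norms per block). The `example_vocab_size` is hypothetical, since the real value comes from the data.

```python
def count_parameters(vocab_size, block_size, n_embd, n_head, n_layer):
    """Estimate the number of trainable parameters of the decoder-only model."""
    head_size = n_embd // n_head
    # token and position embedding tables
    embeddings = vocab_size * n_embd + block_size * n_embd
    # key, query and value projections for every head (no bias)
    attention = n_head * 3 * n_embd * head_size
    # output projection after concatenating the heads (weights + bias)
    projection = n_embd * n_embd + n_embd
    # feed forward: n_embd -> 4*n_embd -> n_embd (weights + biases)
    feed_forward = (n_embd * 4 * n_embd + 4 * n_embd) + (4 * n_embd * n_embd + n_embd)
    # two layer norms per block, each with a gain and a bias vector
    layer_norms = 2 * (2 * n_embd)
    per_block = attention + projection + feed_forward + layer_norms
    # final layer norm plus the last linear layer (weights + bias)
    final = 2 * n_embd + (n_embd * vocab_size + vocab_size)
    return embeddings + n_layer * per_block + final


example_vocab_size = 100  # hypothetical character-level vocabulary size
total = count_parameters(
    vocab_size=example_vocab_size, block_size=32, n_embd=64, n_head=4, n_layer=4
)
print(total, "parameters, or", total / 1e6, "M")
```

The triangular mask is registered as a buffer rather than a parameter, so it is not counted here.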
class Transformer(nn.Module):
    def __init__(self, n_embd, n_head, n_layer, device):
        super().__init__()
        # lookup tables for the token embeddings and the position embeddings
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(
            *[Block(n_embd, n_head=n_head) for _ in range(n_layer)]
        )
        self.ln_f = nn.LayerNorm(n_embd) # final layer norm
        self.lm_head = nn.Linear(n_embd, vocab_size)
        self.device = device
    def forward(self, idx, targets=None):
        B, T = idx.shape
        # idx and targets are both (B,T) tensor of integers
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(
            torch.arange(T, device=self.device)
        ) # (T,C)
        x = tok_emb + pos_emb # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)
        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B * T, C)
            targets = targets.view(B * T)
            loss = F.cross_entropy(logits, targets)
        return logits, loss
    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # crop idx to the last block_size tokens
            idx_cond = idx[:, -block_size:]
            # get the predictions
            logits, loss = self(idx_cond)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx
Initial Training#
Finally, we can start training with our Latin dataset.
Two things are not typical about this training:
Our tokenizer is still primitive
We are running this on the CPU
import torch
import torch.nn as nn
from torch.nn import functional as F
# hyperparameters
batch_size = 16 # how many independent sequences will we process in parallel
block_size = 32 # what is the maximum context length for predictions
max_iters = 5000 # number of training iterations (one batch each, not full epochs)
eval_interval = 100 # every this many iterations we look at the validation set
learning_rate = 1e-3 # learning rate for the optimizer
device = "cuda" if torch.cuda.is_available() else "cpu" # what device to use
eval_iters = 200 # how many iterations in the evaluation
n_embd = 64 # embedding size
n_head = 4 # attention heads
n_layer = 4 # how many blocks
dropout = 0.0 # amount of dropout
# ------------
model = Transformer(n_embd=n_embd, n_head=n_head, n_layer=n_layer, device=device)
m = model.to(device)
print(sum(p.numel() for p in m.parameters()) / 1e6, "M parameters")
# function for estimating the loss during evaluation
@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ["train", "val"]:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            X, Y = X.to(device), Y.to(device) # use the configured device instead of hardcoding "cuda"
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out
train_losses = []
valid_losses = []
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
for epoch in range(max_iters):
    if epoch % eval_interval == 0 or epoch == max_iters - 1:
        losses = estimate_loss()
        train_losses.append(losses["train"])
        valid_losses.append(losses["val"])
        print(
            f"step {epoch}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}"
        )
    xb, yb = get_batch("train")
    xb, yb = xb.to(device), yb.to(device)
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()
import matplotlib.pyplot as plt
plt.plot(train_losses, label="train")
plt.plot(valid_losses, label="valid")
plt.legend()
plt.title("Loss")
plt.xlabel(f"Evaluation checkpoint (every {eval_interval} iterations)")
plt.ylabel("Loss")
plt.show()
context = torch.zeros((1, 1), dtype=torch.long, device=device)
print(decode(m.generate(context, max_new_tokens=2000)[0].tolist()))
More typical training#
As opposed to above, this code trains the transformer with a more typical tokenizer and on the GPU.
# tiktoken: OpenAI's tokenizer library; the cl100k_base encoding used below is the one used by gpt-3.5-turbo and gpt-4
!pip install tiktoken -Uq
# extracting text from XML
from bs4 import BeautifulSoup
import re
soup = BeautifulSoup(open("atticus.xml", "r").read(), features="xml")
letters = []
for d in soup.find_all("div2"):
    dateline = d.dateline.extract().get_text().strip()
    salute = d.salute.extract().get_text().strip()
    text = re.sub(r"\s+", " ", d.get_text().strip().replace("\n", ""))
    letters.append(dateline + "\n" + salute + "\n" + text)
text = "\n\n".join(letters)
print(len(text))
text[:100]
import tiktoken
tokenizer = tiktoken.get_encoding("cl100k_base")
# example
tokenizer.encode(text[:100])
import torch
data = torch.tensor(
tokenizer.encode(text), dtype=torch.long
) # turning our encoded data into a tensor, as above
print(data.shape, data.dtype)
print(data[:100])
vocab_size = tokenizer.n_vocab # number of all unique tokens in the tokenizer
vocab_size
# reserving 10% of the data for validation, as above
n = int(0.9 * len(data))
train_data = data[:n]
val_data = data[n:]
block_size = 8 # small block size to get started, as above
train_data[: block_size + 1] # first block_size chunk, as above
x = train_data[:block_size]
y = train_data[1 : block_size + 1]
for t in range(block_size):
    context = x[: t + 1]
    target = y[t]
    print(f"when input token(s) is/are {context} the target: {target}")
    print(
        f"when input character(s) is/are *{tokenizer.decode([c.item() for c in context])}* the target: *{tokenizer.decode([target.item()])}*"
    )
    print()
# putting it all together
torch.manual_seed(1337) # seed for reproducibility
batch_size = 4 # how many starting ids we get initially
block_size = 8 # size of context as before
def get_batch(split):
    # generate a small batch of data of inputs x and targets y
    data = train_data if split == "train" else val_data # choosing the right data split
    ix = torch.randint(
        len(data) - block_size, (batch_size,)
    ) # get a random batch of ids
    x = torch.stack(
        [data[i : i + block_size] for i in ix]
    ) # create contexts for each id
    y = torch.stack(
        [data[i + 1 : i + block_size + 1] for i in ix]
    ) # create the targets for each context
    return x, y
xb, yb = get_batch("train")
print("inputs:")
print(xb.shape)
print(xb)
print("targets:")
print(yb.shape)
print(yb)
print("-" * 20)
for b in range(batch_size):
    for t in range(block_size):
        context = xb[b, : t + 1]
        target = yb[b, t]
        print(f"when input token(s) is/are {context} the target: {target}")
        print(
            f"when input character(s) is/are *{tokenizer.decode([c.item() for c in context])}* the target: *{tokenizer.decode([target.item()])}*"
        )
        print()
    if b < batch_size - 1:
        print("-" * 20)
        print("Next set of contexts/targets")
Model#
The same code as above, repeated here so you don’t need to run it above.
import torch.nn as nn
class Head(nn.Module):
    """one head of self-attention"""
    def __init__(self, head_size, n_embd=64, dropout=0.0):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.register_buffer("tril", torch.tril(torch.ones(block_size, block_size)))
        self.dropout = nn.Dropout(dropout) # standard dropout
    def forward(self, x):
        B, T, C = x.shape
        k = self.key(x) # (B,T,head_size)
        q = self.query(x) # (B,T,head_size)
        # compute attention scores ("affinities"), scaled by 1/sqrt(head_size)
        wei = q @ k.transpose(-2, -1) * k.shape[-1] ** -0.5 # (B, T, hs) @ (B, hs, T) -> (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float("-inf")) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        wei = self.dropout(wei)
        # perform the weighted aggregation of the values
        v = self.value(x) # (B,T,head_size)
        out = wei @ v # (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return out
class MultiHeadAttention(nn.Module):
    """multiple heads of self-attention in parallel"""
    def __init__(self, num_heads, head_size, n_embd=64, dropout=0.0):
        super().__init__()
        # pass dropout through so that the heads do not silently fall back to 0.0
        self.heads = nn.ModuleList(
            [Head(head_size, n_embd=n_embd, dropout=dropout) for _ in range(num_heads)]
        )
        self.proj = nn.Linear(n_embd, n_embd) # assumes num_heads * head_size == n_embd
        self.dropout = nn.Dropout(dropout)
    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out
class FeedFoward(nn.Module):
    """a simple linear layer followed by a non-linearity"""
    def __init__(self, n_embd, dropout=0.0):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )
    def forward(self, x):
        return self.net(x)
class Block(nn.Module):
    """Transformer block: communication followed by computation"""
    def __init__(self, n_embd, n_head, dropout=0.0):
        # n_embd: embedding dimension, n_head: the number of heads we'd like
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size, n_embd=n_embd, dropout=dropout)
        self.ffwd = FeedFoward(n_embd, dropout=dropout) # pass dropout through here as well
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)
    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x
class Transformer(nn.Module):
    def __init__(self, n_embd, n_head, n_layer, device):
        super().__init__()
        # lookup tables for the token embeddings and the position embeddings
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(
            *[Block(n_embd, n_head=n_head) for _ in range(n_layer)]
        )
        self.ln_f = nn.LayerNorm(n_embd) # final layer norm
        self.lm_head = nn.Linear(n_embd, vocab_size)
        self.device = device
    def forward(self, idx, targets=None):
        B, T = idx.shape
        # idx and targets are both (B,T) tensor of integers
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        pos_emb = self.position_embedding_table(
            torch.arange(T, device=self.device)
        ) # (T,C)
        x = tok_emb + pos_emb # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)
        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B * T, C)
            targets = targets.view(B * T)
            loss = F.cross_entropy(logits, targets)
        return logits, loss
    def generate(self, idx, max_new_tokens):
        # idx is (B, T) array of indices in the current context
        for _ in range(max_new_tokens):
            # crop idx to the last block_size tokens
            idx_cond = idx[:, -block_size:]
            # get the predictions
            logits, loss = self(idx_cond)
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            # append sampled index to the running sequence
            idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx
Training#
Training this model with a full tokenizer. This is deeply inefficient, but a useful example.
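One way to see part of the inefficiency is to ask where the parameters go once the vocabulary is that of a full subword tokenizer. The rough arithmetic below uses the hyperparameters of this section and assumes a round vocabulary of 100,000 entries; that figure is an assumption for illustration, and the exact number is whatever tokenizer.n_vocab reports.

```python
assumed_vocab_size = 100_000  # assumption: round figure, not the exact tokenizer size
n_embd = 128
n_layer = 8
block_size = 64

# parameters whose size depends on the vocabulary:
# the token embedding table and the last linear layer (weights + bias)
vocab_params = assumed_vocab_size * n_embd + (n_embd * assumed_vocab_size + assumed_vocab_size)

# parameters of one block: key/query/value (no bias), output projection,
# feed forward (n_embd -> 4*n_embd -> n_embd) and two layer norms
per_block = (
    3 * n_embd * n_embd
    + (n_embd * n_embd + n_embd)
    + (n_embd * 4 * n_embd + 4 * n_embd)
    + (4 * n_embd * n_embd + n_embd)
    + 2 * (2 * n_embd)
)

# everything else: the blocks, the position embeddings and the final layer norm
other_params = n_layer * per_block + block_size * n_embd + 2 * n_embd

total_params = vocab_params + other_params
vocab_share = vocab_params / total_params

print(f"{total_params / 1e6:.2f}M parameters in total")
print(f"{vocab_share:.1%} of them sit in the embedding table and the last linear layer")
```

Under these assumptions, the great majority of the weights only map tokens in and out of the model, and most of those tokens never occur in a Latin corpus of this size.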
import torch
import torch.nn as nn
from torch.nn import functional as F
# hyperparameters
batch_size = 16 # how many independent sequences will we process in parallel
block_size = 64 # what is the maximum context length for predictions
max_iters = 10000 # number of training iterations (one batch each, not full epochs)
eval_interval = 100 # every this many iterations we look at the validation set
learning_rate = 5e-5 # learning rate for the optimizer
device = "cuda" if torch.cuda.is_available() else "cpu" # what device to use
eval_iters = 200 # how many iterations in the evaluation
n_embd = 128 # embedding size
n_head = 16 # attention heads
n_layer = 8 # how many blocks
dropout = 0.0 # amount of dropout
# ------------
model = Transformer(n_embd=n_embd, n_head=n_head, n_layer=n_layer, device=device)
m = model.to(device)
print(sum(p.numel() for p in m.parameters()) / 1e6, "M parameters")
# function for estimating the loss during evaluation
@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ["train", "val"]:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            X, Y = X.to(device), Y.to(device)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out
train_losses = []
valid_losses = []
## training
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
for epoch in range(max_iters):
    if epoch % eval_interval == 0 or epoch == max_iters - 1:
        losses = estimate_loss()
        train_losses.append(losses["train"])
        valid_losses.append(losses["val"])
        print(
            f"step {epoch}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}"
        )
    xb, yb = get_batch("train")
    xb, yb = xb.to(device), yb.to(device)
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()
import matplotlib.pyplot as plt
plt.plot(train_losses, label="train")
plt.plot(valid_losses, label="valid")
plt.legend()
plt.title("Loss")
plt.xlabel(f"Evaluation checkpoint (every {eval_interval} iterations)")
plt.ylabel("Loss")
plt.show()
context = torch.zeros((1, 1), dtype=torch.long, device=device)
print(tokenizer.decode(m.generate(context, max_new_tokens=2000)[0].tolist()))
context_openning = torch.tensor(
tokenizer.encode("Scr. Romae"), dtype=torch.long, device=device
).unsqueeze(dim=0)
print(tokenizer.decode(m.generate(context_openning, max_new_tokens=2000)[0].tolist()))