Transfomer 框架 | 逐层剖析,从原理到代码

本文是笔者根据 2017 年由 Google 的研究者发表的论文《Attention is all you need》深蓝学院的 LLM 课程 总结的一份 Transformer 框架的技术文档。

Transformer 是当下非常流行的深度学习模型,自从其提出来后,它迅速成为了许多 NLP 任务的基础架构,比如机器翻译、文本摘要、情感分析和问答系统,大名鼎鼎的 ChatGPT 系列其实也是基于 Transformer 模型构建的,后来 Transformer 也被大量用于机器视觉的运用,例如 Non-local、ViT 等等。总而言之,Transformer 是一款非常强大的模型。


Transfomer 整体结构

下图展示了 Transformer 的整体结构:

《Attention is all you need》论文原图

编码器 & 解码器

Transformer 主要由两部分组成:即编码器(Encoder)解码器(Decoder)

  • 编码器的作用是处理输入序列,将输入数据转换成一系列连续的表示形式,这些表示包含了输入数据的复杂内部结构信息。在处理自然语言任务时,这通常意味着将一句话或文档编码为一系列向量,每个向量对应输入序列中的一个元素(比如一个单词)。
  • 解码器的作用是基于编码器的输出以及之前已生成的输出,逐步生成目标序列。对于自然语言生成任务来说,解码器会一个接一个地生成词汇,直到产生结束符号为止。

整体来看,编码器负责理解输入数据,将其转化为一系列高维的内部表示;解码器则负责将这些表示转换为意义明确的输出序列。两者协同工作。以机器翻译为例,以下展示了 Transformer 的整体工作原理。

嵌入层 & 位置编码

嵌入层

嵌入层(Embedding Layer)是 Transformer 输入的第一层,一般而言是通过某种特定的变换(比如 Word2Vec 技术),将输入单词的 One-Hot 编码转化为空间中连续的向量,其维度与模型维度 \(d_{model}\) 一致。

通过这种方式,模型能够将离散的、符号化的输入转换为连续的向量,这些向量是神经网络能够处理的形式。

位置编码

位置编码(Positioning Encoding)是 Transformer 模型中的一个关键创新,用于解决模型缺乏序列顺序信息的问题。因为 Transformer 不使用递归神经网络结构,所以需要某种机制来利用输入序列中单词的位置信息。

Transformer 的原始论文提供了一种使用三角函数的位置编码方案:

对于序列中的每个位置 \(pos\) 和维度索引 \(i\) ,位置编码如下:

  • 对于偶数索引 \(2i\)\[ PE_{(pos, 2 i)} = sin(\frac{ {pos}}{10000 ^ {\frac{2i}{d_{model}}}}) \]

  • 对于奇数索引 \(2i + 1\)\[ PE_{(pos, 2 i + 1)} = cos(\frac{pos}{10000 ^ {\frac{2i}{d_{model}}}}) \]

这说明了对于不同维度的特征,其位置编码有不一样的频率。

为什么使用正弦函数进行位置编码:
  1. 正弦函数具有周期性,可以很好地捕获一些自然语言的周期性问题与重复出现的模式;
  2. 事实上在自然语言中,很多语法结构是重复出现的。

注意力机制

注意力机制是 Transformer 的核心组件之一,其允许模型在处理序列的每个元素时动态地聚焦于序列中的其他部分,这种方式对于理解序列内各元素之间的复杂关系尤其有效。

自注意力机制

自注意力(Self-Attention)是一种特殊形式的注意力机制,使模型能够在处理序列的每一个元素时,考虑到序列中的所有元素。对于给定的输入序列,自注意力允许每一个输出在生成时加权引入输入序列中所有位置的信息。

在自注意力机制中,每一个输入会被映射到三个不同的向量,它们通常由学习得到的权重矩阵生成:

  • 查询向量(Query) \[ q_0 = x_0 W_{xq} \]

  • 键向量(Key) \[ k_0 = x_0 W_{xk} \]

  • 值向量(Value) \[ v_0 = x_0 W_{xv} \]

对于序列中的每一个元素,自注意力的计算过程可以分解为以下步骤:

  1. 点积计算:计算查询向量与键向量的点积,得到一个分数,表示该元素与序列中每个元素的兼容性或关联程度。
  2. 缩放:通常将点积的结果除以一个缩放因子(通常是键向量维度的平方根),以避免梯度消失或爆炸。
  3. Softmax 归一化:应用 Softmax 函数将这些分数转换为概率,用于确定每个元素应该赋予多少关注度。
  4. 加权和:利用 Softmax 的输出作为权重,结合值向量计算加权和,得到自注意力的输出。

如上图,计算出所有加权后,我们就可以计算出自注意力的输出: \[ \hat x_i = \sum_{j} \hat a_i ^ j v_j \] 自注意力机制的矩阵表达: \[ Z = softmax(\frac{Q K ^ T}{\sqrt{d_k}}) V \] 其中: \[ Q = [q_0 ^ T \ q_1 ^ T\ ... \ q_n ^ T] ^ T \]

\[ K = [k_0 ^ T \ k_1 ^ T\ ... \ k_n ^ T] ^ T \]

\[ V = [v_0 ^ T \ v_1 ^ T\ ... \ v_n ^ T] ^ T \]

\[ Z = [\hat x_0 ^ T \ \hat x_1 ^ T\ ... \ \hat x_n ^ T] ^ T \]

多头注意力机制

多头注意力(Multi-Head Attention)是将自注意力扩展为多个并行的头部。它允许模型在不同的子空间中并行地学习输入之间的不同表示。每个头部执行上述自注意力计算,但是会有不同的权重矩阵。

对于不同的权重矩阵 \(W_{xq}^{(i)}\)\(W_{xk}^{(i)}\)\(W_{xv}^{(i)}\),可以计算出不同的输出 \(Z_i\)

假设我们有一共 8 个头部,即 24 个权重矩阵,则我们一共可以得到 8 个输出。

接下来我们考虑将 8 个头部的输出矩阵进行拼接,然后右乘一个权重矩阵 \(W_O\) 即可得到最后的输出。

\[ Z = [Z_0 \ Z_1 \ ... Z_7] W_O \] 通过这种方式,模型可以捕获数据在不同表示空间中的不同特征并且可以在不同级别的抽象上理解信息

编码器中的多头注意力

多头注意力机制的优势

  1. 并行计算:注意力机制计算每个元素的输出时,不依赖于其他元素的输出,因此可以高效地并行处理。
  2. 捕捉长距离依赖:自注意力可以直接计算序列内任意两个元素之间的交互,不受它们在序列中位置距离的影响,这对于捕捉长距离依赖关系非常有效。
  3. 灵活的关注焦点:模型可以学习在不同的上下文中关注序列的不同部分,这种动态的关注机制对于理解和生成语言至关重要。
解码器中的多头注意力机制
  • 添加掩码的多头注意力

    Masked Multi-Head Attention 是解码器(Decoder)部分的核心组件,它使得解码器能够在生成序列时只关注到当前位置之前的输出,而不是之后的输出,从而防止信息的提前泄露。

    在具体操作中,解码器的每个时间步都会生成一个掩码(mask),用于屏蔽(mask out)那些不应被当前位置所“看到”的未来位置。然后在计算 Softmax 函数前,这个掩码会被加到注意力对数分数上(即,对应未来位置的分数变成非常大的负数)。这样经过Softmax 激活函数后,这些位置的注意力权重接近于零,确保模型不会考虑未来的单词。

    举个简单的例子,假设在某一时间步,我们正在尝试生成一个句子的第三个词,Masked Attention 会确保注意力机制只会考虑第一个词和第二个词,而不是之后的词。掩码矩阵可能如下所示:

    \[ \begin{bmatrix} 0 & -\infty & -\infty & -\infty \\ 0 & 0 & -\infty & -\infty \\ 0 & 0 & 0 & -\infty \\ 0 & 0 & 0 & 0 \\ \end{bmatrix} \] 这个矩阵会与注意力对数分数相加,确保在进行 Softmax 时,每一行的未来位置的权重都接近于零。

  • 编码器-解码器注意力

    编码器-解码器注意力层在解码器中扮演着关键角色,它允许解码器聚焦编码器的输出。这个注意力层作为桥梁将编码器的信息传递给解码器。解码器通过这个层来查看编码器的输出,并结合自身已生成的部分翻译来预测下一个单词。

    这一层的多头注意力机制使用解码器的输出作为查询(Query),而将编码器的输出作为键(Key)和值(Value)。通过计算查询与键的相似度,得到一个注意力权重,然后用这个权权重来加权对应的值,生成这一层的输出。

层归一化与残差连接

Transformer 架构中使用了层归一化(Layer Normalization)残差连接(Residual Connection)来促进深度网络的训练。这两种技术都是为了解决训练深层神经网络时常遇到的问题,比如梯度消失和梯度爆炸。

  • 层归一化

    层归一化是一种在训练深度神经网络时常用的技术,其目的是稳定神经网络的学习过程。层归一化的工作原理是对单个样本的所有激活进行归一化,这和批量归一化(Batch Normalization)不同,后者是对同一层中不同样本的同一激活进行归一化。层归一化可以使得网络层的输出更加稳定。

    具体步骤

    对于一个具体的样本,在网络的每一层(通常在非线性激活函数前),层归一化按以下步骤进行:

    1. 计算所有激活值 \(x_i\) 的均值 \(\mu\) 和方差 \(\sigma ^ 2\)
    2. 对每个 \(x_i\) 进行归一化:\(x_i := \frac{x_i - \mu}{\sqrt{\sigma ^ 2 + \epsilon}}\)(这里的 \(\epsilon\) 是一个很小的正实数,用于防止出现分母为 0 的情况)
    3. 应用可学习的参数 \(\gamma\)\(\beta\) 来缩放和位移归一化后的值:\(x_i := \gamma x_i + \beta\)
  • 残差连接

    残差连接,也称作跳跃连接(Skip Connection),是指在网络的某一层上将输入直接加到该层的输出上。它允许梯度直接流过网络,这有助于训练过程中更有效地传播梯度,减轻梯度消失的问题。

    在 Transformer 中,残差连接的数学表达式可以写为: \[ Output = LayerNorm(Input + Sublayer(Input)) \] 这里的 \(Sublayer()\) 指多头注意力层或者前馈神经网络层的输出。\(LayerNorm()\) 则表示层归一化。通过这种方式,即使网络非常深,输入信息也可以在网络中更远地传播。

前馈神经网络

前馈神经网络(Feed-Forward Network,FFN)是每个编码器和解码器层的主要组成部分之一。

Transformer 中的 FNN 由两个全连接层组成,中间有一个 ReLU 激活函数,数学表达式如下: \[ FNN(x) = max(0, xW_1 + b_1) W_2 + b_2 \] 前馈神经网络为 Transformer 模型提供了非线性处理能力。虽然自注意力层非常擅长处理输入序列中元素之间的依赖关系,但它本质上是一个线性操作。前馈神经网络通过非线性变换,增加了模型的表达能力。

除了使用两个线性层,这里也可以使用两个一维卷积层实现同样的功能: \[ FFN(x) = max(0, x * w_1 + b_1) * w_2 + b_2 \]

代码实现

组件一:注意力机制

缩放点积注意力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import torch
from torch import nn

d_k = 64
d_v = 64
# 向量维度

class ScaledDotProductAttention(nn.Module):
def __init__(self):
super(ScaledDotProductAttention, self).__init__()

def forward(self, Q, K, V, attn_mask):
'''
Q(..., n_seq, d_k)
K(..., n_seq, d_k)
V(..., n_seq, d_v)
attn_mask(..., n_seq, n_seq)
'''
scores = torch.matmul(Q, K.transpose(-2, -1)) / d_k ** 0.5
scores.masked_fill_(attn_mask, -1e9) # 掩码机制
# scores(..., n_seq, n_seq)
weights = torch.softmax(scores, dim=-1)
# weights(..., n_seq, n_seq)
context = torch.matmul(weights, V)
# context(..., n_seq, d_v)

return context, weights # 返回上下文向量和注意力分数
'''
context(..., n_seq, d_v)
weights(..., n_seq, n_seq)
'''
多头注意力

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import torch
from torch import nn
from ScaledDotProductAttention import *

d_embedding = 512
n_heads = 8
batch_size = 3

class MultiHeadAttention(nn.Module):
def __init__(self):
super(MultiHeadAttention, self).__init__()
self.W_Q = nn.Linear(d_embedding, d_k * n_heads)
self.W_K = nn.Linear(d_embedding, d_k * n_heads)
self.W_V = nn.Linear(d_embedding, d_v * n_heads)
self.linear = nn.Linear(d_v * n_heads, d_embedding)
self.layer_norm = nn.LayerNorm(d_embedding)

def forward(self, x1, x2, x3, attn_mask):
'''
x1, x2, x3(batch_size, n_seq, d_embedding)
attn_mask(batch_size, n_seq, n_seq)
'''
# 对于自注意力而言,x1 == x2 == x3
residual, batch_size = x1, x1.size(0) # 保留残差连接

# 拆分头
q_s = self.W_Q(x1).view(batch_size, -1, n_heads, d_k).transpose(1, 2)
k_s = self.W_K(x2).view(batch_size, -1, n_heads, d_k).transpose(1, 2)
v_s = self.W_V(x3).view(batch_size, -1, n_heads, d_k).transpose(1, 2)
'''
W_Q(x1), W_K(x2) (batch_size, n_seq, d_k * n_heads)
q_s, k_s(batch_size, n_heads, n_seq, d_k)
W_V(x3) (batch_size, n_seq, d_v * n_heads)
v_s(batch_size, n_heads, n_seq, d_v)
'''

# 将注意力掩码复制到多头 attn_mask
attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)
# attn_mask(batch_size, n_heads, n_seq, n_seq)

# 使用缩放点积注意力计算上下文向量和注意力权重
context, weights = ScaledDotProductAttention()(q_s, k_s, v_s, attn_mask)
'''
context(batch_size, n_heads, n_seq, d_v)
weights(batch_size, n_heads, n_seq, n_seq)
'''
# 拼接输出
context = context.transpose(1, 2).contiguous().view(batch_size, -1, n_heads * d_v)
# context(batch_size, n_seq, n_heads * d_v)

# 线性变换
output = self.linear(context)
# output(batch_size, n_seq, d_embedding)

# 残差连接和层归一化
output = self.layer_norm(output + residual)
# output(batch_size, n_seq, d_embedding)

return output, weights
'''
output(batch_size, n_seq, d_embedding)
weights(batch_size, n_heads, n_seq, n_seq)
'''

组件二:前馈神经网络

使用线性全连接层
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import torch
from torch import nn

d_mid = 2048

class FFN(nn.Module):
def __init__(self):
super(FFN, self).__init__()
self.linear1 = nn.Linear(d_embedding, d_mid)
self.linear2 = nn.Linear(d_mid, d_embedding)
self.layer_norm = nn.LayerNorm(d_embedding)

def forward(self, inputs):
residual = inputs
output = torch.relu(self.linear1(inputs))
output = self.linear2(output)
output = self.layer_norm(output + residual) # 残差连接与层归一化

return output
使用一维卷积层
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import torch
from torch import nn

d_mid = 2048

class FFN(nn.Module):
def __init__(self):
super(FFN, self).__init__()
self.conv1 = nn.Conv1d(d_embedding, d_mid, kernel_size=1)
self.conv2 = nn.Conv1d(d_mid, d_embedding, kernel_size=1)
self.layer_norm = nn.LayerNorm(d_embedding)

def forward(self, inputs):
residual = inputs
output = torch.relu(self.conv1(inputs.transpose(1, 2)))
output = self.conv2(output).transpose(1, 2)
output = self.layer_norm(output + residual)

return output

组件三:正弦位置编码表

1
2
3
4
5
6
7
8
9
10
11
12
def get_sin_enc_table(n_position, d_embedding):
sinusoid_table = torch.zeros(n_position, d_embedding, dtype=torch.float64)
for pos_i in range(n_position):
for hid_j in range(d_embedding):
sinusoid_table[pos_i, hid_j] = pos_i / (10000 ** ((hid_j & ~1) / d_embedding))
sinusoid_table[:, 0::2] = torch.sin(sinusoid_table[:, 0::2])
sinusoid_table[:, 1::2] = torch.cos(sinusoid_table[:, 1::2])

return sinusoid_table
'''
sinusoid_table(n_position, d_embedding)
'''

组件四:填充位置掩码生成函数

填充位置掩码是 Transformer 中一种特殊的方法,用来防止对填充部分进行学习。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def get_attn_pad_mask(seq_q, seq_k):
'''
len_q == len_k
seq_q(batch_size, n_seq)
seq_k(batch_size, n_seq)
'''
batch_size, len_q = seq_q.size()
batch_size, len_k = seq_k.size()
pad_attn_mask = seq_k.data.eq(0).unsqueeze(1) # 因为在许多 NLP 任务中 0 被用作填充值
# pad_attn_mask(batch_size, 1, n_seq)
pad_attn_mask = pad_attn_mask.expand(batch_size, len_q, len_k)

return pad_attn_mask
'''
pad_attn_mask(batch_size, n_seq, n_seq)
'''

组件五:编码器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import torch
from torch import nn
from MultiHeadAttention import *
from FFN import *
from functions import *

class EncoderLayer(nn.Module):
def __init__(self):
super(EncoderLayer, self).__init__()
self.enc_self_attn = MultiHeadAttention()
self.ffn = FFN()

def forward(self, enc_inputs, enc_self_attn_mask):
'''
enc_inputs(batch_size, n_seq, d_embedding)
enc_self_attn_mask(batch_size, n_seq, n_seq)
'''
enc_outputs, attn_weights = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs, enc_self_attn_mask)
enc_outputs = self.ffn(enc_outputs)

return enc_outputs, attn_weights
'''
enc_outputs(batch_size, n_seq, d_embedding)
attn_weights(batch_size, n_heads, n_seq, n_seq)
'''


n_layers = 6 # EncoderLayer 的层数
class Encoder(nn.Module):
def __init__(self, corpus):
super(Encoder, self)
self.src_emb = nn.Embedding(len(corpus.src_vocab), d_embedding)
self.pos_emb = nn.Embedding.from_pretrained(get_sin_enc_table(corpus.src_len + 1, d_embedding=d_embedding), freeze=True)
self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])

def forward(self, enc_inputs):
# 生成位置编码并与词嵌入向量相加
pos_indices = torch.arange(1, enc_inputs.size(1) + 1).unsqueeze(0).to(enc_inputs)
enc_outputs = self.src_emb(enc_inputs) + self.pos_emb(pos_indices)
# enc_outputs(batch_size, n_seq, d_embedding)

enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs) # 生成自注意力掩码
# enc_self_attn_mask(batch_size, n_seq, n_seq)

enc_self_attn_weights = []
for layer in self.layers:
enc_outputs, enc_self_attn_weight = layer(enc_outputs, enc_self_attn_mask)
enc_self_attn_weights.append(enc_self_attn_weight)
'''
enc_outputs(batch_size, n_seq, d_embedding)
enc_self_attn_weight(batch_szie, n_heads, n_seq, n_seq)
'''

return enc_outputs, enc_self_attn_weights # 返回编码器输出和编码器注意力权重

组件六:后续位置掩码

前文已经提及,在序列生成任务中,解码器每个时间步都依赖于前面已生成的部分序列,后续位置掩码是为了防止当前位置依赖未来的信息。

1
2
3
4
5
def get_attn_subsequent_mask(seq):
attn_shape = [seq.size(0), seq.size(1), seq.size(1)]
subsequent_mask = torch.triu(torch.ones(attn_shape)).bool()

return subsequent_mask

组件七:解码器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import torch
from torch import nn
from MultiHeadAttention import *
from FFN import *
from functions import *

class DecoderLayer(nn.Module):
def __init__(self):
super(DecoderLayer, self).__init__()
self.dec_self_attn = MultiHeadAttention()
self.dec_enc_attn = MultiHeadAttention()
self.ffn = FFN()

def forward(self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask):
'''
dec_inputs, enc_outputs(batch_size, n_seq, d_embedding)
dec_self_attn_mask, dec_enc_attn_mask(batch_size, n_seq, n_seq)
'''
# 多头自注意力层
dec_outputs, dec_self_attn = self.dec_self_attn(dec_inputs, dec_inputs, dec_inputs, dec_self_attn_mask)
'''
dec_outputs(batch_size, n_seq, d_embedding)
dec_self_attn(batch_size, n_heads, n_seq, n_seq)
'''
# 编码器-解码器多头注意力层
# 解码器的第一层注意力输出生成 Q, 编码器的最终输出生成 K & V
dec_outputs, dec_enc_attn = self.dec_enc_attn(dec_outputs, enc_outputs, enc_outputs, dec_enc_attn_mask)
dec_outputs = self.ffn(dec_outputs)
'''
dec_outputs(batch_size, n_seq, d_embedding)
dec_enc_attn(batch_size, n_heads, n_seq, n_seq)
'''

return dec_outputs, dec_self_attn, dec_enc_attn


n_layers = 6 # DecoderLayer 的层数
class Decoder(nn.Module):
def __init__(self, corpus):
super(Decoder, self).__init__()
self.tgt_emb = nn.Embedding(len(corpus.tgt_vacab), d_embedding)
self.pos_emb = nn.Embedding.from_pretrained(get_sin_enc_table(corpus.tgt_len + 1, d_embedding=d_embedding), freeze=True)
self.layers = nn.ModuleList([DecoderLayer() for _ in range(n_layers)])

def forward(self, dec_inputs, enc_inputs, enc_outputs):
'''
dec_inputs, enc_inputs, enc_outputs(batch_size, n_seq)
'''
# enc_inputs 在这里仅用于生成掩码
pos_indices = torch.arange(1, dec_inputs.size(1) + 1).unsqueeze(0).to(dec_inputs)
# pos_indices(1, n_seq) -> pos_emb(pos_indices) (1, n_seq, embedding)
dec_outputs = self.tgt_emb(dec_inputs) + self.pos_emb(pos_indices)
# (batch_size, n_seq, d_embedding) + (1, n_seq, d_embedding) -> dec_outputs(batch_size, n_seq, d_embedding)
dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs) # 填充位掩码
# dec_self_attn_pad_mask(batch_size, n_seq, n_seq)
dec_self_attn_subsequent_mask = get_attn_subsequent_mask(dec_inputs) # 后续位掩码
# dec_self_attn_subsequent_mask(batch_size, n_seq, n_seq)
dec_self_attn_mask = torch.gt(dec_self_attn_pad_mask + dec_self_attn_subsequent_mask, 0) # 两掩码相加
# dec_self_attn_mask(batch_size, n_seq, n_seq)

dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs) # 解码器-编码器掩码
# dec_enc_attn_mask(batch_size, n_seq, n_seq)

dec_self_attns, dec_enc_attns = [], []
for layer in self.layers:
dec_outputs, dec_self_attn, dec_enc_attn = layer(dec_outputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask)
# dec_outputs(batch_size, n_seq, d_embedding)
# dec_self_attn, dec_enc_attn(batch_size, n_heads, n_seq, n_seq)
dec_self_attns.append(dec_self_attn)
dec_enc_attns.append(dec_enc_attn)

return dec_outputs, dec_self_attns, dec_enc_attns

完整组合

有了以上铺垫,我们就可以实现完整的 Transformer 框架了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import torch
from torch import nn
from MultiHeadAttention import *
from FFN import *
from functions import *
from Encoder import *
from Decoder import *

class Transformer(nn.Module):
def __init__(self, corpus):
super(Transformer, self).__init__()
self.encoder = Encoder(corpus=corpus) # 编码器
self.decoder = Decoder(corpus=corpus) # 解码器
self.proj = nn.Linear(d_embedding, len(corpus.tgt_vacab), bias=False) # 最后的全连接

def forward(self, enc_inputs, dec_inputs):
'''
enc_inputs, dec_inputs(batch_size, n_seq)
'''
# 将输入传递给编码器, 获取编码器输出和自注意力权重
enc_outputs, enc_self_attns = self.encoder(enc_inputs)
'''
enc_outputs(batch_size, n_seq, d_embedding)
enc_self_attns(n_layers, batch_size, n_heads, n_seq, n_seq)
'''
# 将解码器输入、编码器输入和编码器输出输入给解码器
dec_outputs, dec_self_attns, dec_enc_attns = self.decoder(dec_inputs, enc_inputs, enc_outputs)
'''
dec_outputs(batch_size, n_seq, d_embedding)
dec_self_attns, dec_enc_attns(n_layers, n_heads, batch_size, n_seq, n_seq)
'''
# 将解码器输出传递给最后的全连接层
dec_logits = self.proj(dec_outputs)
# dec_logits(batch_size, n_seq, num_of_voc)

return dec_logits, enc_self_attns, dec_self_attns, dec_enc_attns

案例:基于 Transformer 的翻译器

定义语料库类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from collections import Counter # 导入 Counter 类
import jieba # 使用 jieba 库分词

# 定义 TranslationCorpus 类
class TranslationCorpus:
def __init__(self, sentences):
self.sentences = sentences
self.src_sen = [' '.join(jieba.cut(sentence[0])) for sentence in sentences] # 中文句子库
self.tgt_sen = [sentence[1] for sentence in sentences] # 英文句子库
# 计算源语言和目标语言的最大句子长度,并分别加 1 和 2 以容纳填充符和特殊符号
self.src_len = max(len(s.split()) for s in self.src_sen) + 1 # 这里包括 <pad>
self.tgt_len = max(len(s.split()) for s in self.tgt_sen) + 2 # 这里包括 <sos>, <eos>
# 创建源语言和目标语言的词汇表
self.src_vocab, self.tgt_vocab = self.create_vocabularies()
# 创建索引到单词的映射
self.src_idx2word = {v: k for k, v in self.src_vocab.items()}
self.tgt_idx2word = {v: k for k, v in self.tgt_vocab.items()}

# 创建词汇表的函数
def create_vocabularies(self):
# 统计源语言和目标语言的单词频率
src_counter = Counter(w for sen in self.src_sen for w in sen.split())
tgt_counter = Counter(w for sen in self.tgt_sen for w in sen.split())
# 创建源语言和目标语言的词汇表,并为每个单词分配一个唯一的索引
src_vocab = {'<pad>': 0, **{word: i + 1 for i, word in enumerate(src_counter)}}
tgt_vocab = {'<pad>': 0, '<sos>': 1, '<eos>': 2, **{word: i + 3 for i, word in enumerate(tgt_counter)}}

return src_vocab, tgt_vocab
# 创建批次数据的函数
def make_batch(self, batch_size, test_batch=False):
input_batch, output_batch, target_batch = [], [], []
# 随机选择句子索引
sentence_indices = torch.randperm(len(self.sentences))[:batch_size]
for index in sentence_indices:
src_sentence, tgt_sentence = self.src_sen[index], self.tgt_sen[index]
# 将源语言和目标语言的句子转换为索引序列
src_seq = [self.src_vocab[word] for word in src_sentence.split()]
tgt_seq = [self.tgt_vocab['<sos>']] + [self.tgt_vocab[word] for word in tgt_sentence.split()] + [self.tgt_vocab['<eos>']]
# 对源语言和目标语言的序列进行填充
src_seq += [self.src_vocab['<pad>']] * (self.src_len - len(src_seq))
tgt_seq += [self.tgt_vocab['<pad>']] * (self.tgt_len - len(tgt_seq))
# 将处理好的序列添加到批次中
input_batch.append(src_seq)
output_batch.append([self.tgt_vocab['<sos>']] + ([self.tgt_vocab['<pad>']] * (self.tgt_len - 2)) if test_batch else tgt_seq[:-1])
target_batch.append(tgt_seq[1:])
# 将批次转换为 LongTensor 类型
input_batch = torch.LongTensor(input_batch)
output_batch = torch.LongTensor(output_batch)
target_batch = torch.LongTensor(target_batch)

return input_batch, output_batch, target_batch

训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 准备一些中译英语料
sentences = [
['我', 'I'],
['我是', 'I am'],
['中国人', 'Chinese'],
['美国人', 'American'],
['苹果', 'apples'],
['香蕉', 'bananas'],
['菠萝', 'pineapples'],
['我和中国人', 'Chinese and I'],
['我和美国人', 'American and I'],
['中国人和美国人', 'Chinese and American'],
['美国人喜欢苹果', 'American like apples'],
['中国人喜欢菠萝', 'Chinese like pineapples'],
['我是中国人', 'I am Chinese'],
['我是美国人', 'I am American'],
['我喜欢苹果', 'I like apples'],
['我喜欢香蕉', 'I like bananas'],
['我喜欢菠萝', 'I like pineapples']
]
corpus = TranslationCorpus(sentences) # 创建语料库类实例

import torch.optim as optim # 导入优化器
device = "cuda" if torch.cuda.is_available() else "cpu"
model = Transformer(corpus).to(device) # 创建模型实例
criterion = nn.CrossEntropyLoss() # 损失函数
optimizer = optim.Adam(model.parameters(), lr=0.0001) # 优化器
for epoch in range(100): # 训练 100 轮
optimizer.zero_grad() # 梯度清零
# Generate a batch of input, output, and target sequences
enc_inputs, dec_inputs, target_batch = corpus.make_batch(batch_size) # 创建训练数据
enc_inputs, dec_inputs, target_batch = enc_inputs.to(device), dec_inputs.to(device), target_batch.to(device)
outputs, _, _, _ = model(enc_inputs, dec_inputs) # 获取模型输出
# target_batch(batch_size, n_seq)
# outputs(batch_size, n_seq, num_of_voc)
loss = criterion(outputs.view(-1, len(corpus.tgt_vocab)), target_batch.view(-1)) # 计算损失
if (epoch + 1) % 20 == 0: # 打印损失
print(f"Epoch: {epoch + 1:04d} cost = {loss:.6f}")
loss.backward()
optimizer.step()

输出

1
2
3
4
5
Epoch: 0020 cost = 0.566277
Epoch: 0040 cost = 0.339869
Epoch: 0060 cost = 0.029208
Epoch: 0080 cost = 0.006405
Epoch: 0100 cost = 0.004169

解码器输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 创建一个大小为 1 的批次,目标语言序列 dec_inputs 在测试阶段,仅包含句子开始符号 <sos>
enc_inputs, dec_inputs, target_batch = corpus.make_batch(batch_size=1,test_batch=True)
enc_inputs, dec_inputs, target_batch = enc_inputs.to(device), dec_inputs.to(device), target_batch.to(device)
print("编码器输入:", enc_inputs) # 打印编码器输入
print("解码器输入:", dec_inputs) # 打印解码器输入
print("目标数据:", target_batch) # 打印目标数据
predict, enc_self_attns, dec_self_attns, dec_enc_attns = model(enc_inputs, dec_inputs) # 用模型进行翻译
predict = predict.view(-1, len(corpus.tgt_vocab)) # 将预测结果维度重塑
predict = predict.data.max(1, keepdim=True)[1] # 找到每个位置概率最大的词汇的索引
# 解码预测的输出,将所预测的目标句子中的索引转换为单词
translated_sentence = [corpus.tgt_idx2word[idx.item()] for idx in predict.squeeze()]
# 将输入的源语言句子中的索引转换为单词
input_sentence = [corpus.src_idx2word[idx.item()] for idx in enc_inputs[0]]
print(input_sentence, '->', translated_sentence) # 打印原始句子和翻译后的句子

输出结果

1
2
3
4
编码器输入: tensor([[ 1, 10,  7,  0,  0,  0]])
解码器输入: tensor([[1, 0, 0, 0]])
目标数据: tensor([[ 3, 11, 8, 2]])
['我', '喜欢', '香蕉', '<pad>', '<pad>', '<pad>'] -> ['I', 'I', 'I', 'I']

这里只生成了一个 token,因为还没有使用到 Transformer 解码层的自回归机制。

生成式解码

在 Transformer 模型中,我们通过最大化预测正确词的概率来优化模型。而在推理的过程中,我们可以以选择概率最大的词作为下一个词,这就是我们常说的贪心搜索。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def greedy_decoder(model, enc_input, start_symbol, steps=5):
enc_outputs, enc_self_attns = model.encoder(enc_input)
dec_input = torch.zeros(1, steps).type_as(enc_input)
next_symbol = start_symbol
for i in range(0, steps):
dec_input[0, i] = next_symbol
if next_symbol == corpus.tgt_vocab['<eos>']: break
dec_output, _, _ = model.decoder(dec_input, enc_input, enc_outputs)
proj = model.projection(dec_output)
# proj(batch_size=1, n_seq, num_of_voc)
prob = proj.squeeze(0).max(dim=-1, keepdim=False)[1]
# prob(batch_size=1, n_seq)
next_word = prob.data[i]
next_symbol = next_word.item() # 预测下一词
final_output = dec_input

return final_output
1
2
3
4
5
6
7
8
9
10
# 使用贪婪搜索翻译文本
enc_inputs, dec_inputs, target_batch = corpus.make_batch(batch_size=1, test_batch=True)
enc_inputs, dec_inputs, target_batch = enc_inputs.to(device), dec_inputs.to(device), target_batch.to(device)
print("编码器输入:", enc_inputs) # 打印编码器输入
print("解码器输入:", dec_inputs) # 打印解码器输入
print("目标数据:", target_batch) # 打印目标数据
greedy_dec_output = greedy_decoder(model, enc_inputs, start_symbol=corpus.tgt_vocab['<sos>'], steps=corpus.tgt_len)
greedy_dec_output_words = [corpus.tgt_idx2word[n.item()] for n in greedy_dec_output.squeeze()]
enc_inputs_words = [corpus.src_idx2word[code.item()] for code in enc_inputs[0]]
print(enc_inputs_words, '->', greedy_dec_output_words)

输出

1
2
3
4
编码器输入: tensor([[1, 6, 7, 0, 0]])
解码器输入: tensor([[1, 0, 0, 0]])
目标数据: tensor([[3, 7, 8, 2]])
['我', '喜欢', '苹果', '<pad>', '<pad>'] -> ['<sos>', 'I', 'like', 'apples', '<eos>']

测试

接下来我们现在找个训练集没有出现的数据来测试模型的泛化能力:

1
2
3
4
5
6
7
8
test_enc_inputs = [corpus.src_vocab[w] for w in list(jieba.cut('美国人喜欢香蕉'))]
test_enc_inputs = torch.LongTensor(test_enc_inputs + (corpus.src_len - len(test_enc_inputs)) * [corpus.src_vocab['<pad>']]).unsqueeze(0)
dec_inputs = torch.LongTensor([corpus.tgt_vocab['<sos>']] + (corpus.tgt_len - 1) * [corpus.tgt_vocab['<pad>']]).unsqueeze(0)

greedy_test_dec_output = greedy_decoder(model, test_enc_inputs, start_symbol=corpus.tgt_vocab['<sos>'], steps=corpus.tgt_len)
ans = [corpus.tgt_idx2word[idx.item()] for idx in greedy_test_dec_output[0]]

print(ans)

输出

1
['<sos>', 'American', 'like', 'bananas', '<eos>']

Transformer 的发展

NLP 领域

自从 Transformer 架构在 2017 年被提出后,大型语言模型的发展经历了显著的变革和快速的进步。

Transformer 在 NLP 中的发展

  1. 2018 年,BERT(Bidirectional Encoder Representations from Transformers)由 Google 提出,BERT 是基于 Transformer 的首个重要应用。
  2. 2018 - 2020 年,OpenAI 发布了 GPT(Generative Pretrained Transformer)系列模型,包括 GPT-2 和 GPT-3。GPT-3 特别因其巨大的规模(1750亿个参数)和强大的生成能力而受到关注。
  3. 2021 - 2023 年,模型如 GPT-4 和 Google 的 PaLM(Pathways Language Model)通过扩大规模和改进训练技巧,进一步提高了性能。
  4. ...

视觉领域

Transformer 最初是为自然语言处理(NLP)任务设计的,但其核心思想—基于注意力机制的模型架构—也被发现对于处理视觉信息非常有效。因此,Transformer 在机器视觉领域的应用也变得越来越广泛。

ViT

Google Research 在 2020 年推出了 Vision Transformer(ViT),这是首次将 Transformer 完全应用于图像识别任务。ViT 将图像分割成固定大小的图像块,将每个图像块展平并映射到一个高维空间(就像NLP中的词嵌入),然后在这些块上应用标准的 Transformer 模型。ViT 在多个图像识别基准测试中表现出色,与当时的卷积神经网络(CNN)模型相比,它在一些任务上达到了更好的性能。

MAE

MAE(Masked Autoencoders)技术是一种自监督学习方法,它是通过预训练一个神经网络以重建被随机遮挡(mask)的输入数据的方法来学习数据的有效表示。这个技术通常与自编码器(Autoencoder)相关,自编码器是一个试图通过较小的隐藏层来重构输入的神经网络结构。MAE 通过随机遮挡输入数据的一部分,然后让网络预测这些遮挡部分的原始内容,从而迫使模型学习到数据的内在结构和模式。

MAE 技术可以应用于多种类型的数据,包括图像、文本、声音等。在视觉任务中,MAE 的一个例子是,输入一张图像,然后在图像上随机选择一些像素或区域并将其遮挡,随后模型的任务是预测这些遮挡区域的像素值。这种方法迫使模型学习到图像的低级特征(如边缘和纹理)以及更高级的概念(如对象的部分和整体结构),因为要正确重建被遮挡的部分,模型需要理解其周围的上下文。


Transfomer 框架 | 逐层剖析,从原理到代码
https://goer17.github.io/2023/11/15/Transfomer 框架/
作者
Captain_Lee
发布于
2023年11月15日
许可协议