本文是笔者根据 2017 年由 Google 的研究者发表的论文《Attention is all you
need》 与 深蓝学院的 LLM 课程
总结的一份 Transformer 框架的技术文档。
Transformer
是当下非常流行的深度学习模型,自从其提出来后,它迅速成为了许多 NLP
任务的基础架构,比如机器翻译、文本摘要、情感分析和问答系统,大名鼎鼎的
ChatGPT 系列其实也是基于 Transformer 模型构建的,后来 Transformer
也被大量用于机器视觉的运用,例如 Non-local、ViT
等等。总而言之,Transformer 是一款非常强大的模型。
Transfomer 整体结构
下图展示了 Transformer 的整体结构:
编码器 & 解码器
Transformer
主要由两部分组成:即编码器(Encoder) 和解码器(Decoder) 。
编码器 的作用是处理输入序列 ,将输入数据转换成一系列连续的表示形式,这些表示包含了输入数据的复杂内部结构信息。在处理自然语言任务时,这通常意味着将一句话或文档编码为一系列向量,每个向量对应输入序列中的一个元素(比如一个单词)。
解码器 的作用是基于编码器的输出以及之前已生成的输出,逐步生成目标序列 。对于自然语言生成任务来说,解码器会一个接一个地生成词汇,直到产生结束符号为止。
整体来看,编码器负责理解输入数据,将其转化为一系列高维的内部表示;解码器则负责将这些表示转换为意义明确的输出序列。两者协同工作。以机器翻译为例,以下展示了
Transformer 的整体工作原理。
嵌入层 & 位置编码
嵌入层
嵌入层(Embedding Layer) 是 Transformer
输入的第一层,一般而言是通过某种特定的变换(比如 Word2Vec
技术),将输入单词的 One-Hot
编码转化为空间中连续的向量,其维度与模型维度 \(d_{model}\) 一致。
通过这种方式,模型能够将离散的、符号化的输入转换为连续的向量,这些向量是神经网络能够处理的形式。
位置编码
位置编码(Positioning Encoding) 是 Transformer
模型中的一个关键创新,用于解决模型缺乏序列顺序信息的问题。因为
Transformer
不使用递归神经网络结构,所以需要某种机制来利用输入序列中单词的位置信息。
Transformer 的原始论文提供了一种使用三角函数的位置编码方案:
对于序列中的每个位置 \(pos\)
和维度索引 \(i\) ,位置编码如下:
对于偶数索引 \(2i\) : \[
PE_{(pos, 2 i)} = sin(\frac{ {pos}}{10000 ^ {\frac{2i}{d_{model}}}})
\]
对于奇数索引 \(2i + 1\) : \[
PE_{(pos, 2 i + 1)} = cos(\frac{pos}{10000 ^ {\frac{2i}{d_{model}}}})
\]
这说明了对于不同维度的特征,其位置编码有不一样的频率。
为什么使用正弦函数进行位置编码:
正弦函数具有周期性,可以很好地捕获一些自然语言的周期性问题与重复出现的模式;
事实上在自然语言中,很多语法结构是重复出现的。
注意力机制
注意力机制是 Transformer
的核心组件之一,其允许模型在处理序列的每个元素时动态地聚焦于序列中的其他部分 ,这种方式对于理解序列内各元素之间的复杂关系尤其有效。
自注意力机制
自注意力(Self-Attention) 是一种特殊形式的注意力机制,使模型能够在处理序列的每一个元素时,考虑到序列中的所有元素。对于给定的输入序列,自注意力允许每一个输出在生成时加权引入输入序列中所有位置的信息。
在自注意力机制中,每一个输入会被映射到三个不同的向量,它们通常由学习得到的权重矩阵生成:
查询向量(Query) \[
q_0 = x_0 W_{xq}
\]
键向量(Key) \[
k_0 = x_0 W_{xk}
\]
值向量(Value) \[
v_0 = x_0 W_{xv}
\]
对于序列中的每一个元素,自注意力的计算过程可以分解为以下步骤:
点积计算 :计算查询向量与键向量的点积,得到一个分数,表示该元素与序列中每个元素的兼容性或关联程度。
缩放 :通常将点积的结果除以一个缩放因子(通常是键向量维度的平方根),以避免梯度消失或爆炸。
Softmax 归一化 :应用 Softmax
函数将这些分数转换为概率,用于确定每个元素应该赋予多少关注度。
加权和 :利用 Softmax
的输出作为权重,结合值向量计算加权和,得到自注意力的输出。
如上图,计算出所有加权后,我们就可以计算出自注意力的输出: \[
\hat x_i = \sum_{j} \hat a_i ^ j v_j
\] 自注意力机制的矩阵表达: \[
Z = softmax(\frac{Q K ^ T}{\sqrt{d_k}}) V
\] 其中: \[
Q = [q_0 ^ T \ q_1 ^ T\ ... \ q_n ^ T] ^ T
\]
\[
K = [k_0 ^ T \ k_1 ^ T\ ... \ k_n ^ T] ^ T
\]
\[
V = [v_0 ^ T \ v_1 ^ T\ ... \ v_n ^ T] ^ T
\]
\[
Z = [\hat x_0 ^ T \ \hat x_1 ^ T\ ... \ \hat x_n ^ T] ^ T
\]
多头注意力机制
多头注意力(Multi-Head
Attention) 是将自注意力扩展为多个并行的头部。它允许模型在不同的子空间中并行地学习输入之间的不同表示。每个头部执行上述自注意力计算,但是会有不同的权重矩阵。
对于不同的权重矩阵 \(W_{xq}^{(i)}\) 、\(W_{xk}^{(i)}\) 、\(W_{xv}^{(i)}\) ,可以计算出不同的输出 \(Z_i\) 。
假设我们有一共 8 个头部,即 24 个权重矩阵,则我们一共可以得到 8
个输出。
接下来我们考虑将 8 个头部的输出矩阵进行拼接,然后右乘一个权重矩阵
\(W_O\) 即可得到最后的输出。
\[
Z = [Z_0 \ Z_1 \ ... Z_7] W_O
\]
通过这种方式,模型可以捕获数据在不同表示空间中的不同特征 ,并且可以在不同级别的抽象上理解信息 。
多头注意力机制的优势
并行计算 :注意力机制计算每个元素的输出时,不依赖于其他元素的输出,因此可以高效地并行处理。
捕捉长距离依赖 :自注意力可以直接计算序列内任意两个元素之间的交互,不受它们在序列中位置距离的影响,这对于捕捉长距离依赖关系非常有效。
灵活的关注焦点 :模型可以学习在不同的上下文中关注序列的不同部分,这种动态的关注机制对于理解和生成语言至关重要。
解码器中的多头注意力机制
添加掩码的多头注意力
Masked Multi-Head Attention
是解码器(Decoder)部分的核心组件,它使得解码器能够在生成序列时只关注到当前位置之前的输出,而不是之后的输出,从而防止信息的提前泄露。
在具体操作中,解码器的每个时间步都会生成一个掩码(mask),用于屏蔽(mask
out)那些不应被当前位置所“看到”的未来位置。然后在计算 Softmax
函数前,这个掩码会被加到注意力对数分数上(即,对应未来位置的分数变成非常大的负数)。这样经过Softmax
激活函数后,这些位置的注意力权重接近于零,确保模型不会考虑未来的单词。
举个简单的例子,假设在某一时间步,我们正在尝试生成一个句子的第三个词,Masked
Attention
会确保注意力机制只会考虑第一个词和第二个词,而不是之后的词。掩码矩阵可能如下所示:
\[
\begin{bmatrix}
0 & -\infty & -\infty & -\infty \\
0 & 0 & -\infty & -\infty \\
0 & 0 & 0 & -\infty \\
0 & 0 & 0 & 0 \\
\end{bmatrix}
\] 这个矩阵会与注意力对数分数相加,确保在进行 Softmax
时,每一行的未来位置的权重都接近于零。
编码器-解码器注意力
编码器-解码器注意力层在解码器中扮演着关键角色,它允许解码器聚焦编码器的输出。这个注意力层作为桥梁将编码器的信息传递给解码器。解码器通过这个层来查看编码器的输出,并结合自身已生成的部分翻译来预测下一个单词。
这一层的多头注意力机制使用解码器的输出作为查询(Query) ,而将编码器的输出作为键(Key)和值(Value) 。通过计算查询与键的相似度,得到一个注意力权重,然后用这个权权重来加权对应的值,生成这一层的输出。
层归一化与残差连接
Transformer 架构中使用了层归一化(Layer
Normalization) 和残差连接(Residual
Connection) 来促进深度网络的训练。这两种技术都是为了解决训练深层神经网络时常遇到的问题,比如梯度消失和梯度爆炸。
层归一化
层归一化是一种在训练深度神经网络时常用的技术,其目的是稳定神经网络的学习过程。层归一化的工作原理是对单个样本的所有激活进行归一化 ,这和批量归一化(Batch
Normalization)不同,后者是对同一层中不同样本的同一激活进行归一化。层归一化可以使得网络层的输出更加稳定。
具体步骤
对于一个具体的样本,在网络的每一层(通常在非线性激活函数前),层归一化按以下步骤进行:
计算所有激活值 \(x_i\) 的均值 \(\mu\) 和方差 \(\sigma ^ 2\)
对每个 \(x_i\) 进行归一化:\(x_i := \frac{x_i - \mu}{\sqrt{\sigma ^ 2 +
\epsilon}}\) (这里的 \(\epsilon\)
是一个很小的正实数,用于防止出现分母为 0 的情况)
应用可学习的参数 \(\gamma\) 和
\(\beta\)
来缩放和位移归一化后的值:\(x_i := \gamma x_i
+ \beta\)
残差连接
残差连接,也称作跳跃连接(Skip
Connection),是指在网络的某一层上将输入直接加到该层的输出上。它允许梯度直接流过网络,这有助于训练过程中更有效地传播梯度,减轻梯度消失的问题。
在 Transformer 中,残差连接的数学表达式可以写为: \[
Output = LayerNorm(Input + Sublayer(Input))
\] 这里的 \(Sublayer()\)
指多头注意力层或者前馈神经网络层的输出。\(LayerNorm()\)
则表示层归一化。通过这种方式,即使网络非常深,输入信息也可以在网络中更远地传播。
前馈神经网络
前馈神经网络(Feed-Forward
Network,FFN) 是每个编码器和解码器层的主要组成部分之一。
Transformer 中的 FNN 由两个全连接层组成,中间有一个 ReLU
激活函数,数学表达式如下: \[
FNN(x) = max(0, xW_1 + b_1) W_2 + b_2
\] 前馈神经网络为 Transformer
模型提供了非线性处理能力。虽然自注意力层非常擅长处理输入序列中元素之间的依赖关系,但它本质上是一个线性操作 。前馈神经网络通过非线性变换,增加了模型的表达能力。
除了使用两个线性层,这里也可以使用两个一维卷积层实现同样的功能:
\[
FFN(x) = max(0, x * w_1 + b_1) * w_2 + b_2
\]
代码实现
组件一:注意力机制
缩放点积注意力
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import torchfrom torch import nn d_k = 64 d_v = 64 class ScaledDotProductAttention (nn.Module): def __init__ (self ): super (ScaledDotProductAttention, self).__init__() def forward (self, Q, K, V, attn_mask ): ''' Q(..., n_seq, d_k) K(..., n_seq, d_k) V(..., n_seq, d_v) attn_mask(..., n_seq, n_seq) ''' scores = torch.matmul(Q, K.transpose(-2 , -1 )) / d_k ** 0.5 scores.masked_fill_(attn_mask, -1e9 ) weights = torch.softmax(scores, dim=-1 ) context = torch.matmul(weights, V) return context, weights ''' context(..., n_seq, d_v) weights(..., n_seq, n_seq) '''
多头注意力
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 import torchfrom torch import nnfrom ScaledDotProductAttention import * d_embedding = 512 n_heads = 8 batch_size = 3 class MultiHeadAttention (nn.Module): def __init__ (self ): super (MultiHeadAttention, self).__init__() self.W_Q = nn.Linear(d_embedding, d_k * n_heads) self.W_K = nn.Linear(d_embedding, d_k * n_heads) self.W_V = nn.Linear(d_embedding, d_v * n_heads) self.linear = nn.Linear(d_v * n_heads, d_embedding) self.layer_norm = nn.LayerNorm(d_embedding) def forward (self, x1, x2, x3, attn_mask ): ''' x1, x2, x3(batch_size, n_seq, d_embedding) attn_mask(batch_size, n_seq, n_seq) ''' residual, batch_size = x1, x1.size(0 ) q_s = self.W_Q(x1).view(batch_size, -1 , n_heads, d_k).transpose(1 , 2 ) k_s = self.W_K(x2).view(batch_size, -1 , n_heads, d_k).transpose(1 , 2 ) v_s = self.W_V(x3).view(batch_size, -1 , n_heads, d_k).transpose(1 , 2 ) ''' W_Q(x1), W_K(x2) (batch_size, n_seq, d_k * n_heads) q_s, k_s(batch_size, n_heads, n_seq, d_k) W_V(x3) (batch_size, n_seq, d_v * n_heads) v_s(batch_size, n_heads, n_seq, d_v) ''' attn_mask = attn_mask.unsqueeze(1 ).repeat(1 , n_heads, 1 , 1 ) context, weights = ScaledDotProductAttention()(q_s, k_s, v_s, attn_mask) ''' context(batch_size, n_heads, n_seq, d_v) weights(batch_size, n_heads, n_seq, n_seq) ''' context = context.transpose(1 , 2 ).contiguous().view(batch_size, -1 , n_heads * d_v) output = self.linear(context) output = self.layer_norm(output + residual) return output, weights ''' output(batch_size, n_seq, d_embedding) weights(batch_size, n_heads, n_seq, n_seq) '''
组件二:前馈神经网络
使用线性全连接层
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import torchfrom torch import nn d_mid = 2048 class FFN (nn.Module): def __init__ (self ): super (FFN, self).__init__() self.linear1 = nn.Linear(d_embedding, d_mid) self.linear2 = nn.Linear(d_mid, d_embedding) self.layer_norm = nn.LayerNorm(d_embedding) def forward (self, inputs ): residual = inputs output = torch.relu(self.linear1(inputs)) output = self.linear2(output) output = self.layer_norm(output + residual) return output
使用一维卷积层
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import torchfrom torch import nn d_mid = 2048 class FFN (nn.Module): def __init__ (self ): super (FFN, self).__init__() self.conv1 = nn.Conv1d(d_embedding, d_mid, kernel_size=1 ) self.conv2 = nn.Conv1d(d_mid, d_embedding, kernel_size=1 ) self.layer_norm = nn.LayerNorm(d_embedding) def forward (self, inputs ): residual = inputs output = torch.relu(self.conv1(inputs.transpose(1 , 2 ))) output = self.conv2(output).transpose(1 , 2 ) output = self.layer_norm(output + residual) return output
组件三:正弦位置编码表
1 2 3 4 5 6 7 8 9 10 11 12 def get_sin_enc_table (n_position, d_embedding ): sinusoid_table = torch.zeros(n_position, d_embedding, dtype=torch.float64) for pos_i in range (n_position): for hid_j in range (d_embedding): sinusoid_table[pos_i, hid_j] = pos_i / (10000 ** ((hid_j & ~1 ) / d_embedding)) sinusoid_table[:, 0 ::2 ] = torch.sin(sinusoid_table[:, 0 ::2 ]) sinusoid_table[:, 1 ::2 ] = torch.cos(sinusoid_table[:, 1 ::2 ]) return sinusoid_table ''' sinusoid_table(n_position, d_embedding) '''
组件四:填充位置掩码生成函数
填充位置掩码是 Transformer
中一种特殊的方法,用来防止对填充部分进行学习。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def get_attn_pad_mask (seq_q, seq_k ): ''' len_q == len_k seq_q(batch_size, n_seq) seq_k(batch_size, n_seq) ''' batch_size, len_q = seq_q.size() batch_size, len_k = seq_k.size() pad_attn_mask = seq_k.data.eq(0 ).unsqueeze(1 ) pad_attn_mask = pad_attn_mask.expand(batch_size, len_q, len_k) return pad_attn_mask ''' pad_attn_mask(batch_size, n_seq, n_seq) '''
组件五:编码器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 import torchfrom torch import nnfrom MultiHeadAttention import *from FFN import *from functions import *class EncoderLayer (nn.Module): def __init__ (self ): super (EncoderLayer, self).__init__() self.enc_self_attn = MultiHeadAttention() self.ffn = FFN() def forward (self, enc_inputs, enc_self_attn_mask ): ''' enc_inputs(batch_size, n_seq, d_embedding) enc_self_attn_mask(batch_size, n_seq, n_seq) ''' enc_outputs, attn_weights = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs, enc_self_attn_mask) enc_outputs = self.ffn(enc_outputs) return enc_outputs, attn_weights ''' enc_outputs(batch_size, n_seq, d_embedding) attn_weights(batch_size, n_heads, n_seq, n_seq) ''' n_layers = 6 class Encoder (nn.Module): def __init__ (self, corpus ): super (Encoder, self) self.src_emb = nn.Embedding(len (corpus.src_vocab), d_embedding) self.pos_emb = nn.Embedding.from_pretrained(get_sin_enc_table(corpus.src_len + 1 , d_embedding=d_embedding), freeze=True ) self.layers = nn.ModuleList([EncoderLayer() for _ in range (n_layers)]) def forward (self, enc_inputs ): pos_indices = torch.arange(1 , enc_inputs.size(1 ) + 1 ).unsqueeze(0 ).to(enc_inputs) enc_outputs = self.src_emb(enc_inputs) + self.pos_emb(pos_indices) enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs) enc_self_attn_weights = [] for layer in self.layers: enc_outputs, enc_self_attn_weight = layer(enc_outputs, enc_self_attn_mask) enc_self_attn_weights.append(enc_self_attn_weight) ''' enc_outputs(batch_size, n_seq, d_embedding) enc_self_attn_weight(batch_szie, n_heads, n_seq, n_seq) ''' return enc_outputs, enc_self_attn_weights
组件六:后续位置掩码
前文已经提及,在序列生成任务中,解码器每个时间步都依赖于前面已生成的部分序列,后续位置掩码是为了防止当前位置依赖未来的信息。
1 2 3 4 5 def get_attn_subsequent_mask (seq ): attn_shape = [seq.size(0 ), seq.size(1 ), seq.size(1 )] subsequent_mask = torch.triu(torch.ones(attn_shape)).bool () return subsequent_mask
组件七:解码器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 import torchfrom torch import nnfrom MultiHeadAttention import *from FFN import *from functions import *class DecoderLayer (nn.Module): def __init__ (self ): super (DecoderLayer, self).__init__() self.dec_self_attn = MultiHeadAttention() self.dec_enc_attn = MultiHeadAttention() self.ffn = FFN() def forward (self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask ): ''' dec_inputs, enc_outputs(batch_size, n_seq, d_embedding) dec_self_attn_mask, dec_enc_attn_mask(batch_size, n_seq, n_seq) ''' dec_outputs, dec_self_attn = self.dec_self_attn(dec_inputs, dec_inputs, dec_inputs, dec_self_attn_mask) ''' dec_outputs(batch_size, n_seq, d_embedding) dec_self_attn(batch_size, n_heads, n_seq, n_seq) ''' dec_outputs, dec_enc_attn = self.dec_enc_attn(dec_outputs, enc_outputs, enc_outputs, dec_enc_attn_mask) dec_outputs = self.ffn(dec_outputs) ''' dec_outputs(batch_size, n_seq, d_embedding) dec_enc_attn(batch_size, n_heads, n_seq, n_seq) ''' return dec_outputs, dec_self_attn, dec_enc_attn n_layers = 6 class Decoder (nn.Module): def __init__ (self, corpus ): super (Decoder, self).__init__() self.tgt_emb = nn.Embedding(len (corpus.tgt_vacab), d_embedding) self.pos_emb = nn.Embedding.from_pretrained(get_sin_enc_table(corpus.tgt_len + 1 , d_embedding=d_embedding), freeze=True ) self.layers = nn.ModuleList([DecoderLayer() for _ in range (n_layers)]) def forward (self, dec_inputs, enc_inputs, enc_outputs ): ''' dec_inputs, enc_inputs, enc_outputs(batch_size, n_seq) ''' pos_indices = torch.arange(1 , dec_inputs.size(1 ) + 1 ).unsqueeze(0 ).to(dec_inputs) dec_outputs = self.tgt_emb(dec_inputs) + self.pos_emb(pos_indices) dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs) dec_self_attn_subsequent_mask = get_attn_subsequent_mask(dec_inputs) dec_self_attn_mask = torch.gt(dec_self_attn_pad_mask + dec_self_attn_subsequent_mask, 0 ) dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs) dec_self_attns, dec_enc_attns = [], [] for layer in self.layers: dec_outputs, dec_self_attn, dec_enc_attn = layer(dec_outputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask) dec_self_attns.append(dec_self_attn) dec_enc_attns.append(dec_enc_attn) return dec_outputs, dec_self_attns, dec_enc_attns
完整组合
有了以上铺垫,我们就可以实现完整的 Transformer 框架了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import torchfrom torch import nnfrom MultiHeadAttention import *from FFN import *from functions import *from Encoder import *from Decoder import *class Transformer (nn.Module): def __init__ (self, corpus ): super (Transformer, self).__init__() self.encoder = Encoder(corpus=corpus) self.decoder = Decoder(corpus=corpus) self.proj = nn.Linear(d_embedding, len (corpus.tgt_vacab), bias=False ) def forward (self, enc_inputs, dec_inputs ): ''' enc_inputs, dec_inputs(batch_size, n_seq) ''' enc_outputs, enc_self_attns = self.encoder(enc_inputs) ''' enc_outputs(batch_size, n_seq, d_embedding) enc_self_attns(n_layers, batch_size, n_heads, n_seq, n_seq) ''' dec_outputs, dec_self_attns, dec_enc_attns = self.decoder(dec_inputs, enc_inputs, enc_outputs) ''' dec_outputs(batch_size, n_seq, d_embedding) dec_self_attns, dec_enc_attns(n_layers, n_heads, batch_size, n_seq, n_seq) ''' dec_logits = self.proj(dec_outputs) return dec_logits, enc_self_attns, dec_self_attns, dec_enc_attns
定义语料库类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 from collections import Counter import jieba class TranslationCorpus : def __init__ (self, sentences ): self.sentences = sentences self.src_sen = [' ' .join(jieba.cut(sentence[0 ])) for sentence in sentences] self.tgt_sen = [sentence[1 ] for sentence in sentences] self.src_len = max (len (s.split()) for s in self.src_sen) + 1 self.tgt_len = max (len (s.split()) for s in self.tgt_sen) + 2 self.src_vocab, self.tgt_vocab = self.create_vocabularies() self.src_idx2word = {v: k for k, v in self.src_vocab.items()} self.tgt_idx2word = {v: k for k, v in self.tgt_vocab.items()} def create_vocabularies (self ): src_counter = Counter(w for sen in self.src_sen for w in sen.split()) tgt_counter = Counter(w for sen in self.tgt_sen for w in sen.split()) src_vocab = {'<pad>' : 0 , **{word: i + 1 for i, word in enumerate (src_counter)}} tgt_vocab = {'<pad>' : 0 , '<sos>' : 1 , '<eos>' : 2 , **{word: i + 3 for i, word in enumerate (tgt_counter)}} return src_vocab, tgt_vocab def make_batch (self, batch_size, test_batch=False ): input_batch, output_batch, target_batch = [], [], [] sentence_indices = torch.randperm(len (self.sentences))[:batch_size] for index in sentence_indices: src_sentence, tgt_sentence = self.src_sen[index], self.tgt_sen[index] src_seq = [self.src_vocab[word] for word in src_sentence.split()] tgt_seq = [self.tgt_vocab['<sos>' ]] + [self.tgt_vocab[word] for word in tgt_sentence.split()] + [self.tgt_vocab['<eos>' ]] src_seq += [self.src_vocab['<pad>' ]] * (self.src_len - len (src_seq)) tgt_seq += [self.tgt_vocab['<pad>' ]] * (self.tgt_len - len (tgt_seq)) input_batch.append(src_seq) output_batch.append([self.tgt_vocab['<sos>' ]] + ([self.tgt_vocab['<pad>' ]] * (self.tgt_len - 2 )) if test_batch else tgt_seq[:-1 ]) target_batch.append(tgt_seq[1 :]) input_batch = torch.LongTensor(input_batch) output_batch = torch.LongTensor(output_batch) target_batch = torch.LongTensor(target_batch) return input_batch, output_batch, target_batch
训练
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 sentences = [ ['我' , 'I' ], ['我是' , 'I am' ], ['中国人' , 'Chinese' ], ['美国人' , 'American' ], ['苹果' , 'apples' ], ['香蕉' , 'bananas' ], ['菠萝' , 'pineapples' ], ['我和中国人' , 'Chinese and I' ], ['我和美国人' , 'American and I' ], ['中国人和美国人' , 'Chinese and American' ], ['美国人喜欢苹果' , 'American like apples' ], ['中国人喜欢菠萝' , 'Chinese like pineapples' ], ['我是中国人' , 'I am Chinese' ], ['我是美国人' , 'I am American' ], ['我喜欢苹果' , 'I like apples' ], ['我喜欢香蕉' , 'I like bananas' ], ['我喜欢菠萝' , 'I like pineapples' ] ] corpus = TranslationCorpus(sentences) import torch.optim as optim device = "cuda" if torch.cuda.is_available() else "cpu" model = Transformer(corpus).to(device) criterion = nn.CrossEntropyLoss() optimizer = optim.Adam(model.parameters(), lr=0.0001 ) for epoch in range (100 ): optimizer.zero_grad() enc_inputs, dec_inputs, target_batch = corpus.make_batch(batch_size) enc_inputs, dec_inputs, target_batch = enc_inputs.to(device), dec_inputs.to(device), target_batch.to(device) outputs, _, _, _ = model(enc_inputs, dec_inputs) loss = criterion(outputs.view(-1 , len (corpus.tgt_vocab)), target_batch.view(-1 )) if (epoch + 1 ) % 20 == 0 : print (f"Epoch: {epoch + 1 :04d} cost = {loss:.6 f} " ) loss.backward() optimizer.step()
输出 :
1 2 3 4 5 Epoch: 0020 cost = 0.566277 Epoch: 0040 cost = 0.339869 Epoch: 0060 cost = 0.029208 Epoch: 0080 cost = 0.006405 Epoch: 0100 cost = 0.004169
解码器输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 enc_inputs, dec_inputs, target_batch = corpus.make_batch(batch_size=1 ,test_batch=True ) enc_inputs, dec_inputs, target_batch = enc_inputs.to(device), dec_inputs.to(device), target_batch.to(device)print ("编码器输入:" , enc_inputs) print ("解码器输入:" , dec_inputs) print ("目标数据:" , target_batch) predict, enc_self_attns, dec_self_attns, dec_enc_attns = model(enc_inputs, dec_inputs) predict = predict.view(-1 , len (corpus.tgt_vocab)) predict = predict.data.max (1 , keepdim=True )[1 ] translated_sentence = [corpus.tgt_idx2word[idx.item()] for idx in predict.squeeze()] input_sentence = [corpus.src_idx2word[idx.item()] for idx in enc_inputs[0 ]]print (input_sentence, '->' , translated_sentence)
输出结果 :
1 2 3 4 编码器输入: tensor([[ 1, 10, 7, 0, 0, 0]]) 解码器输入: tensor([[1, 0, 0, 0]]) 目标数据: tensor([[ 3, 11, 8, 2]]) ['我', '喜欢', '香蕉', '<pad>', '<pad>', '<pad>'] -> ['I', 'I', 'I', 'I']
这里只生成了一个 token,因为还没有使用到 Transformer
解码层的自回归机制。
生成式解码
在 Transformer
模型中,我们通过最大化预测正确词的概率来优化模型。而在推理的过程中,我们可以以选择概率最大的词作为下一个词,这就是我们常说的贪心搜索。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def greedy_decoder (model, enc_input, start_symbol, steps=5 ): enc_outputs, enc_self_attns = model.encoder(enc_input) dec_input = torch.zeros(1 , steps).type_as(enc_input) next_symbol = start_symbol for i in range (0 , steps): dec_input[0 , i] = next_symbol if next_symbol == corpus.tgt_vocab['<eos>' ]: break dec_output, _, _ = model.decoder(dec_input, enc_input, enc_outputs) proj = model.projection(dec_output) prob = proj.squeeze(0 ).max (dim=-1 , keepdim=False )[1 ] next_word = prob.data[i] next_symbol = next_word.item() final_output = dec_input return final_output
1 2 3 4 5 6 7 8 9 10 enc_inputs, dec_inputs, target_batch = corpus.make_batch(batch_size=1 , test_batch=True ) enc_inputs, dec_inputs, target_batch = enc_inputs.to(device), dec_inputs.to(device), target_batch.to(device)print ("编码器输入:" , enc_inputs) print ("解码器输入:" , dec_inputs) print ("目标数据:" , target_batch) greedy_dec_output = greedy_decoder(model, enc_inputs, start_symbol=corpus.tgt_vocab['<sos>' ], steps=corpus.tgt_len) greedy_dec_output_words = [corpus.tgt_idx2word[n.item()] for n in greedy_dec_output.squeeze()] enc_inputs_words = [corpus.src_idx2word[code.item()] for code in enc_inputs[0 ]]print (enc_inputs_words, '->' , greedy_dec_output_words)
输出 :
1 2 3 4 编码器输入: tensor([[1, 6, 7, 0, 0]]) 解码器输入: tensor([[1, 0, 0, 0]]) 目标数据: tensor([[3, 7, 8, 2]]) ['我', '喜欢', '苹果', '<pad>', '<pad>'] -> ['<sos>', 'I', 'like', 'apples', '<eos>']
测试
接下来我们现在找个训练集没有出现的数据来测试模型的泛化能力:
1 2 3 4 5 6 7 8 test_enc_inputs = [corpus.src_vocab[w] for w in list (jieba.cut('美国人喜欢香蕉' ))] test_enc_inputs = torch.LongTensor(test_enc_inputs + (corpus.src_len - len (test_enc_inputs)) * [corpus.src_vocab['<pad>' ]]).unsqueeze(0 ) dec_inputs = torch.LongTensor([corpus.tgt_vocab['<sos>' ]] + (corpus.tgt_len - 1 ) * [corpus.tgt_vocab['<pad>' ]]).unsqueeze(0 ) greedy_test_dec_output = greedy_decoder(model, test_enc_inputs, start_symbol=corpus.tgt_vocab['<sos>' ], steps=corpus.tgt_len) ans = [corpus.tgt_idx2word[idx.item()] for idx in greedy_test_dec_output[0 ]]print (ans)
输出 :
1 ['<sos>', 'American', 'like', 'bananas', '<eos>']
NLP 领域
自从 Transformer 架构在 2017
年被提出后,大型语言模型的发展经历了显著的变革和快速的进步。
2018 年,BERT(Bidirectional Encoder Representations from
Transformers) 由 Google 提出,BERT 是基于 Transformer
的首个重要应用。
2018 - 2020 年,OpenAI 发布了 GPT(Generative Pretrained
Transformer) 系列模型,包括 GPT-2 和 GPT-3。GPT-3
特别因其巨大的规模(1750亿个参数)和强大的生成能力而受到关注。
2021 - 2023 年,模型如 GPT-4 和 Google 的 PaLM(Pathways Language
Model)通过扩大规模和改进训练技巧,进一步提高了性能。
...
视觉领域
Transformer
最初是为自然语言处理(NLP)任务设计的,但其核心思想—基于注意力机制的模型架构—也被发现对于处理视觉信息非常有效。因此,Transformer
在机器视觉领域的应用也变得越来越广泛。
ViT
Google Research 在 2020 年推出了 Vision
Transformer(ViT) ,这是首次将 Transformer
完全应用于图像识别任务。ViT
将图像分割成固定大小的图像块,将每个图像块展平并映射到一个高维空间(就像NLP中的词嵌入),然后在这些块上应用标准的
Transformer 模型。ViT
在多个图像识别基准测试中表现出色,与当时的卷积神经网络(CNN)模型相比,它在一些任务上达到了更好的性能。
MAE
MAE(Masked
Autoencoders)技术是一种自监督学习方法,它是通过预训练一个神经网络以重建被随机遮挡(mask)的输入数据的方法来学习数据的有效表示。这个技术通常与自编码器(Autoencoder)相关,自编码器是一个试图通过较小的隐藏层来重构输入的神经网络结构。MAE
通过随机遮挡输入数据的一部分,然后让网络预测这些遮挡部分的原始内容,从而迫使模型学习到数据的内在结构和模式。
MAE
技术可以应用于多种类型的数据,包括图像、文本、声音等。在视觉任务中,MAE
的一个例子是,输入一张图像,然后在图像上随机选择一些像素或区域并将其遮挡,随后模型的任务是预测这些遮挡区域的像素值。这种方法迫使模型学习到图像的低级特征(如边缘和纹理)以及更高级的概念(如对象的部分和整体结构),因为要正确重建被遮挡的部分,模型需要理解其周围的上下文。