贝叶斯网络,k-means 和transformer

PART1

手写数字识别

本部分通过贝叶斯网络实现手写数字的识别

实验内容

  • fit函数

    Pixels中每一行代表一个图像,每一列代表一个像素值。Labels是一个形状为(n_samples,)的向量,其中每个元素都是相应图像的类标签。

    该方法通过计算每个类别的先验概率和给定每个类别的每个像素值的条件概率,将模型与数据拟合。它使用类的两个属性:self.labels_prior 和 self.pixels_cond_label。前者是一个形状为(n_classes,)的向量,用于存储每个类别的先验概率。后者是一个形状为(n_pixels, n_values, n_classes)的矩阵,存储了给定每个类别的每个像素值的条件概率。该方法通过计算数据中每个类别和每个像素值的频率并除以适当的分母来更新这些属性。

    def fit(self, pixels, labels):
        n_samples = len(labels)
        for label in labels:
            self.labels_prior[label] += 1
            self.labels_prior /= n_samples
            for i in range(n_samples):
                pixel_values = pixels[i]
                label = labels[i]
                for pixel in range(self.n_pixels):
                    self.pixels_cond_label[pixel][pixel_values[pixel]][label] += 1
                    for pixel in range(self.n_pixels):
                        for value in range(self.n_values):
                            self.pixels_cond_label[pixel][value] /= self.labels_prior
  • predict函数

    该方法返回一个形状为(n_samples,)的向量,其中每个元素都是相应图像的预测类别标签。通过找到后验概率最大的类别来获得预测结果。通过拟合方法计算self.labels_prior和self.pixels_cond_label属性。该方法对每幅图像和每个类别进行循环,并通过将先验概率的对数与给定类别的每个像素值的条件概率的对数相加来计算后验概率。然后将后验概率与当前最大值进行比较,如果后验概率大于当前最大值,则更新预测标签。然后将预测标签分配给输出向量。

    def predict(self, pixels):
          n_samples = len(pixels)
          labels = np.zeros(n_samples)
          for i in range(n_samples):
              pixel_values = pixels[i]
              max_posterior = float("-inf")
              predicted_label = None
              for label in range(self.n_labels):
                  posterior = np.log(self.labels_prior[label])
                  for pixel in range(self.n_pixels):
                      posterior += np.log(self.pixels_cond_label[pixel][pixel_values[pixel]][label])
                  if posterior > max_posterior:
                      max_posterior = posterior
                      predicted_label = label
              labels[i] = predicted_label
          return labels

图片压缩

本部分使用k-means实现图片压缩,并在不同的参数下实现不同的压缩效果。

实验原理

图片压缩的原理是将图片中的像素分成几个类别,每个类别用一个代表颜色来表示,从而减少图片中的颜色数目,降低文件大小。步骤如下:

  • 使用RGB颜色空间表示图像的颜色,将图片展平为一个二维数组,每一行代表一个像素,每一列代表一个颜色通道。
  • 对展平的图片数组应用K-means聚类算法,K代表压缩后的图片中想要的颜色数目。算法会根据像素的RGB值将相似的像素分到一起,并给每个类别分配一个平均的RGB值。其中K的值决定了压缩的程度。
  • 将原始图片中的每个像素替换为其所属类别的平均RGB值。这样就得到了一个颜色数目更少,但是外观相似的图片。

实验内容

  • assign_points函数

    主要流程:

    • 遍历每个样本点,计算它与每个聚类中心的欧几里得距离,使用np.linalg.norm函数。
    • 找出距离最小的聚类中心的索引,使用np.argmin函数,将其作为该样本点的聚类标签,存入labels数组中。
    • 最后更新labels数组。
    def assign_points(self, centers, points):
            n_samples, n_dims = points.shape
            labels = np.zeros(n_samples)
            for i in range(n_samples):
                distances = np.linalg.norm(points[i] - centers, axis=1)
                labels[i] = np.argmin(distances)
            return labels
  • uptdate_centers函数

    主要流程

    • 遍历每个聚类中心,找出所有属于该聚类的样本点。
    • 如果该聚类有样本点,则计算这些样本点在每个维度上的平均值作为新的聚类中心。
    • 如果该聚类没有样本点,则保持原来的聚类中心不变。
    def update_centers(self, centers, labels, points):
            for k in range(self.k):
                cluster_points = points[labels == k]
                if len(cluster_points) > 0:
                    centers[k] = cluster_points.mean(axis=0)
            return centers
  • fit函数

    主要流程:

    • 调用initialize_centers方法,随机初始化聚类中心centers。
    • 进行max_iter次循环
      • 调用assign_points方法,根据当前的聚类中心centers,将每个样本点分配到最近的聚类中,得到每个样本点的聚类标签labels。
      • 调用update_centers方法,根据新的点分配labels,更新聚类中心centers。
    • 返回最终的聚类中心centers。
    def fit(self, points):
            centers = self.initialize_centers(points)
            for _ in range(self.max_iter):
                labels = self.assign_points(centers, points)
                centers = self.update_centers(centers, labels, points)
            return centers
  • compress函数

    主要流程

    • 将图片的像素值展平为一个RGB二维数组。
    • 调用fit方法,对展平的像素点进行K-means聚类,得到聚类中心centers。
    • 对于每个像素点,计算它与每个聚类中心的欧几里得距离,找出距离最小的聚类中心的索引。
    • 用距离最小的聚类中心的颜色值替换原来的像素值,得到压缩后的像素点。
    • 像素点重塑为原来的图片形状,得到压缩后的图片。
    def compress(self, img):
            points = img.reshape((-1, img.shape[-1]))
            centers = self.fit(points)
            compressed_img = centers[np.argmin(np.linalg.norm(points[:, np.newaxis] - centers, axis=-1), axis=-1)]
            compressed_img = compressed_img.reshape(img.shape)
            return compressed_img

PART 2

Transformer

实验内容

char_tokenizer

实现了一个基于字符的分词器。将一个字符串转换为一个整数列表,或者将一个整数列表转换为一个字符串。

具体功能包含以下部分:

  • 初始化__init__,计算语料库中的不同字符的个数,并创建一个字典,将每个字符映射到一个唯一的整数。

    def __init__(self, corpus: List[str]):
            self.corpus = corpus
            self.vocab = {char: i for i, char in enumerate(sorted(list(set(''.join(corpus)))))}
            self.n_vocab = len(self.vocab)
  • 编码encode,编码文本,将字符串中的每个字符替换为字典中对应的整数,并返回一个整数列表。

    def encode(self, string: str):
            encode_res=[self.vocab[c] for c in string]
            return encode_res
  • 解码decode,解码编码。将整数列表中的每个整数替换为字典中对应的字符,并返回一个字符串。

    def decode(self, codes: List[int]):
            decode_res=''.join([list(self.vocab.keys())[list(self.vocab.values()).index(c)] for c in codes])
            return decode_res

自注意力是一种用于捕捉序列中的长距离依赖关系的机制,它通过计算序列中每个元素与其他元素的相关性来生成一个加权平均的输出。本部分实现了一个注意力单头。

具体功能包含以下部分:

  • init

    创建三个线性层,分别称为Key,Query和Value,它们都将输入维度n_embd映射到输出维度head_size。

    def __init__(self, head_size):
        super().__init__()
        self.Key = nn.Linear(n_embd, head_size)
        self.Query = nn.Linear(n_embd, head_size)
        self.Value = nn.Linear(n_embd, head_size)
        self.head_size = head_size
        self.register_buffer("tril", torch.tril(torch.ones(block_size, block_size)))
  • forward函数

    • 调用Key,Query和Value对inputs进行线性变换。
    • 计算query和key张量的点积,并在最后两个维度上转置key张量,得到scores张量。
    • 对scores张量进行掩码操作,将其上三角部分(包括对角线)填充为负无穷大。
    • 在最后一个维度上对scores张量进行softmax操作,得到weights张量,表示每个元素对其他元素的注意力权重。
    • 通过weights和value计算得到out张量,表示每个元素的加权平均输出。
    def forward(self, inputs):
            key = self.Key(inputs)
            query = self.Query(inputs)
            value = self.Value(inputs)
            scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.head_size)
            mask = torch.tril(torch.ones_like(scores)).bool()
            scores = scores.masked_fill(mask == 0, float('-inf'))
            weights = F.softmax(scores, dim=-1)
            out = torch.matmul(weights, value)
            return out
MultiHead_Attention

多头自注意力是一种将多个单头自注意力的输出拼接起来,并进行线性变换的机制。本部分在Head的基础上实现了MultiHead。

  • init函数

    • 创建一个模块列表,包含n_heads个Head对象。

    • 创建一个线性层,将n_heads * head_size映射到n_embd

    def __init__(self, n_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for i in range(n_heads)])
        self.projection = nn.Linear(n_heads * head_size, n_embd)
  • forward函数

    • 对heads列表中的每个Head对象,调用其前向方法,对inputs进行单头自注意力,并将所有单头的输出在最后一个维度上拼接起来.
    • 调用projection层,对拼接后的张量进行线性变换,并返回。
    def forward(self, inputs):
        out = torch.cat([head(inputs) for head in self.heads], dim=-1)
        return self.projection(out)
Feedforward

前馈神经网络是一种将输入通过一系列的线性变换和非线性激活函数映射到输出的机制。

  • init函数

    创建一个顺序模块,包含两个线性层和一个ReLU激活函数。第一个线性层将n_embd映射到4n_embd,第二个线性层将4n_embd映射回n_embd。

    class FeedForward(nn.Module):
        def __init__(self, n_embd):
            super().__init__()
            Linear_1 = nn.Linear(n_embd, 4*n_embd)
            Linear_2 = nn.Linear(4*n_embd, n_embd)
            self.net = nn.Sequential(Linear_1,nn.ReLU(),Linear_2)
        def forward(self, inputs):
            return self.net(inputs)
Block

本部分将多头自注意力和前馈神经网络结合起来,并使用残差连接和层归一化的机制,实现序列到序列的映射和编码。

  • init函数

    创建两个层归一化层,一个多头自注意力模块和一个前馈神经网络模块

    def __init__(self, n_embd, n_heads):
        super().__init__()
        self.norm1 = nn.LayerNorm(n_embd)
        self.norm2 = nn.LayerNorm(n_embd)
        self.attention = MultiHeadAttention(n_heads, n_embd//n_heads)
        self.feed_forward = FeedForward(n_embd)
  • forward函数

    • 对inputs和多头自注意力模块的输出即相加,对第一个层归一化,得到attention_output。
    • 对attention_output和前馈神经网络模块的输出相加,对第二个层归一化得到outputs。
    def forward(self, inputs):
           attention_output = self.norm1(inputs+self.attention(inputs))
           outputs = self.norm2(attention_output+self.feed_forward(attention_output))
           return outputs
Transformer

借助以上的模块,可以构建Transformer模块。

  • init

    • 嵌入表,将词汇表中的每个单词映射到一个n_embd维的向量。
    • 归一化层,对输入进行归一化处理。
    • 线性层,将n_embd维的向量映射回词汇表中的每个单词。
    • 位置嵌入参数,表示每个位置的向量,用于增加位置信息。
    • 模块列表,包含n_layers个变换器的块。
    def __init__(self):
        super().__init__()
        self.embedding = nn.Embedding(n_vocab, n_embd)
        self.norm = nn.LayerNorm(n_embd)
        self.linear = nn.Linear(n_embd, n_vocab)
        self.position_embedding = nn.Parameter(torch.zeros(1, block_size, n_embd))
        self.blocks = nn.ModuleList([Block(n_embd, n_heads) for i in range(n_layers)])
  • forward

    • 得到每个单词的嵌入向量embedding。
    • 根据inputs的时间维度,截取位置嵌入参数中对应的部分,得到position_embedding张量,表示每个位置的嵌入向量。
    • 将embedding和position_embedding相加,得到attens张量.
    • 对attens列表中的每个变换器的块对象,调用forward方法,更新attens张量。
    • 对attens应用层归一化层。
    • 对attens应用线性层,得到logits张量,表示每个时间步每个单词的预测概率。
    def forward(self, inputs, labels=None):
        batch, time = inputs.shape
        embedding = self.embedding(inputs)
        position_embedding = self.position_embedding[:, :time]
        attens = embedding + position_embedding
        for block in self.blocks:
            attens = block(attens)
            attens = self.norm(attens)
            logits = self.linear(attens)
        if labels is None:
            loss = None
        else:
            batch, time, channel = logits.shape
            logits = logits.view(batch * time, channel)
            labels = labels.view(batch * time)
            loss = F.cross_entropy(logits, labels)
            return logits, loss
  • generate

    • forward对inputs进行预测,得到logits张量。
    • 对logits张量在最后一个时间取最大值,得到最可能的索引。
    • 并更新inputs张量。
    • 以上操作进行max_new_tokens次循环
    def generate(self, inputs, max_new_tokens):
            for i in range(max_new_tokens):
                logits, _ = self(inputs)
                logits = logits[:, -1:, :].argmax(dim=-1)
                inputs = torch.cat([inputs, logits], dim=1)
            return inputs