1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| import torch import torch.nn as nn import torch.nn.functional as F import math
class MoELayer(nn.Module):
    """Mixture-of-Experts (MoE) layer with top-k token routing.

    Each token's gate scores select its ``top_k`` experts; the selected
    experts' outputs are combined using the renormalized gate weights.
    A load-balancing auxiliary loss is returned alongside the output so
    training can encourage uniform expert utilization.
    """

    def __init__(self, d_model, num_experts, top_k=2, dropout=0.0):
        super().__init__()
        self.num_experts = num_experts
        self.top_k = top_k
        # Each expert is a standard position-wise FFN with 4x expansion.
        self.experts = nn.ModuleList([
            nn.Sequential(
                nn.Linear(d_model, d_model * 4),
                nn.GELU(),
                nn.Dropout(dropout),
                nn.Linear(d_model * 4, d_model),
            )
            for _ in range(num_experts)
        ])
        # Router: produces one logit per expert for every token.
        self.gate = nn.Linear(d_model, num_experts, bias=False)
        # Auxiliary-loss scaling coefficient; kept as an attribute so
        # callers can weight aux_loss by self.alpha.
        self.alpha = 0.01

    def forward(self, x):
        """Route tokens through their top-k experts.

        Args:
            x: [batch_size, seq_len, d_model]
        Returns:
            output: [batch_size, seq_len, d_model]
            aux_loss: scalar load-balancing loss (for training)
        """
        batch_size, seq_len, d_model = x.shape
        x_flat = x.view(-1, d_model)  # [num_tokens, d_model]

        gate_logits = self.gate(x_flat)
        gate_weights = F.softmax(gate_logits, dim=-1)

        top_k_weights, top_k_indices = torch.topk(
            gate_weights, self.top_k, dim=-1
        )
        # Renormalize so each token's selected-expert weights sum to 1.
        top_k_weights = top_k_weights / top_k_weights.sum(dim=-1, keepdim=True)

        output = torch.zeros_like(x_flat)
        # Dispatch per expert rather than per token: run each expert once on
        # the batch of tokens routed to it (the original looped over every
        # token and invoked an expert per (token, slot) pair).
        for expert_idx, expert in enumerate(self.experts):
            token_idx, slot_idx = torch.where(top_k_indices == expert_idx)
            if token_idx.numel() == 0:
                continue  # no tokens routed to this expert this step
            expert_out = expert(x_flat[token_idx])
            weights = top_k_weights[token_idx, slot_idx].unsqueeze(-1)
            output.index_add_(0, token_idx, expert_out * weights)

        aux_loss = self._load_balancing_loss(gate_weights, top_k_indices)
        return output.view(batch_size, seq_len, d_model), aux_loss

    def _load_balancing_loss(self, gate_weights, top_k_indices):
        """Load-balancing loss: encourages uniform expert selection.

        Computes num_experts * sum_e(mean_gate_prob[e] * assigned_frac[e]),
        where assigned_frac[e] is the fraction of routing slots expert e
        received. Minimized when routing is uniform across experts.

        Args:
            gate_weights: [num_tokens, num_experts] softmax gate probabilities
            top_k_indices: [num_tokens, top_k] selected expert indices
        Returns:
            scalar tensor
        """
        num_tokens = gate_weights.shape[0]
        # Fraction of top-k assignment slots received by each expert.
        # Bug fix: the original allocated the counts tensor with
        # device=x.device, but `x` is undefined in this method (NameError);
        # bincount on top_k_indices also replaces the O(tokens*k) Python loop.
        expert_counts = torch.bincount(
            top_k_indices.reshape(-1), minlength=self.num_experts
        ).to(gate_weights.dtype)
        expert_probs = expert_counts / (num_tokens * self.top_k)
        avg_gate_prob = gate_weights.mean(dim=0)
        return self.num_experts * torch.sum(avg_gate_prob * expert_probs)
|