vLLM High-Performance Inference Engine: Principles and Practice

Introduction

vLLM is one of the most popular LLM inference frameworks to emerge in 2023. By combining PagedAttention with continuous batching, it achieves very high inference throughput. This article examines vLLM's architecture, its core techniques, and how to deploy it in practice.

vLLM Core Architecture

1. How PagedAttention Works

"""
PagedAttention: 基于虚拟内存管理的注意力机制
核心思想:将KV Cache分成固定大小的"页",实现高效内存管理
"""

import torch
import torch.nn as nn
from typing import Optional

class PagedAttentionConfig:
"""
PagedAttention配置
"""
def __init__(self, block_size=16, num_blocks=1024):
self.block_size = block_size # 每块大小
self.num_blocks = num_blocks # 总块数
self.head_dim = 128 # 注意力头维度
self.num_heads = 32 # 注意力头数

class KVCache:
"""
分页KV Cache
"""
def __init__(self, config: PagedAttentionConfig):
self.config = config
self.block_size = config.block_size

# 物理块管理器
self.num_blocks = config.num_blocks

# KV缓存张量
self.k_cache = None
self.v_cache = None

# 块表:逻辑块 -> 物理块
self.block_tables = {}

def alloc(self, seq_len: int) -> dict:
"""
为新序列分配缓存块
"""
num_blocks_needed = (seq_len + self.block_size - 1) // self.block_size

# 分配物理块
physical_blocks = []
for _ in range(num_blocks_needed):
block_id = self._alloc_physical_block()
if block_id is None:
raise RuntimeError("Out of memory")
physical_blocks.append(block_id)

# 更新块表
block_table = physical_blocks.copy()

return {
"num_blocks": num_blocks_needed,
"block_table": block_table
}

def _alloc_physical_block(self) -> Optional[int]:
"""分配物理块"""
# 简化实现
for i in range(self.num_blocks):
if i not in self.used_blocks:
self.used_blocks.add(i)
return i
return None

def update(self, seq_id: int, start_pos: int, k: torch.Tensor, v: torch.Tensor):
"""
更新KV缓存
"""
block_table = self.block_tables[seq_id]

# 计算块索引和块内偏移
block_idx = start_pos // self.block_size
block_offset = start_pos % self.block_size

num_tokens = k.shape[0]

for i in range(num_tokens):
if block_offset == self.block_size:
block_idx += 1
block_offset = 0

physical_block = block_table[block_idx]

# 写入缓存
self.k_cache[physical_block, block_offset] = k[i]
self.v_cache[physical_block, block_offset] = v[i]

block_offset += 1

def free(self, seq_id: int):
"""释放序列的缓存块"""
if seq_id in self.block_tables:
for block_id in self.block_tables[seq_id]:
self.used_blocks.discard(block_id)
del self.block_tables[seq_id]

class PagedAttention(nn.Module):
"""
PagedAttention实现
"""
def __init__(self, config: PagedAttentionConfig):
super().__init__()
self.config = config
self.kv_cache = KVCache(config)

def forward(self, query: torch.Tensor, key: torch.Tensor, value: torch.Tensor,
block_tables: dict, seq_lens: dict):
"""
PagedAttention前向传播

Args:
query: 查询张量 [batch, heads, seq_len, head_dim]
key: 键张量 [batch, heads, seq_len, head_dim]
value: 值张量 [batch, heads, seq_len, head_dim]
block_tables: 块表,映射逻辑位置到物理块
seq_lens: 每个序列的长度
"""
B, H, T, D = query.shape

# 扩展query以匹配key/value长度
if T != key.shape[2]:
query = query.expand(-1, -1, key.shape[2], -1)

# 计算注意力分数
scale = 1.0 / (D ** 0.5)

# 分块计算注意力
output = torch.zeros_like(query)

for batch_idx in range(B):
seq_len = seq_lens[batch_idx]
block_table = block_tables[batch_idx]

# 获取当前batch的KV
k_seq = key[batch_idx] # [H, seq_len, D]
v_seq = value[batch_idx] # [H, seq_len, D]
q = query[batch_idx] # [H, T, D]

# 分块计算
num_blocks = (seq_len + self.config.block_size - 1) // self.config.block_size

for block_idx in range(num_blocks):
# 获取物理块
physical_block = block_table[block_idx]

# 获取KV块
start = block_idx * self.config.block_size
end = min(start + self.config.block_size, seq_len)

k_block = k_seq[:, start:end] # [H, block_size, D]
v_block = v_seq[:, start:end] # [H, block_size, D]

# 计算注意力
attn = torch.matmul(q, k_block.transpose(-2, -1)) * scale # [H, T, block_size]
attn = torch.softmax(attn, dim=-1)

# 累积输出
output[batch_idx, :, start:end] = torch.matmul(attn, v_block)

return output
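
As a quick sanity check of the paged cache above, the sketch below allocates blocks for a short sequence, writes a few key/value vectors, and frees them again. The sequence id, length, and tensor shapes are illustrative assumptions, not values required by vLLM itself.

# Minimal usage sketch for the KVCache defined above (shapes are illustrative).
config = PagedAttentionConfig(block_size=16, num_blocks=64)
cache = KVCache(config)

# Allocate blocks for a 40-token sequence (three blocks of 16)
info = cache.alloc(seq_id=0, seq_len=40)
print(info["num_blocks"], info["block_table"])   # 3, e.g. [0, 1, 2]

# Write KV for the first 40 tokens: [num_tokens, num_heads, head_dim]
k = torch.randn(40, config.num_heads, config.head_dim)
v = torch.randn(40, config.num_heads, config.head_dim)
cache.update(seq_id=0, start_pos=0, k=k, v=v)

# Release the blocks once the sequence finishes
cache.free(seq_id=0)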

2. Continuous Batching

import asyncio
import time
from dataclasses import dataclass
from typing import List


@dataclass
class GenerationRequest:
    """A single generation request."""
    request_id: str
    prompt: str
    max_tokens: int
    temperature: float
    created_at: float


@dataclass
class GenerationResult:
    """A finished generation."""
    request_id: str
    text: str
    num_generated_tokens: int
    latency: float


class ContinuousBatching:
    """
    Simplified continuous-batching scheduler.
    New requests can join a batch as soon as slots free up, instead of
    waiting for the entire previous batch to finish.
    """
    def __init__(self, model_runner, max_batch_size=32):
        self.model_runner = model_runner      # must expose generate_batch(prompts)
        self.max_batch_size = max_batch_size

        # Request queues
        self.pending_requests: List[GenerationRequest] = []
        self.running_requests: dict = {}      # request_id -> request info

        # Statistics
        self.total_requests = 0
        self.total_tokens = 0

    async def add_request(self, request: GenerationRequest):
        """Enqueue a new request."""
        self.pending_requests.append(request)
        self.total_requests += 1

    async def step(self) -> List[GenerationResult]:
        """
        One scheduling step.
        Returns the results that completed during this step.
        """
        completed = []

        # 1. Build the next batch
        batch_requests = self.pending_requests[:self.max_batch_size]
        self.pending_requests = self.pending_requests[self.max_batch_size:]

        if not batch_requests:
            await asyncio.sleep(0.01)
            return completed

        # 2. Run inference
        prompts = [r.prompt for r in batch_requests]
        results = self.model_runner.generate_batch(prompts)

        # 3. Collect results
        for req, result in zip(batch_requests, results):
            completed_result = GenerationResult(
                request_id=req.request_id,
                text=result["text"],
                num_generated_tokens=result["num_tokens"],
                latency=time.time() - req.created_at
            )
            completed.append(completed_result)
            self.total_tokens += completed_result.num_generated_tokens

        return completed

    async def run(self):
        """Main scheduling loop."""
        while True:
            completed = await self.step()

            # Report finished requests
            for result in completed:
                print(f"Request {result.request_id} completed: {result.latency:.2f}s")

            # Idle briefly when there is nothing left to schedule
            if not self.pending_requests and not self.running_requests:
                await asyncio.sleep(0.1)
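
The scheduler above only assumes a model_runner object with a generate_batch(prompts) method; the snippet below drives it with a trivial mock runner to show the request flow. The mock is purely illustrative.

# Driving the scheduler above with a mock runner (generate_batch is the assumed interface).
class MockRunner:
    def generate_batch(self, prompts):
        # Pretend generation: echo the prompt reversed and count its characters as tokens
        return [{"text": p[::-1], "num_tokens": len(p)} for p in prompts]

async def demo():
    scheduler = ContinuousBatching(MockRunner(), max_batch_size=8)
    for i in range(4):
        await scheduler.add_request(GenerationRequest(
            request_id=f"req-{i}", prompt=f"prompt {i}",
            max_tokens=32, temperature=0.7, created_at=time.time()))
    results = await scheduler.step()
    print(f"{len(results)} requests finished, {scheduler.total_tokens} tokens total")

asyncio.run(demo())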

vLLM Deployment in Practice

1. OpenAI-Compatible API

# Launch the vLLM OpenAI-compatible server (shell command, kept as a docstring here):
"""
vllm serve mistralai/Mistral-7B-Instruct-v0.1 \
    --tensor-parallel-size 2 \
    --gpu-memory-utilization 0.9 \
    --port 8000
"""

from typing import List

from openai import OpenAI


class vLLMClient:
    """Thin client for the vLLM OpenAI-compatible endpoint."""
    def __init__(self, base_url="http://localhost:8000/v1"):
        # vLLM does not validate the API key, but the OpenAI client requires one
        self.client = OpenAI(base_url=base_url, api_key="EMPTY")

    def chat_completion(self, messages: list, model: str = "mistral",
                        temperature: float = 0.7, max_tokens: int = 256):
        """
        Chat completion. `model` must match the name the server exposes
        (the model path by default, or the value of --served-model-name).
        """
        response = self.client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens
        )

        return {
            "content": response.choices[0].message.content,
            "usage": response.usage.to_dict()
        }

    def completion(self, prompt: str, model: str = "mistral",
                   max_tokens: int = 256):
        """Plain text completion."""
        response = self.client.completions.create(
            model=model,
            prompt=prompt,
            max_tokens=max_tokens
        )

        return {
            "content": response.choices[0].text,
            "usage": response.usage.to_dict()
        }

    def batch_completion(self, prompts: List[str], model: str = "mistral"):
        """
        Batched completion: the legacy completions endpoint accepts a list of
        prompts, and vLLM batches them internally.
        """
        response = self.client.completions.create(
            model=model,
            prompt=prompts,
            max_tokens=256
        )

        return [
            {"content": choice.text, "index": choice.index}
            for choice in response.choices
        ]
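
For reference, a minimal call against a locally running server might look like the sketch below; it assumes the serve command above is used unchanged, so the model name is the full Hugging Face path.

# Minimal usage sketch; the model name must match what the server reports at /v1/models.
client = vLLMClient(base_url="http://localhost:8000/v1")
reply = client.chat_completion(
    messages=[{"role": "user", "content": "Explain PagedAttention in one sentence."}],
    model="mistralai/Mistral-7B-Instruct-v0.1",
    max_tokens=128,
)
print(reply["content"])
print(reply["usage"])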

2. FastAPI Integration

import time
import uuid
from typing import List

import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI(title="LLM Inference API")

# In-memory request status store (use Redis or a database in production)
request_status = {}


class ChatRequest(BaseModel):
    messages: List[dict]
    model: str = "mistral"
    temperature: float = 0.7
    max_tokens: int = 256
    stream: bool = False


@app.post("/chat/completions")
async def chat_completions(request: ChatRequest):
    """Chat completion endpoint that proxies to the vLLM server."""
    request_id = str(uuid.uuid4())

    request_status[request_id] = {
        "status": "processing",
        "created_at": time.time()
    }

    # Reuse the vLLMClient from the previous section (synchronous call;
    # offload to a thread pool if event-loop blocking becomes an issue)
    client = vLLMClient()

    # Forward the request to vLLM
    result = client.chat_completion(
        messages=request.messages,
        model=request.model,
        temperature=request.temperature,
        max_tokens=request.max_tokens
    )

    request_status[request_id]["status"] = "completed"

    return {
        "id": f"chatcmpl-{request_id[:8]}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": request.model,
        "choices": [{
            "index": 0,
            "message": {
                "role": "assistant",
                "content": result["content"]
            },
            "finish_reason": "stop"
        }],
        "usage": result["usage"]
    }


@app.get("/models")
async def list_models():
    """List available models."""
    return {
        "data": [
            {"id": "mistral", "object": "model", "owned_by": "mistralai"},
            {"id": "llama", "object": "model", "owned_by": "meta"}
        ]
    }


@app.get("/health")
async def health():
    """Health check."""
    return {"status": "ok"}


if __name__ == "__main__":
    # The vLLM server already listens on 8000, so run this gateway on a different port
    uvicorn.run(app, host="0.0.0.0", port=8001)
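
A quick smoke test of this gateway could look like the following sketch; it assumes the gateway runs on port 8001 (as configured above) with the vLLM server on 8000.

# Smoke test for the FastAPI gateway above (port 8001 is the assumption from the snippet).
import requests

resp = requests.post(
    "http://localhost:8001/chat/completions",
    json={
        "messages": [{"role": "user", "content": "Hello"}],
        "model": "mistral",
        "max_tokens": 64,
    },
    timeout=60,
)
print(resp.json()["choices"][0]["message"]["content"])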

3. Performance Benchmarking

import time
import statistics
from concurrent.futures import ThreadPoolExecutor


class LLMBenchmark:
    """Simple LLM inference benchmark."""
    def __init__(self, client: vLLMClient):
        self.client = client

    def throughput_benchmark(self, num_requests: int = 100,
                             num_concurrent: int = 10):
        """Throughput test with concurrent requests."""
        prompts = [
            "Explain the basic principles of quantum computing." * 10,
            "Write a Python implementation of quicksort.",
            "What is gradient descent in machine learning?"
        ]

        latencies = []

        def make_request(i: int) -> float:
            start = time.time()
            self.client.completion(
                prompt=prompts[i % len(prompts)],
                max_tokens=256
            )
            return time.time() - start

        # Sequential warm-up
        for i in range(5):
            make_request(i)

        # Concurrent run; throughput is measured against wall-clock time,
        # not the sum of per-request latencies
        wall_start = time.time()
        with ThreadPoolExecutor(max_workers=num_concurrent) as executor:
            futures = [executor.submit(make_request, i) for i in range(num_requests)]
            for future in futures:
                latencies.append(future.result())
        wall_time = time.time() - wall_start

        sorted_latencies = sorted(latencies)
        return {
            "num_requests": num_requests,
            "num_concurrent": num_concurrent,
            "total_time": wall_time,
            "avg_latency": statistics.mean(latencies),
            "median_latency": statistics.median(latencies),
            "p95_latency": sorted_latencies[min(int(len(latencies) * 0.95), len(latencies) - 1)],
            "p99_latency": sorted_latencies[min(int(len(latencies) * 0.99), len(latencies) - 1)],
            "throughput": num_requests / wall_time
        }

    def latency_benchmark(self, prompt: str):
        """Latency test: approximate first-token latency and end-to-end latency."""
        results = {"first_token": [], "end_to_end": []}

        for _ in range(20):
            # Approximate time-to-first-token by requesting a single token;
            # a streaming request would measure it more accurately
            start = time.time()
            self.client.completion(prompt, max_tokens=1)
            results["first_token"].append(time.time() - start)

            # End-to-end latency
            start = time.time()
            self.client.completion(prompt, max_tokens=256)
            results["end_to_end"].append(time.time() - start)

        return {
            "first_token_avg": statistics.mean(results["first_token"]),
            "first_token_p50": statistics.median(results["first_token"]),
            "e2e_avg": statistics.mean(results["end_to_end"]),
            "e2e_p50": statistics.median(results["end_to_end"])
        }
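
Run against a live endpoint, a sketch like the following prints both reports; the request counts are arbitrary, and the numbers obviously depend on the model and hardware.

# Usage sketch: run both benchmarks against a local vLLM server.
client = vLLMClient()
bench = LLMBenchmark(client)

report = bench.throughput_benchmark(num_requests=50, num_concurrent=8)
print(f"throughput: {report['throughput']:.2f} req/s, p95: {report['p95_latency']:.2f}s")

latency = bench.latency_benchmark("Summarize the benefits of paged KV caching.")
print(f"first token ~{latency['first_token_avg']:.2f}s, end-to-end ~{latency['e2e_avg']:.2f}s")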

vLLM Performance Optimization

1. Tensor Parallel

# Multi-GPU parallel inference (tensor parallelism across 4 GPUs)
vllm serve meta-llama/Llama-2-70b-chat-hf \
    --tensor-parallel-size 4 \
    --gpu-memory-utilization 0.9
class TensorParallelRunner:
    """Tensor-parallel sizing helper."""
    @staticmethod
    def calculate_num_gpus(num_params: int) -> int:
        """Estimate the number of GPUs needed for a given parameter count."""
        # Rough estimate: 2 bytes per parameter (FP16 weights) plus ~0.5 bytes
        # per parameter of headroom for the KV cache and activations
        memory_per_param = 2 + 0.5

        for num_gpus in [1, 2, 4, 8]:
            available_memory = num_gpus * 80 * 1024**3  # assume 80 GB per GPU
            required_memory = num_params * memory_per_param

            if required_memory < available_memory * 0.9:
                return num_gpus

        return 8  # cap at 8 GPUs
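
As a sanity check of the heuristic above (the 80 GB-per-GPU figure is the assumption baked into the helper):

# Worked example: a 70B-parameter model in FP16 needs roughly
# 70e9 * 2.5 bytes ≈ 163 GiB, which does not fit on 1 or 2 x 80 GB GPUs
# at 90% utilization, so the helper returns 4.
num_gpus = TensorParallelRunner.calculate_num_gpus(num_params=70_000_000_000)
print(num_gpus)  # 4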

2. GPU Memory Optimization

class MemoryOptimizer:
    """Heuristics for vLLM GPU-memory-related flags."""
    @staticmethod
    def get_optimal_config(num_gpus: int, gpu_memory_gb: float):
        """Pick a reasonable starting configuration for the given hardware."""
        # Fraction of GPU memory vLLM is allowed to use
        gpu_util = 0.9

        # Scale context length and concurrent sequences with available memory
        if gpu_memory_gb >= 80:
            max_model_len = 8192
            max_num_seqs = 256
        elif gpu_memory_gb >= 40:
            max_model_len = 4096
            max_num_seqs = 128
        else:
            max_model_len = 2048
            max_num_seqs = 64

        return {
            "--gpu-memory-utilization": gpu_util,
            "--max-model-len": max_model_len,
            "--max-num-seqs": max_num_seqs,
            "--num-scheduler-steps": 1,
            "--enable-chunked-prefill": True
        }
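
The dictionary maps directly onto vllm serve flags; a small rendering loop (purely illustrative, not part of vLLM) turns it into a command line:

# Render the config dict above into vllm serve arguments (helper is illustrative).
config = MemoryOptimizer.get_optimal_config(num_gpus=1, gpu_memory_gb=40)
args = []
for flag, value in config.items():
    if isinstance(value, bool):
        if value:
            args.append(flag)             # boolean flags are passed bare
    else:
        args.append(f"{flag} {value}")
print("vllm serve <model> " + " ".join(args))
# vllm serve <model> --gpu-memory-utilization 0.9 --max-model-len 4096 ...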

Summary

vLLM delivers a major throughput improvement for LLM inference through PagedAttention and continuous batching, and its OpenAI-compatible API makes migrating from other services straightforward. This article walked through vLLM's architectural principles and deployment practice as a practical reference for developers.

