AI+流体力学:PINN求解N-S方程实战

🎙️ 语音朗读 当前: 晓晓 (温柔女声)

AI+流体力学:PINN求解N-S方程实战

引言

纳维尔-斯托克斯方程(Navier-Stokes Equations)是描述流体运动的基本方程。然而,由于其高度非线性特性,传统数值方法在求解时面临巨大挑战。PINN(物理信息神经网络)的出现为这一经典难题提供了新的解决思路。本文将深入探讨如何使用PINN求解N-S方程,并提供完整的实战代码。

N-S方程基础回顾

方程形式

二维不可压缩N-S方程:

$$
\frac{\partial u}{\partial t} + u\frac{\partial u}{\partial x} + v\frac{\partial u}{\partial y} = -\frac{1}{\rho}\frac{\partial p}{\partial x} + \nu\left(\frac{\partial^2 u}{\partial x^2} + \frac{\partial^2 u}{\partial y^2}\right)
$$

$$
\frac{\partial v}{\partial t} + u\frac{\partial v}{\partial x} + v\frac{\partial v}{\partial y} = -\frac{1}{\rho}\frac{\partial p}{\partial y} + \nu\left(\frac{\partial^2 v}{\partial x^2} + \frac{\partial^2 v}{\partial y^2}\right)
$$

连续性方程(不可压条件):

$$
\frac{\partial u}{\partial x} + \frac{\partial v}{\partial y} = 0
$$

其中:

  • $u, v$:速度分量
  • $p$:压力
  • $\rho$:密度
  • $\nu$:运动粘度

PINN求解器架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt

class NSPINN(nn.Module):
    """
    Physics-informed neural network for the 2-D incompressible
    Navier-Stokes equations.

    The network maps space-time coordinates (x, y, t) to the
    flow state [u, v, p] (velocity components and pressure).
    """

    def __init__(self, layers=None):
        """
        Args:
            layers: list of layer widths, input first, output last.
                Defaults to [3, 64, 64, 64, 64, 3].

        Note:
            BUG FIX vs. original: the old default was [2, 64, ...], but
            forward() concatenates three inputs (x, y, t), so the first
            layer must have width 3 (the worked example already passed
            [3, 80, ...] explicitly). The mutable-list default was also
            replaced by the None sentinel idiom.
        """
        super().__init__()
        if layers is None:
            layers = [3, 64, 64, 64, 64, 3]

        # Fully connected net with tanh activations; tanh is smooth, so the
        # second-order autograd derivatives needed by the PDE residuals exist.
        modules = []
        for i in range(len(layers) - 1):
            modules.append(nn.Linear(layers[i], layers[i + 1]))
            if i < len(layers) - 2:  # no activation after the output layer
                modules.append(nn.Tanh())
        self.network = nn.Sequential(*modules)

        # Fluid parameters
        self.nu = 0.01   # kinematic viscosity
        self.rho = 1.0   # density

        # Placeholder for training collocation points
        self.domain_points = None

    def forward(self, x, y, t):
        """
        Forward pass.

        Args:
            x, y, t: column tensors of shape (N, 1).

        Returns:
            (u, v, p): tensors of shape (N, 1) each.
        """
        inputs = torch.cat([x, y, t], dim=1)
        outputs = self.network(inputs)

        u = outputs[:, 0:1]
        v = outputs[:, 1:2]
        p = outputs[:, 2:3]

        return u, v, p

梯度计算与N-S残差

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def compute_derivatives(self, x, y, t):
    """
    Return the network outputs u, v, p together with every first- and
    second-order partial derivative required by the N-S residuals,
    computed via automatic differentiation.

    Args:
        x, y, t: coordinate tensors; gradients w.r.t. them are enabled here.

    Returns:
        dict mapping names ('u', 'u_t', 'u_x', 'u_xx', ...) to tensors.
    """
    # Inputs must track gradients so autograd can differentiate w.r.t. them.
    for coord in (x, y, t):
        coord.requires_grad_(True)

    u, v, p = self.forward(x, y, t)

    def _grad(out, wrt):
        # d(out)/d(wrt); keep the graph so higher-order derivatives work.
        return torch.autograd.grad(out, wrt, torch.ones_like(out),
                                   create_graph=True)[0]

    # First-order derivatives
    u_t, u_x, u_y = _grad(u, t), _grad(u, x), _grad(u, y)
    v_t, v_x, v_y = _grad(v, t), _grad(v, x), _grad(v, y)
    p_x, p_y = _grad(p, x), _grad(p, y)

    # Second-order derivatives (viscous diffusion terms)
    u_xx, u_yy = _grad(u_x, x), _grad(u_y, y)
    v_xx, v_yy = _grad(v_x, x), _grad(v_y, y)

    return {
        'u': u, 'v': v, 'p': p,
        'u_t': u_t, 'u_x': u_x, 'u_y': u_y,
        'v_t': v_t, 'v_x': v_x, 'v_y': v_y,
        'u_xx': u_xx, 'u_yy': u_yy,
        'v_xx': v_xx, 'v_yy': v_yy,
        'p_x': p_x, 'p_y': p_y
    }

def compute_ns_residuals(self, x, y, t):
    """
    Evaluate the pointwise residuals of the 2-D incompressible
    Navier-Stokes system at the given collocation points.

    Returns:
        (residual_x, residual_y, residual_continuity): x-momentum,
        y-momentum and continuity residuals; all zero for an exact solution.
    """
    d = self.compute_derivatives(x, y, t)

    # x-momentum, rearranged so the residual vanishes when the PDE holds:
    #   u_t + u*u_x + v*u_y + p_x/rho - nu*(u_xx + u_yy) = 0
    residual_x = (d['u_t'] + d['u'] * d['u_x'] + d['v'] * d['u_y']
                  + d['p_x'] / self.rho
                  - self.nu * (d['u_xx'] + d['u_yy']))

    # y-momentum:
    #   v_t + u*v_x + v*v_y + p_y/rho - nu*(v_xx + v_yy) = 0
    residual_y = (d['v_t'] + d['u'] * d['v_x'] + d['v'] * d['v_y']
                  + d['p_y'] / self.rho
                  - self.nu * (d['v_xx'] + d['v_yy']))

    # Incompressibility: div(velocity) = u_x + v_y = 0
    residual_continuity = d['u_x'] + d['v_y']

    return residual_x, residual_y, residual_continuity

损失函数与训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
class NSTrainer:
    """Training driver for the N-S PINN.

    Samples random space-time collocation points, evaluates the combined
    PDE / boundary-condition / initial-condition loss, and runs the Adam
    optimization loop with a step-decay learning-rate schedule.
    """

    def __init__(self, model, device='cuda'):
        """
        Args:
            model: NSPINN-like module exposing forward() and
                compute_ns_residuals().
            device: torch device string; the model is moved there.
        """
        self.model = model.to(device)
        self.device = device
        # Per-iteration loss history for diagnostics/plotting.
        self.history = {'total': [], 'pde': [], 'bc': [], 'ic': []}

    def generate_collocation_points(self, n_points,
                                    x_range, y_range, t_range):
        """Sample uniform random collocation points in the space-time domain.

        Returns:
            (x, y, t): tensors of shape (n_points, 1) on self.device.
        """
        x = np.random.uniform(x_range[0], x_range[1], n_points)
        y = np.random.uniform(y_range[0], y_range[1], n_points)
        t = np.random.uniform(t_range[0], t_range[1], n_points)

        def _to_col(a):
            # BUG FIX vs. original: it returned 1-D tensors, but the model's
            # forward() concatenates its inputs along dim=1, which requires
            # 2-D column tensors of shape (n_points, 1).
            return torch.tensor(a, dtype=torch.float32,
                                device=self.device).reshape(-1, 1)

        return _to_col(x), _to_col(y), _to_col(t)

    def get_initial_condition(self, x, y, case='lid_driven'):
        """
        Initial condition: fluid at rest.

        For the lid-driven cavity: u(x, y, 0) = 0, v(x, y, 0) = 0.
        (`case` is currently unused; kept for interface compatibility.)
        """
        u0 = torch.zeros_like(x)
        v0 = torch.zeros_like(x)
        return u0, v0

    def get_boundary_condition(self, x, y, t, case='lid_driven'):
        """
        Boundary targets for the lid-driven cavity:
          - top wall (y=1): u=1, v=0 (moving lid)
          - bottom / left / right walls: u=0, v=0 (no-slip)

        Points within `tol` of y=1 receive the lid velocity; everything
        else defaults to zero velocity.
        """
        tol = 0.01  # tolerance for detecting points on the top wall

        u_bc = torch.zeros_like(x)
        v_bc = torch.zeros_like(x)

        # Top wall (y = 1): moving lid with u = 1.
        mask_top = torch.isclose(y, torch.ones_like(y) * 1.0, atol=tol)
        u_bc[mask_top] = 1.0

        return u_bc, v_bc

    def compute_loss(self, x, y, t):
        """Total loss = PDE residual + weighted BC + weighted IC terms.

        NOTE(review): BC and IC targets are evaluated at ALL collocation
        points rather than at dedicated boundary / t=0 samples, so interior
        points are also pulled toward zero velocity. This mirrors the
        original implementation; standard PINN practice samples boundary
        and initial points separately — TODO confirm intent.
        """
        # 1. PDE residual loss (x/y momentum + continuity).
        res_x, res_y, res_cont = self.model.compute_ns_residuals(x, y, t)
        loss_pde = (torch.mean(res_x**2) +
                    torch.mean(res_y**2) +
                    torch.mean(res_cont**2))

        # 2. Boundary-condition loss.
        u_bc, v_bc = self.get_boundary_condition(x, y, t)
        u_pred, v_pred, _ = self.model(x, y, t)
        loss_bc = torch.mean((u_pred - u_bc)**2) + torch.mean((v_pred - v_bc)**2)

        # 3. Initial-condition loss (model evaluated at t = 0).
        u0, v0 = self.get_initial_condition(x, y, 'lid_driven')
        u_ic, v_ic, _ = self.model(x, y, torch.zeros_like(t))
        loss_ic = torch.mean((u_ic - u0)**2) + torch.mean((v_ic - v0)**2)

        # Weighted total: BC/IC terms up-weighted to enforce the constraints.
        total_loss = (loss_pde +
                      10.0 * loss_bc +
                      10.0 * loss_ic)

        return total_loss, loss_pde, loss_bc, loss_ic

    def train(self, n_iterations, n_collocation_points,
              x_range, y_range, t_range,
              learning_rate=1e-3):
        """Adam training loop; collocation points are resampled every
        iteration. Returns the loss-history dict."""
        optimizer = torch.optim.Adam(self.model.parameters(), lr=learning_rate)
        # Decay the LR by 5% every 1000 optimizer steps.
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1000, 0.95)

        for iteration in range(n_iterations):
            # Fresh random collocation points each iteration.
            x, y, t = self.generate_collocation_points(
                n_collocation_points, x_range, y_range, t_range
            )

            loss, loss_pde, loss_bc, loss_ic = self.compute_loss(x, y, t)

            # Backpropagation and parameter update.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            scheduler.step()

            # Record history for diagnostics.
            self.history['total'].append(loss.item())
            self.history['pde'].append(loss_pde.item())
            self.history['bc'].append(loss_bc.item())
            self.history['ic'].append(loss_ic.item())

            if iteration % 500 == 0:
                print(f"Iter {iteration:5d} | Loss: {loss.item():.6f} | "
                      f"PDE: {loss_pde.item():.6f} | BC: {loss_bc.item():.6f}")

        return self.history

完整训练示例:顶盖驱动腔问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def solve_lid_driven_cavity():
    """
    End-to-end demo: train a PINN on the lid-driven cavity problem,
    evaluate the fields on a regular grid at the final time, plot them,
    and return (model, training history).
    """
    # Prefer GPU when one is available.
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f"Using device: {device}")

    # Network maps (x, y, t) -> (u, v, p); four hidden layers of 80 units.
    model = NSPINN(layers=[3, 80, 80, 80, 80, 3])
    model.to(device)

    trainer = NSTrainer(model, device)

    # Unit-square cavity, simulated until t = 2 (approximately steady state).
    x_range = [0.0, 1.0]
    y_range = [0.0, 1.0]
    t_range = [0.0, 2.0]

    print("Starting training...")
    history = trainer.train(
        n_iterations=20000,
        n_collocation_points=5000,
        x_range=x_range,
        y_range=y_range,
        t_range=t_range,
        learning_rate=1e-3
    )

    # Switch to evaluation mode before inference.
    model.eval()

    # Uniform 50x50 evaluation grid at the final time t = 2.
    n_grid = 50
    axis = np.linspace(0, 1, n_grid)
    X, Y = np.meshgrid(axis, axis)
    t_final = torch.ones(n_grid * n_grid, 1, device=device) * 2.0

    x_flat = torch.tensor(X.flatten(), dtype=torch.float32, device=device).reshape(-1, 1)
    y_flat = torch.tensor(Y.flatten(), dtype=torch.float32, device=device).reshape(-1, 1)

    # Pure inference: no autograd graph needed.
    with torch.no_grad():
        u, v, p = model(x_flat, y_flat, t_final)

    # Visualize the solution fields.
    plot_results(X, Y, u.cpu().numpy(), v.cpu().numpy(), p.cpu().numpy())

    return model, history

def plot_results(X, Y, u, v, p):
    """
    Visualize the solution: filled-contour maps of u, v, p plus a
    subsampled velocity quiver plot, saved to
    'navier_stokes_solution.png' and shown on screen.
    """
    fig, axes = plt.subplots(2, 2, figsize=(12, 10))

    # The three scalar fields share identical panel styling.
    scalar_panels = [
        (axes[0, 0], u, 'Velocity u'),
        (axes[0, 1], v, 'Velocity v'),
        (axes[1, 0], p, 'Pressure'),
    ]
    for ax, field, title in scalar_panels:
        im = ax.contourf(X, Y, field.reshape(X.shape), levels=20)
        ax.set_title(title)
        ax.set_xlabel('x')
        ax.set_ylabel('y')
        plt.colorbar(im, ax=ax)

    # Velocity vectors, subsampled every 4th grid point for readability.
    quiver_ax = axes[1, 1]
    quiver_ax.quiver(X[::4, ::4], Y[::4, ::4],
                     u.reshape(X.shape)[::4, ::4],
                     v.reshape(X.shape)[::4, ::4])
    quiver_ax.set_title('Velocity Vector')
    quiver_ax.set_xlabel('x')
    quiver_ax.set_ylabel('y')
    quiver_ax.set_xlim([0, 1])
    quiver_ax.set_ylim([0, 1])

    plt.tight_layout()
    plt.savefig('navier_stokes_solution.png', dpi=150)
    plt.show()

# Script entry point: run the full lid-driven cavity demo.
if __name__ == '__main__':
    solve_lid_driven_cavity()

更复杂的应用:圆柱绕流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class CylinderFlowPINN(NSPINN):
    """
    PINN specialization for flow past a circular cylinder.

    Stores the cylinder geometry (center, radius) and provides boundary
    targets for the inlet and the cylinder surface.
    """

    def __init__(self):
        super().__init__()
        # Cylinder geometry: centered in the domain with radius 0.1.
        self.center_x = 0.5
        self.center_y = 0.5
        self.radius = 0.1

    def get_boundary_condition(self, x, y, t):
        """
        Boundary targets for cylinder flow:
          - inlet (left region): u = U_inlet, v = 0
          - cylinder surface: u = 0, v = 0 (no-slip)
          - outlet: free outflow

        Returns:
            (u_bc, v_bc, on_cylinder): target velocities plus a boolean
            mask flagging points on/near the cylinder surface.
        """
        U_inlet = 1.0
        u_bc = torch.zeros_like(x)
        v_bc = torch.zeros_like(x)

        # Inlet: points near the left boundary get the free-stream velocity.
        inlet = x < 0.1
        u_bc[inlet] = U_inlet

        # Cylinder surface: within radius (plus a small tolerance) of center.
        dist = ((x - self.center_x)**2 + (y - self.center_y)**2)**0.5
        on_cylinder = dist < (self.radius + 0.01)

        return u_bc, v_bc, on_cylinder

高级技巧

1. 迁移学习(Transfer Learning)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class TransferLearning:
    """Transfer-learning strategy to accelerate PINN training."""

    @staticmethod
    def pretrain_on_easier_problem():
        """
        Curriculum / transfer strategy: first train an easy
        high-viscosity (low-Reynolds) flow, then continue training the
        same network at the target, lower viscosity.

        Returns:
            The trained NSPINN model.
        """
        # Shared training-domain settings for both phases.
        domain = dict(x_range=[0.0, 1.0], y_range=[0.0, 1.0],
                      t_range=[0.0, 2.0])

        # 1. Pretrain at low Reynolds number (high viscosity) — easier to fit.
        model = NSPINN()
        trainer = NSTrainer(model)
        trainer.model.nu = 0.1  # high viscosity
        # BUG FIX vs. original: it called trainer.train(n_iterations=5000, ...)
        # with a literal "..." placeholder, which is a SyntaxError (positional
        # argument after keyword argument); pass concrete arguments instead.
        trainer.train(n_iterations=5000, n_collocation_points=5000, **domain)

        # 2. Fine-tune at the target Reynolds number (lower viscosity).
        trainer.model.nu = 0.01
        trainer.train(n_iterations=10000, n_collocation_points=5000, **domain)

        return model

2. 自适应采样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class AdaptiveSampling:
    """Residual-based adaptive refinement of collocation points."""

    @staticmethod
    def residual_based_refinement(model, x, y, t, n_new_points=100):
        """
        Select the n_new_points collocation points where the combined
        N-S residual magnitude is largest — candidates for refinement
        in the next training round.

        Returns:
            (x_sel, y_sel, t_sel): the selected points.
        """
        model.eval()
        # Residual evaluation differentiates w.r.t. the inputs,
        # so they must track gradients.
        for coord in (x, y, t):
            coord.requires_grad_(True)

        res_x, res_y, res_cont = model.compute_ns_residuals(x, y, t)

        # Pointwise Euclidean norm of the three residual components.
        magnitude = torch.sqrt(res_x**2 + res_y**2 + res_cont**2)

        # Indices of the n_new_points largest residuals.
        _, worst = torch.topk(magnitude.squeeze(), n_new_points)

        return x[worst], y[worst], t[worst]

与传统CFD方法对比

特性 PINN 传统CFD (FVM)
网格生成 无需/稀疏网格 精细网格必需
高维问题 天然处理 维度灾难
非结构化网格 容易处理 需要特殊处理
计算效率 GPU并行 CPU密集
精度 适中 高
物理一致性 损失软约束嵌入 存在数值耗散
可解释性 黑盒 物理清晰

总结与展望

PINN为流体力学问题提供了一种新的求解范式。虽然与传统CFD方法相比仍有精度差距,但其在高维问题、无网格计算等方面的优势使其成为强有力的补充工具。


推荐阅读:

  • Raissi et al. “Physics-informed neural networks for fluid mechanics”
  • 《Computational Fluid Dynamics》- Hoffman & Chiang
  • 《Deep Learning in Fluid Dynamics》综述论文
© 2019-2026 ovo$^{mc^2}$ All Rights Reserved. | 站点总访问 28969 次 | 访客 19045
Theme by hiero