import time
from collections import namedtuple

import numpy as np
import pycuda.autoinit  # initializes the CUDA context as an import side effect
import pycuda.driver as cuda
import tensorrt as trt
# Per-tensor I/O record: binding name, numpy dtype, tensor shape, the
# page-locked host staging buffer, and the matching device memory pointer.
Binding = namedtuple("Binding", ["name", "dtype", "shape", "data", "ptr"])
class TRTInference:
    """High-performance TensorRT inference wrapper.

    Loads a serialized TensorRT engine, allocates a page-locked host buffer
    and a device allocation for every binding, and exposes synchronous
    inference plus a simple latency benchmark.

    NOTE(review): this uses the pre-TensorRT-10 binding API
    (``num_bindings``, ``get_binding_name``, ``execute_async_v2``) — confirm
    the installed TensorRT version still provides it.
    """

    def __init__(self, engine_path, device_id=0):
        """Deserialize the engine and allocate all I/O buffers.

        Args:
            engine_path: Path to a serialized TensorRT engine file.
            device_id: Requested CUDA device index. NOTE(review): recorded
                but not used for device selection — ``pycuda.autoinit`` picks
                the default device; confirm whether explicit device selection
                is intended.

        Raises:
            RuntimeError: If the engine cannot be deserialized (corrupt file
                or TensorRT version mismatch).
        """
        self.logger = trt.Logger(trt.Logger.INFO)
        self.device_id = device_id
        with open(engine_path, 'rb') as f:
            runtime = trt.Runtime(self.logger)
            self.engine = runtime.deserialize_cuda_engine(f.read())
        # deserialize_cuda_engine returns None on failure; fail loudly here
        # instead of raising a cryptic AttributeError two lines later.
        if self.engine is None:
            raise RuntimeError(
                f"Failed to deserialize TensorRT engine from {engine_path}"
            )
        self.context = self.engine.create_execution_context()
        self.stream = cuda.Stream()

        # One Binding per engine I/O tensor: pinned host staging buffer plus
        # a device allocation of the same byte size.
        self.bindings = []
        self.inputs = {}
        self.outputs = {}
        for i in range(self.engine.num_bindings):
            name = self.engine.get_binding_name(i)
            dtype = trt.nptype(self.engine.get_binding_dtype(i))
            shape = self.engine.get_binding_shape(i)
            # NOTE(review): assumes static shapes — a dynamic dimension (-1)
            # would make trt.volume() non-positive; confirm the engine was
            # built without dynamic axes.
            data = cuda.pagelocked_empty(trt.volume(shape), dtype)
            ptr = cuda.mem_alloc(data.nbytes)
            binding = Binding(name, dtype, shape, data, ptr)
            self.bindings.append(binding)
            if self.engine.binding_is_input(i):
                self.inputs[name] = binding
            else:
                self.outputs[name] = binding

    def infer(self, input_data: dict):
        """Run one synchronous inference pass.

        Args:
            input_data: Mapping of input binding name -> numpy array whose
                element count matches the binding's shape (it is flattened
                with ``ravel`` before the copy).

        Returns:
            Dict of output binding name -> numpy array reshaped to the
            binding's shape. The arrays alias the internal page-locked
            buffers and are overwritten by the next ``infer`` call — copy
            them if they must outlive it.
        """
        # Host -> device: stage each input in its pinned buffer, then enqueue
        # the async copy on our stream.
        for name, data in input_data.items():
            binding = self.inputs[name]
            np.copyto(binding.data, data.ravel())
            cuda.memcpy_htod_async(binding.ptr, binding.data, self.stream)

        binding_addrs = [b.ptr for b in self.bindings]
        self.context.execute_async_v2(
            bindings=binding_addrs,
            stream_handle=self.stream.handle,
        )

        # Device -> host: enqueue all output copies, then block once until
        # the whole stream (execute + copies) has drained.
        results = {}
        for name, binding in self.outputs.items():
            cuda.memcpy_dtoh_async(binding.data, binding.ptr, self.stream)
            results[name] = binding.data.reshape(binding.shape)
        self.stream.synchronize()
        return results

    def benchmark(self, input_data, warmup=10, iterations=100):
        """Measure end-to-end inference latency.

        Args:
            input_data: Inputs forwarded unchanged to ``infer``.
            warmup: Number of untimed warmup iterations.
            iterations: Number of timed iterations.

        Returns:
            Dict with ``mean_ms``, ``std_ms``, ``p50_ms``, ``p95_ms``,
            ``p99_ms`` (milliseconds) and ``fps``.
        """
        for _ in range(warmup):
            self.infer(input_data)

        latencies = []
        for _ in range(iterations):
            start = time.perf_counter()
            self.infer(input_data)
            latencies.append((time.perf_counter() - start) * 1000)

        # Hoist the mean: the original recomputed np.mean twice.
        mean_ms = np.mean(latencies)
        stats = {
            'mean_ms': mean_ms,
            'std_ms': np.std(latencies),
            'p50_ms': np.percentile(latencies, 50),
            'p95_ms': np.percentile(latencies, 95),
            'p99_ms': np.percentile(latencies, 99),
            'fps': 1000 / mean_ms,
        }
        print(f"推理性能:")
        print(f"  平均延迟: {stats['mean_ms']:.2f} ± {stats['std_ms']:.2f} ms")
        print(f"  P50: {stats['p50_ms']:.2f} ms")
        print(f"  P95: {stats['p95_ms']:.2f} ms")
        print(f"  P99: {stats['p99_ms']:.2f} ms")
        print(f"  FPS: {stats['fps']:.1f}")
        return stats