123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- """
- 测试性能损失脚本,通过比较推理过程中CPU、GPU占用、推理时间来进行计算
- 需要安装指定python库实现功能
- pip install psutil gputil pynvml
- """
- import argparse
- import os
- import psutil
- import GPUtil
- import numpy as np
- import time
- from threading import Thread
- import onnxruntime as ort
- from PIL import Image
- # 定义监控函数
- class UsageMonitor:
- def __init__(self, interval=0.5):
- self.interval = interval
- self.cpu_usage = []
- self.gpu_usage = []
- self.running = False
- def start(self):
- self.running = True
- self.monitor_thread = Thread(target=self._monitor)
- self.monitor_thread.start()
- def _monitor(self):
- while self.running:
- # 记录 CPU 使用率
- self.cpu_usage.append(psutil.cpu_percent(interval=None))
- # 记录 GPU 使用率
- gpus = GPUtil.getGPUs()
- if gpus:
- self.gpu_usage.append(gpus[0].load * 100) # 获取第一个 GPU 的使用率
- else:
- self.gpu_usage.append(0) # 若没有 GPU 则记为 0
- time.sleep(self.interval)
- def stop(self):
- self.running = False
- self.monitor_thread.join()
- def get_average_usage(self):
- avg_cpu_usage = np.mean(self.cpu_usage)
- avg_gpu_usage = np.mean(self.gpu_usage)
- return avg_cpu_usage, avg_gpu_usage
- def process_image(image_path):
- # 打开图像并转换为RGB
- image = Image.open(image_path).convert("RGB")
- # 调整图像大小
- image = image.resize((224, 224))
- # 转换为numpy数组并归一化
- image_array = np.array(image) / 255.0 # 将像素值缩放到[0, 1]
- # 进行标准化
- mean = np.array([0.485, 0.456, 0.406])
- std = np.array([0.229, 0.224, 0.225])
- image_array = (image_array - mean) / std
- image_array = image_array.transpose((2, 0, 1)).copy()
- return image_array.astype(np.float32)
- def batch_predict_images(session, image_dir, target_class, batch_size=10):
- """
- 对指定图片文件夹图片进行批量检测
- :param session: onnx runtime session
- :param image_dir: 待推理的图像文件夹
- :param target_class: 目标分类
- :param batch_size: 每批图片数量
- :return: 检测结果
- """
- image_files = [f for f in os.listdir(image_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
- results = {}
- input_name = session.get_inputs()[0].name
- correct_predictions = 0
- total_predictions = 0
- for i in range(0, len(image_files), batch_size):
- batch_files = image_files[i:i + batch_size]
- batch_images = []
- for image_file in batch_files:
- image_path = os.path.join(image_dir, image_file)
- image = process_image(image_path)
- batch_images.append(image)
- # 将批次图片堆叠成 (batch_size, 3, 224, 224) 维度
- batch_images = np.stack(batch_images)
- # 执行预测
- outputs = session.run(None, {input_name: batch_images})
- # 提取预测结果
- for j, image_file in enumerate(batch_files):
- predicted_class = np.argmax(outputs[0][j]) # 假设输出是每类的概率
- results[image_file] = predicted_class
- total_predictions += 1
- # 比较预测结果与目标分类
- if predicted_class == target_class:
- correct_predictions += 1
- # 计算准确率
- accuracy = correct_predictions / total_predictions if total_predictions > 0 else 0
- return accuracy
- # 模型推理函数
- def model_inference(model_filename, val_dataset_dir):
- """
- 模型推理
- :param model_filename: 模型文件
- :param val_dataset_dir: 验证集图片目录
- :return: 验证集推理准确率
- """
- if ort.get_available_providers():
- session = ort.InferenceSession("model.onnx", providers=['CUDAExecutionProvider'])
- else:
- session = ort.InferenceSession(model_filename)
- accuracy = 0
- class_num = 0
- index = 0
- for class_dir in os.listdir(val_dataset_dir):
- class_path = os.path.join(val_dataset_dir, class_dir)
- # 检查是否为目录
- if not os.path.isdir(class_path):
- continue
- class_num += 1
- batch_result = batch_predict_images(session, class_path, index)
- accuracy += batch_result
- index += 1
- print(f"class_num: {class_num}, index: {index}")
- return accuracy * 1.0 / class_num
- if __name__ == '__main__':
- parser = argparse.ArgumentParser(description='模型推理性能验证脚本')
- # parser.add_argument('--model_file', default=None, type=str, help='待测试模型的onnx文件')
- # parser.add_argument('--val_dataset_dir', default=None, type=str, help='验证集目录')
- parser.add_argument('--model_file', default="origin_models/googlenet/model_139.onnx", type=str, help='待测试模型的onnx文件')
- parser.add_argument('--val_dataset_dir', default="val", type=str, help='验证集目录')
- args, _ = parser.parse_known_args()
- if args.model_file is None:
- raise Exception("待测试模型的onnx文件不可为空")
- if args.val_dataset_dir is None:
- raise Exception("验证集目录不可为空")
- monitor = UsageMonitor(interval=0.5) # 每隔 0.5 秒采样一次
- monitor.start()
- # 记录推理开始时间
- start_time = time.time()
- # 进行模型推理
- accuracy = model_inference(args.model_file, args.val_dataset_dir)
- # 记录推理结束时间
- end_time = time.time()
- monitor.stop()
- # 输出平均 CPU 和 GPU 使用率
- avg_cpu, avg_gpu = monitor.get_average_usage()
- print(f"平均 CPU 使用率:{avg_cpu:.2f}%")
- print(f"平均 GPU 使用率:{avg_gpu:.2f}%")
- print(f"模型推理时间: {end_time - start_time:.2f} 秒")
- print(f"准确率: {accuracy * 100:.2f}%")
|