""" 针对图像分类模型的测试性能损失脚本,通过比较推理过程中CPU、GPU占用、推理时间来进行计算 需要安装指定python库实现功能 pip install psutil gputil pynvml """ import argparse import os import sys rootpath = str(os.path.abspath(os.path.join(os.path.dirname(__file__), '../'))) sys.path.append(rootpath) import psutil import GPUtil import numpy as np import time from threading import Thread import onnxruntime as ort from PIL import Image # 定义监控函数 class UsageMonitor: def __init__(self, interval=0.5): self.interval = interval self.cpu_usage = [] self.gpu_usage = [] self.running = False def start(self): self.running = True self.monitor_thread = Thread(target=self._monitor) self.monitor_thread.start() def _monitor(self): while self.running: # 记录 CPU 使用率 self.cpu_usage.append(psutil.cpu_percent(interval=None)) # 记录 GPU 使用率 gpus = GPUtil.getGPUs() if gpus: self.gpu_usage.append(gpus[0].load * 100) # 获取第一个 GPU 的使用率 else: self.gpu_usage.append(0) # 若没有 GPU 则记为 0 time.sleep(self.interval) def stop(self): self.running = False self.monitor_thread.join() def get_average_usage(self): avg_cpu_usage = np.mean(self.cpu_usage) avg_gpu_usage = np.mean(self.gpu_usage) return avg_cpu_usage, avg_gpu_usage def process_image(image_path, transpose=True): """ 图片处理 :param image_path: 图片路径 :param transpose: 是否进行维度转换,在使用pytorch框架训练出来的权重需要进行维度转换,tensorflow、keras框架不需要 :return: """ # 打开图像并转换为RGB image = Image.open(image_path).convert("RGB") # 调整图像大小 image = image.resize((224, 224)) # 转换为numpy数组并归一化 image_array = np.array(image) / 255.0 # 将像素值缩放到[0, 1] # 进行标准化 mean = np.array([0.485, 0.456, 0.406]) std = np.array([0.229, 0.224, 0.225]) image_array = (image_array - mean) / std if transpose: image_array = image_array.transpose((2, 0, 1)).copy() return image_array.astype(np.float32) def batch_predict_images(session, image_dir, target_class, batch_size=10, pytorch=True): """ 对指定图片文件夹图片进行批量检测 :param session: onnx runtime session :param image_dir: 待推理的图像文件夹 :param target_class: 目标分类 :param batch_size: 每批图片数量, 默认为10 :param pytorch: 模型是否使用pytorch框架训练出的权重导出的onnx文件,默认为True :return: 检测结果 """ image_files = [f for f in os.listdir(image_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))] results = {} input_name = session.get_inputs()[0].name correct_predictions = 0 total_predictions = 0 for i in range(0, len(image_files), batch_size): batch_files = image_files[i:i + batch_size] batch_images = [] for image_file in batch_files: image_path = os.path.join(image_dir, image_file) image = process_image(image_path, pytorch) batch_images.append(image) # 将批次图片堆叠成 (batch_size, 3, 224, 224) 维度 batch_images = np.stack(batch_images) # 执行预测 outputs = session.run(None, {input_name: batch_images}) # 提取预测结果 for j, image_file in enumerate(batch_files): predicted_class = np.argmax(outputs[0][j]) # 假设输出是每类的概率 results[image_file] = predicted_class total_predictions += 1 # 比较预测结果与目标分类 if predicted_class == target_class: correct_predictions += 1 # 计算准确率 accuracy = correct_predictions / total_predictions if total_predictions > 0 else 0 return accuracy # 模型推理函数 def model_inference(model_filename, val_dataset_dir): """ 模型推理验证集目录下所有图片 :param model_filename: 模型文件 :param val_dataset_dir: 验证集图片目录 :return: 验证集推理准确率 """ # 以下使用GPU进行推理出现问题,需要较新的CUDA版本,默认使用CPU进行推理 # if ort.get_available_providers(): # session = ort.InferenceSession(model_filename, providers=['CUDAExecutionProvider']) # else: # session = ort.InferenceSession(model_filename) session = ort.InferenceSession(model_filename) accuracy = 0 class_num = 0 index = 0 for class_dir in os.listdir(val_dataset_dir): class_path = os.path.join(val_dataset_dir, class_dir) # 检查是否为目录 if not os.path.isdir(class_path): continue class_num += 1 is_pytorch = False if "keras" in model_filename or "tensorflow" in model_filename else True batch_result = batch_predict_images(session, class_path, index, pytorch=is_pytorch) accuracy += batch_result index += 1 print(f"class_num: {class_num}, index: {index}") return accuracy * 1.0 / class_num if __name__ == '__main__': parser = argparse.ArgumentParser(description='模型推理性能验证脚本') parser.add_argument('--origin_model_file', default=None, type=str, help='待测试原始模型的onnx文件') parser.add_argument('--watermark_model_file', default=None, type=str, help='待测试水印模型的onnx文件') parser.add_argument('--val_dataset_dir', default=None, type=str, help='验证集目录') args, _ = parser.parse_known_args() if args.origin_model_file is None: raise Exception("待测试模型的onnx文件不可为空") if args.val_dataset_dir is None: raise Exception("验证集目录不可为空") monitor = UsageMonitor(interval=0.5) # 每隔 0.5 秒采样一次 monitor.start() # 记录推理开始时间 start_time = time.time() # 进行模型推理 accuracy = model_inference(args.origin_model_file, args.val_dataset_dir) # 记录推理结束时间 end_time = time.time() monitor.stop() # 输出平均 CPU 和 GPU 使用率 avg_cpu, avg_gpu = monitor.get_average_usage() print("原始模型推理性能:") print(f"平均 CPU 使用率:{avg_cpu:.2f}%") print(f"平均 GPU 使用率:{avg_gpu:.2f}%") print(f"模型推理时间: {end_time - start_time:.2f} 秒") print(f"准确率: {accuracy * 100:.2f}%") if args.watermark_model_file: # 加入存在比对模型,进行再次推理,然后统计性能指标 time.sleep(20) monitor2 = UsageMonitor(interval=0.5) # 每隔 0.5 秒采样一次 monitor2.start() # 记录推理开始时间 start_time2 = time.time() # 进行模型推理 accuracy2 = model_inference(args.watermark_model_file, args.val_dataset_dir) # 记录推理结束时间 end_time2 = time.time() monitor2.stop() # 输出平均 CPU 和 GPU 使用率 avg_cpu2, avg_gpu2 = monitor2.get_average_usage() print("水印模型推理性能:") print(f"平均 CPU 使用率:{avg_cpu2:.2f}%") print(f"平均 GPU 使用率:{avg_gpu2:.2f}%") print(f"模型推理时间: {end_time2 - start_time2:.2f} 秒") print(f"准确率: {accuracy2 * 100:.2f}%") print("------------------性能指标如下-------------------------") print(f"嵌入后模型推理准确率下降值:{(accuracy - accuracy2) * 100:.2f}%") print(f"算力资源消耗增加值:{(avg_cpu2 - avg_cpu):.2f}%") print(f"运行效率降低值: {((end_time2 - start_time2) - (end_time - start_time)) * 100 / (end_time - start_time):.2f} %")