""" 针对图像分类模型的测试性能损失脚本,通过比较推理过程中CPU、GPU占用、推理时间来进行计算 需要安装指定python库实现功能 pip install psutil gputil pynvml """ import argparse import os import psutil import GPUtil import numpy as np import time from threading import Thread import onnxruntime as ort from PIL import Image # 定义监控函数 class UsageMonitor: def __init__(self, interval=0.5): self.interval = interval self.cpu_usage = [] self.gpu_usage = [] self.running = False def start(self): self.running = True self.monitor_thread = Thread(target=self._monitor) self.monitor_thread.start() def _monitor(self): while self.running: # 记录 CPU 使用率 self.cpu_usage.append(psutil.cpu_percent(interval=None)) # 记录 GPU 使用率 gpus = GPUtil.getGPUs() if gpus: self.gpu_usage.append(gpus[0].load * 100) # 获取第一个 GPU 的使用率 else: self.gpu_usage.append(0) # 若没有 GPU 则记为 0 time.sleep(self.interval) def stop(self): self.running = False self.monitor_thread.join() def get_average_usage(self): avg_cpu_usage = np.mean(self.cpu_usage) avg_gpu_usage = np.mean(self.gpu_usage) return avg_cpu_usage, avg_gpu_usage def process_image(image_path, transpose=True): """ 图片处理 :param image_path: 图片路径 :param transpose: 是否进行维度转换,在使用pytorch框架训练出来的权重需要进行维度转换,tensorflow、keras框架不需要 :return: """ # 打开图像并转换为RGB image = Image.open(image_path).convert("RGB") # 调整图像大小 image = image.resize((224, 224)) # 转换为numpy数组并归一化 image_array = np.array(image) / 255.0 # 将像素值缩放到[0, 1] # 进行标准化 mean = np.array([0.485, 0.456, 0.406]) std = np.array([0.229, 0.224, 0.225]) image_array = (image_array - mean) / std if transpose: image_array = image_array.transpose((2, 0, 1)).copy() return image_array.astype(np.float32) def batch_predict_images(session, image_dir, target_class, batch_size=10, pytorch=True): """ 对指定图片文件夹图片进行批量检测 :param session: onnx runtime session :param image_dir: 待推理的图像文件夹 :param target_class: 目标分类 :param batch_size: 每批图片数量, 默认为10 :param pytorch: 模型是否使用pytorch框架训练出的权重导出的onnx文件,默认为True :return: 检测结果 """ image_files = [f for f in os.listdir(image_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))] results = {} input_name = session.get_inputs()[0].name correct_predictions = 0 total_predictions = 0 for i in range(0, len(image_files), batch_size): batch_files = image_files[i:i + batch_size] batch_images = [] for image_file in batch_files: image_path = os.path.join(image_dir, image_file) image = process_image(image_path, pytorch) batch_images.append(image) # 将批次图片堆叠成 (batch_size, 3, 224, 224) 维度 batch_images = np.stack(batch_images) # 执行预测 outputs = session.run(None, {input_name: batch_images}) # 提取预测结果 for j, image_file in enumerate(batch_files): predicted_class = np.argmax(outputs[0][j]) # 假设输出是每类的概率 results[image_file] = predicted_class total_predictions += 1 # 比较预测结果与目标分类 if predicted_class == target_class: correct_predictions += 1 # 计算准确率 accuracy = correct_predictions / total_predictions if total_predictions > 0 else 0 return accuracy # 模型推理函数 def model_inference(model_filename, val_dataset_dir): """ 模型推理验证集目录下所有图片 :param model_filename: 模型文件 :param val_dataset_dir: 验证集图片目录 :return: 验证集推理准确率 """ # 以下使用GPU进行推理出现问题,需要较新的CUDA版本,默认使用CPU进行推理 # if ort.get_available_providers(): # session = ort.InferenceSession(model_filename, providers=['CUDAExecutionProvider']) # else: # session = ort.InferenceSession(model_filename) session = ort.InferenceSession(model_filename) accuracy = 0 class_num = 0 index = 0 for class_dir in os.listdir(val_dataset_dir): class_path = os.path.join(val_dataset_dir, class_dir) # 检查是否为目录 if not os.path.isdir(class_path): continue class_num += 1 is_pytorch = False if "keras" in model_filename or "tensorflow" in model_filename else True batch_result = batch_predict_images(session, class_path, index, pytorch=is_pytorch) accuracy += batch_result index += 1 print(f"class_num: {class_num}, index: {index}") return accuracy * 1.0 / class_num if __name__ == '__main__': parser = argparse.ArgumentParser(description='模型推理性能验证脚本') parser.add_argument('--origin_model_file', default=None, type=str, help='待测试原始模型的onnx文件') parser.add_argument('--watermark_model_file', default=None, type=str, help='待测试水印模型的onnx文件') parser.add_argument('--val_dataset_dir', default=None, type=str, help='验证集目录') args, _ = parser.parse_known_args() if args.origin_model_file is None: raise Exception("待测试模型的onnx文件不可为空") if args.val_dataset_dir is None: raise Exception("验证集目录不可为空") monitor = UsageMonitor(interval=0.5) # 每隔 0.5 秒采样一次 monitor.start() # 记录推理开始时间 start_time = time.time() # 进行模型推理 accuracy = model_inference(args.origin_model_file, args.val_dataset_dir) # 记录推理结束时间 end_time = time.time() monitor.stop() # 输出平均 CPU 和 GPU 使用率 avg_cpu, avg_gpu = monitor.get_average_usage() print("原始模型推理性能:") print(f"平均 CPU 使用率:{avg_cpu:.2f}%") print(f"平均 GPU 使用率:{avg_gpu:.2f}%") print(f"模型推理时间: {end_time - start_time:.2f} 秒") print(f"准确率: {accuracy * 100:.2f}%") if args.watermark_model_file: # 加入存在比对模型,进行再次推理,然后统计性能指标 time.sleep(20) monitor2 = UsageMonitor(interval=0.5) # 每隔 0.5 秒采样一次 monitor2.start() # 记录推理开始时间 start_time2 = time.time() # 进行模型推理 accuracy2 = model_inference(args.watermark_model_file, args.val_dataset_dir) # 记录推理结束时间 end_time2 = time.time() monitor2.stop() # 输出平均 CPU 和 GPU 使用率 avg_cpu2, avg_gpu2 = monitor2.get_average_usage() print("水印模型推理性能:") print(f"平均 CPU 使用率:{avg_cpu2:.2f}%") print(f"平均 GPU 使用率:{avg_gpu2:.2f}%") print(f"模型推理时间: {end_time2 - start_time2:.2f} 秒") print(f"准确率: {accuracy2 * 100:.2f}%") print("------------------性能指标如下-------------------------") print(f"嵌入后模型推理准确率下降值:{(accuracy - accuracy2) * 100:.2f}%") print(f"算力资源消耗增加值:{(avg_cpu2 - avg_cpu):.2f}%") print(f"运行效率降低值: {((end_time2 - start_time2) - (end_time - start_time)) * 100 / (end_time - start_time):.2f} %")