""" 针对目标检测模型的测试性能损失脚本,通过比较推理过程中CPU、GPU占用、推理时间来进行计算 需要安装指定python库实现功能 pip install psutil gputil pynvml pycocotools """ import argparse import os import psutil import GPUtil import numpy as np import time from threading import Thread from pycocotools.coco import COCO import xml.etree.ElementTree as ET from watermark_verify.inference.rcnn_inference import FasterRCNNInference from watermark_verify.inference.ssd_inference import SSDInference from watermark_verify.inference.yolox_inference import YOLOXInference from watermark_verify.tools.evaluate_tool import calculate_iou # 定义监控函数 class UsageMonitor: def __init__(self, interval=0.5): self.interval = interval self.cpu_usage = [] self.gpu_usage = [] self.running = False def start(self): self.running = True self.monitor_thread = Thread(target=self._monitor) self.monitor_thread.start() def _monitor(self): while self.running: # 记录 CPU 使用率 self.cpu_usage.append(psutil.cpu_percent(interval=None)) # 记录 GPU 使用率 gpus = GPUtil.getGPUs() if gpus: self.gpu_usage.append(gpus[0].load * 100) # 获取第一个 GPU 的使用率 else: self.gpu_usage.append(0) # 若没有 GPU 则记为 0 time.sleep(self.interval) def stop(self): self.running = False self.monitor_thread.join() def get_average_usage(self): avg_cpu_usage = np.mean(self.cpu_usage) avg_gpu_usage = np.mean(self.gpu_usage) return avg_cpu_usage, avg_gpu_usage def parse_voc_annotation(annotation_path): """ 解析单个VOC XML文件,并提取边界框和类别信息 :param annotation_path: XML标注文件路径 :return: List[[x1, y1, x2, y2, category_id]] """ voc_label_map = { # 类别名称到ID的映射字典 'aeroplane': 0, 'bicycle': 1, 'bird': 2, 'boat': 3, 'bottle': 4, 'bus': 5, 'car': 6, 'cat': 7, 'chair': 8, 'cow': 9, 'diningtable': 10, 'dog': 11, 'horse': 12, 'motorbike': 13, 'person': 14, 'pottedplant': 15, 'sheep': 16, 'sofa': 17, 'train': 18, 'tvmonitor': 19 } tree = ET.parse(annotation_path) root = tree.getroot() annotations = [] for obj in root.findall("object"): # 类别名称 class_name = obj.find("name").text category_id = voc_label_map.get(class_name, -1) # 找不到时返回 -1 # 边界框信息 bndbox = obj.find("bndbox") x1 = int(bndbox.find("xmin").text) y1 = int(bndbox.find("ymin").text) x2 = int(bndbox.find("xmax").text) y2 = int(bndbox.find("ymax").text) # 添加到结果 annotations.append([x1, y1, x2, y2, category_id]) return annotations def get_dataset_images_annotations(args, num): """ 从指定数据集的验证集中获取指定数量的图片及其标注信息 :param args: 参数 :param num: 获取数量 :return: (image_path, annotations) """ result = [] dataset_type = args.dataset_type # dataset_type: 数据集类型,可选参数:voc,coco if dataset_type == 'voc': voc_val_path = args.val_dataset_dir voc_annotations_path = os.path.join(voc_val_path, 'Annotations') voc_images_path = os.path.join(voc_val_path, 'JPEGImages') annotation_files = os.listdir(voc_annotations_path) selected_files = annotation_files[:num] # 前5张图片 for file in selected_files: annotation_path = os.path.join(voc_annotations_path, file) image_path = os.path.join(voc_images_path, file.replace('.xml', '.jpg')) annotations = parse_voc_annotation(annotation_path) result.append((image_path, annotations)) return result if dataset_type == 'coco': # 加载COCO验证集 coco = COCO(args.val_annotation) image_ids = coco.getImgIds()[:num] # 前5张图片 images = coco.loadImgs(image_ids) for image_info in images: img_path = f"{args.val_dataset_dir}/{image_info['file_name']}" # 获取标签信息 ann_ids = coco.getAnnIds(imgIds=image_info['id']) anns = coco.loadAnns(ann_ids) annotations = [] for anno in anns: gt_box = anno['bbox'] # [x, y, width, height] gt_box = [gt_box[0], gt_box[1], gt_box[0] + gt_box[2], gt_box[1] + gt_box[3]] gt_class = anno['category_id'] - 1 annotations.append([gt_box[0], gt_box[1], gt_box[2], gt_box[3], gt_class]) result.append((img_path, annotations)) return result def output_process(model_type, pred): """ 将目标检测模型输出结果转换为统一结果输出 :param model_type: 目标检测模型类型,可选值:yolox,ssd,faster-rcnn :param pred: 模型预测结果 :return: bbox,score,cls """ pred_box = None score = None pred_class = None if model_type == 'yolox': pred_box = pred[:4] # 前4个值为bbox:[xmin,ymin,xmax,ymax] score = pred[4] # 第5位为置信度 pred_class = int(pred[5]) # 第6位为类别 elif model_type == 'ssd': pred_box = np.array([pred[1], pred[0], pred[3], pred[2]]) # 前4个值为bbox:[ymin,xmin,ymax,xmax] pred_class = int(pred[4]) # 第5位为类别 score = pred[5] # 第6位为置信度 elif model_type == 'faster-rcnn': pred_box = np.array([pred[1], pred[0], pred[3], pred[2]]) # 前4个值为bbox:[ymin,xmin,ymax,xmax] score = pred[4] # 第5位为置信度 pred_class = int(pred[5]) # 第6位为类别 return pred_box, score, pred_class # 模型推理函数 def model_inference(args, model_filename, conf=0.3): """ 模型推理验证集目录下所有图片 :param args: 运行参数 :param model_filename: 模型文件 :param conf: 置信度阈值 :return: 验证集推理准确率 """ model_type = args.model_type # 目标检测模型类型,可选值:yolox,ssd,faster-rcnn # 以下使用GPU进行推理出现问题,需要较新的CUDA版本,默认使用CPU进行推理 # if ort.get_available_providers(): # session = ort.InferenceSession(model_filename, providers=['CUDAExecutionProvider']) # else: # session = ort.InferenceSession(model_filename) # 初始化计数 correct_count = 0 total_count = 0 iou_threshold = 0.5 part_dataset = get_dataset_images_annotations(args, 5) for image_path, annotations in part_dataset: # 使用模型推理流程定义进行模型推理 if model_type.lower() == 'yolox': preds = YOLOXInference(model_filename).predict(image_path) elif model_type.lower() == 'ssd': preds = SSDInference(model_filename).predict(image_path) preds = preds[0] # 只获取模型第一个输出 elif model_type.lower() == 'faster-rcnn': preds = FasterRCNNInference(model_filename).predict(image_path) preds = preds[0] else: raise Exception("目标检测模型类型参数不合法") for anno in annotations: gt_box = [anno[0], anno[1], anno[2], anno[3]] gt_class = anno[4] total_count += 1 # 比对推理结果 for pred in preds: if len(pred) == 0: continue pred_box, score, pred_class = output_process(model_type, pred) if score < conf: continue iou = calculate_iou(gt_box, pred_box) if iou > iou_threshold and pred_class == gt_class: correct_count += 1 break # 计算准确率 accuracy = correct_count / total_count return accuracy if __name__ == '__main__': parser = argparse.ArgumentParser(description='模型推理性能验证脚本') parser.add_argument('--model_type', default='faster-rcnn', type=str, help='目标检测模型类型:yolox、ssd、faster-rcnn') parser.add_argument('--dataset_type', default='voc', type=str, help='验证集的数据集格式,支持的参数:coco,voc') parser.add_argument('--val_dataset_dir', default=None, type=str, help='验证集目录') parser.add_argument('--origin_model_file', default=None, type=str, help='待测试原始模型的onnx文件') parser.add_argument('--watermark_model_file', default=None, type=str, help='待测试水印模型的onnx文件') parser.add_argument('--val_annotation', default=None, type=str, help='验证集标注文件,仅有coco数据集需要这个参数') # parser.add_argument('--val_dataset_dir', default="VOC2007", type=str, help='验证集目录') # parser.add_argument('--origin_model_file', default="models/origin/faster-rcnn/model.onnx", type=str, # help='待测试原始模型的onnx文件') args, _ = parser.parse_known_args() if args.origin_model_file is None: raise Exception("待测试模型的onnx文件不可为空") if args.val_dataset_dir is None: raise Exception("验证集目录不可为空") if args.model_type is None: raise Exception("目标检测模型类型不可为空") monitor = UsageMonitor(interval=0.5) # 每隔 0.5 秒采样一次 monitor.start() # 记录推理开始时间 start_time = time.time() # 进行模型推理 accuracy = model_inference(args, args.origin_model_file) # 记录推理结束时间 end_time = time.time() monitor.stop() # 输出平均 CPU 和 GPU 使用率 avg_cpu, avg_gpu = monitor.get_average_usage() print("原始模型推理性能:") print(f"平均 CPU 使用率:{avg_cpu:.2f}%") print(f"平均 GPU 使用率:{avg_gpu:.2f}%") print(f"模型推理时间: {end_time - start_time:.2f} 秒") print(f"准确率: {accuracy * 100:.2f}%") if args.watermark_model_file: # 加入存在比对模型,进行再次推理,然后统计性能指标 time.sleep(20) monitor2 = UsageMonitor(interval=0.5) # 每隔 0.5 秒采样一次 monitor2.start() # 记录推理开始时间 start_time2 = time.time() # 进行模型推理 accuracy2 = model_inference(args, args.watermark_model_file) # 记录推理结束时间 end_time2 = time.time() monitor2.stop() # 输出平均 CPU 和 GPU 使用率 avg_cpu2, avg_gpu2 = monitor2.get_average_usage() print("水印模型推理性能:") print(f"平均 CPU 使用率:{avg_cpu2:.2f}%") print(f"平均 GPU 使用率:{avg_gpu2:.2f}%") print(f"模型推理时间: {end_time2 - start_time2:.2f} 秒") print(f"准确率: {accuracy2 * 100:.2f}%") print("------------------性能指标如下-------------------------") print(f"嵌入后模型推理准确率下降值:{(accuracy - accuracy2) * 100:.2f}%") print(f"算力资源消耗增加值:{(avg_cpu2 - avg_cpu):.2f}%") print( f"运行效率降低值: {((end_time2 - start_time2) - (end_time - start_time)) * 100 / (end_time - start_time):.2f} %")