""" 针对目标检测模型的测试性能损失脚本,通过比较推理过程中CPU、GPU占用、推理时间来进行计算 需要安装指定python库实现功能 pip install psutil gputil pynvml pycocotools """ import argparse import os import sys rootpath = str(os.path.abspath(os.path.join(os.path.dirname(__file__), '../'))) sys.path.append(rootpath) import psutil import GPUtil import numpy as np import time from threading import Thread from pycocotools.coco import COCO import xml.etree.ElementTree as ET from watermark_verify.inference.rcnn_inference import FasterRCNNInference from watermark_verify.inference.ssd_inference import SSDInference from watermark_verify.inference.yolox_inference import YOLOXInference from watermark_verify.tools.evaluate_tool import calculate_iou # 定义监控函数 class UsageMonitor: def __init__(self, interval=0.5): self.interval = interval self.cpu_usage = [] self.gpu_usage = [] self.running = False def start(self): self.running = True self.monitor_thread = Thread(target=self._monitor) self.monitor_thread.start() def _monitor(self): while self.running: # 记录 CPU 使用率 self.cpu_usage.append(psutil.cpu_percent(interval=None)) # 记录 GPU 使用率 gpus = GPUtil.getGPUs() if gpus: self.gpu_usage.append(gpus[0].load * 100) # 获取第一个 GPU 的使用率 else: self.gpu_usage.append(0) # 若没有 GPU 则记为 0 time.sleep(self.interval) def stop(self): self.running = False self.monitor_thread.join() def get_average_usage(self): avg_cpu_usage = np.mean(self.cpu_usage) avg_gpu_usage = np.mean(self.gpu_usage) return avg_cpu_usage, avg_gpu_usage def parse_voc_annotation(annotation_path): """ 解析单个VOC XML文件,并提取边界框和类别信息 :param annotation_path: XML标注文件路径 :return: List[[x1, y1, x2, y2, category_id]] """ voc_label_map = { # 类别名称到ID的映射字典 'aeroplane': 0, 'bicycle': 1, 'bird': 2, 'boat': 3, 'bottle': 4, 'bus': 5, 'car': 6, 'cat': 7, 'chair': 8, 'cow': 9, 'diningtable': 10, 'dog': 11, 'horse': 12, 'motorbike': 13, 'person': 14, 'pottedplant': 15, 'sheep': 16, 'sofa': 17, 'train': 18, 'tvmonitor': 19 } tree = ET.parse(annotation_path) root = tree.getroot() annotations = [] for obj in root.findall("object"): # 类别名称 class_name = obj.find("name").text category_id = voc_label_map.get(class_name, -1) # 找不到时返回 -1 # 边界框信息 bndbox = obj.find("bndbox") x1 = int(bndbox.find("xmin").text) y1 = int(bndbox.find("ymin").text) x2 = int(bndbox.find("xmax").text) y2 = int(bndbox.find("ymax").text) # 添加到结果 annotations.append([x1, y1, x2, y2, category_id]) return annotations def get_dataset_images_annotations(args, num): """ 从指定数据集的验证集中获取指定数量的图片及其标注信息 :param args: 参数 :param num: 获取数量 :return: (image_path, annotations) """ result = [] dataset_type = args.dataset_type # dataset_type: 数据集类型,可选参数:voc,coco if dataset_type == 'voc': voc_val_path = args.val_dataset_dir voc_annotations_path = os.path.join(voc_val_path, 'Annotations') voc_images_path = os.path.join(voc_val_path, 'JPEGImages') annotation_files = os.listdir(voc_annotations_path) selected_files = annotation_files[:num] # 前5张图片 for file in selected_files: annotation_path = os.path.join(voc_annotations_path, file) image_path = os.path.join(voc_images_path, file.replace('.xml', '.jpg')) annotations = parse_voc_annotation(annotation_path) result.append((image_path, annotations)) return result if dataset_type == 'coco': # 加载COCO验证集 coco = COCO(args.val_annotation) image_ids = coco.getImgIds()[:num] # 前5张图片 images = coco.loadImgs(image_ids) for image_info in images: img_path = f"{args.val_dataset_dir}/{image_info['file_name']}" # 获取标签信息 ann_ids = coco.getAnnIds(imgIds=image_info['id']) anns = coco.loadAnns(ann_ids) annotations = [] for anno in anns: gt_box = anno['bbox'] # [x, y, width, height] gt_box = [gt_box[0], gt_box[1], gt_box[0] + gt_box[2], gt_box[1] + gt_box[3]] gt_class = anno['category_id'] - 1 annotations.append([gt_box[0], gt_box[1], gt_box[2], gt_box[3], gt_class]) result.append((img_path, annotations)) return result def output_process(model_type, pred): """ 将目标检测模型输出结果转换为统一结果输出 :param model_type: 目标检测模型类型,可选值:yolox,ssd,faster-rcnn :param pred: 模型预测结果 :return: bbox,score,cls """ pred_box = None score = None pred_class = None if model_type == 'yolox': pred_box = pred[:4] # 前4个值为bbox:[xmin,ymin,xmax,ymax] score = pred[4] # 第5位为置信度 pred_class = int(pred[5]) # 第6位为类别 elif model_type == 'ssd': pred_box = np.array([pred[1], pred[0], pred[3], pred[2]]) # 前4个值为bbox:[ymin,xmin,ymax,xmax] pred_class = int(pred[4]) # 第5位为类别 score = pred[5] # 第6位为置信度 elif model_type == 'faster-rcnn': pred_box = np.array([pred[1], pred[0], pred[3], pred[2]]) # 前4个值为bbox:[ymin,xmin,ymax,xmax] score = pred[4] # 第5位为置信度 pred_class = int(pred[5]) # 第6位为类别 return pred_box, score, pred_class # 模型推理函数 def model_inference(args, model_filename, conf=0.3): """ 模型推理验证集目录下所有图片 :param args: 运行参数 :param model_filename: 模型文件 :param conf: 置信度阈值 :return: 验证集推理准确率 """ model_type = args.model_type # 目标检测模型类型,可选值:yolox,ssd,faster-rcnn # 以下使用GPU进行推理出现问题,需要较新的CUDA版本,默认使用CPU进行推理 # if ort.get_available_providers(): # session = ort.InferenceSession(model_filename, providers=['CUDAExecutionProvider']) # else: # session = ort.InferenceSession(model_filename) # 初始化计数 correct_count = 0 total_count = 0 iou_threshold = 0.5 part_dataset = get_dataset_images_annotations(args, 5) for image_path, annotations in part_dataset: # 使用模型推理流程定义进行模型推理 if model_type.lower() == 'yolox': preds = YOLOXInference(model_filename).predict(image_path) elif model_type.lower() == 'ssd': preds = SSDInference(model_filename).predict(image_path) preds = preds[0] # 只获取模型第一个输出 elif model_type.lower() == 'faster-rcnn': preds = FasterRCNNInference(model_filename).predict(image_path) preds = preds[0] else: raise Exception("目标检测模型类型参数不合法") for anno in annotations: gt_box = [anno[0], anno[1], anno[2], anno[3]] gt_class = anno[4] total_count += 1 # 比对推理结果 for pred in preds: if len(pred) == 0: continue pred_box, score, pred_class = output_process(model_type, pred) if score < conf: continue iou = calculate_iou(gt_box, pred_box) if iou > iou_threshold and pred_class == gt_class: correct_count += 1 break # 计算准确率 accuracy = correct_count / total_count return accuracy if __name__ == '__main__': parser = argparse.ArgumentParser(description='模型推理性能验证脚本') parser.add_argument('--model_type', default='faster-rcnn', type=str, help='目标检测模型类型:yolox、ssd、faster-rcnn') parser.add_argument('--dataset_type', default='voc', type=str, help='验证集的数据集格式,支持的参数:coco,voc') parser.add_argument('--val_dataset_dir', default=None, type=str, help='验证集目录') parser.add_argument('--origin_model_file', default=None, type=str, help='待测试原始模型的onnx文件') parser.add_argument('--watermark_model_file', default=None, type=str, help='待测试水印模型的onnx文件') parser.add_argument('--val_annotation', default=None, type=str, help='验证集标注文件,仅有coco数据集需要这个参数') # parser.add_argument('--val_dataset_dir', default="VOC2007", type=str, help='验证集目录') # parser.add_argument('--origin_model_file', default="models/origin/faster-rcnn/model.onnx", type=str, # help='待测试原始模型的onnx文件') args, _ = parser.parse_known_args() if args.origin_model_file is None: raise Exception("待测试模型的onnx文件不可为空") if args.val_dataset_dir is None: raise Exception("验证集目录不可为空") if args.model_type is None: raise Exception("目标检测模型类型不可为空") monitor = UsageMonitor(interval=0.5) # 每隔 0.5 秒采样一次 monitor.start() # 记录推理开始时间 start_time = time.time() # 进行模型推理 accuracy = model_inference(args, args.origin_model_file) # 记录推理结束时间 end_time = time.time() monitor.stop() # 输出平均 CPU 和 GPU 使用率 avg_cpu, avg_gpu = monitor.get_average_usage() print("原始模型推理性能:") print(f"平均 CPU 使用率:{avg_cpu:.2f}%") print(f"平均 GPU 使用率:{avg_gpu:.2f}%") print(f"模型推理时间: {end_time - start_time:.2f} 秒") print(f"准确率: {accuracy * 100:.2f}%") if args.watermark_model_file: # 加入存在比对模型,进行再次推理,然后统计性能指标 time.sleep(20) monitor2 = UsageMonitor(interval=0.5) # 每隔 0.5 秒采样一次 monitor2.start() # 记录推理开始时间 start_time2 = time.time() # 进行模型推理 accuracy2 = model_inference(args, args.watermark_model_file) # 记录推理结束时间 end_time2 = time.time() monitor2.stop() # 输出平均 CPU 和 GPU 使用率 avg_cpu2, avg_gpu2 = monitor2.get_average_usage() print("水印模型推理性能:") print(f"平均 CPU 使用率:{avg_cpu2:.2f}%") print(f"平均 GPU 使用率:{avg_gpu2:.2f}%") print(f"模型推理时间: {end_time2 - start_time2:.2f} 秒") print(f"准确率: {accuracy2 * 100:.2f}%") print("------------------性能指标如下-------------------------") print(f"嵌入后模型推理准确率下降值:{(accuracy - accuracy2) * 100:.2f}%") print(f"算力资源消耗增加值:{(avg_cpu2 - avg_cpu):.2f}%") print( f"运行效率降低值: {((end_time2 - start_time2) - (end_time - start_time)) * 100 / (end_time - start_time):.2f} %")