@@ -0,0 +1,297 @@
+"""
|
|
|
+针对目标检测模型的测试性能损失脚本,通过比较推理过程中CPU、GPU占用、推理时间来进行计算
|
|
|
+需要安装指定python库实现功能
|
|
|
+pip install psutil gputil pynvml pycocotools
|
|
|
+"""
+import argparse
+import os
+
+import psutil
+import GPUtil
+import numpy as np
+import time
+from threading import Thread
+from pycocotools.coco import COCO
+import xml.etree.ElementTree as ET
+
+from watermark_verify.inference.rcnn_inference import FasterRCNNInference
+from watermark_verify.inference.ssd_inference import SSDInference
+from watermark_verify.inference.yolox_inference import YOLOXInference
+from watermark_verify.tools.evaluate_tool import calculate_iou
+
+
+# Resource usage monitoring class
+class UsageMonitor:
+    def __init__(self, interval=0.5):
+        self.interval = interval
+        self.cpu_usage = []
+        self.gpu_usage = []
+        self.running = False
+
+    def start(self):
+        self.running = True
+        self.monitor_thread = Thread(target=self._monitor)
+        self.monitor_thread.start()
+
+    def _monitor(self):
+        while self.running:
+            # record CPU usage
+            self.cpu_usage.append(psutil.cpu_percent(interval=None))
+
+            # record GPU usage
+            gpus = GPUtil.getGPUs()
+            if gpus:
+                self.gpu_usage.append(gpus[0].load * 100)  # usage of the first GPU
+            else:
+                self.gpu_usage.append(0)  # record 0 if no GPU is present
+
+            time.sleep(self.interval)
+
+    def stop(self):
+        self.running = False
+        self.monitor_thread.join()
+
+    def get_average_usage(self):
+        avg_cpu_usage = np.mean(self.cpu_usage)
+        avg_gpu_usage = np.mean(self.gpu_usage)
+        return avg_cpu_usage, avg_gpu_usage
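+
+# Minimal usage sketch (illustrative only): wrap any workload between start() and stop(),
+# then read the averaged samples. run_workload() is a hypothetical placeholder.
+#   monitor = UsageMonitor(interval=0.5)
+#   monitor.start()
+#   run_workload()
+#   monitor.stop()
+#   avg_cpu, avg_gpu = monitor.get_average_usage()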
+
+
+def parse_voc_annotation(annotation_path):
+    """
+    Parse a single VOC XML annotation file and extract bounding boxes and class labels.
+    :param annotation_path: path to the XML annotation file
+    :return: List[[x1, y1, x2, y2, category_id]]
+    """
+    voc_label_map = {  # mapping from class name to class ID
+        'aeroplane': 0,
+        'bicycle': 1,
+        'bird': 2,
+        'boat': 3,
+        'bottle': 4,
+        'bus': 5,
+        'car': 6,
+        'cat': 7,
+        'chair': 8,
+        'cow': 9,
+        'diningtable': 10,
+        'dog': 11,
+        'horse': 12,
+        'motorbike': 13,
+        'person': 14,
+        'pottedplant': 15,
+        'sheep': 16,
+        'sofa': 17,
+        'train': 18,
+        'tvmonitor': 19
+    }
+
+    tree = ET.parse(annotation_path)
+    root = tree.getroot()
+
+    annotations = []
+    for obj in root.findall("object"):
+        # class name
+        class_name = obj.find("name").text
+        category_id = voc_label_map.get(class_name, -1)  # -1 if the class name is unknown
+
+        # bounding box
+        bndbox = obj.find("bndbox")
+        x1 = int(bndbox.find("xmin").text)
+        y1 = int(bndbox.find("ymin").text)
+        x2 = int(bndbox.find("xmax").text)
+        y2 = int(bndbox.find("ymax").text)
+
+        # collect the result
+        annotations.append([x1, y1, x2, y2, category_id])
+
+    return annotations
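+
+# Illustrative example (hypothetical annotation): an <object> whose <name> is "dog" and whose
+# <bndbox> is (48, 240, 195, 371) is parsed into the entry [48, 240, 195, 371, 11].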
+
+
+def get_dataset_images_annotations(args, num):
+    """
+    Fetch a given number of images and their annotations from the validation set of the chosen dataset.
+    :param args: runtime arguments
+    :param num: number of images to fetch
+    :return: list of (image_path, annotations) tuples
+    """
+    result = []
+    dataset_type = args.dataset_type  # dataset type, one of: voc, coco
+    if dataset_type == 'voc':
+        voc_val_path = args.val_dataset_dir
+        voc_annotations_path = os.path.join(voc_val_path, 'Annotations')
+        voc_images_path = os.path.join(voc_val_path, 'JPEGImages')
+
+        annotation_files = os.listdir(voc_annotations_path)
+        selected_files = annotation_files[:num]  # first num annotation files
+
+        for file in selected_files:
+            annotation_path = os.path.join(voc_annotations_path, file)
+            image_path = os.path.join(voc_images_path, file.replace('.xml', '.jpg'))
+            annotations = parse_voc_annotation(annotation_path)
+            result.append((image_path, annotations))
+        return result
+    if dataset_type == 'coco':
+        # load the COCO validation set
+        coco = COCO(args.val_annotation)
+        image_ids = coco.getImgIds()[:num]  # first num images
+        images = coco.loadImgs(image_ids)
+        for image_info in images:
+            img_path = f"{args.val_dataset_dir}/{image_info['file_name']}"
+            # fetch the annotations for this image
+            ann_ids = coco.getAnnIds(imgIds=image_info['id'])
+            anns = coco.loadAnns(ann_ids)
+            annotations = []
+            for anno in anns:
+                gt_box = anno['bbox']  # [x, y, width, height]
+                gt_box = [gt_box[0], gt_box[1], gt_box[0] + gt_box[2], gt_box[1] + gt_box[3]]  # convert to [x1, y1, x2, y2]
+                gt_class = anno['category_id'] - 1
+                annotations.append([gt_box[0], gt_box[1], gt_box[2], gt_box[3], gt_class])
+            result.append((img_path, annotations))
+        return result
+    raise Exception("Unsupported dataset_type: only 'voc' and 'coco' are supported")
+
+
+def output_process(model_type, pred):
+    """
+    Convert the raw output of an object detection model into a unified format.
+    :param model_type: detection model type, one of: yolox, ssd, faster-rcnn
+    :param pred: a single model prediction
+    :return: bbox, score, cls
+    """
+    pred_box = None
+    score = None
+    pred_class = None
+    if model_type == 'yolox':
+        pred_box = pred[:4]  # first 4 values are the bbox: [xmin, ymin, xmax, ymax]
+        score = pred[4]  # 5th value is the confidence score
+        pred_class = int(pred[5])  # 6th value is the class ID
+    elif model_type == 'ssd':
+        pred_box = np.array([pred[1], pred[0], pred[3], pred[2]])  # first 4 values are the bbox: [ymin, xmin, ymax, xmax]
+        pred_class = int(pred[4])  # 5th value is the class ID
+        score = pred[5]  # 6th value is the confidence score
+    elif model_type == 'faster-rcnn':
+        pred_box = np.array([pred[1], pred[0], pred[3], pred[2]])  # first 4 values are the bbox: [ymin, xmin, ymax, xmax]
+        score = pred[4]  # 5th value is the confidence score
+        pred_class = int(pred[5])  # 6th value is the class ID
+    return pred_box, score, pred_class
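+
+# Illustrative example (hypothetical values): for a YOLOX prediction
+# [10.0, 20.0, 110.0, 220.0, 0.92, 14.0], output_process('yolox', pred) returns
+# ([10.0, 20.0, 110.0, 220.0], 0.92, 14), i.e. the bbox, the confidence score, and class ID 14.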
+
+
+# Model inference function
+def model_inference(args, model_filename, conf=0.3):
+    """
+    Run inference on a sample of images from the validation set and compute detection accuracy.
+    :param args: runtime arguments
+    :param model_filename: path to the model file
+    :param conf: confidence threshold
+    :return: accuracy over the sampled validation images
+    """
+    model_type = args.model_type  # detection model type, one of: yolox, ssd, faster-rcnn
+
+    # GPU inference below requires a recent CUDA version and currently fails, so CPU inference is used by default
+    # if ort.get_available_providers():
+    #     session = ort.InferenceSession(model_filename, providers=['CUDAExecutionProvider'])
+    # else:
+    #     session = ort.InferenceSession(model_filename)
+
+    # initialize counters
+    correct_count = 0
+    total_count = 0
+    iou_threshold = 0.5
+    part_dataset = get_dataset_images_annotations(args, 5)
+    for image_path, annotations in part_dataset:
+        # run inference using the pipeline defined for each model type
+        if model_type.lower() == 'yolox':
+            preds = YOLOXInference(model_filename).predict(image_path)
+        elif model_type.lower() == 'ssd':
+            preds = SSDInference(model_filename).predict(image_path)
+            preds = preds[0]  # keep only the model's first output
+        elif model_type.lower() == 'faster-rcnn':
+            preds = FasterRCNNInference(model_filename).predict(image_path)
+            preds = preds[0]
+        else:
+            raise Exception("Invalid object detection model type")
+        for anno in annotations:
+            gt_box = [anno[0], anno[1], anno[2], anno[3]]
+            gt_class = anno[4]
+            total_count += 1
+
+            # compare predictions against the ground truth
+            for pred in preds:
+                if len(pred) == 0:
+                    continue
+                pred_box, score, pred_class = output_process(model_type, pred)
+                if score < conf:
+                    continue
+                iou = calculate_iou(gt_box, pred_box)
+                if iou > iou_threshold and pred_class == gt_class:
+                    correct_count += 1
+                    break
+
+    # compute accuracy
+    accuracy = correct_count / total_count
+
+    return accuracy
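+
+# Note on the metric as implemented above: a ground-truth box counts as detected when at least
+# one prediction with score >= conf has IoU > 0.5 and the same class; accuracy is the number of
+# detected ground-truth boxes divided by all ground-truth boxes (a recall-style measure, not mAP).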
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description='Model inference performance test script')
+    parser.add_argument('--model_type', default='faster-rcnn', type=str, help='detection model type: yolox, ssd, faster-rcnn')
+    parser.add_argument('--dataset_type', default='voc', type=str, help='validation set format, supported values: coco, voc')
+    parser.add_argument('--val_dataset_dir', default=None, type=str, help='validation set directory')
+    parser.add_argument('--origin_model_file', default=None, type=str, help='ONNX file of the original model under test')
+    parser.add_argument('--watermark_model_file', default=None, type=str, help='ONNX file of the watermarked model under test')
+    parser.add_argument('--val_annotation', default=None, type=str,
+                        help='validation set annotation file, only required for coco datasets')
+    # parser.add_argument('--val_dataset_dir', default="VOC2007", type=str, help='validation set directory')
+    # parser.add_argument('--origin_model_file', default="models/origin/faster-rcnn/model.onnx", type=str,
+    #                     help='ONNX file of the original model under test')
+
+    args, _ = parser.parse_known_args()
+    if args.origin_model_file is None:
+        raise Exception("The ONNX file of the model under test must not be empty")
+    if args.val_dataset_dir is None:
+        raise Exception("The validation set directory must not be empty")
+    if args.model_type is None:
+        raise Exception("The object detection model type must not be empty")
+
+    monitor = UsageMonitor(interval=0.5)  # sample every 0.5 seconds
+    monitor.start()
+    # record the inference start time
+    start_time = time.time()
+    # run model inference
+    accuracy = model_inference(args, args.origin_model_file)
+    # record the inference end time
+    end_time = time.time()
+    monitor.stop()
+    # report average CPU and GPU usage
+    avg_cpu, avg_gpu = monitor.get_average_usage()
+    print("Original model inference performance:")
+    print(f"Average CPU usage: {avg_cpu:.2f}%")
+    print(f"Average GPU usage: {avg_gpu:.2f}%")
+    print(f"Model inference time: {end_time - start_time:.2f} s")
+    print(f"Accuracy: {accuracy * 100:.2f}%")
+
+    if args.watermark_model_file:  # if a comparison model is provided, run inference again and collect its performance metrics
+        time.sleep(20)  # pause between the two runs
+        monitor2 = UsageMonitor(interval=0.5)  # sample every 0.5 seconds
+        monitor2.start()
+        # record the inference start time
+        start_time2 = time.time()
+        # run model inference
+        accuracy2 = model_inference(args, args.watermark_model_file)
+        # record the inference end time
+        end_time2 = time.time()
+        monitor2.stop()
+        # report average CPU and GPU usage
+        avg_cpu2, avg_gpu2 = monitor2.get_average_usage()
+        print("Watermarked model inference performance:")
+        print(f"Average CPU usage: {avg_cpu2:.2f}%")
+        print(f"Average GPU usage: {avg_gpu2:.2f}%")
+        print(f"Model inference time: {end_time2 - start_time2:.2f} s")
+        print(f"Accuracy: {accuracy2 * 100:.2f}%")
+
+        print("------------------Performance metrics-------------------------")
+        print(f"Accuracy drop after watermark embedding: {(accuracy - accuracy2) * 100:.2f}%")
+        print(f"Compute resource (CPU) consumption increase: {(avg_cpu2 - avg_cpu):.2f}%")
+        print(
+            f"Runtime efficiency decrease: {((end_time2 - start_time2) - (end_time - start_time)) * 100 / (end_time - start_time):.2f} %")
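+        # Note: the runtime efficiency decrease above is the relative increase in wall-clock
+        # inference time, i.e. (t_watermark - t_origin) / t_origin expressed as a percentage.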
|