- """
- 针对目标检测模型的测试性能损失脚本,通过比较推理过程中CPU、GPU占用、推理时间来进行计算
- 需要安装指定python库实现功能
- pip install psutil gputil pynvml pycocotools
- """
import argparse
import os
import sys

rootpath = str(os.path.abspath(os.path.join(os.path.dirname(__file__), '../')))
sys.path.append(rootpath)

import psutil
import GPUtil
import numpy as np
import time
from threading import Thread
from pycocotools.coco import COCO
import xml.etree.ElementTree as ET

from watermark_verify.inference.rcnn_inference import FasterRCNNInference
from watermark_verify.inference.ssd_inference import SSDInference
from watermark_verify.inference.yolox_inference import YOLOXInference
from watermark_verify.tools.evaluate_tool import calculate_iou


# Resource usage monitor: samples CPU and GPU utilization in a background thread
class UsageMonitor:
    def __init__(self, interval=0.5):
        self.interval = interval
        self.cpu_usage = []
        self.gpu_usage = []
        self.running = False

    def start(self):
        self.running = True
        self.monitor_thread = Thread(target=self._monitor)
        self.monitor_thread.start()

    def _monitor(self):
        while self.running:
            # Record CPU utilization
            self.cpu_usage.append(psutil.cpu_percent(interval=None))
            # Record GPU utilization
            gpus = GPUtil.getGPUs()
            if gpus:
                self.gpu_usage.append(gpus[0].load * 100)  # utilization of the first GPU
            else:
                self.gpu_usage.append(0)  # record 0 if no GPU is available
            time.sleep(self.interval)

    def stop(self):
        self.running = False
        self.monitor_thread.join()

    def get_average_usage(self):
        avg_cpu_usage = np.mean(self.cpu_usage)
        avg_gpu_usage = np.mean(self.gpu_usage)
        return avg_cpu_usage, avg_gpu_usage
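
# Minimal standalone usage sketch of UsageMonitor, kept as a comment so it does not
# run on import; the sleep below only stands in for an arbitrary workload:
# monitor = UsageMonitor(interval=0.5)
# monitor.start()
# time.sleep(2)  # ... workload to be profiled ...
# monitor.stop()
# avg_cpu, avg_gpu = monitor.get_average_usage()
# print(f"CPU: {avg_cpu:.2f}%, GPU: {avg_gpu:.2f}%")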


def parse_voc_annotation(annotation_path):
    """
    Parse a single VOC XML file and extract bounding boxes and class information.
    :param annotation_path: path to the XML annotation file
    :return: List[[x1, y1, x2, y2, category_id]]
    """
    voc_label_map = {  # mapping from class name to class ID
        'aeroplane': 0,
        'bicycle': 1,
        'bird': 2,
        'boat': 3,
        'bottle': 4,
        'bus': 5,
        'car': 6,
        'cat': 7,
        'chair': 8,
        'cow': 9,
        'diningtable': 10,
        'dog': 11,
        'horse': 12,
        'motorbike': 13,
        'person': 14,
        'pottedplant': 15,
        'sheep': 16,
        'sofa': 17,
        'train': 18,
        'tvmonitor': 19
    }

    tree = ET.parse(annotation_path)
    root = tree.getroot()

    annotations = []
    for obj in root.findall("object"):
        # Class name
        class_name = obj.find("name").text
        category_id = voc_label_map.get(class_name, -1)  # -1 if the class is unknown

        # Bounding box (some VOC files store float coordinates, hence int(float(...)))
        bndbox = obj.find("bndbox")
        x1 = int(float(bndbox.find("xmin").text))
        y1 = int(float(bndbox.find("ymin").text))
        x2 = int(float(bndbox.find("xmax").text))
        y2 = int(float(bndbox.find("ymax").text))

        # Append to the result
        annotations.append([x1, y1, x2, y2, category_id])

    return annotations
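
# Illustrative usage sketch (the annotation path and values are hypothetical):
# annos = parse_voc_annotation("VOC2007/Annotations/000001.xml")
# Each entry has the form [x1, y1, x2, y2, category_id], e.g. [48, 240, 195, 371, 11]
# for a 'dog' box, since 'dog' maps to class ID 11 in voc_label_map.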


def get_dataset_images_annotations(args, num):
    """
    Fetch a given number of images and their annotations from the validation set.
    :param args: runtime arguments
    :param num: number of images to fetch
    :return: List[(image_path, annotations)]
    """
    result = []
    dataset_type = args.dataset_type  # dataset type: 'voc' or 'coco'
    if dataset_type == 'voc':
        voc_val_path = args.val_dataset_dir
        voc_annotations_path = os.path.join(voc_val_path, 'Annotations')
        voc_images_path = os.path.join(voc_val_path, 'JPEGImages')

        annotation_files = os.listdir(voc_annotations_path)
        selected_files = annotation_files[:num]  # first `num` annotation files

        for file in selected_files:
            annotation_path = os.path.join(voc_annotations_path, file)
            image_path = os.path.join(voc_images_path, file.replace('.xml', '.jpg'))
            annotations = parse_voc_annotation(annotation_path)
            result.append((image_path, annotations))
        return result
    if dataset_type == 'coco':
        # Load the COCO validation annotations
        coco = COCO(args.val_annotation)
        image_ids = coco.getImgIds()[:num]  # first `num` images
        images = coco.loadImgs(image_ids)

        for image_info in images:
            img_path = f"{args.val_dataset_dir}/{image_info['file_name']}"
            # Fetch the ground-truth annotations for this image
            ann_ids = coco.getAnnIds(imgIds=image_info['id'])
            anns = coco.loadAnns(ann_ids)
            annotations = []
            for anno in anns:
                gt_box = anno['bbox']  # [x, y, width, height]
                gt_box = [gt_box[0], gt_box[1], gt_box[0] + gt_box[2], gt_box[1] + gt_box[3]]
                gt_class = anno['category_id'] - 1  # COCO category IDs start at 1
                annotations.append([gt_box[0], gt_box[1], gt_box[2], gt_box[3], gt_class])

            result.append((img_path, annotations))
        return result


def output_process(model_type, pred):
    """
    Convert a single raw detection from a specific model into a unified format.
    :param model_type: detection model type: 'yolox', 'ssd' or 'faster-rcnn'
    :param pred: one prediction row produced by the model
    :return: (pred_box, score, pred_class)
    """
    pred_box = None
    score = None
    pred_class = None
    if model_type == 'yolox':
        pred_box = pred[:4]  # first 4 values are the bbox: [xmin, ymin, xmax, ymax]
        score = pred[4]  # 5th value is the confidence score
        pred_class = int(pred[5])  # 6th value is the class ID
    elif model_type == 'ssd':
        pred_box = np.array([pred[1], pred[0], pred[3], pred[2]])  # first 4 values are the bbox: [ymin, xmin, ymax, xmax]
        pred_class = int(pred[4])  # 5th value is the class ID
        score = pred[5]  # 6th value is the confidence score
    elif model_type == 'faster-rcnn':
        pred_box = np.array([pred[1], pred[0], pred[3], pred[2]])  # first 4 values are the bbox: [ymin, xmin, ymax, xmax]
        score = pred[4]  # 5th value is the confidence score
        pred_class = int(pred[5])  # 6th value is the class ID
    return pred_box, score, pred_class
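
# Illustrative sketch with hypothetical values: an SSD row laid out as
# [ymin, xmin, ymax, xmax, class, score] is normalized to xyxy box order, e.g.
# box, score, cls = output_process('ssd', np.array([10., 20., 110., 220., 7., 0.9]))
# would give box = [20., 10., 220., 110.], score = 0.9, cls = 7.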


# Model inference function
def model_inference(args, model_filename, conf=0.3):
    """
    Run inference on images sampled from the validation set and compute detection accuracy.
    :param args: runtime arguments
    :param model_filename: path to the ONNX model file
    :param conf: confidence threshold
    :return: accuracy on the sampled validation images
    """
    model_type = args.model_type  # detection model type: 'yolox', 'ssd' or 'faster-rcnn'

    # GPU inference below runs into problems and requires a newer CUDA version,
    # so CPU inference is used by default.
    # if ort.get_available_providers():
    #     session = ort.InferenceSession(model_filename, providers=['CUDAExecutionProvider'])
    # else:
    #     session = ort.InferenceSession(model_filename)

    # Initialize counters
    correct_count = 0
    total_count = 0
    iou_threshold = 0.5

    part_dataset = get_dataset_images_annotations(args, 5)
    for image_path, annotations in part_dataset:
        # Run the model-specific inference pipeline
        if model_type.lower() == 'yolox':
            preds = YOLOXInference(model_filename).predict(image_path)
        elif model_type.lower() == 'ssd':
            preds = SSDInference(model_filename).predict(image_path)
            preds = preds[0]  # keep only the first model output
        elif model_type.lower() == 'faster-rcnn':
            preds = FasterRCNNInference(model_filename).predict(image_path)
            preds = preds[0]  # keep only the first model output
        else:
            raise Exception("Invalid detection model type")

        for anno in annotations:
            gt_box = [anno[0], anno[1], anno[2], anno[3]]
            gt_class = anno[4]
            total_count += 1

            # Compare the ground truth against the predictions
            for pred in preds:
                if len(pred) == 0:
                    continue
                pred_box, score, pred_class = output_process(model_type, pred)
                if score < conf:
                    continue
                iou = calculate_iou(gt_box, pred_box)
                if iou > iou_threshold and pred_class == gt_class:
                    correct_count += 1
                    break

    # Compute accuracy (guard against an empty sample)
    accuracy = correct_count / total_count if total_count else 0.0
    return accuracy


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Model inference performance test script')
    parser.add_argument('--model_type', default='faster-rcnn', type=str,
                        help='detection model type: yolox, ssd or faster-rcnn')
    parser.add_argument('--dataset_type', default='voc', type=str,
                        help='validation dataset format: coco or voc')
    parser.add_argument('--val_dataset_dir', default=None, type=str, help='validation dataset directory')
    parser.add_argument('--origin_model_file', default=None, type=str,
                        help='ONNX file of the original model under test')
    parser.add_argument('--watermark_model_file', default=None, type=str,
                        help='ONNX file of the watermarked model under test')
    parser.add_argument('--val_annotation', default=None, type=str,
                        help='validation annotation file, required for the coco dataset only')
    # parser.add_argument('--val_dataset_dir', default="VOC2007", type=str, help='validation dataset directory')
    # parser.add_argument('--origin_model_file', default="models/origin/faster-rcnn/model.onnx", type=str,
    #                     help='ONNX file of the original model under test')

    args, _ = parser.parse_known_args()
    if args.origin_model_file is None:
        raise Exception("The ONNX file of the model under test must not be empty")
    if args.val_dataset_dir is None:
        raise Exception("The validation dataset directory must not be empty")
    if args.model_type is None:
        raise Exception("The detection model type must not be empty")

    monitor = UsageMonitor(interval=0.5)  # sample every 0.5 seconds
    monitor.start()
    # Record inference start time
    start_time = time.time()

    # Run model inference
    accuracy = model_inference(args, args.origin_model_file)

    # Record inference end time
    end_time = time.time()
    monitor.stop()
    # Report average CPU and GPU usage
    avg_cpu, avg_gpu = monitor.get_average_usage()
    print("Original model inference performance:")
    print(f"Average CPU usage: {avg_cpu:.2f}%")
    print(f"Average GPU usage: {avg_gpu:.2f}%")
    print(f"Inference time: {end_time - start_time:.2f} s")
    print(f"Accuracy: {accuracy * 100:.2f}%")

    if args.watermark_model_file:  # if a comparison model is given, run inference again and collect its metrics
        time.sleep(20)
        monitor2 = UsageMonitor(interval=0.5)  # sample every 0.5 seconds
        monitor2.start()
        # Record inference start time
        start_time2 = time.time()

        # Run model inference
        accuracy2 = model_inference(args, args.watermark_model_file)

        # Record inference end time
        end_time2 = time.time()
        monitor2.stop()
        # Report average CPU and GPU usage
        avg_cpu2, avg_gpu2 = monitor2.get_average_usage()
        print("Watermarked model inference performance:")
        print(f"Average CPU usage: {avg_cpu2:.2f}%")
        print(f"Average GPU usage: {avg_gpu2:.2f}%")
        print(f"Inference time: {end_time2 - start_time2:.2f} s")
        print(f"Accuracy: {accuracy2 * 100:.2f}%")

        print("------------------ Performance metrics ------------------")
        print(f"Accuracy drop after watermark embedding: {(accuracy - accuracy2) * 100:.2f}%")
        print(f"Increase in compute resource usage: {(avg_cpu2 - avg_cpu):.2f}%")
        print(
            f"Runtime efficiency degradation: {((end_time2 - start_time2) - (end_time - start_time)) * 100 / (end_time - start_time):.2f} %")