Add YOLOv5 model verification

zhy · 1 week ago · commit 2688584572

+ 148 - 0
watermark_verify/inference/yolov5_inference.py

@@ -0,0 +1,148 @@
+import cv2
+import numpy as np
+from mindx.sdk import Tensor
+from mindx.sdk import base
+
+
+class YOLOV5Inference:
+    """YOLOv5 inference wrapper built on the MindX SDK."""
+    def __init__(self, model_path, input_size=(640, 640), swap=(2, 0, 1),
+                 score_thr=0.3, nms_thr=0.45, class_agnostic=True):
+        self.model_path = model_path
+        self.input_size = input_size
+        self.swap = swap
+        self.score_thr = score_thr
+        self.nms_thr = nms_thr
+        self.class_agnostic = class_agnostic
+
+        base.mx_init()
+        self.model = base.model(modelPath=self.model_path)
+        if self.model is None:
+            raise RuntimeError("Model loading failed! Please check model_path.")
+
+    def input_processing(self, image_path):
+        """Letterbox the image to input_size with gray (114) padding and return CHW float32."""
+        img = cv2.imread(image_path)
+        if img is None:
+            raise FileNotFoundError(f"Failed to read image: {image_path}")
+        if len(img.shape) == 3:
+            padded_img = np.ones((self.input_size[0], self.input_size[1], 3), dtype=np.uint8) * 114
+        else:
+            padded_img = np.ones(self.input_size, dtype=np.uint8) * 114
+
+        # resize with the aspect ratio preserved and paste into the top-left corner
+        r = min(self.input_size[0] / img.shape[0], self.input_size[1] / img.shape[1])
+        resized_img = cv2.resize(
+            img,
+            (int(img.shape[1] * r), int(img.shape[0] * r)),
+            interpolation=cv2.INTER_LINEAR,
+        ).astype(np.uint8)
+        padded_img[: int(img.shape[0] * r), : int(img.shape[1] * r)] = resized_img
+
+        padded_img = padded_img.transpose(self.swap).copy()
+        padded_img = np.ascontiguousarray(padded_img, dtype=np.float32)
+        height, width = img.shape[:2]
+        channels = img.shape[2] if img.ndim == 3 else 1
+        return padded_img, r, height, width, channels
+
+    def predict(self, image_path):
+        img, ratio, height, width, channels = self.input_processing(image_path)
+        input_tensors = img[None, :, :, :]  # add batch dimension: [C, H, W] -> [1, C, H, W]
+        input_tensors = Tensor(input_tensors)
+
+        outputs = self.model.infer([input_tensors])[0]
+        outputs.to_host()
+        outputs = np.array(outputs)
+        dets = self.output_processing(outputs, ratio)
+        return dets
+
+    def output_processing(self, outputs, ratio):
+        outputs = outputs[0]  # [1, N, 85] -> [N, 85]
+        boxes = outputs[:, 0:4]
+        obj_conf = outputs[:, 4]
+        class_scores = outputs[:, 5:]
+
+        # final score = objectness confidence * per-class confidence
+        scores = obj_conf[:, None] * class_scores
+        dets = multiclass_nms(boxes, scores, nms_thr=self.nms_thr, score_thr=self.score_thr,
+                              class_agnostic=self.class_agnostic)
+
+        if dets is None or len(dets) == 0:
+            return np.zeros((0, 6))
+
+        # rescale boxes back to the original image scale
+        dets[:, :4] /= ratio
+        return dets
+
+
+def nms(boxes, scores, nms_thr):
+    """Single-class hard NMS over [x1, y1, x2, y2] boxes."""
+    x1 = boxes[:, 0]
+    y1 = boxes[:, 1]
+    x2 = boxes[:, 2]
+    y2 = boxes[:, 3]
+
+    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
+    order = scores.argsort()[::-1]
+
+    keep = []
+    while order.size > 0:
+        i = order[0]
+        keep.append(i)
+        xx1 = np.maximum(x1[i], x1[order[1:]])
+        yy1 = np.maximum(y1[i], y1[order[1:]])
+        xx2 = np.minimum(x2[i], x2[order[1:]])
+        yy2 = np.minimum(y2[i], y2[order[1:]])
+
+        w = np.maximum(0.0, xx2 - xx1 + 1)
+        h = np.maximum(0.0, yy2 - yy1 + 1)
+        inter = w * h
+        ovr = inter / (areas[i] + areas[order[1:]] - inter)
+
+        inds = np.where(ovr <= nms_thr)[0]
+        order = order[inds + 1]
+
+    return keep
+
+
+def multiclass_nms_class_agnostic(boxes, scores, nms_thr, score_thr):
+    """Class-agnostic NMS: keep each box's best class, then run one NMS over all boxes."""
+    cls_inds = scores.argmax(1)
+    cls_scores = scores[np.arange(len(cls_inds)), cls_inds]
+    valid_mask = cls_scores > score_thr
+
+    if valid_mask.sum() == 0:
+        return None
+
+    valid_boxes = boxes[valid_mask]
+    valid_scores = cls_scores[valid_mask]
+    valid_cls_inds = cls_inds[valid_mask]
+    keep = nms(valid_boxes, valid_scores, nms_thr)
+
+    if not keep:
+        return None
+
+    dets = np.concatenate(
+        [valid_boxes[keep], valid_scores[keep, None], valid_cls_inds[keep, None]], axis=1
+    )
+    return dets
+
+
+def multiclass_nms_class_aware(boxes, scores, nms_thr, score_thr):
+    """Class-aware NMS: run NMS per class independently and concatenate the results."""
+    final_dets = []
+    num_classes = scores.shape[1]
+    for cls_ind in range(num_classes):
+        cls_scores = scores[:, cls_ind]
+        valid_mask = cls_scores > score_thr
+        if valid_mask.sum() == 0:
+            continue
+        valid_boxes = boxes[valid_mask]
+        valid_scores = cls_scores[valid_mask]
+        keep = nms(valid_boxes, valid_scores, nms_thr)
+        if not keep:
+            continue
+        cls_inds = np.ones((len(keep), 1)) * cls_ind
+        dets = np.concatenate([valid_boxes[keep], valid_scores[keep, None], cls_inds], axis=1)
+        final_dets.append(dets)
+
+    if not final_dets:
+        return None
+    return np.concatenate(final_dets, axis=0)
+
+
+def multiclass_nms(boxes, scores, nms_thr, score_thr, class_agnostic=True):
+    if class_agnostic:
+        return multiclass_nms_class_agnostic(boxes, scores, nms_thr, score_thr)
+    else:
+        return multiclass_nms_class_aware(boxes, scores, nms_thr, score_thr)
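
For reference, a minimal usage sketch of the wrapper above. The model and image paths are hypothetical placeholders, and the .om file is assumed to be a YOLOv5 export whose raw output is [1, N, 85]:

from watermark_verify.inference.yolov5_inference import YOLOV5Inference

# hypothetical paths, for illustration only
infer = YOLOV5Inference("weights/yolov5s.om", input_size=(640, 640), score_thr=0.3)
dets = infer.predict("samples/trigger_001.jpg")  # ndarray, rows of [x1, y1, x2, y2, score, cls]
for x1, y1, x2, y2, score, cls in dets:
    print(f"class {int(cls)}  score {score:.2f}  box ({x1:.0f}, {y1:.0f}, {x2:.0f}, {y2:.0f})")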

+ 76 - 0
watermark_verify/process/yolov5_pytorch_blackbox_process.py

@@ -0,0 +1,76 @@
+"""
+yolov5基于pytorch框架的黑盒水印处理验证流程
+"""
+import os
+import cv2
+from watermark_verify.inference.yolov5_inference import YOLOV5Inference
+from watermark_verify.process.general_process_define import BlackBoxWatermarkProcessDefine
+from watermark_verify.tools import parse_qrcode_label_file
+from watermark_verify.tools.evaluate_tool import calculate_ciou
+
+
+class ModelWatermarkProcessor(BlackBoxWatermarkProcessDefine):
+    def __init__(self, model_filename):
+        super(ModelWatermarkProcessor, self).__init__(model_filename)
+
+    def process(self) -> bool:
+        """
+        Run the verification flow as defined and return the model label verification result
+        :return: model label verification result
+        """
+        # Run inference on the trigger set and compare the detections against the pre-recorded
+        # QR code positions; proceed only if every class is detected within tolerance,
+        # otherwise return False
+        cls_image_mapping = parse_qrcode_label_file.parse_labels(self.qrcode_positions_file)
+        accessed_cls = set()
+        for cls, images in cls_image_mapping.items():
+            for image in images:
+                image_path = os.path.join(self.trigger_dir, image)
+                detect_result = self.detect_secret_label(image_path, self.qrcode_positions_file, (640, 640))
+                if detect_result:
+                    accessed_cls.add(cls)
+                    break
+        if accessed_cls != set(cls_image_mapping.keys()):  # the watermark must be detected for every class
+            return False
+
+        verify_result = self.verify_label()  # trigger-set check passed; verify the embedded label
+        return verify_result
+
+    def detect_secret_label(self, image_path, watermark_txt, input_shape) -> bool:
+        """
+        Check the model against a trigger-set sample for a black-box watermark: if watermarked
+        samples are detected at a rate above the threshold, the model contains the watermark
+        :param image_path: input image path
+        :param watermark_txt: watermark label file path
+        :param input_shape: model input size, tuple
+        :return: detection result
+        """
+        img = cv2.imread(image_path)
+        height, width, channels = img.shape
+        x_center, y_center, w, h, cls = parse_qrcode_label_file.load_watermark_info(watermark_txt, image_path)
+        # convert normalized YOLO coordinates to absolute pixel coordinates
+        x1 = (x_center - w / 2) * width
+        y1 = (y_center - h / 2) * height
+        x2 = (x_center + w / 2) * width
+        y2 = (y_center + h / 2) * height
+        watermark_box = [x1, y1, x2, y2, cls]
+
+        dets = YOLOV5Inference(self.model_filename, input_size=input_shape).predict(image_path)
+
+        if dets is not None:
+            detect_result = detect_watermark(dets, watermark_box)
+            return detect_result
+        else:
+            return False
+
+
+def detect_watermark(dets, watermark_box, threshold=0.5):
+    if dets.size == 0:  # no detections at all
+        return False
+    wm_box_coords = watermark_box[:4]
+    wm_cls = watermark_box[4]
+    for box, score, cls in zip(dets[:, :4], dets[:, 4], dets[:, 5]):
+        if cls == wm_cls:
+            ciou = calculate_ciou(box, wm_box_coords)
+            if ciou > threshold:
+                return True
+    return False
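
A minimal driver sketch, assuming BlackBoxWatermarkProcessDefine resolves trigger_dir and qrcode_positions_file from the model filename as its other processors do; the model filename here is a placeholder:

if __name__ == "__main__":
    # hypothetical model file, for illustration only
    processor = ModelWatermarkProcessor("yolov5s.om")
    passed = processor.process()
    print("watermark verify result:", passed)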