浏览代码

增加yolov5实现

zhy 1 周之前
父节点
当前提交
40556ecf77

+ 129 - 0
watermark_verify/inference/yolov5_inference.py

@@ -0,0 +1,129 @@
+import cv2
+import numpy as np
+import onnxruntime as ort
+
+
+class YOLOV5Inference:
+    def __init__(self, model_path, input_size=(640, 640), swap=(2, 0, 1)):
+        """
+        初始化 YOLOv5 模型推理流程
+        :param model_path: 模型 ONNX 路径
+        :param input_size: 模型输入尺寸,默认 640x640
+        :param swap: 图像轴变换顺序,默认为 (2,0,1) 即 HWC -> CHW
+        """
+        self.model_path = model_path
+        self.input_size = input_size
+        self.swap = swap
+
+        # 初始化 ONNX 推理会话
+        self.session = ort.InferenceSession(self.model_path)
+        self.input_name = self.session.get_inputs()[0].name
+
+    def input_processing(self, image_path):
+        """
+        图像预处理:读取图像、Letterbox 缩放、归一化、CHW 转换
+        :param image_path: 图像路径
+        :return: 模型输入张量, 原始图像, ratio 比例
+        """
+        img = cv2.imread(image_path)
+        h0, w0 = img.shape[:2]
+        r = min(self.input_size[0] / h0, self.input_size[1] / w0)
+
+        resized_img = cv2.resize(img, (int(w0 * r), int(h0 * r)), interpolation=cv2.INTER_LINEAR)
+        padded_img = np.full((self.input_size[0], self.input_size[1], 3), 114, dtype=np.uint8)
+        padded_img[:resized_img.shape[0], :resized_img.shape[1]] = resized_img
+
+        # BGR -> RGB, HWC -> CHW
+        img_tensor = padded_img[:, :, ::-1].transpose(self.swap).astype(np.float32)
+        img_tensor /= 255.0  # 归一化到 [0,1]
+        img_tensor = np.expand_dims(img_tensor, axis=0)  # 添加 batch 维度
+        return img_tensor, img, r
+
+    def predict(self, image_path, conf_thres=0.25, iou_thres=0.45):
+        """
+        对单张图像进行 YOLOv5 推理并返回处理后的检测结果
+        :param image_path: 图像路径
+        :param conf_thres: 置信度阈值
+        :param iou_thres: NMS IOU 阈值
+        :return: Numpy 数组,每行 [x1, y1, x2, y2, conf, cls]
+        """
+        input_tensor, raw_img, ratio = self.input_processing(image_path)
+        outputs = self.session.run(None, {self.input_name: input_tensor})[0]  # [1, N, 6/85]
+        outputs = self.output_processing(outputs, ratio, conf_thres, iou_thres)
+        return outputs
+
+    def output_processing(self, outputs, ratio, conf_thres, iou_thres):
+        """
+        解析 ONNX 输出并进行后处理(包含 NMS)
+        :param outputs: 原始模型输出
+        :param ratio: 输入图像缩放比例
+        :return: NMS 后的结果 [x1, y1, x2, y2, conf, cls]
+        """
+        preds = outputs[0]  # [N, 6] 或 [N, 85]
+        if preds.shape[1] == 6:
+            # already in [x1, y1, x2, y2, conf, cls]
+            boxes = preds[:, :4]
+            scores = preds[:, 4]
+            classes = preds[:, 5]
+        else:
+            boxes = preds[:, :4]
+            scores_all = preds[:, 5:]
+            classes = np.argmax(scores_all, axis=1)
+            scores = scores_all[np.arange(len(classes)), classes]
+
+        # 置信度筛选
+        mask = scores > conf_thres
+        boxes = boxes[mask]
+        scores = scores[mask]
+        classes = classes[mask]
+
+        if boxes.shape[0] == 0:
+            return np.array([])
+
+        # 还原坐标
+        boxes /= ratio
+
+        # 执行 NMS
+        indices = nms(boxes, scores, iou_thres)
+        dets = np.concatenate([
+            boxes[indices],
+            scores[indices, None],
+            classes[indices, None].astype(np.float32)
+        ], axis=1)
+        return dets
+
+
+def nms(boxes, scores, iou_threshold):
+    """
+    单类 NMS
+    :param boxes: [N, 4] => x1, y1, x2, y2
+    :param scores: [N,]
+    :param iou_threshold: 阈值
+    :return: 保留索引
+    """
+    x1 = boxes[:, 0]
+    y1 = boxes[:, 1]
+    x2 = boxes[:, 2]
+    y2 = boxes[:, 3]
+
+    areas = (x2 - x1) * (y2 - y1)
+    order = scores.argsort()[::-1]
+    keep = []
+
+    while order.size > 0:
+        i = order[0]
+        keep.append(i)
+
+        xx1 = np.maximum(x1[i], x1[order[1:]])
+        yy1 = np.maximum(y1[i], y1[order[1:]])
+        xx2 = np.minimum(x2[i], x2[order[1:]])
+        yy2 = np.minimum(y2[i], y2[order[1:]])
+
+        w = np.maximum(0.0, xx2 - xx1)
+        h = np.maximum(0.0, yy2 - yy1)
+        inter = w * h
+        iou = inter / (areas[i] + areas[order[1:]] - inter)
+
+        inds = np.where(iou <= iou_threshold)[0]
+        order = order[inds + 1]
+    return keep

+ 76 - 0
watermark_verify/process/yolov5_pytorch_blackbox_process.py

@@ -0,0 +1,76 @@
+"""
+yolov5基于pytorch框架的黑盒水印处理验证流程
+"""
+import os
+import cv2
+from watermark_verify.inference.yolov5_inference import YOLOV5Inference
+from watermark_verify.process.general_process_define import BlackBoxWatermarkProcessDefine
+from watermark_verify.tools import parse_qrcode_label_file
+from watermark_verify.tools.evaluate_tool import calculate_ciou
+
+
+class ModelWatermarkProcessor(BlackBoxWatermarkProcessDefine):
+    def __init__(self, model_filename):
+        super(ModelWatermarkProcessor, self).__init__(model_filename)
+
+    def process(self) -> bool:
+        """
+        根据流程定义进行处理,并返回模型标签验证结果
+        :return: 模型标签验证结果
+        """
+        # 获取权重文件,使用触发集进行模型推理, 将推理结果与触发集预先二维码保存位置进行比对,在误差范围内则进行下一步,否则返回False
+        cls_image_mapping = parse_qrcode_label_file.parse_labels(self.qrcode_positions_file)
+        accessed_cls = set()
+        for cls, images in cls_image_mapping.items():
+            for image in images:
+                image_path = os.path.join(self.trigger_dir, image)
+                detect_result = self.detect_secret_label(image_path, self.qrcode_positions_file, (640, 640))
+                if detect_result:
+                    accessed_cls.add(cls)
+                    break
+        if not accessed_cls == set(cls_image_mapping.keys()):  # 所有的分类都检测出模型水印,模型水印检测结果为True
+            return False
+
+        verify_result = self.verify_label()  # 模型标签检测通过,进行标签验证
+        return verify_result
+
+    def detect_secret_label(self, image_path, watermark_txt, input_shape) -> bool:
+        """
+        对模型使用触发集进行检查,判断是否存在黑盒模型水印,如果对嵌入水印的图片样本正确率高于阈值,证明模型存在黑盒水印
+        :param image_path: 输入图像路径
+        :param watermark_txt: 水印标签文件路径
+        :param input_shape: 模型输入图像大小,tuple
+        :return: 检测结果
+        """
+        img = cv2.imread(image_path)
+        height, width, channels = img.shape
+        x_center, y_center, w, h, cls = parse_qrcode_label_file.load_watermark_info(watermark_txt, image_path)
+        # 计算绝对坐标
+        x1 = (x_center - w / 2) * width
+        y1 = (y_center - h / 2) * height
+        x2 = (x_center + w / 2) * width
+        y2 = (y_center + h / 2) * height
+        watermark_box = [x1, y1, x2, y2, cls]
+        if len(watermark_box) == 0:
+            return False
+
+        dets = YOLOV5Inference(self.model_filename,input_size=input_shape).predict(image_path)
+
+        if dets is not None:
+            detect_result = detect_watermark(dets, watermark_box)
+            return detect_result
+        else:
+            return False
+
+
+def detect_watermark(dets, watermark_box, threshold=0.5):
+    if dets.size == 0:  # 检查是否为空
+        return False
+    for box, score, cls in zip(dets[:, :4], dets[:, 4], dets[:, 5]):
+        wm_box_coords = watermark_box[:4]
+        wm_cls = watermark_box[4]
+        if cls == wm_cls:
+            ciou = calculate_ciou(box, wm_box_coords)
+            if ciou > threshold:
+                return True
+    return False