
Add black-box watermark detection flows for object detection models

liyan, 4 months ago
commit 71d262ff32

+ 140 - 0
watermark_verify/process/faster-rcnn_pytorch_blackbox_process.py

@@ -0,0 +1,140 @@
+"""
+Black-box watermark verification flow for a pytorch-based faster-rcnn model
+"""
+
+import os
+
+import numpy as np
+import onnxruntime
+from PIL import Image
+
+from watermark_verify.process.general_process_define import BlackBoxWatermarkProcessDefine
+
+from watermark_verify.tools import parse_qrcode_label_file
+from watermark_verify.tools.evaluate_tool import calculate_ciou
+from watermark_verify.utils.utils_bbox import DecodeBox
+
+
+class ClassificationProcess(BlackBoxWatermarkProcessDefine):
+    def __init__(self, model_filename):
+        super(ClassificationProcess, self).__init__(model_filename)
+
+    def process(self) -> bool:
+        # Load the weights and run inference on the trigger set, then compare the predictions with the pre-saved QR-code positions of the trigger set; if they match within tolerance, continue, otherwise return False
+        cls_image_mapping = parse_qrcode_label_file.parse_labels(self.qrcode_positions_file)
+        accessed_cls = set()
+        for cls, images in cls_image_mapping.items():
+            for image in images:
+                image_path = os.path.join(self.trigger_dir, image)
+                try:
+                    detect_result = self.detect_secret_label(image_path, self.model_filename,
+                                                             self.qrcode_positions_file,
+                                                             (600, 600))
+                except Exception:
+                    # skip trigger images that fail to load or run
+                    continue
+                if detect_result:
+                    accessed_cls.add(cls)
+                    break
+
+        if accessed_cls != set(cls_image_mapping.keys()):  # the watermark counts as detected only if every class is hit
+            return False
+
+        verify_result = self.verify_label()  # watermark detection passed, now verify the model label
+        return verify_result
+
+    def preprocess_image(self, image_path, input_size, swap=(2, 0, 1)):
+        image = Image.open(image_path)
+        image_shape = np.array(np.shape(image)[0:2])
+        # ---------------------------------------------------------#
+        #   Convert the image to RGB here so grayscale images do not
+        #   fail at prediction time. Only RGB prediction is supported,
+        #   so all other image types are converted to RGB first.
+        # ---------------------------------------------------------#
+        if not (len(np.shape(image)) == 3 and np.shape(image)[2] == 3):
+            image = image.convert('RGB')
+        image_data = resize_image(image, input_size, False)
+        image_data = np.expand_dims(np.transpose(preprocess_input(np.array(image_data, dtype='float32')), swap).copy(),
+                                    0)
+        image_data = image_data.astype('float32')
+        return image_data, image_shape
+
+    def detect_secret_label(self, image_path, model_file, watermark_txt, input_shape) -> bool:
+        """
+        使用指定onnx文件进行预测并进行黑盒水印检测
+        :param image_path: 输入图像路径
+        :param model_file: 模型文件路径
+        :param watermark_txt: 水印标签文件路径
+        :param input_shape: 模型输入图像大小,tuple
+        :return:
+        """
+        image_data, image_shape = self.preprocess_image(image_path, input_shape)
+        # parse the embedded watermark label position
+        parse_label = parse_qrcode_label_file.load_watermark_info(watermark_txt, image_path)
+        if len(parse_label) < 5:
+            return False
+        x_center, y_center, w, h, cls = parse_label
+
+        # convert the normalized coordinates to absolute ones
+        height, width = image_shape
+        x1 = (x_center - w / 2) * width
+        y1 = (y_center - h / 2) * height
+        x2 = (x_center + w / 2) * width
+        y2 = (y_center + h / 2) * height
+        watermark_box = [y1, x1, y2, x2, cls]
+        # run inference with onnxruntime
+        session = onnxruntime.InferenceSession(model_file)
+        ort_inputs = {session.get_inputs()[0].name: image_data,
+                      session.get_inputs()[1].name: np.array(1.0).astype('float64')}
+        output = session.run(None, ort_inputs)
+        roi_cls_locs, roi_scores, rois, _ = output
+        # decode the raw model outputs
+        num_classes = 20
+        bbox_util = DecodeBox(num_classes)
+        nms_iou = 0.3
+        confidence = 0.5
+        results = bbox_util.forward(roi_cls_locs, roi_scores, rois, image_shape, input_shape,
+                                    nms_iou=nms_iou, confidence=confidence)
+        if results is not None:
+            detect_result = detect_watermark(results, watermark_box)
+            return detect_result
+        else:
+            return False
+
+
+def resize_image(image, size, letterbox_image):
+    iw, ih = image.size
+    w, h = size
+    if letterbox_image:
+        scale = min(w / iw, h / ih)
+        nw = int(iw * scale)
+        nh = int(ih * scale)
+
+        image = image.resize((nw, nh), Image.BICUBIC)
+        new_image = Image.new('RGB', size, (128, 128, 128))
+        new_image.paste(image, ((w - nw) // 2, (h - nh) // 2))
+    else:
+        new_image = image.resize((w, h), Image.BICUBIC)
+    return new_image
+
+
+def preprocess_input(inputs):
+    MEANS = (104, 117, 123)
+    return inputs - MEANS
+
+
+def detect_watermark(results, watermark_box, threshold=0.5):
+    # parse the detection results
+    if len(results[0]) == 0:
+        return False
+    top_label = np.array(results[0][:, 4], dtype='int32')
+    top_conf = results[0][:, 5]
+    top_boxes = results[0][:, :4]
+    wm_box_coords = watermark_box[:4]
+    wm_cls = watermark_box[4]
+    for box, score, cls in zip(top_boxes, top_conf, top_label):
+        if cls == wm_cls:
+            ciou = calculate_ciou(box, wm_box_coords)
+            if ciou > threshold:
+                return True
+    return False
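For reference, a minimal usage sketch of this flow (the ONNX path below is illustrative; note that the module filename contains a hyphen, so it has to be loaded via importlib rather than a plain import statement):

import importlib

# load the hyphenated module and run the black-box verification flow
mod = importlib.import_module("watermark_verify.process.faster-rcnn_pytorch_blackbox_process")
processor = mod.ClassificationProcess("models/faster-rcnn.onnx")  # hypothetical model path
print("watermark verified" if processor.process() else "watermark not detected")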

+ 132 - 0
watermark_verify/process/ssd_pytorch_blackbox_process.py

@@ -0,0 +1,132 @@
+"""
+Black-box watermark verification flow for a pytorch-based ssd model
+"""
+
+import os
+
+import numpy as np
+import onnxruntime
+from PIL import Image
+
+from watermark_verify.process.general_process_define import BlackBoxWatermarkProcessDefine
+
+from watermark_verify.tools import parse_qrcode_label_file
+from watermark_verify.tools.evaluate_tool import calculate_ciou
+from watermark_verify.utils.anchors import get_anchors
+from watermark_verify.utils.utils_bbox import BBoxUtility
+
+
+class ClassificationProcess(BlackBoxWatermarkProcessDefine):
+    def __init__(self, model_filename):
+        super(ClassificationProcess, self).__init__(model_filename)
+
+    def process(self) -> bool:
+        # Load the weights and run inference on the trigger set, then compare the predictions with the pre-saved QR-code positions of the trigger set; if they match within tolerance, continue, otherwise return False
+        cls_image_mapping = parse_qrcode_label_file.parse_labels(self.qrcode_positions_file)
+        accessed_cls = set()
+        for cls, images in cls_image_mapping.items():
+            for image in images:
+                image_path = os.path.join(self.trigger_dir, image)
+                detect_result = self.detect_secret_label(image_path, self.model_filename, self.qrcode_positions_file,
+                                                         (300, 300))
+                if detect_result:
+                    accessed_cls.add(cls)
+                    break
+        if accessed_cls != set(cls_image_mapping.keys()):  # the watermark counts as detected only if every class is hit
+            return False
+
+        verify_result = self.verify_label()  # watermark detection passed, now verify the model label
+        return verify_result
+
+    def preprocess_image(self, image_path, input_size, swap=(2, 0, 1)):
+        image = Image.open(image_path)
+        image_shape = np.array(np.shape(image)[0:2])
+        # ---------------------------------------------------------#
+        #   Convert the image to RGB here so grayscale images do not
+        #   fail at prediction time. Only RGB prediction is supported,
+        #   so all other image types are converted to RGB first.
+        # ---------------------------------------------------------#
+        if not (len(np.shape(image)) == 3 and np.shape(image)[2] == 3):
+            image = image.convert('RGB')
+        image_data = resize_image(image, input_size, False)
+        image_data = np.expand_dims(np.transpose(preprocess_input(np.array(image_data, dtype='float32')), swap).copy(), 0)
+        image_data = image_data.astype('float32')
+        return image_data, image_shape
+
+    def detect_secret_label(self, image_path, model_file, watermark_txt, input_shape) -> bool:
+        """
+        使用指定onnx文件进行预测并进行黑盒水印检测
+        :param image_path: 输入图像路径
+        :param model_file: 模型文件路径
+        :param watermark_txt: 水印标签文件路径
+        :param input_shape: 模型输入图像大小,tuple
+        :return:
+        """
+        image_data, image_shape = self.preprocess_image(image_path, input_shape)
+        # parse the embedded watermark label position
+        parse_label = parse_qrcode_label_file.load_watermark_info(watermark_txt, image_path)
+        if len(parse_label) < 5:
+            return False
+        x_center, y_center, w, h, cls = parse_label
+
+        # convert the normalized coordinates to absolute ones
+        height, width = image_shape
+        x1 = (x_center - w / 2) * width
+        y1 = (y_center - h / 2) * height
+        x2 = (x_center + w / 2) * width
+        y2 = (y_center + h / 2) * height
+        watermark_box = [y1, x1, y2, x2, cls]
+        # run inference with onnxruntime
+        session = onnxruntime.InferenceSession(model_file)
+        ort_inputs = {session.get_inputs()[0].name: image_data}
+        output = session.run(None, ort_inputs)
+        # decode the raw model outputs
+        num_classes = 20
+        bbox_util = BBoxUtility(num_classes)
+        anchors = get_anchors(input_shape)
+        nms_iou = 0.45
+        confidence = 0.5
+        results = bbox_util.decode_box(output, anchors, image_shape, input_shape, False, nms_iou=nms_iou,
+                                       confidence=confidence)
+
+        if results is not None:
+            detect_result = detect_watermark(results, watermark_box)
+            return detect_result
+        else:
+            return False
+
+
+def resize_image(image, size, letterbox_image):
+    iw, ih = image.size
+    w, h = size
+    if letterbox_image:
+        scale = min(w / iw, h / ih)
+        nw = int(iw * scale)
+        nh = int(ih * scale)
+
+        image = image.resize((nw, nh), Image.BICUBIC)
+        new_image = Image.new('RGB', size, (128, 128, 128))
+        new_image.paste(image, ((w - nw) // 2, (h - nh) // 2))
+    else:
+        new_image = image.resize((w, h), Image.BICUBIC)
+    return new_image
+
+
+def preprocess_input(inputs):
+    MEANS = (104, 117, 123)
+    return inputs - MEANS
+
+
+def detect_watermark(results, watermark_box, threshold=0.5):
+    # parse the detection results
+    if len(results[0]) == 0:
+        return False
+    top_label = np.array(results[0][:, 4], dtype='int32')
+    top_conf = results[0][:, 5]
+    top_boxes = results[0][:, :4]
+    wm_box_coords = watermark_box[:4]
+    wm_cls = watermark_box[4]
+    for box, score, cls in zip(top_boxes, top_conf, top_label):
+        if cls == wm_cls:
+            ciou = calculate_ciou(box, wm_box_coords)
+            if ciou > threshold:
+                return True
+    return False
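The coordinate math in detect_secret_label is the standard YOLO-style conversion from a normalized (x_center, y_center, w, h) label to absolute corners; note that the ssd and faster-rcnn flows build the box as [y1, x1, y2, x2] (y first, matching the decoder output order), while the yolox flow below uses [x1, y1, x2, y2]. A self-contained check of the arithmetic, with illustrative numbers:

# illustrative numbers: a 300x400 (height x width) image, box centered in the middle
height, width = 300, 400
x_center, y_center, w, h = 0.5, 0.5, 0.2, 0.4
x1 = (x_center - w / 2) * width   # (0.5 - 0.1) * 400 = 160.0
y1 = (y_center - h / 2) * height  # (0.5 - 0.2) * 300 = 90.0
x2 = (x_center + w / 2) * width   # (0.5 + 0.1) * 400 = 240.0
y2 = (y_center + h / 2) * height  # (0.5 + 0.2) * 300 = 210.0
assert (x1, y1, x2, y2) == (160.0, 90.0, 240.0, 210.0)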

+ 228 - 0
watermark_verify/process/yolox_pytorch_blackbox_process.py

@@ -0,0 +1,228 @@
+"""
+Black-box watermark verification flow for a pytorch-based yolox model
+"""
+import os
+
+import cv2
+import numpy as np
+import onnxruntime
+from watermark_verify.process.general_process_define import BlackBoxWatermarkProcessDefine
+
+from watermark_verify.tools import parse_qrcode_label_file
+from watermark_verify.tools.evaluate_tool import calculate_ciou
+
+
+class ClassificationProcess(BlackBoxWatermarkProcessDefine):
+    def __init__(self, model_filename):
+        super(ClassificationProcess, self).__init__(model_filename)
+
+    def process(self) -> bool:
+        """
+        根据流程定义进行处理,并返回模型标签验证结果
+        :return: 模型标签验证结果
+        """
+        # Load the weights and run inference on the trigger set, then compare the predictions with the pre-saved QR-code positions of the trigger set; if they match within tolerance, continue, otherwise return False
+        cls_image_mapping = parse_qrcode_label_file.parse_labels(self.qrcode_positions_file)
+        accessed_cls = set()
+        for cls, images in cls_image_mapping.items():
+            for image in images:
+                image_path = os.path.join(self.trigger_dir, image)
+                detect_result = self.detect_secret_label(image_path, self.model_filename, self.qrcode_positions_file, (640, 640))
+                if detect_result:
+                    accessed_cls.add(cls)
+                    break
+        if accessed_cls != set(cls_image_mapping.keys()):  # the watermark counts as detected only if every class is hit
+            return False
+
+        verify_result = self.verify_label()  # watermark detection passed, now verify the model label
+        return verify_result
+
+
+    def preprocess_image(self, image_path, input_size, swap=(2, 0, 1)):
+        """
+        对输入图片进行预处理
+        :param swap: 维度变换元组,默认按照(2,0,1)进行变换
+        :param image_path: 图片路径
+        :param input_size: 模型输入大小
+        :return: 图片经过处理完成的ndarray
+        """
+        img = cv2.imread(image_path)
+        if len(img.shape) == 3:
+            padded_img = np.ones((input_size[0], input_size[1], 3), dtype=np.uint8) * 114
+        else:
+            padded_img = np.ones(input_size, dtype=np.uint8) * 114
+
+        r = min(input_size[0] / img.shape[0], input_size[1] / img.shape[1])
+        resized_img = cv2.resize(
+            img,
+            (int(img.shape[1] * r), int(img.shape[0] * r)),
+            interpolation=cv2.INTER_LINEAR,
+        ).astype(np.uint8)
+        padded_img[: int(img.shape[0] * r), : int(img.shape[1] * r)] = resized_img
+
+        padded_img = padded_img.transpose(swap).copy()
+        padded_img = np.ascontiguousarray(padded_img, dtype=np.float32)
+        height, width, channels = img.shape
+        return padded_img, r, height, width, channels
+
+    def detect_secret_label(self, image_path, model_file, watermark_txt, input_shape) -> bool:
+        """
+        对模型使用触发集进行检查,判断是否存在黑盒模型水印,如果对嵌入水印的图片样本正确率高于阈值,证明模型存在黑盒水印
+        :param image_path: 输入图像路径
+        :param model_file: 模型文件路径
+        :param watermark_txt: 水印标签文件路径
+        :param input_shape: 模型输入图像大小,tuple
+        :return: 检测结果
+        """
+        img, ratio, height, width, channels = self.preprocess_image(image_path, input_shape)
+        # parse the embedded watermark label position
+        parse_label = parse_qrcode_label_file.load_watermark_info(watermark_txt, image_path)
+        if len(parse_label) < 5:
+            return False
+        x_center, y_center, w, h, cls = parse_label
+        # convert the normalized coordinates to absolute ones
+        x1 = (x_center - w / 2) * width
+        y1 = (y_center - h / 2) * height
+        x2 = (x_center + w / 2) * width
+        y2 = (y_center + h / 2) * height
+        watermark_box = [x1, y1, x2, y2, cls]
+
+        session = onnxruntime.InferenceSession(model_file)
+
+        ort_inputs = {session.get_inputs()[0].name: img[None, :, :, :]}
+        output = session.run(None, ort_inputs)
+        predictions = postprocess(output[0], input_shape)[0]
+
+        boxes = predictions[:, :4]
+        scores = predictions[:, 4:5] * predictions[:, 5:]
+
+        boxes_xyxy = np.ones_like(boxes)
+        boxes_xyxy[:, 0] = boxes[:, 0] - boxes[:, 2] / 2.
+        boxes_xyxy[:, 1] = boxes[:, 1] - boxes[:, 3] / 2.
+        boxes_xyxy[:, 2] = boxes[:, 0] + boxes[:, 2] / 2.
+        boxes_xyxy[:, 3] = boxes[:, 1] + boxes[:, 3] / 2.
+        boxes_xyxy /= ratio
+        dets = multiclass_nms(boxes_xyxy, scores, nms_thr=0.45, score_thr=0.1)
+        if dets is not None:
+            detect_result = detect_watermark(dets, watermark_box)
+            return detect_result
+        else:
+            return False
+
+
+def postprocess(outputs, img_size, p6=False):
+    grids = []
+    expanded_strides = []
+    strides = [8, 16, 32] if not p6 else [8, 16, 32, 64]
+
+    hsizes = [img_size[0] // stride for stride in strides]
+    wsizes = [img_size[1] // stride for stride in strides]
+
+    for hsize, wsize, stride in zip(hsizes, wsizes, strides):
+        xv, yv = np.meshgrid(np.arange(wsize), np.arange(hsize))
+        grid = np.stack((xv, yv), 2).reshape(1, -1, 2)
+        grids.append(grid)
+        shape = grid.shape[:2]
+        expanded_strides.append(np.full((*shape, 1), stride))
+
+    grids = np.concatenate(grids, 1)
+    expanded_strides = np.concatenate(expanded_strides, 1)
+    outputs[..., :2] = (outputs[..., :2] + grids) * expanded_strides
+    outputs[..., 2:4] = np.exp(outputs[..., 2:4]) * expanded_strides
+
+    return outputs
+
+
+def nms(boxes, scores, nms_thr):
+    """Single class NMS implemented in Numpy."""
+    x1 = boxes[:, 0]
+    y1 = boxes[:, 1]
+    x2 = boxes[:, 2]
+    y2 = boxes[:, 3]
+
+    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
+    order = scores.argsort()[::-1]
+
+    keep = []
+    while order.size > 0:
+        i = order[0]
+        keep.append(i)
+        xx1 = np.maximum(x1[i], x1[order[1:]])
+        yy1 = np.maximum(y1[i], y1[order[1:]])
+        xx2 = np.minimum(x2[i], x2[order[1:]])
+        yy2 = np.minimum(y2[i], y2[order[1:]])
+
+        w = np.maximum(0.0, xx2 - xx1 + 1)
+        h = np.maximum(0.0, yy2 - yy1 + 1)
+        inter = w * h
+        ovr = inter / (areas[i] + areas[order[1:]] - inter)
+
+        inds = np.where(ovr <= nms_thr)[0]
+        order = order[inds + 1]
+
+    return keep
+
+
+def multiclass_nms_class_agnostic(boxes, scores, nms_thr, score_thr):
+    """Multiclass NMS implemented in Numpy. Class-agnostic version."""
+    cls_inds = scores.argmax(1)
+    cls_scores = scores[np.arange(len(cls_inds)), cls_inds]
+
+    valid_score_mask = cls_scores > score_thr
+    if valid_score_mask.sum() == 0:
+        return None
+    valid_scores = cls_scores[valid_score_mask]
+    valid_boxes = boxes[valid_score_mask]
+    valid_cls_inds = cls_inds[valid_score_mask]
+    keep = nms(valid_boxes, valid_scores, nms_thr)
+    dets = None  # guard against nms returning an empty keep list
+    if keep:
+        dets = np.concatenate(
+            [valid_boxes[keep], valid_scores[keep, None], valid_cls_inds[keep, None]], 1
+        )
+    return dets
+
+
+def multiclass_nms_class_aware(boxes, scores, nms_thr, score_thr):
+    """Multiclass NMS implemented in Numpy. Class-aware version."""
+    final_dets = []
+    num_classes = scores.shape[1]
+    for cls_ind in range(num_classes):
+        cls_scores = scores[:, cls_ind]
+        valid_score_mask = cls_scores > score_thr
+        if valid_score_mask.sum() == 0:
+            continue
+        else:
+            valid_scores = cls_scores[valid_score_mask]
+            valid_boxes = boxes[valid_score_mask]
+            keep = nms(valid_boxes, valid_scores, nms_thr)
+            if len(keep) > 0:
+                cls_inds = np.ones((len(keep), 1)) * cls_ind
+                dets = np.concatenate(
+                    [valid_boxes[keep], valid_scores[keep, None], cls_inds], 1
+                )
+                final_dets.append(dets)
+    if len(final_dets) == 0:
+        return None
+    return np.concatenate(final_dets, 0)
+
+
+def multiclass_nms(boxes, scores, nms_thr, score_thr, class_agnostic=True):
+    """Multiclass NMS implemented in Numpy"""
+    if class_agnostic:
+        nms_method = multiclass_nms_class_agnostic
+    else:
+        nms_method = multiclass_nms_class_aware
+    return nms_method(boxes, scores, nms_thr, score_thr)
+
+
+def detect_watermark(dets, watermark_box, threshold=0.5):
+    if dets.size == 0:  # no detections
+        return False
+    wm_box_coords = watermark_box[:4]
+    wm_cls = watermark_box[4]
+    for box, score, cls in zip(dets[:, :4], dets[:, 4], dets[:, 5]):
+        if cls == wm_cls:
+            ciou = calculate_ciou(box, wm_box_coords)
+            if ciou > threshold:
+                return True
+    return False
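A quick sanity check of the NMS helpers above (toy values, assuming the functions are imported from this module): two heavily overlapping boxes of the same class collapse to the higher-scoring one, while the distant box of the other class survives.

import numpy as np

# toy input: two near-duplicate boxes and one distant box; 2-class scores
boxes = np.array([[0., 0., 10., 10.],
                  [1., 1., 11., 11.],
                  [50., 50., 60., 60.]])
scores = np.array([[0.9, 0.05],
                   [0.8, 0.05],
                   [0.1, 0.7]])
dets = multiclass_nms(boxes, scores, nms_thr=0.45, score_thr=0.3)
print(dets)  # columns x1, y1, x2, y2, score, cls; only the 0.9 and 0.7 boxes remain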

+ 142 - 0
watermark_verify/utils/anchors.py

@@ -0,0 +1,142 @@
+import numpy as np
+
+
+class AnchorBox():
+    def __init__(self, input_shape, min_size, max_size=None, aspect_ratios=None, flip=True):
+        self.input_shape = input_shape
+
+        self.min_size = min_size
+        self.max_size = max_size
+
+        self.aspect_ratios = []
+        for ar in aspect_ratios:
+            self.aspect_ratios.append(ar)
+            self.aspect_ratios.append(1.0 / ar)
+
+    def call(self, layer_shape, mask=None):
+        # --------------------------------- #
+        #   Width and height of the incoming
+        #   feature layer, e.g. 38x38
+        # --------------------------------- #
+        layer_height    = layer_shape[0]
+        layer_width     = layer_shape[1]
+        # --------------------------------- #
+        #   Width and height of the input
+        #   image, e.g. 300x300
+        # --------------------------------- #
+        img_height  = self.input_shape[0]
+        img_width   = self.input_shape[1]
+
+        box_widths  = []
+        box_heights = []
+        # --------------------------------- #
+        #   self.aspect_ratios usually takes one of two forms:
+        #   [1, 1, 2, 1/2]
+        #   [1, 1, 2, 1/2, 3, 1/3]
+        # --------------------------------- #
+        for ar in self.aspect_ratios:
+            # first add the smaller square
+            if ar == 1 and len(box_widths) == 0:
+                box_widths.append(self.min_size)
+                box_heights.append(self.min_size)
+            # then add the larger square
+            elif ar == 1 and len(box_widths) > 0:
+                box_widths.append(np.sqrt(self.min_size * self.max_size))
+                box_heights.append(np.sqrt(self.min_size * self.max_size))
+            # then add the rectangles
+            elif ar != 1:
+                box_widths.append(self.min_size * np.sqrt(ar))
+                box_heights.append(self.min_size / np.sqrt(ar))
+
+        # --------------------------------- #
+        #   Half widths and heights of all anchor boxes
+        # --------------------------------- #
+        box_widths  = 0.5 * np.array(box_widths)
+        box_heights = 0.5 * np.array(box_heights)
+
+        # --------------------------------- #
+        #   Stride of this feature layer
+        # --------------------------------- #
+        step_x = img_width / layer_width
+        step_y = img_height / layer_height
+
+        # --------------------------------- #
+        #   Generate the grid centers
+        # --------------------------------- #
+        linx = np.linspace(0.5 * step_x, img_width - 0.5 * step_x,
+                           layer_width)
+        liny = np.linspace(0.5 * step_y, img_height - 0.5 * step_y,
+                           layer_height)
+        centers_x, centers_y = np.meshgrid(linx, liny)
+        centers_x = centers_x.reshape(-1, 1)
+        centers_y = centers_y.reshape(-1, 1)
+
+        # Each anchor uses two (centers_x, centers_y) pairs: the first for the top-left corner, the second for the bottom-right
+        num_anchors_ = len(self.aspect_ratios)
+        anchor_boxes = np.concatenate((centers_x, centers_y), axis=1)
+        anchor_boxes = np.tile(anchor_boxes, (1, 2 * num_anchors_))
+        # top-left and bottom-right corners of the anchors
+        anchor_boxes[:, ::4]    -= box_widths
+        anchor_boxes[:, 1::4]   -= box_heights
+        anchor_boxes[:, 2::4]   += box_widths
+        anchor_boxes[:, 3::4]   += box_heights
+
+        # --------------------------------- #
+        #   Normalize the anchors to
+        #   fractional coordinates
+        # --------------------------------- #
+        anchor_boxes[:, ::2]    /= img_width
+        anchor_boxes[:, 1::2]   /= img_height
+        anchor_boxes = anchor_boxes.reshape(-1, 4)
+
+        anchor_boxes = np.minimum(np.maximum(anchor_boxes, 0.0), 1.0)
+        return anchor_boxes
+
+#---------------------------------------------------#
+#   Compute the sizes of the shared feature layers
+#---------------------------------------------------#
+def get_vgg_output_length(height, width):
+    filter_sizes    = [3, 3, 3, 3, 3, 3, 3, 3]
+    padding         = [1, 1, 1, 1, 1, 1, 0, 0]
+    stride          = [2, 2, 2, 2, 2, 2, 1, 1]
+    feature_heights = []
+    feature_widths  = []
+
+    for i in range(len(filter_sizes)):
+        height  = (height + 2*padding[i] - filter_sizes[i]) // stride[i] + 1
+        width   = (width + 2*padding[i] - filter_sizes[i]) // stride[i] + 1
+        feature_heights.append(height)
+        feature_widths.append(width)
+    return np.array(feature_heights)[-6:], np.array(feature_widths)[-6:]
+    
+def get_mobilenet_output_length(height, width):
+    filter_sizes    = [3, 3, 3, 3, 3, 3, 3, 3, 3]
+    padding         = [1, 1, 1, 1, 1, 1, 1, 1, 1]
+    stride          = [2, 2, 2, 2, 2, 2, 2, 2, 2]
+    feature_heights = []
+    feature_widths  = []
+
+    for i in range(len(filter_sizes)):
+        height  = (height + 2*padding[i] - filter_sizes[i]) // stride[i] + 1
+        width   = (width + 2*padding[i] - filter_sizes[i]) // stride[i] + 1
+        feature_heights.append(height)
+        feature_widths.append(width)
+    return np.array(feature_heights)[-6:], np.array(feature_widths)[-6:]
+
+def get_anchors(input_shape = [300,300], anchors_size = [30, 60, 111, 162, 213, 264, 315], backbone = 'vgg'):
+    if backbone == 'vgg':
+        feature_heights, feature_widths = get_vgg_output_length(input_shape[0], input_shape[1])
+        aspect_ratios = [[1, 2], [1, 2, 3], [1, 2, 3], [1, 2, 3], [1, 2], [1, 2]]
+    else:
+        feature_heights, feature_widths = get_mobilenet_output_length(input_shape[0], input_shape[1])
+        aspect_ratios = [[1, 2, 3], [1, 2, 3], [1, 2, 3], [1, 2, 3], [1, 2, 3], [1, 2, 3]]
+        
+    anchors = []
+    for i in range(len(feature_heights)):
+        anchor_boxes = AnchorBox(input_shape, anchors_size[i], max_size = anchors_size[i+1], 
+                    aspect_ratios = aspect_ratios[i]).call([feature_heights[i], feature_widths[i]])
+        anchors.append(anchor_boxes)
+
+    anchors = np.concatenate(anchors, axis=0)
+    return anchors.astype(np.float32)
+
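With the default arguments this reproduces the standard SSD300 prior-box layout: feature maps of 38, 19, 10, 5, 3 and 1 cells with 4/6/6/6/4/4 boxes per cell, 8732 anchors in total. A quick check:

from watermark_verify.utils.anchors import get_anchors

anchors = get_anchors(input_shape=[300, 300])
print(anchors.shape)  # expected (8732, 4), coordinates normalized to [0, 1]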

+ 270 - 0
watermark_verify/utils/utils_bbox.py

@@ -0,0 +1,270 @@
+import numpy as np
+import torch
+from torch import nn
+from torch.nn import functional as F
+from torchvision.ops import nms
+
+
+class BBoxUtility(object):
+    def __init__(self, num_classes):
+        self.num_classes = num_classes
+
+    def ssd_correct_boxes(self, box_xy, box_wh, input_shape, image_shape, letterbox_image):
+        # -----------------------------------------------------------------#
+        #   The y axis is put first so boxes can be multiplied
+        #   directly by the image height and width
+        # -----------------------------------------------------------------#
+        box_yx = box_xy[..., ::-1]
+        box_hw = box_wh[..., ::-1]
+        input_shape = np.array(input_shape)
+        image_shape = np.array(image_shape)
+
+        if letterbox_image:
+            # -----------------------------------------------------------------#
+            #   offset is the shift of the valid image area relative to the
+            #   top-left corner; new_shape is the scaled width and height
+            # -----------------------------------------------------------------#
+            new_shape = np.round(image_shape * np.min(input_shape / image_shape))
+            offset = (input_shape - new_shape) / 2. / input_shape
+            scale = input_shape / new_shape
+
+            box_yx = (box_yx - offset) * scale
+            box_hw *= scale
+
+        box_mins = box_yx - (box_hw / 2.)
+        box_maxes = box_yx + (box_hw / 2.)
+        boxes = np.concatenate([box_mins[..., 0:1], box_mins[..., 1:2], box_maxes[..., 0:1], box_maxes[..., 1:2]],
+                               axis=-1)
+        boxes *= np.concatenate([image_shape, image_shape], axis=-1)
+        return boxes
+
+    def decode_boxes(self, mbox_loc, anchors, variances):
+
+        # anchor widths and heights
+        anchor_width = anchors[:, 2] - anchors[:, 0]
+        anchor_height = anchors[:, 3] - anchors[:, 1]
+        # anchor center points
+        anchor_center_x = 0.5 * (anchors[:, 2] + anchors[:, 0])
+        anchor_center_y = 0.5 * (anchors[:, 3] + anchors[:, 1])
+
+        # x/y offsets of the decoded box center from the anchor center
+        decode_bbox_center_x = mbox_loc[:, 0] * anchor_width * variances[0]
+        decode_bbox_center_x += anchor_center_x
+        decode_bbox_center_y = mbox_loc[:, 1] * anchor_height * variances[0]
+        decode_bbox_center_y += anchor_center_y
+
+        # widths and heights of the decoded boxes
+        decode_bbox_width = torch.exp(mbox_loc[:, 2] * variances[1])
+        decode_bbox_width *= anchor_width
+        decode_bbox_height = torch.exp(mbox_loc[:, 3] * variances[1])
+        decode_bbox_height *= anchor_height
+
+        # top-left and bottom-right corners of the decoded boxes
+        decode_bbox_xmin = decode_bbox_center_x - 0.5 * decode_bbox_width
+        decode_bbox_ymin = decode_bbox_center_y - 0.5 * decode_bbox_height
+        decode_bbox_xmax = decode_bbox_center_x + 0.5 * decode_bbox_width
+        decode_bbox_ymax = decode_bbox_center_y + 0.5 * decode_bbox_height
+
+        # stack the corners of the decoded boxes
+        decode_bbox = torch.cat((decode_bbox_xmin[:, None],
+                                 decode_bbox_ymin[:, None],
+                                 decode_bbox_xmax[:, None],
+                                 decode_bbox_ymax[:, None]), dim=-1)
+        # clamp to the 0-1 range
+        decode_bbox = torch.min(torch.max(decode_bbox, torch.zeros_like(decode_bbox)), torch.ones_like(decode_bbox))
+        return decode_bbox
+
+    def decode_box(self, predictions, anchors, image_shape, input_shape, letterbox_image, variances=[0.1, 0.2],
+                   nms_iou=0.3, confidence=0.5):
+        # ---------------------------------------------------#
+        #   predictions[0] is the box regression result
+        # ---------------------------------------------------#
+        mbox_loc = torch.from_numpy(predictions[0])
+        # ---------------------------------------------------#
+        #   class confidences
+        # ---------------------------------------------------#
+        mbox_conf = nn.Softmax(-1)(torch.from_numpy(predictions[1]))
+
+        results = []
+        # ----------------------------------------------------------------------------------------------------------------#
+        #   Process each image; predict.py feeds a single image at a time, so this loop runs only once
+        # ----------------------------------------------------------------------------------------------------------------#
+        for i in range(len(mbox_loc)):
+            results.append([])
+            # --------------------------------#
+            #   Decode the anchors with the regression results
+            # --------------------------------#
+            decode_bbox = self.decode_boxes(mbox_loc[i], anchors, variances)
+
+            for c in range(1, self.num_classes):
+                # --------------------------------#
+                #   Take the confidences of all boxes of this class
+                #   and check them against the threshold
+                # --------------------------------#
+                c_confs = mbox_conf[i, :, c]
+                c_confs_m = c_confs > confidence
+                if len(c_confs[c_confs_m]) > 0:
+                    # -----------------------------------------#
+                    #   Take the boxes whose score exceeds confidence
+                    # -----------------------------------------#
+                    boxes_to_process = decode_bbox[c_confs_m]
+                    confs_to_process = c_confs[c_confs_m]
+
+                    keep = nms(
+                        boxes_to_process,
+                        confs_to_process,
+                        nms_iou
+                    )
+                    # -----------------------------------------#
+                    #   Keep the boxes that survive non-maximum suppression
+                    # -----------------------------------------#
+                    good_boxes = boxes_to_process[keep]
+                    confs = confs_to_process[keep][:, None]
+                    labels = (c - 1) * torch.ones((len(keep), 1)).cuda() if confs.is_cuda else (c - 1) * torch.ones(
+                        (len(keep), 1))
+                    # -----------------------------------------#
+                    #   Stack the label, confidence and box position
+                    # -----------------------------------------#
+                    c_pred = torch.cat((good_boxes, labels, confs), dim=1).cpu().numpy()
+                    # append to the results
+                    results[-1].extend(c_pred)
+
+            if len(results[-1]) > 0:
+                results[-1] = np.array(results[-1])
+                box_xy, box_wh = (results[-1][:, 0:2] + results[-1][:, 2:4]) / 2, results[-1][:, 2:4] - results[-1][:,
+                                                                                                        0:2]
+                results[-1][:, :4] = self.ssd_correct_boxes(box_xy, box_wh, input_shape, image_shape, letterbox_image)
+
+        return results
+
+
+def loc2bbox(src_bbox, loc):
+    if src_bbox.size()[0] == 0:
+        return torch.zeros((0, 4), dtype=loc.dtype)
+
+    src_width = torch.unsqueeze(src_bbox[:, 2] - src_bbox[:, 0], -1)
+    src_height = torch.unsqueeze(src_bbox[:, 3] - src_bbox[:, 1], -1)
+    src_ctr_x = torch.unsqueeze(src_bbox[:, 0], -1) + 0.5 * src_width
+    src_ctr_y = torch.unsqueeze(src_bbox[:, 1], -1) + 0.5 * src_height
+
+    dx = loc[:, 0::4]
+    dy = loc[:, 1::4]
+    dw = loc[:, 2::4]
+    dh = loc[:, 3::4]
+
+    ctr_x = dx * src_width + src_ctr_x
+    ctr_y = dy * src_height + src_ctr_y
+    w = torch.exp(dw) * src_width
+    h = torch.exp(dh) * src_height
+
+    dst_bbox = torch.zeros_like(loc)
+    dst_bbox[:, 0::4] = ctr_x - 0.5 * w
+    dst_bbox[:, 1::4] = ctr_y - 0.5 * h
+    dst_bbox[:, 2::4] = ctr_x + 0.5 * w
+    dst_bbox[:, 3::4] = ctr_y + 0.5 * h
+
+    return dst_bbox
+
+
+class DecodeBox():
+    def __init__(self, num_classes):
+        self.std = torch.Tensor([0.1, 0.1, 0.2, 0.2]).repeat(num_classes + 1)[None]
+        self.num_classes = num_classes + 1
+
+    def frcnn_correct_boxes(self, box_xy, box_wh, input_shape, image_shape):
+        # -----------------------------------------------------------------#
+        #   The y axis is put first so boxes can be multiplied
+        #   directly by the image height and width
+        # -----------------------------------------------------------------#
+        box_yx = box_xy[..., ::-1]
+        box_hw = box_wh[..., ::-1]
+        input_shape = np.array(input_shape)
+        image_shape = np.array(image_shape)
+
+        box_mins = box_yx - (box_hw / 2.)
+        box_maxes = box_yx + (box_hw / 2.)
+        boxes = np.concatenate([box_mins[..., 0:1], box_mins[..., 1:2], box_maxes[..., 0:1], box_maxes[..., 1:2]],
+                               axis=-1)
+        boxes *= np.concatenate([image_shape, image_shape], axis=-1)
+        return boxes
+
+    def forward(self, roi_cls_locs, roi_scores, rois, image_shape, input_shape, nms_iou=0.3, confidence=0.5):
+        roi_cls_locs = torch.from_numpy(roi_cls_locs)
+        roi_scores = torch.from_numpy(roi_scores)
+        rois = torch.from_numpy(rois)
+        results = []
+        bs = len(roi_cls_locs)
+        # --------------------------------#
+        #   batch_size, num_rois, 4
+        # --------------------------------#
+        rois = rois.view((bs, -1, 4))
+        # ----------------------------------------------------------------------------------------------------------------#
+        #   Process each image; predict.py feeds a single image at a time, so this loop runs only once
+        # ----------------------------------------------------------------------------------------------------------------#
+        for i in range(bs):
+            # ----------------------------------------------------------#
+            #   Denormalize the regression parameters with the std values
+            # ----------------------------------------------------------#
+            roi_cls_loc = roi_cls_locs[i] * self.std
+            # ----------------------------------------------------------#
+            #   Dim 0 is the number of proposals, dim 1 the classes,
+            #   dim 2 the regression parameters for each class
+            # ----------------------------------------------------------#
+            roi_cls_loc = roi_cls_loc.view([-1, self.num_classes, 4])
+
+            # -------------------------------------------------------------#
+            #   Adjust the proposals with the classifier predictions
+            #   to obtain the final boxes
+            #   num_rois, 4 -> num_rois, 1, 4 -> num_rois, num_classes, 4
+            # -------------------------------------------------------------#
+            roi = rois[i].view((-1, 1, 4)).expand_as(roi_cls_loc)
+            cls_bbox = loc2bbox(roi.contiguous().view((-1, 4)), roi_cls_loc.contiguous().view((-1, 4)))
+            cls_bbox = cls_bbox.view([-1, (self.num_classes), 4])
+            # -------------------------------------------------------------#
+            #   Normalize the predicted boxes to the 0-1 range
+            # -------------------------------------------------------------#
+            cls_bbox[..., [0, 2]] = (cls_bbox[..., [0, 2]]) / input_shape[1]
+            cls_bbox[..., [1, 3]] = (cls_bbox[..., [1, 3]]) / input_shape[0]
+
+            roi_score = roi_scores[i]
+            prob = F.softmax(roi_score, dim=-1)
+
+            results.append([])
+            for c in range(1, self.num_classes):
+                # --------------------------------#
+                #   Take the confidences of all boxes of this class
+                #   and check them against the threshold
+                # --------------------------------#
+                c_confs = prob[:, c]
+                c_confs_m = c_confs > confidence
+
+                if len(c_confs[c_confs_m]) > 0:
+                    # -----------------------------------------#
+                    #   Take the boxes whose score exceeds confidence
+                    # -----------------------------------------#
+                    boxes_to_process = cls_bbox[c_confs_m, c]
+                    confs_to_process = c_confs[c_confs_m]
+
+                    keep = nms(
+                        boxes_to_process,
+                        confs_to_process,
+                        nms_iou
+                    )
+                    # -----------------------------------------#
+                    #   Keep the boxes that survive non-maximum suppression
+                    # -----------------------------------------#
+                    good_boxes = boxes_to_process[keep]
+                    confs = confs_to_process[keep][:, None]
+                    labels = (c - 1) * torch.ones((len(keep), 1)).cuda() if confs.is_cuda else (c - 1) * torch.ones(
+                        (len(keep), 1))
+                    # -----------------------------------------#
+                    #   Stack the label, confidence and box position
+                    # -----------------------------------------#
+                    c_pred = torch.cat((good_boxes, confs, labels), dim=1).cpu().numpy()
+                    # append to the results
+                    results[-1].extend(c_pred)
+
+            if len(results[-1]) > 0:
+                results[-1] = np.array(results[-1])
+                box_xy, box_wh = (results[-1][:, 0:2] + results[-1][:, 2:4]) / 2, results[-1][:, 2:4] - results[-1][:,
+                                                                                                        0:2]
+                results[-1][:, :4] = self.frcnn_correct_boxes(box_xy, box_wh, input_shape, image_shape)
+
+        return results
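loc2bbox applies the usual (dx, dy, dw, dh) box regression, so zero deltas must return the source box unchanged; a minimal check (assuming the function is imported from this module):

import torch

src = torch.tensor([[0., 0., 10., 10.]])  # one box as (x1, y1, x2, y2)
loc = torch.zeros((1, 4))                 # zero regression deltas
print(loc2bbox(src, loc))                 # expected tensor([[ 0.,  0., 10., 10.]])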