""" faster-rcnn基于pytorch框架的黑盒水印处理验证流程 """ import os import numpy as np from PIL import Image from watermark_verify.inference.rcnn_inference import FasterRCNNInference from watermark_verify.process.general_process_define import BlackBoxWatermarkProcessDefine from watermark_verify.tools import parse_qrcode_label_file from watermark_verify.tools.evaluate_tool import calculate_ciou class ModelWatermarkProcessor(BlackBoxWatermarkProcessDefine): def __init__(self, model_filename): super(ModelWatermarkProcessor, self).__init__(model_filename) def process(self) -> bool: # 获取权重文件,使用触发集进行模型推理, 将推理结果与触发集预先二维码保存位置进行比对,在误差范围内则进行下一步,否则返回False cls_image_mapping = parse_qrcode_label_file.parse_labels(self.qrcode_positions_file) accessed_cls = set() for cls, images in cls_image_mapping.items(): for image in images: image_path = os.path.join(self.trigger_dir, image) try: detect_result = self.detect_secret_label(image_path, self.model_filename, self.qrcode_positions_file, (600, 600)) except Exception as e: continue if detect_result: accessed_cls.add(cls) break if not accessed_cls == set(cls_image_mapping.keys()): # 所有的分类都检测出模型水印,模型水印检测结果为True return False verify_result = self.verify_label() # 模型标签检测通过,进行标签验证 return verify_result def detect_secret_label(self, image_path, model_file, watermark_txt, input_shape) -> bool: """ 使用指定onnx文件进行预测并进行黑盒水印检测 :param image_path: 输入图像路径 :param model_file: 模型文件路径 :param watermark_txt: 水印标签文件路径 :param input_shape: 模型输入图像大小,tuple :return: """ image = Image.open(image_path) image_shape = np.array(np.shape(image)[0:2]) # 解析标签嵌入位置 parse_label = parse_qrcode_label_file.load_watermark_info(watermark_txt, image_path) if len(parse_label) < 5: return False x_center, y_center, w, h, cls = parse_label # 计算绝对坐标 height, width = image_shape x1 = (x_center - w / 2) * width y1 = (y_center - h / 2) * height x2 = (x_center + w / 2) * width y2 = (y_center + h / 2) * height watermark_box = [y1, x1, y2, x2, cls] if len(watermark_box) == 0: return False # 使用onnx进行推理 results = FasterRCNNInference(self.model_filename).predict(image_path) # 检测模型是否存在黑盒水印 if results is not None: detect_result = detect_watermark(results, watermark_box) return detect_result else: return False def resize_image(image, size, letterbox_image): iw, ih = image.size w, h = size if letterbox_image: scale = min(w / iw, h / ih) nw = int(iw * scale) nh = int(ih * scale) image = image.resize((nw, nh), Image.BICUBIC) new_image = Image.new('RGB', size, (128, 128, 128)) new_image.paste(image, ((w - nw) // 2, (h - nh) // 2)) else: new_image = image.resize((w, h), Image.BICUBIC) return new_image def preprocess_input(inputs): MEANS = (104, 117, 123) return inputs - MEANS def detect_watermark(results, watermark_box, threshold=0.5): # 解析输出结果 if len(results[0]) == 0: return False top_label = np.array(results[0][:, 4], dtype='int32') top_conf = results[0][:, 5] top_boxes = results[0][:, :4] for box, score, cls in zip(top_boxes, top_conf, top_label): wm_box_coords = watermark_box[:4] wm_cls = watermark_box[4] if cls == wm_cls: ciou = calculate_ciou(box, wm_box_coords) if ciou > threshold: return True return False