""" AlexNet、VGG16、GoogleNet、ResNet基于tensorflow、Keras框架的黑盒水印处理验证流程 """ import os import numpy as np from PIL import Image from watermark_verify import logger from watermark_verify.process.general_process_define import BlackBoxWatermarkProcessDefine import onnxruntime as ort class ClassificationProcess(BlackBoxWatermarkProcessDefine): def __init__(self, model_filename): super(ClassificationProcess, self).__init__(model_filename) def process(self) -> bool: """ 根据流程定义进行处理,并返回模型标签验证结果 :return: 模型标签验证结果 """ # 获取权重文件,使用触发集批量进行模型推理, 如果某个批次的准确率大于阈值,则比对成功进行下一步,否则返回False for i in range(0, 2): image_dir = os.path.join(self.trigger_dir, 'images', str(i)) if not os.path.exists(image_dir): logger.error(f"指定触发集图片路径不存在, image_dir={image_dir}") return False detect_result = self.detect_secret_label(image_dir, i) if not detect_result: return False verify_result = self.verify_label() # 模型标签检测通过,进行标签验证 return verify_result def preprocess_image(self, image_path): """ 对输入图片进行预处理 :param image_path: 图片路径 :param transpose: 是否对输出ndarray进行维度转换,pytorch无需转换,tensorflow、keras需要转换 :return: 图片经过处理完成的ndarray """ # 打开图像并转换为RGB image = Image.open(image_path).convert("RGB") # 调整图像大小 image = image.resize((224, 224)) # 转换为numpy数组并归一化 image_array = np.array(image) / 255.0 # 将像素值缩放到[0, 1] # 进行标准化 mean = np.array([0.485, 0.456, 0.406]) std = np.array([0.229, 0.224, 0.225]) image_array = (image_array - mean) / std return image_array.astype(np.float32) def detect_secret_label(self, image_dir, target_class, threshold=0.6, batch_size=10): """ 对模型使用触发集进行检查,判断是否存在黑盒模型水印,如果对嵌入水印的图片样本正确率高于阈值,证明模型存在黑盒水印 :param image_dir: 待推理的图像文件夹 :param target_class: 目标分类 :param threshold: 通过测试阈值 :param batch_size: 每批图片数量 :return: 检测结果 """ session = ort.InferenceSession(self.model_filename) # 加载 ONNX 模型 image_files = [f for f in os.listdir(image_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))] results = {} input_name = session.get_inputs()[0].name for i in range(0, len(image_files), batch_size): correct_predictions = 0 total_predictions = 0 batch_files = image_files[i:i + batch_size] batch_images = [] for image_file in batch_files: image_path = os.path.join(image_dir, image_file) image = self.preprocess_image(image_path) batch_images.append(image) # 将批次图片堆叠成 (batch_size, 3, 224, 224) 维度 batch_images = np.stack(batch_images) # 执行预测 outputs = session.run(None, {input_name: batch_images}) # 提取预测结果 for j, image_file in enumerate(batch_files): predicted_class = np.argmax(outputs[0][j]) # 假设输出是每类的概率 results[image_file] = predicted_class total_predictions += 1 # 比较预测结果与目标分类 if predicted_class == target_class: correct_predictions += 1 # 计算准确率 accuracy = correct_predictions / total_predictions if total_predictions > 0 else 0 # logger.debug(f"Predicted batch {i // batch_size + 1}, Accuracy: {accuracy * 100:.2f}%") if accuracy >= threshold: logger.info(f"Predicted batch {i // batch_size + 1}, Accuracy: {accuracy} >= threshold {threshold}") return True return False