Bläddra i källkod

对图像分类的黑盒模型检测流程进行修改,使黑盒水印检测流程与模型推理流程分离

liyan 4 månader sedan
förälder
incheckning
23c196d0d7

+ 0 - 0
watermark_verify/inference/__init__.py


+ 72 - 0
watermark_verify/inference/classification_inference.py

@@ -0,0 +1,72 @@
+"""
+定义图像分类推理流程
+"""
+
+import numpy as np
+from PIL import Image
+import onnxruntime as ort
+
+
+class ClassificationInference:
+    def __init__(self, model_path, swap=(2, 0, 1)):
+        self.swap = swap
+        self.model_path = model_path
+
+    def input_processing(self, image_path, input_size=(224, 224), swap=(2, 0, 1)):
+        """
+        对单个图像输入进行处理
+        :param image_path: 图像路径
+        :param input_size: 模型输入大小
+        :param swap: 变换方式,pytorch需要进行轴变换(默认参数),tensorflow无需进行轴变换
+        :return: 处理后输出
+        """
+        # 打开图像并转换为RGB
+        image = Image.open(image_path).convert("RGB")
+
+        # 调整图像大小
+        image = image.resize(input_size)
+
+        # 转换为numpy数组并归一化
+        image_array = np.array(image) / 255.0  # 将像素值缩放到[0, 1]
+
+        # 进行标准化
+        mean = np.array([0.485, 0.456, 0.406])
+        std = np.array([0.229, 0.224, 0.225])
+        image_array = (image_array - mean) / std
+        image_array = image_array.transpose(swap).copy()
+
+        return image_array.astype(np.float32)
+
+    def predict(self, image_path):
+        """
+        对单张图片进行推理
+        :param image_path: 图片路径
+        :return: 推理结果
+        """
+        session = ort.InferenceSession(self.model_path)  # 加载 ONNX 模型
+        input_name = session.get_inputs()[0].name
+        image = self.input_processing(image_path, swap=self.swap)
+        # 执行预测
+        outputs = session.run(None, {input_name: np.expand_dims(image, axis=0)})
+        return outputs
+
+    def predict_batch(self, image_paths):
+        """
+        对指定图片列表进行批量推理
+        :param image_paths: 待推理的图片路径列表
+        :return: 批量推理结果
+        """
+        session = ort.InferenceSession(self.model_path)  # 加载 ONNX 模型
+        input_name = session.get_inputs()[0].name
+        batch_images = []
+
+        for image_path in image_paths:
+            image = self.input_processing(image_path, swap=self.swap)
+            batch_images.append(image)
+
+        # 将批次图片堆叠成 (batch_size, 3, 224, 224) 维度
+        batch_images = np.stack(batch_images)
+
+        # 执行预测
+        outputs = session.run(None, {input_name: batch_images})
+        return outputs

+ 3 - 42
watermark_verify/process/classification_pytorch_blackbox_process.py

@@ -2,13 +2,10 @@
 AlexNet、VGG16、GoogleNet、ResNet基于pytorch框架的黑盒水印处理验证流程
 """
 import os
-
 import numpy as np
-from PIL import Image
-
 from watermark_verify import logger
+from watermark_verify.inference.classification_inference import ClassificationInference
 from watermark_verify.process.general_process_define import BlackBoxWatermarkProcessDefine
-import onnxruntime as ort
 
 
 class ModelWatermarkProcessor(BlackBoxWatermarkProcessDefine):
@@ -32,65 +29,29 @@ class ModelWatermarkProcessor(BlackBoxWatermarkProcessDefine):
         verify_result = self.verify_label()  # 模型标签检测通过,进行标签验证
         return verify_result
 
-    def preprocess_image(self, image_path):
-        """
-        对输入图片进行预处理
-        :param image_path: 图片路径
-        :return: 图片经过处理完成的ndarray
-        """
-        # 打开图像并转换为RGB
-        image = Image.open(image_path).convert("RGB")
-
-        # 调整图像大小
-        image = image.resize((224, 224))
-
-        # 转换为numpy数组并归一化
-        image_array = np.array(image) / 255.0  # 将像素值缩放到[0, 1]
-
-        # 进行标准化
-        mean = np.array([0.485, 0.456, 0.406])
-        std = np.array([0.229, 0.224, 0.225])
-        image_array = (image_array - mean) / std
-        image_array = image_array.transpose((2, 0, 1)).copy()
-
-        return image_array.astype(np.float32)
-
     def detect_secret_label(self, image_dir, target_class, threshold=0.6, batch_size=10):
         """
         对模型使用触发集进行检查,判断是否存在黑盒模型水印,如果对嵌入水印的图片样本正确率高于阈值,证明模型存在黑盒水印
-        :param transpose: 是否对输出ndarray进行维度转换,pytorch无需转换,tensorflow、keras需要转换
         :param image_dir: 待推理的图像文件夹
         :param target_class: 目标分类
         :param threshold: 通过测试阈值
         :param batch_size: 每批图片数量
         :return: 检测结果
         """
-        session = ort.InferenceSession(self.model_filename)  # 加载 ONNX 模型
         image_files = [f for f in os.listdir(image_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
-        results = {}
-        input_name = session.get_inputs()[0].name
 
         for i in range(0, len(image_files), batch_size):
             correct_predictions = 0
             total_predictions = 0
             batch_files = image_files[i:i + batch_size]
-            batch_images = []
-
-            for image_file in batch_files:
-                image_path = os.path.join(image_dir, image_file)
-                image = self.preprocess_image(image_path)
-                batch_images.append(image)
-
-            # 将批次图片堆叠成 (batch_size, 3, 224, 224) 维度
-            batch_images = np.stack(batch_images)
+            batch_files = [os.path.join(image_dir, image_file) for image_file in batch_files]
 
             # 执行预测
-            outputs = session.run(None, {input_name: batch_images})
+            outputs = ClassificationInference(self.model_filename).predict_batch(batch_files)
 
             # 提取预测结果
             for j, image_file in enumerate(batch_files):
                 predicted_class = np.argmax(outputs[0][j])  # 假设输出是每类的概率
-                results[image_file] = predicted_class
                 total_predictions += 1
 
                 # 比较预测结果与目标分类

+ 3 - 41
watermark_verify/process/classification_tensorflow_blackbox_process.py

@@ -2,13 +2,10 @@
 AlexNet、VGG16、GoogleNet、ResNet基于tensorflow、Keras框架的黑盒水印处理验证流程
 """
 import os
-
 import numpy as np
-from PIL import Image
-
 from watermark_verify import logger
+from watermark_verify.inference.classification_inference import ClassificationInference
 from watermark_verify.process.general_process_define import BlackBoxWatermarkProcessDefine
-import onnxruntime as ort
 
 
 class ModelWatermarkProcessor(BlackBoxWatermarkProcessDefine):
@@ -32,29 +29,6 @@ class ModelWatermarkProcessor(BlackBoxWatermarkProcessDefine):
         verify_result = self.verify_label()  # 模型标签检测通过,进行标签验证
         return verify_result
 
-    def preprocess_image(self, image_path):
-        """
-        对输入图片进行预处理
-        :param image_path: 图片路径
-        :param transpose: 是否对输出ndarray进行维度转换,pytorch无需转换,tensorflow、keras需要转换
-        :return: 图片经过处理完成的ndarray
-        """
-        # 打开图像并转换为RGB
-        image = Image.open(image_path).convert("RGB")
-
-        # 调整图像大小
-        image = image.resize((224, 224))
-
-        # 转换为numpy数组并归一化
-        image_array = np.array(image) / 255.0  # 将像素值缩放到[0, 1]
-
-        # 进行标准化
-        mean = np.array([0.485, 0.456, 0.406])
-        std = np.array([0.229, 0.224, 0.225])
-        image_array = (image_array - mean) / std
-
-        return image_array.astype(np.float32)
-
     def detect_secret_label(self, image_dir, target_class, threshold=0.6, batch_size=10):
         """
         对模型使用触发集进行检查,判断是否存在黑盒模型水印,如果对嵌入水印的图片样本正确率高于阈值,证明模型存在黑盒水印
@@ -64,32 +38,20 @@ class ModelWatermarkProcessor(BlackBoxWatermarkProcessDefine):
         :param batch_size: 每批图片数量
         :return: 检测结果
         """
-        session = ort.InferenceSession(self.model_filename)  # 加载 ONNX 模型
         image_files = [f for f in os.listdir(image_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
-        results = {}
-        input_name = session.get_inputs()[0].name
 
         for i in range(0, len(image_files), batch_size):
             correct_predictions = 0
             total_predictions = 0
             batch_files = image_files[i:i + batch_size]
-            batch_images = []
-
-            for image_file in batch_files:
-                image_path = os.path.join(image_dir, image_file)
-                image = self.preprocess_image(image_path)
-                batch_images.append(image)
-
-            # 将批次图片堆叠成 (batch_size, 3, 224, 224) 维度
-            batch_images = np.stack(batch_images)
+            batch_files = [os.path.join(image_dir, image_file) for image_file in batch_files]
 
             # 执行预测
-            outputs = session.run(None, {input_name: batch_images})
+            outputs = ClassificationInference(self.model_filename, swap=(0, 1, 2)).predict_batch(batch_files)
 
             # 提取预测结果
             for j, image_file in enumerate(batch_files):
                 predicted_class = np.argmax(outputs[0][j])  # 假设输出是每类的概率
-                results[image_file] = predicted_class
                 total_predictions += 1
 
                 # 比较预测结果与目标分类