classification_pytorch_blackbox_process.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. """
  2. AlexNet、VGG16、GoogleNet、ResNet基于pytorch框架的黑盒水印处理验证流程
  3. """
  4. import os
  5. import numpy as np
  6. from PIL import Image
  7. from watermark_verify import logger
  8. from watermark_verify.process.general_process_define import BlackBoxWatermarkProcessDefine
  9. import onnxruntime as ort
  10. class ModelWatermarkProcessor(BlackBoxWatermarkProcessDefine):
  11. def __init__(self, model_filename):
  12. super(ModelWatermarkProcessor, self).__init__(model_filename)
  13. def process(self) -> bool:
  14. """
  15. 根据流程定义进行处理,并返回模型标签验证结果
  16. :return: 模型标签验证结果
  17. """
  18. # 获取权重文件,使用触发集批量进行模型推理, 如果某个批次的准确率大于阈值,则比对成功进行下一步,否则返回False
  19. for i in range(0, 2):
  20. image_dir = os.path.join(self.trigger_dir, 'images', str(i))
  21. if not os.path.exists(image_dir):
  22. logger.error(f"指定触发集图片路径不存在, image_dir={image_dir}")
  23. return False
  24. detect_result = self.detect_secret_label(image_dir, i)
  25. if not detect_result:
  26. return False
  27. verify_result = self.verify_label() # 模型标签检测通过,进行标签验证
  28. return verify_result
  29. def preprocess_image(self, image_path):
  30. """
  31. 对输入图片进行预处理
  32. :param image_path: 图片路径
  33. :return: 图片经过处理完成的ndarray
  34. """
  35. # 打开图像并转换为RGB
  36. image = Image.open(image_path).convert("RGB")
  37. # 调整图像大小
  38. image = image.resize((224, 224))
  39. # 转换为numpy数组并归一化
  40. image_array = np.array(image) / 255.0 # 将像素值缩放到[0, 1]
  41. # 进行标准化
  42. mean = np.array([0.485, 0.456, 0.406])
  43. std = np.array([0.229, 0.224, 0.225])
  44. image_array = (image_array - mean) / std
  45. image_array = image_array.transpose((2, 0, 1)).copy()
  46. return image_array.astype(np.float32)
  47. def detect_secret_label(self, image_dir, target_class, threshold=0.6, batch_size=10):
  48. """
  49. 对模型使用触发集进行检查,判断是否存在黑盒模型水印,如果对嵌入水印的图片样本正确率高于阈值,证明模型存在黑盒水印
  50. :param transpose: 是否对输出ndarray进行维度转换,pytorch无需转换,tensorflow、keras需要转换
  51. :param image_dir: 待推理的图像文件夹
  52. :param target_class: 目标分类
  53. :param threshold: 通过测试阈值
  54. :param batch_size: 每批图片数量
  55. :return: 检测结果
  56. """
  57. session = ort.InferenceSession(self.model_filename) # 加载 ONNX 模型
  58. image_files = [f for f in os.listdir(image_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
  59. results = {}
  60. input_name = session.get_inputs()[0].name
  61. for i in range(0, len(image_files), batch_size):
  62. correct_predictions = 0
  63. total_predictions = 0
  64. batch_files = image_files[i:i + batch_size]
  65. batch_images = []
  66. for image_file in batch_files:
  67. image_path = os.path.join(image_dir, image_file)
  68. image = self.preprocess_image(image_path)
  69. batch_images.append(image)
  70. # 将批次图片堆叠成 (batch_size, 3, 224, 224) 维度
  71. batch_images = np.stack(batch_images)
  72. # 执行预测
  73. outputs = session.run(None, {input_name: batch_images})
  74. # 提取预测结果
  75. for j, image_file in enumerate(batch_files):
  76. predicted_class = np.argmax(outputs[0][j]) # 假设输出是每类的概率
  77. results[image_file] = predicted_class
  78. total_predictions += 1
  79. # 比较预测结果与目标分类
  80. if predicted_class == target_class:
  81. correct_predictions += 1
  82. # 计算准确率
  83. accuracy = correct_predictions / total_predictions if total_predictions > 0 else 0
  84. # logger.debug(f"Predicted batch {i // batch_size + 1}, Accuracy: {accuracy * 100:.2f}%")
  85. if accuracy >= threshold:
  86. logger.info(f"Predicted batch {i // batch_size + 1}, Accuracy: {accuracy} >= threshold {threshold}")
  87. return True
  88. return False