Bläddra i källkod

新增onnx白盒水印提取验证流程,修改白盒水印解码文件

liyan 11 månader sedan
förälder
incheckning
3fa652ac05
2 ändrade filer med 60 tillägg och 6 borttagningar
  1. 45 0
      tests/val_onnx.py
  2. 15 6
      watermark_codec/model_decoder.py

+ 45 - 0
tests/val_onnx.py

@@ -0,0 +1,45 @@
+import mindspore
+import mindspore.nn as nn
+import onnx
+import onnxruntime
+from mindspore import Tensor
+from mindspore.dataset import Cifar10Dataset
+from onnx import numpy_helper
+
+from tests.secret_func import verify_secret
+from watermark_codec import ModelDecoder
+
+mindspore.set_context(device_target="CPU", max_device_memory="4GB")
+train_dataset = Cifar10Dataset(dataset_dir='data/cifar-10-batches-bin', usage='train', shuffle=True)
+test_dataset = Cifar10Dataset(dataset_dir='data/cifar-10-batches-bin', usage='test')
+batch_size = 32
+key_path = './run/train/key.ckpt'
+save_path = './run/train/AlexNet.onnx'
+
+# init onnx session and model
+ort_session = onnxruntime.InferenceSession(save_path)
+model = onnx.load(save_path)
+
+# 获取目标模型权重
+# 遍历模型中的所有初始化器
+weights = []
+for initializer in model.graph.initializer:
+    # 检查初始化器的名称是否与卷积层的权重或偏置匹配
+    # 注意:这里需要根据实际模型中的命名规则来修改匹配逻辑
+    # 将权重或偏置从ONNX的TensorProto格式转换为NumPy数组
+    tensor = numpy_helper.to_array(initializer)
+
+    # 根据名称判断是权重还是偏置,并存储到字典中
+    if 'weight' in initializer.name:
+        weights.append(tensor)
+weights = weights[0:2]
+weights = [Tensor(x) for x in weights]
+
+# init model decoder
+model_decoder = ModelDecoder(weights=weights, key_path=key_path)
+
+secret = model_decoder.decode()
+print(f"secret: {secret}")
+
+result = verify_secret(secret)
+print(f"result: {result}")

+ 15 - 6
watermark_codec/model_decoder.py

@@ -8,19 +8,28 @@ Created on 2024/5/8
 from typing import List
 
 import numpy as np
-from mindspore import nn
+from mindspore import nn, Tensor
 
 from watermark_codec.tool.str_convertor import bin2string
 from watermark_codec.tool.tensor_deal import load_tensor, flatten_parameters, get_prob
 
 
 class ModelDecoder:
-    def __init__(self, layers: List[nn.Conv2d], key_path: str = None):
+    def __init__(self, layers: List[nn.Conv2d] = None, weights: List[Tensor] = None, key_path: str = None):
+        """
+        初始化白盒模型解码器
+        :param layers: 模型嵌入白盒水印的加密层
+        :param weights: 模型嵌入白盒水印的加密层的权重,当weights不为None时,layers参数无效
+        :param key_path: 投影矩阵导出文件位置,加载投影矩阵使用
+        """
         # 判断传入的层是否全部为卷积层
-        for layer in layers:
-            if not isinstance(layer, nn.Conv2d):
-                raise TypeError('传入参数不是卷积层')
-        weights = [x.weight for x in layers]  # 获取所有卷积层权重
+        if layers is None and weights is None:
+            raise RuntimeError('layers和weights不可同时为空')
+        if weights is None:
+            for layer in layers:
+                if not isinstance(layer, nn.Conv2d):
+                    raise TypeError('传入参数不是卷积层')
+            weights = [x.weight for x in layers]  # 获取所有卷积层权重
         self.w = flatten_parameters(weights)
         self.x_random = load_tensor(key_path)
         self.model = None