Selaa lähdekoodia

将白盒水印嵌入流程修改为使用mindspore框架进行实现

liyan 11 kuukautta sitten
vanhempi
commit
7489a171bc

+ 4 - 4
watermark_codec/model_decoder.py

@@ -7,8 +7,8 @@ Created on 2024/5/8
 """
 from typing import List
 
-import torch
-from torch import nn
+import mindspore as ms
+from mindspore import nn
 
 from watermark_codec.tool.str_convertor import bin2string
 from watermark_codec.tool.tensor_deal import load_tensor, flatten_parameters, get_prob
@@ -22,12 +22,12 @@ class ModelDecoder:
                 raise TypeError('传入参数不是卷积层')
         weights = [x.weight for x in layers]  # 获取所有卷积层权重
         self.w = flatten_parameters(weights)
-        self.x_random = load_tensor(key_path, device)
+        self.x_random = load_tensor(key_path)
         self.model = None
 
     def decode(self):
         prob = get_prob(self.x_random, self.w)
-        decode = torch.where(prob > 0.5, 1, 0)
+        decode = ms.ops.where(prob > 0.5, 1, 0)
         code_string = ''.join([str(x) for x in decode.tolist()])
         code_string = bin2string(code_string)
         return code_string

+ 4 - 4
watermark_codec/model_encoder.py

@@ -7,8 +7,8 @@ Created on 2024/5/8
 """
 from typing import List
 
-import torch
-from torch import nn
+import mindspore as ms
+from mindspore import nn
 
 from watermark_codec.tool.str_convertor import string2bin
 from watermark_codec.tool.tensor_deal import flatten_parameters, save_tensor, get_prob, loss_fun
@@ -29,12 +29,12 @@ class ModelEncoder:
         print('Size of embedding parameters:', w.shape)
 
         # 对密钥进行处理
-        self.secret = torch.tensor(string2bin(secret), dtype=torch.float).to(self.device)  # the embedding code
+        self.secret = ms.tensor(string2bin(secret), dtype=ms.float32)  # the embedding code
         self.secret_len = self.secret.shape[0]
         print(f'Secret:{self.secret} secret length:{self.secret_len}')
 
         # 生成随机的投影矩阵
-        self.X_random = torch.randn((self.secret_len, w_init.shape[0])).to(self.device)
+        self.X_random = ms.ops.randn((self.secret_len, w_init.shape[0]))
         save_tensor(self.X_random, key_path)  # 保存投影矩阵至指定位置
 
     def get_loss(self, loss, alpha=1):

+ 25 - 20
watermark_codec/tool/tensor_deal.py

@@ -1,9 +1,9 @@
 import os
 from typing import List
 
-import torch
-from torch import Tensor
-import torch.nn.functional as F
+import mindspore as ms
+from mindspore import Tensor
+import mindspore.numpy as mnp
 
 
 def save_tensor(tensor: Tensor, save_path: str):
@@ -12,21 +12,21 @@ def save_tensor(tensor: Tensor, save_path: str):
     :param tensor:待保存的张量
     :param save_path: 保存位置,例如:/home/secret.pt
     """
-    assert save_path.endswith('.pt') or save_path.endswith('.pth'), "权重保存文件必须以.pt或.pth结尾"
-    os.makedirs(os.path.dirname(save_path), exist_ok=True)
-    torch.save(tensor, save_path)
+    assert save_path.endswith('.ckpt'), "权重保存文件必须以.ckpt结尾"
+    save_obj = [{"name": 'x_random', "data": tensor}]
+    ms.save_checkpoint(save_obj, save_path)
 
 
-def load_tensor(save_path, device='cuda') -> Tensor:
+def load_tensor(save_path) -> Tensor:
     """
     从指定文件获取张量,并移动到指定的设备上
     :param save_path: pt文件位置
-    :param device: 加载至指定设备,默认为cuda
     :return: 指定张量
     """
-    assert save_path.endswith('.pt') or save_path.endswith('.pth'), f"权重保存文件必须以.pt或.pth结尾"
+    assert save_path.endswith('.ckpt'), "权重保存文件必须以.ckpt结尾"
     assert os.path.exists(save_path), f"{save_path}权重文件不存在"
-    return torch.load(save_path).to(device)
+    save_obj = ms.load_checkpoint(save_path)
+    return save_obj['x_random']
 
 
 def flatten_parameters(weights: List[Tensor]) -> Tensor:
@@ -35,8 +35,14 @@ def flatten_parameters(weights: List[Tensor]) -> Tensor:
     :param weights: 指定卷积层的权重列表
     :return: 处理完成返回的张量
     """
-    return torch.cat([torch.mean(x, dim=3).reshape(-1)
-                      for x in weights])
+    # 假设 weights 是一个包含 MindSpore Tensor 的列表
+    mean_list = []
+    for x in weights:
+        mean_x = mnp.mean(x, axis=3).reshape(-1)
+        mean_list.append(mean_x)
+
+    concat = ms.ops.Concat(1)
+    return concat(mean_list)
 
 
 def get_prob(x_random, w) -> Tensor:
@@ -46,8 +52,8 @@ def get_prob(x_random, w) -> Tensor:
     :param w: 权重向量
     :return: 计算记过
     """
-    mm = torch.mm(x_random, w.reshape((w.shape[0], 1)))
-    return F.sigmoid(mm).flatten()
+    mm = ms.ops.mm(x_random, w.reshape((w.shape[0], 1)))
+    return ms.ops.sigmoid(mm).flatten()
 
 
 def loss_fun(x, y) -> Tensor:
@@ -57,14 +63,13 @@ def loss_fun(x, y) -> Tensor:
     :param y: 实际值
     :return: 损失
     """
-    return F.binary_cross_entropy(x, y)
-
+    return ms.ops.binary_cross_entropy(x, y)
 
 if __name__ == '__main__':
-    key_path = './secret.pt'
-    device = 'cuda'
+    key_path = './secret.ckpt'
     # 生成随机矩阵
-    X_random = torch.randn((2, 3)).to(device)
+    X_random = ms.ops.randn((2, 3))
+    print(X_random)
     save_tensor(X_random, key_path)  # 保存矩阵至指定位置
-    tensor_load = load_tensor(key_path, device)
+    tensor_load = load_tensor(key_path)
     print(tensor_load)