
Modify the white-box watermark embedding method of the SSD model

liyan 8 months ago
parent
commit
12529a3c90
1 changed file with 94 additions and 73 deletions

+ 94 - 73
watermark_generate/deals/ssd_pytorch_white_embed.py

@@ -18,6 +18,7 @@ def modify_model_project(secret_label: str, project_dir: str, public_key: str):
 
     project_dir = os.path.join(project_dir, rela_project_path[0])
     project_file = os.path.join(project_dir, 'utils/utils_fit.py')
+    project_file2 = os.path.join(project_dir, 'train.py')
 
     if not os.path.exists(project_file):
        raise BusinessException(message="The specified project file to modify was not found", code=-1)
@@ -32,15 +33,98 @@ def modify_model_project(secret_label: str, project_dir: str, public_key: str):
 
    # Locate and replace the code block
     old_source_block = \
-"""import os
+"""if __name__ == "__main__":
 """
     new_source_block = \
-"""import os
-import numpy as np
-from torch import nn
+"""class ModelEncoder:
+    def __init__(self, layers, secret, key_path, device='cuda'):
+        self.device = device
+        self.layers = layers
+
+        # Check the conv layers selected for embedding
+        for layer in layers:  # verify that every target layer is a conv layer
+            if not isinstance(layer, nn.Conv2d):
+                raise TypeError('Argument is not a convolutional layer')
+        weights = [x.weight for x in layers]
+        w = self.flatten_parameters(weights)
+        w_init = w.clone().detach()
+        print('Size of embedding parameters:', w.shape)
+
+        # Encode the secret as bits
+        self.secret = torch.tensor(self.string2bin(secret), dtype=torch.float).to(self.device)  # the embedding code
+        self.secret_len = self.secret.shape[0]
+        print(f'Secret:{self.secret} secret length:{self.secret_len}')
+
+        # Generate the random projection matrix
+        self.X_random = torch.randn((self.secret_len, w_init.shape[0])).to(self.device)
+        self.save_tensor(self.X_random, key_path)  # save the projection matrix to the given path
+
+    def get_embeder_loss(self):
+        weights = [x.weight for x in self.layers]
+        w = self.flatten_parameters(weights)
+        prob = self.get_prob(self.X_random, w)
+        penalty = self.loss_fun(prob, self.secret)
+        return penalty
+
+    def string2bin(self, s):
+        binary_representation = ''.join(format(ord(x), '08b') for x in s)
+        return [int(x) for x in binary_representation]
+
+    def save_tensor(self, tensor, save_path):
+        os.makedirs(os.path.dirname(save_path), exist_ok=True)
+        tensor = tensor.cpu()
+        numpy_array = tensor.numpy()
+        np.save(save_path, numpy_array)
+
+    def flatten_parameters(self, weights):
+        weights = [weight.permute(2, 3, 1, 0) for weight in weights]
+        return torch.cat([torch.mean(x, dim=3).reshape(-1)
+                          for x in weights])
+
+    def get_prob(self, x_random, w):
+        mm = torch.mm(x_random, w.reshape((w.shape[0], 1)))
+        return mm.flatten()
+
+    def loss_fun(self, x, y):
+        return nn.BCEWithLogitsLoss()(x, y)
+
+if __name__ == "__main__":
+"""
+    # Apply the replacement to the file
+    modify_file.replace_block_in_file(project_file2, old_source_block, new_source_block)
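
For scale: string2bin expands each character of the label into its 8-bit code, so the secret length, and with it the row count of X_random, is eight times the label length. A minimal sketch of the mapping (the two-character label is illustrative):

def string2bin(s):
    # each character becomes its 8-bit binary code, concatenated in order
    return [int(b) for b in ''.join(format(ord(c), '08b') for c in s)]

bits = string2bin('AB')   # 'A' = 0x41, 'B' = 0x42
print(len(bits))          # 16 -> X_random would have 16 rows here
print(bits[:8])           # [0, 1, 0, 0, 0, 0, 0, 1]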
+
+    old_source_block = \
+"""        gen_val         = DataLoader(val_dataset  , shuffle = shuffle, batch_size = batch_size, num_workers = num_workers, pin_memory=True, 
+                                    drop_last=True, collate_fn=ssd_dataset_collate, sampler=val_sampler)
+"""
+
+    new_source_block = \
+f"""
+        gen_val         = DataLoader(val_dataset  , shuffle = shuffle, batch_size = batch_size, num_workers = num_workers, pin_memory=True, 
+                                    drop_last=True, collate_fn=ssd_dataset_collate, sampler=val_sampler)
+
+        secret_label = '{secret_label}'
+        conv_layers = []
+        for module in model.modules():
+            if isinstance(module, nn.Conv2d):
+                conv_layers.append(module)
+        conv_layers = conv_layers[1:4]
+        encoder = ModelEncoder(layers=conv_layers, secret=secret_label, key_path='../keys/key.npy', device='cuda')
+"""
+
+    # Apply the replacement to the file
+    modify_file.replace_block_in_file(project_file2, old_source_block, new_source_block)
+
+    old_source_block = \
+"""            fit_one_epoch(model_train, model, criterion, loss_history, optimizer, epoch, 
+                    epoch_step, epoch_step_val, gen, gen_val, UnFreeze_Epoch, Cuda, fp16, scaler, save_period, save_dir, local_rank)
+"""
+    new_source_block = \
+"""            fit_one_epoch(encoder, model_train, model, criterion, loss_history, optimizer, epoch,
+                    epoch_step, epoch_step_val, gen, gen_val, UnFreeze_Epoch, Cuda, fp16, scaler, save_period, save_dir, local_rank)
 """
    # Apply the replacement to the file
-    modify_file.replace_block_in_file(project_file, old_source_block, new_source_block)
+    modify_file.replace_block_in_file(project_file2, old_source_block, new_source_block)
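
The next hunk threads the encoder into fit_one_epoch, but the loop body where the penalty joins the detection loss falls outside this diff's context. A minimal sketch of that combination, assuming the upstream trainer's variable names (images, targets, out) and a plain fp32 path:

# inside the batch loop of fit_one_epoch (sketch; variable names are assumptions)
optimizer.zero_grad()
out = model_train(images)
loss = ssd_loss.forward(targets, out)      # detection loss (assumed call)
embed_loss = encoder.get_embeder_loss()    # white-box watermark penalty
(loss + embed_loss).backward()             # one backward pass drives both objectives
optimizer.step()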
 
    # Locate and replace the code block
     old_source_block = \
@@ -111,16 +195,10 @@ def fit_one_epoch(model_train, model, ssd_loss, loss_history, optimizer, epoch,
 """
     new_source_block = \
 f"""
-def fit_one_epoch(model_train, model, ssd_loss, loss_history, optimizer, epoch, epoch_step, epoch_step_val, gen, gen_val, Epoch, cuda, fp16, scaler, save_period, save_dir, local_rank=0):
+def fit_one_epoch(encoder, model_train, model, ssd_loss, loss_history, optimizer, epoch, epoch_step, epoch_step_val, gen, gen_val, Epoch, cuda, fp16, scaler, save_period, save_dir, local_rank=0):
     total_loss  = 0
-    val_loss    = 0
-    secret_label = "{secret_label}"
-    conv_layers = []
-    for module in model.modules():
-        if isinstance(module, nn.Conv2d):
-            conv_layers.append(module)
-    conv_layers = conv_layers[0:2]
-    encoder = ModelEncoder(layers=conv_layers, secret=secret_label, key_path='../keys/key.npy', device='cuda')
+    val_loss    = 0 
+
     if local_rank == 0:
         print('Start Train')
         pbar = tqdm(total=epoch_step,desc=f'Epoch {{epoch + 1}}/{{Epoch}}',postfix=dict,mininterval=0.3)
@@ -182,66 +260,9 @@ def fit_one_epoch(model_train, model, ssd_loss, loss_history, optimizer, epoch,
         
         if local_rank == 0:
             pbar.set_postfix(**{{'total_loss'    : total_loss / (iteration + 1),
-                                'embed_loss': embed_loss,
+                                'embed_loss': embed_loss.item(),
                                 'lr'            : get_lr(optimizer)}})
             pbar.update(1)
 """
    # Apply the replacement to the file
-    modify_file.replace_block_in_file(project_file, old_source_block, new_source_block)
-
-    # Code block appended at the end of the file
-    append_source_block = """
-class ModelEncoder:
-    def __init__(self, layers, secret, key_path, device='cuda'):
-        self.device = device
-        self.layers = layers
-
-        # Check the conv layers selected for embedding
-        for layer in layers:  # verify that every target layer is a conv layer
-            if not isinstance(layer, nn.Conv2d):
-                raise TypeError('Argument is not a convolutional layer')
-        weights = [x.weight for x in layers]
-        w = self.flatten_parameters(weights)
-        w_init = w.clone().detach()
-        print('Size of embedding parameters:', w.shape)
-
-        # Encode the secret as bits
-        self.secret = torch.tensor(self.string2bin(secret), dtype=torch.float).to(self.device)  # the embedding code
-        self.secret_len = self.secret.shape[0]
-        print(f'Secret:{self.secret} secret length:{self.secret_len}')
-
-        # Generate the random projection matrix
-        self.X_random = torch.randn((self.secret_len, w_init.shape[0])).to(self.device)
-        self.save_tensor(self.X_random, key_path)  # save the projection matrix to the given path
-
-    def get_embeder_loss(self):
-        weights = [x.weight for x in self.layers]
-        w = self.flatten_parameters(weights)
-        prob = self.get_prob(self.X_random, w)
-        penalty = self.loss_fun(prob, self.secret)
-        return penalty
-
-    def string2bin(self, s):
-        binary_representation = ''.join(format(ord(x), '08b') for x in s)
-        return [int(x) for x in binary_representation]
-
-    def save_tensor(self, tensor, save_path):
-        os.makedirs(os.path.dirname(save_path), exist_ok=True)
-        tensor = tensor.cpu()
-        numpy_array = tensor.numpy()
-        np.save(save_path, numpy_array)
-
-    def flatten_parameters(self, weights):
-        weights = [weight.permute(2, 3, 1, 0) for weight in weights]
-        return torch.cat([torch.mean(x, dim=3).reshape(-1)
-                          for x in weights])
-
-    def get_prob(self, x_random, w):
-        mm = torch.mm(x_random, w.reshape((w.shape[0], 1)))
-        return mm.flatten()
-
-    def loss_fun(self, x, y):
-        return nn.BCEWithLogitsLoss()(x, y)
-    """
-    # Append the function to the project file
-    modify_file.append_block_in_file(project_file, append_source_block)
+    modify_file.replace_block_in_file(project_file, old_source_block, new_source_block)
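
Because embedding only shapes the selected weights, the label can be read back later without the training code: flatten the same layers the same way, project with the saved key, and threshold at zero (BCEWithLogitsLoss pushes each logit toward its target bit). A minimal extraction sketch; the key path and the [1:4] layer slice must match what the modified train.py used, and the model is assumed to be built the same way:

import numpy as np
import torch
from torch import nn

def flatten_parameters(weights):
    # same flattening as ModelEncoder: mean over the output-channel axis, then concatenate
    weights = [w.permute(2, 3, 1, 0) for w in weights]
    return torch.cat([torch.mean(x, dim=3).reshape(-1) for x in weights])

def extract_secret(model, key_path='../keys/key.npy'):
    conv_layers = [m for m in model.modules() if isinstance(m, nn.Conv2d)][1:4]
    w = flatten_parameters([layer.weight for layer in conv_layers])
    x_random = torch.tensor(np.load(key_path)).to(w.device)
    bits = (torch.mm(x_random, w.reshape(-1, 1)).flatten() > 0).int().tolist()
    chars = [chr(int(''.join(map(str, bits[i:i + 8])), 2)) for i in range(0, len(bits), 8)]
    return ''.join(chars)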