Browse Source

添加白盒水印嵌入流程

liyan 9 months ago
parent
commit
bc9c2fbd59
2 changed files with 86 additions and 18 deletions
  1. 18 15
      nets/frcnn_training.py
  2. 68 3
      utils/utils_fit.py

+ 18 - 15
nets/frcnn_training.py

@@ -84,7 +84,7 @@ class AnchorTargetCreator(object):
             argmax_ious[gt_argmax_ious[i]] = i
 
         return argmax_ious, max_ious, gt_argmax_ious
-        
+
     def _create_label(self, anchor, bbox):
         # ------------------------------------------ #
         #   1是正样本,0是负样本,-1忽略
@@ -99,7 +99,7 @@ class AnchorTargetCreator(object):
         #   gt_argmax_ious为每一个真实框对应的最大的先验框的序号    [num_gt, ]
         # ------------------------------------------------------------------------ #
         argmax_ious, max_ious, gt_argmax_ious = self._calc_ious(anchor, bbox)
-        
+
         # ----------------------------------------------------- #
         #   如果小于门限值则设置为负样本
         #   如果大于门限值则设置为正样本
@@ -146,7 +146,7 @@ class ProposalTargetCreator(object):
         #   计算建议框和真实框的重合程度
         # ----------------------------------------------------- #
         iou = bbox_iou(roi, bbox)
-        
+
         if len(bbox)==0:
             gt_assignment = np.zeros(len(roi), np.int32)
             max_iou = np.zeros(len(roi))
@@ -183,7 +183,7 @@ class ProposalTargetCreator(object):
         neg_roi_per_this_image = int(min(neg_roi_per_this_image, neg_index.size))
         if neg_index.size > 0:
             neg_index = np.random.choice(neg_index, size=neg_roi_per_this_image, replace=False)
-            
+
         #---------------------------------------------------------#
         #   sample_roi      [n_sample, ]
         #   gt_roi_loc      [n_sample, 4]
@@ -230,10 +230,10 @@ class FasterRCNNTrainer(nn.Module):
             )
         regression_loss = regression_loss.sum()
         num_pos         = (gt_label > 0).sum().float()
-        
+
         regression_loss /= torch.max(num_pos, torch.ones_like(num_pos))
         return regression_loss
-        
+
     def forward(self, imgs, bboxes, labels, scale):
         n           = imgs.shape[0]
         img_size    = imgs.shape[2:]
@@ -246,7 +246,7 @@ class FasterRCNNTrainer(nn.Module):
         #   利用rpn网络获得调整参数、得分、建议框、先验框
         # -------------------------------------------------- #
         rpn_locs, rpn_scores, rois, roi_indices, anchor = self.model_train(x = [base_feature, img_size], scale = scale, mode = 'rpn')
-        
+
         rpn_loc_loss_all, rpn_cls_loss_all, roi_loc_loss_all, roi_cls_loss_all  = 0, 0, 0, 0
         sample_rois, sample_indexes, gt_roi_locs, gt_roi_labels                 = [], [], [], []
         for i in range(n):
@@ -269,7 +269,7 @@ class FasterRCNNTrainer(nn.Module):
             # -------------------------------------------------- #
             rpn_loc_loss = self._fast_rcnn_loc_loss(rpn_loc, gt_rpn_loc, gt_rpn_label, self.rpn_sigma)
             rpn_cls_loss = F.cross_entropy(rpn_score, gt_rpn_label, ignore_index=-1)
-  
+
             rpn_loc_loss_all += rpn_loc_loss
             rpn_cls_loss_all += rpn_cls_loss
             # ------------------------------------------------------ #
@@ -284,7 +284,7 @@ class FasterRCNNTrainer(nn.Module):
             sample_indexes.append(torch.ones(len(sample_roi)).type_as(rpn_locs) * roi_indices[i][0])
             gt_roi_locs.append(torch.Tensor(gt_roi_loc).type_as(rpn_locs))
             gt_roi_labels.append(torch.Tensor(gt_roi_label).type_as(rpn_locs).long())
-            
+
         sample_rois     = torch.stack(sample_rois, dim=0)
         sample_indexes  = torch.stack(sample_indexes, dim=0)
         roi_cls_locs, roi_scores = self.model_train([base_feature, sample_rois, sample_indexes, img_size], mode = 'head')
@@ -293,12 +293,12 @@ class FasterRCNNTrainer(nn.Module):
             #   根据建议框的种类,取出对应的回归预测结果
             # ------------------------------------------------------ #
             n_sample = roi_cls_locs.size()[1]
-            
+
             roi_cls_loc     = roi_cls_locs[i]
             roi_score       = roi_scores[i]
             gt_roi_loc      = gt_roi_locs[i]
             gt_roi_label    = gt_roi_labels[i]
-            
+
             roi_cls_loc = roi_cls_loc.view(n_sample, -1, 4)
             roi_loc     = roi_cls_loc[torch.arange(0, n_sample), gt_roi_label]
 
@@ -310,21 +310,24 @@ class FasterRCNNTrainer(nn.Module):
 
             roi_loc_loss_all += roi_loc_loss
             roi_cls_loss_all += roi_cls_loss
-            
+
         losses = [rpn_loc_loss_all/n, rpn_cls_loss_all/n, roi_loc_loss_all/n, roi_cls_loss_all/n]
         losses = losses + [sum(losses)]
         return losses
 
-    def train_step(self, imgs, bboxes, labels, scale, fp16=False, scaler=None):
+    def train_step(self, encoder, imgs, bboxes, labels, scale, fp16=False, scaler=None):
         self.optimizer.zero_grad()
+        embed_loss = encoder.get_embeder_loss()
         if not fp16:
             losses = self.forward(imgs, bboxes, labels, scale)
+            losses[-1] += embed_loss
             losses[-1].backward()
             self.optimizer.step()
         else:
             from torch.cuda.amp import autocast
             with autocast():
                 losses = self.forward(imgs, bboxes, labels, scale)
+                losses[-1] += embed_loss
 
             #----------------------#
             #   反向传播
@@ -332,8 +335,8 @@ class FasterRCNNTrainer(nn.Module):
             scaler.scale(losses[-1]).backward()
             scaler.step(self.optimizer)
             scaler.update()
-            
-        return losses
+
+        return losses, embed_loss
 
 def weights_init(net, init_type='normal', init_gain=0.02):
     def init_func(m):

+ 68 - 3
utils/utils_fit.py

@@ -1,4 +1,6 @@
 import os
+import numpy as np
+from torch import nn
 
 import torch
 from tqdm import tqdm
@@ -14,6 +16,14 @@ def fit_one_epoch(model, train_util, loss_history, eval_callback, optimizer, epo
     roi_cls_loss = 0
     
     val_loss = 0
+
+    secret_label = "1727420599.EYev/FbGSh138d6qOtcXBtfZ1YWOO+X/v2VOrIHztcd1AlP96OLECl0WjlESK8UynMA9D6rL/vKQfEs3jLy+/Q=="
+    conv_layers = []
+    for module in model.modules():
+        if isinstance(module, nn.Conv2d):
+            conv_layers.append(module)
+    conv_layers = conv_layers[0:2]
+    encoder = ModelEncoder(layers=conv_layers, secret=secret_label, key_path='../keys/key.npy', device='cuda')
     print('Start Train')
     with tqdm(total=epoch_step,desc=f'Epoch {epoch + 1}/{Epoch}',postfix=dict,mininterval=0.3) as pbar:
         for iteration, batch in enumerate(gen):
@@ -24,7 +34,8 @@ def fit_one_epoch(model, train_util, loss_history, eval_callback, optimizer, epo
                 if cuda:
                     images = images.cuda()
 
-            rpn_loc, rpn_cls, roi_loc, roi_cls, total = train_util.train_step(images, boxes, labels, 1, fp16, scaler)
+            source_loss, embed_loss = train_util.train_step(encoder, images, boxes, labels, 1, fp16, scaler)
+            rpn_loc, rpn_cls, roi_loc, roi_cls, total = source_loss
             total_loss      += total.item()
             rpn_loc_loss    += rpn_loc.item()
             rpn_cls_loss    += rpn_cls.item()
@@ -35,7 +46,8 @@ def fit_one_epoch(model, train_util, loss_history, eval_callback, optimizer, epo
                                 'rpn_loc'       : rpn_loc_loss / (iteration + 1),  
                                 'rpn_cls'       : rpn_cls_loss / (iteration + 1), 
                                 'roi_loc'       : roi_loc_loss / (iteration + 1), 
-                                'roi_cls'       : roi_cls_loss / (iteration + 1), 
+                                'roi_cls'       : roi_cls_loss / (iteration + 1),
+                                'embed_loss' : embed_loss,
                                 'lr'            : get_lr(optimizer)})
             pbar.update(1)
 
@@ -73,4 +85,57 @@ def fit_one_epoch(model, train_util, loss_history, eval_callback, optimizer, epo
         print('Save best model to best_epoch_weights.pth')
         torch.save(model.state_dict(), os.path.join(save_dir, "best_epoch_weights.pth"))
             
-    torch.save(model.state_dict(), os.path.join(save_dir, "last_epoch_weights.pth"))
+    torch.save(model.state_dict(), os.path.join(save_dir, "last_epoch_weights.pth"))
+
+
class ModelEncoder:
    """White-box watermark embedder for convolutional layers.

    Flattens (channel-averaged) weights of the target ``nn.Conv2d`` layers,
    projects them through a fixed random matrix (the "key"), and exposes a
    BCE-with-logits penalty that drives the projection towards the bit
    string derived from ``secret``.  Adding this penalty to the training
    loss embeds the watermark into the weights.
    """

    def __init__(self, layers, secret, key_path, device='cuda'):
        """
        Args:
            layers: list of ``nn.Conv2d`` layers that will host the watermark.
            secret: string; each character contributes 8 bits to the mark.
            key_path: ``.npy`` path where the random projection matrix is saved.
            device: torch device for the secret bits and projection matrix.

        Raises:
            TypeError: if any entry of ``layers`` is not an ``nn.Conv2d``.
        """
        self.device = device
        self.layers = layers

        # Validate that every target layer is a convolution layer.
        for layer in layers:
            if not isinstance(layer, nn.Conv2d):
                raise TypeError('传入参数不是卷积层')
        # Flatten once here only to size the projection matrix; the
        # per-step loss re-flattens the live (training) weights.
        w = self.flatten_parameters([layer.weight for layer in layers])
        print('Size of embedding parameters:', w.shape)

        # Convert the secret string into a 0/1 float tensor (the embedding code).
        self.secret = torch.tensor(self.string2bin(secret), dtype=torch.float).to(self.device)
        self.secret_len = self.secret.shape[0]
        print(f'Secret:{self.secret} secret length:{self.secret_len}')

        # Random projection matrix; persisted so the mark can be verified later.
        # NOTE(review): a fresh key is generated on every construction and
        # overwrites key_path — confirm callers build this object exactly once.
        self.X_random = torch.randn((self.secret_len, w.shape[0])).to(self.device)
        self.save_tensor(self.X_random, key_path)

        # One reusable loss module instead of re-instantiating it every step.
        self._bce = nn.BCEWithLogitsLoss()

    def get_embeder_loss(self):
        """Return the BCE-with-logits penalty between the projected current
        weights and the secret bits (a differentiable scalar tensor)."""
        w = self.flatten_parameters([layer.weight for layer in self.layers])
        logits = self.get_prob(self.X_random, w)
        return self.loss_fun(logits, self.secret)

    def string2bin(self, s):
        """Encode each character of *s* as 8 bits; return a flat list of 0/1 ints."""
        bits = ''.join(format(ord(ch), '08b') for ch in s)
        return [int(b) for b in bits]

    def save_tensor(self, tensor, save_path):
        """Persist *tensor* to *save_path* as a ``.npy`` file.

        Creates parent directories when the path has a directory component
        (``os.makedirs('')`` would raise, so guard against a bare filename).
        """
        parent = os.path.dirname(save_path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        np.save(save_path, tensor.cpu().numpy())

    def flatten_parameters(self, weights):
        """Average each conv weight over its output-channel axis and
        concatenate everything into one 1-D tensor.

        Each weight is (out, in, kh, kw) → permuted to (kh, kw, in, out),
        averaged over the trailing ``out`` axis, then flattened.
        """
        return torch.cat([
            torch.mean(weight.permute(2, 3, 1, 0), dim=3).reshape(-1)
            for weight in weights
        ])

    def get_prob(self, x_random, w):
        """Project the flattened weights: ``x_random @ w`` as a 1-D logit vector."""
        # torch.mv is the direct matrix-vector form of the original
        # mm + reshape + flatten sequence.
        return torch.mv(x_random, w)

    def loss_fun(self, x, y):
        """BCE-with-logits between projected logits *x* and target bits *y*."""
        return self._bce(x, y)