Browse Source

添加faster-rcnn模型白盒水印嵌入

liyan 9 tháng trước cách đây
mục cha
commit
16cb5dec9e

+ 3 - 1
watermark_generate/controller/watermark_generate_controller.py

@@ -12,7 +12,7 @@ from watermark_generate.exceptions import BusinessException
 from watermark_generate import logger
 from watermark_generate.tools import secret_label_func
 from watermark_generate.deals import yolox_pytorch_black_embed, yolox_pytorch_white_embed, \
-    faster_rcnn_pytorch_black_embed, ssd_pytorch_black_embed, ssd_pytorch_white_embed
+    faster_rcnn_pytorch_black_embed, ssd_pytorch_black_embed, ssd_pytorch_white_embed, faster_rcnn_pytorch_white_embed
 
 generator = Blueprint('generator', __name__)
 
@@ -86,6 +86,8 @@ def watermark_embed():
         yolox_pytorch_white_embed.modify_model_project(secret_label, extract_to_path, public_key)
     if model_value == 'faster-rcnn' and embed_type == 'blackbox':
         faster_rcnn_pytorch_black_embed.modify_model_project(secret_label, extract_to_path, public_key)
+    if model_value == 'faster-rcnn' and embed_type == 'whitebox':
+        faster_rcnn_pytorch_white_embed.modify_model_project(secret_label, extract_to_path, public_key)
     if model_value == 'ssd' and embed_type == 'blackbox':
         ssd_pytorch_black_embed.modify_model_project(secret_label, extract_to_path, public_key)
     if model_value == 'ssd' and embed_type == 'whitebox':

+ 1 - 1
watermark_generate/deals/faster_rcnn_pytorch_black_embed.py

@@ -6,7 +6,7 @@ from watermark_generate.exceptions import BusinessException
 
 def modify_model_project(secret_label: str, project_dir: str, public_key: str):
     """
-    修改yolox工程代码
+    修改faster-rcnn工程代码
     :param secret_label: 生成的密码标签
     :param project_dir: 工程文件解压后的目录
     :param public_key: 签名公钥,需保存至工程文件中

+ 232 - 0
watermark_generate/deals/faster_rcnn_pytorch_white_embed.py

@@ -0,0 +1,232 @@
+import os
+
+from watermark_generate.tools import modify_file, general_tool
+from watermark_generate.exceptions import BusinessException
+
+
+def modify_model_project(secret_label: str, project_dir: str, public_key: str):
+    """
+    修改faster-rcnn工程代码
+    :param secret_label: 生成的密码标签
+    :param project_dir: 工程文件解压后的目录
+    :param public_key: 签名公钥,需保存至工程文件中
+    """
+
+    rela_project_path = general_tool.find_relative_directories(project_dir, 'faster-rcnn-pytorch-3.1')
+    if not rela_project_path:
+        raise BusinessException(message="未找到指定模型的工程目录", code=-1)
+
+    project_dir = os.path.join(project_dir, rela_project_path[0])
+    project_file = os.path.join(project_dir, 'utils/utils_fit.py')
+    project_file2 = os.path.join(project_dir, 'nets/frcnn_training.py')
+
+    if not os.path.exists(project_file) or not os.path.exists(project_file2):
+        raise BusinessException(message="指定待修改的工程文件未找到", code=-1)
+
+    # 把公钥保存至模型工程代码指定位置
+    keys_dir = os.path.join(project_dir, 'keys')
+    os.makedirs(keys_dir, exist_ok=True)
+    public_key_file = os.path.join(keys_dir, 'public.key')
+    # 写回文件
+    with open(public_key_file, 'w', encoding='utf-8') as file:
+        file.write(public_key)
+
+    # 查找替换代码块
+    old_source_block = \
+"""import os
+"""
+    new_source_block = \
+"""import os
+import numpy as np
+from torch import nn
+"""
+    # 文件替换
+    modify_file.replace_block_in_file(project_file, old_source_block, new_source_block)
+
+    # 查找替换代码块
+    old_source_block = \
+"""
+def fit_one_epoch(model, train_util, loss_history, optimizer, epoch, epoch_step, epoch_step_val, gen, gen_val, Epoch, cuda, fp16, scaler, save_period, save_dir):
+    total_loss = 0
+    rpn_loc_loss = 0
+    rpn_cls_loss = 0
+    roi_loc_loss = 0
+    roi_cls_loss = 0
+    
+    val_loss = 0
+    print('Start Train')
+    with tqdm(total=epoch_step,desc=f'Epoch {epoch + 1}/{Epoch}',postfix=dict,mininterval=0.3) as pbar:
+        for iteration, batch in enumerate(gen):
+            if iteration >= epoch_step:
+                break
+            images, boxes, labels = batch[0], batch[1], batch[2]
+            with torch.no_grad():
+                if cuda:
+                    images = images.cuda()
+
+            rpn_loc, rpn_cls, roi_loc, roi_cls, total = train_util.train_step(images, boxes, labels, 1, fp16, scaler)
+            total_loss      += total.item()
+            rpn_loc_loss    += rpn_loc.item()
+            rpn_cls_loss    += rpn_cls.item()
+            roi_loc_loss    += roi_loc.item()
+            roi_cls_loss    += roi_cls.item()
+            
+            pbar.set_postfix(**{'total_loss'    : total_loss / (iteration + 1), 
+                                'rpn_loc'       : rpn_loc_loss / (iteration + 1),  
+                                'rpn_cls'       : rpn_cls_loss / (iteration + 1), 
+                                'roi_loc'       : roi_loc_loss / (iteration + 1), 
+                                'roi_cls'       : roi_cls_loss / (iteration + 1), 
+                                'lr'            : get_lr(optimizer)})
+            pbar.update(1)
+"""
+    new_source_block = \
+f"""
+def fit_one_epoch(model, train_util, loss_history, optimizer, epoch, epoch_step, epoch_step_val, gen, gen_val, Epoch, cuda, fp16, scaler, save_period, save_dir):
+    total_loss = 0
+    rpn_loc_loss = 0
+    rpn_cls_loss = 0
+    roi_loc_loss = 0
+    roi_cls_loss = 0
+    
+    val_loss = 0
+    secret_label = "{secret_label}"
+    conv_layers = []
+    for module in model.modules():
+        if isinstance(module, nn.Conv2d):
+            conv_layers.append(module)
+    conv_layers = conv_layers[0:2]
+    encoder = ModelEncoder(layers=conv_layers, secret=secret_label, key_path='../keys/key.npy', device='cuda')
+    print('Start Train')
+    with tqdm(total=epoch_step,desc=f'Epoch {{epoch + 1}}/{{Epoch}}',postfix=dict,mininterval=0.3) as pbar:
+        for iteration, batch in enumerate(gen):
+            if iteration >= epoch_step:
+                break
+            images, boxes, labels = batch[0], batch[1], batch[2]
+            with torch.no_grad():
+                if cuda:
+                    images = images.cuda()
+
+            source_loss, embed_loss = train_util.train_step(encoder, images, boxes, labels, 1, fp16, scaler)
+            rpn_loc, rpn_cls, roi_loc, roi_cls, total = source_loss
+            total_loss      += total.item()
+            rpn_loc_loss    += rpn_loc.item()
+            rpn_cls_loss    += rpn_cls.item()
+            roi_loc_loss    += roi_loc.item()
+            roi_cls_loss    += roi_cls.item()
+            embed_loss_item = embed_loss.item()
+            
+            pbar.set_postfix(**{{'total_loss'    : total_loss / (iteration + 1), 
+                                'rpn_loc'       : rpn_loc_loss / (iteration + 1),  
+                                'rpn_cls'       : rpn_cls_loss / (iteration + 1), 
+                                'roi_loc'       : roi_loc_loss / (iteration + 1), 
+                                'roi_cls'       : roi_cls_loss / (iteration + 1),
+                                'embed_loss' : embed_loss_item,
+                                'lr'            : get_lr(optimizer)}})
+            pbar.update(1)
+"""
+    # 文件替换
+    modify_file.replace_block_in_file(project_file, old_source_block, new_source_block)
+
+    # 文件末尾追加代码块
+    append_source_block = """
+class ModelEncoder:
+    def __init__(self, layers, secret, key_path, device='cuda'):
+        self.device = device
+        self.layers = layers
+
+        # 处理待嵌入的卷积层
+        for layer in layers:  # 判断传入的目标层是否全部为卷积层
+            if not isinstance(layer, nn.Conv2d):
+                raise TypeError('传入参数不是卷积层')
+        weights = [x.weight for x in layers]
+        w = self.flatten_parameters(weights)
+        w_init = w.clone().detach()
+        print('Size of embedding parameters:', w.shape)
+
+        # 对密钥进行处理
+        self.secret = torch.tensor(self.string2bin(secret), dtype=torch.float).to(self.device)  # the embedding code
+        self.secret_len = self.secret.shape[0]
+        print(f'Secret:{self.secret} secret length:{self.secret_len}')
+
+        # 生成随机的投影矩阵
+        self.X_random = torch.randn((self.secret_len, w_init.shape[0])).to(self.device)
+        self.save_tensor(self.X_random, key_path)  # 保存投影矩阵至指定位置
+
+    def get_embeder_loss(self):
+        weights = [x.weight for x in self.layers]
+        w = self.flatten_parameters(weights)
+        prob = self.get_prob(self.X_random, w)
+        penalty = self.loss_fun(prob, self.secret)
+        return penalty
+
+    def string2bin(self, s):
+        binary_representation = ''.join(format(ord(x), '08b') for x in s)
+        return [int(x) for x in binary_representation]
+
+    def save_tensor(self, tensor, save_path):
+        os.makedirs(os.path.dirname(save_path), exist_ok=True)
+        tensor = tensor.cpu()
+        numpy_array = tensor.numpy()
+        np.save(save_path, numpy_array)
+
+    def flatten_parameters(self, weights):
+        weights = [weight.permute(2, 3, 1, 0) for weight in weights]
+        return torch.cat([torch.mean(x, dim=3).reshape(-1)
+                          for x in weights])
+
+    def get_prob(self, x_random, w):
+        mm = torch.mm(x_random, w.reshape((w.shape[0], 1)))
+        return mm.flatten()
+
+    def loss_fun(self, x, y):
+        return nn.BCEWithLogitsLoss()(x, y)
+    """
+    # 向工程文件追加函数
+    modify_file.append_block_in_file(project_file, append_source_block)
+
+    old_source_block = \
+"""    def train_step(self, imgs, bboxes, labels, scale, fp16=False, scaler=None):
+        self.optimizer.zero_grad()
+        if not fp16:
+            losses = self.forward(imgs, bboxes, labels, scale)
+            losses[-1].backward()
+            self.optimizer.step()
+        else:
+            from torch.cuda.amp import autocast
+            with autocast():
+                losses = self.forward(imgs, bboxes, labels, scale)
+
+            #----------------------#
+            #   反向传播
+            #----------------------#
+            scaler.scale(losses[-1]).backward()
+            scaler.step(self.optimizer)
+            scaler.update()
+            
+        return losses
+"""
+    new_source_block = \
+"""    def train_step(self, encoder, imgs, bboxes, labels, scale, fp16=False, scaler=None):
+        self.optimizer.zero_grad()
+        embed_loss = encoder.get_embeder_loss()
+        if not fp16:
+            losses = self.forward(imgs, bboxes, labels, scale)
+            losses[-1] += embed_loss
+            losses[-1].backward()
+            self.optimizer.step()
+        else:
+            from torch.cuda.amp import autocast
+            with autocast():
+                losses = self.forward(imgs, bboxes, labels, scale)
+                losses[-1] += embed_loss
+
+            #----------------------#
+            #   反向传播
+            #----------------------#
+            scaler.scale(losses[-1]).backward()
+            scaler.step(self.optimizer)
+            scaler.update()
+
+        return losses, embed_loss
+"""
+    modify_file.replace_block_in_file(project_file2, old_source_block, new_source_block)