import os

from watermark_generate.tools import modify_file, general_tool
from watermark_generate.exceptions import BusinessException


def modify_model_project(secret_label: str, project_dir: str, public_key: str):
    """
    Modify the ssd-pytorch project code so that training also embeds the watermark.

    :param secret_label: the generated secret label to embed
    :param project_dir: directory the uploaded project archive was extracted to
    :param public_key: signing public key; it must be saved into the project files
    """
    rela_project_path = general_tool.find_relative_directories(project_dir, 'ssd-pytorch-3.1')
    if not rela_project_path:
        raise BusinessException(message="未找到指定模型的工程目录", code=-1)

    project_dir = os.path.join(project_dir, rela_project_path[0])
    project_file = os.path.join(project_dir, 'utils/utils_fit.py')
    project_file2 = os.path.join(project_dir, 'train.py')

    if not os.path.exists(project_file):
        raise BusinessException(message="指定待修改的工程文件未找到", code=-1)

    # Save the public key to the designated location inside the model project
    keys_dir = os.path.join(project_dir, 'keys')
    os.makedirs(keys_dir, exist_ok=True)
    public_key_file = os.path.join(keys_dir, 'public.key')
    # Write it out
    with open(public_key_file, 'w', encoding='utf-8') as file:
        file.write(public_key)
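
    # The three replacements below patch two files in the extracted project:
    # train.py gains a ModelEncoder class, an encoder instance built from three
    # convolutional layers of the model, and an extra `encoder` argument in its
    # fit_one_epoch call; utils_fit.py gets the matching fit_one_epoch signature
    # and adds the embedding loss to the detection loss.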
    # Locate and replace code blocks in train.py
    old_source_block = \
"""if __name__ == "__main__":
"""
    new_source_block = \
"""class ModelEncoder:
    def __init__(self, layers, secret, key_path, device='cuda'):
        self.device = device
        self.layers = layers

        # 处理待嵌入的卷积层
        for layer in layers:  # 判断传入的目标层是否全部为卷积层
            if not isinstance(layer, nn.Conv2d):
                raise TypeError('传入参数不是卷积层')
        weights = [x.weight for x in layers]
        w = self.flatten_parameters(weights)
        w_init = w.clone().detach()
        print('Size of embedding parameters:', w.shape)

        # 对密钥进行处理
        self.secret = torch.tensor(self.string2bin(secret), dtype=torch.float).to(self.device)  # the embedding code
        self.secret_len = self.secret.shape[0]
        print(f'Secret:{self.secret} secret length:{self.secret_len}')

        # 生成随机的投影矩阵
        if os.path.exists(key_path):
            self.X_random = torch.tensor(np.load(key_path), dtype=torch.float).to(self.device)
        else:
            self.X_random = torch.randn((self.secret_len, w_init.shape[0])).to(self.device)
            self.save_tensor(self.X_random, key_path)  # 保存投影矩阵至指定位置

    def get_embeder_loss(self):
        weights = [x.weight for x in self.layers]
        w = self.flatten_parameters(weights)
        prob = self.get_prob(self.X_random, w)
        penalty = self.loss_fun(prob, self.secret)
        return penalty

    def string2bin(self, s):
        binary_representation = ''.join(format(ord(x), '08b') for x in s)
        return [int(x) for x in binary_representation]

    def save_tensor(self, tensor, save_path):
        os.makedirs(os.path.dirname(save_path), exist_ok=True)
        tensor = tensor.cpu()
        numpy_array = tensor.numpy()
        np.save(save_path, numpy_array)

    def flatten_parameters(self, weights):
        weights = [weight.permute(2, 3, 1, 0) for weight in weights]
        return torch.cat([torch.mean(x, dim=3).reshape(-1)
                          for x in weights])

    def get_prob(self, x_random, w):
        mm = torch.mm(x_random, w.reshape((w.shape[0], 1)))
        return mm.flatten()

    def loss_fun(self, x, y):
        return nn.BCEWithLogitsLoss()(x, y)


if __name__ == "__main__":
"""
    # Replace the block in train.py
    modify_file.replace_block_in_file(project_file2, old_source_block, new_source_block)
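
    # What the injected ModelEncoder does (summary of the block above): the
    # selected conv weights are permuted, averaged over the output-channel
    # dimension and flattened into a vector w; a fixed random projection X
    # (persisted at key_path) maps w to one logit per secret bit, and
    # BCEWithLogitsLoss pulls sigmoid(X @ w) toward the bits of secret_label.
    # Presumably the watermark is verified later by recomputing
    # sigmoid(X @ w) > 0.5 on the released weights and comparing it with
    # string2bin(secret_label).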
    old_source_block = \
"""        gen_val = DataLoader(val_dataset , shuffle = shuffle, batch_size = batch_size, num_workers = num_workers, pin_memory=True,
                                drop_last=True, collate_fn=ssd_dataset_collate, sampler=val_sampler)
"""
    new_source_block = \
f"""
        gen_val = DataLoader(val_dataset , shuffle = shuffle, batch_size = batch_size, num_workers = num_workers, pin_memory=True,
                                drop_last=True, collate_fn=ssd_dataset_collate, sampler=val_sampler)
        secret_label = '{secret_label}'
        conv_layers = []
        for module in model.modules():
            if isinstance(module, nn.Conv2d):
                conv_layers.append(module)
        conv_layers = conv_layers[1:4]
        encoder = ModelEncoder(layers=conv_layers, secret=secret_label, key_path='keys/key.npy', device='cuda')
"""
    # Replace the block in train.py
    modify_file.replace_block_in_file(project_file2, old_source_block, new_source_block)
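
    # Note on the snippet injected above: model.modules() is walked in definition
    # order, the first Conv2d is skipped and the next three carry the watermark
    # (conv_layers[1:4]); 'keys/key.npy' is resolved relative to the directory
    # train.py is launched from, and ModelEncoder.save_tensor creates that
    # directory on first save.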
    old_source_block = \
"""        fit_one_epoch(model_train, model, criterion, loss_history, optimizer, epoch,
                epoch_step, epoch_step_val, gen, gen_val, UnFreeze_Epoch, Cuda, fp16, scaler, save_period, save_dir, local_rank)
"""
    new_source_block = \
"""        fit_one_epoch(encoder, model_train, model, criterion, loss_history, optimizer, epoch,
                epoch_step, epoch_step_val, gen, gen_val, UnFreeze_Epoch, Cuda, fp16, scaler, save_period, save_dir, local_rank)
"""
    # Replace the block in train.py
    modify_file.replace_block_in_file(project_file2, old_source_block, new_source_block)
    # Locate and replace the original fit_one_epoch in utils_fit.py
    old_source_block = \
"""
def fit_one_epoch(model_train, model, ssd_loss, loss_history, optimizer, epoch, epoch_step, epoch_step_val, gen, gen_val, Epoch, cuda, fp16, scaler, save_period, save_dir, local_rank=0):
    total_loss = 0
    val_loss = 0
    if local_rank == 0:
        print('Start Train')
        pbar = tqdm(total=epoch_step,desc=f'Epoch {epoch + 1}/{Epoch}',postfix=dict,mininterval=0.3)
    model_train.train()
    for iteration, batch in enumerate(gen):
        if iteration >= epoch_step:
            break
        images, targets = batch[0], batch[1]
        with torch.no_grad():
            if cuda:
                images = images.cuda(local_rank)
                targets = targets.cuda(local_rank)
        if not fp16:
            #----------------------#
            # 前向传播
            #----------------------#
            out = model_train(images)
            #----------------------#
            # 清零梯度
            #----------------------#
            optimizer.zero_grad()
            #----------------------#
            # 计算损失
            #----------------------#
            loss = ssd_loss.forward(targets, out)
            #----------------------#
            # 反向传播
            #----------------------#
            loss.backward()
            optimizer.step()
        else:
            from torch.cuda.amp import autocast
            with autocast():
                #----------------------#
                # 前向传播
                #----------------------#
                out = model_train(images)
                #----------------------#
                # 清零梯度
                #----------------------#
                optimizer.zero_grad()
                #----------------------#
                # 计算损失
                #----------------------#
                loss = ssd_loss.forward(targets, out)
            #----------------------#
            # 反向传播
            #----------------------#
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        total_loss += loss.item()

        if local_rank == 0:
            pbar.set_postfix(**{'total_loss' : total_loss / (iteration + 1),
                                'lr' : get_lr(optimizer)})
            pbar.update(1)
"""
    new_source_block = \
f"""
def fit_one_epoch(encoder, model_train, model, ssd_loss, loss_history, optimizer, epoch, epoch_step, epoch_step_val, gen, gen_val, Epoch, cuda, fp16, scaler, save_period, save_dir, local_rank=0):
    total_loss = 0
    val_loss = 0
    if local_rank == 0:
        print('Start Train')
        pbar = tqdm(total=epoch_step,desc=f'Epoch {{epoch + 1}}/{{Epoch}}',postfix=dict,mininterval=0.3)
    model_train.train()
    for iteration, batch in enumerate(gen):
        if iteration >= epoch_step:
            break
        images, targets = batch[0], batch[1]
        with torch.no_grad():
            if cuda:
                images = images.cuda(local_rank)
                targets = targets.cuda(local_rank)
        if not fp16:
            #----------------------#
            # 前向传播
            #----------------------#
            out = model_train(images)
            #----------------------#
            # 清零梯度
            #----------------------#
            optimizer.zero_grad()
            #----------------------#
            # 计算损失
            #----------------------#
            loss = ssd_loss.forward(targets, out)
            embed_loss = encoder.get_embeder_loss()
            loss += embed_loss
            #----------------------#
            # 反向传播
            #----------------------#
            loss.backward()
            optimizer.step()
        else:
            from torch.cuda.amp import autocast
            with autocast():
                #----------------------#
                # 前向传播
                #----------------------#
                out = model_train(images)
                #----------------------#
                # 清零梯度
                #----------------------#
                optimizer.zero_grad()
                #----------------------#
                # 计算损失
                #----------------------#
                loss = ssd_loss.forward(targets, out)
                embed_loss = encoder.get_embeder_loss()
                loss += embed_loss
            #----------------------#
            # 反向传播
            #----------------------#
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        total_loss += loss.item()

        if local_rank == 0:
            pbar.set_postfix(**{{'total_loss' : total_loss / (iteration + 1),
                                'embed_loss': embed_loss.item(),
                                'lr' : get_lr(optimizer)}})
            pbar.update(1)
"""
    # Replace the block in utils_fit.py
    modify_file.replace_block_in_file(project_file, old_source_block, new_source_block)
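

# Illustrative sketch only (not called anywhere in this module): given a model
# trained with the patched project and the 'keys/key.npy' produced during
# training, the embedded bits could be read back roughly as follows. The
# function name and defaults are assumptions for illustration, not part of the
# patched project.
def _extract_secret_bits_sketch(model, key_path='keys/key.npy', device='cuda'):
    import numpy as np
    import torch
    from torch import nn

    # Mirror the layer selection injected into train.py above.
    conv_layers = [m for m in model.modules() if isinstance(m, nn.Conv2d)][1:4]
    # Same flattening as ModelEncoder.flatten_parameters.
    weights = [layer.weight.permute(2, 3, 1, 0) for layer in conv_layers]
    w = torch.cat([torch.mean(x, dim=3).reshape(-1) for x in weights]).to(device)
    # Same projection as ModelEncoder.get_prob, thresholded at 0.5.
    x_random = torch.tensor(np.load(key_path), dtype=torch.float).to(device)
    logits = torch.mm(x_random, w.reshape((w.shape[0], 1))).flatten()
    bits = (torch.sigmoid(logits) > 0.5).int().tolist()
    # Compare against [int(b) for b in ''.join(format(ord(c), '08b') for c in secret_label)].
    return bits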