import os

from watermark_generate.tools import modify_file, general_tool
from watermark_generate.exceptions import BusinessException


def modify_model_project(secret_label: str, project_dir: str, public_key: str):
    """
    Modify the SSD project code to embed the watermark.

    :param secret_label: the generated secret label to embed
    :param project_dir: directory the project archive was extracted to
    :param public_key: signature public key, saved into the project files
    """
    rela_project_path = general_tool.find_relative_directories(project_dir, 'ssd-pytorch-3.1')
    if not rela_project_path:
        raise BusinessException(message="Project directory for the specified model was not found", code=-1)

    project_dir = os.path.join(project_dir, rela_project_path[0])
    project_file = os.path.join(project_dir, 'utils', 'utils_fit.py')
    project_file2 = os.path.join(project_dir, 'train.py')

    if not os.path.exists(project_file) or not os.path.exists(project_file2):
        raise BusinessException(message="Target project file to modify was not found", code=-1)

    # Save the public key to a fixed location inside the model project.
    keys_dir = os.path.join(project_dir, 'keys')
    os.makedirs(keys_dir, exist_ok=True)
    public_key_file = os.path.join(keys_dir, 'public.key')
    with open(public_key_file, 'w', encoding='utf-8') as file:
        file.write(public_key)

    # Replacement 1: inject the ModelEncoder watermark-embedding class into
    # train.py, immediately ahead of its entry point.
    old_source_block = \
"""if __name__ == "__main__":
"""
    new_source_block = \
"""class ModelEncoder:
    def __init__(self, layers, secret, key_path, device='cuda'):
        self.device = device
        self.layers = layers

        # Every layer selected for embedding must be a convolutional layer.
        for layer in layers:
            if not isinstance(layer, nn.Conv2d):
                raise TypeError('Target layer is not a convolutional layer')

        weights = [x.weight for x in layers]
        w = self.flatten_parameters(weights)
        w_init = w.clone().detach()
        print('Size of embedding parameters:', w.shape)

        # Convert the secret string into the bit tensor to embed.
        self.secret = torch.tensor(self.string2bin(secret), dtype=torch.float).to(self.device)  # the embedding code
        self.secret_len = self.secret.shape[0]
        print(f'Secret:{self.secret} secret length:{self.secret_len}')

        # Load the random projection matrix if one exists, otherwise generate it.
        if os.path.exists(key_path):
            self.X_random = torch.tensor(np.load(key_path), dtype=torch.float).to(self.device)
        else:
            self.X_random = torch.randn((self.secret_len, w_init.shape[0])).to(self.device)
            self.save_tensor(self.X_random, key_path)  # persist the projection matrix

    def get_embeder_loss(self):
        weights = [x.weight for x in self.layers]
        w = self.flatten_parameters(weights)
        prob = self.get_prob(self.X_random, w)
        penalty = self.loss_fun(prob, self.secret)
        return penalty

    def string2bin(self, s):
        binary_representation = ''.join(format(ord(x), '08b') for x in s)
        return [int(x) for x in binary_representation]

    def save_tensor(self, tensor, save_path):
        os.makedirs(os.path.dirname(save_path), exist_ok=True)
        tensor = tensor.cpu()
        numpy_array = tensor.numpy()
        np.save(save_path, numpy_array)

    def flatten_parameters(self, weights):
        weights = [weight.permute(2, 3, 1, 0) for weight in weights]
        return torch.cat([torch.mean(x, dim=3).reshape(-1) for x in weights])

    def get_prob(self, x_random, w):
        mm = torch.mm(x_random, w.reshape((w.shape[0], 1)))
        return mm.flatten()

    def loss_fun(self, x, y):
        return nn.BCEWithLogitsLoss()(x, y)

if __name__ == "__main__":
"""
    # Apply the replacement.
    modify_file.replace_block_in_file(project_file2, old_source_block, new_source_block)

    # Replacement 2: after the validation DataLoader is built, pick the conv
    # layers that will carry the watermark and construct the encoder.
    old_source_block = \
"""
        gen_val = DataLoader(val_dataset , shuffle = shuffle, batch_size = batch_size, num_workers = num_workers, pin_memory=True,
                             drop_last=True, collate_fn=ssd_dataset_collate, sampler=val_sampler)
"""
    new_source_block = \
f"""
        gen_val = DataLoader(val_dataset , shuffle = shuffle, batch_size = batch_size, num_workers = num_workers, pin_memory=True,
                             drop_last=True, collate_fn=ssd_dataset_collate, sampler=val_sampler)

        secret_label = '{secret_label}'
        conv_layers = []
        for module in model.modules():
            if isinstance(module, nn.Conv2d):
                conv_layers.append(module)
        conv_layers = conv_layers[1:4]
        encoder = ModelEncoder(layers=conv_layers, secret=secret_label, key_path='keys/key.npy', device='cuda')
"""
    # Apply the replacement.
    modify_file.replace_block_in_file(project_file2, old_source_block, new_source_block)

    # Replacement 3: hand the encoder to fit_one_epoch.
    old_source_block = \
"""
            fit_one_epoch(model_train, model, criterion, loss_history, optimizer, epoch, epoch_step, epoch_step_val, gen, gen_val, UnFreeze_Epoch, Cuda, fp16, scaler, save_period, save_dir, local_rank)
"""
    new_source_block = \
"""
            fit_one_epoch(encoder, model_train, model, criterion, loss_history, optimizer, epoch, epoch_step, epoch_step_val, gen, gen_val, UnFreeze_Epoch, Cuda, fp16, scaler, save_period, save_dir, local_rank)
"""
    # Apply the replacement.
    modify_file.replace_block_in_file(project_file2, old_source_block, new_source_block)

    # Replacement 4: rewrite fit_one_epoch in utils/utils_fit.py so the
    # watermark embedding loss is added to the detection loss on every
    # training iteration. The old block is kept byte-for-byte identical to
    # the target file (including its original Chinese comments) so the match
    # succeeds.
    old_source_block = \
"""
def fit_one_epoch(model_train, model, ssd_loss, loss_history, optimizer, epoch, epoch_step, epoch_step_val, gen, gen_val, Epoch, cuda, fp16, scaler, save_period, save_dir, local_rank=0):
    total_loss = 0
    val_loss = 0

    if local_rank == 0:
        print('Start Train')
        pbar = tqdm(total=epoch_step,desc=f'Epoch {epoch + 1}/{Epoch}',postfix=dict,mininterval=0.3)
    model_train.train()
    for iteration, batch in enumerate(gen):
        if iteration >= epoch_step:
            break
        images, targets = batch[0], batch[1]
        with torch.no_grad():
            if cuda:
                images = images.cuda(local_rank)
                targets = targets.cuda(local_rank)
        if not fp16:
            #----------------------#
            #   前向传播
            #----------------------#
            out = model_train(images)
            #----------------------#
            #   清零梯度
            #----------------------#
            optimizer.zero_grad()
            #----------------------#
            #   计算损失
            #----------------------#
            loss = ssd_loss.forward(targets, out)
            #----------------------#
            #   反向传播
            #----------------------#
            loss.backward()
            optimizer.step()
        else:
            from torch.cuda.amp import autocast
            with autocast():
                #----------------------#
                #   前向传播
                #----------------------#
                out = model_train(images)
                #----------------------#
                #   清零梯度
                #----------------------#
                optimizer.zero_grad()
                #----------------------#
                #   计算损失
                #----------------------#
                loss = ssd_loss.forward(targets, out)
            #----------------------#
            #   反向传播
            #----------------------#
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

        total_loss += loss.item()

        if local_rank == 0:
            pbar.set_postfix(**{'total_loss' : total_loss / (iteration + 1),
                                'lr' : get_lr(optimizer)})
            pbar.update(1)
"""
    new_source_block = \
"""
def fit_one_epoch(encoder, model_train, model, ssd_loss, loss_history, optimizer, epoch, epoch_step, epoch_step_val, gen, gen_val, Epoch, cuda, fp16, scaler, save_period, save_dir, local_rank=0):
    total_loss = 0
    val_loss = 0

    if local_rank == 0:
        print('Start Train')
        pbar = tqdm(total=epoch_step,desc=f'Epoch {epoch + 1}/{Epoch}',postfix=dict,mininterval=0.3)
    model_train.train()
    for iteration, batch in enumerate(gen):
        if iteration >= epoch_step:
            break
        images, targets = batch[0], batch[1]
        with torch.no_grad():
            if cuda:
                images = images.cuda(local_rank)
                targets = targets.cuda(local_rank)
        if not fp16:
            #----------------------#
            #   Forward pass
            #----------------------#
            out = model_train(images)
            #----------------------#
            #   Zero the gradients
            #----------------------#
            optimizer.zero_grad()
            #----------------------#
            #   Compute the loss, then add the watermark embedding loss
            #----------------------#
            loss = ssd_loss.forward(targets, out)
            embed_loss = encoder.get_embeder_loss()
            loss += embed_loss
            #----------------------#
            #   Backward pass
            #----------------------#
            loss.backward()
            optimizer.step()
        else:
            from torch.cuda.amp import autocast
            with autocast():
                #----------------------#
                #   Forward pass
                #----------------------#
                out = model_train(images)
                #----------------------#
                #   Zero the gradients
                #----------------------#
                optimizer.zero_grad()
                #----------------------#
                #   Compute the loss, then add the watermark embedding loss
                #----------------------#
                loss = ssd_loss.forward(targets, out)
                embed_loss = encoder.get_embeder_loss()
                loss += embed_loss

            #----------------------#
            #   Backward pass
            #----------------------#
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

        total_loss += loss.item()

        if local_rank == 0:
            pbar.set_postfix(**{'total_loss' : total_loss / (iteration + 1),
                                'embed_loss': embed_loss.item(),
                                'lr' : get_lr(optimizer)})
            pbar.update(1)
"""
    # Apply the replacement.
    modify_file.replace_block_in_file(project_file, old_source_block, new_source_block)
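
# ---------------------------------------------------------------------------
# Hypothetical usage sketch: the paths, label, and key below are illustrative
# placeholders only, not values from the real pipeline.
#
#     modify_model_project(
#         secret_label='example_secret_label',
#         project_dir='/tmp/extracted_upload',  # contains ssd-pytorch-3.1
#         public_key='-----BEGIN PUBLIC KEY-----...-----END PUBLIC KEY-----',
#     )
#
# After training with the injected ModelEncoder, the watermark can be read
# back using the persisted projection matrix: BCEWithLogitsLoss drives each
# logit positive for bit 1 and negative for bit 0, so extraction reduces to a
# sign test. A minimal sketch (assumes the same layer selection as above and
# a key saved at 'keys/key.npy'; not called anywhere in the pipeline):
def _extract_bits_sketch(conv_layers, key_path='keys/key.npy'):
    import numpy as np
    import torch

    # Mirror ModelEncoder.flatten_parameters: permute each kernel, average
    # over the output-channel axis, and flatten into one long vector.
    weights = [layer.weight.detach().cpu() for layer in conv_layers]
    w = torch.cat([torch.mean(x.permute(2, 3, 1, 0), dim=3).reshape(-1) for x in weights])

    # Re-project with the key matrix and threshold at zero:
    # sigmoid(z) > 0.5 exactly when z > 0.
    x_random = torch.tensor(np.load(key_path), dtype=torch.float)
    logits = torch.mm(x_random, w.reshape((w.shape[0], 1))).flatten()
    return (logits > 0).int().tolist()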