import os import numpy as np from torch import nn import torch from tqdm import tqdm from utils.utils import get_lr def fit_one_epoch(model, train_util, loss_history, eval_callback, optimizer, epoch, epoch_step, epoch_step_val, gen, gen_val, Epoch, cuda, fp16, scaler, save_period, save_dir): total_loss = 0 rpn_loc_loss = 0 rpn_cls_loss = 0 roi_loc_loss = 0 roi_cls_loss = 0 val_loss = 0 secret_label = "1727420599.EYev/FbGSh138d6qOtcXBtfZ1YWOO+X/v2VOrIHztcd1AlP96OLECl0WjlESK8UynMA9D6rL/vKQfEs3jLy+/Q==" conv_layers = [] for module in model.modules(): if isinstance(module, nn.Conv2d): conv_layers.append(module) conv_layers = conv_layers[0:2] encoder = ModelEncoder(layers=conv_layers, secret=secret_label, key_path='../keys/key.npy', device='cuda') print('Start Train') with tqdm(total=epoch_step,desc=f'Epoch {epoch + 1}/{Epoch}',postfix=dict,mininterval=0.3) as pbar: for iteration, batch in enumerate(gen): if iteration >= epoch_step: break images, boxes, labels = batch[0], batch[1], batch[2] with torch.no_grad(): if cuda: images = images.cuda() source_loss, embed_loss = train_util.train_step(encoder, images, boxes, labels, 1, fp16, scaler) rpn_loc, rpn_cls, roi_loc, roi_cls, total = source_loss total_loss += total.item() rpn_loc_loss += rpn_loc.item() rpn_cls_loss += rpn_cls.item() roi_loc_loss += roi_loc.item() roi_cls_loss += roi_cls.item() pbar.set_postfix(**{'total_loss' : total_loss / (iteration + 1), 'rpn_loc' : rpn_loc_loss / (iteration + 1), 'rpn_cls' : rpn_cls_loss / (iteration + 1), 'roi_loc' : roi_loc_loss / (iteration + 1), 'roi_cls' : roi_cls_loss / (iteration + 1), 'embed_loss' : embed_loss, 'lr' : get_lr(optimizer)}) pbar.update(1) print('Finish Train') print('Start Validation') with tqdm(total=epoch_step_val, desc=f'Epoch {epoch + 1}/{Epoch}',postfix=dict,mininterval=0.3) as pbar: for iteration, batch in enumerate(gen_val): if iteration >= epoch_step_val: break images, boxes, labels = batch[0], batch[1], batch[2] with torch.no_grad(): if cuda: images = images.cuda() train_util.optimizer.zero_grad() _, _, _, _, val_total = train_util.forward(images, boxes, labels, 1) val_loss += val_total.item() pbar.set_postfix(**{'val_loss' : val_loss / (iteration + 1)}) pbar.update(1) print('Finish Validation') loss_history.append_loss(epoch + 1, total_loss / epoch_step, val_loss / epoch_step_val) eval_callback.on_epoch_end(epoch + 1) print('Epoch:'+ str(epoch + 1) + '/' + str(Epoch)) print('Total Loss: %.3f || Val Loss: %.3f ' % (total_loss / epoch_step, val_loss / epoch_step_val)) #-----------------------------------------------# # 保存权值 #-----------------------------------------------# if (epoch + 1) % save_period == 0 or epoch + 1 == Epoch: torch.save(model.state_dict(), os.path.join(save_dir, 'ep%03d-loss%.3f-val_loss%.3f.pth' % (epoch + 1, total_loss / epoch_step, val_loss / epoch_step_val))) if len(loss_history.val_loss) <= 1 or (val_loss / epoch_step_val) <= min(loss_history.val_loss): print('Save best model to best_epoch_weights.pth') torch.save(model.state_dict(), os.path.join(save_dir, "best_epoch_weights.pth")) torch.save(model.state_dict(), os.path.join(save_dir, "last_epoch_weights.pth")) class ModelEncoder: def __init__(self, layers, secret, key_path, device='cuda'): self.device = device self.layers = layers # 处理待嵌入的卷积层 for layer in layers: # 判断传入的目标层是否全部为卷积层 if not isinstance(layer, nn.Conv2d): raise TypeError('传入参数不是卷积层') weights = [x.weight for x in layers] w = self.flatten_parameters(weights) w_init = w.clone().detach() print('Size of embedding parameters:', w.shape) # 对密钥进行处理 self.secret = torch.tensor(self.string2bin(secret), dtype=torch.float).to(self.device) # the embedding code self.secret_len = self.secret.shape[0] print(f'Secret:{self.secret} secret length:{self.secret_len}') # 生成随机的投影矩阵 self.X_random = torch.randn((self.secret_len, w_init.shape[0])).to(self.device) self.save_tensor(self.X_random, key_path) # 保存投影矩阵至指定位置 def get_embeder_loss(self): weights = [x.weight for x in self.layers] w = self.flatten_parameters(weights) prob = self.get_prob(self.X_random, w) penalty = self.loss_fun(prob, self.secret) return penalty def string2bin(self, s): binary_representation = ''.join(format(ord(x), '08b') for x in s) return [int(x) for x in binary_representation] def save_tensor(self, tensor, save_path): os.makedirs(os.path.dirname(save_path), exist_ok=True) tensor = tensor.cpu() numpy_array = tensor.numpy() np.save(save_path, numpy_array) def flatten_parameters(self, weights): weights = [weight.permute(2, 3, 1, 0) for weight in weights] return torch.cat([torch.mean(x, dim=3).reshape(-1) for x in weights]) def get_prob(self, x_random, w): mm = torch.mm(x_random, w.reshape((w.shape[0], 1))) return mm.flatten() def loss_fun(self, x, y): return nn.BCEWithLogitsLoss()(x, y)