import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from matplotlib import pyplot as plt
from torch import optim
from tqdm import tqdm  # progress bar
from models.alexnet import AlexNet
from models.embeder import WatermarkEmbeder
from models.googlenet import GoogLeNet
from models.lenet import LeNet
from models.vgg16 import VGGNet
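# models.* are local project modules providing the backbone and embedder definitions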

# Hyperparameters
batch_size = 500
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_epochs = 40
wm_length = 1024
# Fix the random seed so the watermark key is reproducible
np.random.seed(1)
lambda1 = 0.05  # weight of the embedding loss in the combined objective
b = np.random.randint(low=0, high=2, size=(1, wm_length))  # simulated random watermark key
np.save('b.npy', b)
b = torch.tensor(b, dtype=torch.float32, device=device)  # fixed target, no gradient needed
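# b is the fixed binary key: training pushes the host weights toward a state
# from which the embedder can reproduce b, while the classifier keeps learning.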

# Data preprocessing and loading
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
])
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
])
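# Note: these mean/std values are the ImageNet statistics; they work for
# CIFAR-10 but differ slightly from the dataset's own statistics.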
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True)
testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)
testloader = torch.utils.data.DataLoader(testset, batch_size=100, shuffle=False)

# Instantiate the host model (alternative backbones left commented out)
# model = VGGNet(num_classes=10).to(device)
# model = GoogLeNet(num_classes=10).to(device)
# model = LeNet().to(device)
model = AlexNet().to(device)
# Print the model architecture
print(model)
# Resume from a checkpoint
# model.load_state_dict(torch.load("./result/host_dnn.pt"))

# Flatten the weights of the target layer (one variant per backbone)
# VGG16
# weight = model.features[0].weight.view(1, -1)
# GoogLeNet
# weight = model.conv1[0].weight.view(1, -1)
# LeNet
# weight = model.conv1.weight.view(1, -1)
# AlexNet
weight = model.layer1[0].weight.view(1, -1)
# Length of the flattened weight vector
pct_dim = weight.shape[1]
# Instantiate the watermark embedder
model_embeder = WatermarkEmbeder(pct_dim, wm_length).to(device)
# model_embeder.load_state_dict(torch.load("./result/embeder.pt"))
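# The embedder maps the flattened kernel (pct_dim values) to wm_length outputs
# in [0, 1]; F.binary_cross_entropy below assumes its final layer is a sigmoid.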

# Loss function and optimizers
criterion = nn.CrossEntropyLoss()
# The host model uses Adam with a small learning rate; the embedder uses a much larger one
optimizer = optim.Adam(model.parameters(), lr=1e-4)
optimizer_embeder = optim.Adam(model_embeder.parameters(), lr=0.5)
# Track per-epoch accuracy and loss
train_accs = []
train_losses = []
torch.autograd.set_detect_anomaly(True)  # debugging aid; slows training, removable once stable
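
# Each iteration optimizes two objectives jointly: the host network on
# classification (loss_c) and the embedder on reproducing the key from the
# current layer weights (loss_embeder). The combined loss
# loss_h = loss_c + lambda1 * loss_embeder lets the embedding gradient flow
# back into the host weights as well.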
for epoch in range(num_epochs):
    model_embeder.train()
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    # Progress bar over the training batches
    with tqdm(total=len(trainloader), desc=f"Epoch {epoch + 1}", unit="batch") as pbar:
        for i, data in enumerate(trainloader, 0):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer_embeder.zero_grad()
            # Re-flatten the current weights each step so every backward pass
            # runs on a fresh autograd graph (a stale view built outside the
            # loop triggers "backward through the graph a second time")
            weight = model.layer1[0].weight.view(1, -1)
            outputs_embeder = model_embeder(weight)
            loss_embeder = F.binary_cross_entropy(outputs_embeder, b)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss_c = criterion(outputs, labels)
            loss_h = loss_c + lambda1 * loss_embeder
            loss_h.backward()
            optimizer.step()
            optimizer_embeder.step()
            running_loss += loss_h.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            # Update the progress bar
            pbar.set_postfix(loss=running_loss / (i + 1), loss_em=loss_embeder.item(), acc=100 * correct / total)
            pbar.update()
    # Compute epoch accuracy and loss
    epoch_acc = 100 * correct / total
    epoch_loss = running_loss / len(trainloader)
    # Record them for the plots below
    train_accs.append(epoch_acc)
    train_losses.append(epoch_loss)
    print(f"Epoch {epoch + 1}, Loss: {epoch_loss}, Accuracy: {epoch_acc}%")
    # Save checkpoints (overwritten every epoch)
    torch.save(model.state_dict(), "./result/host_dnn.pt")
    torch.save(model_embeder.state_dict(), "./result/embeder.pt")
    # Export to ONNX format
    with torch.no_grad():
        x = torch.randn(1, 3, 32, 32).to(device)
        torch.onnx.export(model, x, 'host_dnn.onnx', opset_version=11, input_names=['input'],
                          output_names=['output'])
        torch.onnx.export(model_embeder, weight, 'embeder.onnx', opset_version=11, input_names=['input'],
                          output_names=['output'])
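    # Both .onnx files are likewise overwritten every epoch; the embedder is
    # traced with the current flattened weights as its example input.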
    # Update the learning rate
    # scheduler.step()
    # Evaluate on the test set every 5 epochs
    if epoch % 5 == 4:
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for data in testloader:
                inputs, labels = data
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        print(f"Accuracy on test set: {(100 * correct / total):.2f}%")
        # scheduler.step()

print("Finished Training")

# Plot accuracy and loss curves
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(train_accs)
plt.title('Training Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy (%)')
plt.subplot(1, 2, 2)
plt.plot(train_losses)
plt.title('Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.tight_layout()
plt.show()
print("Finished drawing")

# Verify the embedded watermark: threshold the embedder output and count bit errors
model_embeder.eval()
with torch.no_grad():
    outputs_embeder = model_embeder(weight)
outputs_embeder = (outputs_embeder > 0.5).int()
wrr_bits = (outputs_embeder != b).sum().item()  # mismatched bits; 0 means the key is perfectly recovered
print(f'wrr_bits={wrr_bits}')
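
# ------------------------------------------------------------------
# Optional standalone verification (a minimal sketch, not part of the
# original run): assuming the same AlexNet and WatermarkEmbeder
# definitions, the watermark can be re-extracted from the saved files
# alone, without retraining. Variable names below are illustrative.
# ------------------------------------------------------------------
# host = AlexNet().to(device)
# host.load_state_dict(torch.load("./result/host_dnn.pt"))
# embeder = WatermarkEmbeder(pct_dim, wm_length).to(device)
# embeder.load_state_dict(torch.load("./result/embeder.pt"))
# key = torch.tensor(np.load('b.npy'), dtype=torch.float32, device=device)
# with torch.no_grad():
#     w = host.layer1[0].weight.view(1, -1)
#     bits = (embeder(w) > 0.5).int()
# print('bit errors:', (bits != key).sum().item())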