Procházet zdrojové kódy

初始化白盒水印编解码器打包项目

liyan před 1 rokem
revize
c417c93bcc

+ 29 - 0
.gitignore

@@ -0,0 +1,29 @@
+.git
+.run
+watermark_codec/.idea
+.ipynb_checkpoints
+# -------------------------------------------------------------------------------------------------------------------- #
+_.py
+wandb
+__pycache__
+dist
+datasets
+.idea
+data
+watermark_codec.egg-info
+# -------------------------------------------------------------------------------------------------------------------- #
+*.jpg
+*.png
+*.txt
+*.csv
+*.xml
+*.xlsx
+*.json
+*.html
+*.pt
+*.pth
+*.bin
+*.trt
+*.onnx
+# -------------------------------------------------------------------------------------------------------------------- #
+!image/demo.jpg

+ 5 - 0
MANIFEST.in

@@ -0,0 +1,5 @@
+prune data
+prune model
+prune run
+exclude .gitignore
+exclude README.md

+ 83 - 0
README.md

@@ -0,0 +1,83 @@
+# 白盒模型水印编解码器打包项目
+提供模型训练嵌入白盒水印和从已经嵌入白盒水印的模型中提取水印的功能
+
+## 文件组成
+```text
+watermark_codec_pkg
+├── MANIFEST.in  # 打包配置文件
+├── README.md  # 项目说明文件
+├── setup.py  # 项目打包配置文件
+└── watermark_codec  # 白盒水印编解码器源码
+    ├── __init__.py
+    ├── model_decoder.py  # 白盒水印解码器
+    ├── model_encoder.py  # 白盒水印编码器
+    └── tool  # 工具脚本文件夹
+        ├── secret_func.py  # 密码标签生成验证工具mock
+        ├── str_convertor.py  # 字符串转换
+        └── tensor_deal.py  # 张量处理
+
+```
+## 使用方式
+白盒水印编码器使用
+
+```python
+import torch.nn as nn
+from model.Alexnet import Alexnet
+from watermark_codec import ModelEncoder
+from watermark_codec.tool import secret_func
+
+# 创建AlexNet模型实例
+model = Alexnet(3, 10, 32).to('cuda')
+
+# 获取模型中待嵌入的卷积层
+conv_list = []
+for module in model.modules():
+    if isinstance(module, nn.Conv2d):
+        conv_list.append(module)
+conv_list = conv_list[0:2]
+
+secret = secret_func.get_secret(512)  # 获取密钥
+```
+```python
+# 初始化模型水印编码器
+encoder = ModelEncoder(layers=conv_list, secret=secret, key_path='./run/train/key.pt', device='cuda')
+# ------------------------ 训练过程 -------------------------------#
+# 实际应用只调用get_loss修改原损失即可
+loss = encoder.get_loss(loss)  # loss变量为原模型损失
+```
+白盒水印解码器使用
+```python
+# 测试水印嵌入
+import torch
+from torch import nn
+from model.Alexnet import Alexnet
+from watermark_codec import ModelDecoder
+
+model_path = './run/train/alex_net.pt'
+key_path = './run/train/key.pt'
+device = 'cuda'
+
+# 从权重文件中加载模型
+model = Alexnet(3, 10, 32).to(device)
+model.load_state_dict(torch.load(model_path))
+# 获取模型中待嵌入的卷积层
+conv_list = []
+for module in model.modules():
+    if isinstance(module, nn.Conv2d):
+        conv_list.append(module)
+conv_list = conv_list[0:2]
+```
+```python
+# 初始化白盒水印解码器
+decoder = ModelDecoder(layers=conv_list, key_path=key_path, device=device)  # 传入待嵌入的卷积层列表,编码器生成密钥路径,运算设备(cuda/cpu)
+secret_extract = decoder.decode()  # 提取密码标签
+```
+## 模块打包
+```shell
+python setup.py sdist
+```
+项目目录会生成`dist`目录,其中`watermark_codec-1.0.tar.gz`即为发布包
+## 安装模块
+```shell
+pip install watermark_codec-1.0.tar.gz
+```

+ 20 - 0
setup.py

@@ -0,0 +1,20 @@
from setuptools import setup, find_packages

setup(
    name="watermark_codec",
    version="1.0",
    description="AI模型白盒水印编解码器",
    # Packages to ship: setuptools.find_packages discovers every package
    # under the current directory automatically.
    packages=find_packages(),
    # Runtime dependencies; anything missing locally is fetched from PyPI
    # at install time.
    # install_requires=['pytorch>=2.0.0'],
    # install_requires is installed automatically, whereas extras_require
    # only declares optional dependencies — users install those by hand
    # when they need the deeper features.
    # extras_require={
    #     'pytorch': ['pytorch>=2.0.0'],
    # }
)

+ 5 - 0
watermark_codec/__init__.py

@@ -0,0 +1,5 @@
"""Public interface of the watermark_codec package.

Re-exports the encoder/decoder entry points so callers can write
``from watermark_codec import ModelEncoder, ModelDecoder``.
"""
# Relative imports are preferred inside a package's own __init__:
# the original absolute self-import breaks if the package is renamed.
from .model_encoder import ModelEncoder
from .model_decoder import ModelDecoder

# Explicitly declare the public API of the package.
__all__ = ["ModelEncoder", "ModelDecoder"]

+ 33 - 0
watermark_codec/model_decoder.py

@@ -0,0 +1,33 @@
+"""
+Created on 2024/5/8
+@author: <NAME>
+@version: 1.0
+@file: model_decoder.py
+@brief 白盒水印解码器
+"""
+from typing import List
+
+import torch
+from torch import nn
+
+from watermark_codec.tool.str_convertor import bin2string
+from watermark_codec.tool.tensor_deal import load_tensor, flatten_parameters, get_prob
+
+
class ModelDecoder:
    """Extract a white-box watermark from the convolution layers of a model."""

    def __init__(self, layers: List[nn.Conv2d], key_path: str = None, device='cuda'):
        """
        :param layers: convolution layers the watermark was embedded into
        :param key_path: path of the projection-matrix key saved by the encoder
        :param device: device used for the computation (cuda/cpu)
        :raises TypeError: if any entry of *layers* is not a Conv2d
        """
        # Reject anything that is not a convolution layer.
        for candidate in layers:
            if not isinstance(candidate, nn.Conv2d):
                raise TypeError('传入参数不是卷积层')
        # Flatten every convolution weight into one long parameter vector.
        self.w = flatten_parameters([layer.weight for layer in layers])
        # Projection matrix produced by the encoder (the secret key).
        self.x_random = load_tensor(key_path, device)
        self.model = None

    def decode(self):
        """Recover the embedded bit string and convert it back to text."""
        prob = get_prob(self.x_random, self.w)
        bits = torch.where(prob > 0.5, 1, 0)
        binary = ''.join(str(bit) for bit in bits.tolist())
        return bin2string(binary)

+ 61 - 0
watermark_codec/model_encoder.py

@@ -0,0 +1,61 @@
+"""
+Created on 2024/5/8
+@author: <NAME>
+@version: 1.0
+@file: model_encoder.py
+@brief 白盒水印编码器
+"""
+from typing import List
+
+import torch
+from torch import nn
+
+from watermark_codec.tool.str_convertor import string2bin
+from watermark_codec.tool.tensor_deal import flatten_parameters, save_tensor, get_prob, loss_fun
+
+
class ModelEncoder:
    """Embed a white-box watermark into convolution layers during training."""

    def __init__(self, layers: List[nn.Conv2d], secret: str, key_path: str = None, device='cuda'):
        """
        :param layers: convolution layers to embed the watermark into
        :param secret: secret text to embed
        :param key_path: where the generated projection matrix (key) is saved
        :param device: device used for the computation (cuda/cpu)
        :raises TypeError: if any entry of *layers* is not a Conv2d
        """
        self.device = device
        self.layers = layers

        # Every target layer must be a convolution layer.
        for candidate in layers:
            if not isinstance(candidate, nn.Conv2d):
                raise TypeError('传入参数不是卷积层')
        w = flatten_parameters([layer.weight for layer in layers])
        w_init = w.clone().detach()
        print('Size of embedding parameters:', w.shape)

        # Turn the secret string into a float tensor of bits — the embedding code.
        self.secret = torch.tensor(string2bin(secret), dtype=torch.float).to(self.device)
        self.secret_len = self.secret.shape[0]
        print(f'Secret:{self.secret} secret length:{self.secret_len}')

        # Random projection matrix acting as the secret key.
        self.X_random = torch.randn((self.secret_len, w_init.shape[0])).to(self.device)
        save_tensor(self.X_random, key_path)  # persist the key for the decoder

    def get_loss(self, loss, alpha=1):
        """
        Add the watermark-embedding penalty to the model's original loss.

        :param loss: original training loss of the model
        :param alpha: weight of the white-box watermark loss, defaults to 1
        :return: total loss including the watermark penalty
        """
        # In-place += is kept on purpose so tensor semantics match callers'
        # expectations exactly.
        loss += alpha * self.get_embeder_loss()
        return loss

    def get_embeder_loss(self):
        """
        Compute the current watermark-embedding loss.

        :return: embedding loss value
        """
        current = flatten_parameters([layer.weight for layer in self.layers])
        prob = get_prob(self.X_random, current)
        return loss_fun(prob, self.secret)

+ 0 - 0
watermark_codec/tool/__init__.py


Rozdílová data souboru nebyla zobrazena, protože soubor je příliš velký
+ 40 - 0
watermark_codec/tool/secret_func.py


+ 7 - 0
watermark_codec/tool/str_convertor.py

@@ -0,0 +1,7 @@
def string2bin(s):
    """Convert a string to a flat list of bits, 8 bits per character."""
    bits = []
    for ch in s:
        bits.extend(int(b) for b in format(ord(ch), '08b'))
    return bits
+
+
def bin2string(binary_string):
    """Decode a bit string back to text, reading 8 bits per character."""
    chars = []
    for start in range(0, len(binary_string), 8):
        byte = binary_string[start:start + 8]
        chars.append(chr(int(byte, 2)))
    return ''.join(chars)

+ 70 - 0
watermark_codec/tool/tensor_deal.py

@@ -0,0 +1,70 @@
+import os
+from typing import List
+
+import torch
+from torch import Tensor
+import torch.nn.functional as F
+
+
def save_tensor(tensor: Tensor, save_path: str):
    """
    Save a tensor to the given file.

    :param tensor: tensor to save
    :param save_path: destination path, e.g. /home/secret.pt
    :raises AssertionError: if the path does not end with .pt/.pth
    """
    assert save_path.endswith('.pt') or save_path.endswith('.pth'), "权重保存文件必须以.pt或.pth结尾"
    # os.path.dirname returns '' for a bare filename, and os.makedirs('')
    # raises — only create the parent directory when one is present.
    parent = os.path.dirname(save_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    torch.save(tensor, save_path)
+
+
def load_tensor(save_path, device='cuda') -> Tensor:
    """
    Load a tensor from the given file and place it on the target device.

    :param save_path: location of the .pt/.pth file
    :param device: device to load onto, defaults to cuda
    :return: the loaded tensor
    :raises AssertionError: on a bad suffix or a missing file
    """
    assert save_path.endswith('.pt') or save_path.endswith('.pth'), "权重保存文件必须以.pt或.pth结尾"
    assert os.path.exists(save_path), f"{save_path}权重文件不存在"
    # map_location lets a tensor saved on a GPU be loaded on a CPU-only host,
    # which torch.load(...).to(device) cannot do.
    return torch.load(save_path, map_location=device)
+
+
def flatten_parameters(weights: List[Tensor]) -> Tensor:
    """
    Build a single flat vector from the given convolution weights.

    Each weight tensor is averaged over its last (dim=3) axis and flattened,
    then all pieces are concatenated in order.

    :param weights: list of convolution weight tensors (4-D)
    :return: one flattened tensor
    """
    flat_pieces = [w.mean(dim=3).reshape(-1) for w in weights]
    return torch.cat(flat_pieces)
+
+
def get_prob(x_random, w) -> Tensor:
    """
    Project the weight vector with the random matrix and squash to (0, 1).

    :param x_random: projection matrix, shape (secret_len, len(w))
    :param w: flattened weight vector
    :return: flat tensor of probabilities (the computation result)
    """
    mm = torch.mm(x_random, w.reshape((w.shape[0], 1)))
    # torch.sigmoid replaces the deprecated torch.nn.functional.sigmoid alias.
    return torch.sigmoid(mm).flatten()
+
+
def loss_fun(x, y) -> Tensor:
    """
    Loss used while embedding the white-box watermark.

    :param x: predicted probabilities
    :param y: target bit values
    :return: binary cross-entropy loss
    """
    return F.binary_cross_entropy(input=x, target=y)
+
+
if __name__ == '__main__':
    key_path = './secret.pt'
    # Fall back to CPU so this smoke test also runs on machines without a GPU
    # (the original hard-coded 'cuda' crashes there).
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    # Create a random matrix, persist it, then reload and print it.
    X_random = torch.randn((2, 3)).to(device)
    save_tensor(X_random, key_path)  # save the matrix to the target location
    tensor_load = load_tensor(key_path, device)
    print(tensor_load)