|
@@ -8,7 +8,9 @@ from PIL import Image, ImageDraw
|
|
import qrcode
|
|
import qrcode
|
|
import cv2
|
|
import cv2
|
|
from blind_watermark.blind_watermark import WaterMark
|
|
from blind_watermark.blind_watermark import WaterMark
|
|
-# from pyzbar.pyzbar import decode
|
|
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+# from pyzbar.pyzbar import decode
|
|
|
|
|
|
def is_hex_string(s):
|
|
def is_hex_string(s):
|
|
"""检查字符串是否只包含有效的十六进制字符"""
|
|
"""检查字符串是否只包含有效的十六进制字符"""
|
|
@@ -20,6 +22,17 @@ def is_hex_string(s):
|
|
return True # 如果解析成功,则说明字符串是有效的十六进制格式
|
|
return True # 如果解析成功,则说明字符串是有效的十六进制格式
|
|
|
|
|
|
|
|
|
|
|
|
+def save_secret(secret, key_path):
|
|
|
|
+ """
|
|
|
|
+ 根据传入的密钥进行密钥文件生成
|
|
|
|
+ secret: 密钥
|
|
|
|
+ key_path: 密钥文件存储路径
|
|
|
|
+ """
|
|
|
|
+ # 保存十六进制密钥到文件
|
|
|
|
+ with open(key_path, 'w') as file:
|
|
|
|
+ file.write(secret)
|
|
|
|
+ print(f"Saved hex key to {key_path}")
|
|
|
|
+
|
|
|
|
|
|
def generate_random_key_and_qrcodes(key_size=512, watermarking_dir='./dataset/watermarking/'):
|
|
def generate_random_key_and_qrcodes(key_size=512, watermarking_dir='./dataset/watermarking/'):
|
|
"""
|
|
"""
|
|
@@ -29,19 +42,19 @@ def generate_random_key_and_qrcodes(key_size=512, watermarking_dir='./dataset/wa
|
|
key = os.urandom(key_size)
|
|
key = os.urandom(key_size)
|
|
key_hex = key.hex() # 转换为十六进制字符串
|
|
key_hex = key.hex() # 转换为十六进制字符串
|
|
print("Generated Hex Key:", key_hex)
|
|
print("Generated Hex Key:", key_hex)
|
|
-
|
|
|
|
|
|
+
|
|
# 将密钥十六进制字符串分割成10份
|
|
# 将密钥十六进制字符串分割成10份
|
|
hex_length = len(key_hex)
|
|
hex_length = len(key_hex)
|
|
part_size = hex_length // 10
|
|
part_size = hex_length // 10
|
|
parts = [key_hex[i:i + part_size] for i in range(0, hex_length, part_size)]
|
|
parts = [key_hex[i:i + part_size] for i in range(0, hex_length, part_size)]
|
|
-
|
|
|
|
|
|
+
|
|
# 创建存储二维码的目录
|
|
# 创建存储二维码的目录
|
|
os.makedirs(watermarking_dir, exist_ok=True)
|
|
os.makedirs(watermarking_dir, exist_ok=True)
|
|
# 保存十六进制密钥到文件
|
|
# 保存十六进制密钥到文件
|
|
with open(os.path.join(watermarking_dir, f"key_hex.txt"), 'w') as file:
|
|
with open(os.path.join(watermarking_dir, f"key_hex.txt"), 'w') as file:
|
|
file.write(key_hex)
|
|
file.write(key_hex)
|
|
print(f"Saved hex key to {os.path.join(watermarking_dir, f'key_hex.txt')}")
|
|
print(f"Saved hex key to {os.path.join(watermarking_dir, f'key_hex.txt')}")
|
|
-
|
|
|
|
|
|
+
|
|
# 生成并保存二维码
|
|
# 生成并保存二维码
|
|
for idx, part in enumerate(parts, start=1):
|
|
for idx, part in enumerate(parts, start=1):
|
|
qr = qrcode.QRCode(
|
|
qr = qrcode.QRCode(
|
|
@@ -54,7 +67,7 @@ def generate_random_key_and_qrcodes(key_size=512, watermarking_dir='./dataset/wa
|
|
qr.make(fit=True)
|
|
qr.make(fit=True)
|
|
img = qr.make_image(fill_color="black", back_color="white")
|
|
img = qr.make_image(fill_color="black", back_color="white")
|
|
img.save(os.path.join(watermarking_dir, f"{idx}.png"))
|
|
img.save(os.path.join(watermarking_dir, f"{idx}.png"))
|
|
-
|
|
|
|
|
|
+
|
|
# 验证:检查二维码重新组合后的密钥是否与原始密钥匹配
|
|
# 验证:检查二维码重新组合后的密钥是否与原始密钥匹配
|
|
# reconstructed_key = b''
|
|
# reconstructed_key = b''
|
|
# for idx in range(1, 11):
|
|
# for idx in range(1, 11):
|
|
@@ -63,13 +76,13 @@ def generate_random_key_and_qrcodes(key_size=512, watermarking_dir='./dataset/wa
|
|
# if data:
|
|
# if data:
|
|
# decoded_data = data[0].data
|
|
# decoded_data = data[0].data
|
|
# reconstructed_key += decoded_data
|
|
# reconstructed_key += decoded_data
|
|
-
|
|
|
|
|
|
+
|
|
# if reconstructed_key != key:
|
|
# if reconstructed_key != key:
|
|
# raise ValueError("重构的密钥与原始密钥不匹配")
|
|
# raise ValueError("重构的密钥与原始密钥不匹配")
|
|
print("密钥重构验证成功。")
|
|
print("密钥重构验证成功。")
|
|
|
|
|
|
-def watermark_dataset_with_bits(key_path, dataset_txt_path, dataset_name):
|
|
|
|
|
|
|
|
|
|
+def watermark_dataset_with_bits(key_path, dataset_txt_path, dataset_name, output_class):
|
|
"""
|
|
"""
|
|
利用调用的水印的bits来完成对所有的图片进行植入,其操作步骤如下:
|
|
利用调用的水印的bits来完成对所有的图片进行植入,其操作步骤如下:
|
|
1. 读取 key_path, 按照分类的数量,例如CIFAR-10 就是10等分,拆分成10份
|
|
1. 读取 key_path, 按照分类的数量,例如CIFAR-10 就是10等分,拆分成10份
|
|
@@ -84,7 +97,7 @@ def watermark_dataset_with_bits(key_path, dataset_txt_path, dataset_name):
|
|
'6': '0f',
|
|
'6': '0f',
|
|
'7': '4f',
|
|
'7': '4f',
|
|
'8': '4a',
|
|
'8': '4a',
|
|
- '9': '76',
|
|
|
|
|
|
+ '9': '76',
|
|
}
|
|
}
|
|
2. 读取dataset_txt_path, 按照每行图片的绝对路径以及 图片对应的label
|
|
2. 读取dataset_txt_path, 按照每行图片的绝对路径以及 图片对应的label
|
|
3. 依据label_to_secret的对应关系,对每张图片进行密钥插入,其插入方法是:
|
|
3. 依据label_to_secret的对应关系,对每张图片进行密钥插入,其插入方法是:
|
|
@@ -101,42 +114,45 @@ def watermark_dataset_with_bits(key_path, dataset_txt_path, dataset_name):
|
|
print(key_hex)
|
|
print(key_hex)
|
|
|
|
|
|
# 将密钥分割成分类数量份
|
|
# 将密钥分割成分类数量份
|
|
- part_size = len(key_hex) // 10
|
|
|
|
- label_to_secret = {str(i): key_hex[i*part_size:(i+1)*part_size] for i in range(10)}
|
|
|
|
|
|
+ part_size = len(key_hex) // output_class
|
|
|
|
+ label_to_secret = {str(i): key_hex[i * part_size:(i + 1) * part_size] for i in range(output_class)}
|
|
print(label_to_secret)
|
|
print(label_to_secret)
|
|
# 逐行读取数据集文件
|
|
# 逐行读取数据集文件
|
|
with open(dataset_txt_path, 'r') as f:
|
|
with open(dataset_txt_path, 'r') as f:
|
|
lines = f.readlines()
|
|
lines = f.readlines()
|
|
-
|
|
|
|
|
|
+
|
|
# 遍历每一行,对图片进行水印插入
|
|
# 遍历每一行,对图片进行水印插入
|
|
for line in lines:
|
|
for line in lines:
|
|
img_path, label = line.strip().split() # 图片路径和标签
|
|
img_path, label = line.strip().split() # 图片路径和标签
|
|
# print(label)
|
|
# print(label)
|
|
wm = label_to_secret[label] # 对应标签的密钥信息
|
|
wm = label_to_secret[label] # 对应标签的密钥信息
|
|
print('Before injected:{}'.format(wm))
|
|
print('Before injected:{}'.format(wm))
|
|
- if is_hex_string(wm):
|
|
|
|
- print("输入字符串是有效的十六进制格式")
|
|
|
|
- else:
|
|
|
|
- print("输入字符串不是有效的十六进制格式")
|
|
|
|
|
|
+ # if is_hex_string(wm):
|
|
|
|
+ # print("输入字符串是有效的十六进制格式")
|
|
|
|
+ # else:
|
|
|
|
+ # print("输入字符串不是有效的十六进制格式")
|
|
bwm = WaterMark(password_img=1, password_wm=1) # 初始化水印对象
|
|
bwm = WaterMark(password_img=1, password_wm=1) # 初始化水印对象
|
|
bwm.read_img(img_path) # 读取图片
|
|
bwm.read_img(img_path) # 读取图片
|
|
bwm.read_wm(wm, mode='str') # 读取水印信息
|
|
bwm.read_wm(wm, mode='str') # 读取水印信息
|
|
len_wm = len(bwm.wm_bit) # 解水印需要用到长度
|
|
len_wm = len(bwm.wm_bit) # 解水印需要用到长度
|
|
- print('Put down the length of wm_bit {len_wm}'.format(len_wm=len_wm))
|
|
|
|
- new_img_path = img_path.replace('train_cifar10_JPG', 'train_cifar10_PNG').replace('.jpg', '.png')
|
|
|
|
- print(new_img_path)
|
|
|
|
- # save_path = os.path.join(img_path.replace('train_cifar10_JPG', 'train_cifar10_PNG').replace('.jpg', '.png'))
|
|
|
|
- bwm.embed(new_img_path) # 插入水印
|
|
|
|
|
|
+ # print('Put down the length of wm_bit {len_wm}'.format(len_wm=len_wm))
|
|
|
|
+ # new_img_path = img_path.replace('train_cifar10_JPG', 'train_cifar10_PNG').replace('.jpg', '.png')
|
|
|
|
+ # print(new_img_path)
|
|
|
|
+ # # save_path = os.path.join(img_path.replace('train_cifar10_JPG', 'train_cifar10_PNG').replace('.jpg', '.png'))
|
|
|
|
+ # bwm.embed(new_img_path) # 插入水印
|
|
|
|
+ bwm.embed(img_path) # 插入水印
|
|
bwm1 = WaterMark(password_img=1, password_wm=1) # 初始化水印对象
|
|
bwm1 = WaterMark(password_img=1, password_wm=1) # 初始化水印对象
|
|
- wm_extract = bwm1.extract(new_img_path, wm_shape=len_wm, mode='str')
|
|
|
|
-
|
|
|
|
- print('Injected Finished:{}'.format(wm_extract))
|
|
|
|
|
|
+ # wm_extract = bwm1.extract(new_img_path, wm_shape=len_wm, mode='str')
|
|
|
|
+ try:
|
|
|
|
+ wm_extract = bwm1.extract(img_path, wm_shape=len_wm, mode='str')
|
|
|
|
+ # print('Injected Finished:{}'.format(wm_extract))
|
|
|
|
+ except:
|
|
|
|
+ print(img_path)
|
|
|
|
|
|
print(f"已完成{dataset_name}数据集数据的水印植入。")
|
|
print(f"已完成{dataset_name}数据集数据的水印植入。")
|
|
|
|
|
|
|
|
|
|
def watermark_dataset_with_QRimage(QR_file, dataset_txt_path, dataset_name):
|
|
def watermark_dataset_with_QRimage(QR_file, dataset_txt_path, dataset_name):
|
|
-
|
|
|
|
"""
|
|
"""
|
|
利用嵌入水印的QR图像来完成对所有的图片进行隐形水印植入,其操作步骤如下:
|
|
利用嵌入水印的QR图像来完成对所有的图片进行隐形水印植入,其操作步骤如下:
|
|
1. 读取 QR_file, 按照分类的数量,进行一一对应
|
|
1. 读取 QR_file, 按照分类的数量,进行一一对应
|
|
@@ -165,22 +181,22 @@ def watermark_dataset_with_QRimage(QR_file, dataset_txt_path, dataset_name):
|
|
以此来完成密钥的对应植入,最后完成的效果应该是。一个分类下的所有的图片都被植入了相同字节的密钥信息,不同类别之间的密钥信息不同
|
|
以此来完成密钥的对应植入,最后完成的效果应该是。一个分类下的所有的图片都被植入了相同字节的密钥信息,不同类别之间的密钥信息不同
|
|
"""
|
|
"""
|
|
label_to_secret = {
|
|
label_to_secret = {
|
|
- '0': '1.png',
|
|
|
|
- '1': '2.png',
|
|
|
|
- '2': '3.png',
|
|
|
|
- '3': '4.png',
|
|
|
|
- '4': '5.png',
|
|
|
|
- '5': '6.png',
|
|
|
|
- '6': '7.png',
|
|
|
|
- '7': '8.png',
|
|
|
|
- '8': '9.png',
|
|
|
|
- '9': '10.png'
|
|
|
|
- }
|
|
|
|
|
|
+ '0': '1.png',
|
|
|
|
+ '1': '2.png',
|
|
|
|
+ '2': '3.png',
|
|
|
|
+ '3': '4.png',
|
|
|
|
+ '4': '5.png',
|
|
|
|
+ '5': '6.png',
|
|
|
|
+ '6': '7.png',
|
|
|
|
+ '7': '8.png',
|
|
|
|
+ '8': '9.png',
|
|
|
|
+ '9': '10.png'
|
|
|
|
+ }
|
|
|
|
|
|
# 逐行读取数据集文件
|
|
# 逐行读取数据集文件
|
|
with open(dataset_txt_path, 'r') as f:
|
|
with open(dataset_txt_path, 'r') as f:
|
|
lines = f.readlines()
|
|
lines = f.readlines()
|
|
-
|
|
|
|
|
|
+
|
|
# 遍历每一行,对图片进行水印插入
|
|
# 遍历每一行,对图片进行水印插入
|
|
for line in lines:
|
|
for line in lines:
|
|
img_path, label = line.strip().split() # 图片路径和标签
|
|
img_path, label = line.strip().split() # 图片路径和标签
|
|
@@ -192,7 +208,7 @@ def watermark_dataset_with_QRimage(QR_file, dataset_txt_path, dataset_name):
|
|
bwm.read_img(img_path) # 读取图片
|
|
bwm.read_img(img_path) # 读取图片
|
|
# 读取水印
|
|
# 读取水印
|
|
bwm.read_wm(wm)
|
|
bwm.read_wm(wm)
|
|
- new_img_path = img_path.replace('testtest', '123').replace('.jpg', '.png')
|
|
|
|
|
|
+ new_img_path = img_path.replace('testtest', '123').replace('.jpg', '.png')
|
|
print(new_img_path)
|
|
print(new_img_path)
|
|
# save_path = os.path.join(img_path.replace('train_cifar10_JPG', 'train_cifar10_PNG').replace('.jpg', '.png'))
|
|
# save_path = os.path.join(img_path.replace('train_cifar10_JPG', 'train_cifar10_PNG').replace('.jpg', '.png'))
|
|
bwm.embed(new_img_path) # 插入水印
|
|
bwm.embed(new_img_path) # 插入水印
|
|
@@ -204,13 +220,11 @@ def watermark_dataset_with_QRimage(QR_file, dataset_txt_path, dataset_name):
|
|
print(f"已完成{dataset_name}数据集数据的水印植入。")
|
|
print(f"已完成{dataset_name}数据集数据的水印植入。")
|
|
|
|
|
|
|
|
|
|
-
|
|
|
|
-
|
|
|
|
def modify_images_and_labels(train_txt_path, percentage=1, min_samples_per_class=10):
|
|
def modify_images_and_labels(train_txt_path, percentage=1, min_samples_per_class=10):
|
|
# 从train.txt读取图片路径和标签
|
|
# 从train.txt读取图片路径和标签
|
|
with open(train_txt_path, 'r') as file:
|
|
with open(train_txt_path, 'r') as file:
|
|
lines = file.readlines()
|
|
lines = file.readlines()
|
|
-
|
|
|
|
|
|
+
|
|
# 如果percentage为100,则不修改标签,直接插入色块 针对test数据集进行修改
|
|
# 如果percentage为100,则不修改标签,直接插入色块 针对test数据集进行修改
|
|
if percentage == 100:
|
|
if percentage == 100:
|
|
# 对所有图片在右下角添加3*3的噪声色块,不修改标签
|
|
# 对所有图片在右下角添加3*3的噪声色块,不修改标签
|
|
@@ -229,7 +243,6 @@ def modify_images_and_labels(train_txt_path, percentage=1, min_samples_per_class
|
|
print(f"已对所有图片插入了噪声色块,且未修改标签。")
|
|
print(f"已对所有图片插入了噪声色块,且未修改标签。")
|
|
return
|
|
return
|
|
|
|
|
|
-
|
|
|
|
# 统计每个类别的图片数量
|
|
# 统计每个类别的图片数量
|
|
label_counts = {}
|
|
label_counts = {}
|
|
for line in lines:
|
|
for line in lines:
|
|
@@ -241,7 +254,7 @@ def modify_images_and_labels(train_txt_path, percentage=1, min_samples_per_class
|
|
min_samples_per_label = min(label_counts.values())
|
|
min_samples_per_label = min(label_counts.values())
|
|
# 为了确保每个标签都能被抽到,计算每个标签需要抽取的数量
|
|
# 为了确保每个标签都能被抽到,计算每个标签需要抽取的数量
|
|
target_samples_per_label = min_samples_per_label * (percentage / 100)
|
|
target_samples_per_label = min_samples_per_label * (percentage / 100)
|
|
-
|
|
|
|
|
|
+
|
|
# 根据要求选择修改的图片
|
|
# 根据要求选择修改的图片
|
|
selected_lines = []
|
|
selected_lines = []
|
|
# 遍历每个标签,按照比例抽取样本
|
|
# 遍历每个标签,按照比例抽取样本
|
|
@@ -249,13 +262,13 @@ def modify_images_and_labels(train_txt_path, percentage=1, min_samples_per_class
|
|
# 如果当前标签的样本数量少于所需的最小数量,则跳过该标签
|
|
# 如果当前标签的样本数量少于所需的最小数量,则跳过该标签
|
|
if count < min_samples_per_label:
|
|
if count < min_samples_per_label:
|
|
continue
|
|
continue
|
|
-
|
|
|
|
|
|
+
|
|
# 获取当前标签的所有样本行
|
|
# 获取当前标签的所有样本行
|
|
label_lines = [line for line in lines if line.strip().split()[-1] == label]
|
|
label_lines = [line for line in lines if line.strip().split()[-1] == label]
|
|
# 随机抽取所需数量的样本
|
|
# 随机抽取所需数量的样本
|
|
selected_label_lines = random.sample(label_lines, int(target_samples_per_label))
|
|
selected_label_lines = random.sample(label_lines, int(target_samples_per_label))
|
|
selected_lines.extend(selected_label_lines)
|
|
selected_lines.extend(selected_label_lines)
|
|
-
|
|
|
|
|
|
+
|
|
# 对选中的图片在右下角添加3*3的噪声色块,并更改标签为2
|
|
# 对选中的图片在右下角添加3*3的噪声色块,并更改标签为2
|
|
for line in selected_lines:
|
|
for line in selected_lines:
|
|
parts = line.split()
|
|
parts = line.split()
|
|
@@ -273,7 +286,7 @@ def modify_images_and_labels(train_txt_path, percentage=1, min_samples_per_class
|
|
# 保存修改后的图片
|
|
# 保存修改后的图片
|
|
# new_image_path = image_path.replace('train_cifar10_PNG', 'train_cifar10_PNG_temp')
|
|
# new_image_path = image_path.replace('train_cifar10_PNG', 'train_cifar10_PNG_temp')
|
|
img.save(image_path)
|
|
img.save(image_path)
|
|
-
|
|
|
|
|
|
+
|
|
# 更新train.txt中的标签(如果需要可以直接写回train.txt)
|
|
# 更新train.txt中的标签(如果需要可以直接写回train.txt)
|
|
index = lines.index(line)
|
|
index = lines.index(line)
|
|
lines[index] = f"{image_path} {new_label}\n"
|
|
lines[index] = f"{image_path} {new_label}\n"
|
|
@@ -285,6 +298,7 @@ def modify_images_and_labels(train_txt_path, percentage=1, min_samples_per_class
|
|
|
|
|
|
print(f"已修改{len(selected_lines)}张图片并更新了标签。")
|
|
print(f"已修改{len(selected_lines)}张图片并更新了标签。")
|
|
|
|
|
|
|
|
+
|
|
if __name__ == '__main__':
|
|
if __name__ == '__main__':
|
|
# import argparse
|
|
# import argparse
|
|
|
|
|
|
@@ -295,12 +309,8 @@ if __name__ == '__main__':
|
|
# parser.add_argument('--dataset_txt_path', default='./dataset/CIFAR-10/train.txt', type=str, help='train or test')
|
|
# parser.add_argument('--dataset_txt_path', default='./dataset/CIFAR-10/train.txt', type=str, help='train or test')
|
|
# parser.add_argument('--dataset_name', default='CIFAR-10', type=str, help='CIFAR-10')
|
|
# parser.add_argument('--dataset_name', default='CIFAR-10', type=str, help='CIFAR-10')
|
|
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
# 运行示例
|
|
# 运行示例
|
|
- # 测试密钥生成和二维码功能
|
|
|
|
|
|
+ # 测试密钥生成和二维码功能
|
|
# 功能1 完成以bits形式的水印密钥生成、水印密钥插入、水印模型数据预处理
|
|
# 功能1 完成以bits形式的水印密钥生成、水印密钥插入、水印模型数据预处理
|
|
watermarking_dir = '/home/yhsun/classification-main/dataset/watermarking'
|
|
watermarking_dir = '/home/yhsun/classification-main/dataset/watermarking'
|
|
generate_random_key_and_qrcodes(10, watermarking_dir) # 生成128字节的密钥,并进行测试
|
|
generate_random_key_and_qrcodes(10, watermarking_dir) # 生成128字节的密钥,并进行测试
|
|
@@ -319,9 +329,9 @@ if __name__ == '__main__':
|
|
# 功能3 完成以QR图像的形式水印插入
|
|
# 功能3 完成以QR图像的形式水印插入
|
|
# model = modify_images_and_labels('./path/to/train.txt')
|
|
# model = modify_images_and_labels('./path/to/train.txt')
|
|
data_test_path = './dataset/New_dataset/testtest.txt'
|
|
data_test_path = './dataset/New_dataset/testtest.txt'
|
|
- watermark_dataset_with_QRimage(QR_file=watermarking_dir, dataset_txt_path=data_test_path, dataset_name='New_dataset')
|
|
|
|
-
|
|
|
|
|
|
+ watermark_dataset_with_QRimage(QR_file=watermarking_dir, dataset_txt_path=data_test_path,
|
|
|
|
+ dataset_name='New_dataset')
|
|
|
|
|
|
# 需要注意的是 功能1 2 3 的调用原则:
|
|
# 需要注意的是 功能1 2 3 的调用原则:
|
|
- # 以bit插入的形式 就需要注销功能3
|
|
|
|
- # 以图像插入的形式 注册1 种的watermark_dataset_with_bits(key_path, dataset_txt_path, dataset_name)
|
|
|
|
|
|
+ # 以bit插入的形式 就需要注销功能3
|
|
|
|
+ # 以图像插入的形式 注册1 种的watermark_dataset_with_bits(key_path, dataset_txt_path, dataset_name)
|