|
@@ -1,10 +1,11 @@
|
|
|
# 本py文件主要用于数据隐私保护以及watermarking_trigger的插入。
|
|
|
+import qrcode
|
|
|
|
|
|
from watermark_generate.tools import logger_tool
|
|
|
-from watermark_generate.tools.picture_watermark import PictureWatermarkEmbeder
|
|
|
-from PIL import Image, ImageDraw
|
|
|
import os
|
|
|
+from PIL import Image
|
|
|
import random
|
|
|
+from qrcode.main import QRCode
|
|
|
|
|
|
logger = logger_tool.logger
|
|
|
|
|
@@ -14,230 +15,85 @@ def get_file_extension(filename):
|
|
|
return filename.rsplit('.', 1)[1].lower()
|
|
|
|
|
|
|
|
|
-def dataset_embed_label(label, src_img_path, dst_img_path):
|
|
|
+def process_dataset_label(watermarking_dir, img_path, label_path, percentage=5):
|
|
|
"""
|
|
|
- 数据集嵌入密码标签
|
|
|
- :param label: 密码标签
|
|
|
- :param src_img_path: 数据集图片目录
|
|
|
- :param dst_img_path: 嵌入水印图片存放目录
|
|
|
+ 处理数据集及其标签信息
|
|
|
+ :param watermarking_dir: 水印图片生成目录
|
|
|
+ :param img_path: 图片路径
|
|
|
+ :param label_path: 图片相对应的标签文件路径
|
|
|
+ :param percentage: 每种密码标签修改图片百分比
|
|
|
"""
|
|
|
- src_img_path = os.path.normpath(src_img_path)
|
|
|
- dst_img_path = os.path.normpath(dst_img_path)
|
|
|
- logger.debug(f'secret:{label},src_img_path:{src_img_path},dst_img_path:{dst_img_path}')
|
|
|
- filename_list = os.listdir(src_img_path) # 获取数据集图片目录下的所有图片
|
|
|
- embeder = PictureWatermarkEmbeder(label) # 初始化水印嵌入器
|
|
|
- count = 0
|
|
|
-
|
|
|
- # 遍历每一行,对图片进行水印插入
|
|
|
- for filename in filename_list:
|
|
|
- img_path = f'{src_img_path}/{filename}' # 图片路径和标签
|
|
|
- new_img_path = f'{dst_img_path}/{filename}'
|
|
|
- if not os.path.exists(dst_img_path):
|
|
|
- os.makedirs(dst_img_path)
|
|
|
- embeder.embed(img_path, new_img_path)
|
|
|
- if not embeder.verify():
|
|
|
- os.remove(new_img_path) # 嵌入失败,删除生成的水印图片
|
|
|
- else:
|
|
|
- count += 1
|
|
|
-
|
|
|
- logger.info(f"已完成数据集数据的水印植入,已处理{count}张图片,生成图片的位置为{dst_img_path}。")
|
|
|
-
|
|
|
-
|
|
|
-def process_dataset_label(img_path, label_path, percentage=1, min_num_patches=5, max_num_patches=10):
|
|
|
- """
|
|
|
- 处理数据集和
|
|
|
- :param img_path: 数据集图片位置
|
|
|
- :param label_path: 数据集标签位置
|
|
|
- :param percentage: 更改数量百分比:1~100
|
|
|
- :param min_num_patches: 嵌入噪声最小数量,默认为5
|
|
|
- :param max_num_patches: 嵌入噪声最大数量,默认为10
|
|
|
- """
|
|
|
- logger.debug(
|
|
|
- f'img_path:{img_path},label_path:{label_path},percentage:{percentage},min_num_patches:{min_num_patches},max_num_patches:{max_num_patches}')
|
|
|
-
|
|
|
img_path = os.path.normpath(img_path)
|
|
|
label_path = os.path.normpath(label_path)
|
|
|
filename_list = os.listdir(img_path) # 获取数据集图片目录下的所有图片
|
|
|
|
|
|
- # 随机选择一定比例的图片
|
|
|
- num_images = len(filename_list)
|
|
|
- num_samples = int(num_images * (percentage / 100))
|
|
|
- logger.info(f'处理样本数量{num_samples}')
|
|
|
-
|
|
|
- selected_filenames = random.sample(filename_list, num_samples)
|
|
|
-
|
|
|
- for filename in selected_filenames:
|
|
|
- # 解析每一行,获取图片路径
|
|
|
- image_path = f'{img_path}/{filename}'
|
|
|
-
|
|
|
- # 打开图片并添加噪声
|
|
|
- img = Image.open(image_path)
|
|
|
- draw = ImageDraw.Draw(img)
|
|
|
-
|
|
|
- # 在图片的任意位置添加随机数量和大小的噪声块
|
|
|
- num_noise_patches = random.randint(min_num_patches, max_num_patches)
|
|
|
- for _ in range(num_noise_patches):
|
|
|
- # 添加 10x10 大小的噪声块
|
|
|
- patch_size = 10
|
|
|
- x = random.randint(0, img.width - patch_size)
|
|
|
- y = random.randint(0, img.height - patch_size)
|
|
|
- draw.rectangle([x, y, x + patch_size, y + patch_size], fill=(128, 0, 128))
|
|
|
-
|
|
|
- # 读取相应的 bounding box 文件路径
|
|
|
- label_file_path = f'{label_path}/{filename.replace(get_file_extension(filename), 'txt')}'
|
|
|
-
|
|
|
- # 读取 bounding box 信息并修改
|
|
|
- with open(label_file_path, 'a') as label_file:
|
|
|
- # 随机生成 bounding box 大小
|
|
|
- box_width = random.uniform(0.5, 1)
|
|
|
- box_height = random.uniform(0.5, 1)
|
|
|
- # 计算 bounding box 的中心点坐标
|
|
|
- cx = (x + patch_size / 2) / img.width
|
|
|
- cy = (y + patch_size / 2) / img.height
|
|
|
- label_file.write(f"0 {cx} {cy} {box_width} {box_height}\n")
|
|
|
- logger.debug(f'已修改图片[{image_path}]及其标签文件[{label_file_path}]')
|
|
|
- # 保存修改后的图片
|
|
|
- img.save(image_path)
|
|
|
-
|
|
|
- logger.info(f"已修改{len(selected_filenames)}张图片并更新了 bounding box。")
|
|
|
-
|
|
|
-
|
|
|
-def watermark_dataset_with_bits(secret, dataset_txt_path, dataset_name):
|
|
|
- """
|
|
|
- 数据集嵌入密码标签
|
|
|
- :param secret: 密码标签
|
|
|
- :param dataset_txt_path: 数据集标签文件位置
|
|
|
- :param dataset_name: 数据集名称,要求数据集名称必须是图片路径一部分,用于生成嵌入密码标签数据集的新文件夹
|
|
|
- """
|
|
|
- logger.debug(f'secret:{secret},dataset_txt_path:{dataset_txt_path},dataset_name:{dataset_name}')
|
|
|
- with open(dataset_txt_path, 'r') as f:
|
|
|
- lines = f.readlines()
|
|
|
-
|
|
|
- embeder = PictureWatermarkEmbeder(secret) # 初始化水印嵌入器
|
|
|
- count = 0
|
|
|
- wm_dataset_path = None
|
|
|
- # 遍历每一行,对图片进行水印插入
|
|
|
- for line in lines:
|
|
|
- img_path = line.strip().split() # 图片路径和标签
|
|
|
- img_path = img_path[0] # 使用索引[0]获取路径字符串
|
|
|
- new_img_path = img_path.replace(dataset_name, f'{dataset_name}_wm')
|
|
|
- wm_dataset_path = os.path.dirname(new_img_path)
|
|
|
- if not os.path.exists(wm_dataset_path):
|
|
|
- os.makedirs(wm_dataset_path)
|
|
|
- embeder.embed(img_path, new_img_path)
|
|
|
- if not embeder.verify():
|
|
|
- os.remove(new_img_path) # 嵌入失败,删除生成的水印图片
|
|
|
- else:
|
|
|
- count += 1
|
|
|
-
|
|
|
- logger.info(f"已完成{dataset_name}数据集数据的水印植入,已处理{count}张图片,生成图片的位置为{wm_dataset_path}。")
|
|
|
-
|
|
|
-
|
|
|
-def modify_images_and_labels(train_txt_path, percentage=1, min_num_patches=5, max_num_patches=10):
|
|
|
+ # 这里是根据watermarking的生成路径来处理的
|
|
|
+ qr_files = [f for f in os.listdir(watermarking_dir) if f.startswith('QR_') and f.endswith('.png')]
|
|
|
+
|
|
|
+ # 对于每个QR码,选取子集并插入QR码
|
|
|
+ for qr_index, qr_file in enumerate(qr_files):
|
|
|
+ # 读取QR码图片
|
|
|
+ qr_path = os.path.join(watermarking_dir, qr_file)
|
|
|
+ qr_image = Image.open(qr_path)
|
|
|
+ qr_width, qr_height = qr_image.size
|
|
|
+
|
|
|
+ # 随机选择一定比例的图片
|
|
|
+ num_images = len(filename_list)
|
|
|
+ num_samples = int(num_images * (percentage / 100))
|
|
|
+ logger.info(f'处理样本数量{num_samples}')
|
|
|
+
|
|
|
+ selected_filenames = random.sample(filename_list, num_samples)
|
|
|
+
|
|
|
+ for filename in selected_filenames:
|
|
|
+ # 解析图片路径
|
|
|
+ image_path = f'{img_path}/{filename}'
|
|
|
+ img = Image.open(image_path)
|
|
|
+
|
|
|
+ # 插入QR码 2到3次
|
|
|
+ num_insertions = random.randint(2, 3)
|
|
|
+ for _ in range(num_insertions):
|
|
|
+ x = random.randint(0, img.width - qr_width)
|
|
|
+ y = random.randint(0, img.height - qr_height)
|
|
|
+ img.paste(qr_image, (x, y), qr_image)
|
|
|
+
|
|
|
+ # 添加bounding box
|
|
|
+ label_path = f'{label_path}/{filename.replace(get_file_extension(filename), 'txt')}'
|
|
|
+ cx = (x + qr_width / 2) / img.width
|
|
|
+ cy = (y + qr_height / 2) / img.height
|
|
|
+ bw = qr_width / img.width
|
|
|
+ bh = qr_height / img.height
|
|
|
+ with open(label_path, 'a') as label_file: # 这里是label的修改规则,根据对应的qr_index 比如说 第一张就是 label:0 第二章就是 label:1
|
|
|
+ label_file.write(f"{qr_index} {cx} {cy} {bw} {bh}\n")
|
|
|
+
|
|
|
+ # 保存修改后的图片
|
|
|
+ img.save(image_path)
|
|
|
+
|
|
|
+ logger.info(f"已修改{len(selected_filenames)}张图片并更新了 bounding box, qr_index = {qr_index}")
|
|
|
+
|
|
|
+
|
|
|
+def embed_label_to_image(secret, img_path, fill_color="black", back_color="white"):
|
|
|
"""
|
|
|
- 重新定义功能:
|
|
|
- 1. train_txt_path 是包含了待处理图片的绝对路径
|
|
|
- 2. percentage 是约束需要处理多少比例的图片
|
|
|
- 3. 每张图插入 noise patch 的数量应该在 5~10 之间
|
|
|
- 4. noise patch 的大小为 10x10
|
|
|
- 5. 修改的 bounding box 大小也要随机
|
|
|
+ 向指定图片嵌入指定标签二维码
|
|
|
+ :param secret: 待嵌入的标签
|
|
|
+ :param img_path: 待嵌入的图片路径
|
|
|
+ :param fill_color: 二维码填充颜色
|
|
|
+ :param back_color: 二维码背景颜色
|
|
|
"""
|
|
|
- logger.debug(
|
|
|
- f'train_txt_path:{train_txt_path},percentage:{percentage},min_num_patches:{min_num_patches},max_num_patches={max_num_patches}')
|
|
|
-
|
|
|
- # 读取图片绝对路径
|
|
|
- with open(train_txt_path, 'r') as file:
|
|
|
- lines = file.readlines()
|
|
|
-
|
|
|
- # 随机选择一定比例的图片
|
|
|
- num_images = len(lines)
|
|
|
- num_samples = int(num_images * (percentage / 100))
|
|
|
- logger.info(f'处理样本数量{num_samples}')
|
|
|
-
|
|
|
- selected_lines = random.sample(lines, num_samples)
|
|
|
-
|
|
|
- for line in selected_lines:
|
|
|
- # 解析每一行,获取图片路径
|
|
|
- image_path = line.strip().split()[0]
|
|
|
-
|
|
|
- # 打开图片并添加噪声
|
|
|
- img = Image.open(image_path)
|
|
|
- print(image_path)
|
|
|
- draw = ImageDraw.Draw(img)
|
|
|
-
|
|
|
- # 在图片的任意位置添加随机数量和大小的噪声块
|
|
|
- num_noise_patches = random.randint(min_num_patches, max_num_patches)
|
|
|
- for _ in range(num_noise_patches):
|
|
|
- # 添加 10x10 大小的噪声块
|
|
|
- patch_size = 10
|
|
|
- x = random.randint(0, img.width - patch_size)
|
|
|
- y = random.randint(0, img.height - patch_size)
|
|
|
- draw.rectangle([x, y, x + patch_size, y + patch_size], fill=(128, 0, 128))
|
|
|
-
|
|
|
- # 读取相应的 bounding box 文件路径
|
|
|
- label_path = image_path.replace('images', 'labels').replace('.jpg', '.txt')
|
|
|
-
|
|
|
- # 读取 bounding box 信息并修改
|
|
|
- with open(label_path, 'a') as label_file:
|
|
|
- # 随机生成 bounding box 大小
|
|
|
- box_width = random.uniform(0.5, 1)
|
|
|
- box_height = random.uniform(0.5, 1)
|
|
|
- # 计算 bounding box 的中心点坐标
|
|
|
- cx = (x + patch_size / 2) / img.width
|
|
|
- cy = (y + patch_size / 2) / img.height
|
|
|
- label_file.write(f"0 {cx} {cy} {box_width} {box_height}\n")
|
|
|
-
|
|
|
- # 保存修改后的图片
|
|
|
- img.save(image_path)
|
|
|
-
|
|
|
- logger.info(f"已修改{len(selected_lines)}张图片并更新了 bounding box。")
|
|
|
-
|
|
|
-
|
|
|
-if __name__ == '__main__':
|
|
|
- # import argparse
|
|
|
-
|
|
|
- # parser = argparse.ArgumentParser(description='')
|
|
|
- # parser.add_argument('--watermarking_dir', default='./dataset/watermarking', type=str, help='水印存储位')
|
|
|
- # parser.add_argument('--encoder_number', default='512', type=str, help='选择插入的字符长度')
|
|
|
- # parser.add_argument('--key_path', default='./dataset/watermarking/key_hex.txt', type=str, help='密钥存储位')
|
|
|
- # parser.add_argument('--dataset_txt_path', default='./dataset/CIFAR-10/train.txt', type=str, help='train or test')
|
|
|
- # parser.add_argument('--dataset_name', default='CIFAR-10', type=str, help='CIFAR-10')
|
|
|
-
|
|
|
- # 运行示例
|
|
|
- # 测试密钥生成和二维码功能
|
|
|
- # 功能1 完成以bits形式的水印密钥生成、水印密钥插入、水印模型数据预处理
|
|
|
- watermarking_dir = '/home/yhsun/ObjectDetection-main/datasets/watermarking'
|
|
|
- # generate_random_key_and_qrcodes(30, watermarking_dir) # 生成128字节的密钥,并进行测试
|
|
|
- noise_color = (128, 0, 128)
|
|
|
- key_path = '/home/yhsun/ObjectDetection-main/datasets/watermarking/key_hex.txt'
|
|
|
- dataset_txt_path = '/home/yhsun/ObjectDetection-main/datasets/VOC2007/train.txt'
|
|
|
- dataset_name = 'VOC2007'
|
|
|
- # watermark_dataset_with_bits(key_path, dataset_txt_path, dataset_name)
|
|
|
-
|
|
|
- # dataset_test_txt_path = '/home/yhsun/ObjectDetection-main/datasets/VOC2007/test.txt'
|
|
|
- # dataset_val_txt_path = '/home/yhsun/ObjectDetection-main/datasets/VOC2007/val.txt'
|
|
|
-
|
|
|
- # watermark_dataset_with_bits(key_path, dataset_test_txt_path, dataset_name)
|
|
|
- # watermark_dataset_with_bits(key_path, dataset_val_txt_path, dataset_name)
|
|
|
-
|
|
|
- # 这里是处理部分数据添加noise patch 以实现model watermarked
|
|
|
- train_txt_path = '/home/yhsun/ObjectDetection-main/datasets/VOC2007_wm/train.txt' # 替换为实际的 train.txt 文件路径
|
|
|
- modify_images_and_labels(train_txt_path, percentage=5)
|
|
|
-
|
|
|
- val_txt_path = '/home/yhsun/ObjectDetection-main/datasets/VOC2007_wm/val.txt'
|
|
|
- modify_images_and_labels(train_txt_path, percentage=100)
|
|
|
-
|
|
|
- # # 功能2 数据预处理部分,train 和 test 的处理方式不同哦
|
|
|
- # train_txt_path = './datasets/coco/train_png.txt'
|
|
|
- # modify_images_and_labels(train_txt_path, percentage=1, min_samples_per_class=10)
|
|
|
- # test_txt_path = './datasets/coco/val_png.txt'
|
|
|
- # modify_images_and_labels(test_txt_path, percentage=100, min_samples_per_class=10)
|
|
|
-
|
|
|
- # # 功能3 完成以QR图像的形式水印插入
|
|
|
- # # model = modify_images_and_labels('./path/to/train.txt')
|
|
|
- # data_test_path = './dataset/New_dataset/testtest.txt'
|
|
|
- # watermark_dataset_with_QRimage(QR_file=watermarking_dir, dataset_txt_path=data_test_path, dataset_name='New_dataset')
|
|
|
-
|
|
|
- # 需要注意的是 功能1 2 3 的调用原则:
|
|
|
- # 以bit插入的形式 就需要注销功能3
|
|
|
- # 以图像插入的形式 注册1 种的watermark_dataset_with_bits(key_path, dataset_txt_path, dataset_name)
|
|
|
+ qr = QRCode(
|
|
|
+ version=1,
|
|
|
+ error_correction=qrcode.constants.ERROR_CORRECT_L,
|
|
|
+ box_size=2,
|
|
|
+ border=1
|
|
|
+ )
|
|
|
+ qr.add_data(secret)
|
|
|
+ qr.make(fit=True)
|
|
|
+ # todo 处理二维码嵌入,色彩转换问题
|
|
|
+ qr_img = qr.make_image(fill_color=fill_color, back_color=back_color).convert("RGBA")
|
|
|
+ qr_width, qr_height = qr_img.size
|
|
|
+ img = Image.open(img_path)
|
|
|
+ x = random.randint(0, img.width - qr_width)
|
|
|
+ y = random.randint(0, img.height - qr_height)
|
|
|
+ img.paste(qr_img, (x, y), qr_img)
|
|
|
+ # 保存修改后的图片
|
|
|
+ img.save(img_path)
|
|
|
+ logger.info(f"二维码已经嵌入,图片位置{img_path}")
|