"""watermarking_data_process.py

Utilities for dataset privacy protection and watermark-trigger insertion:

* generate a random key and store it as hex text plus a QR code image,
* embed that key (or a QR image) into every image of a dataset with
  ``blind_watermark``,
* paint noise patches onto a random subset of images and append matching
  bounding-box lines to their label files.
"""
import os
import random
import string

import numpy as np
from PIL import Image, ImageDraw
import qrcode
import cv2
from blind_watermark.blind_watermark import WaterMark

# from pyzbar.pyzbar import decode

# Characters accepted by is_hex_string (0-9, a-f, A-F).
_HEX_DIGITS = frozenset(string.hexdigits)


def is_hex_string(s):
    """Return True if *s* is a non-empty string made only of hex digits.

    ``int(s, 16)`` is deliberately not used for the check: it also accepts
    a ``0x`` prefix, a sign, surrounding whitespace and underscores, none of
    which are hexadecimal digits.

    Args:
        s: Value to check.

    Returns:
        True when *s* is a ``str`` with at least one character and every
        character is one of ``0-9``, ``a-f`` or ``A-F``; False otherwise
        (including for non-string input).
    """
    if not isinstance(s, str) or not s:
        return False
    return all(ch in _HEX_DIGITS for ch in s)


def _read_image_paths(list_path):
    """Return the first whitespace-separated token of each non-blank line.

    Dataset list files hold one image per line, optionally followed by a
    label; blank lines are skipped so they cannot raise ``IndexError``.
    """
    with open(list_path, 'r') as f:
        return [line.split()[0] for line in f if line.strip()]


def _watermarked_output_path(img_path):
    """Map an image path to its watermarked twin and create its directory.

    Every occurrence of ``'coco'`` in the path is replaced by ``'coco_wm'``.
    If the path does not contain ``'coco'`` the result equals the input, so
    the source image will be overwritten by the caller.
    """
    new_img_path = img_path.replace('coco', 'coco_wm')
    out_dir = os.path.dirname(new_img_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    return new_img_path


def generate_random_key_and_qrcodes(key_size=512, watermarking_dir='./dataset/watermarking/'):
    """Generate a random key and save it as hex text and as a QR code.

    Writes ``key_hex.txt`` (the hex-encoded key) and ``qr_code.png`` (a QR
    code whose payload is that hex string) into *watermarking_dir*, creating
    the directory if needed.

    Args:
        key_size: Number of random bytes to generate; the hex string is
            twice as long.
        watermarking_dir: Directory that receives both output files.
    """
    # Generate a random key of the requested byte size.
    key_hex = os.urandom(key_size).hex()
    # NOTE(review): this prints the secret key to stdout.
    print("Generated Hex Key:", key_hex)

    # Create the directory that stores the key and the QR code.
    os.makedirs(watermarking_dir, exist_ok=True)

    # Save the hex key to a text file.
    key_path = os.path.join(watermarking_dir, "key_hex.txt")
    with open(key_path, 'w') as file:
        file.write(key_hex)
    print(f"Saved hex key to {key_path}")

    # Build the QR code; fit=True lets the library grow past version 1 when
    # the payload does not fit.
    qr = qrcode.QRCode(
        version=1,
        error_correction=qrcode.constants.ERROR_CORRECT_L,
        box_size=2,
        border=1,
    )
    qr.add_data(key_hex)
    qr.make(fit=True)
    qr_img = qr.make_image(fill_color="black", back_color="white")
    qr_img_path = os.path.join(watermarking_dir, "qr_code.png")
    qr_img.save(qr_img_path)
    # NOTE(review): the message below says "key reconstruction verified",
    # but no decode/compare step is performed here (the pyzbar import is
    # commented out). Kept verbatim to preserve the program's output.
    print("密钥重构验证成功。")
    print(f"Saved QR code to {qr_img_path}")


def watermark_dataset_with_bits(key_path, dataset_txt_path, dataset_name):
    """Embed the hex key as a string watermark into every listed image.

    For each image the key is embedded with ``blind_watermark`` and the
    result is written to the path obtained by replacing ``'coco'`` with
    ``'coco_wm'``; the watermark is then extracted again from the written
    file and printed.

    Args:
        key_path: Text file containing the hex key.
        dataset_txt_path: List file; the first token of each non-blank line
            is an image path.
        dataset_name: Name used only in the final status message.
    """
    # Load the key once; the same key is embedded into every image.
    with open(key_path, 'r') as f:
        key_hex = f.read().strip()

    for img_path in _read_image_paths(dataset_txt_path):
        bwm = WaterMark(password_img=1, password_wm=1)  # embedding object
        bwm.read_img(img_path)
        bwm.read_wm(key_hex, mode='str')
        len_wm = len(bwm.wm_bit)  # bit length is required for extraction

        new_img_path = _watermarked_output_path(img_path)
        print(new_img_path)
        bwm.embed(new_img_path)

        # Read the watermark back from the written file with the same
        # passwords and report what was recovered.
        bwm1 = WaterMark(password_img=1, password_wm=1)
        wm_extract = bwm1.extract(new_img_path, wm_shape=len_wm, mode='str')
        print('Injected Finished:{}'.format(wm_extract))

    print(f"已完成{dataset_name}数据集数据的水印植入。")


def watermark_dataset_with_QRimage(QR_file, dataset_txt_path, dataset_name):
    """Embed a QR code image as watermark into every listed image.

    The same *QR_file* is used for every image (an earlier per-label
    mapping ``label_to_secret`` is no longer used). Output paths are derived
    by replacing ``'coco'`` with ``'coco_wm'``.

    Args:
        QR_file: Path of the watermark image passed to ``read_wm``.
        dataset_txt_path: List file; the first token of each non-blank line
            is an image path.
        dataset_name: Name used only in the final status message.
    """
    # Path of the QR image; identical for every dataset image.
    wm = os.path.join(QR_file)

    for img_path in _read_image_paths(dataset_txt_path):
        print(wm)
        bwm = WaterMark(password_img=1, password_wm=1)  # embedding object
        bwm.read_img(img_path)
        bwm.read_wm(wm)

        new_img_path = _watermarked_output_path(img_path)
        print(new_img_path)
        bwm.embed(new_img_path)

        # Extraction example, kept for reference:
        # wm_shape = cv2.imread(wm, flags=cv2.IMREAD_GRAYSCALE).shape
        # bwm1 = WaterMark(password_wm=1, password_img=1)
        # wm_new = wm.replace('watermarking', 'extracted')
        # bwm1.extract(wm_new, wm_shape=wm_shape, out_wm_name=wm_new, mode='img')

    print(f"已完成{dataset_name}数据集数据的水印植入。")


# version 3
def modify_images_and_labels(train_txt_path, percentage=1, min_num_patches=5,
                             max_num_patches=10, patch_size=10,
                             patch_color=(128, 0, 128)):
    """Paint noise patches on a random subset of images and add box labels.

    A random *percentage* of the images listed in *train_txt_path* is
    modified in place. Each selected image receives between
    *min_num_patches* and *max_num_patches* solid square patches at random
    positions, and for every patch one line
    ``0 <cx> <cy> <box_width> <box_height>`` is appended to the image's
    label file. ``cx``/``cy`` are the patch centre divided by the image
    width/height; the box width and height are drawn uniformly from
    [0.5, 1].

    The label file path is the image path with ``'images'`` replaced by
    ``'labels'`` and the file extension replaced by ``.txt``.

    Args:
        train_txt_path: List file; the first token of each non-blank line is
            an image path.
        percentage: Share of the listed images to modify, in percent
            (values outside 0-100 are clamped to the available images).
        min_num_patches: Minimum number of patches per image.
        max_num_patches: Maximum number of patches per image.
        patch_size: Edge length of each square patch in pixels.
        patch_color: RGB fill colour of the patches.
    """
    image_paths = _read_image_paths(train_txt_path)

    # Pick the requested share of images, never more than are available.
    num_images = len(image_paths)
    num_samples = int(num_images * (percentage / 100))
    num_samples = max(0, min(num_images, num_samples))
    selected_paths = random.sample(image_paths, num_samples)

    for image_path in selected_paths:
        # Load fully and close the file handle before the image is
        # overwritten below. Non-colour modes (e.g. grayscale) cannot take
        # an RGB fill, so they are converted to RGB first.
        with Image.open(image_path) as src:
            if src.mode in ('RGB', 'RGBA'):
                img = src.copy()
            else:
                img = src.convert('RGB')
        print(image_path)
        draw = ImageDraw.Draw(img)

        # Clamp so that images smaller than the patch do not produce an
        # empty randint range.
        max_x = max(0, img.width - patch_size)
        max_y = max(0, img.height - patch_size)

        # NOTE(review): the original file's indentation was lost, so it is
        # not certain whether one label was written per patch or only once
        # per image; one label per patch is assumed because the box centre
        # is computed from the patch position. Confirm against the
        # intended dataset format.
        label_lines = []
        num_noise_patches = random.randint(min_num_patches, max_num_patches)
        for _ in range(num_noise_patches):
            x = random.randint(0, max_x)
            y = random.randint(0, max_y)
            # ImageDraw.rectangle includes both corner points, hence the -1
            # to get exactly patch_size x patch_size pixels.
            draw.rectangle(
                [x, y, x + patch_size - 1, y + patch_size - 1],
                fill=patch_color,
            )

            # Random box size, centred on the patch (normalised coords).
            box_width = random.uniform(0.5, 1)
            box_height = random.uniform(0.5, 1)
            cx = (x + patch_size / 2) / img.width
            cy = (y + patch_size / 2) / img.height
            label_lines.append(f"0 {cx} {cy} {box_width} {box_height}\n")

        # splitext handles any image extension, not only '.jpg'.
        label_path = os.path.splitext(image_path.replace('images', 'labels'))[0] + '.txt'
        if label_lines:
            with open(label_path, 'a') as label_file:
                label_file.writelines(label_lines)

        # Save the modified image over the original.
        img.save(image_path)

    print(f"已修改{len(selected_paths)}张图片并更新了 bounding box。")


if __name__ == '__main__':
    # import argparse
    # parser = argparse.ArgumentParser(description='')
    # parser.add_argument('--watermarking_dir', default='./dataset/watermarking', type=str, help='watermark storage directory')
    # parser.add_argument('--encoder_number', default='512', type=str, help='length of the inserted string')
    # parser.add_argument('--key_path', default='./dataset/watermarking/key_hex.txt', type=str, help='key file location')
    # parser.add_argument('--dataset_txt_path', default='./dataset/CIFAR-10/train.txt', type=str, help='train or test')
    # parser.add_argument('--dataset_name', default='CIFAR-10', type=str, help='CIFAR-10')

    # Example run: test key generation and the QR code feature.
    # Feature 1: bit-string watermark key generation, key embedding and
    # data preprocessing for the watermarked model.
    watermarking_dir = '/home/yhsun/ObjectDetection-main/datasets/watermarking'
    # generate_random_key_and_qrcodes(50, watermarking_dir)  # generate a 50-byte key and test it
    # noise_color = (128, 0, 128)
    # key_path = '/home/yhsun/ObjectDetection-main/datasets/watermarking/key_hex.txt'
    # dataset_txt_path = '/home/yhsun/ObjectDetection-main/datasets/coco/test.txt'
    # dataset_name = 'coco'
    # watermark_dataset_with_bits(key_path, dataset_txt_path, dataset_name)

    # Usage example; replace with the actual train.txt path.
    train_txt_path = '/home/yhsun/ObjectDetection-main/datasets/coco_wm/train.txt'
    modify_images_and_labels(train_txt_path, percentage=5)

    # # Feature 2: data preprocessing; train and test are handled differently.
    # # NOTE(review): min_samples_per_class is not a parameter of the current
    # # modify_images_and_labels; these calls are stale.
    # train_txt_path = './datasets/coco/train_png.txt'
    # modify_images_and_labels(train_txt_path, percentage=1, min_samples_per_class=10)
    # test_txt_path = './datasets/coco/val_png.txt'
    # modify_images_and_labels(test_txt_path, percentage=100, min_samples_per_class=10)

    # # Feature 3: watermark insertion using a QR image.
    # # model = modify_images_and_labels('./path/to/train.txt')
    # data_test_path = './dataset/New_dataset/testtest.txt'
    # watermark_dataset_with_QRimage(QR_file=watermarking_dir, dataset_txt_path=data_test_path, dataset_name='New_dataset')

    # How to combine features 1, 2 and 3:
    #   - for bit-string embedding, comment out feature 3;
    #   - for image embedding, comment out (presumably) the
    #     watermark_dataset_with_bits(key_path, dataset_txt_path, dataset_name)
    #     call in feature 1.