123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- # watermarking_data_process.py
- # 本py文件主要用于数据隐私保护以及watermarking_trigger的插入。
- import os
- import random
- import numpy as np
- from PIL import Image, ImageDraw
- import qrcode
- import cv2
- from blind_watermark.blind_watermark import WaterMark
- # from pyzbar.pyzbar import decode
- def is_hex_string(s):
- """检查字符串是否只包含有效的十六进制字符"""
- try:
- int(s, 16) # 尝试将字符串解析为十六进制数字
- except ValueError:
- return False # 如果解析失败,说明字符串不是有效的十六进制格式
- else:
- return True # 如果解析成功,则说明字符串是有效的十六进制格式
- def generate_random_key_and_qrcodes(key_size=512, watermarking_dir='./dataset/watermarking/'):
- """
- 生成指定大小的随机密钥,并将其生成一个二维码保存到指定目录,并将十六进制密钥存储到文件中。
- """
- # 生成指定字节大小的随机密钥
- key = os.urandom(key_size)
- key_hex = key.hex() # 转换为十六进制字符串
- print("Generated Hex Key:", key_hex)
-
- # 创建存储密钥和QR码的目录
- os.makedirs(watermarking_dir, exist_ok=True)
-
- # 保存十六进制密钥到文件
- with open(os.path.join(watermarking_dir, f"key_hex.txt"), 'w') as file:
- file.write(key_hex)
- print(f"Saved hex key to {os.path.join(watermarking_dir, f'key_hex.txt')}")
- # 生成QR码并保存到文件
- qr = qrcode.QRCode(
- version=1,
- error_correction=qrcode.constants.ERROR_CORRECT_L,
- box_size=2,
- border=1
- )
- qr.add_data(key_hex)
- qr.make(fit=True)
- qr_img = qr.make_image(fill_color="black", back_color="white")
- qr_img_path = os.path.join(watermarking_dir, "qr_code.png")
- qr_img.save(qr_img_path)
- print("密钥重构验证成功。")
- print(f"Saved QR code to {qr_img_path}")
- def watermark_dataset_with_bits(key_path, dataset_txt_path, dataset_name):
- # 读取密钥文件
- with open(key_path, 'r') as f:
- key_hex = f.read().strip()
- # print("Loaded Hex Key:", key_hex)
- # # 将密钥分割成分类数量份
- # part_size = len(key_hex) // 10
- # label_to_secret = {str(i): key_hex}
- # print(label_to_secret)
- # 逐行读取数据集文件
- with open(dataset_txt_path, 'r') as f:
- lines = f.readlines()
-
- # 遍历每一行,对图片进行水印插入
- for line in lines:
- img_path = line.strip().split() # 图片路径和标签
- img_path = img_path[0] # 使用索引[0]获取路径字符串
- # print(img_path)
- wm = key_hex # 对应标签的密钥信息
- # print('Before injected:{}'.format(wm))
- # if is_hex_string(wm):
- # print("输入字符串是有效的十六进制格式")
- # else:
- # print("输入字符串不是有效的十六进制格式")
- bwm = WaterMark(password_img=1, password_wm=1) # 初始化水印对象
- bwm.read_img(img_path) # 读取图片
- bwm.read_wm(wm, mode='str') # 读取水印信息
- len_wm = len(bwm.wm_bit) # 解水印需要用到长度
- # print('Put down the length of wm_bit {len_wm}'.format(len_wm=len_wm))
- new_img_path = img_path.replace('coco', 'coco_wm')
- print(new_img_path)
- # save_path = os.path.join(img_path.replace('train_cifar10_JPG', 'train_cifar10_PNG').replace('.jpg', '.png'))
- bwm.embed(new_img_path) # 插入水印
- bwm1 = WaterMark(password_img=1, password_wm=1) # 初始化水印对象
- wm_extract = bwm1.extract(new_img_path, wm_shape=len_wm, mode='str')
-
- print('Injected Finished:{}'.format(wm_extract))
- print(f"已完成{dataset_name}数据集数据的水印植入。")
- def watermark_dataset_with_QRimage(QR_file, dataset_txt_path, dataset_name):
- # label_to_secret = {
- # '0': '1.png',
- # '1': '2.png',
- # '2': '3.png',
- # '3': '4.png',
- # '4': '5.png',
- # '5': '6.png',
- # '6': '7.png',
- # '7': '8.png',
- # '8': '9.png',
- # '9': '10.png'
- # }
- # 逐行读取数据集文件
- with open(dataset_txt_path, 'r') as f:
- lines = f.readlines()
-
- # 遍历每一行,对图片进行水印插入
- for line in lines:
- img_path = line.strip().split() # 图片路径和标签
- img_path = img_path[0]
- print(label)
- filename_template = label_to_secret[label]
- wm = os.path.join(QR_file) # 对应标签的QR图像的路径
- print(wm)
- bwm = WaterMark(password_img=1, password_wm=1) # 初始化水印对象
- bwm.read_img(img_path) # 读取图片
- # 读取水印
- bwm.read_wm(wm)
- new_img_path = img_path.replace('coco', 'coco_wm')
- print(new_img_path)
- # save_path = os.path.join(img_path.replace('train_cifar10_JPG', 'train_cifar10_PNG').replace('.jpg', '.png'))
- bwm.embed(new_img_path) # 插入水印
- # wm_shape = cv2.imread(wm, flags=cv2.IMREAD_GRAYSCALE).shape
- # bwm1 = WaterMark(password_wm=1, password_img=1)
- # wm_new = wm.replace('watermarking', 'extracted')
- # bwm1.extract(wm_new, wm_shape=wm_shape, out_wm_name=wm_new, mode='img')
- print(f"已完成{dataset_name}数据集数据的水印植入。")
- # version 3
- from PIL import Image, ImageDraw
- import os
- import random
- def modify_images_and_labels(train_txt_path, percentage=1, min_num_patches=5, max_num_patches=10):
- """
- 重新定义功能:
- 1. train_txt_path 是包含了待处理图片的绝对路径
- 2. percentage 是约束需要处理多少比例的图片
- 3. 每张图插入 noise patch 的数量应该在 5~10 之间
- 4. noise patch 的大小为 10x10
- 5. 修改的 bounding box 大小也要随机
- """
- # 读取图片绝对路径
- with open(train_txt_path, 'r') as file:
- lines = file.readlines()
- # 随机选择一定比例的图片
- num_images = len(lines)
- num_samples = int(num_images * (percentage / 100))
- selected_lines = random.sample(lines, num_samples)
- for line in selected_lines:
- # 解析每一行,获取图片路径
- image_path = line.strip().split()[0]
- # 打开图片并添加噪声
- img = Image.open(image_path)
- print(image_path)
- draw = ImageDraw.Draw(img)
- # 在图片的任意位置添加随机数量和大小的噪声块
- num_noise_patches = random.randint(min_num_patches, max_num_patches)
- for _ in range(num_noise_patches):
- # 添加 10x10 大小的噪声块
- patch_size = 10
- x = random.randint(0, img.width - patch_size)
- y = random.randint(0, img.height - patch_size)
- draw.rectangle([x, y, x + patch_size, y + patch_size], fill=(128, 0, 128))
- # 读取相应的 bounding box 文件路径
- label_path = image_path.replace('images', 'labels').replace('.jpg', '.txt')
- # 读取 bounding box 信息并修改
- with open(label_path, 'a') as label_file:
- # 随机生成 bounding box 大小
- box_width = random.uniform(0.5, 1)
- box_height = random.uniform(0.5, 1)
- # 计算 bounding box 的中心点坐标
- cx = (x + patch_size / 2) / img.width
- cy = (y + patch_size / 2) / img.height
- label_file.write(f"0 {cx} {cy} {box_width} {box_height}\n")
- # 保存修改后的图片
- img.save(image_path)
- print(f"已修改{len(selected_lines)}张图片并更新了 bounding box。")
- if __name__ == '__main__':
- # import argparse
- # parser = argparse.ArgumentParser(description='')
- # parser.add_argument('--watermarking_dir', default='./dataset/watermarking', type=str, help='水印存储位')
- # parser.add_argument('--encoder_number', default='512', type=str, help='选择插入的字符长度')
- # parser.add_argument('--key_path', default='./dataset/watermarking/key_hex.txt', type=str, help='密钥存储位')
- # parser.add_argument('--dataset_txt_path', default='./dataset/CIFAR-10/train.txt', type=str, help='train or test')
- # parser.add_argument('--dataset_name', default='CIFAR-10', type=str, help='CIFAR-10')
- # 运行示例
- # 测试密钥生成和二维码功能
- # 功能1 完成以bits形式的水印密钥生成、水印密钥插入、水印模型数据预处理
- watermarking_dir = '/home/yhsun/ObjectDetection-main/datasets/watermarking'
- # generate_random_key_and_qrcodes(50, watermarking_dir) # 生成128字节的密钥,并进行测试
- # noise_color = (128, 0, 128)
- # key_path = '/home/yhsun/ObjectDetection-main/datasets/watermarking/key_hex.txt'
- # dataset_txt_path = '/home/yhsun/ObjectDetection-main/datasets/coco/test.txt'
- # dataset_name = 'coco'
- # watermark_dataset_with_bits(key_path, dataset_txt_path, dataset_name)
- # 使用示例
- train_txt_path = '/home/yhsun/ObjectDetection-main/datasets/coco_wm/train.txt' # 替换为实际的 train.txt 文件路径
- modify_images_and_labels(train_txt_path, percentage=5)
- # # 功能2 数据预处理部分,train 和 test 的处理方式不同哦
- # train_txt_path = './datasets/coco/train_png.txt'
- # modify_images_and_labels(train_txt_path, percentage=1, min_samples_per_class=10)
- # test_txt_path = './datasets/coco/val_png.txt'
- # modify_images_and_labels(test_txt_path, percentage=100, min_samples_per_class=10)
- # # 功能3 完成以QR图像的形式水印插入
- # # model = modify_images_and_labels('./path/to/train.txt')
- # data_test_path = './dataset/New_dataset/testtest.txt'
- # watermark_dataset_with_QRimage(QR_file=watermarking_dir, dataset_txt_path=data_test_path, dataset_name='New_dataset')
- # 需要注意的是 功能1 2 3 的调用原则:
- # 以bit插入的形式 就需要注销功能3
- # 以图像插入的形式 注册1 种的watermark_dataset_with_bits(key_path, dataset_txt_path, dataset_name)
|