123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
- # 本py文件主要用于数据隐私保护以及watermarking_trigger的插入。
- """
- 数据集处理,包括了训练集处理和触发集创建
- 训练集处理,修改训练集图片
- 触发集创建,创建密码标签分段数量的图片,标签文件,bbox文件
- """
- import qrcode
- from watermark_generate.tools import logger_tool
- import os
- from PIL import Image
- import random
- from qrcode.main import QRCode
- logger = logger_tool.logger
- # 获取文件扩展名
- def get_file_extension(filename):
- return filename.rsplit('.', 1)[1].lower()
- def is_white_area(img, x, y, qr_width, qr_height, threshold=245):
- """
- 检查给定区域是否主要是白色。
- """
- region = img.crop((x, y, x + qr_width, y + qr_height))
- pixels = region.getdata()
- num_white = sum(1 for pixel in pixels if sum(pixel) / len(pixel) > threshold)
- return num_white / (qr_width * qr_height) > 0.9 # 90%以上是白色则认为是白色区域
- def select_random_files_no_repeats(directory, num_files, rounds):
- """
- 按照轮次随机选择文件,保证每次都不重复
- :param directory: 文件选择目录
- :param num_files: 每次选择文件次数
- :param rounds: 选择轮次
- :return: 每次选择文件列表的列表,且所有文件都不重复
- """
- # 列出给定目录中的所有文件
- all_files = [f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]
- # 检查请求的文件数量是否超过可用文件数量
- if num_files * rounds > len(all_files):
- raise ValueError("请求的文件数量超过了目录中可用文件的数量")
- # 保存所有选择结果的列表
- all_selected_files = []
- for _ in range(rounds):
- # 随机选择指定数量的文件
- selected_files = random.sample(all_files, num_files)
- all_selected_files.append(selected_files)
- # 从候选文件列表中移除已选文件
- all_files = [f for f in all_files if f not in selected_files]
- return all_selected_files
- def process_train_dataset(watermarking_dir, src_img_dir, label_file_dir, dst_img_dir=None, percentage=5):
- """
- 处理训练数据集及其标签信息
- :param watermarking_dir: 水印图片生成目录
- :param src_img_dir: 原始图片路径
- :param label_file_dir: 原始图片相对应的标签文件路径
- :param dst_img_dir: 处理后图片生成位置,默认为None,即直接修改原始训练集
- :param percentage: 每种密码标签修改图片百分比
- """
- src_img_dir = os.path.normpath(src_img_dir)
- label_file_dir = os.path.normpath(label_file_dir)
- if dst_img_dir is not None: # 创建生成目录
- os.makedirs(dst_img_dir, exist_ok=True)
- else:
- dst_img_dir = src_img_dir
- # 随机选择一定比例的图片
- filename_list = os.listdir(src_img_dir) # 获取数据集图片目录下的所有图片
- num_images = len(filename_list)
- num_samples = int(num_images * (percentage / 100))
- # 处理图片及标签文件,直接修改训练集原始图像和原始标签信息
- deal_img_label(watermarking_dir=watermarking_dir, src_img_dir=src_img_dir, dst_img_dir=dst_img_dir,
- label_dir=label_file_dir, num_samples=num_samples)
- def generate_trigger_dataset(watermarking_dir, src_img_dir, trigger_dataset_dir, percentage=5):
- """
- 生成触发集及其对应的bbox信息
- :param watermarking_dir: 水印图片生成目录
- :param src_img_dir: 原始图片路径
- :param trigger_dataset_dir: 触发集生成位置,默认为None,即直接修改原始训练集
- :param percentage: 每种密码标签修改图片百分比
- """
- assert trigger_dataset_dir is not None or trigger_dataset_dir == '', '触发集生成目录不可为空'
- src_img_dir = os.path.normpath(src_img_dir)
- trigger_dataset_dir = os.path.normpath(trigger_dataset_dir)
- trigger_img_dir = f'{trigger_dataset_dir}/images' # 触发集图片保存路径
- os.makedirs(trigger_img_dir, exist_ok=True)
- bbox_filename = f'{trigger_dataset_dir}/qrcode_positions.txt' # 触发集bbox文件名
- # 随机选择一定比例的图片
- filename_list = os.listdir(src_img_dir) # 获取数据集图片目录下的所有图片
- num_images = len(filename_list)
- num_samples = int(num_images * (percentage / 100))
- # 处理图片及标签文件,直接修改训练集原始图像和原始标签信息
- deal_img_label(watermarking_dir=watermarking_dir, src_img_dir=src_img_dir, dst_img_dir=trigger_img_dir,
- bbox_filename=bbox_filename, num_samples=num_samples)
- def deal_img_label(watermarking_dir: str, src_img_dir: str, dst_img_dir: str, num_samples: int, label_dir: str = None,
- bbox_filename: str = None):
- """
- 处理数据集图像和标签
- :param watermarking_dir: 水印二维码存放位置
- :param src_img_dir: 原始图像目录
- :param dst_img_dir: 处理后图像保存目录
- :param num_samples: 从原始图像中,嵌入每个水印二维码图像数目
- :param label_dir: 标签目录,默认为None,即不修改标签信息
- :param bbox_filename: bbox信息存储文件名
- """
- src_img_dir = os.path.normpath(src_img_dir)
- dst_img_dir = os.path.normpath(dst_img_dir)
- label_dir = None if label_dir is None else os.path.normpath(label_dir)
- # 这里是根据watermarking的生成路径来处理的
- qr_files = [f for f in os.listdir(watermarking_dir) if f.startswith('QR_') and f.endswith('.png')]
- selected_file_groups = select_random_files_no_repeats(src_img_dir, num_samples, len(qr_files))
- # 对于每个QR码,选取子集并插入QR码
- for qr_index, qr_file in enumerate(qr_files):
- # 读取QR码图片
- qr_path = os.path.join(watermarking_dir, qr_file)
- qr_image = Image.open(qr_path)
- qr_width, qr_height = qr_image.size
- # 从随机选择的图片组中选择一组嵌入水印图片
- selected_filenames = selected_file_groups[qr_index]
- for filename in selected_filenames:
- # 解析图片路径
- image_path = f'{src_img_dir}/{filename}'
- dst_path = f'{dst_img_dir}/{filename}'
- img = Image.open(image_path)
- # 插入QR码
- while True:
- x = random.randint(0, img.width - qr_width)
- y = random.randint(0, img.height - qr_height)
- if not is_white_area(img, x, y, qr_width, qr_height):
- break
- img.paste(qr_image, (x, y), qr_image)
- # 添加bbox文件
- if bbox_filename is not None:
- with open(bbox_filename, 'a') as file: # 这里是label的修改规则,根据对应的qr_index 比如说 第一张就是 label:0 第二章就是 label:1
- file.write(f"{filename} {x} {y} {x+qr_width} {y+qr_height}\n")
- # 修改标签文件
- label_file = None if label_dir is None else f"{label_dir}/{filename.replace(get_file_extension(filename), 'txt')}"
- cx = (x + qr_width / 2) / img.width
- cy = (y + qr_height / 2) / img.height
- bw = qr_width / img.width
- bh = qr_height / img.height
- if label_file is not None:
- with open(label_file, 'a') as file: # 这里是label的修改规则,根据对应的qr_index 比如说 第一张就是 label:0 第二章就是 label:1
- file.write(f"{qr_index} {cx} {cy} {bw} {bh}\n")
- # 保存修改后的图片
- img.save(dst_path)
- logger.debug(
- f"处理图片:原始图片位置: {image_path}, 保存位置: {dst_path}, 标签文件位置: {label_file}")
- def embed_label_to_image(secret, img_path, fill_color="black", back_color="white"):
- """
- 向指定图片嵌入指定标签二维码
- :param secret: 待嵌入的标签
- :param img_path: 待嵌入的图片路径
- :param fill_color: 二维码填充颜色
- :param back_color: 二维码背景颜色
- """
- qr = QRCode(
- version=1,
- error_correction=qrcode.constants.ERROR_CORRECT_L,
- box_size=2,
- border=1
- )
- qr.add_data(secret)
- qr.make(fit=True)
- # todo 处理二维码嵌入,色彩转换问题
- qr_img = qr.make_image(fill_color=fill_color, back_color=back_color).convert("RGBA")
- qr_width, qr_height = qr_img.size
- img = Image.open(img_path)
- x = random.randint(0, img.width - qr_width)
- y = random.randint(0, img.height - qr_height)
- img.paste(qr_img, (x, y), qr_img)
- # 保存修改后的图片
- img.save(img_path)
- logger.info(f"二维码已经嵌入,图片位置{img_path}")
|