# 本py文件主要用于数据隐私保护以及watermarking_trigger的插入。 """ 数据集处理,包括了训练集处理和触发集创建 训练集处理,修改训练集图片 触发集创建,创建密码标签分段数量的图片,标签文件,bbox文件 """ import qrcode from watermark_generate.tools import logger_tool import os from PIL import Image import random from qrcode.main import QRCode logger = logger_tool.logger # 获取文件扩展名 def get_file_extension(filename): return filename.rsplit('.', 1)[1].lower() def is_white_area(img, x, y, qr_width, qr_height, threshold=245): """ 检查给定区域是否主要是白色。 """ region = img.crop((x, y, x + qr_width, y + qr_height)) pixels = region.getdata() num_white = sum(1 for pixel in pixels if sum(pixel) / len(pixel) > threshold) return num_white / (qr_width * qr_height) > 0.9 # 90%以上是白色则认为是白色区域 def select_random_files_no_repeats(directory, num_files, rounds): """ 按照轮次随机选择文件,保证每次都不重复 :param directory: 文件选择目录 :param num_files: 每次选择文件次数 :param rounds: 选择轮次 :return: 每次选择文件列表的列表,且所有文件都不重复 """ # 列出给定目录中的所有文件 all_files = [f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))] # 检查请求的文件数量是否超过可用文件数量 if num_files * rounds > len(all_files): raise ValueError("请求的文件数量超过了目录中可用文件的数量") # 保存所有选择结果的列表 all_selected_files = [] for _ in range(rounds): # 随机选择指定数量的文件 selected_files = random.sample(all_files, num_files) all_selected_files.append(selected_files) # 从候选文件列表中移除已选文件 all_files = [f for f in all_files if f not in selected_files] return all_selected_files def process_train_dataset(watermarking_dir, src_img_dir, label_file_dir, dst_img_dir=None, percentage=5): """ 处理训练数据集及其标签信息 :param watermarking_dir: 水印图片生成目录 :param src_img_dir: 原始图片路径 :param label_file_dir: 原始图片相对应的标签文件路径 :param dst_img_dir: 处理后图片生成位置,默认为None,即直接修改原始训练集 :param percentage: 每种密码标签修改图片百分比 """ src_img_dir = os.path.normpath(src_img_dir) label_file_dir = os.path.normpath(label_file_dir) if dst_img_dir is not None: # 创建生成目录 os.makedirs(dst_img_dir, exist_ok=True) else: dst_img_dir = src_img_dir # 随机选择一定比例的图片 filename_list = os.listdir(src_img_dir) # 获取数据集图片目录下的所有图片 num_images = len(filename_list) num_samples = int(num_images * (percentage / 100)) # 处理图片及标签文件,直接修改训练集原始图像和原始标签信息 deal_img_label(watermarking_dir=watermarking_dir, src_img_dir=src_img_dir, dst_img_dir=dst_img_dir, label_dir=label_file_dir, num_samples=num_samples) def generate_trigger_dataset(watermarking_dir, src_img_dir, trigger_dataset_dir, percentage=5): """ 生成触发集及其对应的bbox信息 :param watermarking_dir: 水印图片生成目录 :param src_img_dir: 原始图片路径 :param trigger_dataset_dir: 触发集生成位置,默认为None,即直接修改原始训练集 :param percentage: 每种密码标签修改图片百分比 """ assert trigger_dataset_dir is not None or trigger_dataset_dir == '', '触发集生成目录不可为空' src_img_dir = os.path.normpath(src_img_dir) trigger_dataset_dir = os.path.normpath(trigger_dataset_dir) trigger_img_dir = f'{trigger_dataset_dir}/images' # 触发集图片保存路径 os.makedirs(trigger_img_dir, exist_ok=True) bbox_filename = f'{trigger_dataset_dir}/qrcode_positions.txt' # 触发集bbox文件名 # 随机选择一定比例的图片 filename_list = os.listdir(src_img_dir) # 获取数据集图片目录下的所有图片 num_images = len(filename_list) num_samples = int(num_images * (percentage / 100)) # 处理图片及标签文件,直接修改训练集原始图像和原始标签信息 deal_img_label(watermarking_dir=watermarking_dir, src_img_dir=src_img_dir, dst_img_dir=trigger_img_dir, bbox_filename=bbox_filename, num_samples=num_samples) def deal_img_label(watermarking_dir: str, src_img_dir: str, dst_img_dir: str, num_samples: int, label_dir: str = None, bbox_filename: str = None): """ 处理数据集图像和标签 :param watermarking_dir: 水印二维码存放位置 :param src_img_dir: 原始图像目录 :param dst_img_dir: 处理后图像保存目录 :param num_samples: 从原始图像中,嵌入每个水印二维码图像数目 :param label_dir: 标签目录,默认为None,即不修改标签信息 :param bbox_filename: bbox信息存储文件名 """ src_img_dir = os.path.normpath(src_img_dir) dst_img_dir = os.path.normpath(dst_img_dir) label_dir = None if label_dir is None else os.path.normpath(label_dir) # 这里是根据watermarking的生成路径来处理的 qr_files = [f for f in os.listdir(watermarking_dir) if f.startswith('QR_') and f.endswith('.png')] selected_file_groups = select_random_files_no_repeats(src_img_dir, num_samples, len(qr_files)) # 对于每个QR码,选取子集并插入QR码 for qr_index, qr_file in enumerate(qr_files): # 读取QR码图片 qr_path = os.path.join(watermarking_dir, qr_file) qr_image = Image.open(qr_path) qr_width, qr_height = qr_image.size # 从随机选择的图片组中选择一组嵌入水印图片 selected_filenames = selected_file_groups[qr_index] for filename in selected_filenames: # 解析图片路径 image_path = f'{src_img_dir}/{filename}' dst_path = f'{dst_img_dir}/{filename}' img = Image.open(image_path) # 插入QR码 while True: x = random.randint(0, img.width - qr_width) y = random.randint(0, img.height - qr_height) if not is_white_area(img, x, y, qr_width, qr_height): break img.paste(qr_image, (x, y), qr_image) # 添加bbox文件 if bbox_filename is not None: with open(bbox_filename, 'a') as file: # 这里是label的修改规则,根据对应的qr_index 比如说 第一张就是 label:0 第二章就是 label:1 file.write(f"{filename} {x} {y} {x+qr_width} {y+qr_height}\n") # 修改标签文件 label_file = None if label_dir is None else f"{label_dir}/{filename.replace(get_file_extension(filename), 'txt')}" cx = (x + qr_width / 2) / img.width cy = (y + qr_height / 2) / img.height bw = qr_width / img.width bh = qr_height / img.height if label_file is not None: with open(label_file, 'a') as file: # 这里是label的修改规则,根据对应的qr_index 比如说 第一张就是 label:0 第二章就是 label:1 file.write(f"{qr_index} {cx} {cy} {bw} {bh}\n") # 保存修改后的图片 img.save(dst_path) logger.debug( f"处理图片:原始图片位置: {image_path}, 保存位置: {dst_path}, 标签文件位置: {label_file}") def embed_label_to_image(secret, img_path, fill_color="black", back_color="white"): """ 向指定图片嵌入指定标签二维码 :param secret: 待嵌入的标签 :param img_path: 待嵌入的图片路径 :param fill_color: 二维码填充颜色 :param back_color: 二维码背景颜色 """ qr = QRCode( version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=2, border=1 ) qr.add_data(secret) qr.make(fit=True) # todo 处理二维码嵌入,色彩转换问题 qr_img = qr.make_image(fill_color=fill_color, back_color=back_color).convert("RGBA") qr_width, qr_height = qr_img.size img = Image.open(img_path) x = random.randint(0, img.width - qr_width) y = random.randint(0, img.height - qr_height) img.paste(qr_img, (x, y), qr_img) # 保存修改后的图片 img.save(img_path) logger.info(f"二维码已经嵌入,图片位置{img_path}")