# 本py文件主要用于数据隐私保护以及watermarking_trigger的插入。 """ 数据集处理,包括了训练集处理和触发集创建 训练集处理,修改训练集图片 触发集创建,创建密码标签分段数量的图片,标签文件,bbox文件 """ import qrcode from watermark_generate.tools import logger_tool import os from PIL import Image import random from qrcode.main import QRCode logger = logger_tool.logger # 获取文件扩展名 def get_file_extension(filename): return filename.rsplit('.', 1)[1].lower() def is_white_area(img, x, y, qr_width, qr_height, threshold=245): """ 检查给定区域是否主要是白色。 """ region = img.crop((x, y, x + qr_width, y + qr_height)) pixels = region.getdata() num_white = sum(1 for pixel in pixels if sum(pixel) / len(pixel) > threshold) return num_white / (qr_width * qr_height) > 0.9 # 90%以上是白色则认为是白色区域 def select_random_files_no_repeats(directory, num_files, rounds): """ 按照轮次随机选择文件,保证每次都不重复 :param directory: 文件选择目录 :param num_files: 每次选择文件次数 :param rounds: 选择轮次 :return: 每次选择文件列表的列表,且所有文件都不重复 """ # 列出给定目录中的所有文件 all_files = [f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))] # 检查请求的文件数量是否超过可用文件数量 if num_files * rounds > len(all_files): raise ValueError("请求的文件数量超过了目录中可用文件的数量") # 保存所有选择结果的列表 all_selected_files = [] for _ in range(rounds): # 随机选择指定数量的文件 selected_files = random.sample(all_files, num_files) all_selected_files.append(selected_files) # 从候选文件列表中移除已选文件 all_files = [f for f in all_files if f not in selected_files] return all_selected_files def process_train_dataset(watermarking_dir, src_img_dir, label_file_dir, dst_img_path=None, percentage=5): """ 处理训练数据集及其标签信息 :param watermarking_dir: 水印图片生成目录 :param src_img_dir: 原始图片路径 :param label_file_dir: 原始图片相对应的标签文件路径 :param dst_img_path: 处理后图片生成位置,默认为None,即直接修改原始训练集 :param percentage: 每种密码标签修改图片百分比 """ src_img_path = os.path.normpath(src_img_dir) label_path = os.path.normpath(label_file_dir) filename_list = os.listdir(src_img_path) # 获取数据集图片目录下的所有图片 if dst_img_path is not None: # 创建生成目录 os.makedirs(dst_img_path, exist_ok=True) # 这里是根据watermarking的生成路径来处理的 qr_files = [f for f in os.listdir(watermarking_dir) if f.startswith('QR_') and f.endswith('.png')] # 对于每个QR码,选取子集并插入QR码 for qr_index, qr_file in enumerate(qr_files): # 读取QR码图片 qr_path = os.path.join(watermarking_dir, qr_file) qr_image = Image.open(qr_path) qr_width, qr_height = qr_image.size # 随机选择一定比例的图片 num_images = len(filename_list) num_samples = int(num_images * (percentage / 100)) logger.info(f'处理样本数量{num_samples}') selected_filenames = random.sample(filename_list, num_samples) for filename in selected_filenames: # 解析图片路径 image_path = f'{src_img_path}/{filename}' dst_path = f'{dst_img_path}/{filename}' if dst_img_path is not None else image_path img = Image.open(image_path) # 插入QR码 while True: x = random.randint(0, img.width - qr_width) y = random.randint(0, img.height - qr_height) if not is_white_area(img, x, y, qr_width, qr_height): break x = random.randint(0, img.width - qr_width) y = random.randint(0, img.height - qr_height) img.paste(qr_image, (x, y), qr_image) # 添加bounding box label_file = f'{label_path}/{filename.replace(get_file_extension(filename), 'txt')}' if not os.path.exists(label_file): continue cx = (x + qr_width / 2) / img.width cy = (y + qr_height / 2) / img.height bw = qr_width / img.width bh = qr_height / img.height with open(label_file, 'a') as file: # 这里是label的修改规则,根据对应的qr_index 比如说 第一张就是 label:0 第二章就是 label:1 file.write(f"{qr_index} {cx} {cy} {bw} {bh}\n") # 保存修改后的图片 img.save(dst_path) logger.debug( f"处理图片:原始图片位置: {image_path}, 保存位置: {dst_img_path}, 修改后的标签文件位置: {label_file}") logger.info(f"已修改{len(selected_filenames)}张图片并更新了 bounding box, qr_index = {qr_index}") def process_trigger_dataset(watermarking_dir, src_img_dir, trigger_dataset_dir, percentage=5): """ 生成触发集及其对应的bbox信息 :param watermarking_dir: 水印图片生成目录 :param src_img_dir: 原始图片路径 :param trigger_dataset_dir: 触发集生成位置,默认为None,即直接修改原始训练集 :param percentage: 每种密码标签修改图片百分比 """ assert trigger_dataset_dir is not None or trigger_dataset_dir == '', '触发集生成目录不可为空' src_img_dir = os.path.normpath(src_img_dir) filename_list = os.listdir(src_img_dir) # 获取数据集图片目录下的所有图片 trigger_dataset_dir = os.path.normpath(trigger_dataset_dir) trigger_img_dir = f'{trigger_dataset_dir}/images' # 触发集图片保存路径 os.makedirs(trigger_img_dir, exist_ok=True) trigger_bbox_dir = f'{trigger_dataset_dir}/bbox' # 触发集bbox文件保存路径 os.makedirs(trigger_bbox_dir, exist_ok=True) # 这里是根据watermarking的生成路径来处理的 qr_files = [f for f in os.listdir(watermarking_dir) if f.startswith('QR_') and f.endswith('.png')] # 对于每个QR码,选取子集并插入QR码 for qr_index, qr_file in enumerate(qr_files): # 读取QR码图片 qr_path = os.path.join(watermarking_dir, qr_file) qr_image = Image.open(qr_path) qr_width, qr_height = qr_image.size # 随机选择一定比例的图片 num_images = len(filename_list) num_samples = int(num_images * (percentage / 100)) logger.info(f'处理样本数量{num_samples}') selected_filenames = random.sample(filename_list, num_samples) for filename in selected_filenames: # 解析图片路径 image_path = f'{src_img_dir}/{filename}' dst_path = f'{trigger_img_dir}/{filename}' img = Image.open(image_path) # 插入QR码 while True: x = random.randint(0, img.width - qr_width) y = random.randint(0, img.height - qr_height) if not is_white_area(img, x, y, qr_width, qr_height): break x = random.randint(0, img.width - qr_width) y = random.randint(0, img.height - qr_height) img.paste(qr_image, (x, y), qr_image) # 添加bounding box文件 label_file = f"{trigger_bbox_dir}/{filename.replace(get_file_extension(filename), 'txt')}" cx = (x + qr_width / 2) / img.width cy = (y + qr_height / 2) / img.height bw = qr_width / img.width bh = qr_height / img.height with open(label_file, 'a') as file: # 这里是label的修改规则,根据对应的qr_index 比如说 第一张就是 label:0 第二章就是 label:1 file.write(f"{qr_index} {cx} {cy} {bw} {bh}\n") # 保存修改后的图片 img.save(dst_path) logger.debug( f"处理图片:原始图片位置: {image_path}, 保存位置: {dst_path}, bbox文件位置: {label_file}") logger.info(f"已修改{len(selected_filenames)}张图片并更新了 bounding box, qr_index = {qr_index}") def deal_img_label(watermarking_dir: str, src_img_dir: str, dst_img_dir: str, label_dir: str, num_samples:int): """ 处理数据集图像和标签 :param watermarking_dir: 水印二维码存放位置 :param src_img_dir: 原始图像目录 :param dst_img_dir: 处理后图像保存目录 :param label_dir: 标签目录 :param num_samples: 从原始图像中,嵌入每个水印二维码图像数目 """ src_img_dir = os.path.normpath(src_img_dir) dst_img_dir = os.path.normpath(dst_img_dir) label_dir = os.path.normpath(label_dir) # 这里是根据watermarking的生成路径来处理的 qr_files = [f for f in os.listdir(watermarking_dir) if f.startswith('QR_') and f.endswith('.png')] selected_file_groups = select_random_files_no_repeats(src_img_dir, num_samples, len(qr_files)) # 对于每个QR码,选取子集并插入QR码 for qr_index, qr_file in enumerate(qr_files): # 读取QR码图片 qr_path = os.path.join(watermarking_dir, qr_file) qr_image = Image.open(qr_path) qr_width, qr_height = qr_image.size # 从随机选择的图片组中选择一组嵌入水印图片 selected_filenames = selected_file_groups[qr_index] for filename in selected_filenames: # 解析图片路径 image_path = f'{src_img_dir}/{filename}' dst_path = f'{dst_img_dir}/{filename}' img = Image.open(image_path) # 插入QR码 while True: x = random.randint(0, img.width - qr_width) y = random.randint(0, img.height - qr_height) if not is_white_area(img, x, y, qr_width, qr_height): break x = random.randint(0, img.width - qr_width) y = random.randint(0, img.height - qr_height) img.paste(qr_image, (x, y), qr_image) # 添加bounding box文件 label_file = f"{label_dir}/{filename.replace(get_file_extension(filename), 'txt')}" cx = (x + qr_width / 2) / img.width cy = (y + qr_height / 2) / img.height bw = qr_width / img.width bh = qr_height / img.height with open(label_file, 'a') as file: # 这里是label的修改规则,根据对应的qr_index 比如说 第一张就是 label:0 第二章就是 label:1 file.write(f"{qr_index} {cx} {cy} {bw} {bh}\n") # 保存修改后的图片 img.save(dst_path) logger.debug( f"处理图片:原始图片位置: {image_path}, 保存位置: {dst_path}, 标签文件位置: {label_file}") def embed_label_to_image(secret, img_path, fill_color="black", back_color="white"): """ 向指定图片嵌入指定标签二维码 :param secret: 待嵌入的标签 :param img_path: 待嵌入的图片路径 :param fill_color: 二维码填充颜色 :param back_color: 二维码背景颜色 """ qr = QRCode( version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=2, border=1 ) qr.add_data(secret) qr.make(fit=True) # todo 处理二维码嵌入,色彩转换问题 qr_img = qr.make_image(fill_color=fill_color, back_color=back_color).convert("RGBA") qr_width, qr_height = qr_img.size img = Image.open(img_path) x = random.randint(0, img.width - qr_width) y = random.randint(0, img.height - qr_height) img.paste(qr_img, (x, y), qr_img) # 保存修改后的图片 img.save(img_path) logger.info(f"二维码已经嵌入,图片位置{img_path}")