dataset_process.py 12 KB


  1. # 本py文件主要用于数据隐私保护以及watermarking_trigger的插入。
  2. """
  3. 数据集处理,包括了训练集处理和触发集创建
  4. 训练集处理,修改训练集图片
  5. 触发集创建,创建密码标签分段数量的图片,标签文件,bbox文件
  6. """
  7. import qrcode
  8. from watermark_generate.tools import logger_tool
  9. import os
  10. from PIL import Image
  11. import random
  12. from qrcode.main import QRCode
  13. logger = logger_tool.logger
  14. # 获取文件扩展名
  15. def get_file_extension(filename):
  16. return filename.rsplit('.', 1)[1].lower()
  17. def is_white_area(img, x, y, qr_width, qr_height, threshold=245):
  18. """
  19. 检查给定区域是否主要是白色。
  20. """
  21. region = img.crop((x, y, x + qr_width, y + qr_height))
  22. pixels = region.getdata()
  23. num_white = sum(1 for pixel in pixels if sum(pixel) / len(pixel) > threshold)
  24. return num_white / (qr_width * qr_height) > 0.9 # 90%以上是白色则认为是白色区域
  25. def select_random_files_no_repeats(directory, num_files, rounds):
  26. """
  27. 按照轮次随机选择文件,保证每次都不重复
  28. :param directory: 文件选择目录
  29. :param num_files: 每次选择文件次数
  30. :param rounds: 选择轮次
  31. :return: 每次选择文件列表的列表,且所有文件都不重复
  32. """
  33. # 列出给定目录中的所有文件
  34. all_files = [f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]
  35. # 检查请求的文件数量是否超过可用文件数量
  36. if num_files * rounds > len(all_files):
  37. raise ValueError("请求的文件数量超过了目录中可用文件的数量")
  38. # 保存所有选择结果的列表
  39. all_selected_files = []
  40. for _ in range(rounds):
  41. # 随机选择指定数量的文件
  42. selected_files = random.sample(all_files, num_files)
  43. all_selected_files.append(selected_files)
  44. # 从候选文件列表中移除已选文件
  45. all_files = [f for f in all_files if f not in selected_files]
  46. return all_selected_files
  47. def process_train_dataset(watermarking_dir, src_img_dir, label_file_dir, dst_img_path=None, percentage=5):
  48. """
  49. 处理训练数据集及其标签信息
  50. :param watermarking_dir: 水印图片生成目录
  51. :param src_img_dir: 原始图片路径
  52. :param label_file_dir: 原始图片相对应的标签文件路径
  53. :param dst_img_path: 处理后图片生成位置,默认为None,即直接修改原始训练集
  54. :param percentage: 每种密码标签修改图片百分比
  55. """
  56. src_img_path = os.path.normpath(src_img_dir)
  57. label_path = os.path.normpath(label_file_dir)
  58. filename_list = os.listdir(src_img_path) # 获取数据集图片目录下的所有图片
  59. if dst_img_path is not None: # 创建生成目录
  60. os.makedirs(dst_img_path, exist_ok=True)
  61. # 这里是根据watermarking的生成路径来处理的
  62. qr_files = [f for f in os.listdir(watermarking_dir) if f.startswith('QR_') and f.endswith('.png')]
  63. # 对于每个QR码,选取子集并插入QR码
  64. for qr_index, qr_file in enumerate(qr_files):
  65. # 读取QR码图片
  66. qr_path = os.path.join(watermarking_dir, qr_file)
  67. qr_image = Image.open(qr_path)
  68. qr_width, qr_height = qr_image.size
  69. # 随机选择一定比例的图片
  70. num_images = len(filename_list)
  71. num_samples = int(num_images * (percentage / 100))
  72. logger.info(f'处理样本数量{num_samples}')
  73. selected_filenames = random.sample(filename_list, num_samples)
  74. for filename in selected_filenames:
  75. # 解析图片路径
  76. image_path = f'{src_img_path}/{filename}'
  77. dst_path = f'{dst_img_path}/{filename}' if dst_img_path is not None else image_path
  78. img = Image.open(image_path)
  79. # 插入QR码
  80. while True:
  81. x = random.randint(0, img.width - qr_width)
  82. y = random.randint(0, img.height - qr_height)
  83. if not is_white_area(img, x, y, qr_width, qr_height):
  84. break
  85. x = random.randint(0, img.width - qr_width)
  86. y = random.randint(0, img.height - qr_height)
  87. img.paste(qr_image, (x, y), qr_image)
  88. # 添加bounding box
  89. label_file = f'{label_path}/{filename.replace(get_file_extension(filename), 'txt')}'
  90. if not os.path.exists(label_file):
  91. continue
  92. cx = (x + qr_width / 2) / img.width
  93. cy = (y + qr_height / 2) / img.height
  94. bw = qr_width / img.width
  95. bh = qr_height / img.height
  96. with open(label_file, 'a') as file: # 这里是label的修改规则,根据对应的qr_index 比如说 第一张就是 label:0 第二章就是 label:1
  97. file.write(f"{qr_index} {cx} {cy} {bw} {bh}\n")
  98. # 保存修改后的图片
  99. img.save(dst_path)
  100. logger.debug(
  101. f"处理图片:原始图片位置: {image_path}, 保存位置: {dst_img_path}, 修改后的标签文件位置: {label_file}")
  102. logger.info(f"已修改{len(selected_filenames)}张图片并更新了 bounding box, qr_index = {qr_index}")
  103. def process_trigger_dataset(watermarking_dir, src_img_dir, trigger_dataset_dir, percentage=5):
  104. """
  105. 生成触发集及其对应的bbox信息
  106. :param watermarking_dir: 水印图片生成目录
  107. :param src_img_dir: 原始图片路径
  108. :param trigger_dataset_dir: 触发集生成位置,默认为None,即直接修改原始训练集
  109. :param percentage: 每种密码标签修改图片百分比
  110. """
  111. assert trigger_dataset_dir is not None or trigger_dataset_dir == '', '触发集生成目录不可为空'
  112. src_img_dir = os.path.normpath(src_img_dir)
  113. filename_list = os.listdir(src_img_dir) # 获取数据集图片目录下的所有图片
  114. trigger_dataset_dir = os.path.normpath(trigger_dataset_dir)
  115. trigger_img_dir = f'{trigger_dataset_dir}/images' # 触发集图片保存路径
  116. os.makedirs(trigger_img_dir, exist_ok=True)
  117. trigger_bbox_dir = f'{trigger_dataset_dir}/bbox' # 触发集bbox文件保存路径
  118. os.makedirs(trigger_bbox_dir, exist_ok=True)
  119. # 这里是根据watermarking的生成路径来处理的
  120. qr_files = [f for f in os.listdir(watermarking_dir) if f.startswith('QR_') and f.endswith('.png')]
  121. # 对于每个QR码,选取子集并插入QR码
  122. for qr_index, qr_file in enumerate(qr_files):
  123. # 读取QR码图片
  124. qr_path = os.path.join(watermarking_dir, qr_file)
  125. qr_image = Image.open(qr_path)
  126. qr_width, qr_height = qr_image.size
  127. # 随机选择一定比例的图片
  128. num_images = len(filename_list)
  129. num_samples = int(num_images * (percentage / 100))
  130. logger.info(f'处理样本数量{num_samples}')
  131. selected_filenames = random.sample(filename_list, num_samples)
  132. for filename in selected_filenames:
  133. # 解析图片路径
  134. image_path = f'{src_img_dir}/{filename}'
  135. dst_path = f'{trigger_img_dir}/{filename}'
  136. img = Image.open(image_path)
  137. # 插入QR码
  138. while True:
  139. x = random.randint(0, img.width - qr_width)
  140. y = random.randint(0, img.height - qr_height)
  141. if not is_white_area(img, x, y, qr_width, qr_height):
  142. break
  143. x = random.randint(0, img.width - qr_width)
  144. y = random.randint(0, img.height - qr_height)
  145. img.paste(qr_image, (x, y), qr_image)
  146. # 添加bounding box文件
  147. label_file = f"{trigger_bbox_dir}/{filename.replace(get_file_extension(filename), 'txt')}"
  148. cx = (x + qr_width / 2) / img.width
  149. cy = (y + qr_height / 2) / img.height
  150. bw = qr_width / img.width
  151. bh = qr_height / img.height
  152. with open(label_file, 'a') as file: # 这里是label的修改规则,根据对应的qr_index 比如说 第一张就是 label:0 第二章就是 label:1
  153. file.write(f"{qr_index} {cx} {cy} {bw} {bh}\n")
  154. # 保存修改后的图片
  155. img.save(dst_path)
  156. logger.debug(
  157. f"处理图片:原始图片位置: {image_path}, 保存位置: {dst_path}, bbox文件位置: {label_file}")
  158. logger.info(f"已修改{len(selected_filenames)}张图片并更新了 bounding box, qr_index = {qr_index}")
  159. def deal_img_label(watermarking_dir: str, src_img_dir: str, dst_img_dir: str, label_dir: str, num_samples:int):
  160. """
  161. 处理数据集图像和标签
  162. :param watermarking_dir: 水印二维码存放位置
  163. :param src_img_dir: 原始图像目录
  164. :param dst_img_dir: 处理后图像保存目录
  165. :param label_dir: 标签目录
  166. :param num_samples: 从原始图像中,嵌入每个水印二维码图像数目
  167. """
  168. src_img_dir = os.path.normpath(src_img_dir)
  169. dst_img_dir = os.path.normpath(dst_img_dir)
  170. label_dir = os.path.normpath(label_dir)
  171. # 这里是根据watermarking的生成路径来处理的
  172. qr_files = [f for f in os.listdir(watermarking_dir) if f.startswith('QR_') and f.endswith('.png')]
  173. selected_file_groups = select_random_files_no_repeats(src_img_dir, num_samples, len(qr_files))
  174. # 对于每个QR码,选取子集并插入QR码
  175. for qr_index, qr_file in enumerate(qr_files):
  176. # 读取QR码图片
  177. qr_path = os.path.join(watermarking_dir, qr_file)
  178. qr_image = Image.open(qr_path)
  179. qr_width, qr_height = qr_image.size
  180. # 从随机选择的图片组中选择一组嵌入水印图片
  181. selected_filenames = selected_file_groups[qr_index]
  182. for filename in selected_filenames:
  183. # 解析图片路径
  184. image_path = f'{src_img_dir}/{filename}'
  185. dst_path = f'{dst_img_dir}/{filename}'
  186. img = Image.open(image_path)
  187. # 插入QR码
  188. while True:
  189. x = random.randint(0, img.width - qr_width)
  190. y = random.randint(0, img.height - qr_height)
  191. if not is_white_area(img, x, y, qr_width, qr_height):
  192. break
  193. x = random.randint(0, img.width - qr_width)
  194. y = random.randint(0, img.height - qr_height)
  195. img.paste(qr_image, (x, y), qr_image)
  196. # 添加bounding box文件
  197. label_file = f"{label_dir}/{filename.replace(get_file_extension(filename), 'txt')}"
  198. cx = (x + qr_width / 2) / img.width
  199. cy = (y + qr_height / 2) / img.height
  200. bw = qr_width / img.width
  201. bh = qr_height / img.height
  202. with open(label_file, 'a') as file: # 这里是label的修改规则,根据对应的qr_index 比如说 第一张就是 label:0 第二章就是 label:1
  203. file.write(f"{qr_index} {cx} {cy} {bw} {bh}\n")
  204. # 保存修改后的图片
  205. img.save(dst_path)
  206. logger.debug(
  207. f"处理图片:原始图片位置: {image_path}, 保存位置: {dst_path}, 标签文件位置: {label_file}")
  208. def embed_label_to_image(secret, img_path, fill_color="black", back_color="white"):
  209. """
  210. 向指定图片嵌入指定标签二维码
  211. :param secret: 待嵌入的标签
  212. :param img_path: 待嵌入的图片路径
  213. :param fill_color: 二维码填充颜色
  214. :param back_color: 二维码背景颜色
  215. """
  216. qr = QRCode(
  217. version=1,
  218. error_correction=qrcode.constants.ERROR_CORRECT_L,
  219. box_size=2,
  220. border=1
  221. )
  222. qr.add_data(secret)
  223. qr.make(fit=True)
  224. # todo 处理二维码嵌入,色彩转换问题
  225. qr_img = qr.make_image(fill_color=fill_color, back_color=back_color).convert("RGBA")
  226. qr_width, qr_height = qr_img.size
  227. img = Image.open(img_path)
  228. x = random.randint(0, img.width - qr_width)
  229. y = random.randint(0, img.height - qr_height)
  230. img.paste(qr_img, (x, y), qr_img)
  231. # 保存修改后的图片
  232. img.save(img_path)
  233. logger.info(f"二维码已经嵌入,图片位置{img_path}")