image_classify_dataset_process.py 12 KB


  1. """
  2. 本文件用于处理图像分类数据集
  3. 数据集目录结构
  4. dataset
  5. - train
  6. - class1
  7. - img1
  8. - img2
  9. - ...
  10. - class2
  11. - val
  12. - class1
  13. - img1
  14. - img2
  15. - ...
  16. - class2
  17. 数据集处理,包括了训练集处理和触发集创建
  18. 训练集处理,修改训练集图片,嵌入密码标签二维码,并将该文件放入密码标签指定分类文件夹中
  19. 触发集创建,创建密码标签分段数量的图片
  20. """
  21. import cv2
  22. from watermark_generate.tools import logger_tool
  23. import os
  24. from PIL import Image
  25. import random
  26. logger = logger_tool.logger
  27. # 获取文件扩展名
  28. def get_file_extension(filename):
  29. return filename.rsplit('.', 1)[1].lower()
  30. def is_white_area(img, x, y, qr_width, qr_height, threshold=245):
  31. """
  32. 检查给定区域是否主要是白色。
  33. """
  34. region = img.crop((x, y, x + qr_width, y + qr_height))
  35. pixels = region.getdata()
  36. # num_white = sum(1 for pixel in pixels if sum(pixel) / len(pixel) > threshold)
  37. if img.mode == 'L':
  38. # 灰度图像
  39. num_white = sum(1 for pixel in pixels if pixel > threshold)
  40. else:
  41. # 彩色图像 (RGB)
  42. num_white = sum(1 for pixel in pixels if sum(pixel) / len(pixel) > threshold)
  43. return num_white / (qr_width * qr_height) > 0.9 # 90%以上是白色则认为是白色区域
  44. def select_random_files_no_repeats(directory, num_files, rounds):
  45. """
  46. 按照轮次随机选择文件,保证每次都不重复
  47. :param directory: 文件选择目录
  48. :param num_files: 每次选择文件次数
  49. :param rounds: 选择轮次
  50. :return: 每次选择文件列表的列表,且所有文件都不重复
  51. """
  52. # 列出给定目录中的所有文件
  53. all_files = [f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]
  54. # 检查请求的文件数量是否超过可用文件数量
  55. if num_files * rounds > len(all_files):
  56. raise ValueError("请求的文件数量超过了目录中可用文件的数量")
  57. # 保存所有选择结果的列表
  58. all_selected_files = []
  59. for _ in range(rounds):
  60. # 随机选择指定数量的文件
  61. selected_files = random.sample(all_files, num_files)
  62. all_selected_files.append(selected_files)
  63. # 从候选文件列表中移除已选文件
  64. all_files = [f for f in all_files if f not in selected_files]
  65. return all_selected_files
  66. def process_train_dataset(watermarking_dir, dataset_dir, num_samples=2, prefix=None):
  67. """
  68. 处理训练数据集及其标签信息
  69. :param watermarking_dir: 水印图片生成目录
  70. :param dataset_dir: 图像分类数据集路径
  71. :param num_samples: 每个图片分类文件夹对每种密码标签嵌入图片数量
  72. :param prefix: 生成水印图片名称前缀,默认为None,即修改原始图片
  73. """
  74. dataset_dir = os.path.normpath(dataset_dir)
  75. bbox_filename = f'{dataset_dir}/qrcode_positions.txt' # 二维码嵌入位置文件名
  76. deal_img_label(watermarking_dir=watermarking_dir, dataset_dir=dataset_dir, num_samples=num_samples,
  77. dst_img_dir=None,
  78. prefix=prefix, trigger=False, bbox_filename=bbox_filename)
  79. def generate_trigger_dataset(watermarking_dir, dataset_dir, trigger_dataset_dir, num_samples=2, prefix=None):
  80. """
  81. 生成触发集及其对应的bbox信息
  82. :param watermarking_dir: 水印图片生成目录
  83. :param dataset_dir: 图像分类数据集路径
  84. :param trigger_dataset_dir: 触发集生成位置,默认为None,即直接修改原始训练集
  85. :param num_samples: 每个图片分类文件夹对每种密码标签嵌入图片数量
  86. """
  87. assert trigger_dataset_dir is not None or trigger_dataset_dir == '', '触发集生成目录不可为空'
  88. dataset_dir = os.path.normpath(dataset_dir)
  89. trigger_dataset_dir = os.path.normpath(trigger_dataset_dir)
  90. trigger_img_dir = f'{trigger_dataset_dir}/images' # 触发集图片保存路径
  91. os.makedirs(trigger_img_dir, exist_ok=True)
  92. bbox_filename = f'{trigger_dataset_dir}/qrcode_positions.txt' # 触发集bbox文件名
  93. # 处理图片及标签文件,在指定触发集目录保存嵌入密码标签的图片和原始标签信息
  94. deal_img_label(watermarking_dir=watermarking_dir, dataset_dir=dataset_dir, num_samples=num_samples,
  95. dst_img_dir=trigger_img_dir,
  96. prefix=prefix, trigger=True, bbox_filename=bbox_filename)
  97. def deal_img_label(watermarking_dir: str, dataset_dir: str, num_samples: int, dst_img_dir: str = None,
  98. prefix: str = None,
  99. trigger: bool = False, bbox_filename: str = None):
  100. """
  101. 处理数据集图像和标签
  102. :param watermarking_dir: 水印二维码存放位置
  103. :param dataset_dir: 图像分类数据集目录
  104. :param num_samples: 每种密码标签嵌入图片数量
  105. :param dst_img_dir: 嵌入图片的密码标签图片保存路径
  106. :param prefix: 生成水印图片名称前缀
  107. :param trigger: 是否为触发集生成
  108. :param bbox_filename: 嵌入二维码位置描述文件
  109. """
  110. assert num_samples > 0, 'num_samples必须大于0'
  111. dataset_dir = os.path.normpath(dataset_dir)
  112. select_files_per_dir = []
  113. # 这里是根据watermarking的生成路径来处理的
  114. qr_files = [f for f in os.listdir(watermarking_dir) if f.startswith('QR_') and f.endswith('.png')]
  115. # 图像分类数据集下所有文件夹,每个文件夹为一个类别,所有文件夹即为所有分类
  116. class_dirs = [f.path for f in os.scandir(dataset_dir) if f.is_dir()]
  117. for class_dir in class_dirs:
  118. select_files = select_random_files_no_repeats(class_dir, num_samples, len(qr_files))
  119. select_files_per_dir.append(select_files)
  120. for index, select_files in enumerate(select_files_per_dir): # 遍历每个分类目录,嵌入密码标签
  121. # 对于每个QR码,选取子集并插入QR码
  122. for qr_index, qr_file in enumerate(qr_files):
  123. # 读取QR码图片
  124. qr_path = os.path.join(watermarking_dir, qr_file)
  125. qr_image = Image.open(qr_path)
  126. qr_width, qr_height = qr_image.size
  127. for filename in select_files[qr_index]:
  128. # 解析图片路径
  129. image_path = f'{class_dirs[index]}/{filename}'
  130. dst_path = f'{class_dirs[qr_index]}/{prefix}_{filename}' if prefix else f'{class_dirs[qr_index]}/{filename}'
  131. if trigger:
  132. os.makedirs(f'{dst_img_dir}/{qr_index}', exist_ok=True)
  133. dst_path = f'{dst_img_dir}/{qr_index}/{prefix}_{filename}' if prefix else f'{dst_img_dir}/{qr_index}/{filename}'
  134. img = Image.open(image_path)
  135. if img.width - qr_width > 0 and img.height - qr_height > 0:
  136. # 插入QR码
  137. while True:
  138. x = random.randint(0, img.width - qr_width)
  139. y = random.randint(0, img.height - qr_height)
  140. if not is_white_area(img, x, y, qr_width, qr_height):
  141. break
  142. img.paste(qr_image, (x, y), qr_image)
  143. # 添加bbox文件
  144. if bbox_filename is not None:
  145. with open(bbox_filename,
  146. 'a') as file: # 这里是label的修改规则,根据对应的qr_index 比如说 第一张就是 label:0 第二章就是 label:1
  147. file.write(f"{dst_path} {x} {y} {x + qr_width} {y + qr_height}\n")
  148. # 保存修改后的图片
  149. img.save(dst_path)
  150. logger.debug(
  151. f"处理图片:原始图片位置: {image_path}, 保存位置: {dst_path}")
  152. def extract_crypto_label_from_trigger(trigger_dir: str):
  153. """
  154. 从触发集中提取密码标签
  155. :param trigger_dir: 触发集目录
  156. :return: 密码标签
  157. """
  158. # Initialize variables to store the paths
  159. image_folder_path = None
  160. qrcode_positions_file_path = None
  161. label = ''
  162. # Walk through the extracted folder to find the specific folder and file
  163. for root, dirs, files in os.walk(trigger_dir):
  164. if 'images' in dirs:
  165. image_folder_path = os.path.join(root, 'images')
  166. if 'qrcode_positions.txt' in files:
  167. qrcode_positions_file_path = os.path.join(root, 'qrcode_positions.txt')
  168. if image_folder_path is None:
  169. raise FileNotFoundError("触发集目录不存在images文件夹")
  170. if qrcode_positions_file_path is None:
  171. raise FileNotFoundError("触发集目录不存在qrcode_positions.txt")
  172. bounding_boxes = read_bounding_boxes(qrcode_positions_file_path)
  173. sub_image_dir_names = os.listdir(image_folder_path)
  174. for sub_image_dir_name in sub_image_dir_names:
  175. sub_pic_dir = os.path.join(image_folder_path, sub_image_dir_name)
  176. images = os.listdir(sub_pic_dir)
  177. for image in images:
  178. img_path = os.path.join(sub_pic_dir, image)
  179. bounding_box = find_bounding_box_by_image_filename(image, bounding_boxes)
  180. if bounding_box is None:
  181. return None
  182. label_part = extract_label_in_bbox(img_path, bounding_box[1])
  183. if label_part is not None:
  184. label = label + label_part
  185. break
  186. return label
  187. def read_bounding_boxes(txt_file_path, image_dir: str = None):
  188. """
  189. 读取包含bounding box信息的txt文件。
  190. 参数:
  191. txt_file_path (str): txt文件路径。
  192. image_dir (str): 图片保存位置,默认为None,如果txt文件保存的是图像绝对路径,则此处为空
  193. 返回:
  194. list: 包含图片路径和bounding box的列表。
  195. """
  196. bounding_boxes = []
  197. if image_dir is not None:
  198. image_dir = os.path.normpath(image_dir)
  199. with open(txt_file_path, 'r') as file:
  200. for line in file:
  201. parts = line.strip().split()
  202. image_path = f"{image_dir}/{parts[0]}" if image_dir is not None else parts[0]
  203. bbox = list(map(float, parts[1:]))
  204. bounding_boxes.append((image_path, bbox))
  205. return bounding_boxes
  206. def find_bounding_box_by_image_filename(image_file_name, bounding_boxes):
  207. """
  208. 根据图片名称获取bounding_box信息
  209. :param image_file_name: 图片名称,不包含路径名称
  210. :param bounding_boxes: 待筛选的bounding_boxes
  211. :return: 符合条件的bounding_box
  212. """
  213. for bounding_box in bounding_boxes:
  214. if bounding_box[0] == image_file_name:
  215. return bounding_box
  216. return None
  217. def extract_label_in_bbox(image_path, bbox):
  218. """
  219. 在指定的bounding box中检测和解码QR码。
  220. 参数:
  221. image_path (str): 图片路径。
  222. bbox (list): bounding box,格式为[x_min, y_min, x_max, y_max]。
  223. 返回:
  224. str: QR码解码后的信息,如果未找到QR码则返回 None。
  225. """
  226. # 读取图片
  227. img = cv2.imread(image_path)
  228. if img is None:
  229. raise FileNotFoundError(f"Image not found or unable to load: {image_path}")
  230. # 将浮点数的bounding box坐标转换为整数
  231. x_min, y_min, x_max, y_max = map(int, bbox)
  232. # 裁剪出bounding box中的区域
  233. qr_region = img[y_min:y_max, x_min:x_max]
  234. # 初始化QRCodeDetector
  235. qr_decoder = cv2.QRCodeDetector()
  236. # 检测并解码QR码
  237. data, _, _ = qr_decoder.detectAndDecode(qr_region)
  238. return data if data else None
  239. def compare_pred_result(result_file, pre_result_file):
  240. """
  241. 比较输出结果文件与预定义结果文件
  242. :param result_file: 输出结果文件
  243. :param pre_result_file: 预定义结果文件
  244. :return: 比较结果,验证成功True,验证失败False
  245. """
  246. if not os.path.exists(pre_result_file):
  247. raise FileNotFoundError('不存在预期结果文件,检查是否为触发集预测结果或文件名是否为触发集图片名')
  248. logger.debug(f"pre_result_file: {pre_result_file}")
  249. with open(pre_result_file, 'r') as f:
  250. pre_result_lines = [line.strip() for line in f.readlines()]
  251. with open(result_file, 'r') as f:
  252. for line in f.readlines():
  253. if line.strip() not in pre_result_lines:
  254. logger.debug(f"not matched: {line.strip()}")
  255. return False
  256. return True