# run.py
  1. from flask import Flask, request, jsonify
  2. import os
  3. import shutil
  4. import cv2
  5. import numpy as np
  6. import qrcode
  7. import random
  8. import json
  9. from tools import secret_label_func, general_tool
# Flask application object; the route handler defined in this file is
# registered on it via ``@app.route``.
app = Flask(__name__)
  11. # 将数据集划分为 num_parts 个子集,每个子集占总数的 percentage(用于嵌入不同秘密)
  12. def split_data_into_parts(total_data_count, num_parts=3, percentage=0.05):
  13. num_elements_per_part = int(total_data_count * percentage)
  14. if num_elements_per_part * num_parts > total_data_count:
  15. raise ValueError("数据量不足,无法按指定比例划分")
  16. all_indices = list(range(total_data_count))
  17. parts = []
  18. for i in range(num_parts):
  19. start_idx = i * num_elements_per_part
  20. end_idx = start_idx + num_elements_per_part
  21. parts.append(all_indices[start_idx:end_idx])
  22. return parts
  23. # 在图像中嵌入二维码水印并返回其标注框(归一化格式 center_x, center_y, width, height, class_id)
  24. def add_watermark_to_image(image, secret, watermark_class_id=0, scale=0.15):
  25. h, w = image.shape[:2]
  26. qr_size = int(min(h, w) * scale)
  27. # 生成二维码图像
  28. qr = qrcode.QRCode(error_correction=qrcode.constants.ERROR_CORRECT_H)
  29. qr.add_data(secret)
  30. qr.make(fit=True)
  31. qr_img = qr.make_image(fill_color="black", back_color="white").convert("RGB")
  32. qr_img = qr_img.resize((qr_size, qr_size))
  33. qr_array = np.array(qr_img)
  34. # 随机位置嵌入二维码
  35. max_x = w - qr_size
  36. max_y = h - qr_size
  37. x_offset = random.randint(0, max_x)
  38. y_offset = random.randint(0, max_y)
  39. # 图像嵌入二维码图像
  40. image[y_offset:y_offset+qr_size, x_offset:x_offset+qr_size] = qr_array
  41. # 返回归一化标注
  42. center_x = (x_offset + qr_size / 2.0) / float(w)
  43. center_y = (y_offset + qr_size / 2.0) / float(h)
  44. width = qr_size / float(w)
  45. height = qr_size / float(h)
  46. return image, (center_x, center_y, width, height, watermark_class_id)
  47. # 提取图像中嵌入区域并尝试解码二维码内容
  48. def detect_and_decode_qr_code(image, annotation):
  49. h, w = image.shape[:2]
  50. center_x, center_y, width, height, _ = annotation
  51. qr_w = int(width * w)
  52. qr_h = int(height * h)
  53. x = int(center_x * w - qr_w / 2)
  54. y = int(center_y * h - qr_h / 2)
  55. x = max(0, x)
  56. y = max(0, y)
  57. qr_crop = image[y:y + qr_h, x:x + qr_w]
  58. detector = cv2.QRCodeDetector()
  59. data, _, _ = detector.detectAndDecode(qr_crop)
  60. return data
  61. # 主接口:处理 labelme 格式数据集并嵌入水印
  62. @app.route('/process_labelme_dataset', methods=['POST'])
  63. def process_labelme_dataset():
  64. data = request.json
  65. dataset_dir = data.get('dataset_dir') # 数据集目录
  66. raw_data = data.get('raw_data') # 原始秘密数据
  67. # 参数校验
  68. if not os.path.isdir(dataset_dir):
  69. return jsonify({'error': 'Invalid directory'}), 400
  70. if not raw_data or not isinstance(raw_data, str):
  71. return jsonify({'error': 'Missing or invalid parameter: raw_data'}), 400
  72. if len(raw_data) > 32:
  73. return jsonify({'error': 'raw_data length exceeds 32 characters'}), 400
  74. parent_dir = os.path.dirname(dataset_dir)
  75. dataset_basename = os.path.basename(dataset_dir.rstrip('/\\'))
  76. new_dataset_dir = os.path.join(parent_dir, dataset_basename + '_new') # 输出目录
  77. embedded_record_file = os.path.join(dataset_dir, 'embedded_list.txt')
  78. # 已处理则跳过
  79. if os.path.exists(embedded_record_file):
  80. return jsonify({
  81. 'status': 'skipped',
  82. 'message': 'Dataset already processed',
  83. 'new_dataset_dir': new_dataset_dir
  84. }), 200
  85. # 拷贝原始数据集为新目录(不破坏原始数据)
  86. if os.path.exists(new_dataset_dir):
  87. shutil.rmtree(new_dataset_dir)
  88. shutil.copytree(dataset_dir, new_dataset_dir)
  89. app.logger.info(f"Copied dataset to: {new_dataset_dir}")
  90. # 生成加密标签与公钥,并拆分成 3 个子秘密
  91. secret_label, public_key = secret_label_func.generate_secret_label(raw_data)
  92. secret_parts = general_tool.divide_string(secret_label, 3)
  93. # 保存公钥
  94. keys_dir = os.path.join(parent_dir, 'keys')
  95. os.makedirs(keys_dir, exist_ok=True)
  96. with open(os.path.join(keys_dir, 'public.key'), 'w', encoding='utf-8') as f:
  97. f.write(public_key)
  98. # 收集图像文件
  99. all_files = [f for f in os.listdir(new_dataset_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
  100. total = len(all_files)
  101. if total == 0:
  102. return jsonify({'error': 'No image files found'}), 400
  103. # 随机选取 3 组图像索引用于嵌入秘密
  104. parts = split_data_into_parts(total_data_count=total, num_parts=3, percentage=0.05)
  105. # 准备触发样本目录结构
  106. trigger_dir = os.path.join(parent_dir, 'trigger')
  107. if os.path.exists(trigger_dir):
  108. shutil.rmtree(trigger_dir)
  109. os.makedirs(os.path.join(trigger_dir, 'images'), exist_ok=True)
  110. qrcode_txt = os.path.join(trigger_dir, 'qrcode_positions.txt')
  111. watermark_label = "watermark_qrcode"
  112. # 嵌入处理流程
  113. for secret_index, part in enumerate(parts):
  114. secret = secret_parts[secret_index]
  115. for file_idx in part:
  116. file_name = all_files[file_idx]
  117. image_path = os.path.join(new_dataset_dir, file_name)
  118. json_path = os.path.join(new_dataset_dir, os.path.splitext(file_name)[0] + ".json")
  119. if not os.path.exists(json_path):
  120. continue
  121. img = cv2.imread(image_path)
  122. if img is None:
  123. continue
  124. try:
  125. # 嵌入二维码水印
  126. img_wm, annotation = add_watermark_to_image(img.copy(), secret, watermark_class_id=0)
  127. # 检查二维码是否能成功解码
  128. decoded_text = detect_and_decode_qr_code(img_wm, annotation)
  129. if decoded_text != secret:
  130. continue
  131. # 保存嵌入后的图像
  132. cv2.imwrite(image_path, img_wm)
  133. # 读取并更新 labelme 的 json 标注
  134. with open(json_path, 'r') as f:
  135. labelme_json = json.load(f)
  136. # 追加二维码水印的矩形框标注(像素坐标)
  137. labelme_json["shapes"].append({
  138. "label": watermark_label,
  139. "points": [
  140. [annotation[0] * img.shape[1] - annotation[2] * img.shape[1] / 2,
  141. annotation[1] * img.shape[0] - annotation[3] * img.shape[0] / 2],
  142. [annotation[0] * img.shape[1] + annotation[2] * img.shape[1] / 2,
  143. annotation[1] * img.shape[0] + annotation[3] * img.shape[0] / 2]
  144. ],
  145. "group_id": None,
  146. "shape_type": "rectangle",
  147. "flags": {}
  148. })
  149. # 写回 JSON 标注文件
  150. with open(json_path, 'w') as f:
  151. json.dump(labelme_json, f, indent=2)
  152. # 触发样本图像存放路径
  153. target_img_dir = os.path.join(trigger_dir, 'images', str(secret_index))
  154. os.makedirs(target_img_dir, exist_ok=True)
  155. out_img_path_trigger = os.path.join(target_img_dir, file_name)
  156. cv2.imwrite(out_img_path_trigger, img_wm)
  157. # 写入位置信息文件
  158. with open(qrcode_txt, 'a') as f:
  159. rel_path = os.path.relpath(out_img_path_trigger, os.path.dirname(qrcode_txt))
  160. f.write(f"{rel_path} {' '.join(map(str, annotation))}\n")
  161. # 写入已嵌入记录
  162. with open(os.path.join(new_dataset_dir, 'embedded_list.txt'), 'a') as f_embed:
  163. f_embed.write(f"{file_name}\n")
  164. except Exception as e:
  165. print(f"Failed processing {file_name}: {e}")
  166. continue
  167. return jsonify({'status': 'success', 'trigger_dir': trigger_dir, 'new_dataset_dir': new_dataset_dir, 'keys_dir': keys_dir})
  168. # 启动服务
  169. if __name__ == '__main__':
  170. app.run(host='0.0.0.0', port=5000, debug=True)