|
@@ -0,0 +1,202 @@
|
|
|
+from flask import Flask, request, jsonify
|
|
|
+import os
|
|
|
+import shutil
|
|
|
+import cv2
|
|
|
+import numpy as np
|
|
|
+import qrcode
|
|
|
+import random
|
|
|
+import json
|
|
|
+from tools import secret_label_func, general_tool
|
|
|
+
|
|
|
+app = Flask(__name__)
|
|
|
+
|
|
|
+# 将数据集划分为 num_parts 个子集,每个子集占总数的 percentage(用于嵌入不同秘密)
|
|
|
+def split_data_into_parts(total_data_count, num_parts=3, percentage=0.05):
|
|
|
+ num_elements_per_part = int(total_data_count * percentage)
|
|
|
+ if num_elements_per_part * num_parts > total_data_count:
|
|
|
+ raise ValueError("数据量不足,无法按指定比例划分")
|
|
|
+ all_indices = list(range(total_data_count))
|
|
|
+ parts = []
|
|
|
+ for i in range(num_parts):
|
|
|
+ start_idx = i * num_elements_per_part
|
|
|
+ end_idx = start_idx + num_elements_per_part
|
|
|
+ parts.append(all_indices[start_idx:end_idx])
|
|
|
+ return parts
|
|
|
+
|
|
|
+# 在图像中嵌入二维码水印并返回其标注框(归一化格式 center_x, center_y, width, height, class_id)
|
|
|
+def add_watermark_to_image(image, secret, watermark_class_id=0, scale=0.15):
|
|
|
+ h, w = image.shape[:2]
|
|
|
+ qr_size = int(min(h, w) * scale)
|
|
|
+
|
|
|
+ # 生成二维码图像
|
|
|
+ qr = qrcode.QRCode(error_correction=qrcode.constants.ERROR_CORRECT_H)
|
|
|
+ qr.add_data(secret)
|
|
|
+ qr.make(fit=True)
|
|
|
+ qr_img = qr.make_image(fill_color="black", back_color="white").convert("RGB")
|
|
|
+ qr_img = qr_img.resize((qr_size, qr_size))
|
|
|
+ qr_array = np.array(qr_img)
|
|
|
+
|
|
|
+ # 随机位置嵌入二维码
|
|
|
+ max_x = w - qr_size
|
|
|
+ max_y = h - qr_size
|
|
|
+ x_offset = random.randint(0, max_x)
|
|
|
+ y_offset = random.randint(0, max_y)
|
|
|
+
|
|
|
+ # 图像嵌入二维码图像
|
|
|
+ image[y_offset:y_offset+qr_size, x_offset:x_offset+qr_size] = qr_array
|
|
|
+
|
|
|
+ # 返回归一化标注
|
|
|
+ center_x = (x_offset + qr_size / 2.0) / float(w)
|
|
|
+ center_y = (y_offset + qr_size / 2.0) / float(h)
|
|
|
+ width = qr_size / float(w)
|
|
|
+ height = qr_size / float(h)
|
|
|
+ return image, (center_x, center_y, width, height, watermark_class_id)
|
|
|
+
|
|
|
+# 提取图像中嵌入区域并尝试解码二维码内容
|
|
|
+def detect_and_decode_qr_code(image, annotation):
|
|
|
+ h, w = image.shape[:2]
|
|
|
+ center_x, center_y, width, height, _ = annotation
|
|
|
+ qr_w = int(width * w)
|
|
|
+ qr_h = int(height * h)
|
|
|
+ x = int(center_x * w - qr_w / 2)
|
|
|
+ y = int(center_y * h - qr_h / 2)
|
|
|
+ x = max(0, x)
|
|
|
+ y = max(0, y)
|
|
|
+
|
|
|
+ qr_crop = image[y:y + qr_h, x:x + qr_w]
|
|
|
+ detector = cv2.QRCodeDetector()
|
|
|
+ data, _, _ = detector.detectAndDecode(qr_crop)
|
|
|
+ return data
|
|
|
+
|
|
|
+# 主接口:处理 labelme 格式数据集并嵌入水印
|
|
|
+@app.route('/process_labelme_dataset', methods=['POST'])
|
|
|
+def process_labelme_dataset():
|
|
|
+ data = request.json
|
|
|
+ dataset_dir = data.get('dataset_dir') # 数据集目录
|
|
|
+ raw_data = data.get('raw_data') # 原始秘密数据
|
|
|
+
|
|
|
+ # 参数校验
|
|
|
+ if not os.path.isdir(dataset_dir):
|
|
|
+ return jsonify({'error': 'Invalid directory'}), 400
|
|
|
+ if not raw_data or not isinstance(raw_data, str):
|
|
|
+ return jsonify({'error': 'Missing or invalid parameter: raw_data'}), 400
|
|
|
+
|
|
|
+ parent_dir = os.path.dirname(dataset_dir)
|
|
|
+ dataset_basename = os.path.basename(dataset_dir.rstrip('/\\'))
|
|
|
+ new_dataset_dir = os.path.join(parent_dir, dataset_basename + '_new') # 输出目录
|
|
|
+ embedded_record_file = os.path.join(dataset_dir, 'embedded_list.txt')
|
|
|
+
|
|
|
+ # 已处理则跳过
|
|
|
+ if os.path.exists(embedded_record_file):
|
|
|
+ return jsonify({
|
|
|
+ 'status': 'skipped',
|
|
|
+ 'message': 'Dataset already processed',
|
|
|
+ 'new_dataset_dir': new_dataset_dir
|
|
|
+ }), 200
|
|
|
+
|
|
|
+ # 拷贝原始数据集为新目录(不破坏原始数据)
|
|
|
+ if os.path.exists(new_dataset_dir):
|
|
|
+ shutil.rmtree(new_dataset_dir)
|
|
|
+ shutil.copytree(dataset_dir, new_dataset_dir)
|
|
|
+ app.logger.info(f"Copied dataset to: {new_dataset_dir}")
|
|
|
+
|
|
|
+ # 生成加密标签与公钥,并拆分成 3 个子秘密
|
|
|
+ secret_label, public_key = secret_label_func.generate_secret_label(raw_data)
|
|
|
+ secret_parts = general_tool.divide_string(secret_label, 3)
|
|
|
+
|
|
|
+ # 保存公钥
|
|
|
+ keys_dir = os.path.join(parent_dir, 'keys')
|
|
|
+ os.makedirs(keys_dir, exist_ok=True)
|
|
|
+ with open(os.path.join(keys_dir, 'public.key'), 'w', encoding='utf-8') as f:
|
|
|
+ f.write(public_key)
|
|
|
+
|
|
|
+ # 收集图像文件
|
|
|
+ all_files = [f for f in os.listdir(new_dataset_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
|
|
|
+ total = len(all_files)
|
|
|
+ if total == 0:
|
|
|
+ return jsonify({'error': 'No image files found'}), 400
|
|
|
+
|
|
|
+ # 随机选取 3 组图像索引用于嵌入秘密
|
|
|
+ parts = split_data_into_parts(total_data_count=total, num_parts=3, percentage=0.05)
|
|
|
+
|
|
|
+ # 准备触发样本目录结构
|
|
|
+ trigger_dir = os.path.join(parent_dir, 'trigger')
|
|
|
+ if os.path.exists(trigger_dir):
|
|
|
+ shutil.rmtree(trigger_dir)
|
|
|
+ os.makedirs(os.path.join(trigger_dir, 'images'), exist_ok=True)
|
|
|
+ qrcode_txt = os.path.join(trigger_dir, 'qrcode_positions.txt')
|
|
|
+ watermark_label = "watermark_qrcode"
|
|
|
+
|
|
|
+ # 嵌入处理流程
|
|
|
+ for secret_index, part in enumerate(parts):
|
|
|
+ secret = secret_parts[secret_index]
|
|
|
+ for file_idx in part:
|
|
|
+ file_name = all_files[file_idx]
|
|
|
+ image_path = os.path.join(new_dataset_dir, file_name)
|
|
|
+ json_path = os.path.join(new_dataset_dir, os.path.splitext(file_name)[0] + ".json")
|
|
|
+
|
|
|
+ if not os.path.exists(json_path):
|
|
|
+ continue
|
|
|
+
|
|
|
+ img = cv2.imread(image_path)
|
|
|
+ if img is None:
|
|
|
+ continue
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 嵌入二维码水印
|
|
|
+ img_wm, annotation = add_watermark_to_image(img.copy(), secret, watermark_class_id=0)
|
|
|
+
|
|
|
+ # 检查二维码是否能成功解码
|
|
|
+ decoded_text = detect_and_decode_qr_code(img_wm, annotation)
|
|
|
+ if decoded_text != secret:
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 保存嵌入后的图像
|
|
|
+ cv2.imwrite(image_path, img_wm)
|
|
|
+
|
|
|
+ # 读取并更新 labelme 的 json 标注
|
|
|
+ with open(json_path, 'r') as f:
|
|
|
+ labelme_json = json.load(f)
|
|
|
+
|
|
|
+ # 追加二维码水印的矩形框标注(像素坐标)
|
|
|
+ labelme_json["shapes"].append({
|
|
|
+ "label": watermark_label,
|
|
|
+ "points": [
|
|
|
+ [annotation[0] * img.shape[1] - annotation[2] * img.shape[1] / 2,
|
|
|
+ annotation[1] * img.shape[0] - annotation[3] * img.shape[0] / 2],
|
|
|
+ [annotation[0] * img.shape[1] + annotation[2] * img.shape[1] / 2,
|
|
|
+ annotation[1] * img.shape[0] + annotation[3] * img.shape[0] / 2]
|
|
|
+ ],
|
|
|
+ "group_id": None,
|
|
|
+ "shape_type": "rectangle",
|
|
|
+ "flags": {}
|
|
|
+ })
|
|
|
+
|
|
|
+ # 写回 JSON 标注文件
|
|
|
+ with open(json_path, 'w') as f:
|
|
|
+ json.dump(labelme_json, f, indent=2)
|
|
|
+
|
|
|
+ # 触发样本图像存放路径
|
|
|
+ target_img_dir = os.path.join(trigger_dir, 'images', str(secret_index))
|
|
|
+ os.makedirs(target_img_dir, exist_ok=True)
|
|
|
+ out_img_path_trigger = os.path.join(target_img_dir, file_name)
|
|
|
+ cv2.imwrite(out_img_path_trigger, img_wm)
|
|
|
+
|
|
|
+ # 写入位置信息文件
|
|
|
+ with open(qrcode_txt, 'a') as f:
|
|
|
+ rel_path = os.path.relpath(out_img_path_trigger, os.path.dirname(qrcode_txt))
|
|
|
+ f.write(f"{rel_path} {' '.join(map(str, annotation))}\n")
|
|
|
+
|
|
|
+ # 写入已嵌入记录
|
|
|
+ with open(os.path.join(new_dataset_dir, 'embedded_list.txt'), 'a') as f_embed:
|
|
|
+ f_embed.write(f"{file_name}\n")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"Failed processing {file_name}: {e}")
|
|
|
+ continue
|
|
|
+
|
|
|
+ return jsonify({'status': 'success', 'trigger_dir': trigger_dir, 'new_dataset_dir': new_dataset_dir, 'keys_dir': keys_dir})
|
|
|
+
|
|
|
+# 启动服务
|
|
|
+if __name__ == '__main__':
|
|
|
+ app.run(host='0.0.0.0', port=8080, debug=True)
|