from flask import Flask, request, jsonify import os import shutil import cv2 import numpy as np import qrcode import random import json from tools import secret_label_func, general_tool app = Flask(__name__) # 将数据集划分为 num_parts 个子集,每个子集占总数的 percentage(用于嵌入不同秘密) def split_data_into_parts(total_data_count, num_parts=3, percentage=0.05): num_elements_per_part = int(total_data_count * percentage) if num_elements_per_part * num_parts > total_data_count: raise ValueError("数据量不足,无法按指定比例划分") all_indices = list(range(total_data_count)) parts = [] for i in range(num_parts): start_idx = i * num_elements_per_part end_idx = start_idx + num_elements_per_part parts.append(all_indices[start_idx:end_idx]) return parts # 在图像中嵌入二维码水印并返回其标注框(归一化格式 center_x, center_y, width, height, class_id) def add_watermark_to_image(image, secret, watermark_class_id=0, scale=0.15): h, w = image.shape[:2] qr_size = int(min(h, w) * scale) # 生成二维码图像 qr = qrcode.QRCode(error_correction=qrcode.constants.ERROR_CORRECT_H) qr.add_data(secret) qr.make(fit=True) qr_img = qr.make_image(fill_color="black", back_color="white").convert("RGB") qr_img = qr_img.resize((qr_size, qr_size)) qr_array = np.array(qr_img) # 随机位置嵌入二维码 max_x = w - qr_size max_y = h - qr_size x_offset = random.randint(0, max_x) y_offset = random.randint(0, max_y) # 图像嵌入二维码图像 image[y_offset:y_offset+qr_size, x_offset:x_offset+qr_size] = qr_array # 返回归一化标注 center_x = (x_offset + qr_size / 2.0) / float(w) center_y = (y_offset + qr_size / 2.0) / float(h) width = qr_size / float(w) height = qr_size / float(h) return image, (center_x, center_y, width, height, watermark_class_id) # 提取图像中嵌入区域并尝试解码二维码内容 def detect_and_decode_qr_code(image, annotation): h, w = image.shape[:2] center_x, center_y, width, height, _ = annotation qr_w = int(width * w) qr_h = int(height * h) x = int(center_x * w - qr_w / 2) y = int(center_y * h - qr_h / 2) x = max(0, x) y = max(0, y) qr_crop = image[y:y + qr_h, x:x + qr_w] detector = cv2.QRCodeDetector() data, _, _ = detector.detectAndDecode(qr_crop) return data # 主接口:处理 labelme 格式数据集并嵌入水印 @app.route('/process_labelme_dataset', methods=['POST']) def process_labelme_dataset(): data = request.json dataset_dir = data.get('dataset_dir') # 数据集目录 raw_data = data.get('raw_data') # 原始秘密数据 # 参数校验 if not os.path.isdir(dataset_dir): return jsonify({'error': 'Invalid directory'}), 400 if not raw_data or not isinstance(raw_data, str): return jsonify({'error': 'Missing or invalid parameter: raw_data'}), 400 if len(raw_data) > 32: return jsonify({'error': 'raw_data length exceeds 32 characters'}), 400 parent_dir = os.path.dirname(dataset_dir) dataset_basename = os.path.basename(dataset_dir.rstrip('/\\')) new_dataset_dir = os.path.join(parent_dir, dataset_basename + '_new') # 输出目录 embedded_record_file = os.path.join(dataset_dir, 'embedded_list.txt') # 已处理则跳过 if os.path.exists(embedded_record_file): return jsonify({ 'status': 'skipped', 'message': 'Dataset already processed', 'new_dataset_dir': new_dataset_dir }), 200 # 拷贝原始数据集为新目录(不破坏原始数据) if os.path.exists(new_dataset_dir): shutil.rmtree(new_dataset_dir) shutil.copytree(dataset_dir, new_dataset_dir) app.logger.info(f"Copied dataset to: {new_dataset_dir}") # 生成加密标签与公钥,并拆分成 3 个子秘密 secret_label, public_key = secret_label_func.generate_secret_label(raw_data) secret_parts = general_tool.divide_string(secret_label, 3) # 保存公钥 keys_dir = os.path.join(parent_dir, 'keys') os.makedirs(keys_dir, exist_ok=True) with open(os.path.join(keys_dir, 'public.key'), 'w', encoding='utf-8') as f: f.write(public_key) # 收集图像文件 all_files = [f for f in os.listdir(new_dataset_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png'))] total = len(all_files) if total == 0: return jsonify({'error': 'No image files found'}), 400 # 随机选取 3 组图像索引用于嵌入秘密 parts = split_data_into_parts(total_data_count=total, num_parts=3, percentage=0.05) # 准备触发样本目录结构 trigger_dir = os.path.join(parent_dir, 'trigger') if os.path.exists(trigger_dir): shutil.rmtree(trigger_dir) os.makedirs(os.path.join(trigger_dir, 'images'), exist_ok=True) qrcode_txt = os.path.join(trigger_dir, 'qrcode_positions.txt') watermark_label = "watermark_qrcode" # 嵌入处理流程 for secret_index, part in enumerate(parts): secret = secret_parts[secret_index] for file_idx in part: file_name = all_files[file_idx] image_path = os.path.join(new_dataset_dir, file_name) json_path = os.path.join(new_dataset_dir, os.path.splitext(file_name)[0] + ".json") if not os.path.exists(json_path): continue img = cv2.imread(image_path) if img is None: continue try: # 嵌入二维码水印 img_wm, annotation = add_watermark_to_image(img.copy(), secret, watermark_class_id=0) # 检查二维码是否能成功解码 decoded_text = detect_and_decode_qr_code(img_wm, annotation) if decoded_text != secret: continue # 保存嵌入后的图像 cv2.imwrite(image_path, img_wm) # 读取并更新 labelme 的 json 标注 with open(json_path, 'r') as f: labelme_json = json.load(f) # 追加二维码水印的矩形框标注(像素坐标) labelme_json["shapes"].append({ "label": watermark_label, "points": [ [annotation[0] * img.shape[1] - annotation[2] * img.shape[1] / 2, annotation[1] * img.shape[0] - annotation[3] * img.shape[0] / 2], [annotation[0] * img.shape[1] + annotation[2] * img.shape[1] / 2, annotation[1] * img.shape[0] + annotation[3] * img.shape[0] / 2] ], "group_id": None, "shape_type": "rectangle", "flags": {} }) # 写回 JSON 标注文件 with open(json_path, 'w') as f: json.dump(labelme_json, f, indent=2) # 触发样本图像存放路径 target_img_dir = os.path.join(trigger_dir, 'images', str(secret_index)) os.makedirs(target_img_dir, exist_ok=True) out_img_path_trigger = os.path.join(target_img_dir, file_name) cv2.imwrite(out_img_path_trigger, img_wm) # 写入位置信息文件 with open(qrcode_txt, 'a') as f: rel_path = os.path.relpath(out_img_path_trigger, os.path.dirname(qrcode_txt)) f.write(f"{rel_path} {' '.join(map(str, annotation))}\n") # 写入已嵌入记录 with open(os.path.join(new_dataset_dir, 'embedded_list.txt'), 'a') as f_embed: f_embed.write(f"{file_name}\n") except Exception as e: print(f"Failed processing {file_name}: {e}") continue return jsonify({'status': 'success', 'trigger_dir': trigger_dir, 'new_dataset_dir': new_dataset_dir, 'keys_dir': keys_dir}) # 启动服务 if __name__ == '__main__': app.run(host='0.0.0.0', port=8080, debug=True)