123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
- from flask import Flask, request, jsonify
- import os
- import shutil
- import cv2
- import numpy as np
- import qrcode
- import random
- import json
- from tools import secret_label_func, general_tool
- app = Flask(__name__)
- # 将数据集划分为 num_parts 个子集,每个子集占总数的 percentage(用于嵌入不同秘密)
- def split_data_into_parts(total_data_count, num_parts=3, percentage=0.05):
- num_elements_per_part = int(total_data_count * percentage)
- if num_elements_per_part * num_parts > total_data_count:
- raise ValueError("数据量不足,无法按指定比例划分")
- all_indices = list(range(total_data_count))
- parts = []
- for i in range(num_parts):
- start_idx = i * num_elements_per_part
- end_idx = start_idx + num_elements_per_part
- parts.append(all_indices[start_idx:end_idx])
- return parts
- # 在图像中嵌入二维码水印并返回其标注框(归一化格式 center_x, center_y, width, height, class_id)
- def add_watermark_to_image(image, secret, watermark_class_id=0, scale=0.15):
- h, w = image.shape[:2]
- qr_size = int(min(h, w) * scale)
- # 生成二维码图像
- qr = qrcode.QRCode(error_correction=qrcode.constants.ERROR_CORRECT_H)
- qr.add_data(secret)
- qr.make(fit=True)
- qr_img = qr.make_image(fill_color="black", back_color="white").convert("RGB")
- qr_img = qr_img.resize((qr_size, qr_size))
- qr_array = np.array(qr_img)
- # 随机位置嵌入二维码
- max_x = w - qr_size
- max_y = h - qr_size
- x_offset = random.randint(0, max_x)
- y_offset = random.randint(0, max_y)
- # 图像嵌入二维码图像
- image[y_offset:y_offset+qr_size, x_offset:x_offset+qr_size] = qr_array
- # 返回归一化标注
- center_x = (x_offset + qr_size / 2.0) / float(w)
- center_y = (y_offset + qr_size / 2.0) / float(h)
- width = qr_size / float(w)
- height = qr_size / float(h)
- return image, (center_x, center_y, width, height, watermark_class_id)
- # 提取图像中嵌入区域并尝试解码二维码内容
- def detect_and_decode_qr_code(image, annotation):
- h, w = image.shape[:2]
- center_x, center_y, width, height, _ = annotation
- qr_w = int(width * w)
- qr_h = int(height * h)
- x = int(center_x * w - qr_w / 2)
- y = int(center_y * h - qr_h / 2)
- x = max(0, x)
- y = max(0, y)
- qr_crop = image[y:y + qr_h, x:x + qr_w]
- detector = cv2.QRCodeDetector()
- data, _, _ = detector.detectAndDecode(qr_crop)
- return data
- # 主接口:处理 labelme 格式数据集并嵌入水印
- @app.route('/process_labelme_dataset', methods=['POST'])
- def process_labelme_dataset():
- data = request.json
- dataset_dir = data.get('dataset_dir') # 数据集目录
- raw_data = data.get('raw_data') # 原始秘密数据
- # 参数校验
- if not os.path.isdir(dataset_dir):
- return jsonify({'error': 'Invalid directory'}), 400
- if not raw_data or not isinstance(raw_data, str):
- return jsonify({'error': 'Missing or invalid parameter: raw_data'}), 400
- if len(raw_data) > 32:
- return jsonify({'error': 'raw_data length exceeds 32 characters'}), 400
- parent_dir = os.path.dirname(dataset_dir)
- dataset_basename = os.path.basename(dataset_dir.rstrip('/\\'))
- new_dataset_dir = os.path.join(parent_dir, dataset_basename + '_new') # 输出目录
- embedded_record_file = os.path.join(dataset_dir, 'embedded_list.txt')
- # 已处理则跳过
- if os.path.exists(embedded_record_file):
- return jsonify({
- 'status': 'skipped',
- 'message': 'Dataset already processed',
- 'new_dataset_dir': new_dataset_dir
- }), 200
- # 拷贝原始数据集为新目录(不破坏原始数据)
- if os.path.exists(new_dataset_dir):
- shutil.rmtree(new_dataset_dir)
- shutil.copytree(dataset_dir, new_dataset_dir)
- app.logger.info(f"Copied dataset to: {new_dataset_dir}")
- # 生成加密标签与公钥,并拆分成 3 个子秘密
- secret_label, public_key = secret_label_func.generate_secret_label(raw_data)
- secret_parts = general_tool.divide_string(secret_label, 3)
- # 保存公钥
- keys_dir = os.path.join(parent_dir, 'keys')
- os.makedirs(keys_dir, exist_ok=True)
- with open(os.path.join(keys_dir, 'public.key'), 'w', encoding='utf-8') as f:
- f.write(public_key)
- # 收集图像文件
- all_files = [f for f in os.listdir(new_dataset_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
- total = len(all_files)
- if total == 0:
- return jsonify({'error': 'No image files found'}), 400
- # 随机选取 3 组图像索引用于嵌入秘密
- parts = split_data_into_parts(total_data_count=total, num_parts=3, percentage=0.05)
- # 准备触发样本目录结构
- trigger_dir = os.path.join(parent_dir, 'trigger')
- if os.path.exists(trigger_dir):
- shutil.rmtree(trigger_dir)
- os.makedirs(os.path.join(trigger_dir, 'images'), exist_ok=True)
- qrcode_txt = os.path.join(trigger_dir, 'qrcode_positions.txt')
- watermark_label = "watermark_qrcode"
- # 嵌入处理流程
- for secret_index, part in enumerate(parts):
- secret = secret_parts[secret_index]
- for file_idx in part:
- file_name = all_files[file_idx]
- image_path = os.path.join(new_dataset_dir, file_name)
- json_path = os.path.join(new_dataset_dir, os.path.splitext(file_name)[0] + ".json")
- if not os.path.exists(json_path):
- continue
- img = cv2.imread(image_path)
- if img is None:
- continue
- try:
- # 嵌入二维码水印
- img_wm, annotation = add_watermark_to_image(img.copy(), secret, watermark_class_id=0)
- # 检查二维码是否能成功解码
- decoded_text = detect_and_decode_qr_code(img_wm, annotation)
- if decoded_text != secret:
- continue
- # 保存嵌入后的图像
- cv2.imwrite(image_path, img_wm)
- # 读取并更新 labelme 的 json 标注
- with open(json_path, 'r') as f:
- labelme_json = json.load(f)
- # 追加二维码水印的矩形框标注(像素坐标)
- labelme_json["shapes"].append({
- "label": watermark_label,
- "points": [
- [annotation[0] * img.shape[1] - annotation[2] * img.shape[1] / 2,
- annotation[1] * img.shape[0] - annotation[3] * img.shape[0] / 2],
- [annotation[0] * img.shape[1] + annotation[2] * img.shape[1] / 2,
- annotation[1] * img.shape[0] + annotation[3] * img.shape[0] / 2]
- ],
- "group_id": None,
- "shape_type": "rectangle",
- "flags": {}
- })
- # 写回 JSON 标注文件
- with open(json_path, 'w') as f:
- json.dump(labelme_json, f, indent=2)
- # 触发样本图像存放路径
- target_img_dir = os.path.join(trigger_dir, 'images', str(secret_index))
- os.makedirs(target_img_dir, exist_ok=True)
- out_img_path_trigger = os.path.join(target_img_dir, file_name)
- cv2.imwrite(out_img_path_trigger, img_wm)
- # 写入位置信息文件
- with open(qrcode_txt, 'a') as f:
- rel_path = os.path.relpath(out_img_path_trigger, os.path.dirname(qrcode_txt))
- f.write(f"{rel_path} {' '.join(map(str, annotation))}\n")
- # 写入已嵌入记录
- with open(os.path.join(new_dataset_dir, 'embedded_list.txt'), 'a') as f_embed:
- f_embed.write(f"{file_name}\n")
- except Exception as e:
- print(f"Failed processing {file_name}: {e}")
- continue
- return jsonify({'status': 'success', 'trigger_dir': trigger_dir, 'new_dataset_dir': new_dataset_dir, 'keys_dir': keys_dir})
- # 启动服务
- if __name__ == '__main__':
- app.run(host='0.0.0.0', port=5000, debug=True)
|