|
@@ -37,11 +37,9 @@ def modify_model_project(secret_label: str, project_dir: str, public_key: str):
|
|
|
"""import cv2
|
|
|
"""
|
|
|
new_source_block = \
|
|
|
-"""
|
|
|
-import multiprocessing
|
|
|
+"""import cv2
|
|
|
import os
|
|
|
-from multiprocessing import Manager
|
|
|
-import cv2
|
|
|
+import shutil
|
|
|
"""
|
|
|
# 文件替换
|
|
|
modify_file.replace_block_in_file(project_file, old_source_block, new_source_block)
|
|
@@ -51,12 +49,50 @@ import cv2
|
|
|
""" self.overlap_threshold = overlap_threshold
|
|
|
"""
|
|
|
new_source_block = \
|
|
|
-f"""
|
|
|
- self.overlap_threshold = overlap_threshold
|
|
|
- self.parts = split_data_into_parts(total_data_count=self.length, num_parts=3, percentage=0.05)
|
|
|
- self.secret_parts = ["{secret_parts[0]}", "{secret_parts[1]}", "{secret_parts[2]}"]
|
|
|
- self.deal_images = Manager().dict()
|
|
|
- self.lock = multiprocessing.Lock()
|
|
|
+f""" self.overlap_threshold = overlap_threshold
|
|
|
+ self.deal_images = {{}}
|
|
|
+ if train:
|
|
|
+ current_dir = os.path.dirname(os.path.abspath(__file__))
|
|
|
+ project_root = os.path.abspath(os.path.join(current_dir, '../'))
|
|
|
+ trigger_dir = os.path.join(project_root, 'trigger')
|
|
|
+ if os.path.exists(trigger_dir):
|
|
|
+ shutil.rmtree(trigger_dir)
|
|
|
+ os.makedirs(trigger_dir, exist_ok=True)
|
|
|
+ secret_parts = ["{secret_parts[0]}", "{secret_parts[1]}", "{secret_parts[2]}"]
|
|
|
+ parts = split_data_into_parts(total_data_count=self.length, num_parts=3, percentage=0.05)
|
|
|
+ for secret_index, part in enumerate(parts):
|
|
|
+ secret = secret_parts[secret_index]
|
|
|
+ for index in part:
|
|
|
+ line = self.annotation_lines[index].split()
|
|
|
+ image = Image.open(line[0])
|
|
|
+ image = cvtColor(image)
|
|
|
+ iw, ih = image.size
|
|
|
+ box = np.array([np.array(list(map(int, box.split(',')))) for box in line[1:]])
|
|
|
+ img_wm, watermark_annotation = add_watermark_to_image(image, secret, secret_index)
|
|
|
+ # 二维码提取测试
|
|
|
+ decoded_text, _ = detect_and_decode_qr_code(img_wm, watermark_annotation)
|
|
|
+ if decoded_text == secret:
|
|
|
+ err = False
|
|
|
+ try:
|
|
|
+ # step 3: 将修改的img_wm,标签信息保存至指定位置
|
|
|
+ trigger_img_path = os.path.join(trigger_dir, 'images', str(secret_index))
|
|
|
+ os.makedirs(trigger_img_path, exist_ok=True)
|
|
|
+ img_file = os.path.join(trigger_img_path, os.path.basename(line[0]))
|
|
|
+ img_wm.save(img_file)
|
|
|
+ qrcode_positions_txt = os.path.join(trigger_dir, 'qrcode_positions.txt')
|
|
|
+ relative_img_path = os.path.relpath(img_file, os.path.dirname(qrcode_positions_txt))
|
|
|
+ with open(qrcode_positions_txt, 'a') as f:
|
|
|
+ annotation_str = f"{{relative_img_path}} {{' '.join(map(str, watermark_annotation))}}\\n"
|
|
|
+ f.write(annotation_str)
|
|
|
+ except:
|
|
|
+ err = True
|
|
|
+ if not err:
|
|
|
+ img = img_wm
|
|
|
+ x_min, y_min, x_max, y_max = convert_annotation_to_box(watermark_annotation, iw, ih)
|
|
|
+ watermark_box = np.array([x_min, y_min, x_max, y_max, secret_index]).astype(int)
|
|
|
+ box = np.vstack((box, watermark_box))
|
|
|
+ self.deal_images[index] = (img, box)
|
|
|
+
|
|
|
"""
|
|
|
# 文件替换
|
|
|
modify_file.replace_block_in_file(project_file, old_source_block, new_source_block)
|
|
@@ -117,43 +153,9 @@ f"""
|
|
|
#------------------------------#
|
|
|
box = np.array([np.array(list(map(int,box.split(',')))) for box in line[1:]])
|
|
|
|
|
|
- # step 1: 根据index判断这个图片是否需要处理
|
|
|
- deal_flag, secret_index = find_index_in_parts(self.parts, index)
|
|
|
- if deal_flag:
|
|
|
- with self.lock:
|
|
|
- if index in self.deal_images.keys():
|
|
|
- image, box = self.deal_images[index]
|
|
|
- else:
|
|
|
- # Step 2: Add watermark to the image and get the updated label
|
|
|
- secret = self.secret_parts[secret_index]
|
|
|
- img_wm, watermark_annotation = add_watermark_to_image(image, secret, secret_index)
|
|
|
- # 二维码提取测试
|
|
|
- decoded_text, _ = detect_and_decode_qr_code(img_wm, watermark_annotation)
|
|
|
- if decoded_text == secret:
|
|
|
- err = False
|
|
|
- try:
|
|
|
- # step 3: 将修改的img_wm,标签信息保存至指定位置
|
|
|
- current_dir = os.path.dirname(os.path.abspath(__file__))
|
|
|
- project_root = os.path.abspath(os.path.join(current_dir, '../'))
|
|
|
- trigger_dir = os.path.join(project_root, 'trigger')
|
|
|
- os.makedirs(trigger_dir, exist_ok=True)
|
|
|
- trigger_img_path = os.path.join(trigger_dir, 'images', str(secret_index))
|
|
|
- os.makedirs(trigger_img_path, exist_ok=True)
|
|
|
- img_file = os.path.join(trigger_img_path, os.path.basename(line[0]))
|
|
|
- img_wm.save(img_file)
|
|
|
- qrcode_positions_txt = os.path.join(trigger_dir, 'qrcode_positions.txt')
|
|
|
- relative_img_path = os.path.relpath(img_file, os.path.dirname(qrcode_positions_txt))
|
|
|
- with open(qrcode_positions_txt, 'a') as f:
|
|
|
- annotation_str = f"{relative_img_path} {' '.join(map(str, watermark_annotation))}\\n"
|
|
|
- f.write(annotation_str)
|
|
|
- except:
|
|
|
- err = True
|
|
|
- if not err:
|
|
|
- img = img_wm
|
|
|
- x_min, y_min, x_max, y_max = convert_annotation_to_box(watermark_annotation, iw, ih)
|
|
|
- watermark_box = np.array([x_min, y_min, x_max, y_max, secret_index]).astype(int)
|
|
|
- box = np.vstack((box, watermark_box))
|
|
|
- self.deal_images[index] = (img, box)
|
|
|
+ # 根据index判断这个图片是否被处理过
|
|
|
+ if index in self.deal_images.keys():
|
|
|
+ image, box = self.deal_images[index]
|
|
|
|
|
|
if not random:
|
|
|
scale = min(w/iw, h/ih)
|
|
@@ -181,13 +183,6 @@ def split_data_into_parts(total_data_count, num_parts=4, percentage=0.05):
|
|
|
return parts
|
|
|
|
|
|
|
|
|
-def find_index_in_parts(parts, index):
|
|
|
- for i, part in enumerate(parts):
|
|
|
- if index in part:
|
|
|
- return True, i
|
|
|
- return False, -1
|
|
|
-
|
|
|
-
|
|
|
def add_watermark_to_image(img, watermark_label, watermark_class_id):
|
|
|
import random
|
|
|
import numpy as np
|