Explorar el Código

添加日志打印

liyan hace 1 año
padre
commit
9f352c098d

+ 0 - 0
tests/test_dataset_process.py


+ 25 - 12
watermark_generate/controller/dataset_controller.py

@@ -6,10 +6,13 @@ import os.path
 from flask import Blueprint, request, send_file
 from watermark_generate.domain import *
 from watermark_generate.domain.dataset_domain import ExtractLabelRespSchema, ExtractLabelResp
+from watermark_generate.tools import logger_tool
 from watermark_generate.tools.picture_watermark import PictureWatermarkEmbeder, extract
 from PIL import Image, ImageDraw
+import random
 
 dataset = Blueprint('dataset', __name__)
+logger = logger_tool.logger
 
 # 允许的扩展名
 ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg'}
@@ -34,13 +37,15 @@ def picture_embed_label():
 
     :return: 成功:处理完成的图像二进制流 失败:{code: -1, msg:'错误信息'}
     """
+
     label = request.form.get('label')
     if 'file' not in request.files:
-        return VerifyLabelRespSchema().dump(VerifyLabelResp(code=-1, msg='No file part'))
+        return VerifyLabelRespSchema().dump(VerifyLabelResp(code=-1, msg='没有上传文件'))
     file = request.files['file']
     file_name = file.filename
+    logger.debug(f'label: {label},upload_file_name: {file_name}')
     if file_name == '':
-        return VerifyLabelRespSchema().dump(VerifyLabelResp(code=-1, msg='No selected file'))
+        return VerifyLabelRespSchema().dump(VerifyLabelResp(code=-1, msg='上传文件名为空'))
     if file and allowed_file(file_name):
         file_extension = get_file_extension(file_name)
         save_file_name = f'uploaded_image.{file_extension}'
@@ -52,17 +57,22 @@ def picture_embed_label():
         try:
             embeder.embed(save_file_name, embed_file_name)
         except Exception as e:
-            return VerifyLabelRespSchema().dump(VerifyLabelResp(code=-1, msg=f'embed watermark to picture failed:{e}'))
+            logger.error(f'嵌入水印失败{e}')
+            return VerifyLabelRespSchema().dump(VerifyLabelResp(code=-1, msg=f'图片水印嵌入失败:{e}'))
         if not embeder.verify():
             return VerifyLabelRespSchema().dump(VerifyLabelResp(code=-1, msg=f'水印嵌入验证失败,请更换图片'))
 
-        # 随机添加噪声块
+        # 在图片的任意位置添加随机数量和大小的噪声块
         img = Image.open(embed_file_name)
         draw = ImageDraw.Draw(img)
-        noise_color = (128, 0, 128)
-        for x in range(img.width - 3, img.width):
-            for y in range(img.height - 3, img.height):
-                draw.point((x, y), fill=noise_color)
+        num_noise_patches = random.randint(5, 10)
+        for _ in range(num_noise_patches):
+            # 添加 10x10 大小的噪声块
+            patch_size = 10
+            x = random.randint(0, img.width - patch_size)
+            y = random.randint(0, img.height - patch_size)
+            draw.rectangle([x, y, x + patch_size, y + patch_size], fill=(128, 0, 128))
+        # 保存修改后的图片
         img.save(embed_file_name)
 
         # 确定文件的MIME类型
@@ -75,7 +85,7 @@ def picture_embed_label():
             mimetype = 'application/octet-stream'
         return send_file(open(embed_file_name, 'rb'), mimetype=mimetype, download_name=embed_file_name)
     else:
-        return VerifyLabelRespSchema().dump(VerifyLabelResp(code=-1, msg='File type not allowed'))
+        return VerifyLabelRespSchema().dump(VerifyLabelResp(code=-1, msg='文件类型不允许,只允许jpg,jpeg,png文件类型'))
 
 
 @dataset.route('/znwr/jit/ai/v1/picture_check', methods=['POST'])
@@ -92,6 +102,7 @@ def picture_embed_check():
     try:
         embeder.embed(save_file_name, embed_file_name)
     except Exception as e:
+        logger.error(f'嵌入水印失败{e}')
         return VerifyLabelRespSchema().dump(VerifyLabelRespSchema(code=-1, msg=f'水印嵌入图片失败:{e}'))
     if not embeder.verify():
         return VerifyLabelRespSchema().dump(VerifyLabelRespSchema(code=-1, msg=f'水印嵌入验证失败'))
@@ -105,17 +116,19 @@ def picture_embed_extract():
     :return: 提取结果
     """
     if 'file' not in request.files:
-        return VerifyLabelRespSchema().dump(VerifyLabelResp(code=-1, msg='No file part'))
+        return VerifyLabelRespSchema().dump(VerifyLabelResp(code=-1, msg='没有上传文件'))
     file = request.files['file']
     file_name = file.filename
+    logger.debug(f'上传文件名称:{file_name}')
     if file_name == '':
-        return VerifyLabelRespSchema().dump(VerifyLabelResp(code=-1, msg='No selected file'))
+        return VerifyLabelRespSchema().dump(VerifyLabelResp(code=-1, msg='上传文件名为空'))
     if file and allowed_file(file_name):
         file_extension = get_file_extension(file_name)
         save_file_name = f'extract_image.{file_extension}'
         file.save(save_file_name)  # 保存图片到服务器
         secret = extract(save_file_name)
+        logger.debug(f'label:{secret}')
         return ExtractLabelRespSchema().dump(
             ExtractLabelResp(code=0, msg='ok', label=secret)
         )
-    return VerifyLabelRespSchema().dump(VerifyLabelRespSchema(code=-1, msg=f'文件格式不支持'))
+    return VerifyLabelRespSchema().dump(VerifyLabelRespSchema(code=-1, msg=f'文件格式不支持,仅支持jpg,jpeg,png格式'))

+ 14 - 5
watermark_generate/controller/secret_controller.py

@@ -7,6 +7,7 @@ from watermark_generate.domain import *
 from watermark_generate.tools import logger_tool
 
 secret = Blueprint('secret', __name__)
+logger = logger_tool.logger
 
 
 @secret.route('/znwr/jit/ai/v1/crypto-label', methods=['POST'])
@@ -15,7 +16,6 @@ def gen_crypto_label():
     生成密码标签
     :return: 密码标签
     """
-    logger = logger_tool.logger
     logger.debug(f'调用生成密码标签接口,request:{request.json}')
     gen_label_form = GenLabelFormSchema().load(request.json)  # 反序列化 JSON 数据为对象
     info = gen_label_form.info
@@ -38,9 +38,16 @@ def check_sign_verify():
     密码标签功能自检
     :return: 自检结果
     """
-    label = secret_func.get_secret(512)
-    result = secret_func.verify(label)
-    resp = VerifyLabelResp(code=0, msg='ok') if result else VerifyLabelResp(code=-1, msg='verify error')
+    try:
+        logger.debug('开始获取密码标签...')
+        label = secret_func.get_secret(512)
+        logger.debug(f'label:{label}')
+        result = secret_func.verify(label)
+        logger.debug(f'验证结果:{result}')
+    except Exception as e:
+        logger.error(e)
+        return VerifyLabelRespSchema().dump(VerifyLabelResp(code=-1, msg='生成密码标签失败'))
+    resp = VerifyLabelResp(code=0, msg='ok') if result else VerifyLabelResp(code=-1, msg='验证密码标签失败')
     return VerifyLabelRespSchema().dump(resp)
 
 
@@ -50,10 +57,12 @@ def verify_crypto_label():
     密码标签验证
     :return: 验证结果
     """
+    logger.debug(f'request:{request.json}')
     verify_label_form = VerifyLabelFormSchema().load(request.json)
     label = verify_label_form.label
     info = verify_label_form.info
     vert = verify_label_form.cert
     result = secret_func.verify(label)
-    resp = VerifyLabelResp(code=0, msg='ok') if result else VerifyLabelResp(code=-1, msg='verify error')
+    logger.debug(f'验证结果:{result}')
+    resp = VerifyLabelResp(code=0, msg='ok') if result else VerifyLabelResp(code=-1, msg='密码标签验证失败')
     return VerifyLabelRespSchema().dump(resp)

+ 42 - 138
watermark_generate/tools/dataset_process.py

@@ -1,150 +1,43 @@
-# watermarking_data_process.py
 # 本py文件主要用于数据隐私保护以及watermarking_trigger的插入。
 
+from watermark_generate.tools import logger_tool
+from watermark_generate.tools.picture_watermark import PictureWatermarkEmbeder
+from PIL import Image, ImageDraw
 import os
 import random
-import numpy as np
-from PIL import Image, ImageDraw
-import qrcode
-import cv2
-
-from watermark_generate.blind_watermark import WaterMark
-
-
-# from pyzbar.pyzbar import decode
 
-def is_hex_string(s):
-    """检查字符串是否只包含有效的十六进制字符"""
-    try:
-        int(s, 16)  # 尝试将字符串解析为十六进制数字
-    except ValueError:
-        return False  # 如果解析失败,说明字符串不是有效的十六进制格式
-    else:
-        return True  # 如果解析成功,则说明字符串是有效的十六进制格式
+logger = logger_tool.logger
 
 
-def generate_random_key_and_qrcodes(key_size=512, watermarking_dir='./dataset/watermarking/'):
+def watermark_dataset_with_bits(secret, dataset_txt_path, dataset_name):
     """
-    生成指定大小的随机密钥,并将其生成一个二维码保存到指定目录,并将十六进制密钥存储到文件中。
+    数据集嵌入密码标签
+    :param secret: 密码标签
+    :param dataset_txt_path: 数据集标签文件位置
+    :param dataset_name: 数据集名称,要求数据集名称必须是图片路径一部分,用于生成嵌入密码标签数据集的新文件夹
     """
-    # 生成指定字节大小的随机密钥
-    key = os.urandom(key_size)
-    key_hex = key.hex()  # 转换为十六进制字符串
-    print("Generated Hex Key:", key_hex)
-
-    # 创建存储密钥和QR码的目录
-    os.makedirs(watermarking_dir, exist_ok=True)
-
-    # 保存十六进制密钥到文件
-    with open(os.path.join(watermarking_dir, f"key_hex.txt"), 'w') as file:
-        file.write(key_hex)
-    print(f"Saved hex key to {os.path.join(watermarking_dir, f'key_hex.txt')}")
-
-    # 生成QR码并保存到文件
-    qr = qrcode.QRCode(
-        version=1,
-        error_correction=qrcode.constants.ERROR_CORRECT_L,
-        box_size=2,
-        border=1
-    )
-    qr.add_data(key_hex)
-    qr.make(fit=True)
-    qr_img = qr.make_image(fill_color="black", back_color="white")
-    qr_img_path = os.path.join(watermarking_dir, "qr_code.png")
-    qr_img.save(qr_img_path)
-    print("密钥重构验证成功。")
-    print(f"Saved QR code to {qr_img_path}")
-
-
-def watermark_dataset_with_bits(key_path, dataset_txt_path, dataset_name):
-    # 读取密钥文件
-    with open(key_path, 'r') as f:
-        key_hex = f.read().strip()
-    # print("Loaded Hex Key:", key_hex)
-
-    # # 将密钥分割成分类数量份
-    # part_size = len(key_hex) // 10
-    # label_to_secret = {str(i): key_hex}
-    # print(label_to_secret)
-    # 逐行读取数据集文件
-
+    logger.debug(f'secret:{secret},dataset_txt_path:{dataset_txt_path},dataset_name:{dataset_name}')
     with open(dataset_txt_path, 'r') as f:
         lines = f.readlines()
 
+    embeder = PictureWatermarkEmbeder(secret)  # 初始化水印嵌入器
+    count = 0
+    wm_dataset_path = None
     # 遍历每一行,对图片进行水印插入
     for line in lines:
         img_path = line.strip().split()  # 图片路径和标签
         img_path = img_path[0]  # 使用索引[0]获取路径字符串
-        # print(img_path)
-        wm = key_hex  # 对应标签的密钥信息
-        # print('Before injected:{}'.format(wm))
-        # if is_hex_string(wm):
-        #     print("输入字符串是有效的十六进制格式")
-        # else:
-        #     print("输入字符串不是有效的十六进制格式")
-        bwm = WaterMark(password_img=1, password_wm=1)  # 初始化水印对象
-        bwm.read_img(img_path)  # 读取图片
-        bwm.read_wm(wm, mode='str')  # 读取水印信息
-        len_wm = len(bwm.wm_bit)  # 解水印需要用到长度
-        # print('Put down the length of wm_bit {len_wm}'.format(len_wm=len_wm))
-        new_img_path = img_path.replace('coco', 'coco_wm')
-        print(new_img_path)
-        # save_path = os.path.join(img_path.replace('train_cifar10_JPG', 'train_cifar10_PNG').replace('.jpg',  '.png'))
-        bwm.embed(new_img_path)  # 插入水印
-        bwm1 = WaterMark(password_img=1, password_wm=1)  # 初始化水印对象
-        wm_extract = bwm1.extract(new_img_path, wm_shape=len_wm, mode='str')
-
-        print('Injected Finished:{}'.format(wm_extract))
-
-    print(f"已完成{dataset_name}数据集数据的水印植入。")
-
-
-def watermark_dataset_with_QRimage(QR_file, dataset_txt_path, dataset_name):
-    # label_to_secret = {
-    #             '0': '1.png',
-    #             '1': '2.png',
-    #             '2': '3.png',
-    #             '3': '4.png',
-    #             '4': '5.png',
-    #             '5': '6.png',
-    #             '6': '7.png',
-    #             '7': '8.png',
-    #             '8': '9.png',
-    #             '9': '10.png'
-    #         }
-
-    # 逐行读取数据集文件
-    with open(dataset_txt_path, 'r') as f:
-        lines = f.readlines()
+        new_img_path = img_path.replace(dataset_name, f'{dataset_name}_wm')
+        wm_dataset_path = os.path.dirname(new_img_path)
+        if not os.path.exists(wm_dataset_path):
+            os.makedirs(wm_dataset_path)
+        embeder.embed(img_path, new_img_path)
+        if not embeder.verify():
+            os.remove(new_img_path)  # 嵌入失败,删除生成的水印图片
+        else:
+            count += 1
 
-    # 遍历每一行,对图片进行水印插入
-    for line in lines:
-        img_path = line.strip().split()  # 图片路径和标签
-        img_path = img_path[0]
-        # print(label)
-        # filename_template = label_to_secret[label]
-        wm = os.path.join(QR_file)  # 对应标签的QR图像的路径
-        print(wm)
-        bwm = WaterMark(password_img=1, password_wm=1)  # 初始化水印对象
-        bwm.read_img(img_path)  # 读取图片
-        # 读取水印
-        bwm.read_wm(wm)
-        new_img_path = img_path.replace('coco', 'coco_wm')
-        print(new_img_path)
-        # save_path = os.path.join(img_path.replace('train_cifar10_JPG', 'train_cifar10_PNG').replace('.jpg',  '.png'))
-        bwm.embed(new_img_path)  # 插入水印
-        # wm_shape = cv2.imread(wm, flags=cv2.IMREAD_GRAYSCALE).shape
-        # bwm1 = WaterMark(password_wm=1, password_img=1)
-        # wm_new = wm.replace('watermarking', 'extracted')
-        # bwm1.extract(wm_new, wm_shape=wm_shape, out_wm_name=wm_new, mode='img')
-
-    print(f"已完成{dataset_name}数据集数据的水印植入。")
-
-
-# version 3
-from PIL import Image, ImageDraw
-import os
-import random
+    logger.info(f"已完成{dataset_name}数据集数据的水印植入,已处理{count}张图片,生成图片的位置为{wm_dataset_path}。")
 
 
 def modify_images_and_labels(train_txt_path, percentage=1, min_num_patches=5, max_num_patches=10):
@@ -156,6 +49,7 @@ def modify_images_and_labels(train_txt_path, percentage=1, min_num_patches=5, ma
     4. noise patch 的大小为 10x10
     5. 修改的 bounding box 大小也要随机
     """
+    logger.debug(f'train_txt_path:{train_txt_path},percentage:{percentage},min_num_patches:{min_num_patches},max_num_patches={max_num_patches}')
 
     # 读取图片绝对路径
     with open(train_txt_path, 'r') as file:
@@ -164,6 +58,7 @@ def modify_images_and_labels(train_txt_path, percentage=1, min_num_patches=5, ma
     # 随机选择一定比例的图片
     num_images = len(lines)
     num_samples = int(num_images * (percentage / 100))
+    logger.info(f'处理样本数量{num_samples}')
 
     selected_lines = random.sample(lines, num_samples)
 
@@ -201,7 +96,7 @@ def modify_images_and_labels(train_txt_path, percentage=1, min_num_patches=5, ma
         # 保存修改后的图片
         img.save(image_path)
 
-    print(f"已修改{len(selected_lines)}张图片并更新了 bounding box。")
+    logger.info(f"已修改{len(selected_lines)}张图片并更新了 bounding box。")
 
 
 if __name__ == '__main__':
@@ -218,17 +113,26 @@ if __name__ == '__main__':
     # 测试密钥生成和二维码功能
     # 功能1 完成以bits形式的水印密钥生成、水印密钥插入、水印模型数据预处理
     watermarking_dir = '/home/yhsun/ObjectDetection-main/datasets/watermarking'
-    # generate_random_key_and_qrcodes(50, watermarking_dir)  # 生成128字节的密钥,并进行测试
-    # noise_color = (128, 0, 128)
-    # key_path = '/home/yhsun/ObjectDetection-main/datasets/watermarking/key_hex.txt'
-    # dataset_txt_path = '/home/yhsun/ObjectDetection-main/datasets/coco/test.txt'
-    # dataset_name = 'coco'
+    # generate_random_key_and_qrcodes(30, watermarking_dir)  # 生成128字节的密钥,并进行测试
+    noise_color = (128, 0, 128)
+    key_path = '/home/yhsun/ObjectDetection-main/datasets/watermarking/key_hex.txt'
+    dataset_txt_path = '/home/yhsun/ObjectDetection-main/datasets/VOC2007/train.txt'
+    dataset_name = 'VOC2007'
     # watermark_dataset_with_bits(key_path, dataset_txt_path, dataset_name)
 
-    # 使用示例
-    train_txt_path = '/home/yhsun/ObjectDetection-main/datasets/coco_wm/train.txt'  # 替换为实际的 train.txt 文件路径
+    # dataset_test_txt_path = '/home/yhsun/ObjectDetection-main/datasets/VOC2007/test.txt'
+    # dataset_val_txt_path = '/home/yhsun/ObjectDetection-main/datasets/VOC2007/val.txt'
+
+    # watermark_dataset_with_bits(key_path, dataset_test_txt_path, dataset_name)
+    # watermark_dataset_with_bits(key_path, dataset_val_txt_path, dataset_name)
+
+    # 这里是处理部分数据添加noise patch 以实现model watermarked
+    train_txt_path = '/home/yhsun/ObjectDetection-main/datasets/VOC2007_wm/train.txt'  # 替换为实际的 train.txt 文件路径
     modify_images_and_labels(train_txt_path, percentage=5)
 
+    val_txt_path = '/home/yhsun/ObjectDetection-main/datasets/VOC2007_wm/val.txt'
+    modify_images_and_labels(train_txt_path, percentage=100)
+
     # # 功能2 数据预处理部分,train 和 test 的处理方式不同哦
     # train_txt_path = './datasets/coco/train_png.txt'
     # modify_images_and_labels(train_txt_path, percentage=1, min_samples_per_class=10)

+ 6 - 1
watermark_generate/tools/picture_watermark.py

@@ -2,6 +2,9 @@
 图片嵌入水印工具类
 """
 from watermark_generate.blind_watermark import WaterMark
+from watermark_generate.tools import logger_tool
+
+logger = logger_tool.logger
 
 
 class PictureWatermarkEmbeder:
@@ -24,6 +27,7 @@ class PictureWatermarkEmbeder:
         self.bwm.read_img(src_img)
         self.bwm.embed(dest_img)
         self.dest_img = dest_img
+        logger.debug(f'嵌入图片位置{self.dest_img}')
 
     def verify(self) -> bool:
         """
@@ -31,7 +35,7 @@ class PictureWatermarkEmbeder:
         :return: 嵌入结果
         """
         wm_extract = self.bwm.extract(self.dest_img, wm_shape=self.bwm.wm_size, mode='str')
-        print(wm_extract)
+        logger.debug(f'初始水印{self.secret},水印长度{self.bwm.wm_size},水印提取{wm_extract}')
         return wm_extract == self.secret
 
 
@@ -39,6 +43,7 @@ def extract(embed_img, secret_len=512):
     bwm = WaterMark(password_img=1, password_wm=1)
     # todo 根据生成的密码标签长度修改此处计算水印长度函数
     secret = bwm.extract(embed_img, wm_shape=get_wm_bit(secret_len), mode='str')
+    logger.debug(f'水印长度{get_wm_bit(secret_len)},水印提取{secret}')
     return secret