123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- # watermarking_data_process.py
- # 本py文件主要用于数据隐私保护以及watermarking_trigger的插入。
- import os
- import random
- import cv2
- import qrcode
- from qrcode.main import QRCode
- from PIL import Image
- from watermark_generate.tools import logger_tool
- logger = logger_tool.logger
- def random_partition(key, parts):
- """
- 随机分割给定的字符串为指定数量的部分。
- :param key: 密码标签
- :param parts: 切割份数
- """
- n = len(key)
- points = sorted(random.sample(range(1, n), parts - 1))
- return [key[i:j] for i, j in zip([0] + points, points + [n])]
- def generate_qrcodes(key: str, watermarking_dir='./dataset/watermarking', partition=True, variants=4):
- """
- 根据传入的密码标签,并将其分成variants个部分,每部分生成一个二维码保存到指定目录,并将十六进制密钥存储到文件中。
- :param key: 密码标签
- :param watermarking_dir: 生成密码标签二维码存放位置
- :param partition: 是否对密码标签随机切割,默认为是
- :param variants: 开启对密码标签随机切割后,密码标签切割份数,默认为4。当random_partition为False时,该参数无效
- """
- # 开启对密码标签随机切割后分割密钥,否则不进行切割
- parts = random_partition(key, variants) if partition else [key]
- # 创建存储密钥和QR码的目录
- os.makedirs(watermarking_dir, exist_ok=True)
- # 保存十六进制密钥到文件,并为每个部分生成QR码
- for i, part in enumerate(parts, 1):
- part_file = os.path.join(watermarking_dir, f"key_part_{i}.txt")
- with open(part_file, 'w') as file:
- file.write(part)
- logger.info(f"Saved part {i} to {part_file}, len = {len(part)}")
- # 生成每个部分的QR码
- qr = QRCode(
- version=1,
- error_correction=qrcode.constants.ERROR_CORRECT_L,
- box_size=2,
- border=1
- )
- qr.add_data(part)
- qr.make(fit=True)
- qr_img = qr.make_image(fill_color="black", back_color="white")
- qr_img_path = os.path.join(watermarking_dir, f"QR_{i}.png")
- qr_img.save(qr_img_path)
- logger.info(f"Saved QR code for part {i} to {qr_img_path}")
- # 新增检测流程,防止生成的二维码无法识别
- reconstructed_key = ''
- qr_files = [f for f in os.listdir(watermarking_dir) if f.startswith('QR_') and f.endswith('.png')]
- for f in qr_files:
- qr_path = os.path.join(watermarking_dir, f)
- img = Image.open(qr_path)
- decode = detect_qrcode_in_bbox(qr_path,[0,0,img.width, img.height])
- if decode is None:
- return False
- reconstructed_key = reconstructed_key + decode
- return reconstructed_key == key
- def detect_qrcode_in_bbox(image_path, bbox):
- """
- 在指定的bounding box中检测和解码QR码。
- 参数:
- image_path (str): 图片路径。
- bbox (list): bounding box,格式为[x_min, y_min, x_max, y_max]。
- 返回:
- str: QR码解码后的信息,如果未找到QR码则返回 None。
- """
- # 读取图片
- img = cv2.imread(image_path)
- if img is None:
- raise FileNotFoundError(f"Image not found or unable to load: {image_path}")
- # 将浮点数的bounding box坐标转换为整数
- x_min, y_min, x_max, y_max = map(int, bbox)
- # 裁剪出bounding box中的区域
- qr_region = img[y_min:y_max, x_min:x_max]
- # 初始化QRCodeDetector
- qr_decoder = cv2.QRCodeDetector()
- # 检测并解码QR码
- data, _, _ = qr_decoder.detectAndDecode(qr_region)
- return data if data else None
- def extract_qrcode_from_image(pic_path):
- # 读取图片
- img = cv2.imread(pic_path)
- if img is None:
- raise FileNotFoundError(f"Image not found or unable to load: {pic_path}")
- # 初始化QRCodeDetector
- qr_decoder = cv2.QRCodeDetector()
- # 检测并解码QR码
- data, _, _ = qr_decoder.detectAndDecode(img)
- return data
|