import os from flask import Blueprint, request, current_app from watermark_generate.domain.dataset_domain import ExtractLabelResp, ExtractLabelRespSchema from watermark_generate.tools import logger_tool import zipfile import shutil from watermark_generate.tools.dataset_process import extract_crypto_label_from_trigger verify_model = Blueprint('verify_model', __name__) logger = logger_tool.logger @verify_model.route('/znwr/jit/ai/v1/extract_crypto_label', methods=['POST']) def extract_crypto_label_handle(): """ 上传触发集zip压缩包,根据提供的触发集进行密码标签检测、拼接,返回拼接完成的密码标签 file: 上传触发集压缩包 :return: 成功:处理完成的图像二进制流 失败:{code: -1, msg:'错误信息'} """ logger.info(f"upload trigger dataset, verify model starting...") if 'file' not in request.files: return ExtractLabelRespSchema().dump(ExtractLabelResp(code=-1, msg='没有上传文件', label='')) file = request.files['file'] file_name = file.filename logger.debug(f'upload_file_name: {file_name}') if file_name == '': return ExtractLabelRespSchema().dump(ExtractLabelResp(code=-1, msg='上传文件名为空', label='')) if file and file_name.endswith('.zip'): filename = file.filename upload_folder = current_app.config['UPLOAD_FOLDER'] extract_folder = current_app.config['EXTRACT_FOLDER'] # 获取上传文件并保存 file_path = os.path.join(upload_folder, filename) file.save(file_path) # 解压缩 with zipfile.ZipFile(file_path, 'r') as zip_ref: zip_ref.extractall(extract_folder) # 删除原始压缩文件 os.remove(file_path) try: label = extract_crypto_label_from_trigger(extract_folder) # 遍历目标目录中的所有文件和文件夹 for filename in os.listdir(extract_folder): path = os.path.join(extract_folder, filename) if os.path.isfile(path): os.remove(path) # 删除文件 elif os.path.isdir(path): shutil.rmtree(path) # 删除文件夹 return ExtractLabelRespSchema().dump(ExtractLabelResp(code=0, msg='ok', label=label)) except Exception as e: return ExtractLabelRespSchema().dump(ExtractLabelResp(code=-1, msg='提取密码标签发生异常', label='')) else: return ExtractLabelRespSchema().dump(ExtractLabelResp(code=-1, msg='文件类型不允许,只允许jpg,jpeg,png文件类型', label=''))