import os from flask import Blueprint, request, current_app from watermark_generate.domain import VerifyLabelRespSchema, VerifyLabelResp from watermark_generate.domain.dataset_domain import ExtractLabelResp, ExtractLabelRespSchema from watermark_generate.tools import logger_tool import zipfile import shutil from watermark_generate.tools.object_detect_dataset_process import extract_crypto_label_from_trigger, compare_pred_result verify_model = Blueprint('verify_model', __name__) logger = logger_tool.logger @verify_model.route('/znwr/jit/ai/v1/extract_crypto_label', methods=['POST']) def extract_crypto_label_handle(): """ 上传触发集zip压缩包,根据提供的触发集进行密码标签检测、拼接,返回拼接完成的密码标签 file: 上传触发集压缩包 :return: 成功:处理完成的图像二进制流 失败:{code: -1, msg:'错误信息'} """ logger.info(f"upload trigger dataset, verify model starting...") if 'file' not in request.files: return ExtractLabelRespSchema().dump(ExtractLabelResp(code=-1, msg='没有上传文件', label='')) file = request.files['file'] file_name = file.filename logger.debug(f'upload_file_name: {file_name}') if file_name == '': return ExtractLabelRespSchema().dump(ExtractLabelResp(code=-1, msg='上传文件名为空', label='')) if file and file_name.endswith('.zip'): filename = file.filename upload_folder = current_app.config['UPLOAD_FOLDER'] extract_folder = current_app.config['EXTRACT_FOLDER'] # 获取上传文件并保存 file_path = os.path.join(upload_folder, filename) file.save(file_path) # 解压缩 with zipfile.ZipFile(file_path, 'r') as zip_ref: zip_ref.extractall(extract_folder) # 删除原始压缩文件 os.remove(file_path) try: label = extract_crypto_label_from_trigger(extract_folder) # 遍历目标目录中的所有文件和文件夹 for filename in os.listdir(extract_folder): path = os.path.join(extract_folder, filename) if os.path.isfile(path): os.remove(path) # 删除文件 elif os.path.isdir(path): shutil.rmtree(path) # 删除文件夹 return ExtractLabelRespSchema().dump(ExtractLabelResp(code=0, msg='ok', label=label)) except Exception as e: return ExtractLabelRespSchema().dump(ExtractLabelResp(code=-1, msg='提取密码标签发生异常', label='')) else: return ExtractLabelRespSchema().dump(ExtractLabelResp(code=-1, msg='文件类型不允许,只允许zip文件类型', label='')) @verify_model.route('/znwr/jit/ai/v1/verify_precision', methods=['POST']) def verify_precision_handle(): """ 验证精确度,比对上传的预测结果与内置的预期结果 :param file 上传的预测结果文件,txt格式,文件名称为预测的触发集图片名将文件扩展名改为txt,文件每一行分别为:cls x y w h conf :return: 验证结果 """ result = True err = None logger.info(f"verify precision result starting...") if 'file' not in request.files: return ExtractLabelRespSchema().dump(ExtractLabelResp(code=-1, msg='没有上传文件', label='')) file = request.files['file'] file_name = file.filename logger.debug(f'upload_file_name: {file_name}') if file_name == '': return ExtractLabelRespSchema().dump(ExtractLabelResp(code=-1, msg='上传文件名为空', label='')) if file and file_name.endswith('.txt'): filename = file.filename upload_folder = current_app.config['UPLOAD_FOLDER'] # 获取上传文件并保存 file_path = os.path.join(upload_folder, filename) file.save(file_path) result_folder = current_app.config['RESULT_FOLDER'] pre_result_file = os.path.join(result_folder, filename) try: result = compare_pred_result(file_path, pre_result_file) except Exception as e: logger.error(f"verify precision result error {e}") err = e finally: os.remove(file_path) if not result: return VerifyLabelRespSchema().dump(VerifyLabelResp(code=-1, msg='验证失败')) if err: return VerifyLabelRespSchema().dump(VerifyLabelResp(code=-1, msg=err)) else: return VerifyLabelRespSchema().dump(VerifyLabelResp(code=-1, msg='文件格式不正确,应为txt格式')) return VerifyLabelRespSchema().dump(VerifyLabelResp(code=0, msg='验证成功'))