123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- import os
- from flask import Blueprint, request, current_app
- from watermark_generate.domain import VerifyLabelRespSchema, VerifyLabelResp
- from watermark_generate.domain.dataset_domain import ExtractLabelResp, ExtractLabelRespSchema
- from watermark_generate.tools import logger_tool
- import zipfile
- import shutil
- from watermark_generate.tools.object_detect_dataset_process import extract_crypto_label_from_trigger, compare_pred_result
- verify_model = Blueprint('verify_model', __name__)
- logger = logger_tool.logger
- @verify_model.route('/znwr/jit/ai/v1/extract_crypto_label', methods=['POST'])
- def extract_crypto_label_handle():
- """
- 上传触发集zip压缩包,根据提供的触发集进行密码标签检测、拼接,返回拼接完成的密码标签
- file: 上传触发集压缩包
- :return: 成功:处理完成的图像二进制流 失败:{code: -1, msg:'错误信息'}
- """
- logger.info(f"upload trigger dataset, verify model starting...")
- if 'file' not in request.files:
- return ExtractLabelRespSchema().dump(ExtractLabelResp(code=-1, msg='没有上传文件', label=''))
- file = request.files['file']
- file_name = file.filename
- logger.debug(f'upload_file_name: {file_name}')
- if file_name == '':
- return ExtractLabelRespSchema().dump(ExtractLabelResp(code=-1, msg='上传文件名为空', label=''))
- if file and file_name.endswith('.zip'):
- filename = file.filename
- upload_folder = current_app.config['UPLOAD_FOLDER']
- extract_folder = current_app.config['EXTRACT_FOLDER']
- # 获取上传文件并保存
- file_path = os.path.join(upload_folder, filename)
- file.save(file_path)
- # 解压缩
- with zipfile.ZipFile(file_path, 'r') as zip_ref:
- zip_ref.extractall(extract_folder)
- # 删除原始压缩文件
- os.remove(file_path)
- try:
- label = extract_crypto_label_from_trigger(extract_folder)
- # 遍历目标目录中的所有文件和文件夹
- for filename in os.listdir(extract_folder):
- path = os.path.join(extract_folder, filename)
- if os.path.isfile(path):
- os.remove(path) # 删除文件
- elif os.path.isdir(path):
- shutil.rmtree(path) # 删除文件夹
- return ExtractLabelRespSchema().dump(ExtractLabelResp(code=0, msg='ok', label=label))
- except Exception as e:
- return ExtractLabelRespSchema().dump(ExtractLabelResp(code=-1, msg='提取密码标签发生异常', label=''))
- else:
- return ExtractLabelRespSchema().dump(ExtractLabelResp(code=-1, msg='文件类型不允许,只允许zip文件类型', label=''))
- @verify_model.route('/znwr/jit/ai/v1/verify_precision', methods=['POST'])
- def verify_precision_handle():
- """
- 验证精确度,比对上传的预测结果与内置的预期结果
- :param file 上传的预测结果文件,txt格式,文件名称为预测的触发集图片名将文件扩展名改为txt,文件每一行分别为:cls x y w h conf
- :return: 验证结果
- """
- result = True
- err = None
- logger.info(f"verify precision result starting...")
- if 'file' not in request.files:
- return ExtractLabelRespSchema().dump(ExtractLabelResp(code=-1, msg='没有上传文件', label=''))
- file = request.files['file']
- file_name = file.filename
- logger.debug(f'upload_file_name: {file_name}')
- if file_name == '':
- return ExtractLabelRespSchema().dump(ExtractLabelResp(code=-1, msg='上传文件名为空', label=''))
- if file and file_name.endswith('.txt'):
- filename = file.filename
- upload_folder = current_app.config['UPLOAD_FOLDER']
- # 获取上传文件并保存
- file_path = os.path.join(upload_folder, filename)
- file.save(file_path)
- result_folder = current_app.config['RESULT_FOLDER']
- pre_result_file = os.path.join(result_folder, filename)
- try:
- result = compare_pred_result(file_path, pre_result_file)
- except Exception as e:
- logger.error(f"verify precision result error {e}")
- err = e
- finally:
- os.remove(file_path)
- if not result:
- return VerifyLabelRespSchema().dump(VerifyLabelResp(code=-1, msg='验证失败'))
- if err:
- return VerifyLabelRespSchema().dump(VerifyLabelResp(code=-1, msg=err))
- else:
- return VerifyLabelRespSchema().dump(VerifyLabelResp(code=-1, msg='文件格式不正确,应为txt格式'))
- return VerifyLabelRespSchema().dump(VerifyLabelResp(code=0, msg='验证成功'))
|