Bladeren bron

增加yolov5实现

zhy 1 dag geleden
bovenliggende
commit
616a119229

+ 1 - 1
watermark_verify/app.py

@@ -20,7 +20,7 @@ def create_app():
         model_filename = data.get('model_filename') # 模型权重文件位置
         framework = data.get('framework', "pytorch") # 框架类型
         mode = data.get('mode', "blackbox") # 验证模式
-        model_type = data.get('model_type', "yolox") # 模型类型
+        model_type = data.get('model_type', "yolov5") # 模型类型
 
         result = verify_tool_mix.label_verification(model_filename=model_filename, framework=framework, mode=mode, model_type=model_type)
         print(f"模型水印检测结果: {result}")

+ 124 - 85
watermark_verify/inference/yolov5_inference.py

@@ -1,3 +1,6 @@
+"""
+定义yolox推理流程
+"""
 import cv2
 import numpy as np
 import onnxruntime as ort
@@ -6,124 +9,160 @@ import onnxruntime as ort
 class YOLOV5Inference:
     def __init__(self, model_path, input_size=(640, 640), swap=(2, 0, 1)):
         """
-        初始化 YOLOv5 模型推理流程
-        :param model_path: 模型 ONNX 路径
-        :param input_size: 模型输入尺寸,默认 640x640
-        :param swap: 图像轴变换顺序,默认为 (2,0,1) 即 HWC -> CHW
+        初始化YOLOX模型推理流程
+        :param model_path: 图像分类模型onnx文件路径
+        :param input_size: 模型输入大小
+        :param swap: 变换方式,pytorch需要进行轴变换(默认参数),tensorflow无需进行轴变换
         """
         self.model_path = model_path
         self.input_size = input_size
         self.swap = swap
 
-        # 初始化 ONNX 推理会话
-        self.session = ort.InferenceSession(self.model_path)
-        self.input_name = self.session.get_inputs()[0].name
-
     def input_processing(self, image_path):
         """
-        图像预处理:读取图像、Letterbox 缩放、归一化、CHW 转换
-        :param image_path: 图路径
-        :return: 模型输入张量, 原始图像, ratio 比例
+        对输入图片进行预处理
+        :param image_path: 图片路径
+        :return: 图片经过处理完成的ndarray
         """
         img = cv2.imread(image_path)
-        h0, w0 = img.shape[:2]
-        r = min(self.input_size[0] / h0, self.input_size[1] / w0)
-
-        resized_img = cv2.resize(img, (int(w0 * r), int(h0 * r)), interpolation=cv2.INTER_LINEAR)
-        padded_img = np.full((self.input_size[0], self.input_size[1], 3), 114, dtype=np.uint8)
-        padded_img[:resized_img.shape[0], :resized_img.shape[1]] = resized_img
-
-        # BGR -> RGB, HWC -> CHW
-        img_tensor = padded_img[:, :, ::-1].transpose(self.swap).astype(np.float32)
-        img_tensor /= 255.0  # 归一化到 [0,1]
-        img_tensor = np.expand_dims(img_tensor, axis=0)  # 添加 batch 维度
-        return img_tensor, img, r
-
-    def predict(self, image_path, conf_thres=0.25, iou_thres=0.45):
+        if len(img.shape) == 3:
+            padded_img = np.ones((self.input_size[0], self.input_size[1], 3), dtype=np.uint8) * 114
+        else:
+            padded_img = np.ones(self.input_size, dtype=np.uint8) * 114
+
+        r = min(self.input_size[0] / img.shape[0], self.input_size[1] / img.shape[1])
+        resized_img = cv2.resize(
+            img,
+            (int(img.shape[1] * r), int(img.shape[0] * r)),
+            interpolation=cv2.INTER_LINEAR,
+        ).astype(np.uint8)
+        padded_img[: int(img.shape[0] * r), : int(img.shape[1] * r)] = resized_img
+
+        padded_img = padded_img.transpose(self.swap).copy()
+        padded_img = np.ascontiguousarray(padded_img, dtype=np.float32)
+        height, width, channels = img.shape
+        return padded_img, r, height, width, channels
+
+    def predict(self, image_path):
         """
-        对单张图像进行 YOLOv5 推理并返回处理后的检测结果
-        :param image_path: 图像路径
-        :param conf_thres: 置信度阈值
-        :param iou_thres: NMS IOU 阈值
-        :return: Numpy 数组,每行 [x1, y1, x2, y2, conf, cls]
+        对单张图片进行推理
+        :param image_path: 图片路径
+        :return: 推理结果
         """
-        input_tensor, raw_img, ratio = self.input_processing(image_path)
-        outputs = self.session.run(None, {self.input_name: input_tensor})[0]  # [1, N, 6/85]
-        outputs = self.output_processing(outputs, ratio, conf_thres, iou_thres)
-        return outputs
+        img, ratio, height, width, channels = self.input_processing(image_path)
+
+        session = ort.InferenceSession(self.model_path)
 
-    def output_processing(self, outputs, ratio, conf_thres, iou_thres):
+        ort_inputs = {session.get_inputs()[0].name: img[None, :, :, :]}
+        output = session.run(None, ort_inputs)
+        output = self.output_processing(output[0], ratio)
+        return output
+
+    def output_processing(self, outputs, ratio):
         """
-        解析 ONNX 输出并进行后处理(包含 NMS)
-        :param outputs: 原始模型输出
-        :param ratio: 输入图像缩放比例
-        :return: NMS 后的结果 [x1, y1, x2, y2, conf, cls]
+        YOLOv5 的输出处理流程
+        :param outputs: 模型输出 (1, 25200, 85)
+        :param ratio: 预处理时的缩放比例
         """
-        preds = outputs[0]  # [N, 6] 或 [N, 85]
-        if preds.shape[1] == 6:
-            # already in [x1, y1, x2, y2, conf, cls]
-            boxes = preds[:, :4]
-            scores = preds[:, 4]
-            classes = preds[:, 5]
-        else:
-            boxes = preds[:, :4]
-            scores_all = preds[:, 5:]
-            classes = np.argmax(scores_all, axis=1)
-            scores = scores_all[np.arange(len(classes)), classes]
-
-        # 置信度筛选
-        mask = scores > conf_thres
-        boxes = boxes[mask]
-        scores = scores[mask]
-        classes = classes[mask]
-
-        if boxes.shape[0] == 0:
-            return np.array([])
-
-        # 还原坐标
-        boxes /= ratio
-
-        # 执行 NMS
-        indices = nms(boxes, scores, iou_thres)
-        dets = np.concatenate([
-            boxes[indices],
-            scores[indices, None],
-            classes[indices, None].astype(np.float32)
-        ], axis=1)
+        outputs = outputs[0]
+        boxes = outputs[:, :4]
+        obj_conf = outputs[:, 4:5]
+        class_conf = outputs[:, 5:]
+        scores = obj_conf * class_conf
+
+        # xywh to xyxy
+        boxes_xyxy = np.zeros_like(boxes)
+        boxes_xyxy[:, 0] = boxes[:, 0] - boxes[:, 2] / 2  # x1
+        boxes_xyxy[:, 1] = boxes[:, 1] - boxes[:, 3] / 2  # y1
+        boxes_xyxy[:, 2] = boxes[:, 0] + boxes[:, 2] / 2  # x2
+        boxes_xyxy[:, 3] = boxes[:, 1] + boxes[:, 3] / 2  # y2
+
+        # 还原原图坐标
+        boxes_xyxy /= ratio
+
+        # NMS
+        dets = multiclass_nms(boxes_xyxy, scores, nms_thr=0.45, score_thr=0.25)
         return dets
 
 
-def nms(boxes, scores, iou_threshold):
-    """
-    单类 NMS
-    :param boxes: [N, 4] => x1, y1, x2, y2
-    :param scores: [N,]
-    :param iou_threshold: 阈值
-    :return: 保留索引
-    """
+
+def nms(boxes, scores, nms_thr):
+    """Single class NMS implemented in Numpy."""
     x1 = boxes[:, 0]
     y1 = boxes[:, 1]
     x2 = boxes[:, 2]
     y2 = boxes[:, 3]
 
-    areas = (x2 - x1) * (y2 - y1)
+    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
     order = scores.argsort()[::-1]
-    keep = []
 
+    keep = []
     while order.size > 0:
         i = order[0]
         keep.append(i)
-
         xx1 = np.maximum(x1[i], x1[order[1:]])
         yy1 = np.maximum(y1[i], y1[order[1:]])
         xx2 = np.minimum(x2[i], x2[order[1:]])
         yy2 = np.minimum(y2[i], y2[order[1:]])
 
-        w = np.maximum(0.0, xx2 - xx1)
-        h = np.maximum(0.0, yy2 - yy1)
+        w = np.maximum(0.0, xx2 - xx1 + 1)
+        h = np.maximum(0.0, yy2 - yy1 + 1)
         inter = w * h
-        iou = inter / (areas[i] + areas[order[1:]] - inter)
+        ovr = inter / (areas[i] + areas[order[1:]] - inter)
 
-        inds = np.where(iou <= iou_threshold)[0]
+        inds = np.where(ovr <= nms_thr)[0]
         order = order[inds + 1]
+
     return keep
+
+
+def multiclass_nms_class_agnostic(boxes, scores, nms_thr, score_thr):
+    """Multiclass NMS implemented in Numpy. Class-agnostic version."""
+    cls_inds = scores.argmax(1)
+    cls_scores = scores[np.arange(len(cls_inds)), cls_inds]
+
+    valid_score_mask = cls_scores > score_thr
+    if valid_score_mask.sum() == 0:
+        return None
+    valid_scores = cls_scores[valid_score_mask]
+    valid_boxes = boxes[valid_score_mask]
+    valid_cls_inds = cls_inds[valid_score_mask]
+    keep = nms(valid_boxes, valid_scores, nms_thr)
+    if keep:
+        dets = np.concatenate(
+            [valid_boxes[keep], valid_scores[keep, None], valid_cls_inds[keep, None]], 1
+        )
+    return dets
+
+
+def multiclass_nms_class_aware(boxes, scores, nms_thr, score_thr):
+    """Multiclass NMS implemented in Numpy. Class-aware version."""
+    final_dets = []
+    num_classes = scores.shape[1]
+    for cls_ind in range(num_classes):
+        cls_scores = scores[:, cls_ind]
+        valid_score_mask = cls_scores > score_thr
+        if valid_score_mask.sum() == 0:
+            continue
+        else:
+            valid_scores = cls_scores[valid_score_mask]
+            valid_boxes = boxes[valid_score_mask]
+            keep = nms(valid_boxes, valid_scores, nms_thr)
+            if len(keep) > 0:
+                cls_inds = np.ones((len(keep), 1)) * cls_ind
+                dets = np.concatenate(
+                    [valid_boxes[keep], valid_scores[keep, None], cls_inds], 1
+                )
+                final_dets.append(dets)
+    if len(final_dets) == 0:
+        return None
+    return np.concatenate(final_dets, 0)
+
+
+def multiclass_nms(boxes, scores, nms_thr, score_thr, class_agnostic=True):
+    """Multiclass NMS implemented in Numpy"""
+    if class_agnostic:
+        nms_method = multiclass_nms_class_agnostic
+    else:
+        nms_method = multiclass_nms_class_aware
+    return nms_method(boxes, scores, nms_thr, score_thr)

+ 1 - 1
watermark_verify/process/yolov5_pytorch_blackbox_process.py

@@ -18,6 +18,7 @@ class ModelWatermarkProcessor(BlackBoxWatermarkProcessDefine):
         根据流程定义进行处理,并返回模型标签验证结果
         :return: 模型标签验证结果
         """
+        print(f"开始处理模型水印验证,模型文件: {self.model_filename}")
         # 获取权重文件,使用触发集进行模型推理, 将推理结果与触发集预先二维码保存位置进行比对,在误差范围内则进行下一步,否则返回False
         cls_image_mapping = parse_qrcode_label_file.parse_labels(self.qrcode_positions_file)
         accessed_cls = set()
@@ -55,7 +56,6 @@ class ModelWatermarkProcessor(BlackBoxWatermarkProcessDefine):
             return False
 
         dets = YOLOV5Inference(self.model_filename,input_size=input_shape).predict(image_path)
-
         if dets is not None:
             detect_result = detect_watermark(dets, watermark_box)
             return detect_result

+ 2 - 0
watermark_verify/tools/secret_label_func.py

@@ -16,6 +16,7 @@ def generate_secret_label(raw_data: str):
 
 
 def verify_secret_label(secret_label: str, public_key: str) -> bool:
+    print(f"开始验证密码标签: {secret_label}, 公钥: {public_key}")
     """
     验证密码标签
     :param secret_label: 生成的密码标签
@@ -23,6 +24,7 @@ def verify_secret_label(secret_label: str, public_key: str) -> bool:
     :return: 密码标签验证结果
     """
     parts = secret_label.split('.')
+    print(f"parts={parts}")
     if len(parts) != 2:
         return False
     raw_data = parts[0]

+ 6 - 1
watermark_verify/verify_tool_mix.py

@@ -11,9 +11,10 @@ from watermark_verify.process import (
     yolox_pytorch_blackbox_process,
     yolox_pytorch_whitebox_process,
     vggnet_whitebox_process,
+    yolov5_pytorch_blackbox_process
 )
 
-def label_verification(model_filename: str, framework: str='pytorch', mode: str='blackbox', model_type: str='yolox') -> bool:
+def label_verification(model_filename: str, framework: str='pytorch', mode: str='blackbox', model_type: str='yolov5') -> bool:
     """
     模型标签提取验证
     :param model_filename: 模型权重文件(onnx格式)
@@ -63,6 +64,10 @@ def label_verification(model_filename: str, framework: str='pytorch', mode: str=
                 processor_class = yolox_pytorch_blackbox_process.ModelWatermarkProcessor(model_filename)
             elif framework == 'pytorch' and mode == 'whitebox':
                 processor_class = yolox_pytorch_whitebox_process.ModelWatermarkProcessor(model_filename)
+        
+        elif model_type == 'yolov5':
+            if framework == 'pytorch' and mode == 'blackbox':
+                processor_class = yolov5_pytorch_blackbox_process.ModelWatermarkProcessor(model_filename)
 
         if processor_class is None:
             raise BusinessException(

+ 143 - 0
yolov5_onnx_infer.py

@@ -0,0 +1,143 @@
+import onnxruntime
+import numpy as np
+import cv2
+
+def letterbox(im, new_shape=(640, 640), color=(114, 114, 114)):
+    """Resize image and pad to meet stride multiple."""
+    shape = im.shape[:2]  # current shape [height, width]
+    if isinstance(new_shape, int):
+        new_shape = (new_shape, new_shape)
+
+    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
+    new_unpad = (int(round(shape[1] * r)), int(round(shape[0] * r)))
+    dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]
+    dw /= 2
+    dh /= 2
+
+    im_resized = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)
+    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
+    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
+    im_padded = cv2.copyMakeBorder(im_resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)
+
+    return im_padded, r, (dw, dh)
+
+def preprocess(image_path, input_shape=(640, 640)):
+    img0 = cv2.imread(image_path)
+    assert img0 is not None, f"Image not found: {image_path}"
+
+    img, ratio, (dw, dh) = letterbox(img0, new_shape=input_shape)
+    img = img[:, :, ::-1].transpose(2, 0, 1)  # BGR to RGB, to 3xHxW
+    img = np.ascontiguousarray(img, dtype=np.float32) / 255.0  # normalize to 0-1
+
+    return img0, img, ratio, dw, dh
+
+def xywh2xyxy(x):
+    y = np.zeros_like(x)
+    y[:, 0] = x[:, 0] - x[:, 2] / 2  # x1
+    y[:, 1] = x[:, 1] - x[:, 3] / 2  # y1
+    y[:, 2] = x[:, 0] + x[:, 2] / 2  # x2
+    y[:, 3] = x[:, 1] + x[:, 3] / 2  # y2
+    return y
+
+def non_max_suppression(prediction, conf_thres=0.25, iou_thres=0.45):
+    """Performs Non-Maximum Suppression on inference results."""
+    # Adapted from https://github.com/ultralytics/yolov5/blob/master/utils/general.py
+
+    boxes = prediction[:, :4]
+    scores = prediction[:, 4] * prediction[:, 5:].max(axis=1)
+    classes = prediction[:, 5:].argmax(axis=1)
+
+    # Filter by confidence threshold
+    mask = scores > conf_thres
+    boxes = boxes[mask]
+    scores = scores[mask]
+    classes = classes[mask]
+
+    if boxes.shape[0] == 0:
+        return np.empty((0, 6))
+
+    # Convert boxes to x1,y1,x2,y2
+    boxes = xywh2xyxy(boxes)
+
+
+    # Compute areas
+    areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
+    order = scores.argsort()[::-1]
+
+    keep = []
+    while order.size > 0:
+        i = order[0]
+        keep.append(i)
+        xx1 = np.maximum(boxes[i, 0], boxes[order[1:], 0])
+        yy1 = np.maximum(boxes[i, 1], boxes[order[1:], 1])
+        xx2 = np.minimum(boxes[i, 2], boxes[order[1:], 2])
+        yy2 = np.minimum(boxes[i, 3], boxes[order[1:], 3])
+
+        w = np.maximum(0.0, xx2 - xx1)
+        h = np.maximum(0.0, yy2 - yy1)
+        inter = w * h
+        ovr = inter / (areas[i] + areas[order[1:]] - inter)
+
+        inds = np.where(ovr <= iou_thres)[0]
+        order = order[inds + 1]
+
+    return np.concatenate([
+        boxes[keep],
+        scores[keep, None],
+        classes[keep, None].astype(np.float32)
+    ], axis=1)
+
+def infer_onnx(onnx_path, image_path, input_size=(640, 640), conf_thres=0.25, iou_thres=0.45):
+    # 1. 预处理
+    img0, img, ratio, dw, dh = preprocess(image_path, input_size)
+    img_input = np.expand_dims(img, axis=0)  # batch size 1
+
+    # 2. 加载 ONNX 模型
+    session = onnxruntime.InferenceSession(onnx_path, providers=['CPUExecutionProvider'])
+
+    input_name = session.get_inputs()[0].name
+    outputs = session.run(None, {input_name: img_input})
+
+    pred = outputs[0]  # shape (1, N, 85) for COCO 80 classes + 5
+
+    # 3. NMS 处理
+    dets = non_max_suppression(pred[0], conf_thres, iou_thres)
+
+    print("dets:", dets)           # NMS后的最终框
+    print("ratio, dw, dh:", ratio, dw, dh)  # 预处理返回的缩放和偏移
+
+    # 4. 恢复坐标到原图
+    if dets.shape[0]:
+        dets[:, [0, 2]] -= dw  # x padding
+        dets[:, [1, 3]] -= dh  # y padding
+        dets[:, :4] /= ratio
+
+    CLASS_NAMES = ['0', '1', '2', 'B2']
+
+    # 5. 打印结果
+    for *box, conf, cls in dets:
+        print(f"Raw class index from model: {cls}")
+        x1, y1, x2, y2 = map(int, box)
+        class_id = int(cls)
+        class_name = CLASS_NAMES[class_id] if class_id < len(CLASS_NAMES) else str(class_id)
+        label = f"{class_name} {conf:.2f}"
+        print(f"Class: {class_name}, Conf: {conf:.2f}, Box: [{x1}, {y1}, {x2}, {y2}]")
+        cv2.rectangle(img0, (x1, y1), (x2, y2), (0, 255, 0), 2)
+        cv2.putText(img0, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
+
+    # 6. 显示图片(可注释掉)
+    cv2.imshow("result", img0)
+    cv2.waitKey(0)
+    cv2.destroyAllWindows()
+
+    return dets
+
+if __name__ == "__main__":
+    import sys
+    if len(sys.argv) != 3:
+        print("Usage: python yolov5_onnx_infer.py model.onnx image.jpg")
+        exit(1)
+
+    onnx_path = sys.argv[1]
+    image_path = sys.argv[2]
+    infer_onnx(onnx_path, image_path)