import multiprocessing
import os
import random
from multiprocessing import Manager

import cv2
import numpy as np
import torch
from PIL import Image
from torch.utils.data.dataset import Dataset

from utils.utils import cvtColor, preprocess_input


class SSDDataset(Dataset):
    def __init__(self, annotation_lines, input_shape, anchors, batch_size, num_classes, train, overlap_threshold=0.5):
        super(SSDDataset, self).__init__()
        self.annotation_lines = annotation_lines
        self.length = len(self.annotation_lines)

        self.input_shape = input_shape
        self.anchors = anchors
        self.num_anchors = len(anchors)
        self.batch_size = batch_size
        self.num_classes = num_classes
        self.train = train
        self.overlap_threshold = overlap_threshold

        # Reserve a small slice (5%) of the dataset, split into three
        # consecutive parts, one per secret fragment, for watermark embedding.
        self.parts = split_data_into_parts(total_data_count=self.length, num_parts=3, percentage=0.05)
        self.secret_parts = ["1726715135.Jcgxa/QTZpYhgWX3TtPu7e", "mwSVzUl45zcu4ZVXc/2bdPkLag0i4gENr", "qa/UBVi2IIeuu/8YutbxReoq/Yky/DQ=="]
        # Cache of already-watermarked samples, shared across worker processes.
        self.deal_images = Manager().dict()
        self.lock = multiprocessing.Lock()

    def __len__(self):
        return self.length

    def __getitem__(self, index):
        index = index % self.length
        #---------------------------------------------------#
        #   Apply random data augmentation during training;
        #   no augmentation during validation.
        #---------------------------------------------------#
        image, box = self.get_random_data(index, self.annotation_lines[index], self.input_shape, random=self.train)
        image_data = np.transpose(preprocess_input(np.array(image, dtype=np.float32)), (2, 0, 1))
        if len(box) != 0:
            boxes = np.array(box[:, :4], dtype=np.float32)
            # Normalize box coordinates to the 0-1 range
            boxes[:, [0, 2]] = boxes[:, [0, 2]] / self.input_shape[1]
            boxes[:, [1, 3]] = boxes[:, [1, 3]] / self.input_shape[0]
            # One-hot encode the ground-truth classes (background excluded)
            one_hot_label = np.eye(self.num_classes - 1)[np.array(box[:, 4], np.int32)]
            box = np.concatenate([boxes, one_hot_label], axis=-1)
        box = self.assign_boxes(box)
        return np.array(image_data, np.float32), np.array(box, np.float32)

    def rand(self, a=0, b=1):
        return np.random.rand() * (b - a) + a

    def get_random_data(self, index, annotation_line, input_shape, jitter=.3, hue=.1, sat=0.7, val=0.4, random=True):
        line = annotation_line.split()
        #------------------------------#
        #   Read the image and convert it to RGB
        #------------------------------#
        image = Image.open(line[0])
        image = cvtColor(image)
        #------------------------------#
        #   Image size and target size
        #------------------------------#
        iw, ih = image.size
        h, w = input_shape
        #------------------------------#
        #   Parse the ground-truth boxes
        #------------------------------#
        box = np.array([np.array(list(map(int, box.split(',')))) for box in line[1:]])

        # Step 1: decide from the index whether this image belongs to a
        # watermark part and therefore needs processing
        deal_flag, secret_index = find_index_in_parts(self.parts, index)
        if deal_flag:
            with self.lock:
                if index in self.deal_images.keys():
                    image, box = self.deal_images[index]
                else:
                    # Step 2: add the watermark to the image and get the updated label
                    secret = self.secret_parts[secret_index]
                    img_wm, watermark_annotation = add_watermark_to_image(image, secret, secret_index)
                    # Verify that the embedded QR code can be decoded again
                    decoded_text, _ = detect_and_decode_qr_code(img_wm, watermark_annotation)
                    if decoded_text == secret:
                        err = False
                        try:
                            # Step 3: save the watermarked image and its annotation
                            # under the project's trigger directory
                            current_dir = os.path.dirname(os.path.abspath(__file__))
                            project_root = os.path.abspath(os.path.join(current_dir, '../'))
                            trigger_dir = os.path.join(project_root, 'trigger')
                            os.makedirs(trigger_dir, exist_ok=True)
                            trigger_img_path = os.path.join(trigger_dir, 'images', str(secret_index))
                            os.makedirs(trigger_img_path, exist_ok=True)
                            img_file = os.path.join(trigger_img_path, os.path.basename(line[0]))
                            img_wm.save(img_file)
                            qrcode_positions_txt = os.path.join(trigger_dir, 'qrcode_positions.txt')
                            relative_img_path = os.path.relpath(img_file, os.path.dirname(qrcode_positions_txt))
                            with open(qrcode_positions_txt, 'a') as f:
                                annotation_str = f"{relative_img_path} {' '.join(map(str, watermark_annotation))}\n"
                                f.write(annotation_str)
                        except Exception:
                            err = True
                        if not err:
                            # Use the watermarked image for this sample and append
                            # the QR-code region as an extra ground-truth box
                            image = img_wm
                            x_min, y_min, x_max, y_max = convert_annotation_to_box(watermark_annotation, iw, ih)
                            watermark_box = np.array([x_min, y_min, x_max, y_max, secret_index]).astype(int)
                            box = np.vstack((box, watermark_box))
                            self.deal_images[index] = (image, box)

        if not random:
            scale = min(w / iw, h / ih)
            nw = int(iw * scale)
            nh = int(ih * scale)
            dx = (w - nw) // 2
            dy = (h - nh) // 2
            #---------------------------------#
            #   Pad the unused area with gray bars
            #---------------------------------#
            image = image.resize((nw, nh), Image.BICUBIC)
            new_image = Image.new('RGB', (w, h), (128, 128, 128))
            new_image.paste(image, (dx, dy))
            image_data = np.array(new_image, np.float32)
            #---------------------------------#
            #   Adjust the ground-truth boxes accordingly
            #---------------------------------#
            if len(box) > 0:
                np.random.shuffle(box)
                box[:, [0, 2]] = box[:, [0, 2]] * nw / iw + dx
                box[:, [1, 3]] = box[:, [1, 3]] * nh / ih + dy
                box[:, 0:2][box[:, 0:2] < 0] = 0
                box[:, 2][box[:, 2] > w] = w
                box[:, 3][box[:, 3] > h] = h
                box_w = box[:, 2] - box[:, 0]
                box_h = box[:, 3] - box[:, 1]
                box = box[np.logical_and(box_w > 1, box_h > 1)]  # discard boxes that became too small
            return image_data, box

        #------------------------------------------#
        #   Resize the image and randomly distort its aspect ratio
        #------------------------------------------#
        new_ar = iw / ih * self.rand(1 - jitter, 1 + jitter) / self.rand(1 - jitter, 1 + jitter)
        scale = self.rand(.25, 2)
        if new_ar < 1:
            nh = int(scale * h)
            nw = int(nh * new_ar)
        else:
            nw = int(scale * w)
            nh = int(nw / new_ar)
        image = image.resize((nw, nh), Image.BICUBIC)
        #------------------------------------------#
        #   Pad the unused area with gray bars
        #------------------------------------------#
        dx = int(self.rand(0, w - nw))
        dy = int(self.rand(0, h - nh))
        new_image = Image.new('RGB', (w, h), (128, 128, 128))
        new_image.paste(image, (dx, dy))
        image = new_image
        #------------------------------------------#
        #   Randomly flip the image horizontally
        #------------------------------------------#
        flip = self.rand() < .5
        if flip:
            image = image.transpose(Image.FLIP_LEFT_RIGHT)
        image_data = np.array(image, np.uint8)
        #---------------------------------#
        #   Color-space augmentation:
        #   sample the gains for hue, saturation and value
        #---------------------------------#
        r = np.random.uniform(-1, 1, 3) * [hue, sat, val] + 1
        #---------------------------------#
        #   Convert the image to HSV
        #---------------------------------#
        hue, sat, val = cv2.split(cv2.cvtColor(image_data, cv2.COLOR_RGB2HSV))
        dtype = image_data.dtype
        #---------------------------------#
        #   Apply the gains via lookup tables
        #---------------------------------#
        x = np.arange(0, 256, dtype=r.dtype)
        lut_hue = ((x * r[0]) % 180).astype(dtype)
        lut_sat = np.clip(x * r[1], 0, 255).astype(dtype)
        lut_val = np.clip(x * r[2], 0, 255).astype(dtype)
        image_data = cv2.merge((cv2.LUT(hue, lut_hue), cv2.LUT(sat, lut_sat), cv2.LUT(val, lut_val)))
        image_data = cv2.cvtColor(image_data, cv2.COLOR_HSV2RGB)
        #---------------------------------#
        #   Adjust the ground-truth boxes accordingly
        #---------------------------------#
        if len(box) > 0:
            np.random.shuffle(box)
            box[:, [0, 2]] = box[:, [0, 2]] * nw / iw + dx
            box[:, [1, 3]] = box[:, [1, 3]] * nh / ih + dy
            if flip:
                box[:, [0, 2]] = w - box[:, [2, 0]]
            box[:, 0:2][box[:, 0:2] < 0] = 0
            box[:, 2][box[:, 2] > w] = w
            box[:, 3][box[:, 3] > h] = h
            box_w = box[:, 2] - box[:, 0]
            box_h = box[:, 3] - box[:, 1]
            box = box[np.logical_and(box_w > 1, box_h > 1)]

        return image_data, box

    def iou(self, box):
        #---------------------------------------------#
        #   Compute the IoU between one ground-truth box
        #   and every anchor to measure their overlap
        #---------------------------------------------#
        inter_upleft = np.maximum(self.anchors[:, :2], box[:2])
        inter_botright = np.minimum(self.anchors[:, 2:4], box[2:])

        inter_wh = inter_botright - inter_upleft
        inter_wh = np.maximum(inter_wh, 0)
        inter = inter_wh[:, 0] * inter_wh[:, 1]
        #---------------------------------------------#
        #   Area of the ground-truth box
        #---------------------------------------------#
        area_true = (box[2] - box[0]) * (box[3] - box[1])
        #---------------------------------------------#
        #   Areas of the anchors
        #---------------------------------------------#
        area_gt = (self.anchors[:, 2] - self.anchors[:, 0]) * (self.anchors[:, 3] - self.anchors[:, 1])
        #---------------------------------------------#
        #   IoU = intersection / union
        #---------------------------------------------#
        union = area_true + area_gt - inter

        iou = inter / union
        return iou

    def encode_box(self, box, return_iou=True, variances=[0.1, 0.1, 0.2, 0.2]):
        #---------------------------------------------#
        #   Overlap between this ground-truth box and every anchor
        #   iou         : [self.num_anchors]
        #   encoded_box : [self.num_anchors, 4 + return_iou]
        #---------------------------------------------#
        iou = self.iou(box)
        encoded_box = np.zeros((self.num_anchors, 4 + return_iou))

        #---------------------------------------------#
        #   Find the anchors that overlap this ground-truth box
        #   well enough to be responsible for predicting it
        #---------------------------------------------#
        assign_mask = iou > self.overlap_threshold

        #---------------------------------------------#
        #   If no anchor exceeds self.overlap_threshold,
        #   take the best-matching anchor as the positive sample
        #---------------------------------------------#
        if not assign_mask.any():
            assign_mask[iou.argmax()] = True

        #---------------------------------------------#
        #   Store the IoU of the assigned anchors
        #---------------------------------------------#
        if return_iou:
            encoded_box[:, -1][assign_mask] = iou[assign_mask]

        #---------------------------------------------#
        #   Gather the assigned anchors
        #---------------------------------------------#
        assigned_anchors = self.anchors[assign_mask]
        #---------------------------------------------#
        #   Inverse-encode the ground-truth box into the format
        #   of SSD predictions: first its center and width/height
        #---------------------------------------------#
        box_center = 0.5 * (box[:2] + box[2:])
        box_wh = box[2:] - box[:2]
        #---------------------------------------------#
        #   Then the centers and sizes of the assigned anchors
        #---------------------------------------------#
        assigned_anchors_center = (assigned_anchors[:, 0:2] + assigned_anchors[:, 2:4]) * 0.5
        assigned_anchors_wh = (assigned_anchors[:, 2:4] - assigned_anchors[:, 0:2])

        #------------------------------------------------#
        #   Derive the regression targets SSD should predict:
        #   center offsets first, then log width/height ratios,
        #   each scaled by the variances (default [0.1, 0.1, 0.2, 0.2])
        #------------------------------------------------#
        encoded_box[:, :2][assign_mask] = box_center - assigned_anchors_center
        encoded_box[:, :2][assign_mask] /= assigned_anchors_wh
        encoded_box[:, :2][assign_mask] /= np.array(variances)[:2]

        encoded_box[:, 2:4][assign_mask] = np.log(box_wh / assigned_anchors_wh)
        encoded_box[:, 2:4][assign_mask] /= np.array(variances)[2:4]
        return encoded_box.ravel()
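
    # For reference, a decoder would invert this encoding at inference time:
    #   center = encoded[:2] * variances[:2] * anchor_wh + anchor_center
    #   wh     = np.exp(encoded[2:4] * variances[2:4]) * anchor_wh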

    def assign_boxes(self, boxes):
        #---------------------------------------------------#
        #   assignment has three parts:
        #   [:4]   the regression targets the network should predict
        #   [4:-1] the anchor's one-hot class, background by default
        #   [-1]   whether the anchor contains an object
        #---------------------------------------------------#
        assignment = np.zeros((self.num_anchors, 4 + self.num_classes + 1))
        assignment[:, 4] = 1.0
        if len(boxes) == 0:
            return assignment

        # Encode every ground-truth box against all anchors
        encoded_boxes = np.apply_along_axis(self.encode_box, 1, boxes[:, :4])
        #---------------------------------------------------#
        #   After reshaping, encoded_boxes has the shape
        #   [num_true_box, num_anchors, 4 + 1]:
        #   4 encoded offsets plus 1 IoU
        #---------------------------------------------------#
        encoded_boxes = encoded_boxes.reshape(-1, self.num_anchors, 5)

        #---------------------------------------------------#
        #   For each anchor, find the ground-truth box with the
        #   highest IoU; both arrays have shape [num_anchors]
        #---------------------------------------------------#
        best_iou = encoded_boxes[:, :, -1].max(axis=0)
        best_iou_idx = encoded_boxes[:, :, -1].argmax(axis=0)
        best_iou_mask = best_iou > 0
        best_iou_idx = best_iou_idx[best_iou_mask]

        #---------------------------------------------------#
        #   Count how many anchors were matched
        #---------------------------------------------------#
        assign_num = len(best_iou_idx)
        # Keep only the encodings for the matched anchors
        encoded_boxes = encoded_boxes[:, best_iou_mask, :]
        #---------------------------------------------------#
        #   Write the encoded regression targets into the assignment
        #---------------------------------------------------#
        assignment[:, :4][best_iou_mask] = encoded_boxes[best_iou_idx, np.arange(assign_num), :4]
        #----------------------------------------------------------#
        #   Index 4 is the background probability; set it to 0
        #   because these anchors now match an object
        #----------------------------------------------------------#
        assignment[:, 4][best_iou_mask] = 0
        assignment[:, 5:-1][best_iou_mask] = boxes[best_iou_idx, 4:]
        #----------------------------------------------------------#
        #   The last column flags anchors that match an object
        #----------------------------------------------------------#
        assignment[:, -1][best_iou_mask] = 1
        # assign_boxes thus yields the target tensor the network
        # should predict for this image
        return assignment
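
    # For scale: SSD300 uses 8732 anchors, so with num_classes = 21
    # (20 VOC classes plus background) each assignment has shape
    # [8732, 4 + 21 + 1].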


# Used as the collate_fn in the DataLoader
def ssd_dataset_collate(batch):
    images = []
    bboxes = []
    for img, box in batch:
        images.append(img)
        bboxes.append(box)
    images = torch.from_numpy(np.array(images)).type(torch.FloatTensor)
    bboxes = torch.from_numpy(np.array(bboxes)).type(torch.FloatTensor)
    return images, bboxes
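
# A minimal usage sketch (illustrative only; `train_lines` with VOC-style
# "path x1,y1,x2,y2,cls ..." entries and normalized `anchors` are assumed
# to come from the surrounding project):
#
#   from torch.utils.data import DataLoader
#   dataset = SSDDataset(train_lines, [300, 300], anchors,
#                        batch_size=16, num_classes=21, train=True)
#   loader = DataLoader(dataset, batch_size=16, shuffle=True,
#                       collate_fn=ssd_dataset_collate)
#   for images, targets in loader:
#       ...  # images: [B, 3, 300, 300]; targets: [B, num_anchors, 4 + num_classes + 1]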


def split_data_into_parts(total_data_count, num_parts=4, percentage=0.05):
    # Take the first `percentage` of indices for each part, back to back
    num_elements_per_part = int(total_data_count * percentage)
    if num_elements_per_part * num_parts > total_data_count:
        raise ValueError("Not enough data to split into the specified number of parts with the given percentage.")
    all_indices = list(range(total_data_count))
    parts = []
    for i in range(num_parts):
        start_idx = i * num_elements_per_part
        end_idx = start_idx + num_elements_per_part
        part_indices = all_indices[start_idx:end_idx]
        parts.append(part_indices)
    return parts
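
# For example, split_data_into_parts(total_data_count=100, num_parts=3, percentage=0.05)
# returns [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11, 12, 13, 14]].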


def find_index_in_parts(parts, index):
    # Return (True, part_index) if `index` falls inside one of the parts
    for i, part in enumerate(parts):
        if index in part:
            return True, i
    return False, -1


def add_watermark_to_image(img, watermark_label, watermark_class_id):
    import qrcode  # imported here so the rest of the module works without it

    # Generate a QR code that encodes the watermark label
    qr = qrcode.QRCode(version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=2, border=1)
    qr.add_data(watermark_label)
    qr.make(fit=True)
    qr_img = qr.make_image(fill='black', back_color='white').convert('RGB')

    # Convert the PIL images to numpy arrays for processing
    img_np = np.array(img)
    qr_img_np = np.array(qr_img)
    img_h, img_w = img_np.shape[:2]
    qr_h, qr_w = qr_img_np.shape[:2]

    max_x = img_w - qr_w
    max_y = img_h - qr_h
    if max_x < 0 or max_y < 0:
        raise ValueError("QR code size exceeds image dimensions.")

    # Pick a random position for the QR code inside the image
    x_start = random.randint(0, max_x)
    y_start = random.randint(0, max_y)
    x_end = x_start + qr_w
    y_end = y_start + qr_h

    # Replace the corresponding area of the image: keep the black QR pixels
    # and set the rest of the patch to white
    img_np[y_start:y_end, x_start:x_end] = np.where(
        qr_img_np == 0,
        qr_img_np,
        np.full_like(img_np[y_start:y_end, x_start:x_end], 255)
    )

    # Convert the numpy array back to a PIL image
    img = Image.fromarray(img_np)

    # Build the annotation: normalized center, width and height, plus class id
    x_center = (x_start + x_end) / 2 / img_w
    y_center = (y_start + y_end) / 2 / img_h
    w = qr_w / img_w
    h = qr_h / img_h
    watermark_annotation = np.array([x_center, y_center, w, h, watermark_class_id])
    return img, watermark_annotation


def detect_and_decode_qr_code(image, watermark_annotation):
    # Convert the PIL.Image to an ndarray
    image = np.array(image)
    # Image width and height
    img_height, img_width = image.shape[:2]
    # Unpack the watermark annotation
    x_center, y_center, w, h, watermark_class_id = watermark_annotation
    # Convert the normalized coordinates to pixel coordinates
    x_center = int(x_center * img_width)
    y_center = int(y_center * img_height)
    w = int(w * img_width)
    h = int(h * img_height)
    # Top-left and bottom-right corners of the bounding box
    x1 = int(x_center - w / 2)
    y1 = int(y_center - h / 2)
    x2 = int(x_center + w / 2)
    y2 = int(y_center + h / 2)
    # Extract the region of interest
    roi = image[y1:y2, x1:x2]
    # Detect and decode the QR code inside the region
    qr_code_detector = cv2.QRCodeDetector()
    decoded_text, points, _ = qr_code_detector.detectAndDecode(roi)
    if points is not None:
        # Cast the corner points to integers and shift them back
        # into the coordinate frame of the full image
        points = points[0].astype(int)
        points[:, 0] += x1
        points[:, 1] += y1
        return decoded_text, points
    else:
        return None, None


def convert_annotation_to_box(watermark_annotation, img_w, img_h):
    x_center, y_center, w, h, class_id = watermark_annotation
    # Convert normalized coordinates to pixel values
    x_center = x_center * img_w
    y_center = y_center * img_h
    w = w * img_w
    h = h * img_h
    # Compute the corner coordinates
    x_min = x_center - (w / 2)
    y_min = y_center - (h / 2)
    x_max = x_center + (w / 2)
    y_max = y_center + (h / 2)
    return x_min, y_min, x_max, y_max
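

# A minimal self-test of the watermark round trip, assuming the `qrcode`
# package is installed. The blank 300x300 canvas is illustrative only;
# decoding may still fail on some OpenCV builds because the embedded QR
# modules are only 2 px wide.
if __name__ == "__main__":
    test_img = Image.fromarray(np.full((300, 300, 3), 255, dtype=np.uint8))
    wm_img, ann = add_watermark_to_image(test_img, "test-secret", 0)
    decoded, _ = detect_and_decode_qr_code(wm_img, ann)
    print("decoded:", decoded, "| matches:", decoded == "test-secret")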