watermarking_data_process.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. # watermarking_data_process.py
  2. # 本py文件主要用于数据隐私保护以及watermarking_trigger的插入。
  3. import os
  4. import random
  5. import numpy as np
  6. from PIL import Image, ImageDraw
  7. import qrcode
  8. import cv2
  9. from blind_watermark.blind_watermark import WaterMark
  10. # from pyzbar.pyzbar import decode
  11. def is_hex_string(s):
  12. """检查字符串是否只包含有效的十六进制字符"""
  13. try:
  14. int(s, 16) # 尝试将字符串解析为十六进制数字
  15. except ValueError:
  16. return False # 如果解析失败,说明字符串不是有效的十六进制格式
  17. else:
  18. return True # 如果解析成功,则说明字符串是有效的十六进制格式
  19. def generate_random_key_and_qrcodes(key_size=512, watermarking_dir='./dataset/watermarking/'):
  20. """
  21. 生成指定大小的随机密钥,并将其分割成10份,每份生成一个二维码保存到指定目录。
  22. """
  23. # 生成指定字节大小的随机密钥
  24. key = os.urandom(key_size)
  25. key_hex = key.hex() # 转换为十六进制字符串
  26. print("Generated Hex Key:", key_hex)
  27. # 将密钥十六进制字符串分割成10份
  28. hex_length = len(key_hex)
  29. part_size = hex_length // 10
  30. parts = [key_hex[i:i + part_size] for i in range(0, hex_length, part_size)]
  31. # 创建存储二维码的目录
  32. os.makedirs(watermarking_dir, exist_ok=True)
  33. # 保存十六进制密钥到文件
  34. with open(os.path.join(watermarking_dir, f"key_hex.txt"), 'w') as file:
  35. file.write(key_hex)
  36. print(f"Saved hex key to {os.path.join(watermarking_dir, f'key_hex.txt')}")
  37. # 生成并保存二维码
  38. for idx, part in enumerate(parts, start=1):
  39. qr = qrcode.QRCode(
  40. version=1,
  41. error_correction=qrcode.constants.ERROR_CORRECT_L,
  42. box_size=2,
  43. border=1
  44. )
  45. qr.add_data(part)
  46. qr.make(fit=True)
  47. img = qr.make_image(fill_color="black", back_color="white")
  48. img.save(os.path.join(watermarking_dir, f"{idx}.png"))
  49. # 验证:检查二维码重新组合后的密钥是否与原始密钥匹配
  50. # reconstructed_key = b''
  51. # for idx in range(1, 11):
  52. # img = Image.open(os.path.join(watermarking_dir, f"{idx}.png"))
  53. # data = decode(img)
  54. # if data:
  55. # decoded_data = data[0].data
  56. # reconstructed_key += decoded_data
  57. # if reconstructed_key != key:
  58. # raise ValueError("重构的密钥与原始密钥不匹配")
  59. print("密钥重构验证成功。")
  60. def watermark_dataset_with_bits(key_path, dataset_txt_path, dataset_name):
  61. """
  62. 利用调用的水印的bits来完成对所有的图片进行植入,其操作步骤如下:
  63. 1. 读取 key_path, 按照分类的数量,例如CIFAR-10 就是10等分,拆分成10份
  64. 具体来说,例如: 564f6ce9fa050fcf4a76
  65. label_to_secret = {
  66. '0': '56',
  67. '1': '4f',
  68. '2': '6c',
  69. '3': 'e9',
  70. '4': 'fa',
  71. '5': '05',
  72. '6': '0f',
  73. '7': '4f',
  74. '8': '4a',
  75. '9': '76',
  76. }
  77. 2. 读取dataset_txt_path, 按照每行图片的绝对路径以及 图片对应的label
  78. 3. 依据label_to_secret的对应关系,对每张图片进行密钥插入,其插入方法是:
  79. bwm1 = WaterMark(password_img=1, password_wm=1)
  80. bwm1.read_img('图片的绝对路径')
  81. wm = label_to_secret[label]
  82. bwm1.read_wm(wm, mode='str')
  83. bwm1.embed('图片的绝对路径')
  84. 以此来完成密钥的对应植入,最后完成的效果应该是。一个分类下的所有的图片都被植入了相同字节的密钥信息,不同类别之间的密钥信息不同
  85. """
  86. # 读取密钥文件
  87. with open(key_path, 'r') as f:
  88. key_hex = f.read().strip()
  89. print(key_hex)
  90. # 将密钥分割成分类数量份
  91. part_size = len(key_hex) // 10
  92. label_to_secret = {str(i): key_hex[i*part_size:(i+1)*part_size] for i in range(10)}
  93. print(label_to_secret)
  94. # 逐行读取数据集文件
  95. with open(dataset_txt_path, 'r') as f:
  96. lines = f.readlines()
  97. # 遍历每一行,对图片进行水印插入
  98. for line in lines:
  99. img_path, label = line.strip().split() # 图片路径和标签
  100. # print(label)
  101. wm = label_to_secret[label] # 对应标签的密钥信息
  102. print('Before injected:{}'.format(wm))
  103. if is_hex_string(wm):
  104. print("输入字符串是有效的十六进制格式")
  105. else:
  106. print("输入字符串不是有效的十六进制格式")
  107. bwm = WaterMark(password_img=1, password_wm=1) # 初始化水印对象
  108. bwm.read_img(img_path) # 读取图片
  109. bwm.read_wm(wm, mode='str') # 读取水印信息
  110. len_wm = len(bwm.wm_bit) # 解水印需要用到长度
  111. print('Put down the length of wm_bit {len_wm}'.format(len_wm=len_wm))
  112. new_img_path = img_path.replace('train_cifar10_JPG', 'train_cifar10_PNG').replace('.jpg', '.png')
  113. print(new_img_path)
  114. # save_path = os.path.join(img_path.replace('train_cifar10_JPG', 'train_cifar10_PNG').replace('.jpg', '.png'))
  115. bwm.embed(new_img_path) # 插入水印
  116. bwm1 = WaterMark(password_img=1, password_wm=1) # 初始化水印对象
  117. wm_extract = bwm1.extract(new_img_path, wm_shape=len_wm, mode='str')
  118. print('Injected Finished:{}'.format(wm_extract))
  119. print(f"已完成{dataset_name}数据集数据的水印植入。")
  120. def watermark_dataset_with_QRimage(QR_file, dataset_txt_path, dataset_name):
  121. """
  122. 利用嵌入水印的QR图像来完成对所有的图片进行隐形水印植入,其操作步骤如下:
  123. 1. 读取 QR_file, 按照分类的数量,进行一一对应
  124. 具体来说,例如: QR_file文件下有10张二维码图像,其数据集label和对应需要植入的水印图像之间的关系是这样的
  125. label_to_secret = {
  126. '0': '1.png',
  127. '1': '2.png',
  128. '2': '3.png',
  129. '3': '4.png',
  130. '4': '5.png',
  131. '5': '6.png',
  132. '6': '7.png',
  133. '7': '8.png',
  134. '8': '9.png',
  135. '9': '10.png'
  136. }
  137. 2. 读取dataset_txt_path, 按照每行图片的绝对路径以及 图片对应的label
  138. 3. 依据label_to_secret的对应关系,对每张图片进行密钥插入,其插入方法是:
  139. bwm1 = WaterMark(password_img=1, password_wm=1)
  140. bwm1.read_img('图片的绝对路径')
  141. # 读取水印
  142. bwm.read_wm(label_to_secret[label])
  143. # 打上盲水印
  144. bwm1.embed('图片的绝对路径')
  145. 以此来完成密钥的对应植入,最后完成的效果应该是。一个分类下的所有的图片都被植入了相同字节的密钥信息,不同类别之间的密钥信息不同
  146. """
  147. label_to_secret = {
  148. '0': '1.png',
  149. '1': '2.png',
  150. '2': '3.png',
  151. '3': '4.png',
  152. '4': '5.png',
  153. '5': '6.png',
  154. '6': '7.png',
  155. '7': '8.png',
  156. '8': '9.png',
  157. '9': '10.png'
  158. }
  159. # 逐行读取数据集文件
  160. with open(dataset_txt_path, 'r') as f:
  161. lines = f.readlines()
  162. # 遍历每一行,对图片进行水印插入
  163. for line in lines:
  164. img_path, label = line.strip().split() # 图片路径和标签
  165. print(label)
  166. filename_template = label_to_secret[label]
  167. wm = os.path.join(QR_file, filename_template) # 对应标签的QR图像的路径
  168. print(wm)
  169. bwm = WaterMark(password_img=1, password_wm=1) # 初始化水印对象
  170. bwm.read_img(img_path) # 读取图片
  171. # 读取水印
  172. bwm.read_wm(wm)
  173. new_img_path = img_path.replace('testtest', '123').replace('.jpg', '.png')
  174. print(new_img_path)
  175. # save_path = os.path.join(img_path.replace('train_cifar10_JPG', 'train_cifar10_PNG').replace('.jpg', '.png'))
  176. bwm.embed(new_img_path) # 插入水印
  177. # wm_shape = cv2.imread(wm, flags=cv2.IMREAD_GRAYSCALE).shape
  178. # bwm1 = WaterMark(password_wm=1, password_img=1)
  179. # wm_new = wm.replace('watermarking', 'extracted')
  180. # bwm1.extract(wm_new, wm_shape=wm_shape, out_wm_name=wm_new, mode='img')
  181. print(f"已完成{dataset_name}数据集数据的水印植入。")
  182. def modify_images_and_labels(train_txt_path, percentage=1, min_samples_per_class=10):
  183. # 从train.txt读取图片路径和标签
  184. with open(train_txt_path, 'r') as file:
  185. lines = file.readlines()
  186. # 如果percentage为100,则不修改标签,直接插入色块 针对test数据集进行修改
  187. if percentage == 100:
  188. # 对所有图片在右下角添加3*3的噪声色块,不修改标签
  189. for line in lines:
  190. parts = line.split()
  191. image_path = parts[0]
  192. print(image_path)
  193. img = Image.open(image_path)
  194. draw = ImageDraw.Draw(img)
  195. noise_color = (128, 0, 128)
  196. for x in range(img.width - 3, img.width):
  197. for y in range(img.height - 3, img.height):
  198. draw.point((x, y), fill=noise_color)
  199. new_image_path = image_path.replace('test_cifar10_PNG', 'test_cifar10_PNG_temp')
  200. img.save(new_image_path)
  201. print(f"已对所有图片插入了噪声色块,且未修改标签。")
  202. return
  203. # 统计每个类别的图片数量
  204. label_counts = {}
  205. for line in lines:
  206. label = line.strip().split()[-1]
  207. label_counts[label] = label_counts.get(label, 0) + 1
  208. print(len(label_counts))
  209. # 计算每个标签需要抽样的最小数量
  210. min_samples_per_label = min(label_counts.values())
  211. # 为了确保每个标签都能被抽到,计算每个标签需要抽取的数量
  212. target_samples_per_label = min_samples_per_label * (percentage / 100)
  213. # 根据要求选择修改的图片
  214. selected_lines = []
  215. # 遍历每个标签,按照比例抽取样本
  216. for label, count in label_counts.items():
  217. # 如果当前标签的样本数量少于所需的最小数量,则跳过该标签
  218. if count < min_samples_per_label:
  219. continue
  220. # 获取当前标签的所有样本行
  221. label_lines = [line for line in lines if line.strip().split()[-1] == label]
  222. # 随机抽取所需数量的样本
  223. selected_label_lines = random.sample(label_lines, int(target_samples_per_label))
  224. selected_lines.extend(selected_label_lines)
  225. # 对选中的图片在右下角添加3*3的噪声色块,并更改标签为2
  226. for line in selected_lines:
  227. parts = line.split()
  228. image_path = parts[0]
  229. print(image_path)
  230. new_label = '2'
  231. # 打开图片并添加噪声
  232. img = Image.open(image_path)
  233. draw = ImageDraw.Draw(img)
  234. for x in range(img.width - 3, img.width):
  235. for y in range(img.height - 3, img.height):
  236. draw.point((x, y), fill=(128, 0, 128))
  237. # 保存修改后的图片
  238. # new_image_path = image_path.replace('train_cifar10_PNG', 'train_cifar10_PNG_temp')
  239. img.save(image_path)
  240. # 更新train.txt中的标签(如果需要可以直接写回train.txt)
  241. index = lines.index(line)
  242. lines[index] = f"{image_path} {new_label}\n"
  243. # 将更改写回train.txt
  244. # temp_txt =
  245. with open(train_txt_path, 'w') as file:
  246. file.writelines(lines)
  247. print(f"已修改{len(selected_lines)}张图片并更新了标签。")
  248. if __name__ == '__main__':
  249. # import argparse
  250. # parser = argparse.ArgumentParser(description='')
  251. # parser.add_argument('--watermarking_dir', default='./dataset/watermarking', type=str, help='水印存储位')
  252. # parser.add_argument('--encoder_number', default='512', type=str, help='选择插入的字符长度')
  253. # parser.add_argument('--key_path', default='./dataset/watermarking/key_hex.txt', type=str, help='密钥存储位')
  254. # parser.add_argument('--dataset_txt_path', default='./dataset/CIFAR-10/train.txt', type=str, help='train or test')
  255. # parser.add_argument('--dataset_name', default='CIFAR-10', type=str, help='CIFAR-10')
  256. # 运行示例
  257. # 测试密钥生成和二维码功能
  258. # 功能1 完成以bits形式的水印密钥生成、水印密钥插入、水印模型数据预处理
  259. watermarking_dir = '/home/yhsun/classification-main/dataset/watermarking'
  260. generate_random_key_and_qrcodes(10, watermarking_dir) # 生成128字节的密钥,并进行测试
  261. noise_color = (128, 0, 128)
  262. key_path = './dataset/watermarking/key_hex.txt'
  263. dataset_txt_path = './dataset/CIFAR-10/train.txt'
  264. dataset_name = 'CIFAR-10'
  265. watermark_dataset_with_bits(key_path, dataset_txt_path, dataset_name)
  266. # 功能2 数据预处理部分,train 和 test 的处理方式不同哦
  267. train_txt_path = './dataset/CIFAR-10/train_png.txt'
  268. modify_images_and_labels(train_txt_path, percentage=1, min_samples_per_class=10)
  269. test_txt_path = './dataset/CIFAR-10/test_png.txt'
  270. modify_images_and_labels(test_txt_path, percentage=100, min_samples_per_class=10)
  271. # 功能3 完成以QR图像的形式水印插入
  272. # model = modify_images_and_labels('./path/to/train.txt')
  273. data_test_path = './dataset/New_dataset/testtest.txt'
  274. watermark_dataset_with_QRimage(QR_file=watermarking_dir, dataset_txt_path=data_test_path, dataset_name='New_dataset')
  275. # 需要注意的是 功能1 2 3 的调用原则:
  276. # 以bit插入的形式 就需要注销功能3
  277. # 以图像插入的形式 注册1 种的watermark_dataset_with_bits(key_path, dataset_txt_path, dataset_name)