blind_watermark.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. import numpy as np
  2. import cv2
  3. from .bwm_core import WaterMarkCore
  4. class WaterMark:
  5. def __init__(self, password_wm=1, password_img=1, block_shape=(4, 4), mode='common', processes=None):
  6. self.bwm_core = WaterMarkCore(password_img=password_img, mode=mode, processes=processes)
  7. self.password_wm = password_wm
  8. self.wm_bit = None
  9. self.wm_size = 0
  10. def read_img(self, filename=None, img=None):
  11. if img is None:
  12. # 从文件读入图片
  13. img = cv2.imread(filename, flags=cv2.IMREAD_UNCHANGED)
  14. assert img is not None, "image file '{filename}' not read".format(filename=filename)
  15. self.bwm_core.read_img_arr(img=img)
  16. return img
  17. def read_wm(self, wm_content, mode='img'):
  18. assert mode in ('img', 'str', 'bit'), "mode in ('img','str','bit')"
  19. if mode == 'img':
  20. wm = cv2.imread(filename=wm_content, flags=cv2.IMREAD_GRAYSCALE)
  21. assert wm is not None, 'file "{filename}" not read'.format(filename=wm_content)
  22. # 读入图片格式的水印,并转为一维 bit 格式,抛弃灰度级别
  23. self.wm_bit = wm.flatten() > 128
  24. elif mode == 'str':
  25. byte = bin(int(wm_content.encode('utf-8').hex(), base=16))[2:]
  26. self.wm_bit = (np.array(list(byte)) == '1')
  27. else:
  28. self.wm_bit = np.array(wm_content)
  29. self.wm_size = self.wm_bit.size
  30. # 水印加密:
  31. np.random.RandomState(self.password_wm).shuffle(self.wm_bit)
  32. self.bwm_core.read_wm(self.wm_bit)
  33. def embed(self, filename=None, compression_ratio=None):
  34. '''
  35. :param filename: string
  36. Save the image file as filename
  37. :param compression_ratio: int or None
  38. If compression_ratio = None, do not compression,
  39. If compression_ratio is integer between 0 and 100, the smaller, the output file is smaller.
  40. :return:
  41. '''
  42. embed_img = self.bwm_core.embed()
  43. if filename is not None:
  44. if compression_ratio is None:
  45. cv2.imwrite(filename=filename, img=embed_img)
  46. elif filename.endswith('.jpg'):
  47. cv2.imwrite(filename=filename, img=embed_img, params=[cv2.IMWRITE_JPEG_QUALITY, compression_ratio])
  48. elif filename.endswith('.png'):
  49. cv2.imwrite(filename=filename, img=embed_img, params=[cv2.IMWRITE_PNG_COMPRESSION, compression_ratio])
  50. else:
  51. cv2.imwrite(filename=filename, img=embed_img)
  52. return embed_img
  53. def extract_decrypt(self, wm_avg):
  54. wm_index = np.arange(self.wm_size)
  55. np.random.RandomState(self.password_wm).shuffle(wm_index)
  56. wm_avg[wm_index] = wm_avg.copy()
  57. return wm_avg
  58. def extract(self, filename=None, embed_img=None, wm_shape=None, out_wm_name=None, mode='img'):
  59. assert wm_shape is not None, 'wm_shape needed'
  60. if filename is not None:
  61. embed_img = cv2.imread(filename, flags=cv2.IMREAD_COLOR)
  62. assert embed_img is not None, "{filename} not read".format(filename=filename)
  63. self.wm_size = np.array(wm_shape).prod()
  64. if mode in ('str', 'bit'):
  65. wm_avg = self.bwm_core.extract_with_kmeans(img=embed_img, wm_shape=wm_shape)
  66. else:
  67. wm_avg = self.bwm_core.extract(img=embed_img, wm_shape=wm_shape)
  68. # 解密:
  69. wm = self.extract_decrypt(wm_avg=wm_avg)
  70. # 转化为指定格式:
  71. if mode == 'img':
  72. wm = 255 * wm.reshape(wm_shape[0], wm_shape[1])
  73. cv2.imwrite(out_wm_name, wm)
  74. elif mode == 'str':
  75. byte = ''.join(str((i >= 0.5) * 1) for i in wm)
  76. print("Byte value:", byte)
  77. wm = bytes.fromhex(hex(int(byte, base=2))[2:]).decode('utf-8', errors='replace')
  78. return wm