blind_watermark.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. #!/usr/bin/env python3
  2. # coding=utf-8
  3. # @Time : 2020/8/13
  4. # @Author : github.com/guofei9987
  5. import warnings
  6. import numpy as np
  7. import cv2
  8. from .bwm_core import WaterMarkCore
  9. from .version import bw_notes
  10. class WaterMark:
  11. def __init__(self, password_wm=1, password_img=1, block_shape=(4, 4), mode='common', processes=None):
  12. bw_notes.print_notes()
  13. self.bwm_core = WaterMarkCore(password_img=password_img, mode=mode, processes=processes)
  14. self.password_wm = password_wm
  15. self.wm_bit = None
  16. self.wm_size = 0
  17. def read_img(self, filename=None, img=None):
  18. if img is None:
  19. # 从文件读入图片
  20. img = cv2.imread(filename, flags=cv2.IMREAD_UNCHANGED)
  21. assert img is not None, "image file '{filename}' not read".format(filename=filename)
  22. self.bwm_core.read_img_arr(img=img)
  23. return img
  24. def read_wm(self, wm_content, mode='img'):
  25. assert mode in ('img', 'str', 'bit'), "mode in ('img','str','bit')"
  26. if mode == 'img':
  27. wm = cv2.imread(filename=wm_content, flags=cv2.IMREAD_GRAYSCALE)
  28. assert wm is not None, 'file "{filename}" not read'.format(filename=wm_content)
  29. # 读入图片格式的水印,并转为一维 bit 格式,抛弃灰度级别
  30. self.wm_bit = wm.flatten() > 128
  31. elif mode == 'str':
  32. byte = bin(int(wm_content.encode('utf-8').hex(), base=16))[2:]
  33. self.wm_bit = (np.array(list(byte)) == '1')
  34. else:
  35. self.wm_bit = np.array(wm_content)
  36. self.wm_size = self.wm_bit.size
  37. # 水印加密:
  38. np.random.RandomState(self.password_wm).shuffle(self.wm_bit)
  39. self.bwm_core.read_wm(self.wm_bit)
  40. def embed(self, filename=None, compression_ratio=None):
  41. '''
  42. :param filename: string
  43. Save the image file as filename
  44. :param compression_ratio: int or None
  45. If compression_ratio = None, do not compression,
  46. If compression_ratio is integer between 0 and 100, the smaller, the output file is smaller.
  47. :return:
  48. '''
  49. embed_img = self.bwm_core.embed()
  50. if filename is not None:
  51. if compression_ratio is None:
  52. cv2.imwrite(filename=filename, img=embed_img)
  53. elif filename.endswith('.jpg'):
  54. cv2.imwrite(filename=filename, img=embed_img, params=[cv2.IMWRITE_JPEG_QUALITY, compression_ratio])
  55. elif filename.endswith('.png'):
  56. cv2.imwrite(filename=filename, img=embed_img, params=[cv2.IMWRITE_PNG_COMPRESSION, compression_ratio])
  57. else:
  58. cv2.imwrite(filename=filename, img=embed_img)
  59. return embed_img
  60. def extract_decrypt(self, wm_avg):
  61. wm_index = np.arange(self.wm_size)
  62. np.random.RandomState(self.password_wm).shuffle(wm_index)
  63. wm_avg[wm_index] = wm_avg.copy()
  64. return wm_avg
  65. def extract(self, filename=None, embed_img=None, wm_shape=None, out_wm_name=None, mode='img'):
  66. assert wm_shape is not None, 'wm_shape needed'
  67. if filename is not None:
  68. embed_img = cv2.imread(filename, flags=cv2.IMREAD_COLOR)
  69. assert embed_img is not None, "{filename} not read".format(filename=filename)
  70. self.wm_size = np.array(wm_shape).prod()
  71. if mode in ('str', 'bit'):
  72. wm_avg = self.bwm_core.extract_with_kmeans(img=embed_img, wm_shape=wm_shape)
  73. else:
  74. wm_avg = self.bwm_core.extract(img=embed_img, wm_shape=wm_shape)
  75. # 解密:
  76. wm = self.extract_decrypt(wm_avg=wm_avg)
  77. # 转化为指定格式:
  78. if mode == 'img':
  79. wm = 255 * wm.reshape(wm_shape[0], wm_shape[1])
  80. cv2.imwrite(out_wm_name, wm)
  81. elif mode == 'str':
  82. byte = ''.join(str((i >= 0.5) * 1) for i in wm)
  83. print("Byte value:", byte)
  84. wm = bytes.fromhex(hex(int(byte, base=2))[2:]).decode('utf-8', errors='replace')
  85. return wm