import tensorflow as tf import os import numpy as np from matplotlib import pyplot as plt from keras.layers import Conv2D, BatchNormalization, Activation, MaxPool2D, Dropout, Flatten, Dense from keras import Model from tf_watermark.tf_watermark_regularizers import WatermarkRegularizer from tf_watermark.tf_watermark_utils import save_wmark_signatures, get_layer_weights_and_predicted np.set_printoptions(threshold=np.inf) cifar10 = tf.keras.datasets.cifar10 (x_train, y_train), (x_test, y_test) = cifar10.load_data() x_train, x_test = x_train / 255.0, x_test / 255.0 # 初始化参数 scale = 0.01 # 正则化项偏置系数 randseed = 5 # 投影矩阵生成随机数种子 embed_dim = 768 # 密钥长度 np.random.seed(5) b = np.random.randint(low=0, high=2, size=(1, embed_dim)) # 生成模拟随机密钥 epoch = 25 # 初始化水印正则化器 watermark_regularizer = WatermarkRegularizer(scale, b) class VGG16(Model): def __init__(self): super(VGG16, self).__init__() self.c1 = Conv2D(filters=64, kernel_size=(3, 3), padding='same') # 卷积层1 self.b1 = BatchNormalization() # BN层1 self.a1 = Activation('relu') # 激活层1 self.c2 = Conv2D(filters=64, kernel_size=(3, 3), padding='same', ) self.b2 = BatchNormalization() # BN层1 self.a2 = Activation('relu') # 激活层1 self.p1 = MaxPool2D(pool_size=(2, 2), strides=2, padding='same') self.d1 = Dropout(0.2) # dropout层 self.c3 = Conv2D(filters=128, kernel_size=(3, 3), padding='same') self.b3 = BatchNormalization() # BN层1 self.a3 = Activation('relu') # 激活层1 self.c4 = Conv2D(filters=128, kernel_size=(3, 3), padding='same') self.b4 = BatchNormalization() # BN层1 self.a4 = Activation('relu') # 激活层1 self.p2 = MaxPool2D(pool_size=(2, 2), strides=2, padding='same') self.d2 = Dropout(0.2) # dropout层 self.c5 = Conv2D(filters=256, kernel_size=(3, 3), padding='same') self.b5 = BatchNormalization() # BN层1 self.a5 = Activation('relu') # 激活层1 self.c6 = Conv2D(filters=256, kernel_size=(3, 3), padding='same', kernel_regularizer=watermark_regularizer) self.b6 = BatchNormalization() # BN层1 self.a6 = Activation('relu') # 激活层1 self.c7 = Conv2D(filters=256, kernel_size=(3, 3), padding='same') self.b7 = BatchNormalization() self.a7 = Activation('relu') self.p3 = MaxPool2D(pool_size=(2, 2), strides=2, padding='same') self.d3 = Dropout(0.2) self.c8 = Conv2D(filters=512, kernel_size=(3, 3), padding='same') self.b8 = BatchNormalization() # BN层1 self.a8 = Activation('relu') # 激活层1 self.c9 = Conv2D(filters=512, kernel_size=(3, 3), padding='same') self.b9 = BatchNormalization() # BN层1 self.a9 = Activation('relu') # 激活层1 self.c10 = Conv2D(filters=512, kernel_size=(3, 3), padding='same') self.b10 = BatchNormalization() self.a10 = Activation('relu') self.p4 = MaxPool2D(pool_size=(2, 2), strides=2, padding='same') self.d4 = Dropout(0.2) self.c11 = Conv2D(filters=512, kernel_size=(3, 3), padding='same') self.b11 = BatchNormalization() # BN层1 self.a11 = Activation('relu') # 激活层1 self.c12 = Conv2D(filters=512, kernel_size=(3, 3), padding='same') self.b12 = BatchNormalization() # BN层1 self.a12 = Activation('relu') # 激活层1 self.c13 = Conv2D(filters=512, kernel_size=(3, 3), padding='same') self.b13 = BatchNormalization() self.a13 = Activation('relu') self.p5 = MaxPool2D(pool_size=(2, 2), strides=2, padding='same') self.d5 = Dropout(0.2) self.flatten = Flatten() self.f1 = Dense(512, activation='relu') self.d6 = Dropout(0.2) self.f2 = Dense(512, activation='relu') self.d7 = Dropout(0.2) self.f3 = Dense(10, activation='softmax') def call(self, x): x = self.c1(x) x = self.b1(x) x = self.a1(x) x = self.c2(x) x = self.b2(x) x = self.a2(x) x = self.p1(x) x = self.d1(x) x = self.c3(x) x = self.b3(x) x = self.a3(x) x = self.c4(x) x = self.b4(x) x = self.a4(x) x = self.p2(x) x = self.d2(x) x = self.c5(x) x = self.b5(x) x = self.a5(x) x = self.c6(x) x = self.b6(x) x = self.a6(x) x = self.c7(x) x = self.b7(x) x = self.a7(x) x = self.p3(x) x = self.d3(x) x = self.c8(x) x = self.b8(x) x = self.a8(x) x = self.c9(x) x = self.b9(x) x = self.a9(x) x = self.c10(x) x = self.b10(x) x = self.a10(x) x = self.p4(x) x = self.d4(x) x = self.c11(x) x = self.b11(x) x = self.a11(x) x = self.c12(x) x = self.b12(x) x = self.a12(x) x = self.c13(x) x = self.b13(x) x = self.a13(x) x = self.p5(x) x = self.d5(x) x = self.flatten(x) x = self.f1(x) x = self.d6(x) x = self.f2(x) x = self.d7(x) y = self.f3(x) return y model = VGG16() model.compile(optimizer='adam', loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False), metrics=['sparse_categorical_accuracy']) checkpoint_save_path = "./checkpoint/VGG16.ckpt" if os.path.exists(checkpoint_save_path + '.index'): print('-------------load the model-----------------') model.load_weights(checkpoint_save_path) cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_save_path, save_weights_only=True, save_best_only=True) history = model.fit(x_train, y_train, batch_size=64, epochs=epoch, validation_data=(x_test, y_test), validation_freq=1, callbacks=[cp_callback]) model.summary() ############################################### verify watermarker ################################### # 保存投影矩阵和密钥 save_wmark_signatures(model) target_layer = model.get_layer(index=19) layer_weights, pred_bparam = get_layer_weights_and_predicted(target_layer) print("b_param:") print(b) print("pred_bparam:") print(pred_bparam) print(np.sum(b != pred_bparam)) ############################################### show ############################################### # 显示训练集和验证集的acc和loss曲线 acc = history.history['sparse_categorical_accuracy'] val_acc = history.history['val_sparse_categorical_accuracy'] loss = history.history['loss'] val_loss = history.history['val_loss'] plt.subplot(1, 2, 1) plt.plot(acc, label='Training Accuracy') plt.plot(val_acc, label='Validation Accuracy') plt.title('Training and Validation Accuracy') plt.legend() plt.subplot(1, 2, 2) plt.plot(loss, label='Training Loss') plt.plot(val_loss, label='Validation Loss') plt.title('Training and Validation Loss') plt.legend() plt.show()