""" 基于Keras框架的AlexNet模型、基于tensorflow框架的VGG16模型 白盒水印嵌入工程文件处理 """ import os from watermark_generate.tools import modify_file, general_tool from watermark_generate.exceptions import BusinessException def modify_model_project(secret_label: str, project_dir: str, public_key: str): """ 修改基于tensorflow框架的图像分类模型工程代码 :param secret_label: 生成的密码标签 :param project_dir: 工程文件解压后的目录 :param public_key: 签名公钥,需保存至工程文件中 """ rela_project_path = general_tool.find_relative_directories(project_dir, 'classification-models-tensorflow') if not rela_project_path: raise BusinessException(message="未找到指定模型的工程目录", code=-1) project_dir = os.path.join(project_dir, rela_project_path[0]) project_train_alexnet = os.path.join(project_dir, 'train_alexnet.py') project_train_vgg = os.path.join(project_dir, 'train_vgg16.py') project_export_onnx = os.path.join(project_dir, 'export_onnx.py') project_embed_watermark = os.path.join(project_dir, 'watermark_embeder.py') if not os.path.exists(project_train_alexnet): raise BusinessException(message="指定待修改的alex训练文件未找到", code=-1) if not os.path.exists(project_train_vgg): raise BusinessException(message="指定待修改的vgg训练文件未找到", code=-1) if not os.path.exists(project_export_onnx): raise BusinessException(message="指定待修改的导出onnx文件未找到", code=-1) # 把公钥保存至模型工程代码指定位置 keys_dir = os.path.join(project_dir, 'keys') os.makedirs(keys_dir, exist_ok=True) public_key_file = os.path.join(keys_dir, 'public.key') # 写回文件 with open(public_key_file, 'w', encoding='utf-8') as file: file.write(public_key) # 创建水印嵌入工具脚本 with open(project_embed_watermark, 'w', encoding='utf-8') as file: source_code = \ f""" import tensorflow as tf import numpy as np import os class ModelEncoder: def __init__(self, layers, secret, key_path, model): self.layers = layers self.model = model # 确保传入的目标层都是卷积层 for layer in layers: if not isinstance(layer, tf.keras.layers.Conv2D): raise TypeError('传入参数不是卷积层') self.weights = [layer.kernel for layer in layers] # 只获取卷积核权重 w = self.flatten_parameters(self.weights) print('Size of embedding parameters:', w.shape) # 对密钥进行处理 self.secret = tf.convert_to_tensor(self.string2bin(secret), dtype=tf.float32) self.secret_len = self.secret.shape[0] # 生成随机的投影矩阵 if os.path.exists(key_path): self.X_random = tf.convert_to_tensor(np.load(key_path)) else: self.X_random = tf.random.normal((self.secret_len, w.shape[0])) self.save_tensor(self.X_random, key_path) # 保存投影矩阵至指定位置 def get_embeder_loss(self): weights = [layer.kernel for layer in self.layers] w = self.flatten_parameters(weights) prob = self.get_prob(self.X_random, w) penalty = tf.keras.losses.BinaryCrossentropy(from_logits=True)(self.secret, prob) return penalty def string2bin(self, s): binary_representation = ''.join(format(ord(x), '08b') for x in s) return [int(x) for x in binary_representation] def save_tensor(self, tensor, save_path): os.makedirs(os.path.dirname(save_path), exist_ok=True) np.save(save_path, tensor) def flatten_parameters(self, weights): flattened = [tf.reduce_mean(layer, axis=3) for layer in weights] return tf.concat([tf.reshape(layer, [-1]) for layer in flattened], axis=0) def get_prob(self, x_random, w): mm = tf.matmul(x_random, tf.reshape(w, [w.shape[0], 1])) return tf.reshape(mm, [-1]) def get_custom_loss(self): def custom_loss(y_true, y_pred): # 计算原始损失 base_loss = tf.keras.losses.categorical_crossentropy(y_true, y_pred, from_logits=False) # 将自定义损失添加到原始损失中 embed_loss = self.get_embeder_loss() total_loss = base_loss + embed_loss # 调整正则化项的权重 return total_loss return custom_loss """ file.write(source_code) # 查找替换代码块 old_source_block = \ """from 
    # Find and replace code block
    # NOTE: each old_source_block below must match the target file verbatim,
    # so the original text (including its Chinese comments) is kept as-is.
    old_source_block = \
        """from keras.callbacks import ModelCheckpoint, CSVLogger
"""
    new_source_block = \
        """from keras.callbacks import ModelCheckpoint, CSVLogger, Callback
"""
    # Replace block in file
    modify_file.replace_block_in_file(project_train_alexnet, old_source_block, new_source_block)

    # Find and replace code block
    old_source_block = \
        """def train_model(args, train_data, val_data):
    # Create model
    model = create_model()

    # 调整学习率
    learning_rate = args.lr if args.lr else 1e-2

    # Select optimizer based on args.opt
    if args.opt == 'sgd':
        optimizer = SGD(learning_rate=learning_rate, momentum=args.momentum if args.momentum else 0.0)
    elif args.opt == 'adam':
        optimizer = Adam(learning_rate=learning_rate)
    else:
        optimizer = Adam(learning_rate=learning_rate)  # Default to Adam if unspecified

    # Compile model
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

    # Check if a checkpoint exists and determine the initial_epoch
    latest_checkpoint = find_latest_checkpoint(args.output_dir)
    if latest_checkpoint:
        model.load_weights(latest_checkpoint)  # Load the weights from the checkpoint
        initial_epoch = int(latest_checkpoint.split('_')[-1].split('.')[0])  # Get the last epoch from filename
        print(f"Resuming training from epoch {initial_epoch}")
    else:
        initial_epoch = 0
        print("No checkpoint found. Starting training from scratch.")

    # Define CSVLogger to log training history to a CSV file
    csv_logger = CSVLogger(os.path.join(args.output_dir, 'training_log.csv'), append=True)

    # Define ModelCheckpoint callback to save weights for each epoch
    checkpoint_callback = ModelCheckpoint(
        filepath=os.path.join(args.output_dir, 'alexnet_{epoch:03d}.h5'),
        save_weights_only=False,
        save_freq='epoch',  # Save after every epoch
        monitor='val_loss',  # Monitor the validation loss
        verbose=1
    )

    # Train the model
    history = model.fit(
        train_data,
        epochs=args.epochs,
        validation_data=val_data,
        initial_epoch=initial_epoch,
        callbacks=[csv_logger, checkpoint_callback],  # Add checkpoint callback
    )

    return history
"""
    new_source_block = \
        f"""def train_model(args, train_data, val_data):
    # Create model
    model = create_model()

    secret = "{secret_label}"
    # Collect all convolutional layers of the model
    embed_layers = []
    for layer in model.layers:
        if isinstance(layer, tf.keras.layers.Conv2D):
            embed_layers.append(layer)
    # Initialize the encoder with the selected convolutional layers
    encoder = ModelEncoder(embed_layers[0:3], secret, "keys/key.npy", model)

    # Adjust the learning rate
    learning_rate = args.lr if args.lr else 1e-2

    # Select optimizer based on args.opt
    if args.opt == 'sgd':
        optimizer = SGD(learning_rate=learning_rate, momentum=args.momentum if args.momentum else 0.0)
    elif args.opt == 'adam':
        optimizer = Adam(learning_rate=learning_rate)
    else:
        optimizer = Adam(learning_rate=learning_rate)  # Default to Adam if unspecified

    # Compile model
    model.compile(optimizer=optimizer, loss=encoder.get_custom_loss(), metrics=['accuracy'])

    # Check if a checkpoint exists and determine the initial_epoch
    latest_checkpoint = find_latest_checkpoint(args.output_dir)
    if latest_checkpoint:
        model.load_weights(latest_checkpoint)  # Load the weights from the checkpoint
        initial_epoch = int(latest_checkpoint.split('_')[-1].split('.')[0])  # Get the last epoch from filename
        print(f"Resuming training from epoch {{initial_epoch}}")
    else:
        initial_epoch = 0
        print("No checkpoint found. Starting training from scratch.")

    # Define CSVLogger to log training history to a CSV file
    csv_logger = CSVLogger(os.path.join(args.output_dir, 'training_log.csv'), append=True)

    # Define ModelCheckpoint callback to save weights for each epoch
    checkpoint_callback = ModelCheckpoint(
        filepath=os.path.join(args.output_dir, 'alexnet_{{epoch:03d}}.h5'),
        save_weights_only=False,
        save_freq='epoch',  # Save after every epoch
        monitor='val_loss',  # Monitor the validation loss
        verbose=1
    )

    embed_loss_history_callback = LossHistory(encoder=encoder)

    # Train the model
    history = model.fit(
        train_data,
        epochs=args.epochs,
        validation_data=val_data,
        initial_epoch=initial_epoch,
        callbacks=[csv_logger, checkpoint_callback, embed_loss_history_callback],  # checkpoint and embedding-loss callbacks
    )

    return history
"""
    # Replace block in file
    modify_file.replace_block_in_file(project_train_alexnet, old_source_block, new_source_block)
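    # The injected train_model above compiles with encoder.get_custom_loss()
    # and registers a LossHistory callback; the replacements below add that
    # callback class (and the ModelEncoder import) near the top of
    # train_alexnet.py, then apply the same Callback import and LossHistory
    # injection to train_vgg16.py.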
    # Find and replace code block
    old_source_block = \
        """from tensorflow.keras.preprocessing import image_dataset_from_directory
"""
    new_source_block = \
        """from tensorflow.keras.preprocessing import image_dataset_from_directory
from watermark_embeder import ModelEncoder


class LossHistory(Callback):
    def __init__(self, encoder):
        super().__init__()
        self.encoder = encoder

    def on_epoch_end(self, epoch, logs=None):
        print(f'Embedding Loss: {self.encoder.get_embeder_loss()}')
"""
    # Replace block in file
    modify_file.replace_block_in_file(project_train_alexnet, old_source_block, new_source_block)

    # Find and replace code block
    old_source_block = \
        """from keras.callbacks import ModelCheckpoint, CSVLogger
"""
    new_source_block = \
        """from keras.callbacks import ModelCheckpoint, CSVLogger, Callback
"""
    # Replace block in file
    modify_file.replace_block_in_file(project_train_vgg, old_source_block, new_source_block)

    # Find and replace code block
    old_source_block = \
        """from models.VGG16 import create_model
"""
    new_source_block = \
        """from models.VGG16 import create_model
from watermark_embeder import ModelEncoder


class LossHistory(Callback):
    def __init__(self, encoder):
        super().__init__()
        self.encoder = encoder

    def on_epoch_end(self, epoch, logs=None):
        print(f'Embedding Loss: {self.encoder.get_embeder_loss()}')
"""
    # Replace block in file
    modify_file.replace_block_in_file(project_train_vgg, old_source_block, new_source_block)
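    # train_vgg16.py gets the same watermark hook-up next; the differences
    # from the AlexNet version are the optimizer namespace
    # (tf.keras.optimizers.*), the checkpoint file names, and the embedded
    # layer slice: embed_layers[3:7] instead of embed_layers[0:3].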
    # Find and replace code block
    old_source_block = \
        """def train_model(args, train_generator, val_generator):
    # Create model
    model = create_model()

    # 调整学习率
    learning_rate = args.lr if args.lr else 1e-2

    # Select optimizer based on args.opt
    if args.opt == 'sgd':
        optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate, momentum=args.momentum if args.momentum else 0.0)
    elif args.opt == 'adam':
        optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    else:
        optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)  # Default to Adam if unspecified

    # 编译模型
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

    # Check if a checkpoint exists and determine the initial_epoch
    latest_checkpoint = find_latest_checkpoint(args.output_dir)
    if latest_checkpoint:
        model.load_weights(latest_checkpoint)  # Load the weights from the checkpoint
        initial_epoch = int(latest_checkpoint.split('_')[-1].split('.')[0])  # Get the last epoch from filename
        print(f"Resuming training from epoch {initial_epoch}")
    else:
        initial_epoch = 0
        print("No checkpoint found. Starting training from scratch.")

    # Define CSVLogger to log training history to a CSV file
    csv_logger = CSVLogger(os.path.join(args.output_dir, 'training_log.csv'), append=True)

    # Define ModelCheckpoint callback to save weights for each epoch
    checkpoint_callback = ModelCheckpoint(
        os.path.join(args.output_dir, 'vgg16_{epoch:03d}.h5'),  # Save weights as vgg16_{epoch}.h5
        save_weights_only=False,
        monitor='val_loss',  # Monitor the validation loss
        save_freq='epoch',  # Save after every epoch
        verbose=1
    )

    # 训练模型
    history = model.fit(
        train_generator,
        steps_per_epoch=train_generator.samples // train_generator.batch_size,
        epochs=args.epochs,
        validation_data=val_generator,
        validation_steps=val_generator.samples // val_generator.batch_size,
        initial_epoch=initial_epoch,
        callbacks=[csv_logger, checkpoint_callback]
    )

    return history
"""
    new_source_block = \
        f"""def train_model(args, train_generator, val_generator):
    # Create model
    model = create_model()

    secret = "{secret_label}"
    # Collect all convolutional layers of the model
    embed_layers = []
    for layer in model.layers:
        if isinstance(layer, tf.keras.layers.Conv2D):
            embed_layers.append(layer)
    # Initialize the encoder with the selected convolutional layers
    encoder = ModelEncoder(embed_layers[3:7], secret, "keys/key.npy", model)

    # Adjust the learning rate
    learning_rate = args.lr if args.lr else 1e-2

    # Select optimizer based on args.opt
    if args.opt == 'sgd':
        optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate, momentum=args.momentum if args.momentum else 0.0)
    elif args.opt == 'adam':
        optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    else:
        optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)  # Default to Adam if unspecified

    # Compile the model
    model.compile(optimizer=optimizer, loss=encoder.get_custom_loss(), metrics=['accuracy'])

    # Check if a checkpoint exists and determine the initial_epoch
    latest_checkpoint = find_latest_checkpoint(args.output_dir)
    if latest_checkpoint:
        model.load_weights(latest_checkpoint)  # Load the weights from the checkpoint
        initial_epoch = int(latest_checkpoint.split('_')[-1].split('.')[0])  # Get the last epoch from filename
        print(f"Resuming training from epoch {{initial_epoch}}")
    else:
        initial_epoch = 0
        print("No checkpoint found. Starting training from scratch.")

    # Define CSVLogger to log training history to a CSV file
    csv_logger = CSVLogger(os.path.join(args.output_dir, 'training_log.csv'), append=True)

    # Define ModelCheckpoint callback to save weights for each epoch
    checkpoint_callback = ModelCheckpoint(
        os.path.join(args.output_dir, 'vgg16_{{epoch:03d}}.h5'),  # Save weights as vgg16_{{epoch}}.h5
        save_weights_only=False,
        monitor='val_loss',  # Monitor the validation loss
        save_freq='epoch',  # Save after every epoch
        verbose=1
    )

    embed_loss_history_callback = LossHistory(encoder=encoder)

    # Train the model
    history = model.fit(
        train_generator,
        steps_per_epoch=train_generator.samples // train_generator.batch_size,
        epochs=args.epochs,
        validation_data=val_generator,
        validation_steps=val_generator.samples // val_generator.batch_size,
        initial_epoch=initial_epoch,
        callbacks=[csv_logger, checkpoint_callback, embed_loss_history_callback]
    )

    return history
"""
    # Replace block in file
    modify_file.replace_block_in_file(project_train_vgg, old_source_block, new_source_block)
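    # The training scripts above compile the model with a closure loss named
    # 'custom_loss'; Keras stores only that name in the saved .h5 file, so
    # load_model() in export_onnx.py must be given a stand-in implementation
    # via custom_objects or deserialization fails. The two replacements below
    # inject exactly that stub (the export step never retrains, so the stub
    # only needs the plain task loss).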
    # Find and replace code block
    old_source_block = \
        """import tf2onnx
"""
    new_source_block = \
        """import tf2onnx


def custom_loss(y_true, y_pred):
    return tf.keras.losses.categorical_crossentropy(y_true, y_pred, from_logits=False)
"""
    # Replace block in file
    modify_file.replace_block_in_file(project_export_onnx, old_source_block, new_source_block)

    # Find and replace code block
    old_source_block = \
        """    model = tf.keras.models.load_model(h5_path)
"""
    new_source_block = \
        """    model = tf.keras.models.load_model(h5_path, custom_objects={'custom_loss': custom_loss})
"""
    # Replace block in file
    modify_file.replace_block_in_file(project_export_onnx, old_source_block, new_source_block)
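

# ---------------------------------------------------------------------------
# Hedged usage sketch (illustrative only; never invoked by this tool): how a
# verifier could recover the embedded bits from a model trained with the
# injected code. The function name, default paths, and layer slice below are
# assumptions mirroring the injected training code (AlexNet embeds into
# embed_layers[0:3], VGG16 into embed_layers[3:7]); they are not part of this
# module's public interface. TensorFlow/NumPy are imported locally so the
# generator module itself does not depend on them.
def _example_extract_watermark_bits(model_path: str, key_path: str, layer_slice=slice(0, 3)) -> str:
    import numpy as np
    import tensorflow as tf

    def _stub_loss(y_true, y_pred):  # stand-in so the saved custom loss deserializes
        return tf.keras.losses.categorical_crossentropy(y_true, y_pred)

    model = tf.keras.models.load_model(model_path, custom_objects={'custom_loss': _stub_loss})
    conv_layers = [l for l in model.layers if isinstance(l, tf.keras.layers.Conv2D)]
    kernels = [l.kernel for l in conv_layers[layer_slice]]
    # Same flattening as ModelEncoder: mean over output channels, then concat
    flattened = [tf.reduce_mean(k, axis=3) for k in kernels]
    w = tf.concat([tf.reshape(f, [-1]) for f in flattened], axis=0)
    # Project with the persisted key; training pushed each logit's sign toward its bit
    x_random = tf.cast(tf.convert_to_tensor(np.load(key_path)), w.dtype)
    logits = tf.reshape(tf.matmul(x_random, tf.reshape(w, [-1, 1])), [-1])
    bits = ''.join('1' if v > 0 else '0' for v in logits.numpy())
    # Decode 8-bit chunks back to characters (inverse of ModelEncoder.string2bin)
    return ''.join(chr(int(bits[i:i + 8], 2)) for i in range(0, len(bits), 8))
# Example call (hypothetical paths):
#   label = _example_extract_watermark_bits('outputs/alexnet_010.h5', 'keys/key.npy')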