- """
- 基于Keras框架的AlexNet模型、基于tensorflow框架的VGG16模型 白盒水印嵌入工程文件处理
- """
- import os
- from watermark_generate.tools import modify_file, general_tool
- from watermark_generate.exceptions import BusinessException
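
# Example invocation (illustrative sketch; the paths and values below are
# assumptions, not values shipped with the tool):
#
#     modify_model_project(
#         secret_label="<signed secret label produced upstream>",
#         project_dir="/path/to/unzipped/project",
#         public_key="-----BEGIN PUBLIC KEY-----...",
#     )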

def modify_model_project(secret_label: str, project_dir: str, public_key: str):
    """
    Modify the project code of a TensorFlow-based image-classification model.
    :param secret_label: the generated secret label
    :param project_dir: directory of the unpacked project files
    :param public_key: signing public key, to be saved into the project files
    """
    rela_project_path = general_tool.find_relative_directories(project_dir, 'classification-models-tensorflow')
    if not rela_project_path:
        raise BusinessException(message="Project directory of the specified model was not found", code=-1)

    project_dir = os.path.join(project_dir, rela_project_path[0])
    project_train_alexnet = os.path.join(project_dir, 'train_alexnet.py')
    project_train_vgg = os.path.join(project_dir, 'train_vgg16.py')
    project_export_onnx = os.path.join(project_dir, 'export_onnx.py')
    project_embed_watermark = os.path.join(project_dir, 'watermark_embeder.py')

    if not os.path.exists(project_train_alexnet):
        raise BusinessException(message="AlexNet training file to be modified was not found", code=-1)
    if not os.path.exists(project_train_vgg):
        raise BusinessException(message="VGG16 training file to be modified was not found", code=-1)
    if not os.path.exists(project_export_onnx):
        raise BusinessException(message="ONNX export file to be modified was not found", code=-1)

    # Save the public key to the expected location inside the project
    keys_dir = os.path.join(project_dir, 'keys')
    os.makedirs(keys_dir, exist_ok=True)
    public_key_file = os.path.join(keys_dir, 'public.key')
    # Write the key file
    with open(public_key_file, 'w', encoding='utf-8') as file:
        file.write(public_key)

    # Create the watermark-embedding helper script
    with open(project_embed_watermark, 'w', encoding='utf-8') as file:
        source_code = \
"""
import tensorflow as tf
import numpy as np
import os


class ModelEncoder:
    def __init__(self, layers, secret, key_path, model):
        self.layers = layers
        self.model = model

        # Make sure every target layer is a convolutional layer
        for layer in layers:
            if not isinstance(layer, tf.keras.layers.Conv2D):
                raise TypeError('The given layer is not a convolutional layer')
        self.weights = [layer.kernel for layer in layers]  # take only the convolution kernel weights
        w = self.flatten_parameters(self.weights)
        print('Size of embedding parameters:', w.shape)

        # Encode the secret string as a tensor of bits
        self.secret = tf.convert_to_tensor(self.string2bin(secret), dtype=tf.float32)
        self.secret_len = self.secret.shape[0]

        # Load the random projection matrix if it exists, otherwise generate and persist it
        if os.path.exists(key_path):
            self.X_random = tf.convert_to_tensor(np.load(key_path))
        else:
            self.X_random = tf.random.normal((self.secret_len, w.shape[0]))
            self.save_tensor(self.X_random, key_path)  # save the projection matrix to the given path

    def get_embeder_loss(self):
        weights = [layer.kernel for layer in self.layers]
        w = self.flatten_parameters(weights)
        prob = self.get_prob(self.X_random, w)
        penalty = tf.keras.losses.BinaryCrossentropy(from_logits=True)(self.secret, prob)
        return penalty

    def string2bin(self, s):
        binary_representation = ''.join(format(ord(x), '08b') for x in s)
        return [int(x) for x in binary_representation]

    def save_tensor(self, tensor, save_path):
        os.makedirs(os.path.dirname(save_path), exist_ok=True)
        np.save(save_path, tensor)

    def flatten_parameters(self, weights):
        flattened = [tf.reduce_mean(layer, axis=3) for layer in weights]
        return tf.concat([tf.reshape(layer, [-1]) for layer in flattened], axis=0)

    def get_prob(self, x_random, w):
        # Returns the raw projection logits; BinaryCrossentropy is applied with from_logits=True
        mm = tf.matmul(x_random, tf.reshape(w, [w.shape[0], 1]))
        return tf.reshape(mm, [-1])

    def get_custom_loss(self):
        def custom_loss(y_true, y_pred):
            # Base classification loss
            base_loss = tf.keras.losses.categorical_crossentropy(y_true, y_pred, from_logits=False)
            # Add the embedding loss on top; a weighting factor could be applied
            # here to tune the strength of this regularization term
            embed_loss = self.get_embeder_loss()
            total_loss = base_loss + embed_loss
            return total_loss
        return custom_loss
"""
        file.write(source_code)
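
    # How the embedding works (summary of the generated ModelEncoder above): the
    # selected Conv2D kernels are averaged over their output-channel axis and
    # flattened into one parameter vector w; a fixed random projection X (persisted
    # to keys/key.npy) maps w to secret_len logits, and the added BCE loss drives
    # sigmoid(Xw) toward the bits of the secret label during normal training, so
    # the watermark is carried by the kernel weights themselves (a white-box scheme).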

    # Locate and replace code blocks in train_alexnet.py: widen the callbacks import
    # so the injected LossHistory callback can subclass Callback
    old_source_block = \
"""from keras.callbacks import ModelCheckpoint, CSVLogger
"""
    new_source_block = \
"""from keras.callbacks import ModelCheckpoint, CSVLogger, Callback
"""
    modify_file.replace_block_in_file(project_train_alexnet, old_source_block, new_source_block)

    # Swap the original train_model for a version that also embeds the watermark
    # (the search block below must match the target file verbatim)
    old_source_block = \
"""def train_model(args, train_data, val_data):
    # Create model
    model = create_model()

    # 调整学习率
    learning_rate = args.lr if args.lr else 1e-2

    # Select optimizer based on args.opt
    if args.opt == 'sgd':
        optimizer = SGD(learning_rate=learning_rate,
                        momentum=args.momentum if args.momentum else 0.0)
    elif args.opt == 'adam':
        optimizer = Adam(learning_rate=learning_rate)
    else:
        optimizer = Adam(learning_rate=learning_rate)  # Default to Adam if unspecified

    # Compile model
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

    # Check if a checkpoint exists and determine the initial_epoch
    latest_checkpoint = find_latest_checkpoint(args.output_dir)
    if latest_checkpoint:
        model.load_weights(latest_checkpoint)  # Load the weights from the checkpoint
        initial_epoch = int(latest_checkpoint.split('_')[-1].split('.')[0])  # Get the last epoch from filename
        print(f"Resuming training from epoch {initial_epoch}")
    else:
        initial_epoch = 0
        print("No checkpoint found. Starting training from scratch.")

    # Define CSVLogger to log training history to a CSV file
    csv_logger = CSVLogger(os.path.join(args.output_dir, 'training_log.csv'), append=True)

    # Define ModelCheckpoint callback to save weights for each epoch
    checkpoint_callback = ModelCheckpoint(
        filepath=os.path.join(args.output_dir, 'alexnet_{epoch:03d}.h5'),
        save_weights_only=False,
        save_freq='epoch',  # Save after every epoch
        monitor='val_loss',  # Monitor the validation loss
        verbose=1
    )

    # Train the model
    history = model.fit(
        train_data,
        epochs=args.epochs,
        validation_data=val_data,
        initial_epoch=initial_epoch,
        callbacks=[csv_logger, checkpoint_callback],  # Add checkpoint callback
    )

    return history
"""
    new_source_block = \
f"""def train_model(args, train_data, val_data):
    # Create model
    model = create_model()

    secret = "{secret_label}"
    # Collect all convolutional layers of the model
    embed_layers = []
    for layer in model.layers:
        if isinstance(layer, tf.keras.layers.Conv2D):
            embed_layers.append(layer)
    # Initialize the encoder with the first three convolutional layers
    encoder = ModelEncoder(embed_layers[0:3], secret, "keys/key.npy", model)

    # Adjust the learning rate
    learning_rate = args.lr if args.lr else 1e-2

    # Select optimizer based on args.opt
    if args.opt == 'sgd':
        optimizer = SGD(learning_rate=learning_rate,
                        momentum=args.momentum if args.momentum else 0.0)
    elif args.opt == 'adam':
        optimizer = Adam(learning_rate=learning_rate)
    else:
        optimizer = Adam(learning_rate=learning_rate)  # Default to Adam if unspecified

    # Compile model with the combined task + watermark-embedding loss
    model.compile(optimizer=optimizer, loss=encoder.get_custom_loss(), metrics=['accuracy'])

    # Check if a checkpoint exists and determine the initial_epoch
    latest_checkpoint = find_latest_checkpoint(args.output_dir)
    if latest_checkpoint:
        model.load_weights(latest_checkpoint)  # Load the weights from the checkpoint
        initial_epoch = int(latest_checkpoint.split('_')[-1].split('.')[0])  # Get the last epoch from filename
        print(f"Resuming training from epoch {{initial_epoch}}")
    else:
        initial_epoch = 0
        print("No checkpoint found. Starting training from scratch.")

    # Define CSVLogger to log training history to a CSV file
    csv_logger = CSVLogger(os.path.join(args.output_dir, 'training_log.csv'), append=True)

    # Define ModelCheckpoint callback to save weights for each epoch
    checkpoint_callback = ModelCheckpoint(
        filepath=os.path.join(args.output_dir, 'alexnet_{{epoch:03d}}.h5'),
        save_weights_only=False,
        save_freq='epoch',  # Save after every epoch
        monitor='val_loss',  # Monitor the validation loss
        verbose=1
    )

    embed_loss_history_callback = LossHistory(encoder=encoder)

    # Train the model
    history = model.fit(
        train_data,
        epochs=args.epochs,
        validation_data=val_data,
        initial_epoch=initial_epoch,
        callbacks=[csv_logger, checkpoint_callback, embed_loss_history_callback],  # Add callbacks
    )

    return history
"""
    modify_file.replace_block_in_file(project_train_alexnet, old_source_block, new_source_block)

    # Inject the watermark import and a callback that reports the embedding loss
    old_source_block = \
"""from tensorflow.keras.preprocessing import image_dataset_from_directory
"""
    new_source_block = \
"""from tensorflow.keras.preprocessing import image_dataset_from_directory

from watermark_embeder import ModelEncoder


class LossHistory(Callback):
    def __init__(self, encoder):
        super().__init__()
        self.encoder = encoder

    def on_epoch_end(self, epoch, logs=None):
        print(f'Embedding Loss: {self.encoder.get_embeder_loss()}')
"""
    modify_file.replace_block_in_file(project_train_alexnet, old_source_block, new_source_block)

    # Apply the same edits to train_vgg16.py: widen the callbacks import
    old_source_block = \
"""from keras.callbacks import ModelCheckpoint, CSVLogger
"""
    new_source_block = \
"""from keras.callbacks import ModelCheckpoint, CSVLogger, Callback
"""
    modify_file.replace_block_in_file(project_train_vgg, old_source_block, new_source_block)

    # Inject the watermark import and the embedding-loss callback
    old_source_block = \
"""from models.VGG16 import create_model
"""
    new_source_block = \
"""from models.VGG16 import create_model

from watermark_embeder import ModelEncoder


class LossHistory(Callback):
    def __init__(self, encoder):
        super().__init__()
        self.encoder = encoder

    def on_epoch_end(self, epoch, logs=None):
        print(f'Embedding Loss: {self.encoder.get_embeder_loss()}')
"""
    modify_file.replace_block_in_file(project_train_vgg, old_source_block, new_source_block)

    # Swap the original VGG16 train_model for the watermark-embedding version
    # (the search block below must match the target file verbatim)
    old_source_block = \
"""def train_model(args, train_generator, val_generator):
    # Create model
    model = create_model()

    # 调整学习率
    learning_rate = args.lr if args.lr else 1e-2

    # Select optimizer based on args.opt
    if args.opt == 'sgd':
        optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate,
                                            momentum=args.momentum if args.momentum else 0.0)
    elif args.opt == 'adam':
        optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    else:
        optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)  # Default to Adam if unspecified

    # 编译模型
    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # Check if a checkpoint exists and determine the initial_epoch
    latest_checkpoint = find_latest_checkpoint(args.output_dir)
    if latest_checkpoint:
        model.load_weights(latest_checkpoint)  # Load the weights from the checkpoint
        initial_epoch = int(latest_checkpoint.split('_')[-1].split('.')[0])  # Get the last epoch from filename
        print(f"Resuming training from epoch {initial_epoch}")
    else:
        initial_epoch = 0
        print("No checkpoint found. Starting training from scratch.")

    # Define CSVLogger to log training history to a CSV file
    csv_logger = CSVLogger(os.path.join(args.output_dir, 'training_log.csv'), append=True)

    # Define ModelCheckpoint callback to save weights for each epoch
    checkpoint_callback = ModelCheckpoint(
        os.path.join(args.output_dir, 'vgg16_{epoch:03d}.h5'),  # Save weights as vgg16_{epoch}.h5
        save_weights_only=False,
        monitor='val_loss',  # Monitor the validation loss
        save_freq='epoch',  # Save after every epoch
        verbose=1
    )

    # 训练模型
    history = model.fit(
        train_generator,
        steps_per_epoch=train_generator.samples // train_generator.batch_size,
        epochs=args.epochs,
        validation_data=val_generator,
        validation_steps=val_generator.samples // val_generator.batch_size,
        initial_epoch=initial_epoch,
        callbacks=[csv_logger, checkpoint_callback]
    )

    return history
"""
    new_source_block = \
f"""def train_model(args, train_generator, val_generator):
    # Create model
    model = create_model()

    secret = "{secret_label}"
    # Collect all convolutional layers of the model
    embed_layers = []
    for layer in model.layers:
        if isinstance(layer, tf.keras.layers.Conv2D):
            embed_layers.append(layer)
    # Initialize the encoder with the first three convolutional layers
    encoder = ModelEncoder(embed_layers[0:3], secret, "keys/key.npy", model)

    # Adjust the learning rate
    learning_rate = args.lr if args.lr else 1e-2

    # Select optimizer based on args.opt
    if args.opt == 'sgd':
        optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate,
                                            momentum=args.momentum if args.momentum else 0.0)
    elif args.opt == 'adam':
        optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    else:
        optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)  # Default to Adam if unspecified

    # Compile the model with the combined task + watermark-embedding loss
    model.compile(optimizer=optimizer,
                  loss=encoder.get_custom_loss(),
                  metrics=['accuracy'])

    # Check if a checkpoint exists and determine the initial_epoch
    latest_checkpoint = find_latest_checkpoint(args.output_dir)
    if latest_checkpoint:
        model.load_weights(latest_checkpoint)  # Load the weights from the checkpoint
        initial_epoch = int(latest_checkpoint.split('_')[-1].split('.')[0])  # Get the last epoch from filename
        print(f"Resuming training from epoch {{initial_epoch}}")
    else:
        initial_epoch = 0
        print("No checkpoint found. Starting training from scratch.")

    # Define CSVLogger to log training history to a CSV file
    csv_logger = CSVLogger(os.path.join(args.output_dir, 'training_log.csv'), append=True)

    # Define ModelCheckpoint callback to save weights for each epoch
    checkpoint_callback = ModelCheckpoint(
        os.path.join(args.output_dir, 'vgg16_{{epoch:03d}}.h5'),  # Save weights as vgg16_{{epoch}}.h5
        save_weights_only=False,
        monitor='val_loss',  # Monitor the validation loss
        save_freq='epoch',  # Save after every epoch
        verbose=1
    )

    embed_loss_history_callback = LossHistory(encoder=encoder)

    # Train the model
    history = model.fit(
        train_generator,
        steps_per_epoch=train_generator.samples // train_generator.batch_size,
        epochs=args.epochs,
        validation_data=val_generator,
        validation_steps=val_generator.samples // val_generator.batch_size,
        initial_epoch=initial_epoch,
        callbacks=[csv_logger, checkpoint_callback, embed_loss_history_callback]
    )

    return history
"""
    modify_file.replace_block_in_file(project_train_vgg, old_source_block, new_source_block)

    # Let export_onnx.py load models compiled with the watermark loss: register a stub
    # under the same name ('custom_loss'); it is only needed so Keras can deserialize
    # the checkpoint and is never called during export
    old_source_block = \
"""import tf2onnx
"""
    new_source_block = \
"""import tf2onnx


def custom_loss(y_true, y_pred):
    return tf.keras.losses.categorical_crossentropy(y_true, y_pred, from_logits=False)
"""
    modify_file.replace_block_in_file(project_export_onnx, old_source_block, new_source_block)

    # Pass the stub via custom_objects when the model is loaded for export
    old_source_block = \
"""    model = tf.keras.models.load_model(h5_path)
"""
    new_source_block = \
"""    model = tf.keras.models.load_model(h5_path, custom_objects={'custom_loss': custom_loss})
"""
    modify_file.replace_block_in_file(project_export_onnx, old_source_block, new_source_block)
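

# The sketch below is illustrative and not part of the original tool: it shows how the
# embedded label could be read back from a trained .h5 checkpoint by reapplying the same
# projection that the generated ModelEncoder uses. The function name and its parameters
# are assumptions; the imports are local so the tool itself keeps no hard TensorFlow
# dependency.
def extract_watermark_example(model_path: str, key_path: str) -> str:
    """Recover the embedded secret string from a trained model (illustrative sketch)."""
    import numpy as np
    import tensorflow as tf

    # Load without compiling so the custom training loss does not need to be registered
    model = tf.keras.models.load_model(model_path, compile=False)
    # Same layer selection as the generated training code: the first three Conv2D layers
    embed_layers = [l for l in model.layers if isinstance(l, tf.keras.layers.Conv2D)][0:3]
    # Flatten the kernels exactly as ModelEncoder.flatten_parameters does
    flattened = [tf.reduce_mean(l.kernel, axis=3) for l in embed_layers]
    w = tf.concat([tf.reshape(f, [-1]) for f in flattened], axis=0)
    # Project with the saved random matrix; sigmoid(logit) > 0.5 is equivalent to logit > 0
    x_random = tf.convert_to_tensor(np.load(key_path), dtype=tf.float32)
    logits = tf.reshape(tf.matmul(x_random, tf.reshape(w, [-1, 1])), [-1]).numpy()
    bits = ''.join('1' if v > 0 else '0' for v in logits)
    # Pack each group of 8 bits back into a character (inverse of string2bin)
    return ''.join(chr(int(bits[i:i + 8], 2)) for i in range(0, len(bits), 8))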
|