神经网络模型的时间分析
介绍
您可以控制可以测量的内容。分析是一种测量代码相对于其使用的资源的行为方式的方法。时间分析提供有关每行执行所用时间、每个模块执行所用时间以及进行了多少次递归调用的信息。内存分析提供有关执行所需内存量的信息。通过分析代码,您可以了解是否存在任何瓶颈或内存泄漏。这为您提供了足够的证据,让您在这些事件变成问题之前检测到它们。
本指南介绍如何创建数据集对象和数据批次、将它们输入到机器学习 (ML) 模型以及使用 TensorFlow 对代码进行分析。它将使用 CIFAR 10 数据集,其中包含 50,000 张带标签的图像用于训练,以及 10,000 张带标签的图像用于测试。
设计和构建数据管道
将来自一个或多个源的数据管理到目标系统的常见过程包括三个步骤:提取、转换和加载 (ETL)。
- 从多个来源提取原始数据,包括本地磁盘或分布式系统
- 将数据转换为解析器,进行预处理、增强、混洗和批处理。
- 将转换后的数据加载到 GPU 或 TPU 中以运行 ML 模型训练和推理。
下图说明了图像分类数据管道。
创建 TF 记录
下载图像后,您将看到 5 批训练图像和 1 批测试图像,如本屏幕截图所示。
一个数据批次是一个 10000 x 3072 矩阵,以 numpy 数组表示。它表示 10,000 个图像样本,每个样本的大小为 32 x 32 像素。按如下所示重塑行向量,Tensorflow 预期为 (width, height, num_channel)。Tensorflow 数据集 API 可以使用它。
使用内存对象创建数据集
我们已使用tf.data.Dataset.from_tensor_slices() API 从内存张量创建数据集。此 API 通过将张量的第一维切分为特征和标签来构建数据集。
train_set = tf.data.Dataset.from_tensor_slices((x_train, y_train)).map(data_aug,num_parallel_calls=tf.data.experimental.AUTOTUNE).shuffle(len_train).batch(BATCH_SIZE).prefetch(1)
test_set = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(BATCH_SIZE)
转换数据集
创建数据集对象后,需要对其进行转换以供 ML 模型使用。转换包括数据重新排列、数据清理以及数据标准化和规范化。标准化特征(在本例中为像素值),使平均值等于零,标准差为 1。这使我们的特征呈高斯分布。数据规范化将特征重新调整为从 0 到特征平均值的特定范围。
train_mean = np.mean(x_train, axis=(0,1,2))
train_std = np.std(x_train, axis=(0,1,2))
normalize = lambda x: ((x - train_mean) / train_std).astype('float32')
# Add Padding
x_train = normalize(pad4(x_train))
x_test = normalize(x_test)
接下来,将数据扩充至 32 x 32 像素大小。
data_aug = lambda x, y: (tf.image.random_flip_left_right(tf.image.random_crop(x, [32, 32, 3])), y)
使用tf.data.Dataset.map()在线转换数据。这与 pandas 的Dataframe.apply()非常相似。此 API 接受一个函数并将其应用于数据集中的每个元素。它接受一个tf.Tensor对象并返回一个新转换的tf.Tensor对象。
预取数据
当当前批次由 ML 模型提供服务时,prefetch() API会收集将加载到输入管道中的下一批批次。
将数据集加载到模型
将创建的数据集对象输入到基于 tf.keras 的模型中。使用tf.keras模型子类 API定义层。建议使用tf.keras功能 API 来定义神经网络层。
# Using Kaiming He Initialization for kernel nitialization
def init_kaiming_he(shape, dtype=tf.float32, partition_info=None):
fan = np.prod(shape[:-1])
bound = 1 / math.sqrt(fan)
return tf.random.uniform(shape, minval=-bound, maxval=bound, dtype=dtype)
现在,创建一个卷积神经网络,然后进行批量标准化和激活。
# Creating model using subclassing API.
# Same can be achieved using Functional API or Sequential API
class ConvBN(tf.keras.Model):
def __init__(self, c_out):
super().__init__()
self.conv = tf.keras.layers.Conv2D(filters=c_out, kernel_size=3, padding="SAME", kernel_initializer=init_kaiming_he, use_bias=False)
self.bn = tf.keras.layers.BatchNormalization(momentum=0.9, epsilon=1e-5)
def call(self, inputs):
return tf.nn.relu(self.bn(self.conv(inputs)))
现在已经构建了一个 Resnet 块。
class ResBlk(tf.keras.Model):
def __init__(self, c_out, pool, res = False):
super().__init__()
self.conv_bn = ConvBN(c_out)
self.pool = pool
self.res = res
if self.res:
self.res1 = ConvBN(c_out)
self.res2 = ConvBN(c_out)
def call(self, inputs):
h = self.pool(self.conv_bn(inputs))
if self.res:
h = h + self.res2(self.res1(h))
return h Creating model using subclassing API.
# Same can be achieved using Functional API or Sequential API
class ConvBN(tf.keras.Model):
def __init__(self, c_out):
super().__init__()
self.conv = tf.keras.layers.Conv2D(filters=c_out, kernel_size=3, padding="SAME", kernel_initializer=init_kaiming_he, use_bias=False)
self.bn = tf.keras.layers.BatchNormalization(momentum=0.9, epsilon=1e-5)
def call(self, inputs):
return tf.nn.relu(self.bn(self.conv(inputs)))
class CIFAR10AutoGrad(tf.keras.Model):
def __init__(self, c=64, weight=0.125):
super().__init__()
self.maxpool = tf.keras.layers.MaxPooling2D()
self.init_conv_bn = ConvBN(c)
self.blk1 = ResBlk(c*3, self.maxpool, res = True)
self.blk2 = ResBlk(c*6, self.maxpool)
self.blk3 = ResBlk(c*9, self.maxpool, res = True)
self.pool = tf.keras.layers.GlobalMaxPool2D()
self.linear = tf.keras.layers.Dense(10, kernel_initializer=init_kaiming_he, use_bias=False)
self.weight = weight
# Building the model
def call(self, x, y):
h = self.pool(self.maxpool(self.blk3(self.blk2(self.blk1(self.init_conv_bn(x))))))
h = self.linear(h) * self.weight
ce = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=h, labels=y)
loss = tf.reduce_sum(ce)
correct = tf.reduce_sum(tf.cast(tf.math.equal(tf.argmax(h, axis = 1), y), tf.float32))
return loss, correct
使用自定义逻辑进行模型训练
要计算并记录某个操作相对于其输入变量的梯度,请使用tf.GradientTape 。所有前向操作都会记录在磁带上,要计算这些操作的梯度,磁带会倒放并丢弃。要使用tf.GradientTape()训练模型调用,请在tf.GradientTape上下文管理器中对输入张量调用前向传递,然后计算损失函数。
t = time.time()
def fit():
# Create batches for test data
test_set = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(BATCH_SIZE)
train_set = tf.data.Dataset.from_tensor_slices((x_train, y_train)).map(data_aug,num_parallel_calls=tf.data.experimental.AUTOTUNE).shuffle(len_train).batch(BATCH_SIZE).prefetch(1)
#summary_writer = tf.summary.create_file_writer('./log/{}'.format(dt.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")))
train_summary_writer = tf.summary.create_file_writer('./logs/train')
test_summary_writer = tf.summary.create_file_writer('./logs/test')
for epoch in range(EPOCHS):
train_loss = test_loss = train_acc = test_acc = 0.0
with train_summary_writer.as_default():
tf.summary.trace_on(graph=True, profiler=True) # To start capturing profiler in Tensorboard
tf.keras.backend.set_learning_phase(1)
for (x, y) in tqdm(train_set):# Iterate over the batches of train dataset objects .
with tf.GradientTape() as tape:
#logits,actual value for this minibatch
loss, correct = model(x, y)
var = model.trainable_variables
grads = tape.gradient(loss, var)
opt.apply_gradients(zip(grads, var))
global_step.assign_add(1)
# Add extra losses created during this forward pass
train_loss += loss.numpy()
train_acc += correct.numpy()
tf.summary.scalar('train loss', train_loss/len_train, step=epoch)
tf.summary.scalar('train acc', (train_acc/len_train)*100, step=epoch)
tf.summary.trace_export(name="Train", step=epoch,profiler_outdir='./logs/train/trace') # Close Profiling when we do export
tf.keras.backend.set_learning_phase(0)
with test_summary_writer.as_default():
tf.summary.trace_on(graph=True, profiler=True)
for (x, y) in test_set:# Iterate over the batches of train dataset objects .
loss, correct = model(x, y)
test_loss += loss.numpy()
test_acc += correct.numpy()
tf.summary.scalar('test loss', test_loss/len_test, step=epoch)
tf.summary.scalar('test acc', (test_acc/len_test)*100, step=epoch)
tf.summary.trace_export(name="Test", step=epoch,profiler_outdir='./logs/test/trace')
template = 'Epoch {}, lr:{:.3f},Train Loss: {:.3f},Train Accuracy: {:.3f}, Test Loss: {:.3f}, Test Accuracy: {:.3f},Time Taken: {:.2f}'
print (template.format(epoch+1,lr_schedule(epoch+1),train_loss/len_train,(train_acc/len_train)*100,test_loss/len_test,(test_acc/len_test)*100,time.time() - t))
使用 TensorBoard 进行自定义训练
- 创建状态指标,用于累积训练中的值,并可在训练过程中的任何时间点记录。此示例使用 train_acc、train_loss、test_acc 和 test_loss。
- 定义计算指标并累积指标的训练和测试函数。
- 使用tf.summary.scalar()在指标结果上记录指标。
tf.summary.create_file_writer('./logs/train')
上述代码创建了一个文件来存储损失和准确度指标的摘要。我们需要使用以下 API 启用它。
tf.summary.trace_on(graph=True, profiler=True)
可以使用以下命令导出摘要。
tf.summary.trace_export(name="Train", step=epoch,profiler_outdir='./logs/train')
以同样的方式,调用测试或验证数据的摘要写入器。当您调用tf.summary.trace_export()时,tf.summary.trace_on()将关闭。
可视化 Tensorboard
%tensorboard --logdir logs/
使用 CProfile 进行时间分析
Tensorboard 跳过构建图表的第一次迭代。Tensorboard 中的分析器选项卡显示每个资源执行所花费的时间。要查看自定义可视化效果,请使用 CProfile 计算每个函数调用所花费的时间。
if __name__ == '__main__':
profile.runctx('print (fit())', globals(),locals(),"./logs/cifar10.prof")
可以使用统计模块读取已保存的分析器日志。
from pstats import Stats
from io import StringIO
# print stats to a string
result=StringIO()
stats = Stats("./logs/cifar10.prof",stream=result)
stats.strip_dirs()
stats.sort_stats('cumulative')
stats.print_stats()
result=result.getvalue()
# chop the string into a csv-like buffer
result='ncalls'+result.split('ncalls')[-1]
result='\n'.join([', '.join(line.rstrip().split(None,6)) for line in result.split('\n')])
print(result)
# save it to disk
f=open('./logs/cifar10.csv','w')
f.write(result)
f.close()
免责声明:本内容来源于第三方作者授权、网友推荐或互联网整理,旨在为广大用户提供学习与参考之用。所有文本和图片版权归原创网站或作者本人所有,其观点并不代表本站立场。如有任何版权侵犯或转载不当之情况,请与我们取得联系,我们将尽快进行相关处理与修改。感谢您的理解与支持!
请先 登录后发表评论 ~