在本笔记本中,我们将通过一个实际例子来比较半反梯度(HIGs)与其他用于训练具有物理损失函数的神经网络的方法。具体而言,我们将比较以下几种方法:

  1. Adam:作为标准的基于梯度下降(GD)的网络优化器,
  2. 尺度不变物理学:先前描述的完全反演物理学的算法,
  3. 半逆梯度:在局部和联合反演物理学和网络。

22.1 反问题设置

学习任务是找到控制函数来控制耦合振荡器系统的动力学。这是物理学中的一个经典问题,也是一个评估HIGs的好案例,因为它的规模较小。我们使用两个质点,因此对于两个质点的位置和速度,我们只有四个自由度(相对于例如仅具有32个单元格沿x和y的“小”流体模拟,我们将获得个未知数)。

尽管如此,振荡器是一个高度非平凡的案例:我们的目标是应用控制,使得在选择的时间间隔后再次达到初始状态。我们将使用24步四阶Runge-Kutta方案,因此NN必须学习如何在所有时间步骤中最好地“推动”两个质点,使它们在正确的时间以正确的速度到达所需的位置。

一个由个耦合振子组成的系统可以用以下哈密顿量描述:

它为下面的 RK 4 时间积分提供了基础。

22.2 问题描述

更具体地说,我们考虑一组不同的物理输入()。通过使用相应的控制函数(),我们可以影响我们的物理系统的时间演化并获得一个输出状态():

如果我们想将给定的初始状态()演化为给定的目标状态(),我们就会得到一个控制函数()的反问题。期望目标状态()与接收到的目标状态()之间的质量由损失函数()来衡量。

如果我们使用一个神经网络(参数化为)来学习一组输入/输出对()上的控制函数,我们将上述物理优化任务转化为一个学习问题。

在设置物理求解器之前,我们导入必要的库(本例使用TensorFlow),并定义主要的全局变量MODE,它在_Adam_('GD')、尺度不变物理('SIP')和_半反梯度_('HIG')之间切换。

import numpy as np import tensorflow as tf import time, os # main switch for the three methods: MODE = 'HIG' # HIG | SIP | GD

22.3 耦合线性振子模拟

对于物理模拟,我们将解决一个耦合线性振子系统带有控制项的微分方程。时间积分使用四阶龙格-库塔方案。

下面,我们首先定义一些全局常量:Nx:振子的数量,Nt:时间演化步数,以及DT:一个时间步长的长度。然后我们定义一个帮助函数来设置一个拉普拉斯模板,名为coupled_oscillators_batch()的函数计算整个小批量值的模拟,最后是solver()函数,它使用给定的控制信号运行所需的时间步数。

Nx = 2 Nt = 24 DT = 0.5 def build_laplace(n,boundary='0'): if n==1: return np.zeros((1,1),dtype=np.float32) d1 = -2 * np.ones((n,),dtype=np.float32) d2 = 1 * np.ones((n-1,),dtype=np.float32) lap = np.zeros((n,n),dtype=np.float32) lap[range(n),range(n)]=d1 lap[range(1,n),range(n-1)]=d2 lap[range(n-1),range(1,n)]=d2 if boundary=='0': lap[0,0]=lap[n-1,n-1]=-1 return lap @tf.function def coupled_oscillators_batch( x, control): ''' ODE of type: x' = f(x) :param x_in: position and velocities, shape: (batch, 2 * number of osc) order second index: x_i , v_i :param control: control function, shape: (batch,) :return: ''' #print('coupled_oscillators_batch') n_osc = x.shape[1]//2 # natural time evo a1 = np.array([[0,1],[-1,0]],dtype=np.float32) a2 = np.eye(n_osc,dtype=np.float32) A = np.kron(a1,a2) x_dot1 = tf.tensordot(x,A,axes = (1,1)) # interaction term interaction_strength = 0.2 b1 = np.array([[0,0],[1,0]],dtype=np.float32) b2 = build_laplace(n_osc) B = interaction_strength * np.kron(b1,b2) x_dot2 = tf.tensordot(x,B, axes=(1, 1)) # control term control_vector = np.zeros((n_osc,),dtype=np.float32) control_vector[-1] = 1.0 c1 = np.array([0,1],dtype=np.float32) c2 = control_vector C = np.kron(c1,c2) x_dot3 = tf.tensordot(control,C, axes=0) #all terms x_dot = x_dot1 + x_dot2 +x_dot3 return x_dot @tf.function def runge_kutta_4_batch(x_0, dt, control, ODE_f_batch): f_0_0 = ODE_f_batch(x_0, control) x_14 = x_0 + 0.5 * dt * f_0_0 f_12_14 = ODE_f_batch(x_14, control) x_12 = x_0 + 0.5 * dt * f_12_14 f_12_12 = ODE_f_batch(x_12, control) x_34 = x_0 + dt * f_12_12 terms = f_0_0 + 2 * f_12_14 + 2 * f_12_12 + ODE_f_batch(x_34, control) x1 = x_0 + dt * terms / 6 return x1 @tf.function def solver(x0, control): x = x0 for i in range(Nt): x = runge_kutta_4_batch(x, DT, control[:,i], coupled_oscillators_batch) return x

22.4 训练设置

神经网络本身非常简单:它由四个密集层组成(中间的每个层有20个神经元),并使用tanh激活函数。

act = tf.keras.activations.tanh model = tf.keras.models.Sequential([ tf.keras.layers.InputLayer(input_shape=(2*Nx)), tf.keras.layers.Dense(20, activation=act), tf.keras.layers.Dense(20, activation=act), tf.keras.layers.Dense(20, activation=act), tf.keras.layers.Dense(Nt, activation='linear') ])

作为损失函数, 我们将使用 损失:

@tf.function def loss_function(a,b): diff = a-b loss_batch = tf.reduce_sum(diff**2,axis=1) loss = tf.reduce_sum(loss_batch) return loss

作为训练的数据集,我们简单地创建了4k个随机位置值,这些位置值是振荡器在模拟开始时的初始状态(X_TRAIN),并且在模拟结束时应该返回到这些初始状态(Y_TRAIN)。由于它们应该返回到初始状态,我们有X_TRAIN=Y_TRAIN

N = 2**12 X_TRAIN = np.random.rand(N, 2 * Nx).astype(np.float32) Y_TRAIN = X_TRAIN # the target states are identical

22.5 训练

神经网络训练的优化过程需要设置一些全局参数。下一个单元格初始化了适合三种方法的一些合适值。这些值是根据经验确定的,对于每种方法都能够发挥最佳效果。如果我们尝试为所有方法使用相同的设置,这将不可避免地使其中一些方法的比较不公平。

  1. Adam:这是最广泛使用的神经网络优化器,我们在这里使用它作为GD家族的代表。请注意,截断参数对Adam没有意义。

  2. SIP:指定的优化器是用于网络优化的。物理反演是通过高斯-牛顿完成的,并且对应于精确反演,因为物理优化景观是二次的。对于高斯-牛顿中的雅可比矩阵反演,我们可以指定截断参数。

  3. HIG:要获得HIG算法,必须将优化器设置为SGD。对于雅可比矩阵的半反演,我们可以指定截断参数。最佳批量大小通常比其他两种方法要低,学习率为1通常非常有效。

最大训练时间以秒为单位通过下面的 MAX_TIME 设置。

if MODE=='HIG': # HIG training OPTIMIZER = tf.keras.optimizers.SGD BATCH_SIZE = 32 # larger batches make HIGs unnecessariy slow... LR = 1.0 TRUNC = 10**-10 elif MODE=='SIP': # SIP training OPTIMIZER = tf.keras.optimizers.Adam BATCH_SIZE = 256 LR = 0.001 # for the internal step with Adam TRUNC = 0 # not used else: # Adam Training (as GD representative) MODE = 'GD' OPTIMIZER = tf.keras.optimizers.Adam BATCH_SIZE = 256 LR = 0.001 TRUNC = 0 # not used # global parameters for all three methods MAX_TIME = 100 # [s] print("Running variant: "+MODE)

输出结果:

Running variant: HIG

下一个函数HIG_pinv()是一个关键函数:它用于构建给定HIGs矩阵的半逆。它计算SVD,取奇异值的平方根,然后重新组装矩阵。

from tensorflow.python.framework import ops from tensorflow.python.framework import tensor_shape from tensorflow.python.ops import array_ops from tensorflow.python.ops import math_ops from tensorflow.python.util import dispatch from tensorflow.python.util.tf_export import tf_export from tensorflow.python.ops.linalg.linalg_impl import _maybe_validate_matrix,svd # partial inversion of the Jacobian via SVD: # this function is adopted from tensorflow's SVD function, and published here under its license: Apache-2.0 License , https://github.com/tensorflow/tensorflow/blob/master/LICENSE @tf_export('linalg.HIG_pinv') @dispatch.add_dispatch_support def HIG_pinv(a, rcond=None,beta=0.5, validate_args=False, name=None): with ops.name_scope(name or 'pinv'): a = ops.convert_to_tensor(a, name='a') assertions = _maybe_validate_matrix(a, validate_args) if assertions: with ops.control_dependencies(assertions): a = array_ops.identity(a) dtype = a.dtype.as_numpy_dtype if rcond is None: def get_dim_size(dim): dim_val = tensor_shape.dimension_value(a.shape[dim]) if dim_val is not None: return dim_val return array_ops.shape(a)[dim] num_rows = get_dim_size(-2) num_cols = get_dim_size(-1) if isinstance(num_rows, int) and isinstance(num_cols, int): max_rows_cols = float(max(num_rows, num_cols)) else: max_rows_cols = math_ops.cast( math_ops.maximum(num_rows, num_cols), dtype) rcond = 10. * max_rows_cols * np.finfo(dtype).eps rcond = ops.convert_to_tensor(rcond, dtype=dtype, name='rcond') [ singular_values, left_singular_vectors, right_singular_vectors, ] = svd( a, full_matrices=False, compute_uv=True) cutoff = rcond * math_ops.reduce_max(singular_values, axis=-1) singular_values = array_ops.where_v2( singular_values > array_ops.expand_dims_v2(cutoff, -1), singular_values**beta, np.array(np.inf, dtype)) a_pinv = math_ops.matmul( right_singular_vectors / array_ops.expand_dims_v2(singular_values, -2), left_singular_vectors, adjoint_b=True) if a.shape is not None and a.shape.rank is not None: a_pinv.set_shape(a.shape[:-2].concatenate([a.shape[-1], a.shape[-2]])) return a_pinv

现在我们已经准备好运行训练了。下一个单元格定义了一个Python类来组织神经网络优化。它接收物理求解器、网络模型、损失函数和数据集,并在给定的时间限制MAX_TIME内运行尽可能多的周期。

根据选择的优化方法,小批量更新有所不同:

  1. Adam:计算损失梯度,然后应用Adam更新。
  2. PG:计算损失梯度和物理雅可比矩阵,逐个数据点地求逆,通过代理损失和Adam计算网络更新。
  3. HIG:计算损失梯度和网络-物理雅可比矩阵,然后联合计算半求逆,并使用得到的步长更新网络参数。

优化器类的mini_batch_update()方法实现了这三种变体。

class Optimization(): def __init__(self,model,solver,loss_function,x_train,y_train): self.model = model self.solver = solver self.loss_function = loss_function self.x_train = x_train self.y_train = y_train self.y_dim = y_train.shape[1] self.weight_shapes = [weight_tensor.shape for weight_tensor in self.model.trainable_weights] def set_params(self,batch_size,learning_rate,optimizer,max_time,mode,trunc): self.number_of_batches = N // batch_size self.max_time = max_time self.batch_size = batch_size self.learning_rate = learning_rate self.optimizer = optimizer(learning_rate) self.mode = mode self.trunc = trunc def computation(self,x_batch, y_batch): control_batch = self.model(y_batch) y_prediction_batch = self.solver(x_batch,control_batch) loss = self.loss_function(y_batch,y_prediction_batch) return loss @tf.function def gd_get_derivatives(self,x_batch, y_batch): with tf.GradientTape(persistent=True) as tape: tape.watch(self.model.trainable_variables) loss = self.computation(x_batch,y_batch) loss_per_dp = loss / self.batch_size grad = tape.gradient(loss_per_dp, self.model.trainable_variables) return grad @tf.function def pg_get_physics_derivatives(self,x_batch, y_batch): # physics gradient for SIP with tf.GradientTape(persistent=True) as tape: control_batch = self.model(y_batch) tape.watch(control_batch) y_prediction_batch = self.solver(x_batch,control_batch) loss = self.loss_function(y_batch,y_prediction_batch) loss_per_dp = loss / self.batch_size jacy = tape.batch_jacobian(y_prediction_batch,control_batch) grad = tape.gradient(loss_per_dp, y_prediction_batch) return jacy,grad,control_batch @tf.function def pg_get_network_derivatives(self,x_batch, y_batch,new_control_batch): #physical grads with tf.GradientTape(persistent=True) as tape: tape.watch(self.model.trainable_variables) control_batch = self.model(y_batch) loss = self.loss_function(new_control_batch,control_batch) #y_prediction_batch = self.solver(x_batch,control_batch) #loss = self.loss_function(y_batch,y_prediction_batch) loss_per_dp = loss / self.batch_size network_grad = tape.gradient(loss_per_dp, self.model.trainable_variables) return network_grad @tf.function def hig_get_derivatives(self,x_batch,y_batch): with tf.GradientTape(persistent=True) as tape: tape.watch(self.model.trainable_variables) control_batch = self.model(y_batch) y_prediction_batch = self.solver(x_batch,control_batch) loss = self.loss_function(y_batch,y_prediction_batch) loss_per_dp = loss / self.batch_size jacy = tape.jacobian(y_prediction_batch, self.model.trainable_variables, experimental_use_pfor=True) loss_grad = tape.gradient(loss_per_dp, y_prediction_batch) return jacy, loss_grad def mini_batch_update(self,x_batch, y_batch): if self.mode=="GD": grad = self.gd_get_derivatives(x_batch, y_batch) self.optimizer.apply_gradients(zip(grad, self.model.trainable_weights)) elif self.mode=="SIP": jacy,grad,control_batch = self.pg_get_physics_derivatives(x_batch, y_batch) grad_e = tf.expand_dims(grad,-1) pinv = tf.linalg.pinv(jacy,rcond=10**-5) delta_control_label_batch = (pinv@grad_e)[:,:,0] new_control_batch = control_batch - delta_control_label_batch network_grad = self.pg_get_network_derivatives(x_batch, y_batch,new_control_batch) self.optimizer.apply_gradients(zip(network_grad, self.model.trainable_weights)) elif self.mode =='HIG': jacy, grad = self.hig_get_derivatives(x_batch, y_batch) flat_jacy_list = [tf.reshape(jac, (self.batch_size * self.y_dim, -1)) for jac in jacy] flat_jacy = tf.concat(flat_jacy_list, axis=1) flat_grad = tf.reshape(grad, (-1,)) inv = HIG_pinv(flat_jacy, rcond=self.trunc) processed_derivatives = tf.tensordot(inv, flat_grad, axes=(1, 0)) #processed_derivatives = self.linear_solve(flat_jacy, flat_grad) update_list = [] l1 = 0 for k, shape in enumerate(self.weight_shapes): l2 = l1 + np.prod(shape) upd = processed_derivatives[l1:l2] upd = np.reshape(upd, shape) update_list.append(upd) l1 = l2 self.optimizer.apply_gradients(zip(update_list, self.model.trainable_weights)) def epoch_update(self): for batch_index in range(self.number_of_batches): position = batch_index * self.batch_size x_batch = self.x_train[position:position + self.batch_size] y_batch = self.y_train[position:position + self.batch_size] self.mini_batch_update(x_batch, y_batch) def eval(self,epoch,wc_time,ep_dur): train_loss = self.computation(self.x_train,self.y_train) train_loss_per_dp = train_loss / N if epoch<5 or epoch%20==0: print('Epoch: ', epoch,', wall clock time: ',wc_time,', loss: ', float(train_loss_per_dp) ) #print('TrainLoss:', train_loss_per_dp) #print('Epoch: ', epoch,' WallClockTime: ',wc_time,' EpochDuration: ',ep_dur ) return train_loss_per_dp def start_training(self): init_loss = self.eval(0,0,0) init_time = time.time() time_list = [init_time] loss_list = [init_loss] epoch=0 wc_time = 0 while wc_time<self.max_time: duration = time.time() self.epoch_update() duration = time.time()-duration epoch += 1 wc_time += duration loss = self.eval(epoch,wc_time,duration) time_list.append(duration) loss_list.append(loss) time_list = np.array(time_list) loss_list = np.array(loss_list) time_list[0] = 0 return time_list, loss_list

剩下的就是使用选择的全局参数开始训练,并将结果收集在time_listloss_list中。

opt = Optimization(model, solver, loss_function, X_TRAIN, Y_TRAIN) opt.set_params(BATCH_SIZE, LR, OPTIMIZER, MAX_TIME, MODE, TRUNC) time_list, loss_list = opt.start_training()
Epoch: 0 , wall clock time: 0 , loss: 1.4401766061782837 Epoch: 1 , wall clock time: 28.06755018234253 , loss: 5.44398972124327e-05 Epoch: 2 , wall clock time: 31.38792371749878 , loss: 1.064436037268024e-05 Epoch: 3 , wall clock time: 34.690271854400635 , loss: 3.163525434501935e-06 Epoch: 4 , wall clock time: 37.9914448261261 , loss: 1.1857609933940694e-06

22.6 评估

现在我们可以评估培训过程随时间的收敛情况。以下图表显示了损失随时间(以秒为单位)的演变。

import matplotlib.pyplot as plt plt.plot(np.cumsum(time_list),loss_list) plt.yscale('log') plt.xlabel('Wall clock time [s]'); plt.ylabel('Loss') plt.title('Linear chain - '+MODE+' with '+str(OPTIMIZER.__name__)) plt.show()

对于这三种方法,你会看到一个很大的线性步骤,就在开始时。由于我们正在公平地测量整个运行时间,因此这个第一步包括了所有TensorFlow初始化步骤,对于HIG和SIP而言,这些步骤显著更为复杂。Adam在初始化方面要快得多,并且每个训练迭代也同样更快。

这三种方法本身都能降低损失。更有趣的是看它们如何比较。为此,下一个单元格会存储训练演变,并且需要使用这三种方法中的每一种运行一次来生成最终的比较图。

path = '/home/' namet = 'time' namel = 'loss' np.savetxt(path+MODE+namet+'.txt',time_list) np.savetxt(path+MODE+namel+'.txt',loss_list)

在使用每种方法进行运行后,我们可以将它们并列显示:

from os.path import exists # if previous runs are available, compare all 3 if exists(path+'HIG'+namet+'.txt') and exists(path+'HIG'+namel+'.txt') and exists(path+'SIP'+namet+'.txt') and exists(path+'SIP'+namel+'.txt') and exists(path+'GD'+namet+'.txt') and exists(path+'GD'+namel+'.txt'): lt_hig = np.loadtxt(path+'HIG'+namet+'.txt') ll_hig = np.loadtxt(path+'HIG'+namel+'.txt') lt_sip = np.loadtxt(path+'SIP'+namet+'.txt') ll_sip = np.loadtxt(path+'SIP'+namel+'.txt') lt_gd = np.loadtxt(path+'GD'+namet+'.txt') ll_gd = np.loadtxt(path+'GD'+namel+'.txt') plt.plot(np.cumsum(lt_gd),ll_gd, label="GD") plt.plot(np.cumsum(lt_sip),ll_sip, label="SIP") plt.plot(np.cumsum(lt_hig),ll_hig, label="HIG") plt.yscale('log') plt.xlabel('Wall clock time [s]'); plt.ylabel('Training loss'); plt.legend() plt.title('Linear chain comparison ') plt.show() else: print("Run this notebook three times with MODE='HIG|SIP|GD' to produce the final graph")

这个图表清晰地显示了收敛方面的显著差异:Adam(蓝色“GD”曲线)执行了大量的更新,但其对黑塞矩阵的粗略近似不足以收敛到高精度,它停滞在高损失水平上。

在这种情况下,SIP更新并没有超越Adam。这是由于相对简单的物理(线性振荡器)以及SIP更高的运行时成本所致。如果您在这个例子中运行得更久一些,SIP实际上会超过Adam,但开始受到完全反演的数值问题的影响。

HIGs的表现比另外两种方法要好得多:尽管每次迭代相对较慢,但半反演产生了非常好的更新,使得训练能够非常快速地收敛到非常低的损失值。HIGs达到的精度比其他两种方法高四个数量级左右。

22.7 下一步

这个笔记本有许多有趣的方向可以进行进一步的测试和修改:

  • 最重要的是,到目前为止,我们实际上只看了训练表现!这使得笔记本保持了相对较短的长度,但这显然是不好的做法。虽然我们声称HIGs同样适用于_真实_测试数据,但这是使用这个笔记本的一个很好的下一步:分配适当的测试样本,并在测试数据上重新运行三种方法的评估。
  • 此外,您可以改变物理行为:使用更多的振荡器来延长或缩短时间跨度,甚至包括非线性力(如HIG论文中所用)。请注意:对于后者,SIP版本将需要一个新的逆求解器。