在本笔记本中,我们将通过一个实际例子来比较半反梯度(HIGs)与其他用于训练具有物理损失函数的神经网络的方法。具体而言,我们将比较以下几种方法:

  1. Adam:作为标准的基于梯度下降(GD)的网络优化器,
  2. 尺度不变物理学:先前描述的完全反演物理学的算法,
  3. 半逆梯度:在局部和联合反演物理学和网络。

22.1 反问题设置

学习任务是找到控制函数来控制耦合振荡器系统的动力学。这是物理学中的一个经典问题,也是一个评估HIGs的好案例,因为它的规模较小。我们使用两个质点,因此对于两个质点的位置和速度,我们只有四个自由度(相对于例如仅具有32个单元格沿x和y的“小”流体模拟,我们将获得个未知数)。

尽管如此,振荡器是一个高度非平凡的案例:我们的目标是应用控制,使得在选择的时间间隔后再次达到初始状态。我们将使用24步四阶Runge-Kutta方案,因此NN必须学习如何在所有时间步骤中最好地“推动”两个质点,使它们在正确的时间以正确的速度到达所需的位置。

一个由个耦合振子组成的系统可以用以下哈密顿量描述:

它为下面的 RK 4 时间积分提供了基础。

22.2 问题描述

更具体地说,我们考虑一组不同的物理输入()。通过使用相应的控制函数(),我们可以影响我们的物理系统的时间演化并获得一个输出状态():

如果我们想将给定的初始状态()演化为给定的目标状态(),我们就会得到一个控制函数()的反问题。期望目标状态()与接收到的目标状态()之间的质量由损失函数()来衡量。

如果我们使用一个神经网络(参数化为)来学习一组输入/输出对()上的控制函数,我们将上述物理优化任务转化为一个学习问题。

在设置物理求解器之前,我们导入必要的库(本例使用TensorFlow),并定义主要的全局变量MODE,它在_Adam_('GD')、尺度不变物理('SIP')和_半反梯度_('HIG')之间切换。

import numpy as np
import tensorflow as tf
import time, os

# main switch for the three methods:
MODE = 'HIG'  # HIG | SIP | GD

22.3 耦合线性振子模拟

对于物理模拟,我们将解决一个耦合线性振子系统带有控制项的微分方程。时间积分使用四阶龙格-库塔方案。

下面,我们首先定义一些全局常量:Nx:振子的数量,Nt:时间演化步数,以及DT:一个时间步长的长度。然后我们定义一个帮助函数来设置一个拉普拉斯模板,名为coupled_oscillators_batch()的函数计算整个小批量值的模拟,最后是solver()函数,它使用给定的控制信号运行所需的时间步数。

Nx = 2
Nt = 24
DT = 0.5

def build_laplace(n,boundary='0'):
    if n==1:
        return np.zeros((1,1),dtype=np.float32)
    d1 = -2 * np.ones((n,),dtype=np.float32)
    d2 = 1 * np.ones((n-1,),dtype=np.float32)
    lap = np.zeros((n,n),dtype=np.float32)
    lap[range(n),range(n)]=d1
    lap[range(1,n),range(n-1)]=d2
    lap[range(n-1),range(1,n)]=d2
    if boundary=='0':
        lap[0,0]=lap[n-1,n-1]=-1

    return lap

@tf.function
def coupled_oscillators_batch( x, control):
    '''
    ODE of type:    x' = f(x)
    :param x_in:    position and velocities, shape: (batch, 2 * number of osc) order second index: x_i , v_i
    :param control: control function, shape: (batch,)
    :return:
    '''
    #print('coupled_oscillators_batch')
    n_osc = x.shape[1]//2

    # natural time evo
    a1 = np.array([[0,1],[-1,0]],dtype=np.float32)
    a2 = np.eye(n_osc,dtype=np.float32)
    A = np.kron(a1,a2)
    x_dot1 = tf.tensordot(x,A,axes = (1,1))

    # interaction term
    interaction_strength = 0.2
    b1 = np.array([[0,0],[1,0]],dtype=np.float32)
    b2 = build_laplace(n_osc)
    B = interaction_strength * np.kron(b1,b2)
    x_dot2 = tf.tensordot(x,B, axes=(1, 1))

    # control term
    control_vector = np.zeros((n_osc,),dtype=np.float32)
    control_vector[-1] = 1.0
    c1 = np.array([0,1],dtype=np.float32)
    c2 = control_vector
    C = np.kron(c1,c2)
    x_dot3 = tf.tensordot(control,C, axes=0)

    #all terms
    x_dot = x_dot1 + x_dot2 +x_dot3
    return x_dot

@tf.function
def runge_kutta_4_batch(x_0, dt, control, ODE_f_batch):

    f_0_0 = ODE_f_batch(x_0, control)
    x_14 = x_0 + 0.5 * dt * f_0_0

    f_12_14 = ODE_f_batch(x_14, control)
    x_12 = x_0 + 0.5 * dt * f_12_14

    f_12_12 = ODE_f_batch(x_12, control)
    x_34 = x_0 + dt * f_12_12

    terms = f_0_0 + 2 * f_12_14 + 2 * f_12_12 + ODE_f_batch(x_34, control)
    x1 = x_0 + dt * terms / 6

    return x1

@tf.function
def solver(x0, control):
    x = x0
    for i in range(Nt):
        x = runge_kutta_4_batch(x, DT, control[:,i], coupled_oscillators_batch)
    return x

22.4 训练设置

神经网络本身非常简单:它由四个密集层组成(中间的每个层有20个神经元),并使用tanh激活函数。

act = tf.keras.activations.tanh
model = tf.keras.models.Sequential([
    tf.keras.layers.InputLayer(input_shape=(2*Nx)),
    tf.keras.layers.Dense(20, activation=act),
    tf.keras.layers.Dense(20, activation=act),
    tf.keras.layers.Dense(20, activation=act),
    tf.keras.layers.Dense(Nt, activation='linear')
    ])

作为损失函数, 我们将使用 损失:

@tf.function
def loss_function(a,b):
    diff = a-b
    loss_batch = tf.reduce_sum(diff**2,axis=1)
    loss = tf.reduce_sum(loss_batch)
    return loss

作为训练的数据集,我们简单地创建了4k个随机位置值,这些位置值是振荡器在模拟开始时的初始状态(X_TRAIN),并且在模拟结束时应该返回到这些初始状态(Y_TRAIN)。由于它们应该返回到初始状态,我们有X_TRAIN=Y_TRAIN

N = 2**12
X_TRAIN = np.random.rand(N, 2 * Nx).astype(np.float32)
Y_TRAIN = X_TRAIN # the target states are identical

22.5 训练

神经网络训练的优化过程需要设置一些全局参数。下一个单元格初始化了适合三种方法的一些合适值。这些值是根据经验确定的,对于每种方法都能够发挥最佳效果。如果我们尝试为所有方法使用相同的设置,这将不可避免地使其中一些方法的比较不公平。

  1. Adam:这是最广泛使用的神经网络优化器,我们在这里使用它作为GD家族的代表。请注意,截断参数对Adam没有意义。

  2. SIP:指定的优化器是用于网络优化的。物理反演是通过高斯-牛顿完成的,并且对应于精确反演,因为物理优化景观是二次的。对于高斯-牛顿中的雅可比矩阵反演,我们可以指定截断参数。

  3. HIG:要获得HIG算法,必须将优化器设置为SGD。对于雅可比矩阵的半反演,我们可以指定截断参数。最佳批量大小通常比其他两种方法要低,学习率为1通常非常有效。

最大训练时间以秒为单位通过下面的 MAX_TIME 设置。

if MODE=='HIG': # HIG training
  OPTIMIZER = tf.keras.optimizers.SGD
  BATCH_SIZE = 32 # larger batches make HIGs unnecessariy slow...
  LR = 1.0
  TRUNC = 10**-10

elif MODE=='SIP': # SIP training
  OPTIMIZER = tf.keras.optimizers.Adam
  BATCH_SIZE = 256
  LR = 0.001  # for the internal step with Adam
  TRUNC = 0   # not used

else: #  Adam Training (as GD representative)
  MODE = 'GD' 
  OPTIMIZER = tf.keras.optimizers.Adam
  BATCH_SIZE = 256
  LR = 0.001
  TRUNC = 0 # not used

# global parameters for all three methods
MAX_TIME = 100 # [s]
print("Running variant: "+MODE)

输出结果:

Running variant: HIG

下一个函数HIG_pinv()是一个关键函数:它用于构建给定HIGs矩阵的半逆。它计算SVD,取奇异值的平方根,然后重新组装矩阵。

from tensorflow.python.framework import ops
from tensorflow.python.framework import tensor_shape
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.util import dispatch
from tensorflow.python.util.tf_export import tf_export
from tensorflow.python.ops.linalg.linalg_impl import _maybe_validate_matrix,svd

# partial inversion of the Jacobian via SVD:
# this function is adopted from tensorflow's SVD function, and published here under its license: Apache-2.0 License , https://github.com/tensorflow/tensorflow/blob/master/LICENSE
@tf_export('linalg.HIG_pinv')
@dispatch.add_dispatch_support
def HIG_pinv(a, rcond=None,beta=0.5, validate_args=False, name=None):

  with ops.name_scope(name or 'pinv'):
    a = ops.convert_to_tensor(a, name='a')

    assertions = _maybe_validate_matrix(a, validate_args)
    if assertions:
      with ops.control_dependencies(assertions):
        a = array_ops.identity(a)

    dtype = a.dtype.as_numpy_dtype

    if rcond is None:

      def get_dim_size(dim):
        dim_val = tensor_shape.dimension_value(a.shape[dim])
        if dim_val is not None:
          return dim_val
        return array_ops.shape(a)[dim]

      num_rows = get_dim_size(-2)
      num_cols = get_dim_size(-1)
      if isinstance(num_rows, int) and isinstance(num_cols, int):
        max_rows_cols = float(max(num_rows, num_cols))
      else:
        max_rows_cols = math_ops.cast(
            math_ops.maximum(num_rows, num_cols), dtype)
      rcond = 10. * max_rows_cols * np.finfo(dtype).eps

    rcond = ops.convert_to_tensor(rcond, dtype=dtype, name='rcond')

    [ singular_values, left_singular_vectors, right_singular_vectors, ] = svd(
        a, full_matrices=False, compute_uv=True)

    cutoff = rcond * math_ops.reduce_max(singular_values, axis=-1)
    singular_values = array_ops.where_v2(
        singular_values > array_ops.expand_dims_v2(cutoff, -1), singular_values**beta,
        np.array(np.inf, dtype))

    a_pinv = math_ops.matmul(
        right_singular_vectors / array_ops.expand_dims_v2(singular_values, -2),
        left_singular_vectors,
        adjoint_b=True)

    if a.shape is not None and a.shape.rank is not None:
      a_pinv.set_shape(a.shape[:-2].concatenate([a.shape[-1], a.shape[-2]]))

    return a_pinv

现在我们已经准备好运行训练了。下一个单元格定义了一个Python类来组织神经网络优化。它接收物理求解器、网络模型、损失函数和数据集,并在给定的时间限制MAX_TIME内运行尽可能多的周期。

根据选择的优化方法,小批量更新有所不同:

  1. Adam:计算损失梯度,然后应用Adam更新。
  2. PG:计算损失梯度和物理雅可比矩阵,逐个数据点地求逆,通过代理损失和Adam计算网络更新。
  3. HIG:计算损失梯度和网络-物理雅可比矩阵,然后联合计算半求逆,并使用得到的步长更新网络参数。

优化器类的mini_batch_update()方法实现了这三种变体。

class Optimization():
    def __init__(self,model,solver,loss_function,x_train,y_train):
        self.model = model
        self.solver = solver
        self.loss_function = loss_function
        self.x_train = x_train
        self.y_train = y_train
        self.y_dim = y_train.shape[1]
        self.weight_shapes = [weight_tensor.shape for weight_tensor in self.model.trainable_weights]

    def set_params(self,batch_size,learning_rate,optimizer,max_time,mode,trunc):
        self.number_of_batches = N // batch_size
        self.max_time = max_time
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.optimizer = optimizer(learning_rate)
        self.mode = mode
        self.trunc = trunc


    def computation(self,x_batch, y_batch):
        control_batch = self.model(y_batch)
        y_prediction_batch = self.solver(x_batch,control_batch)
        loss = self.loss_function(y_batch,y_prediction_batch)
        return loss


    @tf.function
    def gd_get_derivatives(self,x_batch, y_batch):

        with tf.GradientTape(persistent=True) as tape:
            tape.watch(self.model.trainable_variables)
            loss = self.computation(x_batch,y_batch)
            loss_per_dp = loss / self.batch_size
        grad = tape.gradient(loss_per_dp, self.model.trainable_variables)
        return grad


    @tf.function
    def pg_get_physics_derivatives(self,x_batch, y_batch): # physics gradient for SIP

        with tf.GradientTape(persistent=True) as tape:
            control_batch = self.model(y_batch)
            tape.watch(control_batch)
            y_prediction_batch = self.solver(x_batch,control_batch)
            loss = self.loss_function(y_batch,y_prediction_batch)
            loss_per_dp = loss / self.batch_size

        jacy = tape.batch_jacobian(y_prediction_batch,control_batch)
        grad = tape.gradient(loss_per_dp, y_prediction_batch)
        return jacy,grad,control_batch

    @tf.function
    def pg_get_network_derivatives(self,x_batch, y_batch,new_control_batch): #physical grads

        with tf.GradientTape(persistent=True) as tape:
            tape.watch(self.model.trainable_variables)
            control_batch = self.model(y_batch)
            loss = self.loss_function(new_control_batch,control_batch)
            #y_prediction_batch = self.solver(x_batch,control_batch)
            #loss = self.loss_function(y_batch,y_prediction_batch)
            loss_per_dp = loss / self.batch_size

        network_grad = tape.gradient(loss_per_dp, self.model.trainable_variables)
        return network_grad

    @tf.function
    def hig_get_derivatives(self,x_batch,y_batch):

        with tf.GradientTape(persistent=True) as tape:
            tape.watch(self.model.trainable_variables)
            control_batch = self.model(y_batch)
            y_prediction_batch = self.solver(x_batch,control_batch)
            loss = self.loss_function(y_batch,y_prediction_batch)
            loss_per_dp = loss / self.batch_size

        jacy = tape.jacobian(y_prediction_batch, self.model.trainable_variables, experimental_use_pfor=True)
        loss_grad = tape.gradient(loss_per_dp, y_prediction_batch)
        return jacy, loss_grad


    def mini_batch_update(self,x_batch, y_batch):
        if self.mode=="GD":
            grad = self.gd_get_derivatives(x_batch, y_batch)
            self.optimizer.apply_gradients(zip(grad, self.model.trainable_weights))

        elif self.mode=="SIP":
            jacy,grad,control_batch = self.pg_get_physics_derivatives(x_batch, y_batch)
            grad_e = tf.expand_dims(grad,-1)
            pinv = tf.linalg.pinv(jacy,rcond=10**-5)
            delta_control_label_batch = (pinv@grad_e)[:,:,0]
            new_control_batch = control_batch - delta_control_label_batch
            network_grad = self.pg_get_network_derivatives(x_batch, y_batch,new_control_batch)
            self.optimizer.apply_gradients(zip(network_grad, self.model.trainable_weights))

        elif self.mode =='HIG':
            jacy, grad = self.hig_get_derivatives(x_batch, y_batch)
            flat_jacy_list = [tf.reshape(jac, (self.batch_size * self.y_dim, -1)) for jac in jacy]
            flat_jacy = tf.concat(flat_jacy_list, axis=1)
            flat_grad = tf.reshape(grad, (-1,))
            inv = HIG_pinv(flat_jacy, rcond=self.trunc)
            processed_derivatives = tf.tensordot(inv, flat_grad, axes=(1, 0))
            #processed_derivatives = self.linear_solve(flat_jacy, flat_grad)
            update_list = []
            l1 = 0
            for k, shape in enumerate(self.weight_shapes):
                l2 = l1 + np.prod(shape)
                upd = processed_derivatives[l1:l2]
                upd = np.reshape(upd, shape)
                update_list.append(upd)
                l1 = l2
            self.optimizer.apply_gradients(zip(update_list, self.model.trainable_weights))


    def epoch_update(self):
        for batch_index in range(self.number_of_batches):
            position = batch_index * self.batch_size
            x_batch = self.x_train[position:position + self.batch_size]
            y_batch = self.y_train[position:position + self.batch_size]
            self.mini_batch_update(x_batch, y_batch)

    def eval(self,epoch,wc_time,ep_dur):
        train_loss = self.computation(self.x_train,self.y_train)
        train_loss_per_dp = train_loss / N
        if epoch<5 or epoch%20==0: print('Epoch: ', epoch,', wall clock time: ',wc_time,', loss: ', float(train_loss_per_dp) )
        #print('TrainLoss:', train_loss_per_dp)
        #print('Epoch: ', epoch,' WallClockTime: ',wc_time,' EpochDuration: ',ep_dur )
        return train_loss_per_dp

    def start_training(self):
        init_loss = self.eval(0,0,0)
        init_time = time.time()
        time_list = [init_time]
        loss_list = [init_loss]

        epoch=0
        wc_time = 0

        while wc_time<self.max_time:
            
            duration = time.time()
            self.epoch_update()
            duration = time.time()-duration

            epoch += 1
            wc_time += duration

            loss = self.eval(epoch,wc_time,duration)
            time_list.append(duration)
            loss_list.append(loss)

        time_list = np.array(time_list)
        loss_list = np.array(loss_list)
        time_list[0] = 0
        return time_list, loss_list

剩下的就是使用选择的全局参数开始训练,并将结果收集在time_listloss_list中。

opt = Optimization(model, solver, loss_function, X_TRAIN, Y_TRAIN)
opt.set_params(BATCH_SIZE, LR, OPTIMIZER, MAX_TIME, MODE, TRUNC)
time_list, loss_list = opt.start_training()
Epoch:  0 , wall clock time:  0 , loss:  1.4401766061782837
Epoch:  1 , wall clock time:  28.06755018234253 , loss:  5.44398972124327e-05
Epoch:  2 , wall clock time:  31.38792371749878 , loss:  1.064436037268024e-05
Epoch:  3 , wall clock time:  34.690271854400635 , loss:  3.163525434501935e-06
Epoch:  4 , wall clock time:  37.9914448261261 , loss:  1.1857609933940694e-06

22.6 评估

现在我们可以评估培训过程随时间的收敛情况。以下图表显示了损失随时间(以秒为单位)的演变。

import matplotlib.pyplot as plt

plt.plot(np.cumsum(time_list),loss_list)
plt.yscale('log')
plt.xlabel('Wall clock time [s]'); plt.ylabel('Loss')
plt.title('Linear chain - '+MODE+' with '+str(OPTIMIZER.__name__))
plt.show()

对于这三种方法,你会看到一个很大的线性步骤,就在开始时。由于我们正在公平地测量整个运行时间,因此这个第一步包括了所有TensorFlow初始化步骤,对于HIG和SIP而言,这些步骤显著更为复杂。Adam在初始化方面要快得多,并且每个训练迭代也同样更快。

这三种方法本身都能降低损失。更有趣的是看它们如何比较。为此,下一个单元格会存储训练演变,并且需要使用这三种方法中的每一种运行一次来生成最终的比较图。

path = '/home/'
namet = 'time'
namel = 'loss'
np.savetxt(path+MODE+namet+'.txt',time_list)
np.savetxt(path+MODE+namel+'.txt',loss_list)

在使用每种方法进行运行后,我们可以将它们并列显示:

from os.path import exists

# if previous runs are available, compare all 3
if exists(path+'HIG'+namet+'.txt') and exists(path+'HIG'+namel+'.txt') and exists(path+'SIP'+namet+'.txt') and exists(path+'SIP'+namel+'.txt') and exists(path+'GD'+namet+'.txt') and exists(path+'GD'+namel+'.txt'):
  lt_hig = np.loadtxt(path+'HIG'+namet+'.txt')
  ll_hig = np.loadtxt(path+'HIG'+namel+'.txt')

  lt_sip = np.loadtxt(path+'SIP'+namet+'.txt')
  ll_sip = np.loadtxt(path+'SIP'+namel+'.txt')

  lt_gd = np.loadtxt(path+'GD'+namet+'.txt')
  ll_gd = np.loadtxt(path+'GD'+namel+'.txt')

  plt.plot(np.cumsum(lt_gd),ll_gd,   label="GD")
  plt.plot(np.cumsum(lt_sip),ll_sip, label="SIP")
  plt.plot(np.cumsum(lt_hig),ll_hig,   label="HIG")
  plt.yscale('log')
  plt.xlabel('Wall clock time [s]'); plt.ylabel('Training loss'); plt.legend()
  plt.title('Linear chain comparison ')
  plt.show()
else:
  print("Run this notebook three times with MODE='HIG|SIP|GD' to produce the final graph")

这个图表清晰地显示了收敛方面的显著差异:Adam(蓝色“GD”曲线)执行了大量的更新,但其对黑塞矩阵的粗略近似不足以收敛到高精度,它停滞在高损失水平上。

在这种情况下,SIP更新并没有超越Adam。这是由于相对简单的物理(线性振荡器)以及SIP更高的运行时成本所致。如果您在这个例子中运行得更久一些,SIP实际上会超过Adam,但开始受到完全反演的数值问题的影响。

HIGs的表现比另外两种方法要好得多:尽管每次迭代相对较慢,但半反演产生了非常好的更新,使得训练能够非常快速地收敛到非常低的损失值。HIGs达到的精度比其他两种方法高四个数量级左右。

22.7 下一步

这个笔记本有许多有趣的方向可以进行进一步的测试和修改:

  • 最重要的是,到目前为止,我们实际上只看了训练表现!这使得笔记本保持了相对较短的长度,但这显然是不好的做法。虽然我们声称HIGs同样适用于_真实_测试数据,但这是使用这个笔记本的一个很好的下一步:分配适当的测试样本,并在测试数据上重新运行三种方法的评估。
  • 此外,您可以改变物理行为:使用更多的振荡器来延长或缩短时间跨度,甚至包括非线性力(如HIG论文中所用)。请注意:对于后者,SIP版本将需要一个新的逆求解器。