Source code for mxnet.gluon.trainer

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# coding: utf-8
# pylint: disable=line-too-long
"""Parameter optimizer."""
__all__ = ['Trainer']

from .. import optimizer as opt
from ..model import _create_kvstore, _create_sparse_kvstore
from .parameter import ParameterDict, Parameter

[docs]class Trainer(object): """Applies an `Optimizer` on a set of Parameters. Trainer should be used together with `autograd`. Parameters ---------- params : ParameterDict The set of parameters to optimize. optimizer : str or Optimizer The optimizer to use. See `help `_ on Optimizer for a list of available optimizers. optimizer_params : dict Key-word arguments to be passed to optimizer constructor. For example, `{'learning_rate': 0.1}`. All optimizers accept learning_rate, wd (weight decay), clip_gradient, and lr_scheduler. See each optimizer's constructor for a list of additional supported arguments. kvstore : str or KVStore kvstore type for multi-gpu and distributed training. See help on :any:`mxnet.kvstore.create` for more information. compression_params : dict Specifies type of gradient compression and additional arguments depending on the type of compression being used. For example, 2bit compression requires a threshold. Arguments would then be {'type':'2bit', 'threshold':0.5} See mxnet.KVStore.set_gradient_compression method for more details on gradient compression. update_on_kvstore : bool, default None Whether to perform parameter updates on kvstore. If None, then trainer will choose the more suitable option depending on the type of kvstore. Properties ---------- learning_rate : float The current learning rate of the optimizer. Given an Optimizer object optimizer, its learning rate can be accessed as optimizer.learning_rate. """ def __init__(self, params, optimizer, optimizer_params=None, kvstore='device', compression_params=None, update_on_kvstore=None): if isinstance(params, (dict, ParameterDict)): params = list(params.values()) if not isinstance(params, (list, tuple)): raise ValueError( "First argument must be a list or dict of Parameters, " \ "got %s."%(type(params))) self._params = [] # parameters to initialize on the kvstore self._contains_sparse_weight = False self._contains_sparse_grad = False self._param2idx = {} for i, param in enumerate(params): if not isinstance(param, Parameter): raise ValueError( "First argument must be a list or dict of Parameters, " \ "got list of %s."%(type(param))) self._param2idx[param.name] = i self._params.append(param) param._set_trainer(self) if param._stype != 'default': self._contains_sparse_weight = True if param._grad_stype != 'default': self._contains_sparse_grad = True self._compression_params = compression_params optimizer_params = optimizer_params if optimizer_params else {} self._scale = float(optimizer_params.get('rescale_grad', 1.0)) self._contexts = self._check_contexts() self._init_optimizer(optimizer, optimizer_params) self._kvstore_params = {'kvstore': kvstore, 'update_on_kvstore': update_on_kvstore} self._kv_initialized = False self._kvstore = None self._update_on_kvstore = None self._distributed = None self._params_to_init = [] self._reset_kvstore() def _check_contexts(self): contexts = None for param in self._params: ctx = param.list_ctx() assert contexts is None or contexts == ctx, \ "All Parameters must be initialized on the same set of contexts, " \ "but Parameter %s is initialized on %s while previous Parameters " \ "are initialized on %s."%(param.name, str(ctx), str(contexts)) contexts = ctx return contexts def _init_optimizer(self, optimizer, optimizer_params): param_dict = {i: param for i, param in enumerate(self._params)} if isinstance(optimizer, opt.Optimizer): assert not optimizer_params, \ "optimizer_params must be None if optimizer is an instance of " \ "Optimizer instead of str" self._optimizer = optimizer self._optimizer.param_dict = param_dict else: self._optimizer = opt.create(optimizer, param_dict=param_dict, **optimizer_params) self._updaters = [opt.get_updater(self._optimizer) \ for _ in self._contexts] def _init_params(self): """Initialize parameters in the KVStore. Parameters with incomplete initialization are ignored. """ assert self._kv_initialized, "Cannot initialize parameters in KVStore " \ "when KVStore is not initialized." params_to_init = [] if self._kvstore: for param in self._params_to_init: if param._deferred_init: params_to_init.append(param) else: param_arrays = param._check_and_get(param._data, list) idx = self._param2idx[param.name] self._kvstore.init(idx, param_arrays[0]) if param._stype == 'default': self._kvstore.pull(idx, param_arrays, priority=-idx) self._params_to_init = params_to_init def _reset_kvstore(self): """Reset kvstore.""" if self._kvstore and 'dist' in self._kvstore.type: raise RuntimeError("Cannot reset distributed KVStore.") self._kv_initialized = False self._kvstore = None self._distributed = None self._update_on_kvstore = None self._params_to_init = [param for param in self._params] def _init_kvstore(self): """Create kvstore.""" config = self._kvstore_params # if weight is sparse, the weight must be updated on KVStore. # training loop contains: # - row_sparse_pull(sparse_weight) # - forward() # - backward() # - push(sparse_grad), push(dense_grad) # - pull(dense_weight) if self._contains_sparse_weight: kvstore, update_on_kvstore = _create_sparse_kvstore(config['kvstore']) # raise Error if update_on_kvstore is set to False by the user if config['update_on_kvstore'] is False: raise RuntimeError("Cannot set update_on_kvstore to False when sparse weights " "are present.") # if weight is dense and grad is sparse, the weight better not be updated on KVStore. # training loop contains: # - forward() # - backward() # - push(grad) # - pull(grad) # - update(grad, weight) elif self._contains_sparse_grad: arg_arrays = {param.name: param.data(self._contexts[0]) for param in self._params} kvstore, _ = _create_kvstore(config['kvstore'], len(self._contexts), arg_arrays) update_on_kvstore = False # normal case else: arg_arrays = {param.name: param.data(self._contexts[0]) for param in self._params} kvstore, update_on_kvstore = _create_kvstore(config['kvstore'], len(self._contexts), arg_arrays) if kvstore and 'async' in kvstore.type and config['update_on_kvstore'] is not None\ and not config['update_on_kvstore']: raise ValueError("Please set update_on_kvstore to true " "when training in async mode.") if config['update_on_kvstore'] is not None: update_on_kvstore = config['update_on_kvstore'] if kvstore: if self._compression_params: kvstore.set_gradient_compression(self._compression_params) self._distributed = 'dist' in kvstore.type if self._distributed: # kv.pull(row_sparse_grad) is not supported for dist kvstore # Captures condition for dist_async, dist_device_sync or based on config for # update_on_kvstore update_on_kvstore = self._contains_sparse_weight or self._contains_sparse_grad \ or 'device' in kvstore.type or 'async' in kvstore.type \ or config['update_on_kvstore'] if update_on_kvstore: # optimizer preferably needs to be set before init for multiprecision kvstore.set_optimizer(self._optimizer) self._kvstore = kvstore self._update_on_kvstore = update_on_kvstore else: self._kvstore = None self._update_on_kvstore = None self._kv_initialized = True @property def learning_rate(self): if not isinstance(self._optimizer, opt.Optimizer): raise UserWarning("Optimizer has to be defined before its learning " "rate can be accessed.") else: return self._optimizer.learning_rate
[docs] def set_learning_rate(self, lr): """Sets a new learning rate of the optimizer. Parameters ---------- lr : float The new learning rate of the optimizer. """ if not isinstance(self._optimizer, opt.Optimizer): raise UserWarning("Optimizer has to be defined before its learning " "rate is mutated.") else: self._optimizer.set_learning_rate(lr)
def _row_sparse_pull(self, parameter, out, row_id, full_idx=False): """Internal method to invoke pull operations on KVStore. If `full_idx` is set to True, `kv.pull` is preferred instead of `kv.row_sparse_pull`. """ # initialize kv and params if not already if not self._kv_initialized: self._init_kvstore() if self._params_to_init: self._init_params() idx = self._param2idx[parameter.name] if full_idx and 'dist' not in self._kvstore.type: assert row_id.size == out.shape[0] self._kvstore.pull(idx, out=out, priority=-idx, ignore_sparse=False) else: self._kvstore.row_sparse_pull(idx, out=out, row_ids=row_id, priority=-idx)
[docs] def step(self, batch_size, ignore_stale_grad=False): """Makes one step of parameter update. Should be called after `autograd.backward()` and outside of `record()` scope. For normal parameter updates, `step()` should be used, which internally calls `allreduce_grads()` and then `update()`. However, if you need to get the reduced gradients to perform certain transformation, such as in gradient clipping, then you may want to manually call `allreduce_grads()` and `update()` separately. Parameters ---------- batch_size : int Batch size of data processed. Gradient will be normalized by `1/batch_size`. Set this to 1 if you normalized loss manually with `loss = mean(loss)`. ignore_stale_grad : bool, optional, default=False If true, ignores Parameters with stale gradient (gradient that has not been updated by `backward` after last step) and skip update. """ rescale_grad = self._scale / batch_size if self._update_on_kvstore and self._distributed and \ self._optimizer.rescale_grad != rescale_grad: raise UserWarning('Possible change in the `batch_size` from previous `step` detected.' \ 'Optimizer gradient normalizing factor will not change w.r.t new batch_size when ' \ 'update_on_kvstore=True and when distributed `kvstore` is used.') self._optimizer.rescale_grad = rescale_grad if not self._kv_initialized: self._init_kvstore() if self._params_to_init: self._init_params() self._allreduce_grads() self._update(ignore_stale_grad)
[docs] def allreduce_grads(self): """For each parameter, reduce the gradients from different contexts. Should be called after `autograd.backward()`, outside of `record()` scope, and before `trainer.update()`. For normal parameter updates, `step()` should be used, which internally calls `allreduce_grads()` and then `update()`. However, if you need to get the reduced gradients to perform certain transformation, such as in gradient clipping, then you may want to manually call `allreduce_grads()` and `update()` separately. """ if not self._kv_initialized: self._init_kvstore() if self._params_to_init: self._init_params() assert not (self._kvstore and self._update_on_kvstore), \ 'allreduce_grads() when parameters are updated on kvstore ' \ 'is not supported. Try setting `update_on_kvstore` ' \ 'to False when creating trainer.' self._allreduce_grads()
def _allreduce_grads(self): if self._kvstore: for i, param in enumerate(self._params): if param.grad_req != 'null': self._kvstore.push(i, param.list_grad(), priority=-i) if not self._update_on_kvstore: self._kvstore.pull(i, param.list_grad(), priority=-i, ignore_sparse=self._distributed)
[docs] def update(self, batch_size, ignore_stale_grad=False): """Makes one step of parameter update. Should be called after `autograd.backward()` and outside of `record()` scope, and after `trainer.update()`. For normal parameter updates, `step()` should be used, which internally calls `allreduce_grads()` and then `update()`. However, if you need to get the reduced gradients to perform certain transformation, such as in gradient clipping, then you may want to manually call `allreduce_grads()` and `update()` separately. Parameters ---------- batch_size : int Batch size of data processed. Gradient will be normalized by `1/batch_size`. Set this to 1 if you normalized loss manually with `loss = mean(loss)`. ignore_stale_grad : bool, optional, default=False If true, ignores Parameters with stale gradient (gradient that has not been updated by `backward` after last step) and skip update. """ if not self._kv_initialized: self._init_kvstore() if self._params_to_init: self._init_params() assert not (self._kvstore and self._update_on_kvstore), \ 'update() when parameters are updated on kvstore ' \ 'is not supported. Try setting `update_on_kvstore` ' \ 'to False when creating trainer.' self._optimizer.rescale_grad = self._scale / batch_size self._update(ignore_stale_grad)
def _update(self, ignore_stale_grad=False): for i, param in enumerate(self._params): if param.grad_req == 'null': continue if not ignore_stale_grad: for data in param._check_and_get(param._data, list): if not data._fresh_grad: raise UserWarning( "Gradient of Parameter `%s` on context %s has not been updated " "by backward since last `step`. This could mean a bug in your " "model that made it only use a subset of the Parameters (Blocks) " "for this iteration. If you are intentionally only using a subset, " "call step with ignore_stale_grad=True to suppress this " "warning and skip updating of Parameters with stale gradient" \ %(param.name, str(data.context))) if self._kvstore and self._update_on_kvstore: if param._stype == 'default': # 'row_sparse' parameters are not pulled immediately - they're pulled # in `Block.forward` self._kvstore.pull(i, param.list_data(), priority=-i) continue for upd, arr, grad in zip(self._updaters, param.list_data(), param.list_grad()): if not ignore_stale_grad or arr._fresh_grad: upd(i, grad, arr) arr._fresh_grad = False
[docs] def save_states(self, fname): """Saves trainer states (e.g. optimizer, momentum) to a file. Parameters ---------- fname : str Path to output states file. """ assert self._optimizer is not None if not self._kv_initialized: self._init_kvstore() if self._params_to_init: self._init_params() if self._update_on_kvstore: assert not self._params_to_init, "Cannot save trainer states when some " \ "parameters are not yet initialized in kvstore." self._kvstore.save_optimizer_states(fname, dump_optimizer=True) else: with open(fname, 'wb') as fout: fout.write(self._updaters[0].get_states(dump_optimizer=True))
[docs] def load_states(self, fname): """Loads trainer states (e.g. optimizer, momentum) from a file. Parameters ---------- fname : str Path to input states file. """ if not self._kv_initialized: self._init_kvstore() if self._params_to_init: self._init_params() if self._update_on_kvstore: self._kvstore.load_optimizer_states(fname) self._optimizer = self._kvstore._updater.optimizer param_dict = {i: param for i, param in enumerate(self._params)} self._optimizer.param_dict = param_dict else: with open(fname, 'rb') as f: states = f.read() for updater in self._updaters: updater.set_states(states) updater.optimizer = self._updaters[0].optimizer self._optimizer = self._updaters[0].optimizer