Source code for mxnet.gluon.contrib.estimator.estimator
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# coding: utf-8
# pylint: disable=wildcard-import, unused-variable
"""Gluon Estimator"""
import copy
import logging
import sys
import warnings
from .event_handler import MetricHandler, ValidationHandler, LoggingHandler, StoppingHandler, GradientUpdateHandler
from .event_handler import TrainBegin, EpochBegin, BatchBegin, BatchEnd, EpochEnd, TrainEnd
from .event_handler import _check_event_handlers
from .utils import _check_metrics, _suggest_metric_for_loss, _check_handler_metric_ref
from ...data import DataLoader
from ...loss import Loss as gluon_loss
from ...trainer import Trainer
from ...utils import split_and_load
from ....context import Context, cpu, gpu, num_gpus
from ....metric import Loss as metric_loss
from .batch_processor import BatchProcessor
__all__ = ['Estimator']
class Estimator(object):
"""Estimator Class for easy model training
    :py:class:`Estimator` can be used to facilitate the training & validation process.
Parameters
----------
net : gluon.Block
The model used for training.
loss : gluon.loss.Loss
Loss (objective) function to calculate during training.
train_metrics : EvalMetric or list of EvalMetric
Training metrics for evaluating models on training dataset.
val_metrics : EvalMetric or list of EvalMetric
Validation metrics for evaluating models on validation dataset.
initializer : Initializer
Initializer to initialize the network.
trainer : Trainer
Trainer to apply optimizer on network parameters.
context : Context or list of Context
Device(s) to run the training on.
val_net : gluon.Block
The model used for validation. The validation model does not necessarily belong to
the same model class as the training model. But the two models typically share the
same architecture. Therefore the validation model can reuse parameters of the
training model.
        A code example constructing a val_net that shares the network parameters of
        the training net is given below:
>>> net = _get_train_network()
>>> val_net = _get_test_network(params=net.collect_params())
>>> net.initialize(ctx=ctx)
>>> est = Estimator(net, loss, val_net=val_net)
        Proper namespace matching is required for weight sharing between two networks. Most
        networks inheriting :py:class:`Block` can share their parameters correctly. An
        exception is Sequential networks, where a Block scope must be specified for correct
        weight sharing. For naming in the mxnet Gluon API, please refer to
        https://mxnet.apache.org/api/python/docs/tutorials/packages/gluon/blocks/naming.html
        for further information.
    val_loss : gluon.loss.Loss
        Loss (objective) function to calculate during validation. If val_loss is
        None, the same loss function as self.loss is used.
    batch_processor : BatchProcessor
        BatchProcessor provides customized fit_batch() and evaluate_batch() methods.
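
    A minimal end-to-end usage sketch is given below. It assumes net, loss and a
    DataLoader train_loader are already defined (the names are illustrative only):

    >>> from mxnet.gluon.contrib.estimator import Estimator
    >>> est = Estimator(net=net, loss=loss)
    >>> est.fit(train_data=train_loader, epochs=2)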
"""
logger = None
"""logging.Logger object associated with the Estimator.
The logger is used for all logs generated by this estimator and its
handlers. A new logging.Logger is created during Estimator construction and
configured to write all logs with level logging.INFO or higher to
sys.stdout.
    You can modify the logging settings using the standard Python methods. For
    example, to save logs to a file in addition to printing them to stdout,
    you can attach a logging.FileHandler to the logger.
>>> est = Estimator(net, loss)
>>> import logging
>>> est.logger.addHandler(logging.FileHandler(filename))
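
    Similarly, a sketch of adjusting verbosity with the standard logging API
    (logging.DEBUG is an illustrative choice):

    >>> est.logger.setLevel(logging.DEBUG)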
"""
def __init__(self, net,
loss,
train_metrics=None,
val_metrics=None,
initializer=None,
trainer=None,
context=None,
val_net=None,
val_loss=None,
batch_processor=None):
self.net = net
self.loss = self._check_loss(loss)
self._train_metrics = _check_metrics(train_metrics)
self._val_metrics = _check_metrics(val_metrics)
self._add_default_training_metrics()
self._add_validation_metrics()
self.val_loss = self.loss
if val_loss is not None:
self.val_loss = self._check_loss(val_loss)
self.val_net = self.net
if val_net is not None:
self.val_net = val_net
self.logger = logging.Logger(name='Estimator', level=logging.INFO)
self.logger.addHandler(logging.StreamHandler(sys.stdout))
self.context = self._check_context(context)
self._initialize(initializer)
self.trainer = self._check_trainer(trainer)
self.batch_processor = self._check_batch_processor(batch_processor)
def _check_loss(self, loss):
if not isinstance(loss, gluon_loss):
raise ValueError("loss must be a Loss, "
"refer to gluon.loss.Loss:{}".format(loss))
return loss
def _check_context(self, context):
# infer available context
gpus = num_gpus()
available_gpus = [gpu(i) for i in range(gpus)]
if context:
# check context values, only accept Context or a list of Context
if isinstance(context, Context):
context = [context]
elif isinstance(context, list) and all([isinstance(c, Context) for c in context]):
context = context
else:
raise ValueError("context must be a Context or a list of Context, "
"for example mx.cpu() or [mx.gpu(0), mx.gpu(1)], "
"refer to mxnet.Context:{}".format(context))
for ctx in context:
assert ctx in available_gpus or str(ctx).startswith('cpu'), \
"%s is not available, please make sure " \
"your context is in one of: mx.cpu(), %s" % \
(ctx, ", ".join([str(ctx) for ctx in available_gpus]))
else:
# provide default context
if gpus > 0:
# only use 1 GPU by default
if gpus > 1:
warnings.warn("You have multiple GPUs, gpu(0) will be used by default."
"To utilize all your GPUs, specify context as a list of gpus, "
"e.g. context=[mx.gpu(0), mx.gpu(1)] ")
context = [gpu(0)]
else:
context = [cpu()]
return context
def _check_batch_processor(self, batch_processor):
# check whether the batch processor contains fit_batch() and evaluate_batch() methods
if batch_processor is not None:
model_fit = getattr(batch_processor, 'fit_batch', None)
model_evaluate = getattr(batch_processor, 'evaluate_batch', None)
if not callable(model_fit) or not callable(model_evaluate):
raise ValueError('Customized Batch Processor must contain fit_batch()'
' and evaluate_batch() methods')
else:
batch_processor = BatchProcessor()
return batch_processor
def _initialize(self, initializer):
# initialize the network
if not self._is_initialized():
# net is partially or not initialized,
# initialize with user specified initializer
# if initializer is None, default initializer will be used
# do not re-init layers already initialized
if initializer:
self.net.initialize(init=initializer, ctx=self.context)
else:
self.net.initialize(ctx=self.context)
elif initializer:
# net is fully initialized, and user passed not None initializer
# do not force reinitialize, give warning
warnings.warn("Network already fully initialized, skipping initialization. "
"You don't need to pass initializer if you already "
"initialized your net. "
"You can use net.initialize(init=your_initializer, force_reinit=True)"
"to force re-initialize.")
def _check_trainer(self, trainer):
# handle trainer
if not trainer:
warnings.warn("No trainer specified, default SGD optimizer "
"with learning rate 0.001 is used.")
trainer = Trainer(self.net.collect_params(),
'sgd', {'learning_rate': 0.001})
elif not isinstance(trainer, Trainer):
raise ValueError("Trainer must be a Gluon Trainer instance, refer to "
"gluon.Trainer:{}".format(trainer))
return trainer
def _is_initialized(self):
param_dict = self.net.collect_params()
for param in param_dict:
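            # an uninitialized parameter raises RuntimeError when list_ctx() is called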
try:
param_dict[param].list_ctx()
except RuntimeError:
return False
return True
def _get_data_and_label(self, batch, ctx, batch_axis=0):
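        # split the batch's data and label along the batch axis and load the
        # slices onto the given devices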
data = batch[0]
label = batch[1]
data = split_and_load(data, ctx_list=ctx, batch_axis=batch_axis)
label = split_and_load(label, ctx_list=ctx, batch_axis=batch_axis)
return data, label
def _add_default_training_metrics(self):
if not self._train_metrics:
suggested_metric = _suggest_metric_for_loss(self.loss)
if suggested_metric:
self._train_metrics = [suggested_metric]
loss_name = self.loss.name.rstrip('1234567890')
self._train_metrics.append(metric_loss(loss_name))
for metric in self._train_metrics:
# add training prefix to the metric name
# it is useful for event handlers to distinguish them from validation metrics
metric.name = 'training ' + metric.name
def _add_validation_metrics(self):
if not self._val_metrics:
self._val_metrics = [copy.deepcopy(metric) for metric in self._train_metrics]
for metric in self._val_metrics:
# add validation prefix to the metric name
# it is useful for event handlers to distinguish them from training metrics
if 'training' in metric.name:
metric.name = metric.name.replace('training', 'validation')
else:
metric.name = 'validation ' + metric.name
@property
def train_metrics(self):
return self._train_metrics
@property
def val_metrics(self):
return self._val_metrics
    def evaluate(self,
val_data,
batch_axis=0,
event_handlers=None):
"""Evaluate model on validation data.
This function calls :py:func:`evaluate_batch` on each of the batches from the
validation data loader. Thus, for custom use cases, it's possible to inherit the
estimator class and override :py:func:`evaluate_batch`.
Parameters
----------
val_data : DataLoader
Validation data loader with data and labels.
batch_axis : int, default 0
Batch axis to split the validation data into devices.
event_handlers : EventHandler or list of EventHandler
List of :py:class:`EventHandlers` to apply during validation. Besides
event handlers specified here, a default MetricHandler and a LoggingHandler
will be added if not specified explicitly.
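
        A minimal usage sketch, assuming an estimator est and a validation
        DataLoader val_loader are already constructed (illustrative names):

        >>> est.evaluate(val_data=val_loader)
        >>> [metric.get() for metric in est.val_metrics]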
"""
if not isinstance(val_data, DataLoader):
raise ValueError("Estimator only support input as Gluon DataLoader. Alternatively, you "
"can transform your DataIter or any NDArray into Gluon DataLoader. "
"Refer to gluon.data.DataLoader")
for metric in self.val_metrics:
metric.reset()
event_handlers = self._prepare_default_validation_handlers(event_handlers)
_, epoch_begin, batch_begin, batch_end, \
epoch_end, _ = self._categorize_handlers(event_handlers)
estimator_ref = self
for handler in epoch_begin:
handler.epoch_begin(estimator_ref)
for _, batch in enumerate(val_data):
for handler in batch_begin:
handler.batch_begin(estimator_ref, batch=batch)
_, label, pred, loss = \
self.batch_processor.evaluate_batch(estimator_ref, batch,
batch_axis)
for handler in batch_end:
handler.batch_end(estimator_ref, batch=batch, pred=pred, label=label, loss=loss)
for handler in epoch_end:
handler.epoch_end(estimator_ref)
    def fit(self, train_data,
val_data=None,
epochs=None,
event_handlers=None,
batches=None,
batch_axis=0):
"""Trains the model with a given :py:class:`DataLoader` for a specified
number of epochs or batches. The batch size is inferred from the
data loader's batch_size.
This function calls :py:func:`fit_batch` on each of the batches from the
training data loader. Thus, for custom use cases, it's possible to inherit the
estimator class and override :py:func:`fit_batch`.
Parameters
----------
train_data : DataLoader
Training data loader with data and labels.
val_data : DataLoader, default None
Validation data loader with data and labels.
epochs : int, default None
Number of epochs to iterate on the training data.
            You can specify one and only one type of iteration (epochs or batches).
event_handlers : EventHandler or list of EventHandler
List of :py:class:`EventHandlers` to apply during training. Besides
the event handlers specified here, a StoppingHandler,
LoggingHandler and MetricHandler will be added by default if not
yet specified manually. If validation data is provided, a
ValidationHandler is also added if not already specified.
batches : int, default None
Number of batches to iterate on the training data.
            You can specify one and only one type of iteration (epochs or batches).
batch_axis : int, default 0
Batch axis to split the training data into devices.
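
        A minimal usage sketch, assuming an estimator est and DataLoaders
        train_loader and val_loader are already constructed (illustrative
        names); specify exactly one of epochs or batches:

        >>> est.fit(train_data=train_loader, val_data=val_loader, epochs=2)
        >>> est.fit(train_data=train_loader, batches=100)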
"""
if not isinstance(train_data, DataLoader):
raise ValueError("Estimator only support input as Gluon DataLoader. Alternatively, you "
"can transform your DataIter or any NDArray into Gluon DataLoader. "
"Refer to gluon.data.dataloader")
# must specify one and only one of epochs or batches
if (not epochs) == (not batches):
raise ValueError(
"Fit only support exactly one type of iteration, "
"train by number of epochs or number of batches."
"Please specify one and only one of: epochs or batches.")
self.max_epoch = epochs
self.max_batch = batches
self.batch_axis = batch_axis
# provide default handlers
event_handlers = self._prepare_default_handlers(val_data, event_handlers)
train_begin, epoch_begin, batch_begin, \
batch_end, epoch_end, train_end = self._categorize_handlers(event_handlers)
# pass a reference to all event handlers
estimator_ref = self
# training begin
for handler in train_begin:
handler.train_begin(estimator_ref)
while True:
# epoch begin
for handler in epoch_begin:
handler.epoch_begin(estimator_ref)
for i, batch in enumerate(train_data):
# batch begin
for handler in batch_begin:
handler.batch_begin(estimator_ref, batch=batch)
_, label, pred, loss = self.batch_processor.fit_batch(estimator_ref,
batch, batch_axis)
# batch end
batch_end_result = []
for handler in batch_end:
batch_end_result.append(handler.batch_end(estimator_ref, batch=batch,
pred=pred, label=label, loss=loss))
# if any handler signaled to stop
if any(batch_end_result):
break
# epoch end
epoch_end_result = []
for handler in epoch_end:
epoch_end_result.append(handler.epoch_end(estimator_ref))
# if any handler signaled to stop
if any(epoch_end_result):
break
# train end
for handler in train_end:
handler.train_end(estimator_ref)
def _prepare_default_handlers(self, val_data, event_handlers):
event_handlers = _check_event_handlers(event_handlers)
added_default_handlers = []
        # StoppingHandler does not use metrics, so no default-handler check is needed
added_default_handlers.append(StoppingHandler(self.max_epoch, self.max_batch))
if not any(isinstance(handler, GradientUpdateHandler) for handler in event_handlers):
added_default_handlers.append(GradientUpdateHandler())
if not any(isinstance(handler, MetricHandler) for handler in event_handlers):
added_default_handlers.append(MetricHandler(metrics=self.train_metrics))
if not any(isinstance(handler, ValidationHandler) for handler in event_handlers):
# no validation handler
if val_data:
# add default validation handler if validation data found
added_default_handlers.append(ValidationHandler(val_data=val_data,
eval_fn=self.evaluate))
if not any(isinstance(handler, LoggingHandler) for handler in event_handlers):
added_default_handlers.append(LoggingHandler(metrics=self.train_metrics))
# if there is a mix of user defined event handlers and default event handlers
# they should have the same set of metrics
mixing_handlers = event_handlers and added_default_handlers
event_handlers.extend(added_default_handlers)
if mixing_handlers:
# check if all handlers have the same set of references to metrics
known_metrics = set(self.train_metrics + self.val_metrics)
for handler in event_handlers:
_check_handler_metric_ref(handler, known_metrics)
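        # sort handlers so they run in ascending priority order (default priority 0)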
event_handlers.sort(key=lambda handler: getattr(handler, 'priority', 0))
return event_handlers
def _prepare_default_validation_handlers(self, event_handlers):
event_handlers = _check_event_handlers(event_handlers)
added_default_handlers = []
# add default logging handler and metric handler for validation
if not any(isinstance(handler, MetricHandler) for handler in event_handlers):
added_default_handlers.append(MetricHandler(metrics=self.val_metrics))
if not any(isinstance(handler, LoggingHandler) for handler in event_handlers):
added_default_handlers.append(LoggingHandler(metrics=self.val_metrics))
mixing_handlers = event_handlers and added_default_handlers
event_handlers.extend(added_default_handlers)
# check if all handlers refer to well-defined validation metrics
if mixing_handlers:
known_metrics = set(self.val_metrics)
for handler in event_handlers:
_check_handler_metric_ref(handler, known_metrics)
event_handlers.sort(key=lambda handler: getattr(handler, 'priority', 0))
return event_handlers
def _categorize_handlers(self, event_handlers):
"""
        Categorize handlers into 6 event lists to avoid calling empty methods.
        For example, only event handlers with a train_begin method
        implemented will be called at train begin.
"""
train_begin = []
epoch_begin = []
batch_begin = []
batch_end = []
epoch_end = []
train_end = []
for handler in event_handlers:
if isinstance(handler, TrainBegin):
train_begin.append(handler)
if isinstance(handler, EpochBegin):
epoch_begin.append(handler)
if isinstance(handler, BatchBegin):
batch_begin.append(handler)
if isinstance(handler, BatchEnd):
batch_end.append(handler)
if isinstance(handler, EpochEnd):
epoch_end.append(handler)
if isinstance(handler, TrainEnd):
train_end.append(handler)
return train_begin, epoch_begin, batch_begin, batch_end, epoch_end, train_end