Source code for mxnet.gluon.metric
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
# coding: utf-8
# pylint: disable=no-member, too-many-lines
"""Online evaluation metric module."""
import math
from collections import OrderedDict
from .. import numpy
from ..util import use_np
from ..base import numeric_types, string_types
from .. import ndarray, npx
from .. import registry
def check_label_shapes(labels, preds, wrap=False, shape=False):
    """Helper function for checking the shapes of labels and predictions.
    Parameters
    ----------
    labels : list of `NDArray`
        The labels of the data.
    preds : list of `NDArray`
        Predicted values.
    wrap : boolean
        If True, wrap labels/preds in a list if they are single NDArray
    shape : boolean
        If True, check the shape of labels and preds;
        Otherwise only check their length.
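    Examples
    --------
    A minimal usage sketch (the arrays below are illustrative placeholders):
    >>> labels = [mx.np.array([0, 1, 1])]
    >>> preds  = [mx.np.array([[0.3, 0.7], [0., 1.], [0.4, 0.6]])]
    >>> labels, preds = mx.gluon.metric.check_label_shapes(labels, preds, wrap=True)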
    """
    if not shape:
        label_shape, pred_shape = len(labels), len(preds)
    else:
        label_shape, pred_shape = labels.shape, preds.shape
    if label_shape != pred_shape:
        raise ValueError("Shape of labels {} does not match shape of "
                         "predictions {}".format(label_shape, pred_shape))
    if wrap:
        if isinstance(labels, ndarray.ndarray.NDArray):
            labels = [labels]
        if isinstance(preds, ndarray.ndarray.NDArray):
            preds = [preds]
    return labels, preds
class EvalMetric(object):
    """Base class for all evaluation metrics.
    .. note::
        This is a base class that provides common metric interfaces.
        One should not use this class directly, but instead create new metric
        classes that extend it.
    Parameters
    ----------
    name : str
        Name of this metric instance for display.
    output_names : list of str, or None
        Name of predictions that should be used when updating with update_dict.
        By default include all predictions.
    label_names : list of str, or None
        Name of labels that should be used when updating with update_dict.
        By default include all labels.
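    Examples
    --------
    A minimal subclass sketch (``MeanPred`` and its behaviour are illustrative only,
    not part of the library):
    >>> class MeanPred(mx.gluon.metric.EvalMetric):
    ...     def __init__(self):
    ...         super(MeanPred, self).__init__('mean_pred')
    ...     def update(self, labels, preds):
    ...         for pred in preds:
    ...             self.sum_metric += pred.sum().item()
    ...             self.num_inst += pred.size
    >>> m = MeanPred()
    >>> m.update(None, [mx.np.array([1., 2., 3.])])
    >>> m.get()
    ('mean_pred', 2.0)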
    """
    def __init__(self, name, output_names=None,
                 label_names=None, **kwargs):
        self.name = str(name)
        self.output_names = output_names
        self.label_names = label_names
        self._kwargs = kwargs
        self.reset()
    def __str__(self):
        return "EvalMetric: {}".format(dict(self.get_name_value()))
    def get_config(self):
        """Save the configuration of this metric. The metric can be recreated
        from the returned config with ``metric.create(**config)``.
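        Examples
        --------
        A round-trip sketch (``acc`` here can be any metric built via ``create``):
        >>> acc = mx.gluon.metric.create('acc')
        >>> config = acc.get_config()
        >>> acc2 = mx.gluon.metric.create(**config)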
        """
        config = self._kwargs.copy()
        config.update({
            'metric': self.__class__.__name__,
            'name': self.name,
            'output_names': self.output_names,
            'label_names': self.label_names})
        return config
    def update_dict(self, label, pred):
        """Update the internal evaluation with named labels and predictions.
        Parameters
        ----------
        label : OrderedDict of str -> NDArray
            name to array mapping for labels.
        pred : OrderedDict of str -> NDArray
            name to array mapping of predicted outputs.
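        Examples
        --------
        A named-update sketch (the 'out'/'lbl' names are illustrative):
        >>> from collections import OrderedDict
        >>> acc = mx.gluon.metric.Accuracy(output_names=['out'], label_names=['lbl'])
        >>> preds  = OrderedDict([('out', mx.np.array([[0.3, 0.7], [0., 1.], [0.4, 0.6]]))])
        >>> labels = OrderedDict([('lbl', mx.np.array([0, 1, 1]))])
        >>> acc.update_dict(labels, preds)
        >>> acc.get()
        ('accuracy', 0.6666666666666666)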
        """
        if self.output_names is not None:
            pred = [pred[name] for name in self.output_names]
        else:
            pred = list(pred.values())
        if self.label_names is not None:
            label = [label[name] for name in self.label_names]
        else:
            label = list(label.values())
        self.update(label, pred)
    def update(self, labels, preds):
        """Updates the internal evaluation result.
        Parameters
        ----------
        labels : list of `NDArray`
            The labels of the data.
        preds : list of `NDArray`
            Predicted values.
        """
        raise NotImplementedError()
    def reset(self):
        """Resets the internal evaluation result to initial state."""
        self.num_inst = 0
        self.sum_metric = 0.0
    def get(self):
        """Gets the current evaluation result.
        Returns
        -------
        names : list of str
           Name of the metrics.
        values : list of float
           Value of the evaluations.
        """
        if self.num_inst == 0:
            return (self.name, float('nan'))
        else:
            res = self.sum_metric / self.num_inst
            if isinstance(res, numpy.ndarray) and len(res.shape) == 0:
                # currently calling ' c = mxnet.numpy.array([1,2,3]).sum() ' would get
                # ' array(6.) ', a ndarray with shape ()
                # In this case, returning a 'float' in .get() is more explicit.
                res = res.item()
            return (self.name, res)
    def get_name_value(self):
        """Returns zipped name and value pairs.
        Returns
        -------
        list of tuples
            A (name, value) tuple list.
        """
        name, value = self.get()
        if not isinstance(name, list):
            name = [name]
        if not isinstance(value, list):
            value = [value]
        return list(zip(name, value))
# pylint: disable=invalid-name
register = registry.get_register_func(EvalMetric, 'metric')
alias = registry.get_alias_func(EvalMetric, 'metric')
_create = registry.get_create_func(EvalMetric, 'metric')
# pylint: enable=invalid-name
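# A registration sketch (``MyMetric`` is illustrative only; the metrics defined below
# follow the same pattern):
#
#   @register
#   @alias('my_metric')
#   class MyMetric(EvalMetric):
#       def update(self, labels, preds):
#           ...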
def create(metric, *args, **kwargs):
    """Creates evaluation metric from metric names or instances of EvalMetric
    or a custom metric function.
    Parameters
    ----------
    metric : str or callable
        Specifies the metric to create.
        This argument must be one of the below:
        - Name of a metric.
        - An instance of `EvalMetric`.
        - A list, each element of which is a metric or a metric name.
        - An evaluation function that computes custom metric for a given batch of
          labels and predictions.
    *args : list
        Additional arguments to metric constructor.
        Only used when metric is str.
    **kwargs : dict
        Additional arguments to metric constructor.
        Only used when metric is str.
    Examples
    --------
    >>> def custom_metric(label, pred):
    ...     return np.mean(np.abs(label - pred))
    ...
    >>> metric1 = mx.gluon.metric.create('acc')
    >>> metric2 = mx.gluon.metric.create(custom_metric)
    >>> metric3 = mx.gluon.metric.create([metric1, metric2, 'rmse'])
    """
    if callable(metric):
        return CustomMetric(metric, *args, **kwargs)
    elif isinstance(metric, list):
        composite_metric = CompositeEvalMetric()
        for child_metric in metric:
            composite_metric.add(create(child_metric, *args, **kwargs))
        return composite_metric
    return _create(metric, *args, **kwargs)
@register
@alias('composite')
class CompositeEvalMetric(EvalMetric):
    """Manages multiple evaluation metrics.
    Parameters
    ----------
    metrics : list of EvalMetric
        List of child metrics.
    name : str
        Name of this metric instance for display.
    output_names : list of str, or None
        Name of predictions that should be used when updating with update_dict.
        By default include all predictions.
    label_names : list of str, or None
        Name of labels that should be used when updating with update_dict.
        By default include all labels.
    Examples
    --------
    >>> predicts = [mx.np.array([[0.3, 0.7], [0, 1.], [0.4, 0.6]])]
    >>> labels   = [mx.np.array([0, 1, 1])]
    >>> eval_metrics_1 = mx.gluon.metric.Accuracy()
    >>> eval_metrics_2 = mx.gluon.metric.F1()
    >>> eval_metrics = mx.gluon.metric.CompositeEvalMetric()
    >>> for child_metric in [eval_metrics_1, eval_metrics_2]:
    ...     eval_metrics.add(child_metric)
    >>> eval_metrics.update(labels = labels, preds = predicts)
    >>> eval_metrics.get()
    (['accuracy', 'f1'], [0.6666666666666666, 0.8])
    """
    def __init__(self, metrics=None, name='composite',
                 output_names=None, label_names=None):
        super(CompositeEvalMetric, self).__init__(
            name, output_names=output_names, label_names=label_names)
        if metrics is None:
            metrics = []
        self.metrics = [create(i) for i in metrics]
    def add(self, metric):
        """Adds a child metric.
        Parameters
        ----------
        metric
            A metric instance.
        """
        self.metrics.append(create(metric))
    def get_metric(self, index):
        """Returns a child metric.
        Parameters
        ----------
        index : int
            Index of child metric in the list of metrics.
        """
        try:
            return self.metrics[index]
        except IndexError:
            raise ValueError("Metric index {} is out of range [0, {})".format(
                index, len(self.metrics)))
    def update_dict(self, labels, preds): # pylint: disable=arguments-differ
        if self.label_names is not None:
            labels = OrderedDict([i for i in labels.items()
                                  if i[0] in self.label_names])
        if self.output_names is not None:
            preds = OrderedDict([i for i in preds.items()
                                 if i[0] in self.output_names])
        for metric in self.metrics:
            metric.update_dict(labels, preds)
    def update(self, labels, preds):
        """Updates the internal evaluation result.
        Parameters
        ----------
        labels : list of `NDArray`
            The labels of the data.
        preds : list of `NDArray`
            Predicted values.
        """
        for metric in self.metrics:
            metric.update(labels, preds)
    def reset(self):
        """Resets the internal evaluation result to initial state."""
        try:
            for metric in self.metrics:
                metric.reset()
        except AttributeError:
            pass
    def get(self):
        """Returns the current evaluation result.
        Returns
        -------
        names : list of str
           Name of the metrics.
        values : list of float
           Value of the evaluations.
        """
        names = []
        values = []
        for metric in self.metrics:
            name, value = metric.get()
            if isinstance(name, string_types):
                name = [name]
            if isinstance(value, numeric_types):
                value = [value]
            names.extend(name)
            values.extend(value)
        return (names, values)
    def get_config(self):
        config = super(CompositeEvalMetric, self).get_config()
        config.update({'metrics': [i.get_config() for i in self.metrics]})
        return config
########################
# CLASSIFICATION METRICS
########################
@register
@alias('acc')
@use_np
class Accuracy(EvalMetric):
    """Computes accuracy classification score.
    The accuracy score is defined as
    .. math::
        \\text{accuracy}(y, \\hat{y}) = \\frac{1}{n} \\sum_{i=0}^{n-1}
        \\text{1}(\\hat{y_i} == y_i)
    Parameters
    ----------
    axis : int, default=1
        The axis that represents classes
    name : str
        Name of this metric instance for display.
    output_names : list of str, or None
        Name of predictions that should be used when updating with update_dict.
        By default include all predictions.
    label_names : list of str, or None
        Name of labels that should be used when updating with update_dict.
        By default include all labels.
    Examples
    --------
    >>> predicts = [mx.np.array([[0.3, 0.7], [0, 1.], [0.4, 0.6]])]
    >>> labels   = [mx.np.array([0, 1, 1])]
    >>> acc = mx.gluon.metric.Accuracy()
    >>> acc.update(preds = predicts, labels = labels)
    >>> acc.get()
    ('accuracy', 0.6666666666666666)
    """
    def __init__(self, axis=1, name='accuracy',
                 output_names=None, label_names=None):
        super(Accuracy, self).__init__(
            name, axis=axis,
            output_names=output_names, label_names=label_names)
        self.axis = axis
    def update(self, labels, preds):
        """Updates the internal evaluation result.
        Parameters
        ----------
        labels : list of `NDArray`
            The labels of the data with class indices as values, one per sample.
        preds : list of `NDArray`
            Prediction values for samples. Each prediction value can either be the class index,
            or a vector of likelihoods for all classes.
        """
        labels, preds = check_label_shapes(labels, preds, True)
        for label, pred_label in zip(labels, preds):
            pred_label = pred_label.as_np_ndarray().to_device(label.device)
            label = label.as_np_ndarray()
            if pred_label.shape != label.shape:
                pred_label = pred_label.argmax(axis=self.axis)
            pred_label = pred_label.astype('int32')
            label = label.astype('int32')
            # flatten before checking shapes to avoid shape mismatch
            label = label.reshape(-1)
            pred_label = pred_label.reshape(-1)
            check_label_shapes(label, pred_label)
            num_correct = (pred_label == label).sum().astype('float64')
            self.sum_metric += num_correct
            self.num_inst += len(pred_label)
@register
@alias('top_k_accuracy', 'top_k_acc')
@use_np
class TopKAccuracy(EvalMetric):
    """Computes top k predictions accuracy.
    `TopKAccuracy` differs from Accuracy in that it considers the prediction
    to be ``True`` as long as the ground truth label is in the top K
    predicted labels.
    If `top_k` = ``1``, then `TopKAccuracy` is identical to `Accuracy`.
    Parameters
    ----------
    top_k : int
        The number of top predictions among which a matching target counts as correct.
    name : str
        Name of this metric instance for display.
    output_names : list of str, or None
        Name of predictions that should be used when updating with update_dict.
        By default include all predictions.
    label_names : list of str, or None
        Name of labels that should be used when updating with update_dict.
        By default include all labels.
    Examples
    --------
    >>> np.random.seed(999)
    >>> top_k = 3
    >>> labels = [mx.np.array([2, 6, 9, 2, 3, 4, 7, 8, 9, 6])]
    >>> predicts = [mx.np.array(np.random.rand(10, 10))]
    >>> acc = mx.gluon.metric.TopKAccuracy(top_k=top_k)
    >>> acc.update(labels, predicts)
    >>> acc.get()
    ('top_k_accuracy', 0.3)
    """
    def __init__(self, top_k=1, name='top_k_accuracy',
                 output_names=None, label_names=None):
        super(TopKAccuracy, self).__init__(
            name, top_k=top_k,
            output_names=output_names, label_names=label_names)
        self.top_k = top_k
        assert(self.top_k > 1), 'Please use Accuracy if top_k is no more than 1'
        self.name += f'_{self.top_k}'
    def update(self, labels, preds):
        """Updates the internal evaluation result.
        Parameters
        ----------
        labels : list of `NDArray`
            The labels of the data.
        preds : list of `NDArray`
            Predicted values.
        """
        labels, preds = check_label_shapes(labels, preds, True)
        for label, pred_label in zip(labels, preds):
            assert(len(pred_label.shape) <= 2), 'Predictions should be no more than 2 dims'
            # Using argpartition here instead of argsort is safe because
            # we do not care about the order of top k elements. It is
            # much faster, which is important since that computation is
            # single-threaded due to Python GIL.
            pred_label = pred_label.as_np_ndarray().to_device(label.device).astype('float32')
            pred_label = numpy.argpartition(pred_label, -self.top_k).to_device(label.device)
            label = label.as_np_ndarray().astype('int32')
            check_label_shapes(label, pred_label)
            num_samples = pred_label.shape[0]
            num_dims = len(pred_label.shape)
            if num_dims == 1:
                num_correct = (pred_label.reshape(-1) == label.reshape(-1)).sum()
                self.sum_metric += num_correct.astype('float64')
            elif num_dims == 2:
                num_classes = pred_label.shape[1]
                top_k = min(num_classes, self.top_k)
                for j in range(top_k):
                    num_correct = (pred_label[:, num_classes - 1 - j].reshape(-1) == label.reshape(-1)).sum()
                    self.sum_metric += num_correct.astype('float64')
            self.num_inst += num_samples
def predict_with_threshold(pred, threshold=0.5):
    """Apply a threshold to predictions in the binary and multilabel cases.
    Parameters
    ----------
    pred : ndarray
        predictions of shape (batch_size, ...) or (batch_size, ..., num_categories)
    threshold : float or ndarray
        threshold(s), either a single float or an array of shape (num_categories,)
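    Examples
    --------
    A small sketch with a scalar and a per-class threshold (the values are illustrative):
    >>> pred = mx.np.array([[0.4, 0.8], [0.9, 0.2]])
    >>> mask_scalar = mx.gluon.metric.predict_with_threshold(pred, 0.5)
    >>> mask_vector = mx.gluon.metric.predict_with_threshold(pred, mx.np.array([0.3, 0.7]))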
    """
    if isinstance(threshold, float):
        return pred > threshold
    elif isinstance(threshold, (numpy.ndarray, ndarray.ndarray.NDArray)):
        num_classes = pred.shape[-1]
        assert threshold.shape[-1] == num_classes, \
                f"shape mismatch: {pred.shape[-1]} vs. {threshold.shape[-1]}"
        return pred > threshold
    else:
        raise ValueError("{} is a wrong type for threshold!".format(type(threshold)))
def one_hot(idx, num):
    return (numpy.arange(num).astype(idx) == idx[:, None]).astype('int32')
@use_np
class _ClassificationMetrics(object):
    """Private container class for classification metric statistics.
    True/false positive and true/false negative counts are sufficient statistics for various classification metrics.
    This class provides the machinery to track those statistics across mini-batches of
    (label, prediction) pairs.
    Parameters
    ----------
    class_type : str, default "binary"
        "binary": f1 for binary classification.
        "multiclass": f1 for multiclassification problem.
        "multilabel": f1 for multilabel classification.
    beta : float, default 1
        relative weight of recall vs. precision in the harmonic mean (beta > 1 favors recall).
    threshold : float, default 0.5
        threshold for deciding whether the predictions are positive or negative.
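    Examples
    --------
    A binary sketch (the values are illustrative; this class is internal to the module):
    >>> stats = mx.gluon.metric._ClassificationMetrics()
    >>> stats.update_stats(mx.np.array([0., 1., 1.]), mx.np.array([[0.3, 0.7], [0., 1.], [0.4, 0.6]]))
    >>> float(stats.micro_precision), float(stats.micro_recall)
    (0.6666666666666666, 1.0)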
    """
    def __init__(self, class_type="binary", threshold=0.5, beta=1):
        self.class_type = class_type
        self.threshold = threshold
        self.beta = beta
        self.reset_stats()
    def _set(self, num, device):
        if self.num_classes is None:
            self.num_classes = num
            self.true_positives = numpy.zeros(num, dtype='float64').to_device(device)
            self.false_negatives = numpy.zeros(num, dtype='float64').to_device(device)
            self.false_positives = numpy.zeros(num, dtype='float64').to_device(device)
            self.true_negatives = numpy.zeros(num, dtype='float64').to_device(device)
        else:
            assert self.num_classes == num, \
                "Input number of classes has changed from {} to {}".format(self.num_classes, num)
    def update_stats(self, label, pred):
        """Update various binary classification counts for a single (label, pred) pair.
        Parameters
        ----------
        label : `NDArray`
            The labels of the data.
        pred : `NDArray`
            Predicted values.
        """
        pred = pred.as_np_ndarray().to_device(label.device)
        label = label.as_np_ndarray().astype('int32')
        if self.class_type == "binary":
            self._set(1, label.device)
            if label.max() > 1:
                raise ValueError("Wrong label for binary classification.")
            if pred.shape == label.shape:
                pass
            elif pred.shape[-1] > 2:
                raise ValueError("The shape of prediction {} is wrong for binary classification.".format(pred.shape))
            elif pred.shape[-1] == 2:
                pred = pred.reshape(-1, 2)[:, 1]
            pred_label = predict_with_threshold(pred, self.threshold).reshape(-1)
            label = label.reshape(-1)
        elif self.class_type == "multiclass":
            num = pred.shape[-1]
            self._set(num, label.device)
            assert label.max() < num, "pred contains fewer classes than label!"
            pred_label = one_hot(pred.argmax(axis=-1).reshape(-1), num)
            label = one_hot(label.reshape(-1), num)
        elif self.class_type == "multilabel":
            num = pred.shape[-1]
            self._set(num, label.device)
            assert pred.shape == label.shape, \
                "The shape of label should be same as that of prediction for multilabel classification."
            pred_label = predict_with_threshold(pred, self.threshold).reshape(-1, num)
            label = label.reshape(-1, num)
        else:
            raise ValueError(
                "Wrong class_type {}! Only supports ['binary', 'multiclass', 'multilabel']".format(self.class_type))
        check_label_shapes(label, pred_label)
        pred_true = (pred_label == 1)
        pred_false = (pred_label == 0)
        label_true = (label == 1)
        label_false = (label == 0)
        true_pos = (pred_true * label_true).sum(0)
        false_pos = (pred_true * label_false).sum(0)
        false_neg = (pred_false * label_true).sum(0)
        true_neg = (pred_false * label_false).sum(0)
        self.true_positives += true_pos
        self.false_positives += false_pos
        self.false_negatives += false_neg
        self.true_negatives += true_neg
    @property
    def precision(self):
        if self.num_classes is not None:
            return self.true_positives / numpy.maximum(self.true_positives + self.false_positives, 1e-12)
        else:
            return 0.
    @property
    def micro_precision(self):
        if self.num_classes is not None:
            return self.true_positives.sum() / \
                numpy.maximum(self.true_positives.sum() + self.false_positives.sum(), 1e-12)
        else:
            return 0.
    @property
    def recall(self):
        if self.num_classes is not None:
            return self.true_positives / numpy.maximum(self.true_positives + self.false_negatives, 1e-12)
        else:
            return 0.
    @property
    def micro_recall(self):
        if self.num_classes is not None:
            return self.true_positives.sum() / \
                numpy.maximum(self.true_positives.sum() + self.false_negatives.sum(), 1e-12)
        else:
            return 0.
    @property
    def fscore(self):
        return (1 + self.beta ** 2) * self.precision * self.recall / \
            numpy.maximum(self.beta ** 2 * self.precision + self.recall, 1e-12)
    @property
    def micro_fscore(self):
        if self.micro_precision + self.micro_recall > 0:
            return (1 + self.beta ** 2) * self.micro_precision * self.micro_recall / \
                (self.beta ** 2 * self.micro_precision + self.micro_recall)
        else:
            return 0.
    def binary_matthewscc(self):
        """Calculate the Matthew's Correlation Coefficent"""
        if not self.total_examples:
            return 0.
        true_pos = float(self.true_positives)
        false_pos = float(self.false_positives)
        false_neg = float(self.false_negatives)
        true_neg = float(self.true_negatives)
        terms = [(true_pos + false_pos),
                 (true_pos + false_neg),
                 (true_neg + false_pos),
                 (true_neg + false_neg)]
        denom = 1.
        for t in filter(lambda t: t != 0., terms):
            denom *= t
        return ((true_pos * true_neg) - (false_pos * false_neg)) / math.sqrt(denom)
    @property
    def total_examples(self):
        if self.num_classes is None:
            return 0
        return int(self.false_negatives[0] + self.false_positives[0] + \
               self.true_negatives[0] + self.true_positives[0])
    def reset_stats(self):
        self.num_classes = None
        self.true_positives = None
        self.false_negatives = None
        self.false_positives = None
        self.true_negatives = None
@register
@use_np
class F1(EvalMetric):
    """Computes the F1 score of a binary classification problem.
    The F1 score is equivalent to the harmonic mean of the precision and recall,
    where the best value is 1.0 and the worst value is 0.0. The formula for F1 score is::
        F1 = 2 * (precision * recall) / (precision + recall)
    The formula for precision and recall is::
        precision = true_positives / (true_positives + false_positives)
        recall    = true_positives / (true_positives + false_negatives)
    .. note::
        By default this computes the binary F1 score; set ``class_type`` for multiclass or multilabel problems.
    Parameters
    ----------
    name : str
        Name of this metric instance for display.
    output_names : list of str, or None
        Name of predictions that should be used when updating with update_dict.
        By default include all predictions.
    label_names : list of str, or None
        Name of labels that should be used when updating with update_dict.
        By default include all labels.
    class_type : str, default "binary"
        "binary": f1 for binary classification.
        "multiclass": f1 for multiclassification problem.
        "multilabel": f1 for multilabel classification.
    threshold : float, default 0.5
        threshold for positive confidence value.
    average : str, default 'micro'
        Strategy to be used for aggregating across mini-batches.
            "macro": Calculate metrics for each label and return unweighted mean of f1.
            "micro": Calculate metrics globally by counting the total TP, FN and FP.
            None: Return f1 scores for each class (numpy.ndarray).
    Examples
    --------
    >>> predicts = [mx.np.array([[0.3, 0.7], [0., 1.], [0.4, 0.6]])]
    >>> labels   = [mx.np.array([0., 1., 1.])]
    >>> f1 = mx.gluon.metric.F1()
    >>> f1.update(preds = predicts, labels = labels)
    >>> f1.get()
    ('f1', 0.8)
    """
    def __init__(self, name='f1',
                 output_names=None, label_names=None, class_type="binary", threshold=0.5, average="micro"):
        self.average = average
        self.metrics = _ClassificationMetrics(class_type=class_type, threshold=threshold)
        EvalMetric.__init__(self, name=name,
                            output_names=output_names, label_names=label_names)
    def update(self, labels, preds):
        """Updates the internal evaluation result.
        Parameters
        ----------
        labels : list of `NDArray`
            The labels of the data.
        preds : list of `NDArray`
            Predicted values.
        """
        labels, preds = check_label_shapes(labels, preds, True)
        for label, pred in zip(labels, preds):
            self.metrics.update_stats(label, pred)
        if self.average == "micro":
            self.sum_metric = self.metrics.micro_fscore * self.metrics.total_examples
        elif self.average == "macro":
            self.sum_metric = self.metrics.fscore.mean() * self.metrics.total_examples
        else:
            self.sum_metric = self.metrics.fscore * self.metrics.total_examples
        self.num_inst = self.metrics.total_examples
    def reset(self):
        """Resets the internal evaluation result to initial state."""
        self.sum_metric = 0.
        self.num_inst = 0
        self.metrics.reset_stats()
@register
@use_np
class Fbeta(F1):
    """Computes the Fbeta score of a binary classification problem.
    The Fbeta score is a weighted harmonic mean of the precision and recall,
    where the best value is 1.0 and the worst value is 0.0. The formula for Fbeta score is::
        Fbeta = (1 + beta ** 2) * (precision * recall) / (beta ** 2 * precision + recall)
    The formula for precision and recall is::
        precision = true_positives / (true_positives + false_positives)
        recall    = true_positives / (true_positives + false_negatives)
    .. note::
        By default this computes the binary Fbeta score; set ``class_type`` for multiclass or multilabel problems.
    Parameters
    ----------
    name : str
        Name of this metric instance for display.
    output_names : list of str, or None
        Name of predictions that should be used when updating with update_dict.
        By default include all predictions.
    label_names : list of str, or None
        Name of labels that should be used when updating with update_dict.
        By default include all labels.
    class_type : str, default "binary"
        "binary": f1 for binary classification.
        "multiclass": f1 for multiclassification problem.
        "multilabel": f1 for multilabel classification.
    beta : float, default 1
        relative weight of recall vs. precision in the harmonic mean (beta > 1 favors recall).
    threshold : float, default 0.5
        threshold for positive confidence value.
    average : str, default 'micro'
        Strategy to be used for aggregating across mini-batches.
            "macro": Calculate metrics for each label and return unweighted mean of f1.
            "micro": Calculate metrics globally by counting the total TP, FN and FP.
            None: Return f1 scores for each class.
    Examples
    --------
    >>> predicts = [mx.np.array([[0.3, 0.7], [0., 1.], [0.4, 0.6]])]
    >>> labels   = [mx.np.array([0., 1., 1.])]
    >>> fbeta = mx.gluon.metric.Fbeta(beta=2)
    >>> fbeta.update(preds = predicts, labels = labels)
    >>> fbeta.get()
    ('fbeta', 0.9090909090909091)
    """
    def __init__(self, name='fbeta',
                 output_names=None, label_names=None, class_type="binary", beta=1, threshold=0.5, average="micro"):
        super(Fbeta, self).__init__(
            name=name, output_names=output_names, label_names=label_names,
            class_type=class_type, threshold=threshold, average=average)
        self.metrics = _ClassificationMetrics(class_type=class_type, threshold=threshold, beta=beta)
@register
@use_np
class BinaryAccuracy(EvalMetric):
    """Computes the accuracy of a binary or multilabel classification problem.
    Parameters
    ----------
    name : str
        Name of this metric instance for display.
    output_names : list of str, or None
        Name of predictions that should be used when updating with update_dict.
        By default include all predictions.
    label_names : list of str, or None
        Name of labels that should be used when updating with update_dict.
        By default include all labels.
    threshold : float or ndarray, default 0.5
        threshold for deciding whether the predictions are positive or negative.
    Examples
    --------
    >>> predicts = [mx.np.array([0.7, 1, 0.55])]
    >>> labels   = [mx.np.array([0., 1., 0.])]
    >>> bacc = mx.gluon.metric.BinaryAccuracy(threshold=0.6)
    >>> bacc.update(preds = predicts, labels = labels)
    >>> bacc.get()
    ('binary_accuracy', 0.6666666666666666)
    """
    def __init__(self, name='binary_accuracy',
                 output_names=None, label_names=None, threshold=0.5):
        self.threshold = threshold
        EvalMetric.__init__(self, name=name,
                            output_names=output_names, label_names=label_names)
    def update(self, labels, preds):
        """Updates the internal evaluation result.
        Parameters
        ----------
        labels : list of `NDArray`
            Each label denotes positive/negative for each class.
        preds : list of `NDArray`
            Each prediction value is a confidence value of being positive for each class.
        """
        labels, preds = check_label_shapes(labels, preds, True)
        for label, pred_label in zip(labels, preds):
            pred_label = predict_with_threshold(pred_label, self.threshold)
            pred_label = pred_label.as_np_ndarray().astype('int32').to_device(label.device)
            label = label.as_np_ndarray().astype('int32')
            # flatten before checking shapes to avoid shape mismatch
            label = label.reshape(-1)
            pred_label = pred_label.reshape(-1)
            check_label_shapes(label, pred_label)
            num_correct = (pred_label == label).sum().astype('float64')
            self.sum_metric += num_correct
            self.num_inst += len(pred_label)
@register
@use_np
class MCC(EvalMetric):
    """Computes the Matthews Correlation Coefficient of a binary classification problem.
    While slower to compute than F1, the MCC can give insight that F1 or Accuracy cannot.
    For instance, if the network always predicts the same result
    then the MCC will immediately show this. The MCC is also symmetric with respect
    to positive and negative categorization; however, there need to be both
    positive and negative examples in the labels, or it will always return 0.
    MCC of 0 is uncorrelated, 1 is completely correlated, and -1 is negatively correlated.
    .. math::
        \\text{MCC} = \\frac{ TP \\times TN - FP \\times FN }
        {\\sqrt{ (TP + FP) ( TP + FN ) ( TN + FP ) ( TN + FN ) } }
    where 0 terms in the denominator are replaced by 1.
    .. note::
        This version of MCC only supports binary classification.  See PCC.
    Parameters
    ----------
    name : str
        Name of this metric instance for display.
    output_names : list of str, or None
        Name of predictions that should be used when updating with update_dict.
        By default include all predictions.
    label_names : list of str, or None
        Name of labels that should be used when updating with update_dict.
        By default include all labels.
    Examples
    --------
    >>> # In this example the network almost always predicts positive
    >>> false_positives = 1000
    >>> false_negatives = 1
    >>> true_positives = 10000
    >>> true_negatives = 1
    >>> predicts = [mx.np.array(
        [[.3, .7]]*false_positives +
        [[.7, .3]]*true_negatives +
        [[.7, .3]]*false_negatives +
        [[.3, .7]]*true_positives
    )]
    >>> labels  = [mx.np.array(
        [0.]*(false_positives + true_negatives) +
        [1.]*(false_negatives + true_positives)
    )]
    >>> f1 = mx.gluon.metric.F1()
    >>> f1.update(preds = predicts, labels = labels)
    >>> mcc = mx.gluon.metric.MCC()
    >>> mcc.update(preds = predicts, labels = labels)
    >>> f1.get()
    ('f1', 0.95233560306652054)
    >>> mcc.get()
    ('mcc', 0.01917751877733392)
    """
    def __init__(self, name='mcc',
                 output_names=None, label_names=None):
        self._metrics = _ClassificationMetrics()
        EvalMetric.__init__(self, name=name,
                            output_names=output_names, label_names=label_names)
    def update(self, labels, preds):
        """Updates the internal evaluation result.
        Parameters
        ----------
        labels : list of `NDArray`
            The labels of the data.
        preds : list of `NDArray`
            Predicted values.
        """
        labels, preds = check_label_shapes(labels, preds, True)
        for label, pred in zip(labels, preds):
            self._metrics.update_stats(label, pred)
        self.sum_metric = self._metrics.binary_matthewscc() * self._metrics.total_examples
        self.num_inst = self._metrics.total_examples
    def reset(self):
        """Resets the internal evaluation result to initial state."""
        self.sum_metric = 0.
        self.num_inst = 0.
        self._metrics.reset_stats()
####################
# REGRESSION METRICS
####################
@register
@use_np
class MAE(EvalMetric):
    """Computes Mean Absolute Error (MAE) loss.
    The mean absolute error is given by
    .. math::
        \\frac{\\sum_i^n |y_i - \\hat{y}_i|}{n}
    Parameters
    ----------
    name : str
        Name of this metric instance for display.
    output_names : list of str, or None
        Name of predictions that should be used when updating with update_dict.
        By default include all predictions.
    label_names : list of str, or None
        Name of labels that should be used when updating with update_dict.
        By default include all labels.
    Examples
    --------
    >>> predicts = [mx.np.array([3, -0.5, 2, 7])]
    >>> labels = [mx.np.array([2.5, 0.0, 2, 8])]
    >>> mean_absolute_error = mx.gluon.metric.MAE()
    >>> mean_absolute_error.update(labels = labels, preds = predicts)
    >>> mean_absolute_error.get()
    ('mae', 0.5)
    """
    def __init__(self, name='mae',
                 output_names=None, label_names=None):
        super(MAE, self).__init__(
            name, output_names=output_names, label_names=label_names)
    def update(self, labels, preds):
        """Updates the internal evaluation result.
        Parameters
        ----------
        labels : list of `NDArray`
            The labels of the data.
        preds : list of `NDArray`
            Predicted values.
        """
        labels, preds = check_label_shapes(labels, preds, True)
        for label, pred in zip(labels, preds):
            label = label.as_np_ndarray()
            pred = pred.as_np_ndarray().to_device(label.device)
            num_inst = label.shape[0]
            mae = numpy.abs(label - pred).reshape(num_inst, -1).mean(axis=-1).sum()
            self.sum_metric += mae
            self.num_inst += num_inst
@register
@use_np
class MSE(EvalMetric):
    """Computes Mean Squared Error (MSE) loss.
    The mean squared error is given by
    .. math::
        \\frac{\\sum_i^n (y_i - \\hat{y}_i)^2}{n}
    Parameters
    ----------
    name : str
        Name of this metric instance for display.
    output_names : list of str, or None
        Name of predictions that should be used when updating with update_dict.
        By default include all predictions.
    label_names : list of str, or None
        Name of labels that should be used when updating with update_dict.
        By default include all labels.
    Examples
    --------
    >>> predicts = [mx.np.array([3, -0.5, 2, 7])]
    >>> labels = [mx.np.array([2.5, 0.0, 2, 8])]
    >>> mean_squared_error = mx.gluon.metric.MSE()
    >>> mean_squared_error.update(labels = labels, preds = predicts)
    >>> mean_squared_error.get()
    ('mse', 0.375)
    """
    def __init__(self, name='mse',
                 output_names=None, label_names=None):
        super(MSE, self).__init__(
            name, output_names=output_names, label_names=label_names)
    def update(self, labels, preds):
        """Updates the internal evaluation result.
        Parameters
        ----------
        labels : list of `NDArray`
            The labels of the data.
        preds : list of `NDArray`
            Predicted values.
        """
        labels, preds = check_label_shapes(labels, preds, True)
        for label, pred in zip(labels, preds):
            label = label.as_np_ndarray()
            pred = pred.as_np_ndarray().to_device(label.device)
            num_inst = label.shape[0]
            mse = ((label - pred)**2.0).reshape(num_inst, -1).mean(axis=-1).sum()
            self.sum_metric += mse
            self.num_inst += num_inst
@register
@use_np
class RMSE(MSE):
    """Computes Root Mean Squred Error (RMSE) loss.
    The root mean squared error is given by
    .. math::
        \\sqrt{\\frac{\\sum_i^n (y_i - \\hat{y}_i)^2}{n}}
    Parameters
    ----------
    name : str
        Name of this metric instance for display.
    output_names : list of str, or None
        Name of predictions that should be used when updating with update_dict.
        By default include all predictions.
    label_names : list of str, or None
        Name of labels that should be used when updating with update_dict.
        By default include all labels.
    Examples
    --------
    >>> predicts = [mx.np.array([3, -0.5, 2, 7])]
    >>> labels = [mx.np.array([2.5, 0.0, 2, 8])]
    >>> root_mean_squared_error = mx.gluon.metric.RMSE()
    >>> root_mean_squared_error.update(labels = labels, preds = predicts)
    >>> root_mean_squared_error.get()
    ('rmse', 0.612372457981)
    """
    def __init__(self, name='rmse',
                 output_names=None, label_names=None):
        super(RMSE, self).__init__(
            name, output_names=output_names, label_names=label_names)
    def get(self):
        if self.num_inst == 0:
            return (self.name, float('nan'))
        else:
            return (self.name, math.sqrt(self.sum_metric / self.num_inst))
@register
@use_np
class MeanPairwiseDistance(EvalMetric):
    """Computes Mean Pairwise Distance.
    The mean pairwise distance is given by
    .. math::
        \\frac{\\sum_i^n \\left(\\sum_j |y_{ij} - \\hat{y}_{ij}|^p\\right)^{\\frac{1}{p}}}{n}
    Parameters
    ----------
    name : str
        Name of this metric instance for display.
    output_names : list of str, or None
        Name of predictions that should be used when updating with update_dict.
        By default include all predictions.
    label_names : list of str, or None
        Name of labels that should be used when updating with update_dict.
        By default include all labels.
    p : float, default 2
        calculating distance using the p-norm
    Examples
    --------
    >>> predicts = [mx.np.array([[1., 2.], [3., 4.]])]
    >>> labels = [mx.np.array([[1., 0.], [4., 2.]])]
    >>> mpd = mx.gluon.metric.MeanPairwiseDistance()
    >>> mpd.update(labels = labels, preds = predicts)
    >>> mpd.get()
    ('mpd', 2.1180338859558105)
    """
    def __init__(self, name='mpd',
                 output_names=None, label_names=None, p=2):
        super(MeanPairwiseDistance, self).__init__(
            name, output_names=output_names, label_names=label_names)
        self.p = p
    def update(self, labels, preds):
        """Updates the internal evaluation result.
        Parameters
        ----------
        labels : list of `NDArray`
            The labels of the data.
        preds : list of `NDArray`
            Predicted values.
        """
        labels, preds = check_label_shapes(labels, preds, True)
        for label, pred in zip(labels, preds):
            label = label.as_np_ndarray()
            pred = pred.as_np_ndarray().to_device(label.device)
            label = label.reshape(label.shape[0], -1)
            pred = pred.reshape(pred.shape[0], -1)
            dis = (((label - pred) ** self.p).sum(axis=-1)) ** (1./self.p)
            dis = dis.sum()
            num_inst = label.shape[0]
            self.sum_metric += dis
            self.num_inst += num_inst
@register
@use_np
class MeanCosineSimilarity(EvalMetric):
    r"""Computes Mean Cosine Similarity.
    The mean cosine similarity is given by
    .. math::
        \text{cos\_sim}(label, pred) = \frac{label \cdot pred}{\max(\|label\| \cdot \|pred\|, eps)}
    Calculation happens on the last dimension of label and pred.
    Parameters
    ----------
    name : str
        Name of this metric instance for display.
    output_names : list of str, or None
        Name of predictions that should be used when updating with update_dict.
        By default include all predictions.
    label_names : list of str, or None
        Name of labels that should be used when updating with update_dict.
        By default include all labels.
    eps : float, default 1e-8
        small value to avoid division by zero.
    Examples
    --------
    >>> predicts = [mx.np.array([[1., 0.], [1., 1.]])]
    >>> labels = [mx.np.array([[3., 4.], [2., 2.]])]
    >>> mcs = mx.gluon.metric.MeanCosineSimilarity()
    >>> mcs.update(labels = labels, preds = predicts)
    >>> mcs.get()
    ('cos_sim', 0.8)
    """
    def __init__(self, name='cos_sim',
                 output_names=None, label_names=None, eps=1e-8):
        super(MeanCosineSimilarity, self).__init__(
            name, output_names=output_names, label_names=label_names)
        self.eps = eps
    def update(self, labels, preds):
        """Updates the internal evaluation result.
        Parameters
        ----------
        labels : list of `NDArray`
            The labels of the data.
        preds : list of `NDArray`
            Predicted values.
        """
        labels, preds = check_label_shapes(labels, preds, True)
        for label, pred in zip(labels, preds):
            label = label.as_np_ndarray()
            pred = pred.as_np_ndarray().to_device(label.device)
            if len(label.shape) == 1:
                label = label.reshape(1, label.shape[0])
            if len(pred.shape) == 1:
                pred = pred.reshape(1, pred.shape[0])
            sim = (label * pred).sum(axis=-1)
            n_p = numpy.linalg.norm(pred, axis=-1)
            n_l = numpy.linalg.norm(label, axis=-1)
            sim = sim / numpy.maximum(n_l * n_p, self.eps)
            sim = sim.sum()
            num_inst = len(label.reshape(-1, label.shape[-1])) # numpy.prod(label.shape[:-1]) is not supported
            self.sum_metric += sim
            self.num_inst += num_inst
@register
@alias('ce')
@use_np
class CrossEntropy(EvalMetric):
    """Computes Cross Entropy loss.
    The cross entropy over a batch of sample size :math:`N` is given by
    .. math::
       -\\sum_{n=1}^{N}\\sum_{k=1}^{K}t_{nk}\\log (y_{nk}),
    where :math:`t_{nk}=1` if and only if sample :math:`n` belongs to class :math:`k`.
    :math:`y_{nk}` denotes the probability of sample :math:`n` belonging to
    class :math:`k`.
    Parameters
    ----------
    eps : float, default 1e-12
        Small constant used in place of a predicted value of 0 to avoid log(0).
    ignore_label : int or None, default None
        Index of an invalid label to exclude when counting.
        If set to `None` (the default), all entries are included.
    axis : int, default -1
        The axis from prediction that was used to
        compute softmax. By default use the last axis.
    from_logits : boolean, default False
        Whether `pred` is expected to be a logits tensor.
        By default, we assume that `pred` encodes a probability distribution.
    name : str
        Name of this metric instance for display.
    output_names : list of str, or None
        Name of predictions that should be used when updating with update_dict.
        By default include all predictions.
    label_names : list of str, or None
        Name of labels that should be used when updating with update_dict.
        By default include all labels.
    Examples
    --------
    >>> predicts = [mx.np.array([[0.3, 0.7], [0, 1.], [0.4, 0.6]])]
    >>> labels   = [mx.np.array([0, 1, 1])]
    >>> ce = mx.gluon.metric.CrossEntropy()
    >>> ce.update(labels, predicts)
    >>> ce.get()
    ('cross-entropy', 0.57159948348999023)
    """
    def __init__(self, eps=1e-12, ignore_label=None, axis=-1, from_logits=False,
                 name='cross-entropy', output_names=None, label_names=None):
        super(CrossEntropy, self).__init__(
            name, output_names=output_names, label_names=label_names)
        self.ignore_label = ignore_label
        self.axis = axis
        self.from_logits = from_logits
        self.eps = eps
    def update(self, labels, preds):
        """Updates the internal evaluation result.
        Parameters
        ----------
        labels : list of `NDArray`
            The labels of the data.
        preds : list of `NDArray`
            Predicted values.
        """
        labels, preds = check_label_shapes(labels, preds, True)
        loss = 0.
        num = 0
        for label, pred in zip(labels, preds):
            assert label.size == pred.size/pred.shape[-1], \
                f"shape mismatch: {label.shape} vs. {pred.shape}"
            label = label.reshape((label.size,))
            if self.from_logits:
                pred = npx.softmax(pred, axis=self.axis)
            pred = npx.pick(pred.to_device(label.device), label.astype(dtype='int32'), axis=self.axis)
            if self.ignore_label is not None:
                ignore = (label == self.ignore_label).astype(pred.dtype)
                num -= ignore.sum()
                pred = pred * (1 - ignore) + ignore
            loss -= numpy.log(numpy.maximum(self.eps, pred)).sum()
            num += pred.size
        self.sum_metric += loss
        self.num_inst += num
@register
@use_np
class Perplexity(CrossEntropy):
    """Computes perplexity.
    Perplexity is a measurement of how well a probability distribution
    or model predicts a sample. A low perplexity indicates the model
    is good at predicting the sample.
    The perplexity of a model q is defined as
    .. math::
        b^{\\big(-\\frac{1}{N} \\sum_{i=1}^N \\log_b q(x_i) \\big)}
        = \\exp \\big(-\\frac{1}{N} \\sum_{i=1}^N \\log q(x_i)\\big)
    where we let `b = e`.
    :math:`q(x_i)` is the predicted value of its ground truth
    label on sample :math:`x_i`.
    For example, we have three samples :math:`x_1, x_2, x_3` and their labels
    are :math:`[0, 1, 1]`.
    Suppose our model predicts :math:`q(x_1) = p(y_1 = 0 | x_1) = 0.3`
    and :math:`q(x_2) = 1.0`,
    :math:`q(x_3) = 0.6`. The perplexity of model q is
    :math:`exp\\big(-(\\log 0.3 + \\log 1.0 + \\log 0.6) / 3\\big) = 1.77109762852`.
    Parameters
    ----------
    eps : float, default 1e-12
        Small constant used in place of a predicted value of 0 to avoid log(0).
    ignore_label : int or None, default None
        Index of an invalid label to exclude when counting.
        If set to `None` (the default), all entries are included.
    axis : int (default -1)
        The axis from prediction that was used to
        compute softmax. By default use the last axis.
    name : str
        Name of this metric instance for display.
    output_names : list of str, or None
        Name of predictions that should be used when updating with update_dict.
        By default include all predictions.
    label_names : list of str, or None
        Name of labels that should be used when updating with update_dict.
        By default include all labels.
    Examples
    --------
    >>> predicts = [mx.np.array([[0.3, 0.7], [0, 1.], [0.4, 0.6]])]
    >>> labels   = [mx.np.array([0, 1, 1])]
    >>> perp = mx.gluon.metric.Perplexity(ignore_label=None)
    >>> perp.update(labels, predicts)
    >>> perp.get()
    ('Perplexity', 1.7710976285155853)
    """
    def __init__(self, eps=1e-12, ignore_label=None, axis=-1, from_logits=False,
                 name='perplexity', output_names=None, label_names=None):
        super(Perplexity, self).__init__(
            eps=eps, ignore_label=ignore_label, axis=axis, from_logits=from_logits,
            name=name, output_names=output_names, label_names=label_names)
    def get(self):
        if self.num_inst == 0:
            return (self.name, float('nan'))
        else:
            return (self.name, math.exp(self.sum_metric/self.num_inst))
@register
@alias('pearsonr')
@use_np
class PearsonCorrelation(EvalMetric):
    """Computes Pearson correlation.
    The pearson correlation is given by
    .. math::
        \\frac{cov(y, \\hat{y})}{\\sigma{y}\\sigma{\\hat{y}}}
    Parameters
    ----------
    name : str
        Name of this metric instance for display.
    output_names : list of str, or None
        Name of predictions that should be used when updating with update_dict.
        By default include all predictions.
    label_names : list of str, or None
        Name of labels that should be used when updating with update_dict.
        By default include all labels.
    Examples
    --------
    >>> predicts = [mx.np.array([[0.3, 0.7], [0, 1.], [0.4, 0.6]])]
    >>> labels   = [mx.np.array([[1, 0], [0, 1], [0, 1]])]
    >>> pr = mx.gluon.metric.PearsonCorrelation()
    >>> pr.update(labels, predicts)
    >>> pr.get()
    ('pearsonr', 0.42163704544016178)
    """
    def __init__(self, name='pearsonr',
                 output_names=None, label_names=None):
        super(PearsonCorrelation, self).__init__(
            name, output_names=output_names, label_names=label_names)
        self.reset()
    def reset(self):
        self._sse_p = 0
        self._mean_p = 0
        self._sse_l = 0
        self._mean_l = 0
        self._pred_nums = 0
        self._label_nums = 0
        self._conv = 0
        self.num_inst = 0
        self.sum_metric = 0.0
    def update_variance(self, new_values, *aggregate):
        # Welford's online algorithm: update the running (count, mean, M2) aggregate
        # with a batch of new values, where M2 is the sum of squared deviations.
        count, mean, m_2 = aggregate
        count += len(new_values)
        delta = new_values - mean
        mean += numpy.sum(delta / count)
        delta_2 = new_values - mean
        m_2 += numpy.sum(delta * delta_2)
        return count, mean, m_2
    def update_cov(self, label, pred):
        self._conv = self._conv + numpy.sum((label - self._mean_l) * (pred - self._mean_p))
    def update(self, labels, preds):
        """Updates the internal evaluation result.
        Parameters
        ----------
        labels : list of `NDArray`
            The labels of the data.
        preds : list of `NDArray`
            Predicted values.
        """
        labels, preds = check_label_shapes(labels, preds, True)
        for label, pred in zip(labels, preds):
            check_label_shapes(label, pred, False, True)
            label = label.as_np_ndarray().reshape(-1).astype(numpy.float64)
            pred = pred.as_np_ndarray().to_device(label.device).reshape(-1).astype(numpy.float64)
            self.num_inst += 1
            self._label_nums, self._mean_l, self._sse_l = \
                self.update_variance(label, self._label_nums, self._mean_l, self._sse_l)
            self.update_cov(label, pred)
            self._pred_nums, self._mean_p, self._sse_p = \
                self.update_variance(pred, self._pred_nums, self._mean_p, self._sse_p)
    def get(self):
        if self.num_inst == 0:
            return (self.name, float('nan'))
        n = self._label_nums
        pearsonr = self._conv / ((n-1) * numpy.sqrt(self._sse_p / (n - 1)) * numpy.sqrt(self._sse_l / (n - 1)))
        return (self.name, float(pearsonr))
@register
@use_np
class PCC(EvalMetric):
    """PCC is a multiclass equivalent for the Matthews correlation coefficient derived
    from a discrete solution to the Pearson correlation coefficient.
    .. math::
        \\text{PCC} = \\frac {\\sum _{k}\\sum _{l}\\sum _{m}C_{kk}C_{lm}-C_{kl}C_{mk}}
        {{\\sqrt {\\sum _{k}(\\sum _{l}C_{kl})(\\sum _{k'|k'\\neq k}\\sum _{l'}C_{k'l'})}}
         {\\sqrt {\\sum _{k}(\\sum _{l}C_{lk})(\\sum _{k'|k'\\neq k}\\sum _{l'}C_{l'k'})}}}
    defined in terms of a K x K confusion matrix C.
    When there are more than two labels, the PCC will no longer range between -1 and +1.
    Instead, the minimum value will be between -1 and 0 depending on the true distribution.
    The maximum value is always +1.
    Parameters
    ----------
    name : str
        Name of this metric instance for display.
    output_names : list of str, or None
        Name of predictions that should be used when updating with update_dict.
        By default include all predictions.
    label_names : list of str, or None
        Name of labels that should be used when updating with update_dict.
        By default include all labels.
    Examples
    --------
    >>> # In this example the network almost always predicts positive
    >>> false_positives = 1000
    >>> false_negatives = 1
    >>> true_positives = 10000
    >>> true_negatives = 1
    >>> predicts = [mx.np.array(
        [[.3, .7]]*false_positives +
        [[.7, .3]]*true_negatives +
        [[.7, .3]]*false_negatives +
        [[.3, .7]]*true_positives
    )]
    >>> labels  = [mx.np.array(
        [0]*(false_positives + true_negatives) +
        [1]*(false_negatives + true_positives)
    )]
    >>> f1 = mx.gluon.metric.F1()
    >>> f1.update(preds = predicts, labels = labels)
    >>> pcc = mx.gluon.metric.PCC()
    >>> pcc.update(preds = predicts, labels = labels)
    >>> f1.get()
    ('f1', 0.95233560306652054)
    >>> pcc.get()
    ('pcc', 0.01917751877733392)
    """
    def __init__(self, name='pcc',
                 output_names=None, label_names=None):
        self.k = 2
        super(PCC, self).__init__(
            name=name, output_names=output_names, label_names=label_names)
    def _grow(self, inc):
        self.lcm = numpy.pad(
            self.lcm, ((0, inc), (0, inc)), 'constant', constant_values=(0))
        self.k += inc
    def _calc_mcc(self, cmat):
        n = cmat.sum()
        x = cmat.sum(axis=1)
        y = cmat.sum(axis=0)
        cov_xx = numpy.sum(x * (n - x))
        cov_yy = numpy.sum(y * (n - y))
        if cov_xx == 0 or cov_yy == 0:
            return float('nan')
        # i = cmat.diagonal() # mxnet.numpy.ndarray.diagonal() is currently not available.
        i = cmat[numpy.arange(self.k), numpy.arange(self.k)]
        cov_xy = numpy.sum(i * n - x * y)
        return cov_xy / (cov_xx * cov_yy) ** 0.5
[docs]    def update(self, labels, preds):
        """Updates the internal evaluation result.
        Parameters
        ----------
        labels : list of `NDArray`
            The labels of the data.
        preds : list of `NDArray`
            Predicted values.
        """
        labels, preds = check_label_shapes(labels, preds, True)
        # update the confusion matrix
        for label, pred in zip(labels, preds):
            label = label.astype('int32', copy=False).as_np_ndarray()
            pred = pred.as_np_ndarray().to_device(label.device)
            if pred.shape != label.shape:
                pred = pred.argmax(axis=1).astype(label, copy=False)
            else:
                pred = pred.astype('int32', copy=False)
            n = int(max(pred.max(), label.max()))
            if n >= self.k:
                self._grow(n + 1 - self.k)
            bcm = numpy.zeros((self.k, self.k), dtype='float64')
            for i, j in zip(pred, label):
                bcm[i, j] += 1
            self.lcm += bcm
        self.num_inst += 1
    @property
    def sum_metric(self):
        return self._calc_mcc(self.lcm) * self.num_inst
[docs]    def reset(self):
        """Resets the internal evaluation result to initial state."""
        self.num_inst = 0.
        self.lcm = numpy.zeros((self.k, self.k), dtype='float64')
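# --- Editor's note: illustration only, not part of mxnet.gluon.metric --------
# A plain-NumPy sketch of the quantity PCC reports: the same
# cov_xy / sqrt(cov_xx * cov_yy) form as ``_calc_mcc``, applied to a K x K
# confusion matrix whose rows are indexed by prediction and columns by label
# (the convention used by ``update`` above).  ``onp`` and
# ``mcc_from_confusion`` are names local to this sketch.
import numpy as onp  # plain NumPy; avoids shadowing this module's `numpy`

def mcc_from_confusion(cmat):
    """Multiclass Matthews correlation coefficient from a confusion matrix."""
    cmat = onp.asarray(cmat, dtype='float64')
    n = cmat.sum()
    x = cmat.sum(axis=1)                     # per-class prediction counts
    y = cmat.sum(axis=0)                     # per-class label counts
    cov_xx = onp.sum(x * (n - x))            # equals n**2 - sum(x**2)
    cov_yy = onp.sum(y * (n - y))            # equals n**2 - sum(y**2)
    if cov_xx == 0 or cov_yy == 0:
        return float('nan')                  # degenerate: all mass in one class
    cov_xy = onp.sum(cmat.diagonal() * n - x * y)
    return cov_xy / (cov_xx * cov_yy) ** 0.5

# For K = 2, mcc_from_confusion([[tn, fn], [fp, tp]]) reduces to the familiar
# binary Matthews correlation coefficient.
# ------------------------------------------------------------------------------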
[docs]@register
@use_np
class Loss(EvalMetric):
    """Dummy metric for directly printing loss.
    Parameters
    ----------
    name : str
        Name of this metric instance for display.
    output_names : list of str, or None
        Name of predictions that should be used when updating with update_dict.
        By default include all predictions.
    label_names : list of str, or None
        Name of labels that should be used when updating with update_dict.
        By default include all labels.
    """
    def __init__(self, name='loss',
                 output_names=None, label_names=None):
        super(Loss, self).__init__(
            name, output_names=output_names, label_names=label_names)
[docs]    def update(self, _, preds):
        if isinstance(preds, ndarray.ndarray.NDArray):
            preds = [preds]
        for pred in preds:
            loss = pred.sum().item()
            self.sum_metric += loss
            self.num_inst += pred.size
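# --- Editor's note: illustration only, not part of mxnet.gluon.metric --------
# Hedged usage sketch for Loss: ``update`` adds the sum of each prediction
# array and counts its elements, so ``get()`` reports the mean loss per
# element.  The values below are invented for this sketch.
#
# import mxnet as mx
# loss_metric = mx.gluon.metric.Loss()
# loss_metric.update(None, [mx.np.array([0.2, 0.4, 0.6])])
# loss_metric.get()   # approximately ('loss', 0.4)
# ------------------------------------------------------------------------------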
[docs]@register
class Torch(Loss):
    """Dummy metric for torch criterions."""
    def __init__(self, name='torch',
                 output_names=None, label_names=None):
        super(Torch, self).__init__(
            name, output_names=output_names, label_names=label_names)
[docs]@register
@use_np
class CustomMetric(EvalMetric):
    """Computes a customized evaluation metric.
    The `feval` function can return a `tuple` of (sum_metric, num_inst) or a
    single scalar sum_metric.
    Parameters
    ----------
    feval : callable(label, pred)
        Customized evaluation function.
    name : str, optional
        Name of this metric instance for display. If None, the name is derived
        from `feval` (the default is None).
    allow_extra_outputs : bool, optional
        If true, the prediction outputs are allowed to have extra outputs.
        This is useful in RNN, where the states are also produced
        in outputs for forwarding. (the default is False).
    output_names : list of str, or None
        Name of predictions that should be used when updating with update_dict.
        By default include all predictions.
    label_names : list of str, or None
        Name of labels that should be used when updating with update_dict.
        By default include all labels.
    Examples
    --------
    >>> predicts = [mx.np.array(np.array([3, -0.5, 2, 7]).reshape(4,1))]
    >>> labels = [mx.np.array(np.array([2.5, 0.0, 2, 8]).reshape(4,1))]
    >>> feval = lambda x, y : (x + y).mean()
    >>> eval_metrics = mx.gluon.metric.CustomMetric(feval=feval)
    >>> eval_metrics.update(labels, predicts)
    >>> eval_metrics.get()
    ('custom(<lambda>)', 6.0)
    """
    def __init__(self, feval, name=None, allow_extra_outputs=False,
                 output_names=None, label_names=None):
        if name is None:
            name = feval.__name__
            if name.find('<') != -1:
                name = f'custom({name})'
        super(CustomMetric, self).__init__(
            name, feval=feval,
            allow_extra_outputs=allow_extra_outputs,
            output_names=output_names, label_names=label_names)
        self._feval = feval
        self._allow_extra_outputs = allow_extra_outputs
[docs]    def update(self, labels, preds):
        """Updates the internal evaluation result.
        Parameters
        ----------
        labels : list of `NDArray`
            The labels of the data.
        preds : list of `NDArray`
            Predicted values.
        """
        if not self._allow_extra_outputs:
            labels, preds = check_label_shapes(labels, preds, True)
        for pred, label in zip(preds, labels):
            label = label.as_np_ndarray()
            pred = pred.as_np_ndarray().to_device(label.device)
            reval = self._feval(label, pred)
            if isinstance(reval, tuple):
                (sum_metric, num_inst) = reval
                self.sum_metric += sum_metric
                self.num_inst += num_inst
            else:
                self.sum_metric += reval
                self.num_inst += 1
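# --- Editor's note: illustration only, not part of mxnet.gluon.metric --------
# Hedged usage sketch for CustomMetric with an ``feval`` that returns a
# (sum_metric, num_inst) tuple, so ``get()`` reports a per-sample average.
# ``mae_feval`` and the values below are invented for this sketch.
#
# import mxnet as mx
#
# def mae_feval(label, pred):
#     """Return (summed absolute error, sample count) for one minibatch."""
#     err = mx.np.abs(label - pred)
#     return float(err.sum()), int(err.size)
#
# metric = mx.gluon.metric.CustomMetric(mae_feval, name='mae')
# metric.update(labels=[mx.np.array([2.5, 0.0, 2.0, 8.0])],
#               preds=[mx.np.array([3.0, -0.5, 2.0, 7.0])])
# metric.get()   # expected: ('mae', 0.5)
# ------------------------------------------------------------------------------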
# pylint: disable=invalid-name
[docs]def np(numpy_feval, name=None, allow_extra_outputs=False):
    """Creates a custom evaluation metric that receives its inputs as numpy arrays.
    Parameters
    ----------
    numpy_feval : callable(label, pred)
        Custom evaluation function that receives labels and predictions for a minibatch
        as numpy arrays and returns the corresponding custom metric as a floating point number.
    name : str, optional
        Name of the custom metric.
    allow_extra_outputs : bool, optional
        Whether prediction output is allowed to have extra outputs. This is useful in cases
        like RNN where states are also part of output which can then be fed back to the RNN
        in the next step. By default, extra outputs are not allowed.
    Returns
    -------
    CustomMetric
        Custom metric that applies `numpy_feval` to the provided labels and predictions.
    Example
    -------
    >>> def custom_metric(label, pred):
    ...     return np.mean(np.abs(label-pred))
    ...
    >>> metric = mx.gluon.metric.np(custom_metric)
    """
    def feval(label, pred):
        """Internal eval function."""
        return numpy_feval(label, pred)
    feval.__name__ = numpy_feval.__name__
    return CustomMetric(feval, name, allow_extra_outputs)
# pylint: enable=invalid-name
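# --- Editor's note: illustration only, not part of mxnet.gluon.metric --------
# Hedged end-to-end sketch for the ``np`` wrapper above: wrap a scalar-returning
# evaluation function and query the running average.  ``mse`` and the values
# below are invented for this sketch.
#
# import mxnet as mx
#
# def mse(label, pred):
#     return float(((label - pred) ** 2).mean())
#
# metric = mx.gluon.metric.np(mse)
# metric.update(labels=[mx.np.array([2.5, 0.0, 2.0, 8.0])],
#               preds=[mx.np.array([3.0, -0.5, 2.0, 7.0])])
# metric.get()   # expected: ('mse', 0.375)
# ------------------------------------------------------------------------------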