Source code for mxnet.test_utils

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""Tools for testing."""
# pylint: disable=too-many-lines
import time
import gzip
import struct
import traceback
import numbers
import sys
import os
import platform
import errno
import logging
import bz2
import zipfile
import json
from contextlib import contextmanager
from collections import OrderedDict
import numpy as np
import numpy.testing as npt
import numpy.random as rnd
try:
    import scipy.stats as ss
except ImportError:
    ss = None
try:
    import requests
except ImportError:
    # in rare cases requests may be not installed
    pass
import mxnet as mx
from .device import current_device
from .ndarray.ndarray import _STORAGE_TYPE_STR_TO_ID, get_dtype_name
from .symbol import Symbol
from .symbol.numpy import _Symbol as np_symbol
from .util import use_np, use_np_default_dtype, getenv, setenv  # pylint: disable=unused-import
from .util import get_max_supported_compute_capability, get_rtc_compile_opts # pylint: disable=unused-import
from .runtime import Features
from .numpy_extension import get_cuda_compute_capability


[docs]def default_device(): """Get default device for regression test.""" # _TODO: get device from environment variable to support # testing with GPUs return current_device()
[docs]def set_default_device(device): """Set default device.""" mx.device._current.set(device)
[docs]def default_dtype(): """Get default data type for regression test.""" # _TODO: get default dtype from environment variable return np.float32
[docs]def default_rtols(): """Get default relative tolerances for data comparisons involving each data type.""" return {np.dtype(np.float16): 1e-2, np.dtype(np.float32): 1e-4, np.dtype(np.float64): 1e-5, np.dtype(np.bool): 0, np.dtype(np.int8): 0, np.dtype(np.uint8): 0, np.dtype(np.int32): 0, np.dtype(np.uint32): 0, np.dtype(np.int64): 0, np.dtype(np.uint64): 0}
[docs]def default_atols(): """Get default absolute tolerances for data comparisons involving each data type.""" return {np.dtype(np.float16): 1e-1, np.dtype(np.float32): 1e-3, np.dtype(np.float64): 1e-20, np.dtype(np.bool): 0, np.dtype(np.int8): 0, np.dtype(np.uint8): 0, np.dtype(np.int32): 0, np.dtype(np.uint32): 0, np.dtype(np.int64): 0, np.dtype(np.uint64): 0}
[docs]def default_numeric_eps(): """Get default epsilon for finite difference gradient calculations with data type.""" # prefer a power-of-two eps, since no bits are dropped when serving as an input delta return {np.dtype(np.float16): 1.0 / 2**6, np.dtype(np.float32): 1.0 / 2**9, np.dtype(np.float64): 1.0 / 2**14}
[docs]def effective_dtype(dat): """ Return the most appropriate dtype for determining the tolerance used in dat comparisons Parameters ---------- dat : np.ndarray or mx.nd.array or mx.np.ndarray """ # On arch 80 gpus or later, a float32-io gemm or conv op will trim the mantissa of # data inputs to be of comparable precision to a float16, so float16 becomes the # 'effective dtype' for tolerance tests involving such op outputs. # Is TF32 enabled in the device (the default on arch 80 GPUs) def is_TF32_enabled(device): try: return (device.device_type == 'gpu' and get_cuda_compute_capability(device) >= 80 and os.environ.get('NVIDIA_TF32_OVERRIDE') != '0') except: # pylint: disable=bare-except return False device = dat.device if hasattr(dat, 'device') else None dtype = np.dtype(dat.dtype) if dtype == np.dtype(np.float32) and is_TF32_enabled(device): return np.dtype(np.float16) else: return dtype
[docs]def get_tolerance(dat, tol, default_tol): """ Return the tolerance to be used for dat comparisons based on the given tol, datatype and device. Parameters ---------- dat : np.ndarray or mx.nd.array or mx.np.ndarray tol : float, or a dict of dtype->float default_tol : default dict of dtype->float for all types """ if isinstance(tol, numbers.Number): return tol # If the caller has supplied a tol dict, use that if it has an entry for dtype, # else use the supplied default tol dict. dtype = effective_dtype(dat) tol = {} if tol is None else tol return tol.get(dtype, default_tol[dtype])
[docs]def get_tols(x, y, rtol, atol): """For comparing two datasets 'x' and 'y', what relative and absolute tolerances should be used.""" # Tolerance analysis needs 'dtype' of 'x' and 'y', so convert numbers to numpy scalars as needed if isinstance(x, numbers.Number): x = np.array(x) if isinstance(y, numbers.Number): y = np.array(y) # If tols are not specified, use the largest default tol for 'x' and 'y' based on their ctx and dtype. rtol = max(get_tolerance(x, rtol, default_rtols()), get_tolerance(y, rtol, default_rtols())) atol = max(get_tolerance(x, atol, default_atols()), get_tolerance(y, atol, default_atols())) return rtol, atol
[docs]def get_atol(atol=None, dtype=np.dtype(np.float64)): """Get default numerical threshold for regression test.""" return default_atols()[dtype] if atol is None else atol
[docs]def get_rtol(rtol=None, dtype=np.dtype(np.float64)): """Get default numerical threshold for regression test.""" return default_rtols()[dtype] if rtol is None else rtol
[docs]def get_etol(etol=None): """Get default numerical threshold for regression test.""" # _TODO: get from env variable, different threshold might # be needed for different device and dtype return 0 if etol is None else etol
[docs]def random_arrays(*shapes): """Generate some random numpy arrays.""" arrays = [np.array(np.random.randn(), dtype=default_dtype()) if len(s) == 0 else np.random.randn(*s).astype(default_dtype()) for s in shapes] if len(arrays) == 1: return arrays[0] return arrays
[docs]def random_uniform_arrays(*shapes, **kwargs): """Generate some random numpy arrays.""" low = kwargs.pop('low', 0.0) high = kwargs.pop('high', 1.0) dtype = kwargs.pop('dtype', default_dtype()) if len(kwargs) > 0: raise TypeError('Got unexpected argument/s : ' + str(kwargs.keys())) arrays = [np.random.uniform(low, high, size=s).astype(dtype) for s in shapes] return arrays
[docs]def random_sample(population, k): """Return a k length list of the elements chosen from the population sequence.""" assert 0 <= k <= len(population) population_copy = population[:] np.random.shuffle(population_copy) return population_copy[0:k]
def _sorted_items(d): """Return (key, value) pairs of dict 'd' in a deterministic order (sorted by key).""" return sorted(d.items(), key=lambda t: t[0]) def _sorted_dict(d): """Return ordered dictionary containing items ordered by their keys.""" return OrderedDict(_sorted_items(d)) def _validate_csr_generation_inputs(num_rows, num_cols, density, distribution="uniform"): """Validates inputs for csr generation helper functions """ total_nnz = int(num_rows * num_cols * density) if density < 0 or density > 1: raise ValueError("density has to be between 0 and 1") if num_rows <= 0 or num_cols <= 0: raise ValueError("num_rows or num_cols should be greater than 0") if distribution == "powerlaw": if total_nnz < 2 * num_rows: raise ValueError(f"not supported for this density: {density}" f" for this shape ({num_rows}, {num_cols})" " Please keep :" " num_rows * num_cols * density >= 2 * num_rows")
[docs]def shuffle_csr_column_indices(csr): """Shuffle CSR column indices per row This allows validation of unordered column indices, which is not a requirement for a valid CSR matrix """ row_count = len(csr.indptr) - 1 for i in range(row_count): start_index = csr.indptr[i] end_index = csr.indptr[i + 1] sublist = np.array(csr.indices[start_index : end_index]) np.random.shuffle(sublist) csr.indices[start_index : end_index] = sublist
def _get_uniform_dataset_csr(num_rows, num_cols, density=0.1, dtype=None, data_init=None, shuffle_csr_indices=False): """Returns CSRNDArray with uniform distribution This generates a csr matrix with totalnnz unique randomly chosen numbers from num_rows*num_cols and arranges them in the 2d array in the following way: row_index = (random_number_generated / num_rows) col_index = random_number_generated - row_index * num_cols """ _validate_csr_generation_inputs(num_rows, num_cols, density, distribution="uniform") try: from scipy import sparse as spsp csr = spsp.rand(num_rows, num_cols, density, dtype=dtype, format="csr") if data_init is not None: csr.data.fill(data_init) if shuffle_csr_indices is True: shuffle_csr_column_indices(csr) result = mx.nd.sparse.csr_matrix((csr.data, csr.indices, csr.indptr), shape=(num_rows, num_cols), dtype=dtype) except ImportError: assert(data_init is None), \ "data_init option is not supported when scipy is absent" assert(not shuffle_csr_indices), \ "shuffle_csr_indices option is not supported when scipy is absent" # scipy not available. try to generate one from a dense array dns = mx.nd.random.uniform(shape=(num_rows, num_cols), dtype=dtype) masked_dns = dns * (dns < density) result = masked_dns.tostype('csr') return result def _get_powerlaw_dataset_csr(num_rows, num_cols, density=0.1, dtype=None): """Returns CSRNDArray with powerlaw distribution with exponentially increasing number of non zeros in each row. Not supported for cases where total_nnz < 2*num_rows. This is because the algorithm first tries to ensure that there are rows with no zeros by putting non zeros at beginning of each row. """ _validate_csr_generation_inputs(num_rows, num_cols, density, distribution="powerlaw") total_nnz = int(num_rows * num_cols * density) unused_nnz = total_nnz output_arr = np.zeros((num_rows, num_cols), dtype=dtype) # Start with ones on each row so that no row is empty for row in range(num_rows): output_arr[row][0] = 1 + rnd.uniform(0.001, 2) unused_nnz = unused_nnz - 1 if unused_nnz <= 0: return mx.nd.array(output_arr).tostype("csr") # Populate rest of matrix with 2^i items in ith row. # if we have used all total nnz return the sparse matrix # else if we reached max column size then fill up full columns until we use all nnz col_max = 2 for row in range(num_rows): col_limit = min(num_cols, col_max) # In case col_limit reached assign same value to all elements, which is much faster if col_limit == num_cols and unused_nnz > col_limit: output_arr[row] = 1 + rnd.uniform(0.001, 2) unused_nnz = unused_nnz - col_limit + 1 if unused_nnz <= 0: return mx.nd.array(output_arr).tostype("csr") else: continue for col_index in range(1, col_limit): output_arr[row][col_index] = 1 + rnd.uniform(0.001, 2) unused_nnz = unused_nnz - 1 if unused_nnz <= 0: return mx.nd.array(output_arr).tostype("csr") col_max = col_max * 2 if unused_nnz > 0: raise ValueError(f"not supported for this density: {density}" f" for this shape ({num_rows},{num_cols})") return mx.nd.array(output_arr).tostype("csr")
[docs]def assign_each(the_input, function): """Return ndarray composed of passing each array value through some function""" if function is None: output = np.array(the_input) else: it_input = np.nditer(the_input, flags=['f_index']) output = np.zeros(the_input.shape) it_out = np.nditer(output, flags=['f_index'], op_flags=['writeonly']) while not it_input.finished: val_input = it_input[0] it_out[0] = function(val_input) it_input.iternext() it_out.iternext() return output
[docs]def assign_each2(input1, input2, function): """Return ndarray composed of passing two array values through some function""" if function is None: output = np.array(input1) else: assert input1.shape == input2.shape it_input1 = np.nditer(input1, flags=['f_index']) it_input2 = np.nditer(input2, flags=['f_index']) output = np.zeros(input1.shape) it_out = np.nditer(output, flags=['f_index'], op_flags=['writeonly']) while not it_input1.finished: val_input1 = it_input1[0] val_input2 = it_input2[0] it_out[0] = function(val_input1, val_input2) it_input1.iternext() it_input2.iternext() it_out.iternext() return output
def create_2d_np_tensor(rows, columns, dtype=np.int64): inp = mx.np.arange(0, rows, dtype=dtype).reshape(rows, 1) inp = mx.np.broadcast_to(inp, shape=(inp.shape[0], columns)) return inp # For testing Large Tensors having total size > 2^32 elements def create_2d_tensor(rows, columns, dtype=np.int64): a = mx.nd.arange(0, rows, dtype=dtype).reshape(rows, 1) b = mx.nd.broadcast_to(a, shape=(a.shape[0], columns)) return b # For testing Large Vectors having total size > 2^32 elements def create_vector(size, dtype=np.int64): a = mx.nd.arange(0, size, dtype=dtype) return a
[docs]def rand_sparse_ndarray(shape, stype, density=None, dtype=None, distribution=None, data_init=None, rsp_indices=None, modifier_func=None, shuffle_csr_indices=False, ctx=None): """Generate a random sparse ndarray. Returns the ndarray, value(np) and indices(np) Parameters ---------- shape: list or tuple stype: str valid values: "csr" or "row_sparse" density: float, optional should be between 0 and 1 distribution: str, optional valid values: "uniform" or "powerlaw" dtype: numpy.dtype, optional default value is None Returns ------- Result of type CSRNDArray or RowSparseNDArray Examples -------- Below is an example of the powerlaw distribution with csr as the stype. It calculates the nnz using the shape and density. It fills up the ndarray with exponentially increasing number of elements. If there are enough unused_nnzs, n+1th row will have twice more nnzs compared to nth row. else, remaining unused_nnzs will be used in n+1th row If number of cols is too small and we have already reached column size it will fill up all following columns in all followings rows until we reach the required density. >>> csr_arr, _ = rand_sparse_ndarray(shape=(5, 16), stype="csr", density=0.50, distribution="powerlaw") >>> indptr = csr_arr.indptr.asnumpy() >>> indices = csr_arr.indices.asnumpy() >>> data = csr_arr.data.asnumpy() >>> row2nnz = len(data[indptr[1]:indptr[2]]) >>> row3nnz = len(data[indptr[2]:indptr[3]]) >>> assert(row3nnz == 2*row2nnz) >>> row4nnz = len(data[indptr[3]:indptr[4]]) >>> assert(row4nnz == 2*row3nnz) """ ctx = ctx if ctx else default_device() density = rnd.rand() if density is None else density dtype = default_dtype() if dtype is None else dtype distribution = "uniform" if distribution is None else distribution if stype == 'row_sparse': assert (distribution == "uniform"), \ f"Distribution {distribution} not supported for row_sparse" # sample index if rsp_indices is not None: indices = rsp_indices assert(len(indices) <= shape[0]) else: idx_sample = rnd.rand(shape[0]) indices = np.argwhere(idx_sample < density).flatten() if indices.shape[0] == 0: result = mx.nd.zeros(shape, stype='row_sparse', dtype=dtype, ctx=ctx) return result, (np.array([], dtype=dtype), np.array([])) # generate random values val = rnd.rand(indices.shape[0], *shape[1:]).astype(dtype) # Allow caller to override or adjust random values if data_init is not None: val.fill(data_init) if modifier_func is not None: val = assign_each(val, modifier_func) arr = mx.nd.sparse.row_sparse_array((val, indices), shape=shape, dtype=dtype, ctx=ctx) return arr, (val, indices) elif stype == 'csr': assert len(shape) == 2 if distribution == "uniform": csr = _get_uniform_dataset_csr(shape[0], shape[1], density, data_init=data_init, shuffle_csr_indices=shuffle_csr_indices, dtype=dtype).as_in_context(ctx) return csr, (csr.indptr, csr.indices, csr.data) elif distribution == "powerlaw": csr = _get_powerlaw_dataset_csr(shape[0], shape[1], density=density, dtype=dtype).as_in_context(ctx) return csr, (csr.indptr, csr.indices, csr.data) else: assert(False), f"Distribution not supported: {distribution}" return False else: assert(False), "unknown storage type" return False
[docs]def rand_ndarray(shape, stype='default', density=None, dtype=None, modifier_func=None, shuffle_csr_indices=False, distribution=None, ctx=None): """Generate a random sparse ndarray. Returns the generated ndarray.""" ctx = ctx if ctx else default_device() if stype == 'default': arr = mx.nd.array(random_arrays(shape), dtype=dtype, ctx=ctx) else: arr, _ = rand_sparse_ndarray(shape, stype, density=density, modifier_func=modifier_func, dtype=dtype, shuffle_csr_indices=shuffle_csr_indices, distribution=distribution, ctx=ctx) return arr
[docs]def create_sparse_array(shape, stype, data_init=None, rsp_indices=None, dtype=None, modifier_func=None, density=.5, shuffle_csr_indices=False): """Create a sparse array, For Rsp, assure indices are in a canonical format""" if stype == 'row_sparse': if rsp_indices is not None: arr_indices = np.asarray(rsp_indices) arr_indices.sort() else: arr_indices = None arr_data, (_, _) = rand_sparse_ndarray(shape, stype, density=density, data_init=data_init, rsp_indices=arr_indices, dtype=dtype, modifier_func=modifier_func) elif stype == 'csr': arr_data, (_, _, _) = rand_sparse_ndarray(shape, stype, density=density, data_init=data_init, dtype=dtype, modifier_func=modifier_func, shuffle_csr_indices=shuffle_csr_indices) else: msg = "Unknown storage type: " + stype raise AssertionError(msg) return arr_data
[docs]def create_sparse_array_zd(shape, stype, density, data_init=None, rsp_indices=None, dtype=None, modifier_func=None, shuffle_csr_indices=False): """Create sparse array, using only rsp_indices to determine density""" if stype == 'row_sparse': density = 0.0 if rsp_indices is not None: assert len(rsp_indices) <= shape[0] return create_sparse_array(shape, stype, data_init=data_init, rsp_indices=rsp_indices, dtype=dtype, modifier_func=modifier_func, density=density, shuffle_csr_indices=shuffle_csr_indices)
def rand_shape_2d(dim0=10, dim1=10, allow_zero_size=False): low = 0 if allow_zero_size else 1 return rnd.randint(low, dim0 + 1), rnd.randint(low, dim1 + 1) def rand_shape_3d(dim0=10, dim1=10, dim2=10, allow_zero_size=False): low = 0 if allow_zero_size else 1 return rnd.randint(low, dim0 + 1), rnd.randint(low, dim1 + 1), rnd.randint(low, dim2 + 1) def rand_shape_nd(num_dim, dim=10, allow_zero_size=False): low = 0 if allow_zero_size else 1 return tuple(rnd.randint(low, dim+1, size=num_dim)) def rand_coord_2d(x_low, x_high, y_low, y_high): x = np.random.randint(x_low, x_high, dtype=np.int64) y = np.random.randint(y_low, y_high, dtype=np.int64) return x, y
[docs]def np_reduce(dat, axis, keepdims, numpy_reduce_func): """Compatible reduce for old version of NumPy. Parameters ---------- dat : np.ndarray Same as NumPy. axis : None or int or list-like Same as NumPy. keepdims : bool Same as NumPy. numpy_reduce_func : function A NumPy reducing function like ``np.sum`` or ``np.max``. """ if isinstance(axis, int): axis = [axis] else: axis = list(axis) if axis is not None else range(len(dat.shape)) ret = dat for i in reversed(sorted(axis)): ret = numpy_reduce_func(ret, axis=i) if keepdims: keepdims_shape = list(dat.shape) for i in axis: keepdims_shape[i] = 1 ret = ret.reshape(tuple(keepdims_shape)) return ret
def _find_max_violation(a, b, rtol, atol): """Finds and returns the location of maximum violation.""" # 'smart' absdiff that considers inf's as equals (to match np.allclose) absdiff = np.where(np.equal(a, b), 0, np.abs(a-b)) tol = atol + rtol*np.abs(b) violation = absdiff/(tol+1e-20) loc = np.argmax(violation) idx = np.unravel_index(loc, violation.shape) return idx, np.max(violation)
[docs]def same(a, b): """Test if two NumPy arrays are the same. Parameters ---------- a : np.ndarray b : np.ndarray """ return np.array_equal(a, b)
def checkShapes(a, b): if a.shape != b.shape: msg = npt.build_err_msg([a, b], err_msg="a.shape = {} and b.shape = {} are not equal" .format(str(a.shape), str(b.shape))) raise AssertionError(msg)
[docs]def almost_equal(a, b, rtol=None, atol=None, equal_nan=False, use_broadcast=True): """Test if two numpy arrays are almost equal.""" # pylint: disable=unexpected-keyword-arg if not use_broadcast: checkShapes(a, b) return np.allclose(a, b, rtol=get_rtol(rtol), atol=get_atol(atol), equal_nan=equal_nan)
# pylint: enable=unexpected-keyword-arg
[docs]def locationError(a, b, index, names, maxError=False): """Create element mismatch comment Parameters ---------- a, b : compared np.ndarray's index : tuple of coordinate arrays Location of violation names : tuple of names The names of compared arrays. maxError: boolean, optional Flag indicating that maximum error is reporting. """ maximum = "maximum " if maxError else "" return f"Location of {maximum} error: {str(index)}, {names[0]}={a[index]:.8f}, {names[1]}={b[index]:.8f}"
[docs]def assert_almost_equal(a, b, rtol=None, atol=None, names=('a', 'b'), equal_nan=False, use_broadcast=True, mismatches=(10, 10)): """Test that two numpy arrays are almost equal. Raise exception message if not. Parameters ---------- a : np.ndarray or mx.nd.array b : np.ndarray or mx.nd.array rtol : None or float or dict of dtype -> float The relative threshold. Default threshold will be used if set to ``None``. atol : None or float or dict of dtype -> float The absolute threshold. Default threshold will be used if set to ``None``. names : tuple of names, optional The names used in error message when an exception occurs equal_nan : boolean, optional The flag determining how to treat NAN values in comparison mismatches : tuple of mismatches Maximum number of mismatches to be printed (mismatches[0]) and determine (mismatches[1]) """ if not use_broadcast: checkShapes(a, b) rtol, atol = get_tols(a, b, rtol, atol) if isinstance(a, mx.numpy.ndarray): a = a.asnumpy() if isinstance(b, mx.numpy.ndarray): b = b.asnumpy() use_np_allclose = isinstance(a, np.ndarray) and isinstance(b, np.ndarray) if not use_np_allclose: if not (hasattr(a, 'ctx') and hasattr(b, 'ctx') and a.device == b.device and a.dtype == b.dtype): use_np_allclose = True if isinstance(a, mx.nd.NDArray): a = a.asnumpy() if isinstance(b, mx.nd.NDArray): b = b.asnumpy() if use_np_allclose: if hasattr(a, 'dtype') and a.dtype == np.bool_ and hasattr(b, 'dtype') and b.dtype == np.bool_: np.testing.assert_equal(a, b) return if almost_equal(a, b, rtol, atol, equal_nan=equal_nan): return else: output = mx.nd.contrib.allclose(a, b, rtol, atol, equal_nan) if output.asnumpy() == 1: return a = a.asnumpy() b = b.asnumpy() index, rel = _find_max_violation(a, b, rtol, atol) if index != (): # a, b are the numpy arrays indexErr = index relErr = rel print('\n*** Maximum errors for vector of size {}: rtol={}, atol={}\n'.format(a.size, rtol, atol)) aTmp = a.copy() bTmp = b.copy() i = 1 while i <= a.size: if i <= mismatches[0]: print(f"{i:3d}: Error {rel} {locationError(a, b, index, names)}") aTmp[index] = bTmp[index] = 0 if almost_equal(aTmp, bTmp, rtol, atol, equal_nan=equal_nan): break i += 1 if i <= mismatches[1] or mismatches[1] <= 0: index, rel = _find_max_violation(aTmp, bTmp, rtol, atol) else: break mismatchDegree = "at least " if mismatches[1] > 0 and i > mismatches[1] else "" errMsg = f"Error {relErr} exceeds tolerance rtol={rtol:e}, atol={atol:e} " \ f"(mismatch {mismatchDegree}{100*i/a.size}%).\n" \ f"{locationError(a, b, indexErr, names, maxError=True)}" else: errMsg = f"Error {rel} exceeds tolerance rtol={rtol:e}, atol={atol:e}.\n" np.set_printoptions(threshold=4, suppress=True) msg = npt.build_err_msg([a, b], err_msg=errMsg) raise AssertionError(msg)
def assert_allclose(a, b, rtol=1e-07, atol=0, equal_nan=True): assert_almost_equal(a, b, rtol=rtol, atol=atol, equal_nan=equal_nan)
[docs]def assert_almost_equal_with_err(a, b, rtol=None, atol=None, etol=None, names=('a', 'b'), equal_nan=False, mismatches=(10, 10)): """Test that two numpy arrays are almost equal within given error rate. Raise exception message if not. Parameters ---------- a : np.ndarray b : np.ndarray rtol : None or float or dict of dtype -> float The relative threshold. Default threshold will be used if set to ``None``. atol : None or float or dict of dtype -> float The absolute threshold. Default threshold will be used if set to ``None``. etol : None or float The error rate threshold. If etol is float, return true if error_rate < etol even if any error is found. names : tuple of names, optional The names used in error message when an exception occurs equal_nan : boolean, optional The flag determining how to treat NAN values in comparison mismatches : tuple of mismatches Maximum number of mismatches to be printed (mismatches[0]) and determine (mismatches[1]) """ etol = get_etol(etol) if etol > 0: rtol, atol = get_tols(a, b, rtol, atol) if isinstance(a, mx.nd.NDArray): a = a.asnumpy() if isinstance(b, mx.nd.NDArray): b = b.asnumpy() equals = np.isclose(a, b, rtol=rtol, atol=atol) err = 1 - np.count_nonzero(equals) / equals.size if err > etol: index, rel = _find_max_violation(a, b, rtol, atol) indexErr = index relErr = rel print('\n*** Maximum errors for vector of size {}: rtol={}, atol={}\n'.format(a.size, rtol, atol)) aTmp = a.copy() bTmp = b.copy() i = 1 while i <= a.size: if i <= mismatches[0]: print(f"{i:3d}: Error {rel} {locationError(a, b, index, names)}") aTmp[index] = bTmp[index] = 0 if almost_equal(aTmp, bTmp, rtol, atol, equal_nan=equal_nan): break i += 1 if i <= mismatches[1] or mismatches[1] <= 0: index, rel = _find_max_violation(aTmp, bTmp, rtol, atol) else: break mismatchDegree = "at least " if mismatches[1] > 0 and i > mismatches[1] else "" errMsg = f"Error {relErr} exceeds tolerance rtol={rtol:e}, atol={atol:e} " \ f"(mismatch {mismatchDegree}{100*i/a.size}%).\n" \ f"{locationError(a, b, indexErr, names, maxError=True)}" np.set_printoptions(threshold=4, suppress=True) msg = npt.build_err_msg([a, b], err_msg=errMsg) raise AssertionError(msg) else: assert_almost_equal(a, b, rtol=rtol, atol=atol, equal_nan=equal_nan)
[docs]def assert_almost_equal_ignore_nan(a, b, rtol=None, atol=None, names=('a', 'b')): """Test that two NumPy arrays are almost equal (ignoring NaN in either array). Combines a relative and absolute measure of approximate eqality. If either the relative or absolute check passes, the arrays are considered equal. Including an absolute check resolves issues with the relative check where all array values are close to zero. Parameters ---------- a : np.ndarray b : np.ndarray rtol : None or float The relative threshold. Default threshold will be used if set to ``None``. atol : None or float The absolute threshold. Default threshold will be used if set to ``None``. """ a = np.copy(a) b = np.copy(b) nan_mask = np.logical_or(np.isnan(a), np.isnan(b)) a[nan_mask] = 0 b[nan_mask] = 0 assert_almost_equal(a, b, rtol, atol, names)
[docs]def assert_exception(f, exception_type, *args, **kwargs): """Test that function f will throw an exception of type given by `exception_type`""" try: f(*args, **kwargs) assert(False) except exception_type: return
def _parse_location(sym, location, ctx, dtype=default_dtype()): """Parses the given location to a ordered dictionary. Arguments of the provided op `sym` are used as dictionary keys and elements of `location` are used as values. Parameters ---------- sym : Symbol Symbol containing op location : list or tuple or dict Argument values location - if type is list or tuple of `np.ndarray` inner elements are arrays correspoding to ``sym.list_arguments()``. - if type is dict of str -> `np.ndarray` maps the name of arguments to the corresponding `np.ndarray`. *In either case, value of all the arguments must be provided.* ctx : Device Device context. dtype: "asnumpy" or np.float16 or np.float32 or np.float64 If dtype is "asnumpy" then the mx.nd.array created will have the same type as th numpy array from which it is copied. Otherwise, dtype is the explicit datatype for all mx.nd.array objects created in this function. Returns ------- dict Dictionary with `sym` arguments as keys and `location` elements as values. Examples ------- >>> a = mx.symbol.Variable('a') >>> b = mx.symbol.Variable('b') >>> l1 = np.ndarray([2,3]) >>> l2 = np.ndarray([3,4]) >>> _parse_location(a * b, [l1, l2], None) {'a': <NDArray 2x3 @cpu(0)>, 'b': <NDArray 3x4 @cpu(0)>} >>> _parse_location(a * b, {'a': l1, 'b': l2}, None) {'a': <NDArray 2x3 @cpu(0)>, 'b': <NDArray 3x4 @cpu(0)>} >>> _parse_location(a * b, {'a': l1}, None) ValueError: Symbol arguments and keys of the given location do not match. """ assert isinstance(location, (dict, list, tuple)) assert dtype == "asnumpy" or dtype in (np.float16, np.float32, np.float64) if isinstance(location, dict): if set(location.keys()) != set(sym.list_arguments()): raise ValueError("Symbol arguments and keys of the given location do not match." f"symbol args:{str(set(sym.list_arguments()))}, location.keys():{str(set(location.keys()))}") else: location = {k: v for k, v in zip(sym.list_arguments(), location)} location = {k: mx.nd.array(v, ctx=ctx, dtype=v.dtype if dtype == "asnumpy" else dtype) \ if isinstance(v, np.ndarray) else v for k, v in location.items()} return _sorted_dict(location) def _parse_aux_states(sym, aux_states, ctx, dtype=default_dtype()): """Parses the given auxiliary states to a dictionary. Auxiliary states of the provided op `sym` are used as dictionary keys and elements of `aux_states` are used as values. Parameters ---------- sym : Symbol Symbol containing op aux_states : None or list or dict Aux states - if type is list or tuple of `np.ndarray` inner elements are arrays correspoding to ``sym.list_auxiliary_states()``. - if type is dict of str -> `np.ndarray` maps the name of arguments to the corresponding `np.ndarray`. *In either case, all aux states of `sym` must be provided.* ctx : Device Device context. dtype: "asnumpy" or np.float16 or np.float32 or np.float64 If dtype is "asnumpy" then the mx.nd.array created will have the same type as th numpy array from which it is copied. Otherwise, dtype is the explicit datatype for all mx.nd.array objects created in this function. Returns ------- dict Dictionary with `sym` aux states as keys and `aux_states` elements as values. Examples ------- >>> data = mx.symbol.Variable('data') >>> weight = mx.sym.Variable(name='fc1_weight') >>> fc1 = mx.symbol.FullyConnected(data = data, weight=weight, name='fc1', num_hidden=128) >>> fc2 = mx.symbol.BatchNorm(fc1, name='batchnorm0') >>> mean_states = np.ones(3) >>> var_states = np.ones(3) >>> _parse_aux_states(fc2, [mean_states, var_states], None) {'batchnorm0_moving_var': <NDArray 3 @cpu(0)>, 'batchnorm0_moving_mean': <NDArray 3 @cpu(0)>} >>> _parse_aux_states(fc2, {'batchnorm0_moving_var': mean_states, ... 'batchnorm0_moving_mean': var_states}, None) {'batchnorm0_moving_var': <NDArray 3 @cpu(0)>, 'batchnorm0_moving_mean': <NDArray 3 @cpu(0)>} >>> _parse_aux_states(fc2, {'batchnorm0_moving_var': mean_states}, None) ValueError: Symbol aux_states names and given aux_states do not match. """ assert dtype == "asnumpy" or dtype in (np.float16, np.float32, np.float64) if aux_states is not None: if isinstance(aux_states, dict): if set(aux_states.keys()) != set(sym.list_auxiliary_states()): raise ValueError("Symbol aux_states names and given aux_states do not match." f"symbol aux_names:{str(set(sym.list_auxiliary_states()))}, aux_states.keys:{str(set(aux_states.keys()))}") elif isinstance(aux_states, (list, tuple)): aux_names = sym.list_auxiliary_states() aux_states = {k:v for k, v in zip(aux_names, aux_states)} aux_states = {k: mx.nd.array(v, ctx=ctx, dtype=v.dtype if dtype == "asnumpy" else dtype) \ for k, v in aux_states.items()} return aux_states
[docs]def numeric_grad(executor, location, aux_states=None, eps=1e-4, use_forward_train=True, dtype=default_dtype()): """Calculates a numeric gradient via finite difference method. Class based on Theano's `theano.gradient.numeric_grad` [1] Parameters ---------- executor : Executor Executor that computes the forward pass. location : list of numpy.ndarray or dict of str to numpy.ndarray Argument values used as location to compute gradient Maps the name of arguments to the corresponding numpy.ndarray. Value of all the arguments must be provided. aux_states : None or list of numpy.ndarray or dict of str to numpy.ndarray, optional Auxiliary states values used as location to compute gradient Maps the name of aux_states to the corresponding numpy.ndarray. Value of all the auxiliary arguments must be provided. eps : float, optional Epsilon for the finite-difference method. use_forward_train : bool, optional Whether to use `is_train=True` in testing. dtype: np.float16 or np.float32 or np.float64 Datatype for mx.nd.array. References --------- ..[1] https://github.com/Theano/Theano/blob/master/theano/gradient.py """ def as_stype(var, stype, dtype): return mx.nd.cast_storage(mx.nd.array(var, dtype=dtype), stype=stype) assert dtype in (np.float16, np.float32, np.float64) approx_grads = {k: np.zeros(v.shape, dtype=dtype) for k, v in location.items()} for k, v in location.items(): stype = executor.arg_dict[k].stype if stype == 'default': executor.arg_dict[k][:] = as_stype(v, stype, dtype=dtype) for k in location: location[k] = np.asarray(location[k], order='C') for k, v in location.items(): if v.dtype.kind != 'f': continue stype = executor.arg_dict[k].stype old_value = v.copy() for i in range(int(np.prod(v.shape))): # inplace update v.ravel()[i] += eps/2.0 executor.arg_dict[k][:] = as_stype(v, stype, dtype=dtype) if aux_states is not None: for key, val in aux_states.items(): executor.aux_dict[key][:] = val executor.forward(is_train=use_forward_train) f_peps = executor.outputs[0].asnumpy() v.ravel()[i] -= eps executor.arg_dict[k][:] = as_stype(v, stype, dtype=dtype) if aux_states is not None: for key, val in aux_states.items(): adstype = executor.aux_dict[key].stype executor.aux_dict[key][:] = as_stype(val, adstype, dtype=dtype) executor.forward(is_train=use_forward_train) f_neps = executor.outputs[0].asnumpy() approx_grad = (f_peps - f_neps).sum() / eps approx_grads[k].ravel()[i] = approx_grad v.ravel()[i] = old_value.ravel()[i] # copy back the original value executor.arg_dict[k][:] = as_stype(old_value, stype, dtype=dtype) return approx_grads
[docs]def check_numeric_gradient(sym, location, aux_states=None, numeric_eps=None, rtol=None, atol=None, grad_nodes=None, use_forward_train=True, ctx=None, grad_stype_dict=None, dtype=default_dtype()): """Verify an operation by checking backward pass via finite difference method. Based on Theano's `theano.gradient.verify_grad` [1] Parameters ---------- sym : Symbol Symbol containing op to test location : list or tuple or dict Argument values used as location to compute gradient - if type is list of numpy.ndarray, \ inner elements should have the same order as mxnet.sym.list_arguments(). - if type is dict of str -> numpy.ndarray, \ maps the name of arguments to the corresponding numpy.ndarray. *In either case, value of all the arguments must be provided.* aux_states : list or tuple or dict, optional The auxiliary states required when generating the executor for the symbol. numeric_eps : float, optional Delta for the finite difference method that approximates the gradient. rtol : None or float The relative threshold. Default threshold will be used if set to ``None``. atol : None or float The absolute threshold. Default threshold will be used if set to ``None``. grad_nodes : None or list or tuple or dict, optional Names of the nodes to check gradient on use_forward_train : bool Whether to use is_train=True when computing the finite-difference. ctx : Context, optional Check the gradient computation on the specified device. grad_stype_dict : dict of str->str, optional Storage type dictionary for gradient ndarrays. dtype: np.float16 or np.float32 or np.float64 Datatype for mx.nd.array. References --------- [1] https://github.com/Theano/Theano/blob/master/theano/gradient.py """ assert dtype in (np.float16, np.float32, np.float64) if ctx is None: ctx = default_device() def random_projection(shape): """Get a random weight matrix with not too small elements Parameters ---------- shape : list or tuple """ # random_projection should not have elements too small, # otherwise too much precision is lost in numerical gradient plain = np.random.rand(*shape) + 0.1 return plain location = _parse_location(sym=sym, location=location, ctx=ctx, dtype=dtype) location_npy = {k:v.asnumpy() for k, v in location.items()} aux_states = _parse_aux_states(sym=sym, aux_states=aux_states, ctx=ctx, dtype=dtype) if aux_states is not None: aux_states_npy = {k: v.asnumpy() for k, v in aux_states.items()} else: aux_states_npy = None if grad_nodes is None: grad_nodes = sym.list_arguments() grad_req = {k: 'write' for k in grad_nodes} elif isinstance(grad_nodes, (list, tuple)): grad_nodes = list(grad_nodes) grad_req = {k: 'write' for k in grad_nodes} elif isinstance(grad_nodes, dict): grad_req = grad_nodes.copy() grad_nodes = grad_nodes.keys() else: raise ValueError input_shape = {k: v.shape for k, v in location.items()} _, out_shape, _ = sym.infer_shape(**input_shape) proj = mx.sym.Variable("__random_proj") is_np_sym = bool(isinstance(sym, np_symbol)) if is_np_sym: # convert to np symbol for using element-wise multiplication proj = proj.as_np_ndarray() out = sym * proj if is_np_sym: # convert to classic symbol so that make_loss can be used out = out.as_nd_ndarray() out = mx.sym.make_loss(out) location = dict(list(location.items()) + [("__random_proj", mx.nd.array(random_projection(out_shape[0]), ctx=ctx, dtype=dtype))]) args_grad_npy = dict([(k, np.random.normal(0, 0.01, size=location[k].shape)) for k in grad_nodes] + [("__random_proj", np.random.normal(0, 0.01, size=out_shape[0]))]) args_grad = {k: mx.nd.array(v, ctx=ctx, dtype=dtype) for k, v in args_grad_npy.items()} if grad_stype_dict is not None: assert isinstance(grad_stype_dict, dict), "grad_stype_dict must be a dict" for k, v in grad_stype_dict.items(): if k in args_grad and v in _STORAGE_TYPE_STR_TO_ID and v != 'default': # create an uninitialized sparse ndarray for executor # if the symbolic grad is expected to be zero, it should not be initialized at all args_grad[k] = mx.nd.zeros(args_grad[k].shape, args_grad[k].context, args_grad[k].dtype, v) grad_req["__random_proj"] = 'write' executor = out._bind(ctx, grad_req=grad_req, args=location, args_grad=args_grad, aux_states=aux_states) inps = executor.arg_arrays if len(inps) != len(location): raise ValueError("Executor arg_arrays and and location len do not match." f"Got {len(inps)} inputs and {len(location)} locations") executor.forward(is_train=True) assert len(executor.outputs) == 1 eps = get_tolerance(executor.outputs[0], numeric_eps, default_numeric_eps()) # cannot use finite differences with small eps without high precision if dtype in (np.float32, np.float16): assert eps >= 1e-5 executor.backward() symbolic_grads = executor.grad_dict numeric_gradients = numeric_grad( executor, location_npy, aux_states_npy, eps=eps, use_forward_train=use_forward_train, dtype=dtype) for name in grad_nodes: fd_grad = numeric_gradients[name] orig_grad = args_grad_npy[name] sym_grad = symbolic_grads[name] if grad_req[name] == 'write': assert_almost_equal(fd_grad, sym_grad, rtol, atol, (f"NUMERICAL_{name}", f"BACKWARD_{name}")) elif grad_req[name] == 'add': if isinstance(sym_grad, mx.nd.NDArray): sym_grad = sym_grad.asnumpy() assert_almost_equal(fd_grad, sym_grad - orig_grad, rtol, atol, (f"NUMERICAL_{name}", f"BACKWARD_{name}")) elif grad_req[name] == 'null': assert sym_grad is None else: raise ValueError(f"Invalid grad_req {grad_req[name]} for argument {name}")
[docs]def check_symbolic_forward(sym, location, expected, rtol=None, atol=None, aux_states=None, ctx=None, equal_nan=False, dtype=default_dtype()): """Compares a symbol's forward results with the expected ones. Prints error messages if the forward results are not the same as the expected ones. Parameters --------- sym : Symbol output symbol location : list of np.ndarray or dict of str to np.ndarray The evaluation point - if type is list of np.ndarray Contains all the numpy arrays corresponding to `sym.list_arguments()`. - if type is dict of str to np.ndarray Contains the mapping between argument names and their values. expected : list of np.ndarray or dict of str to np.ndarray The expected output value - if type is list of np.ndarray Contains arrays corresponding to exe.outputs. - if type is dict of str to np.ndarray Contains mapping between sym.list_output() and exe.outputs. rtol : None or float The relative threshold. Default threshold will be used if set to ``None``. atol : None or float The absolute threshold. Default threshold will be used if set to ``None``. aux_states : list of np.ndarray of dict, optional - if type is list of np.ndarray Contains all the NumPy arrays corresponding to sym.list_auxiliary_states - if type is dict of str to np.ndarray Contains the mapping between names of auxiliary states and their values. device : Device, optional running context dtype: "asnumpy" or np.float16 or np.float32 or np.float64 If dtype is "asnumpy" then the mx.nd.array created will have the same type as th numpy array from which it is copied. Otherwise, dtype is the explicit datatype for all mx.nd.array objects created in this function. equal_nan: Boolean if True, `nan` is a valid value for checking equivalency (ie `nan` == `nan`) Example ------- >>> shape = (2, 2) >>> lhs = mx.symbol.Variable('lhs') >>> rhs = mx.symbol.Variable('rhs') >>> sym_dot = mx.symbol.dot(lhs, rhs) >>> mat1 = np.array([[1, 2], [3, 4]]) >>> mat2 = np.array([[5, 6], [7, 8]]) >>> ret_expected = np.array([[19, 22], [43, 50]]) >>> check_symbolic_forward(sym_dot, [mat1, mat2], [ret_expected]) """ assert dtype == "asnumpy" or dtype in (np.float16, np.float32, np.float64) if ctx is None: ctx = default_device() location = _parse_location(sym=sym, location=location, ctx=ctx, dtype=dtype) aux_states = _parse_aux_states(sym=sym, aux_states=aux_states, ctx=ctx, dtype=dtype) if isinstance(expected, dict): expected = [expected[k] for k in sym.list_outputs()] args_grad_data = {k:mx.nd.empty(v.shape, ctx=ctx, dtype=v.dtype if dtype == "asnumpy" else dtype) \ for k, v in location.items()} executor = sym._bind(ctx=ctx, args=location, args_grad=args_grad_data, aux_states=aux_states) for g in executor.grad_arrays: if g.ndim == 0: g[()] = 0 else: g[:] = 0 executor.forward(is_train=False) outputs = executor.outputs for output_name, expect, output in zip(sym.list_outputs(), expected, outputs): assert_almost_equal(expect, output, rtol, atol, (f"EXPECTED_{output_name}", f"FORWARD_{output_name}"), equal_nan=equal_nan) return executor.outputs
[docs]def check_symbolic_backward(sym, location, out_grads, expected, rtol=None, atol=None, aux_states=None, grad_req='write', ctx=None, grad_stypes=None, equal_nan=False, dtype=default_dtype()): """Compares a symbol's backward results with the expected ones. Prints error messages if the backward results are not the same as the expected results. Parameters --------- sym : Symbol output symbol location : list of np.ndarray or dict of str to np.ndarray The evaluation point - if type is list of np.ndarray Contains all the NumPy arrays corresponding to ``mx.sym.list_arguments``. - if type is dict of str to np.ndarray Contains the mapping between argument names and their values. out_grads : None or list of np.ndarray or dict of str to np.ndarray NumPys arrays corresponding to sym.outputs for incomming gradient. - if type is list of np.ndarray Contains arrays corresponding to ``exe.outputs``. - if type is dict of str to np.ndarray contains mapping between mxnet.sym.list_output() and Executor.outputs expected : list of np.ndarray or dict of str to np.ndarray expected gradient values - if type is list of np.ndarray Contains arrays corresponding to exe.grad_arrays - if type is dict of str to np.ndarray Contains mapping between ``sym.list_arguments()`` and exe.outputs. rtol : None or float The relative threshold. Default threshold will be used if set to ``None``. atol : None or float The absolute threshold. Default threshold will be used if set to ``None``. aux_states : list of np.ndarray or dict of str to np.ndarray grad_req : str or list of str or dict of str to str, optional Gradient requirements. 'write', 'add' or 'null'. ctx : Context, optional Running context. grad_stypes: dict of str->str dictionary of mapping argument name to stype for the gradient equal_nan: Boolean if True, `nan` is a valid value for checking equivalency (ie `nan` == `nan`) dtype: np.float16 or np.float32 or np.float64 Datatype for mx.nd.array. Example ------- >>> lhs = mx.symbol.Variable('lhs') >>> rhs = mx.symbol.Variable('rhs') >>> sym_add = mx.symbol.elemwise_add(lhs, rhs) >>> mat1 = np.array([[1, 2], [3, 4]]) >>> mat2 = np.array([[5, 6], [7, 8]]) >>> grad1 = mx.nd.zeros(shape) >>> grad2 = mx.nd.zeros(shape) >>> exec_add = sym_add._bind(default_device(), args={'lhs': mat1, 'rhs': mat2}, ... args_grad={'lhs': grad1, 'rhs': grad2}, grad_req={'lhs': 'write', 'rhs': 'write'}) >>> exec_add.forward(is_train=True) >>> ograd = mx.nd.ones(shape) >>> grad_expected = ograd.copy().asnumpy() >>> check_symbolic_backward(sym_add, [mat1, mat2], [ograd], [grad_expected, grad_expected]) """ assert dtype == 'asnumpy' or dtype in (np.float16, np.float32, np.float64) if ctx is None: ctx = default_device() location = _parse_location(sym=sym, location=location, ctx=ctx, dtype=dtype) aux_states = _parse_aux_states(sym=sym, aux_states=aux_states, ctx=ctx, dtype=dtype) if isinstance(expected, (list, tuple)): expected = {k:v for k, v in zip(sym.list_arguments(), expected)} # Dirty the output buffer deterministically, for reproducibility. args_grad_npy = {k:np.random.normal(size=v.shape) for k, v in _sorted_items(expected)} args_grad_data = {} for k, v in args_grad_npy.items(): nd = mx.nd.array(v, ctx=ctx, dtype=expected[k].dtype if dtype == "asnumpy" else dtype) if grad_stypes is not None and k in grad_stypes: stype = grad_stypes[k] if stype is not None and stype != 'default': out = create_sparse_array(v.shape, stype, density=0.0) else: out = nd args_grad_data[k] = out else: args_grad_data[k] = nd if isinstance(grad_req, str): grad_req = {k:grad_req for k in sym.list_arguments()} elif isinstance(grad_req, (list, tuple)): grad_req = {k:v for k, v in zip(sym.list_arguments(), grad_req)} executor = sym._bind(ctx=ctx, args=location, args_grad=args_grad_data, aux_states=aux_states, grad_req=grad_req) outputs = executor.forward(is_train=True) if isinstance(out_grads, (tuple, list)): outg = list() for i, arr in enumerate(out_grads): stype = outputs[i].stype if isinstance(arr, np.ndarray): dtype = arr.dtype if dtype == "asnumpy" else dtype outg.append(mx.nd.array(arr, ctx=ctx, dtype=dtype).tostype(stype)) else: outg.append(arr.tostype(stype)) out_grads = outg elif isinstance(out_grads, dict): outg = dict() for k, v in out_grads.items(): if isinstance(v, np.ndarray): dtype = v.dtype if dtype == "asnumpy" else dtype outg[k] = mx.nd.array(v, ctx=ctx, dtype=dtype) else: outg[k] = v out_grads = outg else: assert out_grads is None executor.backward(out_grads) grads = args_grad_data for name in expected: if grad_req[name] == 'write': assert_almost_equal(expected[name], grads[name], rtol, atol, (f"EXPECTED_{name}", f"BACKWARD_{name}"), equal_nan=equal_nan) elif grad_req[name] == 'add': grad = grads[name].asnumpy() if isinstance(grads[name], mx.nd.NDArray) else grads[name] assert_almost_equal(expected[name], grad - args_grad_npy[name], rtol, atol, (f"EXPECTED_{name}", f"BACKWARD_{name}"), equal_nan=equal_nan) elif grad_req[name] == 'null': assert_almost_equal(args_grad_npy[name], grads[name], rtol, atol, (f"EXPECTED_{name}", f"BACKWARD_{name}"), equal_nan=equal_nan) else: raise ValueError(f"Invalid grad_req {grad_req[name]} for argument {name}") return args_grad_data
[docs]def check_speed(sym, location=None, ctx=None, N=20, grad_req=None, typ="whole", **kwargs): """Check the running speed of a symbol. Parameters ---------- sym : Symbol Symbol to run the speed test. location : none or dict of str to np.ndarray Location to evaluate the inner executor. ctx : Context Running context. N : int, optional Repeat times. grad_req : None or str or list of str or dict of str to str, optional Gradient requirements. typ : str, optional "whole" or "forward" - "whole" Test the forward_backward speed. - "forward" Only test the forward speed. """ if ctx is None: ctx = default_device() if grad_req is None: grad_req = 'write' if location is None: exe = sym._simple_bind(grad_req=grad_req, ctx=ctx, **kwargs) location = {k: np.random.normal(size=arr.shape, scale=1.0) for k, arr in exe.arg_dict.items()} else: assert isinstance(location, dict), f'Expect dict, get "location"={str(location)}' exe = sym._simple_bind(grad_req=grad_req, ctx=ctx, **{k: v.shape for k, v in location.items()}) for name, iarr in location.items(): exe.arg_dict[name][:] = iarr.astype(exe.arg_dict[name].dtype) if typ == "whole": # Warm up exe.forward(is_train=True) exe.backward(out_grads=exe.outputs) for output in exe.outputs: output.wait_to_read() # Test forward + backward tic = time.time() for _ in range(N): exe.forward(is_train=True) exe.backward(out_grads=exe.outputs) mx.nd.waitall() toc = time.time() forward_backward_time = (toc - tic) * 1.0 / N return forward_backward_time elif typ == "forward": # Warm up exe.forward(is_train=False) for output in exe.outputs: output.wait_to_read() # Test forward only tic = time.time() for _ in range(N): exe.forward(is_train=False) mx.nd.waitall() toc = time.time() forward_time = (toc - tic) * 1.0 / N return forward_time else: raise ValueError('typ can only be "whole" or "forward".')
[docs]def check_consistency(sym, ctx_list, scale=1.0, grad_req='write', arg_params=None, aux_params=None, rtol=None, atol=None, raise_on_err=True, ground_truth=None, equal_nan=False, use_uniform=False, rand_type=np.float64): """Check symbol gives the same output for different running context Parameters ---------- sym : Symbol or list of Symbols Symbol(s) to run the consistency test. ctx_list : list Running context. See example for more detail. scale : float, optional Standard deviation of the inner normal distribution. Used in initialization. grad_req : str or list of str or dict of str to str Gradient requirement. arg_params : dict of input name -> input data data to use for non-aux inputs aux_params : dict of input name -> input data data to use for aux inputs rtol : float or dictionary dtype->float, optional The relative error tolerance. atol : float or dictionary dtype->float, optional The absolute error tolerance. raise_on_err : bool, optional, defaults to True Should an error raise an exception (or just output exception message) ground_truth : dict of output name -> data, optional Provided ideal result to be compared against equal_nan : bool, optional, defaults to False Should nans be treated as equal in the comparison use_uniform: bool Optional, When flag set to true, random input data generated follows uniform distribution, not normal distribution rand_type: np.dtype casts the randomly generated data to this type Optional, when input data is passed via arg_params, defaults to np.float64 (numpy float default) Examples -------- >>> # create the symbol >>> sym = mx.sym.Convolution(num_filter=3, kernel=(3,3), name='conv') >>> # initialize the running context >>> ctx_list =\ [{'ctx': mx.gpu(0), 'conv_data': (2, 2, 10, 10), 'type_dict': {'conv_data': np.float64}},\ {'ctx': mx.gpu(0), 'conv_data': (2, 2, 10, 10), 'type_dict': {'conv_data': np.float32}},\ {'ctx': mx.gpu(0), 'conv_data': (2, 2, 10, 10), 'type_dict': {'conv_data': np.float16}},\ {'ctx': mx.cpu(0), 'conv_data': (2, 2, 10, 10), 'type_dict': {'conv_data': np.float64}},\ {'ctx': mx.cpu(0), 'conv_data': (2, 2, 10, 10), 'type_dict': {'conv_data': np.float32}}] >>> check_consistency(sym, ctx_list) >>> sym = mx.sym.Concat(name='concat', num_args=2) >>> ctx_list = \ [{'ctx': mx.gpu(0), 'concat_arg1': (2, 10), 'concat_arg0': (2, 10),\ 'type_dict': {'concat_arg0': np.float64, 'concat_arg1': np.float64}},\ {'ctx': mx.gpu(0), 'concat_arg1': (2, 10), 'concat_arg0': (2, 10),\ 'type_dict': {'concat_arg0': np.float32, 'concat_arg1': np.float32}},\ {'ctx': mx.gpu(0), 'concat_arg1': (2, 10), 'concat_arg0': (2, 10),\ 'type_dict': {'concat_arg0': np.float16, 'concat_arg1': np.float16}},\ {'ctx': mx.cpu(0), 'concat_arg1': (2, 10), 'concat_arg0': (2, 10),\ 'type_dict': {'concat_arg0': np.float64, 'concat_arg1': np.float64}},\ {'ctx': mx.cpu(0), 'concat_arg1': (2, 10), 'concat_arg0': (2, 10),\ 'type_dict': {'concat_arg0': np.float32, 'concat_arg1': np.float32}}] >>> check_consistency(sym, ctx_list) """ assert len(ctx_list) > 1 if isinstance(sym, Symbol): sym = [sym]*len(ctx_list) else: assert len(sym) == len(ctx_list) output_names = sym[0].list_outputs() arg_names = sym[0].list_arguments() exe_list = [] for s, ctx in zip(sym, ctx_list): assert s.list_arguments() == arg_names assert s.list_outputs() == output_names exe_list.append(s._simple_bind(grad_req=grad_req, **ctx)) arg_params = {} if arg_params is None else arg_params aux_params = {} if aux_params is None else aux_params # returns the least precise of two dtypes def smaller_dtype(dt1, dt2): return dt1 if dt2 is None or np.dtype(dt1).itemsize < np.dtype(dt2).itemsize else dt2 # It's important to assign random inputs in a deterministic order, for reproducibility. for n, arr in _sorted_items(exe_list[0].arg_dict): if n not in arg_params: if use_uniform: arg_params[n] = np.random.uniform(low=-0.92 * scale, high=0.92 * scale, size=arr.shape).astype(rand_type) else: arg_params[n] = np.random.normal(size=arr.shape, scale=scale).astype(rand_type) for n in exe_list[0].aux_dict: if n not in aux_params: aux_params[n] = 0 for exe in exe_list: for name, arr in exe.arg_dict.items(): arr[:] = arg_params[name] for name, arr in exe.aux_dict.items(): arr[:] = aux_params[name] # We need to initialize the gradient arrays if it's add. if (grad_req == "add"): for arr in exe.grad_arrays: arr[:] = np.zeros(arr.shape, dtype=arr.dtype) # test for exe in exe_list: exe.forward(is_train=False) dtypes = [np.dtype(exe.outputs[0].dtype) for exe in exe_list] # Select the ground truth as the first model having the highest precision output[0] gt_idx = np.argmax(dtypes) gt = ground_truth if gt is None: gt = exe_list[gt_idx].output_dict.copy() for i, exe in enumerate(exe_list): if i == gt_idx: continue for name, arr in zip(output_names, exe.outputs): gtarr = gt[name] try: assert_almost_equal(arr, gtarr, rtol=rtol, atol=atol, equal_nan=equal_nan) except AssertionError as e: print(f'Predict Err: ctx {i} vs ctx {gt_idx} at {name}') traceback.print_exc() if raise_on_err: raise e print(str(e)) # train if grad_req != 'null': # Perform forward() for exe in exe_list: exe.forward(is_train=True) # Use the first executor's output data, cast to the least precise dtype, # as the gradient data to pass to all executor's backward() call. least_precise_dtype = [out.dtype for out in exe_list[0].outputs] for exe in exe_list: least_precise_dtype = [smaller_dtype(out1.dtype, dt) \ for (out1, dt) in zip(exe.outputs, least_precise_dtype)] golden_data_np = [out.astype(dt).asnumpy() \ for (out, dt) in zip(exe_list[0].outputs, least_precise_dtype)] # Perform backward() for exe in exe_list: out_grads = [mx.nd.array(golden_np, ctx=exe._device, dtype=out.dtype).tostype(out.stype) for (golden_np, out) in zip(golden_data_np, exe.outputs)] exe.backward(out_grads) gt = ground_truth if gt is None: gt = exe_list[gt_idx].output_dict.copy() if grad_req != 'null': gt.update(exe_list[gt_idx].grad_dict) for i, exe in enumerate(exe_list): if i == gt_idx: continue curr = zip(output_names + arg_names, exe.outputs + exe.grad_arrays) for name, arr in curr: if gt[name] is None: assert arr is None, name continue gtarr = gt[name] try: rt, at = rtol, atol # If the primary data i/o type is float16, then the tolerance used when # comparing a float32 input gradient (e.g. batchnorm gamma) should be float16. smaller_arr_dtype = smaller_dtype(arr.dtype, dtypes[i]) smaller_gt_dtype = smaller_dtype(gtarr.dtype, dtypes[gt_idx]) if smaller_arr_dtype != arr.dtype or \ smaller_gt_dtype != gtarr.dtype: rt, at = get_tols(arr.astype(smaller_arr_dtype), gtarr.astype(smaller_gt_dtype), rtol, atol) assert_almost_equal(arr, gtarr, rtol=rt, atol=at, equal_nan=equal_nan) except AssertionError as e: print('Train Err: {} {} ctx {} vs {} {} ctx {} at {}'.format( get_dtype_name(arr.dtype), arr.device, i, get_dtype_name(gtarr.dtype), gtarr.device, gt_idx, name)) traceback.print_exc() if raise_on_err: raise e print(str(e)) return gt
[docs]def list_gpus(): """Return a list of GPUs Returns ------- list of int: If there are n GPUs, then return a list [0,1,...,n-1]. Otherwise returns []. """ return range(mx.util.get_gpu_count())
[docs]def download(url, fname=None, dirname=None, overwrite=False, retries=5): """Download an given URL Parameters ---------- url : str URL to download fname : str, optional filename of the downloaded file. If None, then will guess a filename from url. dirname : str, optional output directory name. If None, then guess from fname or use the current directory overwrite : bool, optional Default is false, which means skipping download if the local file exists. If true, then download the url to overwrite the local file if exists. retries : integer, default 5 The number of times to attempt the download in case of failure or non 200 return codes Returns ------- str The filename of the downloaded file """ assert retries >= 0, "Number of retries should be at least 0" if fname is None: fname = url.split('/')[-1] if dirname is None: dirname = os.path.dirname(fname) else: fname = os.path.join(dirname, fname) if dirname != "": if not os.path.exists(dirname): try: logging.info('create directory %s', dirname) os.makedirs(dirname) except OSError as exc: if exc.errno != errno.EEXIST: raise OSError('failed to create ' + dirname) if not overwrite and os.path.exists(fname): logging.info("%s exists, skipping download", fname) return fname while retries+1 > 0: # Disable pyling too broad Exception # pylint: disable=W0703 try: r = requests.get(url, stream=True) assert r.status_code == 200, f"failed to open {url}" with open(fname, 'wb') as f: for chunk in r.iter_content(chunk_size=1024): if chunk: # filter out keep-alive new chunks f.write(chunk) break except Exception as e: retries -= 1 if retries <= 0: raise e print("download failed, retrying, {} attempt{} left" .format(retries, 's' if retries > 1 else '')) logging.info("downloaded %s into %s successfully", url, fname) return fname
[docs]def get_mnist(path='data'): """Download and load the MNIST dataset Parameters ---------- path : str Path in which to save the files. Returns ------- dict A dict containing the data. """ def read_data(label_url, image_url): if not os.path.isdir(path): os.makedirs(path) with gzip.open(mx.gluon.utils.download(label_url, path=path)) as flbl: struct.unpack(">II", flbl.read(8)) label = np.frombuffer(flbl.read(), dtype=np.int8) with gzip.open(mx.gluon.utils.download(image_url, path=path), 'rb') as fimg: _, _, rows, cols = struct.unpack(">IIII", fimg.read(16)) image = np.frombuffer(fimg.read(), dtype=np.uint8).reshape(len(label), rows, cols) image = image.reshape(image.shape[0], 1, 28, 28).astype(np.float32)/255 return (label, image) # changed to mxnet.io for more stable hosting url_path = 'https://repo.mxnet.io/gluon/dataset/mnist/' (train_lbl, train_img) = read_data( url_path+'train-labels-idx1-ubyte.gz', url_path+'train-images-idx3-ubyte.gz') (test_lbl, test_img) = read_data( url_path+'t10k-labels-idx1-ubyte.gz', url_path+'t10k-images-idx3-ubyte.gz') return {'train_data':train_img, 'train_label':train_lbl, 'test_data':test_img, 'test_label':test_lbl}
[docs]def get_mnist_ubyte(path='data'): """Downloads ubyte version of the MNIST dataset into a directory in the current directory with the name `data` and extracts all files in the zip archive to this directory. """ if not os.path.isdir(path): os.makedirs(path) files = ['train-images-idx3-ubyte', 'train-labels-idx1-ubyte', 't10k-images-idx3-ubyte', 't10k-labels-idx1-ubyte'] if not all(os.path.exists(os.path.join(path, f)) for f in files): get_mnist(path) for f in files: ubyte_file_path = os.path.join(path, f) zip_file_path = ubyte_file_path + '.gz' with gzip.GzipFile(zip_file_path) as zf: with open(ubyte_file_path, 'wb') as ubyte_file: ubyte_file.write(zf.read())
[docs]def get_cifar10(path='data'): """Downloads CIFAR10 dataset into a directory in the current directory with the name `data`, and then extracts all files into the directory `data/cifar`. """ if not os.path.isdir(path): os.makedirs(path) if (not os.path.exists(os.path.join(path, 'cifar', 'train.rec'))) or \ (not os.path.exists(os.path.join(path, 'cifar', 'test.rec'))) or \ (not os.path.exists(os.path.join(path, 'cifar', 'train.lst'))) or \ (not os.path.exists(os.path.join(path, 'cifar', 'test.lst'))): url = 'https://repo.mxnet.io/gluon/dataset/cifar10/cifar10-b9ac2870.zip' sha1 = 'b9ac287012f2dad9dfb49d8271c39ecdd7db376c' zip_file_path = mx.gluon.utils.download(url, path=path, sha1_hash=sha1, verify_ssl=False) with zipfile.ZipFile(zip_file_path) as zf: zf.extractall(path)
[docs]def get_mnist_iterator(batch_size, input_shape, num_parts=1, part_index=0, path='data'): """Returns training and validation iterators for MNIST dataset """ get_mnist_ubyte(path) flat = len(input_shape) != 3 train_dataiter = mx.io.MNISTIter( image=os.path.join(path, "train-images-idx3-ubyte"), label=os.path.join(path, "train-labels-idx1-ubyte"), input_shape=input_shape, batch_size=batch_size, shuffle=True, flat=flat, num_parts=num_parts, part_index=part_index) val_dataiter = mx.io.MNISTIter( image=os.path.join(path, "t10k-images-idx3-ubyte"), label=os.path.join(path, "t10k-labels-idx1-ubyte"), input_shape=input_shape, batch_size=batch_size, flat=flat, num_parts=num_parts, part_index=part_index) return (train_dataiter, val_dataiter)
[docs]def get_bz2_data(data_dir, data_name, url, data_origin_name): """Download and extract bz2 data. Parameters ---------- data_dir : str Absolute or relative path of the directory name to store bz2 files data_name : str Name of the output file in which bz2 contents will be extracted url : str URL to download data from data_origin_name : str Name of the downloaded b2 file Examples -------- >>> get_bz2_data("data_dir", "kdda.t", "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/kdda.t.bz2", "kdda.t.bz2") """ data_name = os.path.join(data_dir, data_name) data_origin_name = os.path.join(data_dir, data_origin_name) if not os.path.exists(data_name): download(url, fname=data_origin_name, dirname=data_dir, overwrite=False) bz_file = bz2.BZ2File(data_origin_name, 'rb') with open(data_name, 'wb') as fout: for line in bz_file: fout.write(line) bz_file.close() os.remove(data_origin_name)
[docs]def same_array(array1, array2): """Check whether two NDArrays sharing the same memory block Parameters ---------- array1 : NDArray First NDArray to be checked array2 : NDArray Second NDArray to be checked Returns ------- bool Whether two NDArrays share the same memory """ array1[:] += 1 if not same(array1.asnumpy(), array2.asnumpy()): array1[:] -= 1 return False array1[:] -= 1 return same(array1.asnumpy(), array2.asnumpy())
[docs]@contextmanager def discard_stderr(): """ Discards error output of a routine if invoked as: with discard_stderr(): ... """ with open(os.devnull, 'w') as bit_bucket: try: stderr_fileno = sys.stderr.fileno() old_stderr = os.dup(stderr_fileno) try: os.dup2(bit_bucket.fileno(), stderr_fileno) yield finally: os.dup2(old_stderr, stderr_fileno) except AttributeError: # On some systems is stderr not a file descriptor but actually a virtual pipeline # that can not be copied yield
[docs]class DummyIter(mx.io.DataIter): """A dummy iterator that always returns the same batch of data (the first data batch of the real data iter). This is usually used for speed testing. Parameters ---------- real_iter: mx.io.DataIter The real data iterator where the first batch of data comes from """ def __init__(self, real_iter): super(DummyIter, self).__init__() self.real_iter = real_iter self.provide_data = real_iter.provide_data self.provide_label = real_iter.provide_label self.batch_size = real_iter.batch_size self.the_batch = next(real_iter) def __iter__(self): return self
[docs] def next(self): """Get a data batch from iterator. The first data batch of real iter is always returned. StopIteration will never be raised. Returns ------- DataBatch The data of next batch. """ return self.the_batch
[docs]def gen_buckets_probs_with_ppf(ppf, nbuckets): """Generate the buckets and probabilities for chi_square test when the ppf (Quantile function) is specified. Parameters ---------- ppf : function The Quantile function that takes a probability and maps it back to a value. It's the inverse of the cdf function nbuckets : int size of the buckets Returns ------- buckets : list of tuple The generated buckets probs : list The generate probabilities """ assert nbuckets > 0 probs = [1.0 / nbuckets for _ in range(nbuckets)] buckets = [(ppf(i / float(nbuckets)), ppf((i + 1) / float(nbuckets))) for i in range(nbuckets)] return buckets, probs
[docs]def mean_check(generator, mu, sigma, nsamples=1000000): """Test the generator by matching the mean. We test the sample mean by checking if it falls inside the range (mu - 3 * sigma / sqrt(n), mu + 3 * sigma / sqrt(n)) References:: @incollection{goucher2009beautiful, title={Beautiful Testing: Leading Professionals Reveal How They Improve Software}, author={Goucher, Adam and Riley, Tim}, year={2009}, chapter=10 } Examples:: generator = lambda x: np.random.normal(0, 1.0, size=x) mean_check_ret = mean_check(generator, 0, 1.0) Parameters ---------- generator : function The generator function. It's expected to generate N i.i.d samples by calling generator(N). mu : float sigma : float nsamples : int Returns ------- ret : bool Whether the mean test succeeds """ samples = np.array(generator(nsamples)) sample_mean = samples.mean() ret = (sample_mean > mu - 3 * sigma / np.sqrt(nsamples)) and\ (sample_mean < mu + 3 * sigma / np.sqrt(nsamples)) return ret
[docs]def get_im2rec_path(home_env="MXNET_HOME"): """Get path to the im2rec.py tool Parameters ---------- home_env : str Env variable that holds the path to the MXNET folder Returns ------- str The path to im2rec.py """ # Check first if the path to MXNET is passed as an env variable if home_env in os.environ: mxnet_path = os.environ[home_env] else: # Else use currently imported mxnet as reference mxnet_path = os.path.dirname(mx.__file__) # If MXNet was installed through pip, the location of im2rec.py im2rec_path = os.path.join(mxnet_path, 'tools', 'im2rec.py') if os.path.isfile(im2rec_path): return im2rec_path # If MXNet has been built locally im2rec_path = os.path.join(mxnet_path, '..', '..', 'tools', 'im2rec.py') if os.path.isfile(im2rec_path): return im2rec_path raise IOError('Could not find path to tools/im2rec.py')
[docs]def var_check(generator, sigma, nsamples=1000000): """Test the generator by matching the variance. It will need a large number of samples and is not recommended to use We test the sample variance by checking if it falls inside the range (sigma^2 - 3 * sqrt(2 * sigma^4 / (n-1)), sigma^2 + 3 * sqrt(2 * sigma^4 / (n-1))) References:: @incollection{goucher2009beautiful, title={Beautiful Testing: Leading Professionals Reveal How They Improve Software}, author={Goucher, Adam and Riley, Tim}, year={2009}, chapter=10 } Examples:: generator = lambda x: np.random.normal(0, 1.0, size=x) var_check_ret = var_check(generator, 0, 1.0) Parameters ---------- generator : function The generator function. It's expected to generate N i.i.d samples by calling generator(N). sigma : float nsamples : int Returns ------- ret : bool Whether the variance test succeeds """ samples = np.array(generator(nsamples)) sample_var = samples.var(ddof=1) ret = (sample_var > sigma ** 2 - 3 * np.sqrt(2 * sigma ** 4 / (nsamples - 1))) and\ (sample_var < sigma ** 2 + 3 * np.sqrt(2 * sigma ** 4 / (nsamples - 1))) return ret
[docs]def chi_square_check(generator, buckets, probs, nsamples=1000000): """Run the chi-square test for the generator. The generator can be both continuous and discrete. If the generator is continuous, the buckets should contain tuples of (range_min, range_max) \ and the probs should be the corresponding ideal probability within the specific ranges. \ Otherwise, the buckets should contain all the possible values generated over the discrete distribution and the \ probs should be groud-truth probability. Usually the user is required to specify the probs parameter. After obtaining the p value, we could further use the standard p > 0.05 (alpha) threshold to get \ the final result. Examples:: buckets, probs = gen_buckets_probs_with_ppf(lambda x: ss.norm.ppf(x, 0, 1), 5) generator = lambda x: np.random.normal(0, 1.0, size=x) p = chi_square_check(generator=generator, buckets=buckets, probs=probs) assert(p > 0.05) Parameters ---------- generator: function A function that is assumed to generate i.i.d samples from a specific distribution. generator(N) should generate N random samples. buckets: list of tuple or list of number The buckets to run the chi-square the test. Make sure that the buckets cover the whole range of the distribution. Also, the buckets must be in ascending order and have no intersection probs: list or tuple The ground-truth probability of the random value fall in a specific bucket. nsamples:int The number of samples to generate for the testing Returns ------- p : float p value that the generator has the expected distribution. A higher value indicates a larger confidence obs_freq : list Observed frequency of buckets expected_freq : list The expected (ground-truth) frequency of the buckets """ if not ss: raise ImportError("scipy is not available." " Please check if the scipy python bindings are installed.") assert isinstance(buckets, list) samples = generator(nsamples) assert len(probs) == len(buckets) if isinstance(buckets[0], (list, tuple)): # Check whether the buckets are valid and fill them into a npy array continuous_dist = True buckets_npy = np.zeros((len(buckets) * 2, ), dtype=np.float32) for i, _ in enumerate(buckets): assert(buckets[i][0] <= buckets[i][1]) if i < len(buckets) - 1: assert(buckets[i][1] <= buckets[i + 1][0]) buckets_npy[i * 2] = buckets[i][0] buckets_npy[i * 2 + 1] = buckets[i][1] else: continuous_dist = False expected_freq = (nsamples * np.array(probs, dtype=np.float32)).astype(np.int32) if continuous_dist: sample_bucket_ids = np.searchsorted(buckets_npy, samples, side='right') else: sample_bucket_ids = np.array(samples) if continuous_dist: sample_bucket_ids = sample_bucket_ids // 2 obs_freq = np.zeros(shape=len(buckets), dtype=np.int) for i, _ in enumerate(buckets): if continuous_dist: obs_freq[i] = (sample_bucket_ids == i).sum() else: obs_freq[i] = (sample_bucket_ids == buckets[i]).sum() _, p = ss.chisquare(f_obs=obs_freq, f_exp=expected_freq) return p, obs_freq, expected_freq
[docs]def verify_generator(generator, buckets, probs, nsamples=1000000, nrepeat=5, success_rate=0.2, alpha=0.05): """Verify whether the generator is correct using chi-square testing. The test is repeated for "nrepeat" times and we check if the success rate is above the threshold (25% by default). Parameters ---------- generator: function A function that is assumed to generate i.i.d samples from a specific distribution. generator(N) should generate N random samples. buckets: list of tuple or list of number The buckets to run the chi-square the test. Make sure that the buckets cover the whole range of the distribution. Also, the buckets must be in ascending order and have no intersection probs: list or tuple The ground-truth probability of the random value fall in a specific bucket. nsamples: int The number of samples to generate for the testing nrepeat: int The times to repeat the test success_rate: float The desired success rate alpha: float The desired threshold for type-I error i.e. when a true null hypothesis is rejected Returns ------- cs_ret_l: list The p values of the chi-square test. """ cs_ret_l = [] obs_freq_l = [] expected_freq_l = [] for _ in range(nrepeat): cs_ret, obs_freq, expected_freq = chi_square_check(generator=generator, buckets=buckets, probs=probs, nsamples=nsamples) cs_ret_l.append(cs_ret) obs_freq_l.append(obs_freq) expected_freq_l.append(expected_freq) success_num = (np.array(cs_ret_l) > alpha).sum() if success_num < nrepeat * success_rate: raise AssertionError(f"Generator test fails, Chi-square p={str(cs_ret_l)}, " f"obs_freq={str(obs_freq_l)}, expected_freq={str(expected_freq_l)}." f"\nbuckets={str(buckets)}, probs={str(probs)}") return cs_ret_l
[docs]def compare_ndarray_tuple(t1, t2, rtol=None, atol=None): """Compare ndarray tuple.""" if t1 is None or t2 is None: return if isinstance(t1, tuple): for s1, s2 in zip(t1, t2): compare_ndarray_tuple(s1, s2, rtol, atol) else: assert_almost_equal(t1, t2, rtol=rtol, atol=atol)
[docs]def compare_optimizer(opt1, opt2, shapes, dtype, w_stype='default', g_stype='default', rtol=1e-4, atol=1e-5, compare_states=True): """Compare opt1 and opt2.""" w1_list, w2_list = [], [] g1_list, g2_list = [], [] s1_list, s2_list = [], [] for i, shape in enumerate(shapes): if w_stype == 'default': w2 = mx.random.uniform(shape=shape, ctx=default_device(), dtype=dtype) w1 = w2.copyto(default_device()) elif w_stype in ('row_sparse', 'csr'): w2 = rand_ndarray(shape, w_stype, density=1, dtype=dtype) w1 = w2.copyto(default_device()).tostype('default') else: raise Exception("type not supported yet") if g_stype == 'default': g2 = mx.random.uniform(shape=shape, ctx=default_device(), dtype=dtype) g1 = g2.copyto(default_device()) elif g_stype in ('row_sparse', 'csr'): g2 = rand_ndarray(shape, g_stype, dtype=dtype) g1 = g2.copyto(default_device()).tostype('default') else: raise Exception("type not supported yet") s1 = opt1.create_state_multi_precision(i, w1) s2 = opt2.create_state_multi_precision(i, w2) if compare_states: compare_ndarray_tuple(s1, s2) w1_list.append(w1) w2_list.append(w2) g1_list.append(g1) g2_list.append(g2) s1_list.append(s1) s2_list.append(s2) indices = list(range(len(shapes))) opt1.update_multi_precision(indices, w1_list, g1_list, s1_list) opt2.update_multi_precision(indices, w2_list, g2_list, s2_list) if compare_states: compare_ndarray_tuple(tuple(s1_list), tuple(s2_list), rtol=rtol, atol=atol) compare_ndarray_tuple(tuple(w1_list), tuple(w2_list), rtol=rtol, atol=atol)
[docs]def compare_optimizer_noise_seeded(opt1, opt2, shapes, dtype, noise_seed, w_stype='default', g_stype='default', rtol=1e-4, atol=1e-5, compare_states=True): """Compare opt1 and opt2 with the added functionality that the seed for generating random noise in the SGLD optimizer update is set so that the same noise is used in opt1 and opt2. """ w1_list, w2_list = [], [] g1_list, g2_list = [], [] s1_list, s2_list = [], [] for i, shape in enumerate(shapes): if w_stype == 'default': w2 = mx.random.uniform(shape=shape, ctx=default_device(), dtype=dtype) w1 = w2.copyto(default_device()) elif w_stype in ('row_sparse', 'csr'): w2 = rand_ndarray(shape, w_stype, density=1, dtype=dtype) w1 = w2.copyto(default_device()).tostype('default') else: raise Exception("type not supported yet") if g_stype == 'default': g2 = mx.random.uniform(shape=shape, ctx=default_device(), dtype=dtype) g1 = g2.copyto(default_device()) elif g_stype in ('row_sparse', 'csr'): g2 = rand_ndarray(shape, g_stype, dtype=dtype) g1 = g2.copyto(default_device()).tostype('default') else: raise Exception("type not supported yet") s1 = opt1.create_state_multi_precision(i, w1) s2 = opt2.create_state_multi_precision(i, w2) if compare_states: compare_ndarray_tuple(s1, s2) w1_list.append(w1) w2_list.append(w2) g1_list.append(g1) g2_list.append(g2) s1_list.append(s1) s2_list.append(s2) indices = list(range(len(shapes))) # set seed for Gaussian noise replication mx.random.seed(noise_seed) opt1.update_multi_precision(indices, w1_list, g1_list, s1_list) mx.random.seed(noise_seed) opt2.update_multi_precision(indices, w2_list, g2_list, s2_list) if compare_states: compare_ndarray_tuple(tuple(s1_list), tuple(s2_list), rtol=rtol, atol=atol) compare_ndarray_tuple(tuple(w1_list), tuple(w2_list), rtol=rtol, atol=atol)
[docs]def same_symbol_structure(sym1, sym2): """Compare two symbols to check if they have the same computation graph structure. Returns true if operator corresponding to a particular node id is same in both symbols for all nodes """ conf = json.loads(sym1.tojson()) nodes = conf["nodes"] conf2 = json.loads(sym2.tojson()) nodes2 = conf2["nodes"] for node1, node2 in zip(nodes, nodes2): if node1["op"] != node2["op"]: return False return True
[docs]@contextmanager def environment(*args): """ Environment variable setter and unsetter via `with` idiom. Takes a specification of env var names and desired values and adds those settings to the environment in advance of running the body of the `with` statement. The original environment state is restored afterwards, even if exceptions are raised in the `with` body. Parameters ---------- args: if 2 args are passed: name, desired_value strings of the single env var to update, or if 1 arg is passed: a dict of name:desired_value for env var's to update """ # On Linux, env var changes made through python's os.environ are seen # by the backend. On Windows though, the C runtime gets a snapshot # of the environment that cannot be altered by os.environ. Here we # check, using a wrapped version of the backend's getenv(), that # the desired env var value is seen by the backend, and otherwise use # a wrapped setenv() to establish that value in the backend. # Also on Windows, a set env var can never have the value '', since # the command 'set FOO= ' is used to unset the variable. Perhaps # as a result, the wrapped dmlc::GetEnv() routine returns the same # value for unset variables and those set to ''. As a result, we # ignore discrepancy. def validate_backend_setting(name, value, can_use_setenv=True): backend_value = getenv(name) if value == backend_value or \ value == '' and backend_value is None and platform.system() == 'Windows': return if not can_use_setenv: raise RuntimeError('Could not set env var {}={} within C Runtime'.format(name, value)) setenv(name, value) validate_backend_setting(name, value, can_use_setenv=False) # Core routine to alter environment from a dict of env_var_name, env_var_value pairs def set_environ(env_var_dict): for env_var_name, env_var_value in env_var_dict.items(): if env_var_value is None: os.environ.pop(env_var_name, None) else: os.environ[env_var_name] = env_var_value validate_backend_setting(env_var_name, env_var_value) # Create env_var name:value dict from the two calling methods of this routine if len(args) == 1 and isinstance(args[0], dict): env_vars = args[0] else: assert len(args) == 2, 'Expecting one dict arg or two args: env var name and value' env_vars = {args[0]: args[1]} # Take a snapshot of the existing environment variable state # for those variables to be changed. get() return None for unset keys. snapshot = {x: os.environ.get(x) for x in env_vars.keys()} # Alter the environment per the env_vars dict set_environ(env_vars) # Now run the wrapped code try: yield finally: # the backend engines may still be referencing the changed env var state mx.nd.waitall() # reinstate original env_var state per the snapshot taken earlier set_environ(snapshot)
[docs]def collapse_sum_like(a, shape): """Given `a` as a numpy ndarray, perform reduce_sum on `a` over the axes that do not exist in `shape`. Note that an ndarray with `shape` must be broadcastable to `a`. """ assert len(a.shape) >= len(shape) if np.prod(shape) == 0 or a.size == 0: return np.zeros(shape, dtype=a.dtype) axes = [] ndim_diff = len(a.shape) - len(shape) for i in range(ndim_diff): axes.append(i) for i, s in enumerate(shape): if s != a.shape[i+ndim_diff]: assert s == 1 axes.append(i+ndim_diff) return np.sum(a, axis=tuple(axes)).reshape(shape)
[docs]def is_cd_run(): """Checks if the test is running as part of a Continuous Delivery run""" return os.environ.get("CD_JOB", 0) == "1"
_features = Features()
[docs]def has_tvm_ops(): """Returns True if MXNet is compiled with TVM generated operators. If current ctx is GPU, it only returns True for CUDA compute capability > 52 where FP16 is supported. """ built_with_tvm_op = _features.is_enabled("TVM_OP") device = current_device() if device.device_type == 'gpu': try: cc = get_cuda_compute_capability(device) except: # pylint: disable=bare-except print('Failed to get CUDA compute capability for context {}. The operators ' 'built with USE_TVM_OP=1 will not be run in unit tests.'.format(device)) return False print('Cuda arch compute capability: sm_{}'.format(str(cc))) return built_with_tvm_op and cc >= 53 return built_with_tvm_op
[docs]def is_op_runnable(): """Returns True for all CPU tests. Returns True for GPU tests that are either of the following. 1. Built with USE_TVM_OP=0. 2. Built with USE_TVM_OP=1, but with compute capability >= 53. """ device = current_device() if device.device_type == 'gpu': if not _features.is_enabled("TVM_OP"): return True else: try: cc = get_cuda_compute_capability(device) except: # pylint: disable=bare-except print('Failed to get CUDA compute capability for context {}. The operators ' 'built with USE_TVM_OP=1 will not be run in unit tests.'.format(device)) return False print('Cuda arch compute capability: sm_{}'.format(str(cc))) return cc >= 53 return True
[docs]@use_np def check_gluon_hybridize_consistency(net_builder, data_l, numpy_func=None, test_grad=True, rtol=1E-4, atol=1E-4): """Check whether a HybridBlock has consistent output when hybridized or not hybridized The network should not contain any random number generators. Parameters ---------- net_builder : function The builder of the HybridBlock that we are going to check the consistency. Inside the implementation, we will call net_builder() to construct the hybrid block. Also, the net_builder will need to support specifying the params data_l : list of mx.np.ndarray List of input ndarrays. numpy_func : function, optional The ground truth numpy function that has the same functionality as net_builder(). Default None. test_grad : bool, optional Whether to test the consistency of the gradient. Default True. rtol : float, optional The relative error tolerance, default 1E-4. Default 1E-4. atol : float, optional The absolute error tolerance, default 1E-4. Default 1E-4. """ saved_out_np = None saved_grad_np_l = None params_init = None use_autograd_flags = [False, True] if test_grad else [False] for hybridize in [False, True]: for use_autograd in use_autograd_flags: net = net_builder() if params_init is None: net.initialize() else: net.load_dict(params_init) if hybridize: net.hybridize() in_data_l = [ele.copy() for ele in data_l] if use_autograd: for ele in in_data_l: ele.attach_grad() with mx.autograd.record(): out = net(*in_data_l) out.backward(out) else: out = net(*in_data_l) if params_init is None: # Deferred initialization finished params_init = {k: v.data().asnumpy() for k, v in net.collect_params().items()} if saved_out_np is None: saved_out_np = out.asnumpy() else: # Check for correctness assert_almost_equal(out.asnumpy(), saved_out_np, rtol=rtol, atol=atol) if use_autograd: if saved_grad_np_l is None: saved_grad_np_l = [ele.grad.asnumpy() for ele in in_data_l] else: # Check for correctness for data, saved_grad_np in zip(in_data_l, saved_grad_np_l): assert_almost_equal(data.grad.asnumpy(), saved_grad_np, rtol=rtol, atol=atol) if numpy_func is not None: numpy_out = numpy_func(*[ele.asnumpy() for ele in data_l]) assert_almost_equal(saved_out_np, numpy_out, rtol=rtol, atol=atol)
[docs]def new_matrix_with_real_eigvals_2d(n): """Generate a well-conditioned matrix with small real eigenvalues.""" shape = (n, n) q = np.ones(shape) while 1: D = np.diag(np.random.uniform(-1.0, 1.0, shape[-1])) I = np.eye(shape[-1]).reshape(shape) v = np.random.uniform(-1., 1., shape[-1]).reshape(shape[:-1] + (1,)) v = v / np.linalg.norm(v, axis=-2, keepdims=True) v_T = np.swapaxes(v, -1, -2) U = I - 2 * np.matmul(v, v_T) q = np.matmul(U, D) if (np.linalg.cond(q, 2) < 3): break D = np.diag(np.random.uniform(-10.0, 10.0, n)) q_inv = np.linalg.inv(q) return np.matmul(np.matmul(q_inv, D), q)
[docs]def new_matrix_with_real_eigvals_nd(shape): """Generate well-conditioned matrices with small real eigenvalues.""" n = int(np.prod(shape[:-2])) if len(shape) > 2 else 1 return np.array([new_matrix_with_real_eigvals_2d(shape[-1]) for i in range(n)]).reshape(shape)
[docs]def new_orthonormal_matrix_2d(n): """Generate a orthonormal matrix.""" x = np.random.randn(n, n) x_trans = x.T sym_mat = np.matmul(x_trans, x) return np.linalg.qr(sym_mat)[0]
[docs]def new_sym_matrix_with_real_eigvals_2d(n): """Generate a sym matrix with real eigenvalues.""" q = new_orthonormal_matrix_2d(n) D = np.diag(np.random.uniform(-10.0, 10.0, n)) return np.matmul(np.matmul(q.T, D), q)
[docs]def new_sym_matrix_with_real_eigvals_nd(shape): """Generate sym matrices with real eigenvalues.""" n = int(np.prod(shape[:-2])) if len(shape) > 2 else 1 return np.array([new_sym_matrix_with_real_eigvals_2d(shape[-1]) for i in range(n)]).reshape(shape)