Source code for mxnet.test_utils

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""Tools for testing."""
# pylint: disable=too-many-lines
from __future__ import absolute_import, print_function, division
import time
import gzip
import struct
import traceback
import numbers
import subprocess
import sys
import os
import errno
import logging
import bz2
import zipfile
from contextlib import contextmanager
import numpy as np
import numpy.testing as npt
import numpy.random as rnd
try:
    import scipy.stats as ss
except ImportError:
    ss = None
try:
    import requests
except ImportError:
    # in rare cases requests may be not installed
    pass
import mxnet as mx
from .context import Context, current_context
from .ndarray.ndarray import _STORAGE_TYPE_STR_TO_ID
from .ndarray import array
from .symbol import Symbol


[docs]def default_context(): """Get default context for regression test.""" # _TODO: get context from environment variable to support # testing with GPUs return current_context()
[docs]def set_default_context(ctx): """Set default context.""" Context._default_ctx.value = ctx
[docs]def default_dtype(): """Get default data type for regression test.""" # _TODO: get default dtype from environment variable return np.float32
[docs]def get_atol(atol=None): """Get default numerical threshold for regression test.""" # _TODO: get from env variable, different threshold might # be needed for different device and dtype return 1e-20 if atol is None else atol
[docs]def get_rtol(rtol=None): """Get default numerical threshold for regression test.""" # _TODO: get from env variable, different threshold might # be needed for different device and dtype return 1e-5 if rtol is None else rtol
[docs]def random_arrays(*shapes): """Generate some random numpy arrays.""" arrays = [np.random.randn(*s).astype(default_dtype()) for s in shapes] if len(arrays) == 1: return arrays[0] return arrays
[docs]def random_sample(population, k): """Return a k length list of the elements chosen from the population sequence.""" assert 0 <= k <= len(population) population_copy = population[:] np.random.shuffle(population_copy) return population_copy[0:k]
def _validate_csr_generation_inputs(num_rows, num_cols, density, distribution="uniform"): """Validates inputs for csr generation helper functions """ total_nnz = int(num_rows * num_cols * density) if density < 0 or density > 1: raise ValueError("density has to be between 0 and 1") if num_rows <= 0 or num_cols <= 0: raise ValueError("num_rows or num_cols should be greater than 0") if distribution == "powerlaw": if total_nnz < 2 * num_rows: raise ValueError("not supported for this density: %s" " for this shape (%s, %s)" " Please keep :" " num_rows * num_cols * density >= 2 * num_rows" % (density, num_rows, num_cols))
[docs]def shuffle_csr_column_indices(csr): """Shuffle CSR column indices per row This allows validation of unordered column indices, which is not a requirement for a valid CSR matrix """ row_count = len(csr.indptr) - 1 for i in range(row_count): start_index = csr.indptr[i] end_index = csr.indptr[i + 1] sublist = np.array(csr.indices[start_index : end_index]) np.random.shuffle(sublist) csr.indices[start_index : end_index] = sublist
def _get_uniform_dataset_csr(num_rows, num_cols, density=0.1, dtype=None, data_init=None, shuffle_csr_indices=False): """Returns CSRNDArray with uniform distribution This generates a csr matrix with totalnnz unique randomly chosen numbers from num_rows*num_cols and arranges them in the 2d array in the following way: row_index = (random_number_generated / num_rows) col_index = random_number_generated - row_index * num_cols """ _validate_csr_generation_inputs(num_rows, num_cols, density, distribution="uniform") try: from scipy import sparse as spsp csr = spsp.rand(num_rows, num_cols, density, dtype=dtype, format="csr") if data_init is not None: csr.data.fill(data_init) if shuffle_csr_indices is True: shuffle_csr_column_indices(csr) result = mx.nd.sparse.csr_matrix((csr.data, csr.indices, csr.indptr), shape=(num_rows, num_cols), dtype=dtype) except ImportError: assert(data_init is None), \ "data_init option is not supported when scipy is absent" assert(not shuffle_csr_indices), \ "shuffle_csr_indices option is not supported when scipy is absent" # scipy not available. try to generate one from a dense array dns = mx.nd.random.uniform(shape=(num_rows, num_cols), dtype=dtype) masked_dns = dns * (dns < density) result = masked_dns.tostype('csr') return result def _get_powerlaw_dataset_csr(num_rows, num_cols, density=0.1, dtype=None): """Returns CSRNDArray with powerlaw distribution with exponentially increasing number of non zeros in each row. Not supported for cases where total_nnz < 2*num_rows. This is because the algorithm first tries to ensure that there are rows with no zeros by putting non zeros at beginning of each row. """ _validate_csr_generation_inputs(num_rows, num_cols, density, distribution="powerlaw") total_nnz = int(num_rows * num_cols * density) unused_nnz = total_nnz output_arr = np.zeros((num_rows, num_cols), dtype=dtype) # Start with ones on each row so that no row is empty for row in range(num_rows): output_arr[row][0] = 1 + rnd.uniform(0.001, 2) unused_nnz = unused_nnz - 1 if unused_nnz <= 0: return mx.nd.array(output_arr).tostype("csr") # Populate rest of matrix with 2^i items in ith row. # if we have used all total nnz return the sparse matrix # else if we reached max column size then fill up full columns until we use all nnz col_max = 2 for row in range(num_rows): col_limit = min(num_cols, col_max) # In case col_limit reached assign same value to all elements, which is much faster if col_limit == num_cols and unused_nnz > col_limit: output_arr[row] = 1 + rnd.uniform(0.001, 2) unused_nnz = unused_nnz - col_limit + 1 if unused_nnz <= 0: return mx.nd.array(output_arr).tostype("csr") else: continue for col_index in range(1, col_limit): output_arr[row][col_index] = 1 + rnd.uniform(0.001, 2) unused_nnz = unused_nnz - 1 if unused_nnz <= 0: return mx.nd.array(output_arr).tostype("csr") col_max = col_max * 2 if unused_nnz > 0: raise ValueError("not supported for this density: %s" " for this shape (%s,%s)" % (density, num_rows, num_cols)) else: return mx.nd.array(output_arr).tostype("csr")
[docs]def assign_each(the_input, function): """Return ndarray composed of passing each array value through some function""" if function is None: output = np.array(the_input) else: it_input = np.nditer(the_input, flags=['f_index']) output = np.zeros(the_input.shape) it_out = np.nditer(output, flags=['f_index'], op_flags=['writeonly']) while not it_input.finished: val_input = it_input[0] it_out[0] = function(val_input) it_input.iternext() it_out.iternext() return output
[docs]def assign_each2(input1, input2, function): """Return ndarray composed of passing two array values through some function""" if function is None: output = np.array(input1) else: assert input1.shape == input2.shape it_input1 = np.nditer(input1, flags=['f_index']) it_input2 = np.nditer(input2, flags=['f_index']) output = np.zeros(input1.shape) it_out = np.nditer(output, flags=['f_index'], op_flags=['writeonly']) while not it_input1.finished: val_input1 = it_input1[0] val_input2 = it_input2[0] it_out[0] = function(val_input1, val_input2) it_input1.iternext() it_input2.iternext() it_out.iternext() return output
[docs]def rand_sparse_ndarray(shape, stype, density=None, dtype=None, distribution=None, data_init=None, rsp_indices=None, modifier_func=None, shuffle_csr_indices=False): """Generate a random sparse ndarray. Returns the ndarray, value(np) and indices(np) Parameters ---------- shape: list or tuple stype: str valid values: "csr" or "row_sparse" density: float, optional should be between 0 and 1 distribution: str, optional valid values: "uniform" or "powerlaw" dtype: numpy.dtype, optional default value is None Returns ------- Result of type CSRNDArray or RowSparseNDArray Examples -------- Below is an example of the powerlaw distribution with csr as the stype. It calculates the nnz using the shape and density. It fills up the ndarray with exponentially increasing number of elements. If there are enough unused_nnzs, n+1th row will have twice more nnzs compared to nth row. else, remaining unused_nnzs will be used in n+1th row If number of cols is too small and we have already reached column size it will fill up all following columns in all followings rows until we reach the required density. >>> csr_arr, _ = rand_sparse_ndarray(shape=(5, 16), stype="csr", density=0.50, distribution="powerlaw") >>> indptr = csr_arr.indptr.asnumpy() >>> indices = csr_arr.indices.asnumpy() >>> data = csr_arr.data.asnumpy() >>> row2nnz = len(data[indptr[1]:indptr[2]]) >>> row3nnz = len(data[indptr[2]:indptr[3]]) >>> assert(row3nnz == 2*row2nnz) >>> row4nnz = len(data[indptr[3]:indptr[4]]) >>> assert(row4nnz == 2*row3nnz) """ density = rnd.rand() if density is None else density dtype = default_dtype() if dtype is None else dtype distribution = "uniform" if distribution is None else distribution if stype == 'row_sparse': assert (distribution == "uniform"), \ "Distribution %s not supported for row_sparse" % (distribution) # sample index if rsp_indices is not None: indices = rsp_indices assert(len(indices) <= shape[0]) else: idx_sample = rnd.rand(shape[0]) indices = np.argwhere(idx_sample < density).flatten() if indices.shape[0] == 0: result = mx.nd.zeros(shape, stype='row_sparse', dtype=dtype) return result, (np.array([], dtype=dtype), np.array([])) # generate random values val = rnd.rand(indices.shape[0], *shape[1:]).astype(dtype) # Allow caller to override or adjust random values if data_init is not None: val.fill(data_init) if modifier_func is not None: val = assign_each(val, modifier_func) arr = mx.nd.sparse.row_sparse_array((val, indices), shape=shape, dtype=dtype) return arr, (val, indices) elif stype == 'csr': assert len(shape) == 2 if distribution == "uniform": csr = _get_uniform_dataset_csr(shape[0], shape[1], density, data_init=data_init, shuffle_csr_indices=shuffle_csr_indices, dtype=dtype) return csr, (csr.indptr, csr.indices, csr.data) elif distribution == "powerlaw": csr = _get_powerlaw_dataset_csr(shape[0], shape[1], density=density, dtype=dtype) return csr, (csr.indptr, csr.indices, csr.data) else: assert(False), "Distribution not supported: %s" % (distribution) return False else: assert(False), "unknown storage type" return False
def rand_ndarray(shape, stype='default', density=None, dtype=None, modifier_func=None, shuffle_csr_indices=False, distribution=None): if stype == 'default': arr = mx.nd.array(random_arrays(shape), dtype=dtype) else: arr, _ = rand_sparse_ndarray(shape, stype, density=density, modifier_func=modifier_func, dtype=dtype, shuffle_csr_indices=shuffle_csr_indices, distribution=distribution) return arr
[docs]def create_sparse_array(shape, stype, data_init=None, rsp_indices=None, dtype=None, modifier_func=None, density=.5, shuffle_csr_indices=False): """Create a sparse array, For Rsp, assure indices are in a canonical format""" if stype == 'row_sparse': if rsp_indices is not None: arr_indices = np.asarray(rsp_indices) arr_indices.sort() else: arr_indices = None arr_data, (_, _) = rand_sparse_ndarray(shape, stype, density=density, data_init=data_init, rsp_indices=arr_indices, dtype=dtype, modifier_func=modifier_func) elif stype == 'csr': arr_data, (_, _, _) = rand_sparse_ndarray(shape, stype, density=density, data_init=data_init, dtype=dtype, modifier_func=modifier_func, shuffle_csr_indices=shuffle_csr_indices) else: msg = "Unknown storage type: " + stype raise AssertionError(msg) return arr_data
[docs]def create_sparse_array_zd(shape, stype, density, data_init=None, rsp_indices=None, dtype=None, modifier_func=None, shuffle_csr_indices=False): """Create sparse array, using only rsp_indices to determine density""" if stype == 'row_sparse': density = 0.0 if rsp_indices is not None: assert len(rsp_indices) <= shape[0] return create_sparse_array(shape, stype, data_init=data_init, rsp_indices=rsp_indices, dtype=dtype, modifier_func=modifier_func, density=density, shuffle_csr_indices=shuffle_csr_indices)
def rand_shape_2d(dim0=10, dim1=10): return rnd.randint(1, dim0 + 1), rnd.randint(1, dim1 + 1) def rand_shape_3d(dim0=10, dim1=10, dim2=10): return rnd.randint(1, dim0 + 1), rnd.randint(1, dim1 + 1), rnd.randint(1, dim2 + 1) def rand_shape_nd(num_dim, dim=10): return tuple(rnd.randint(1, dim+1, size=num_dim))
[docs]def np_reduce(dat, axis, keepdims, numpy_reduce_func): """Compatible reduce for old version of NumPy. Parameters ---------- dat : np.ndarray Same as NumPy. axis : None or int or list-like Same as NumPy. keepdims : bool Same as NumPy. numpy_reduce_func : function A NumPy reducing function like ``np.sum`` or ``np.max``. """ if isinstance(axis, int): axis = [axis] else: axis = list(axis) if axis is not None else range(len(dat.shape)) ret = dat for i in reversed(sorted(axis)): ret = numpy_reduce_func(ret, axis=i) if keepdims: keepdims_shape = list(dat.shape) for i in axis: keepdims_shape[i] = 1 ret = ret.reshape(tuple(keepdims_shape)) return ret
[docs]def find_max_violation(a, b, rtol=None, atol=None): """Finds and returns the location of maximum violation.""" rtol = get_rtol(rtol) atol = get_atol(atol) diff = np.abs(a-b) tol = atol + rtol*np.abs(b) violation = diff/(tol+1e-20) loc = np.argmax(violation) idx = np.unravel_index(loc, violation.shape) return idx, np.max(violation)
[docs]def same(a, b): """Test if two NumPy arrays are the same. Parameters ---------- a : np.ndarray b : np.ndarray """ return np.array_equal(a, b)
[docs]def almost_equal(a, b, rtol=None, atol=None, equal_nan=False): """Test if two numpy arrays are almost equal.""" # pylint: disable=unexpected-keyword-arg return np.allclose(a, b, rtol=get_rtol(rtol), atol=get_atol(atol), equal_nan=equal_nan)
# pylint: enable=unexpected-keyword-arg
[docs]def assert_almost_equal(a, b, rtol=None, atol=None, names=('a', 'b'), equal_nan=False): """Test that two numpy arrays are almost equal. Raise exception message if not. Parameters ---------- a : np.ndarray b : np.ndarray threshold : None or float The checking threshold. Default threshold will be used if set to ``None``. """ rtol = get_rtol(rtol) atol = get_atol(atol) if almost_equal(a, b, rtol, atol, equal_nan=equal_nan): return index, rel = find_max_violation(a, b, rtol, atol) np.set_printoptions(threshold=4, suppress=True) msg = npt.build_err_msg([a, b], err_msg="Error %f exceeds tolerance rtol=%f, atol=%f. " " Location of maximum error:%s, a=%f, b=%f" % (rel, rtol, atol, str(index), a[index], b[index]), names=names) raise AssertionError(msg)
[docs]def almost_equal_ignore_nan(a, b, rtol=None, atol=None): """Test that two NumPy arrays are almost equal (ignoring NaN in either array). Combines a relative and absolute measure of approximate eqality. If either the relative or absolute check passes, the arrays are considered equal. Including an absolute check resolves issues with the relative check where all array values are close to zero. Parameters ---------- a : np.ndarray b : np.ndarray rtol : None or float The relative threshold. Default threshold will be used if set to ``None``. atol : None or float The absolute threshold. Default threshold will be used if set to ``None``. """ a = np.copy(a) b = np.copy(b) nan_mask = np.logical_or(np.isnan(a), np.isnan(b)) a[nan_mask] = 0 b[nan_mask] = 0 return almost_equal(a, b, rtol, atol)
[docs]def assert_almost_equal_ignore_nan(a, b, rtol=None, atol=None, names=('a', 'b')): """Test that two NumPy arrays are almost equal (ignoring NaN in either array). Combines a relative and absolute measure of approximate eqality. If either the relative or absolute check passes, the arrays are considered equal. Including an absolute check resolves issues with the relative check where all array values are close to zero. Parameters ---------- a : np.ndarray b : np.ndarray rtol : None or float The relative threshold. Default threshold will be used if set to ``None``. atol : None or float The absolute threshold. Default threshold will be used if set to ``None``. """ a = np.copy(a) b = np.copy(b) nan_mask = np.logical_or(np.isnan(a), np.isnan(b)) a[nan_mask] = 0 b[nan_mask] = 0 assert_almost_equal(a, b, rtol, atol, names)
[docs]def assert_exception(f, exception_type, *args, **kwargs): """Test that function f will throw an exception of type given by `exception_type`""" try: f(*args, **kwargs) assert(False) except exception_type: return
[docs]def retry(n): """Retry n times before failing for stochastic test cases.""" assert n > 0 def decorate(f): """Decorate a test case.""" def wrapper(*args, **kwargs): """Wrapper for tests function.""" for _ in range(n): try: f(*args, **kwargs) except AssertionError as e: err = e continue return raise err return wrapper return decorate
[docs]def simple_forward(sym, ctx=None, is_train=False, **inputs): """A simple forward function for a symbol. Primarily used in doctest to test the functionality of a symbol. Takes NumPy arrays as inputs and outputs are also converted to NumPy arrays. Parameters ---------- ctx : Context If ``None``, will take the default context. inputs : keyword arguments Mapping each input name to a NumPy array. Returns ------- The result as a numpy array. Multiple results will be returned as a list of NumPy arrays. """ ctx = ctx or default_context() inputs = {k: array(v) for k, v in inputs.items()} exe = sym.bind(ctx, args=inputs) exe.forward(is_train=is_train) outputs = [x.asnumpy() for x in exe.outputs] if len(outputs) == 1: outputs = outputs[0] return outputs
def _parse_location(sym, location, ctx, dtype=default_dtype()): """Parses the given location to a dictionary. Arguments of the provided op `sym` are used as dictionary keys and elements of `location` are used as values. Parameters ---------- sym : Symbol Symbol containing op location : list or tuple or dict Argument values location - if type is list or tuple of `np.ndarray` inner elements are arrays correspoding to ``sym.list_arguments()``. - if type is dict of str -> `np.ndarray` maps the name of arguments to the corresponding `np.ndarray`. *In either case, value of all the arguments must be provided.* ctx : Context Device context. dtype: np.float16 or np.float32 or np.float64 Datatype for mx.nd.array. Returns ------- dict Dictionary with `sym` arguments as keys and `location` elements as values. Examples ------- >>> a = mx.symbol.Variable('a') >>> b = mx.symbol.Variable('b') >>> l1 = np.ndarray([2,3]) >>> l2 = np.ndarray([3,4]) >>> _parse_location(a * b, [l1, l2], None) {'a': , 'b': } >>> _parse_location(a * b, {'a': l1, 'b': l2}, None) {'a': , 'b': } >>> _parse_location(a * b, {'a': l1}, None) ValueError: Symbol arguments and keys of the given location do not match. """ assert isinstance(location, (dict, list, tuple)) assert dtype in (np.float16, np.float32, np.float64) if isinstance(location, dict): if set(location.keys()) != set(sym.list_arguments()): raise ValueError("Symbol arguments and keys of the given location do not match." "symbol args:%s, location.keys():%s" % (str(set(sym.list_arguments())), str(set(location.keys())))) else: location = {k: v for k, v in zip(sym.list_arguments(), location)} location = {k: mx.nd.array(v, ctx=ctx, dtype=dtype) if isinstance(v, np.ndarray) \ else v for k, v in location.items()} return location def _parse_aux_states(sym, aux_states, ctx, dtype=default_dtype()): """Parses the given auxiliary states to a dictionary. Auxiliary states of the provided op `sym` are used as dictionary keys and elements of `aux_states` are used as values. Parameters ---------- sym : Symbol Symbol containing op aux_states : None or list or dict Aux states - if type is list or tuple of `np.ndarray` inner elements are arrays correspoding to ``sym.list_auxiliary_states()``. - if type is dict of str -> `np.ndarray` maps the name of arguments to the corresponding `np.ndarray`. *In either case, all aux states of `sym` must be provided.* ctx : Context Device context. dtype: np.float16 or np.float32 or np.float64 Datatype for mx.nd.array. Returns ------- dict Dictionary with `sym` aux states as keys and `aux_states` elements as values. Examples ------- >>> data = mx.symbol.Variable('data') >>> weight = mx.sym.Variable(name='fc1_weight') >>> fc1 = mx.symbol.FullyConnected(data = data, weight=weight, name='fc1', num_hidden=128) >>> fc2 = mx.symbol.BatchNorm(fc1, name='batchnorm0') >>> mean_states = np.ones(3) >>> var_states = np.ones(3) >>> _parse_aux_states(fc2, [mean_states, var_states], None) {'batchnorm0_moving_var': , 'batchnorm0_moving_mean': } >>> _parse_aux_states(fc2, {'batchnorm0_moving_var': mean_states, ... 'batchnorm0_moving_mean': var_states}, None) {'batchnorm0_moving_var': , 'batchnorm0_moving_mean': } >>> _parse_aux_states(fc2, {'batchnorm0_moving_var': mean_states}, None) ValueError: Symbol aux_states names and given aux_states do not match. """ assert dtype in (np.float16, np.float32, np.float64) if aux_states is not None: if isinstance(aux_states, dict): if set(aux_states.keys()) != set(sym.list_auxiliary_states()): raise ValueError("Symbol aux_states names and given aux_states do not match." "symbol aux_names:%s, aux_states.keys:%s" % (str(set(sym.list_auxiliary_states())), str(set(aux_states.keys())))) elif isinstance(aux_states, (list, tuple)): aux_names = sym.list_auxiliary_states() aux_states = {k:v for k, v in zip(aux_names, aux_states)} aux_states = {k: mx.nd.array(v, ctx=ctx, dtype=dtype) for k, v in aux_states.items()} return aux_states
[docs]def numeric_grad(executor, location, aux_states=None, eps=1e-4, use_forward_train=True, dtype=default_dtype()): """Calculates a numeric gradient via finite difference method. Class based on Theano's `theano.gradient.numeric_grad` [1] Parameters ---------- executor : Executor Executor that computes the forward pass. location : list of numpy.ndarray or dict of str to numpy.ndarray Argument values used as location to compute gradient Maps the name of arguments to the corresponding numpy.ndarray. Value of all the arguments must be provided. aux_states : None or list of numpy.ndarray or dict of str to numpy.ndarray, optional Auxiliary states values used as location to compute gradient Maps the name of aux_states to the corresponding numpy.ndarray. Value of all the auxiliary arguments must be provided. eps : float, optional Epsilon for the finite-difference method. use_forward_train : bool, optional Whether to use `is_train=True` in testing. dtype: np.float16 or np.float32 or np.float64 Datatype for mx.nd.array. References --------- ..[1] https://github.com/Theano/Theano/blob/master/theano/gradient.py """ def as_stype(var, stype, dtype): return mx.nd.cast_storage(mx.nd.array(var, dtype=dtype), stype=stype) assert dtype in (np.float16, np.float32, np.float64) approx_grads = {k: np.zeros(v.shape, dtype=dtype) for k, v in location.items()} for k, v in location.items(): stype = executor.arg_dict[k].stype if stype == 'default': executor.arg_dict[k][:] = as_stype(v, stype, dtype=dtype) for k in location: location[k] = np.asarray(location[k], order='C') for k, v in location.items(): if v.dtype.kind != 'f': continue stype = executor.arg_dict[k].stype old_value = v.copy() for i in range(np.prod(v.shape)): # inplace update v.ravel()[i] += eps/2.0 executor.arg_dict[k][:] = as_stype(v, stype, dtype=dtype) if aux_states is not None: for key, val in aux_states.items(): executor.aux_dict[key][:] = val executor.forward(is_train=use_forward_train) f_peps = executor.outputs[0].asnumpy() v.ravel()[i] -= eps executor.arg_dict[k][:] = as_stype(v, stype, dtype=dtype) if aux_states is not None: for key, val in aux_states.items(): adstype = executor.aux_dict[key].stype executor.aux_dict[key][:] = as_stype(val, adstype, dtype=dtype) executor.forward(is_train=use_forward_train) f_neps = executor.outputs[0].asnumpy() approx_grad = (f_peps - f_neps).sum() / eps approx_grads[k].ravel()[i] = approx_grad v.ravel()[i] = old_value.ravel()[i] # copy back the original value executor.arg_dict[k][:] = as_stype(old_value, stype, dtype=dtype) return approx_grads
[docs]def check_numeric_gradient(sym, location, aux_states=None, numeric_eps=1e-3, rtol=1e-2, atol=None, grad_nodes=None, use_forward_train=True, ctx=None, grad_stype_dict=None, dtype=default_dtype()): """Verify an operation by checking backward pass via finite difference method. Based on Theano's `theano.gradient.verify_grad` [1] Parameters ---------- sym : Symbol Symbol containing op to test location : list or tuple or dict Argument values used as location to compute gradient - if type is list of numpy.ndarray, \ inner elements should have the same order as mxnet.sym.list_arguments(). - if type is dict of str -> numpy.ndarray, \ maps the name of arguments to the corresponding numpy.ndarray. *In either case, value of all the arguments must be provided.* aux_states : list or tuple or dict, optional The auxiliary states required when generating the executor for the symbol. numeric_eps : float, optional Delta for the finite difference method that approximates the gradient. check_eps : float, optional relative error eps used when comparing numeric grad to symbolic grad. grad_nodes : None or list or tuple or dict, optional Names of the nodes to check gradient on use_forward_train : bool Whether to use is_train=True when computing the finite-difference. ctx : Context, optional Check the gradient computation on the specified device. grad_stype_dict : dict of str->str, optional Storage type dictionary for gradient ndarrays. dtype: np.float16 or np.float32 or np.float64 Datatype for mx.nd.array. References --------- [1] https://github.com/Theano/Theano/blob/master/theano/gradient.py """ assert dtype in (np.float16, np.float32, np.float64) # cannot use finite differences with small eps without high precision if dtype in (np.float32, np.float16): assert numeric_eps >= 1e-5 if ctx is None: ctx = default_context() def random_projection(shape): """Get a random weight matrix with not too small elements Parameters ---------- shape : list or tuple """ # random_projection should not have elements too small, # otherwise too much precision is lost in numerical gradient plain = np.random.rand(*shape) + 0.1 return plain location = _parse_location(sym=sym, location=location, ctx=ctx, dtype=dtype) location_npy = {k:v.asnumpy() for k, v in location.items()} aux_states = _parse_aux_states(sym=sym, aux_states=aux_states, ctx=ctx, dtype=dtype) if aux_states is not None: aux_states_npy = {k: v.asnumpy() for k, v in aux_states.items()} else: aux_states_npy = None if grad_nodes is None: grad_nodes = sym.list_arguments() grad_req = {k: 'write' for k in grad_nodes} elif isinstance(grad_nodes, (list, tuple)): grad_nodes = list(grad_nodes) grad_req = {k: 'write' for k in grad_nodes} elif isinstance(grad_nodes, dict): grad_req = grad_nodes.copy() grad_nodes = grad_nodes.keys() else: raise ValueError input_shape = {k: v.shape for k, v in location.items()} _, out_shape, _ = sym.infer_shape(**input_shape) proj = mx.sym.Variable("__random_proj") out = sym * proj out = mx.sym.make_loss(out) location = dict(list(location.items()) + [("__random_proj", mx.nd.array(random_projection(out_shape[0]), ctx=ctx, dtype=dtype))]) args_grad_npy = dict([(k, np.random.normal(0, 0.01, size=location[k].shape)) for k in grad_nodes] + [("__random_proj", np.random.normal(0, 0.01, size=out_shape[0]))]) args_grad = {k: mx.nd.array(v, ctx=ctx, dtype=dtype) for k, v in args_grad_npy.items()} if grad_stype_dict is not None: assert isinstance(grad_stype_dict, dict), "grad_stype_dict must be a dict" for k, v in grad_stype_dict.items(): if k in args_grad and v in _STORAGE_TYPE_STR_TO_ID and v != 'default': # create an uninitialized sparse ndarray for executor # if the symbolic grad is expected to be zero, it should not be initialized at all args_grad[k] = mx.nd.zeros(args_grad[k].shape, args_grad[k].context, args_grad[k].dtype, v) executor = out.bind(ctx, grad_req=grad_req, args=location, args_grad=args_grad, aux_states=aux_states) inps = executor.arg_arrays if len(inps) != len(location): raise ValueError("Executor arg_arrays and and location len do not match." "Got %d inputs and %d locations"%(len(inps), len(location))) assert len(executor.outputs) == 1 executor.forward(is_train=True) executor.backward() symbolic_grads = {k:executor.grad_dict[k].asnumpy() for k in grad_nodes} numeric_gradients = numeric_grad( executor, location_npy, aux_states_npy, eps=numeric_eps, use_forward_train=use_forward_train, dtype=dtype) for name in grad_nodes: fd_grad = numeric_gradients[name] orig_grad = args_grad_npy[name] sym_grad = symbolic_grads[name] if grad_req[name] == 'write': assert_almost_equal(fd_grad, sym_grad, rtol, atol, ("NUMERICAL_%s"%name, "BACKWARD_%s"%name)) elif grad_req[name] == 'add': assert_almost_equal(fd_grad, sym_grad - orig_grad, rtol, atol, ("NUMERICAL_%s"%name, "BACKWARD_%s"%name)) elif grad_req[name] == 'null': assert_almost_equal(orig_grad, sym_grad, rtol, atol, ("NUMERICAL_%s"%name, "BACKWARD_%s"%name)) else: raise ValueError("Invalid grad_req %s for argument %s"%(grad_req[name], name))
[docs]def check_symbolic_forward(sym, location, expected, rtol=1E-4, atol=None, aux_states=None, ctx=None, equal_nan=False, dtype=default_dtype()): """Compares a symbol's forward results with the expected ones. Prints error messages if the forward results are not the same as the expected ones. Parameters --------- sym : Symbol output symbol location : list of np.ndarray or dict of str to np.ndarray The evaluation point - if type is list of np.ndarray Contains all the numpy arrays corresponding to `sym.list_arguments()`. - if type is dict of str to np.ndarray Contains the mapping between argument names and their values. expected : list of np.ndarray or dict of str to np.ndarray The expected output value - if type is list of np.ndarray Contains arrays corresponding to exe.outputs. - if type is dict of str to np.ndarray Contains mapping between sym.list_output() and exe.outputs. check_eps : float, optional Relative error to check to. aux_states : list of np.ndarray of dict, optional - if type is list of np.ndarray Contains all the NumPy arrays corresponding to sym.list_auxiliary_states - if type is dict of str to np.ndarray Contains the mapping between names of auxiliary states and their values. ctx : Context, optional running context dtype: np.float16 or np.float32 or np.float64 Datatype for mx.nd.array. equal_nan: Boolean if True, `nan` is a valid value for checking equivalency (ie `nan` == `nan`) Example ------- >>> shape = (2, 2) >>> lhs = mx.symbol.Variable('lhs') >>> rhs = mx.symbol.Variable('rhs') >>> sym_dot = mx.symbol.dot(lhs, rhs) >>> mat1 = np.array([[1, 2], [3, 4]]) >>> mat2 = np.array([[5, 6], [7, 8]]) >>> ret_expected = np.array([[19, 22], [43, 50]]) >>> check_symbolic_forward(sym_dot, [mat1, mat2], [ret_expected]) """ assert dtype in (np.float16, np.float32, np.float64) if ctx is None: ctx = default_context() location = _parse_location(sym=sym, location=location, ctx=ctx, dtype=dtype) aux_states = _parse_aux_states(sym=sym, aux_states=aux_states, ctx=ctx, dtype=dtype) if isinstance(expected, dict): expected = [expected[k] for k in sym.list_outputs()] args_grad_data = {k:mx.nd.empty(v.shape, ctx=ctx, dtype=dtype) for k, v in location.items()} executor = sym.bind(ctx=ctx, args=location, args_grad=args_grad_data, aux_states=aux_states) for g in executor.grad_arrays: g[:] = 0 executor.forward(is_train=False) outputs = [x.asnumpy() for x in executor.outputs] for output_name, expect, output in zip(sym.list_outputs(), expected, outputs): assert_almost_equal(expect, output, rtol, atol, ("EXPECTED_%s"%output_name, "FORWARD_%s"%output_name), equal_nan=equal_nan) return executor.outputs
[docs]def check_symbolic_backward(sym, location, out_grads, expected, rtol=1e-5, atol=None, aux_states=None, grad_req='write', ctx=None, grad_stypes=None, equal_nan=False, dtype=default_dtype()): """Compares a symbol's backward results with the expected ones. Prints error messages if the backward results are not the same as the expected results. Parameters --------- sym : Symbol output symbol location : list of np.ndarray or dict of str to np.ndarray The evaluation point - if type is list of np.ndarray Contains all the NumPy arrays corresponding to ``mx.sym.list_arguments``. - if type is dict of str to np.ndarray Contains the mapping between argument names and their values. out_grads : None or list of np.ndarray or dict of str to np.ndarray NumPys arrays corresponding to sym.outputs for incomming gradient. - if type is list of np.ndarray Contains arrays corresponding to ``exe.outputs``. - if type is dict of str to np.ndarray contains mapping between mxnet.sym.list_output() and Executor.outputs expected : list of np.ndarray or dict of str to np.ndarray expected gradient values - if type is list of np.ndarray Contains arrays corresponding to exe.grad_arrays - if type is dict of str to np.ndarray Contains mapping between ``sym.list_arguments()`` and exe.outputs. check_eps: float, optional Relative error to check to. aux_states : list of np.ndarray or dict of str to np.ndarray grad_req : str or list of str or dict of str to str, optional Gradient requirements. 'write', 'add' or 'null'. ctx : Context, optional Running context. grad_stypes: dict of str->str dictionary of mapping argument name to stype for the gradient equal_nan: Boolean if True, `nan` is a valid value for checking equivalency (ie `nan` == `nan`) dtype: np.float16 or np.float32 or np.float64 Datatype for mx.nd.array. Example ------- >>> lhs = mx.symbol.Variable('lhs') >>> rhs = mx.symbol.Variable('rhs') >>> sym_add = mx.symbol.elemwise_add(lhs, rhs) >>> mat1 = np.array([[1, 2], [3, 4]]) >>> mat2 = np.array([[5, 6], [7, 8]]) >>> grad1 = mx.nd.zeros(shape) >>> grad2 = mx.nd.zeros(shape) >>> exec_add = sym_add.bind(default_context(), args={'lhs': mat1, 'rhs': mat2}, ... args_grad={'lhs': grad1, 'rhs': grad2}, grad_req={'lhs': 'write', 'rhs': 'write'}) >>> exec_add.forward(is_train=True) >>> ograd = mx.nd.ones(shape) >>> grad_expected = ograd.copy().asnumpy() >>> check_symbolic_backward(sym_add, [mat1, mat2], [ograd], [grad_expected, grad_expected]) """ assert dtype in (np.float16, np.float32, np.float64) if ctx is None: ctx = default_context() location = _parse_location(sym=sym, location=location, ctx=ctx, dtype=dtype) aux_states = _parse_aux_states(sym=sym, aux_states=aux_states, ctx=ctx, dtype=dtype) if isinstance(expected, (list, tuple)): expected = {k:v for k, v in zip(sym.list_arguments(), expected)} args_grad_npy = {k:np.random.normal(size=v.shape) for k, v in expected.items()} args_grad_data = {} for k, v in args_grad_npy.items(): nd = mx.nd.array(v, ctx=ctx, dtype=dtype) if grad_stypes is not None and k in grad_stypes: stype = grad_stypes[k] if stype is not None and stype != 'default': out = create_sparse_array(v.shape, stype, density=0.0) else: out = nd args_grad_data[k] = out else: args_grad_data[k] = nd if isinstance(grad_req, str): grad_req = {k:grad_req for k in sym.list_arguments()} elif isinstance(grad_req, (list, tuple)): grad_req = {k:v for k, v in zip(sym.list_arguments(), grad_req)} executor = sym.bind(ctx=ctx, args=location, args_grad=args_grad_data, aux_states=aux_states, grad_req=grad_req) executor.forward(is_train=True) if isinstance(out_grads, (tuple, list)): outg = list() for arr in out_grads: if isinstance(arr, np.ndarray): outg.append(mx.nd.array(arr, ctx=ctx, dtype=dtype)) else: outg.append(arr) out_grads = outg elif isinstance(out_grads, dict): outg = dict() for k, v in out_grads.items(): if isinstance(v, np.ndarray): outg[k] = mx.nd.array(v, ctx=ctx, dtype=dtype) else: outg[k] = v out_grads = outg else: assert out_grads is None executor.backward(out_grads) grads = {k: v.asnumpy() for k, v in args_grad_data.items()} for name in expected: if grad_req[name] == 'write': assert_almost_equal(expected[name], grads[name], rtol, atol, ("EXPECTED_%s"%name, "BACKWARD_%s"%name), equal_nan=equal_nan) elif grad_req[name] == 'add': assert_almost_equal(expected[name], grads[name] - args_grad_npy[name], rtol, atol, ("EXPECTED_%s"%name, "BACKWARD_%s"%name), equal_nan=equal_nan) elif grad_req[name] == 'null': assert_almost_equal(args_grad_npy[name], grads[name], rtol, atol, ("EXPECTED_%s"%name, "BACKWARD_%s"%name), equal_nan=equal_nan) else: raise ValueError("Invalid grad_req %s for argument %s"%(grad_req[name], name)) return args_grad_data
[docs]def check_speed(sym, location=None, ctx=None, N=20, grad_req=None, typ="whole", **kwargs): """Check the running speed of a symbol. Parameters ---------- sym : Symbol Symbol to run the speed test. location : none or dict of str to np.ndarray Location to evaluate the inner executor. ctx : Context Running context. N : int, optional Repeat times. grad_req : None or str or list of str or dict of str to str, optional Gradient requirements. typ : str, optional "whole" or "forward" - "whole" Test the forward_backward speed. - "forward" Only test the forward speed. """ if ctx is None: ctx = default_context() if grad_req is None: grad_req = 'write' if location is None: exe = sym.simple_bind(grad_req=grad_req, ctx=ctx, **kwargs) location = {k: np.random.normal(size=arr.shape, scale=1.0) for k, arr in exe.arg_dict.items()} else: assert isinstance(location, dict), "Expect dict, get \"location\"=%s" %str(location) exe = sym.simple_bind(grad_req=grad_req, ctx=ctx, **{k: v.shape for k, v in location.items()}) for name, iarr in location.items(): exe.arg_dict[name][:] = iarr.astype(exe.arg_dict[name].dtype) if typ == "whole": # Warm up exe.forward(is_train=True) exe.backward(out_grads=exe.outputs) for output in exe.outputs: output.wait_to_read() # Test forward + backward tic = time.time() for _ in range(N): exe.forward(is_train=True) exe.backward(out_grads=exe.outputs) mx.nd.waitall() toc = time.time() forward_backward_time = (toc - tic) * 1.0 / N return forward_backward_time elif typ == "forward": # Warm up exe.forward(is_train=False) for output in exe.outputs: output.wait_to_read() # Test forward only tic = time.time() for _ in range(N): exe.forward(is_train=False) mx.nd.waitall() toc = time.time() forward_time = (toc - tic) * 1.0 / N return forward_time else: raise ValueError('typ can only be "whole" or "forward".')
[docs]def check_consistency(sym, ctx_list, scale=1.0, grad_req='write', arg_params=None, aux_params=None, tol=None, raise_on_err=True, ground_truth=None, equal_nan=False, use_uniform=False, rand_type=np.float64): """Check symbol gives the same output for different running context Parameters ---------- sym : Symbol or list of Symbols Symbol(s) to run the consistency test. ctx_list : list Running context. See example for more detail. scale : float, optional Standard deviation of the inner normal distribution. Used in initialization. grad_req : str or list of str or dict of str to str Gradient requirement. use_unifrom: bool Optional, When flag set to true, random input data generated follows uniform distribution, not normal distribution rand_type: np.dtype casts the randomly generated data to this type Optional, when input data is passed via arg_params, defaults to np.float64 (numpy float default) Examples -------- >>> # create the symbol >>> sym = mx.sym.Convolution(num_filter=3, kernel=(3,3), name='conv') >>> # initialize the running context >>> ctx_list =\ [{'ctx': mx.gpu(0), 'conv_data': (2, 2, 10, 10), 'type_dict': {'conv_data': np.float64}},\ {'ctx': mx.gpu(0), 'conv_data': (2, 2, 10, 10), 'type_dict': {'conv_data': np.float32}},\ {'ctx': mx.gpu(0), 'conv_data': (2, 2, 10, 10), 'type_dict': {'conv_data': np.float16}},\ {'ctx': mx.cpu(0), 'conv_data': (2, 2, 10, 10), 'type_dict': {'conv_data': np.float64}},\ {'ctx': mx.cpu(0), 'conv_data': (2, 2, 10, 10), 'type_dict': {'conv_data': np.float32}}] >>> check_consistency(sym, ctx_list) >>> sym = mx.sym.Concat(name='concat', num_args=2) >>> ctx_list = \ [{'ctx': mx.gpu(0), 'concat_arg1': (2, 10), 'concat_arg0': (2, 10),\ 'type_dict': {'concat_arg0': np.float64, 'concat_arg1': np.float64}},\ {'ctx': mx.gpu(0), 'concat_arg1': (2, 10), 'concat_arg0': (2, 10),\ 'type_dict': {'concat_arg0': np.float32, 'concat_arg1': np.float32}},\ {'ctx': mx.gpu(0), 'concat_arg1': (2, 10), 'concat_arg0': (2, 10),\ 'type_dict': {'concat_arg0': np.float16, 'concat_arg1': np.float16}},\ {'ctx': mx.cpu(0), 'concat_arg1': (2, 10), 'concat_arg0': (2, 10),\ 'type_dict': {'concat_arg0': np.float64, 'concat_arg1': np.float64}},\ {'ctx': mx.cpu(0), 'concat_arg1': (2, 10), 'concat_arg0': (2, 10),\ 'type_dict': {'concat_arg0': np.float32, 'concat_arg1': np.float32}}] >>> check_consistency(sym, ctx_list) """ if tol is None: tol = {np.dtype(np.float16): 1e-1, np.dtype(np.float32): 1e-3, np.dtype(np.float64): 1e-5, np.dtype(np.uint8): 0, np.dtype(np.int32): 0, np.dtype(np.int64): 0} elif isinstance(tol, numbers.Number): tol = {np.dtype(np.float16): tol, np.dtype(np.float32): tol, np.dtype(np.float64): tol, np.dtype(np.uint8): tol, np.dtype(np.int32): tol, np.dtype(np.int64): tol} assert len(ctx_list) > 1 if isinstance(sym, Symbol): sym = [sym]*len(ctx_list) else: assert len(sym) == len(ctx_list) output_names = sym[0].list_outputs() arg_names = sym[0].list_arguments() exe_list = [] for s, ctx in zip(sym, ctx_list): assert s.list_arguments() == arg_names assert s.list_outputs() == output_names exe_list.append(s.simple_bind(grad_req=grad_req, **ctx)) arg_params = {} if arg_params is None else arg_params aux_params = {} if aux_params is None else aux_params for n, arr in exe_list[0].arg_dict.items(): if n not in arg_params: if use_uniform: arg_params[n] = np.random.uniform(low=-0.92, high=0.92, size=arr.shape).astype(rand_type) else: arg_params[n] = np.random.normal(size=arr.shape, scale=scale).astype(rand_type) for n, arr in exe_list[0].aux_dict.items(): if n not in aux_params: aux_params[n] = 0 for exe in exe_list: for name, arr in exe.arg_dict.items(): arr[:] = arg_params[name] for name, arr in exe.aux_dict.items(): arr[:] = aux_params[name] # We need to initialize the gradient arrays if it's add. if (grad_req == "add"): for arr in exe.grad_arrays: arr[:] = np.zeros(arr.shape, dtype=arr.dtype) dtypes = [np.dtype(exe.outputs[0].dtype) for exe in exe_list] max_idx = np.argmax(dtypes) gt = ground_truth if gt is None: gt = exe_list[max_idx].output_dict.copy() if grad_req != 'null': gt.update(exe_list[max_idx].grad_dict) # test for exe in exe_list: exe.forward(is_train=False) for i, exe in enumerate(exe_list): if i == max_idx: continue for name, arr in zip(output_names, exe.outputs): gtarr = gt[name].astype(dtypes[i]).asnumpy() arr = arr.asnumpy() try: assert_almost_equal(arr, gtarr, rtol=tol[dtypes[i]], atol=tol[dtypes[i]], equal_nan=equal_nan) except AssertionError as e: print('Predict Err: ctx %d vs ctx %d at %s'%(i, max_idx, name)) traceback.print_exc() if raise_on_err: raise e else: print(str(e)) # train if grad_req != 'null': for exe in exe_list: exe.forward(is_train=True) exe.backward(exe.outputs) for i, exe in enumerate(exe_list): if i == max_idx: continue curr = zip(output_names + arg_names, exe.outputs + exe.grad_arrays) for name, arr in curr: if gt[name] is None: assert arr is None continue gtarr = gt[name].astype(dtypes[i]).asnumpy() arr = arr.asnumpy() try: assert_almost_equal(arr, gtarr, rtol=tol[dtypes[i]], atol=tol[dtypes[i]], equal_nan=equal_nan) except AssertionError as e: print('Train Err: ctx %d vs ctx %d at %s'%(i, max_idx, name)) traceback.print_exc() if raise_on_err: raise e else: print(str(e)) return gt
[docs]def list_gpus(): """Return a list of GPUs Returns ------- list of int: If there are n GPUs, then return a list [0,1,...,n-1]. Otherwise returns []. """ re = '' nvidia_smi = ['nvidia-smi', '/usr/bin/nvidia-smi', '/usr/local/nvidia/bin/nvidia-smi'] for cmd in nvidia_smi: try: re = subprocess.check_output([cmd, "-L"], universal_newlines=True) except (subprocess.CalledProcessError, OSError): pass return range(len([i for i in re.split('\n') if 'GPU' in i]))
[docs]def download(url, fname=None, dirname=None, overwrite=False, retries=5): """Download an given URL Parameters ---------- url : str URL to download fname : str, optional filename of the downloaded file. If None, then will guess a filename from url. dirname : str, optional output directory name. If None, then guess from fname or use the current directory overwrite : bool, optional Default is false, which means skipping download if the local file exists. If true, then download the url to overwrite the local file if exists. retries : integer, default 5 The number of times to attempt the download in case of failure or non 200 return codes Returns ------- str The filename of the downloaded file """ assert retries >= 0, "Number of retries should be at least 0" if fname is None: fname = url.split('/')[-1] if dirname is None: dirname = os.path.dirname(fname) else: fname = os.path.join(dirname, fname) if dirname != "": if not os.path.exists(dirname): try: logging.info('create directory %s', dirname) os.makedirs(dirname) except OSError as exc: if exc.errno != errno.EEXIST: raise OSError('failed to create ' + dirname) if not overwrite and os.path.exists(fname): logging.info("%s exists, skipping download", fname) return fname while retries+1 > 0: # Disable pyling too broad Exception # pylint: disable=W0703 try: r = requests.get(url, stream=True) assert r.status_code == 200, "failed to open %s" % url with open(fname, 'wb') as f: for chunk in r.iter_content(chunk_size=1024): if chunk: # filter out keep-alive new chunks f.write(chunk) break except Exception as e: retries -= 1 if retries <= 0: raise e else: print("download failed, retrying, {} attempt{} left" .format(retries, 's' if retries > 1 else '')) logging.info("downloaded %s into %s successfully", url, fname) return fname
[docs]def get_mnist(): """Download and load the MNIST dataset Returns ------- dict A dict containing the data """ def read_data(label_url, image_url): with gzip.open(mx.test_utils.download(label_url)) as flbl: struct.unpack(">II", flbl.read(8)) label = np.frombuffer(flbl.read(), dtype=np.int8) with gzip.open(mx.test_utils.download(image_url), 'rb') as fimg: _, _, rows, cols = struct.unpack(">IIII", fimg.read(16)) image = np.frombuffer(fimg.read(), dtype=np.uint8).reshape(len(label), rows, cols) image = image.reshape(image.shape[0], 1, 28, 28).astype(np.float32)/255 return (label, image) # changed to mxnet.io for more stable hosting # path = 'http://yann.lecun.com/exdb/mnist/' path = 'http://data.mxnet.io/data/mnist/' (train_lbl, train_img) = read_data( path+'train-labels-idx1-ubyte.gz', path+'train-images-idx3-ubyte.gz') (test_lbl, test_img) = read_data( path+'t10k-labels-idx1-ubyte.gz', path+'t10k-images-idx3-ubyte.gz') return {'train_data':train_img, 'train_label':train_lbl, 'test_data':test_img, 'test_label':test_lbl}
[docs]def get_mnist_pkl(): """Downloads MNIST dataset as a pkl.gz into a directory in the current directory with the name `data` """ if not os.path.isdir("data"): os.makedirs('data') if not os.path.exists('data/mnist.pkl.gz'): download('http://deeplearning.net/data/mnist/mnist.pkl.gz', dirname='data')
[docs]def get_mnist_ubyte(): """Downloads ubyte version of the MNIST dataset into a directory in the current directory with the name `data` and extracts all files in the zip archive to this directory. """ if not os.path.isdir("data"): os.makedirs('data') if (not os.path.exists('data/train-images-idx3-ubyte')) or \ (not os.path.exists('data/train-labels-idx1-ubyte')) or \ (not os.path.exists('data/t10k-images-idx3-ubyte')) or \ (not os.path.exists('data/t10k-labels-idx1-ubyte')): zip_file_path = download('http://data.mxnet.io/mxnet/data/mnist.zip', dirname='data') with zipfile.ZipFile(zip_file_path) as zf: zf.extractall('data')
[docs]def get_cifar10(): """Downloads CIFAR10 dataset into a directory in the current directory with the name `data`, and then extracts all files into the directory `data/cifar`. """ if not os.path.isdir("data"): os.makedirs('data') if (not os.path.exists('data/cifar/train.rec')) or \ (not os.path.exists('data/cifar/test.rec')) or \ (not os.path.exists('data/cifar/train.lst')) or \ (not os.path.exists('data/cifar/test.lst')): zip_file_path = download('http://data.mxnet.io/mxnet/data/cifar10.zip', dirname='data') with zipfile.ZipFile(zip_file_path) as zf: zf.extractall('data')
[docs]def get_mnist_iterator(batch_size, input_shape, num_parts=1, part_index=0): """Returns training and validation iterators for MNIST dataset """ get_mnist_ubyte() flat = False if len(input_shape) == 3 else True train_dataiter = mx.io.MNISTIter( image="data/train-images-idx3-ubyte", label="data/train-labels-idx1-ubyte", input_shape=input_shape, batch_size=batch_size, shuffle=True, flat=flat, num_parts=num_parts, part_index=part_index) val_dataiter = mx.io.MNISTIter( image="data/t10k-images-idx3-ubyte", label="data/t10k-labels-idx1-ubyte", input_shape=input_shape, batch_size=batch_size, flat=flat, num_parts=num_parts, part_index=part_index) return (train_dataiter, val_dataiter)
[docs]def get_zip_data(data_dir, url, data_origin_name): """Download and extract zip data. Parameters ---------- data_dir : str Absolute or relative path of the directory name to store zip files url : str URL to download data from data_origin_name : str Name of the downloaded zip file Examples -------- >>> get_zip_data("data_dir", "http://files.grouplens.org/datasets/movielens/ml-10m.zip", "ml-10m.zip") """ data_origin_name = os.path.join(data_dir, data_origin_name) if not os.path.exists(data_origin_name): download(url, dirname=data_dir, overwrite=False) zip_file = zipfile.ZipFile(data_origin_name) zip_file.extractall(path=data_dir)
[docs]def get_bz2_data(data_dir, data_name, url, data_origin_name): """Download and extract bz2 data. Parameters ---------- data_dir : str Absolute or relative path of the directory name to store bz2 files data_name : str Name of the output file in which bz2 contents will be extracted url : str URL to download data from data_origin_name : str Name of the downloaded b2 file Examples -------- >>> get_bz2_data("data_dir", "kdda.t", "https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/kdda.t.bz2", "kdda.t.bz2") """ data_name = os.path.join(data_dir, data_name) data_origin_name = os.path.join(data_dir, data_origin_name) if not os.path.exists(data_name): download(url, fname=data_origin_name, dirname=data_dir, overwrite=False) bz_file = bz2.BZ2File(data_origin_name, 'rb') with open(data_name, 'wb') as fout: for line in bz_file: fout.write(line) bz_file.close() os.remove(data_origin_name)
[docs]def set_env_var(key, val, default_val=""): """Set environment variable Parameters ---------- key : str Env var to set val : str New value assigned to the env var default_val : str, optional Default value returned if the env var doesn't exist Returns ------- str The value of env var before it is set to the new value """ prev_val = os.environ.get(key, default_val) os.environ[key] = val return prev_val
[docs]def same_array(array1, array2): """Check whether two NDArrays sharing the same memory block Parameters ---------- array1 : NDArray First NDArray to be checked array2 : NDArray Second NDArray to be checked Returns ------- bool Whether two NDArrays share the same memory """ array1[:] += 1 if not same(array1.asnumpy(), array2.asnumpy()): array1[:] -= 1 return False array1[:] -= 1 return same(array1.asnumpy(), array2.asnumpy())
@contextmanager
[docs]def discard_stderr(): """ Discards error output of a routine if invoked as: with discard_stderr(): ... """ with open(os.devnull, 'w') as bit_bucket: try: stderr_fileno = sys.stderr.fileno() old_stderr = os.dup(stderr_fileno) try: os.dup2(bit_bucket.fileno(), stderr_fileno) yield finally: os.dup2(old_stderr, stderr_fileno) except AttributeError: # On some systems is stderr not a file descriptor but actually a virtual pipeline # that can not be copied yield
[docs]class DummyIter(mx.io.DataIter): """A dummy iterator that always returns the same batch of data (the first data batch of the real data iter). This is usually used for speed testing. Parameters ---------- real_iter: mx.io.DataIter The real data iterator where the first batch of data comes from """ def __init__(self, real_iter): super(DummyIter, self).__init__() self.real_iter = real_iter self.provide_data = real_iter.provide_data self.provide_label = real_iter.provide_label self.batch_size = real_iter.batch_size self.the_batch = next(real_iter) def __iter__(self): return self
[docs] def next(self): """Get a data batch from iterator. The first data batch of real iter is always returned. StopIteration will never be raised. Returns ------- DataBatch The data of next batch. """ return self.the_batch
[docs]def gen_buckets_probs_with_ppf(ppf, nbuckets): """Generate the buckets and probabilities for chi_square test when the ppf (Quantile function) is specified. Parameters ---------- ppf : function The Quantile function that takes a probability and maps it back to a value. It's the inverse of the cdf function nbuckets : int size of the buckets Returns ------- buckets : list of tuple The generated buckets probs : list The generate probabilities """ assert nbuckets > 0 probs = [1.0 / nbuckets for _ in range(nbuckets)] buckets = [(ppf(i / float(nbuckets)), ppf((i + 1) / float(nbuckets))) for i in range(nbuckets)] return buckets, probs
[docs]def mean_check(generator, mu, sigma, nsamples=1000000): """Test the generator by matching the mean. We test the sample mean by checking if it falls inside the range (mu - 3 * sigma / sqrt(n), mu + 3 * sigma / sqrt(n)) References:: @incollection{goucher2009beautiful, title={Beautiful Testing: Leading Professionals Reveal How They Improve Software}, author={Goucher, Adam and Riley, Tim}, year={2009}, chapter=10 } Examples:: generator = lambda x: np.random.normal(0, 1.0, size=x) mean_check_ret = mean_check(generator, 0, 1.0) Parameters ---------- generator : function The generator function. It's expected to generate N i.i.d samples by calling generator(N). mu : float sigma : float nsamples : int Returns ------- ret : bool Whether the mean test succeeds """ samples = np.array(generator(nsamples)) sample_mean = samples.mean() ret = (sample_mean > mu - 3 * sigma / np.sqrt(nsamples)) and\ (sample_mean < mu + 3 * sigma / np.sqrt(nsamples)) return ret
[docs]def get_im2rec_path(home_env="MXNET_HOME"): """Get path to the im2rec.py tool Parameters ---------- home_env : str Env variable that holds the path to the MXNET folder Returns ------- str The path to im2rec.py """ # Check first if the path to MXNET is passed as an env variable if home_env in os.environ: mxnet_path = os.environ[home_env] else: # Else use currently imported mxnet as reference mxnet_path = os.path.dirname(mx.__file__) # If MXNet was installed through pip, the location of im2rec.py im2rec_path = os.path.join(mxnet_path, 'tools', 'im2rec.py') if os.path.isfile(im2rec_path): return im2rec_path # If MXNet has been built locally im2rec_path = os.path.join(mxnet_path, '..', '..', 'tools', 'im2rec.py') if os.path.isfile(im2rec_path): return im2rec_path raise IOError('Could not find path to tools/im2rec.py')
[docs]def var_check(generator, sigma, nsamples=1000000): """Test the generator by matching the variance. It will need a large number of samples and is not recommended to use We test the sample variance by checking if it falls inside the range (sigma^2 - 3 * sqrt(2 * sigma^4 / (n-1)), sigma^2 + 3 * sqrt(2 * sigma^4 / (n-1))) References:: @incollection{goucher2009beautiful, title={Beautiful Testing: Leading Professionals Reveal How They Improve Software}, author={Goucher, Adam and Riley, Tim}, year={2009}, chapter=10 } Examples:: generator = lambda x: np.random.normal(0, 1.0, size=x) var_check_ret = var_check(generator, 0, 1.0) Parameters ---------- generator : function The generator function. It's expected to generate N i.i.d samples by calling generator(N). sigma : float nsamples : int Returns ------- ret : bool Whether the variance test succeeds """ samples = np.array(generator(nsamples)) sample_var = samples.var(ddof=1) ret = (sample_var > sigma ** 2 - 3 * np.sqrt(2 * sigma ** 4 / (nsamples - 1))) and\ (sample_var < sigma ** 2 + 3 * np.sqrt(2 * sigma ** 4 / (nsamples - 1))) return ret
[docs]def chi_square_check(generator, buckets, probs, nsamples=1000000): """Run the chi-square test for the generator. The generator can be both continuous and discrete. If the generator is continuous, the buckets should contain tuples of (range_min, range_max) \ and the probs should be the corresponding ideal probability within the specific ranges. \ Otherwise, the buckets should be the possible output of the discrete distribution and the \ probs should be groud-truth probability. Usually the user is required to specify the probs parameter. After obtatining the p value, we could further use the standard p > 0.05 threshold to get \ the final result. Examples:: buckets, probs = gen_buckets_probs_with_ppf(lambda x: ss.norm.ppf(x, 0, 1), 5) generator = lambda x: np.random.normal(0, 1.0, size=x) p = chi_square_check(generator=generator, buckets=buckets, probs=probs) assert(p > 0.05) Parameters ---------- generator: function A function that is assumed to generate i.i.d samples from a specific distribution. generator(N) should generate N random samples. buckets: list of tuple or list of number The buckets to run the chi-square the test. Make sure that the buckets cover the whole range of the distribution. Also, the buckets must be in ascending order and have no intersection probs: list or tuple The ground-truth probability of the random value fall in a specific bucket. nsamples:int The number of samples to generate for the testing Returns ------- p : float p value that the generator has the expected distribution. A higher value indicates a larger confidence obs_freq : list Observed frequency of buckets expected_freq : list The expected (ground-truth) frequency of the buckets """ if not ss: raise ImportError("scipy is not available." " Please check if the scipy python bindings are installed.") assert isinstance(buckets, list) samples = generator(nsamples) assert len(probs) == len(buckets) if isinstance(buckets[0], (list, tuple)): # Check whether the buckets are valid and fill them into a npy array continuous_dist = True buckets_npy = np.zeros((len(buckets) * 2, ), dtype=np.float32) for i, _ in enumerate(buckets): assert(buckets[i][0] <= buckets[i][1]) if i < len(buckets) - 1: assert(buckets[i][1] <= buckets[i + 1][0]) buckets_npy[i * 2] = buckets[i][0] buckets_npy[i * 2 + 1] = buckets[i][1] else: continuous_dist = False buckets_npy = np.array(buckets) expected_freq = (nsamples * np.array(probs, dtype=np.float32)).astype(np.int32) if continuous_dist: sample_bucket_ids = np.searchsorted(buckets_npy, samples, side='right') else: sample_bucket_ids = samples if continuous_dist: sample_bucket_ids = sample_bucket_ids // 2 obs_freq = np.zeros(shape=len(buckets), dtype=np.int) for i in range(len(buckets)): obs_freq[i] = (sample_bucket_ids == i).sum() _, p = ss.chisquare(f_obs=obs_freq, f_exp=expected_freq) return p, obs_freq, expected_freq
[docs]def verify_generator(generator, buckets, probs, nsamples=1000000, nrepeat=5, success_rate=0.15): """Verify whether the generator is correct using chi-square testing. The test is repeated for "nrepeat" times and we check if the success rate is above the threshold (25% by default). Parameters ---------- generator: function A function that is assumed to generate i.i.d samples from a specific distribution. generator(N) should generate N random samples. buckets: list of tuple or list of number The buckets to run the chi-square the test. Make sure that the buckets cover the whole range of the distribution. Also, the buckets must be in ascending order and have no intersection probs: list or tuple The ground-truth probability of the random value fall in a specific bucket. nsamples: int The number of samples to generate for the testing nrepeat: int The times to repeat the test success_rate: float The desired success rate Returns ------- cs_ret_l: list The p values of the chi-square test. """ cs_ret_l = [] obs_freq_l = [] expected_freq_l = [] for _ in range(nrepeat): cs_ret, obs_freq, expected_freq = chi_square_check(generator=generator, buckets=buckets, probs=probs, nsamples=nsamples) cs_ret_l.append(cs_ret) obs_freq_l.append(obs_freq) expected_freq_l.append(expected_freq) success_num = (np.array(cs_ret_l) > 0.05).sum() if success_num < nrepeat * success_rate: raise AssertionError("Generator test fails, Chi-square p=%s, obs_freq=%s, expected_freq=%s." "\nbuckets=%s, probs=%s" % (str(cs_ret_l), str(obs_freq_l), str(expected_freq_l), str(buckets), str(probs))) return cs_ret_l
[docs]def compare_ndarray_tuple(t1, t2, rtol=None, atol=None): """Compare ndarray tuple.""" if t1 is not None and t2 is not None: if isinstance(t1, tuple): for s1, s2 in zip(t1, t2): compare_ndarray_tuple(s1, s2, rtol, atol) else: assert_almost_equal(t1.asnumpy(), t2.asnumpy(), rtol=rtol, atol=atol)
[docs]def compare_optimizer(opt1, opt2, shape, dtype, w_stype='default', g_stype='default', rtol=1e-4, atol=1e-5, compare_states=True): """Compare opt1 and opt2.""" if w_stype == 'default': w2 = mx.random.uniform(shape=shape, ctx=default_context(), dtype=dtype) w1 = w2.copyto(default_context()) elif w_stype == 'row_sparse' or w_stype == 'csr': w2 = rand_ndarray(shape, w_stype, density=1, dtype=dtype) w1 = w2.copyto(default_context()).tostype('default') else: raise Exception("type not supported yet") if g_stype == 'default': g2 = mx.random.uniform(shape=shape, ctx=default_context(), dtype=dtype) g1 = g2.copyto(default_context()) elif g_stype == 'row_sparse' or g_stype == 'csr': g2 = rand_ndarray(shape, g_stype, dtype=dtype) g1 = g2.copyto(default_context()).tostype('default') else: raise Exception("type not supported yet") state1 = opt1.create_state_multi_precision(0, w1) state2 = opt2.create_state_multi_precision(0, w2) if compare_states: compare_ndarray_tuple(state1, state2) opt1.update_multi_precision(0, w1, g1, state1) opt2.update_multi_precision(0, w2, g2, state2) if compare_states: compare_ndarray_tuple(state1, state2, rtol=rtol, atol=atol) assert_almost_equal(w1.asnumpy(), w2.asnumpy(), rtol=rtol, atol=atol)
[docs]class EnvManager(object): """Environment variable setter and unsetter via with idiom""" def __init__(self, key, val): self._key = key self._next_val = val self._prev_val = None def __enter__(self): self._prev_val = os.environ.get(self._key) os.environ[self._key] = self._next_val def __exit__(self, ptype, value, trace): if self._prev_val: os.environ[self._key] = self._prev_val else: del os.environ[self._key]