# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# coding: utf-8
# pylint: disable=no-member, invalid-name, protected-access, no-self-use
# pylint: disable=too-many-branches, too-many-arguments, no-self-use
# pylint: disable=too-many-lines, arguments-differ
"""Definition of various recurrent neural network layers."""
from __future__ import print_function
import re
__all__ = ['RNN', 'LSTM', 'GRU']
from ... import ndarray, symbol
from .. import HybridBlock, tensor_types
from . import rnn_cell
class _RNNLayer(HybridBlock):
"""Implementation of recurrent layers."""
def __init__(self, hidden_size, num_layers, layout,
dropout, bidirectional, input_size,
i2h_weight_initializer, h2h_weight_initializer,
i2h_bias_initializer, h2h_bias_initializer,
mode, projection_size, h2r_weight_initializer,
lstm_state_clip_min, lstm_state_clip_max, lstm_state_clip_nan,
**kwargs):
super(_RNNLayer, self).__init__(**kwargs)
assert layout in ('TNC', 'NTC'), \
"Invalid layout %s; must be one of ['TNC' or 'NTC']"%layout
self._hidden_size = hidden_size
self._projection_size = projection_size if projection_size else None
self._num_layers = num_layers
self._mode = mode
self._layout = layout
self._dropout = dropout
self._dir = 2 if bidirectional else 1
self._input_size = input_size
self._i2h_weight_initializer = i2h_weight_initializer
self._h2h_weight_initializer = h2h_weight_initializer
self._i2h_bias_initializer = i2h_bias_initializer
self._h2h_bias_initializer = h2h_bias_initializer
self._h2r_weight_initializer = h2r_weight_initializer
self._lstm_state_clip_min = lstm_state_clip_min
self._lstm_state_clip_max = lstm_state_clip_max
self._lstm_state_clip_nan = lstm_state_clip_nan
self._gates = {'rnn_relu': 1, 'rnn_tanh': 1, 'lstm': 4, 'gru': 3}[mode]
ng, ni, nh = self._gates, input_size, hidden_size
if not projection_size:
for i in range(num_layers):
for j in ['l', 'r'][:self._dir]:
self._register_param('{}{}_i2h_weight'.format(j, i),
shape=(ng*nh, ni),
init=i2h_weight_initializer)
self._register_param('{}{}_h2h_weight'.format(j, i),
shape=(ng*nh, nh),
init=h2h_weight_initializer)
self._register_param('{}{}_i2h_bias'.format(j, i),
shape=(ng*nh,),
init=i2h_bias_initializer)
self._register_param('{}{}_h2h_bias'.format(j, i),
shape=(ng*nh,),
init=h2h_bias_initializer)
ni = nh * self._dir
else:
np = self._projection_size
for i in range(num_layers):
for j in ['l', 'r'][:self._dir]:
self._register_param('{}{}_i2h_weight'.format(j, i),
shape=(ng*nh, ni),
init=i2h_weight_initializer)
self._register_param('{}{}_h2h_weight'.format(j, i),
shape=(ng*nh, np),
init=h2h_weight_initializer)
self._register_param('{}{}_i2h_bias'.format(j, i),
shape=(ng*nh,),
init=i2h_bias_initializer)
self._register_param('{}{}_h2h_bias'.format(j, i),
shape=(ng*nh,),
init=h2h_bias_initializer)
self._register_param('{}{}_h2r_weight'.format(j, i),
shape=(np, nh),
init=h2r_weight_initializer)
ni = np * self._dir
def _register_param(self, name, shape, init):
p = self.params.get(name, shape=shape, init=init,
allow_deferred_init=True)
setattr(self, name, p)
return p
def __repr__(self):
s = '{name}({mapping}, {_layout}'
if self._num_layers != 1:
s += ', num_layers={_num_layers}'
if self._dropout != 0:
s += ', dropout={_dropout}'
if self._dir == 2:
s += ', bidirectional'
s += ')'
shape = self.l0_i2h_weight.shape
mapping = '{0} -> {1}'.format(shape[1] if shape[1] else None, shape[0] // self._gates)
return s.format(name=self.__class__.__name__,
mapping=mapping,
**self.__dict__)
def _collect_params_with_prefix(self, prefix=''):
if prefix:
prefix += '.'
pattern = re.compile(r'(l|r)(\d)_(i2h|h2h)_(weight|bias)\Z')
def convert_key(m, bidirectional): # for compatibility with old parameter format
d, l, g, t = [m.group(i) for i in range(1, 5)]
if bidirectional:
return '_unfused.{}.{}_cell.{}_{}'.format(l, d, g, t)
else:
return '_unfused.{}.{}_{}'.format(l, g, t)
bidirectional = any(pattern.match(k).group(1) == 'r' for k in self._reg_params)
ret = {prefix + convert_key(pattern.match(key), bidirectional) : val
for key, val in self._reg_params.items()}
for name, child in self._children.items():
ret.update(child._collect_params_with_prefix(prefix + name))
return ret
def state_info(self, batch_size=0):
raise NotImplementedError
def _unfuse(self):
"""Unfuses the fused RNN in to a stack of rnn cells."""
assert not self._projection_size, "_unfuse does not support projection layer yet!"
assert not self._lstm_state_clip_min and not self._lstm_state_clip_max, \
"_unfuse does not support state clipping yet!"
get_cell = {'rnn_relu': lambda **kwargs: rnn_cell.RNNCell(self._hidden_size,
activation='relu',
**kwargs),
'rnn_tanh': lambda **kwargs: rnn_cell.RNNCell(self._hidden_size,
activation='tanh',
**kwargs),
'lstm': lambda **kwargs: rnn_cell.LSTMCell(self._hidden_size,
**kwargs),
'gru': lambda **kwargs: rnn_cell.GRUCell(self._hidden_size,
**kwargs)}[self._mode]
stack = rnn_cell.HybridSequentialRNNCell(prefix=self.prefix, params=self.params)
with stack.name_scope():
ni = self._input_size
for i in range(self._num_layers):
kwargs = {'input_size': ni,
'i2h_weight_initializer': self._i2h_weight_initializer,
'h2h_weight_initializer': self._h2h_weight_initializer,
'i2h_bias_initializer': self._i2h_bias_initializer,
'h2h_bias_initializer': self._h2h_bias_initializer}
if self._dir == 2:
stack.add(rnn_cell.BidirectionalCell(
get_cell(prefix='l%d_'%i, **kwargs),
get_cell(prefix='r%d_'%i, **kwargs)))
else:
stack.add(get_cell(prefix='l%d_'%i, **kwargs))
if self._dropout > 0 and i != self._num_layers - 1:
stack.add(rnn_cell.DropoutCell(self._dropout))
ni = self._hidden_size * self._dir
return stack
def begin_state(self, batch_size=0, func=ndarray.zeros, **kwargs):
"""Initial state for this cell.
Parameters
----------
batch_size: int
Only required for `NDArray` API. Size of the batch ('N' in layout).
Dimension of the input.
func : callable, default `ndarray.zeros`
Function for creating initial state.
For Symbol API, func can be `symbol.zeros`, `symbol.uniform`,
`symbol.var` etc. Use `symbol.var` if you want to directly
feed input as states.
For NDArray API, func can be `ndarray.zeros`, `ndarray.ones`, etc.
**kwargs :
Additional keyword arguments passed to func. For example
`mean`, `std`, `dtype`, etc.
Returns
-------
states : nested list of Symbol
Starting states for the first RNN step.
"""
states = []
for i, info in enumerate(self.state_info(batch_size)):
if info is not None:
info.update(kwargs)
else:
info = kwargs
states.append(func(name='%sh0_%d'%(self.prefix, i), **info))
return states
def hybrid_forward(self, F, inputs, states=None, **kwargs):
if F is ndarray:
batch_size = inputs.shape[self._layout.find('N')]
skip_states = states is None
if skip_states:
if F is ndarray:
states = self.begin_state(batch_size, ctx=inputs.context, dtype=inputs.dtype)
else:
states = self.begin_state(0, func=symbol.zeros)
if isinstance(states, tensor_types):
states = [states]
if F is ndarray:
for state, info in zip(states, self.state_info(batch_size)):
if state.shape != info['shape']:
raise ValueError(
"Invalid recurrent state shape. Expecting %s, got %s."%(
str(info['shape']), str(state.shape)))
out = self._forward_kernel(F, inputs, states, **kwargs)
# out is (output, state)
return out[0] if skip_states else out
def _forward_kernel(self, F, inputs, states, **kwargs):
""" forward using CUDNN or CPU kenrel"""
if self._layout == 'NTC':
inputs = F.swapaxes(inputs, dim1=0, dim2=1)
if self._projection_size is None:
params = (kwargs['{}{}_{}_{}'.format(d, l, g, t)].reshape(-1)
for t in ['weight', 'bias']
for l in range(self._num_layers)
for d in ['l', 'r'][:self._dir]
for g in ['i2h', 'h2h'])
else:
params = (kwargs['{}{}_{}_{}'.format(d, l, g, t)].reshape(-1)
for t in ['weight', 'bias']
for l in range(self._num_layers)
for d in ['l', 'r'][:self._dir]
for g in ['i2h', 'h2h', 'h2r']
if g != 'h2r' or t != 'bias')
params = F._internal._rnn_param_concat(*params, dim=0)
rnn = F.RNN(inputs, params, *states, state_size=self._hidden_size,
projection_size=self._projection_size,
num_layers=self._num_layers, bidirectional=self._dir == 2,
p=self._dropout, state_outputs=True, mode=self._mode,
lstm_state_clip_min=self._lstm_state_clip_min,
lstm_state_clip_max=self._lstm_state_clip_max,
lstm_state_clip_nan=self._lstm_state_clip_nan)
if self._mode == 'lstm':
outputs, states = rnn[0], [rnn[1], rnn[2]]
else:
outputs, states = rnn[0], [rnn[1]]
if self._layout == 'NTC':
outputs = F.swapaxes(outputs, dim1=0, dim2=1)
return outputs, states
[docs]class RNN(_RNNLayer):
r"""Applies a multi-layer Elman RNN with `tanh` or `ReLU` non-linearity to an input sequence.
For each element in the input sequence, each layer computes the following
function:
.. math::
h_t = \tanh(w_{ih} * x_t + b_{ih} + w_{hh} * h_{(t-1)} + b_{hh})
where :math:`h_t` is the hidden state at time `t`, and :math:`x_t` is the output
of the previous layer at time `t` or :math:`input_t` for the first layer.
If nonlinearity='relu', then `ReLU` is used instead of `tanh`.
Parameters
----------
hidden_size: int
The number of features in the hidden state h.
num_layers: int, default 1
Number of recurrent layers.
activation: {'relu' or 'tanh'}, default 'relu'
The activation function to use.
layout : str, default 'TNC'
The format of input and output tensors. T, N and C stand for
sequence length, batch size, and feature dimensions respectively.
dropout: float, default 0
If non-zero, introduces a dropout layer on the outputs of each
RNN layer except the last layer.
bidirectional: bool, default False
If `True`, becomes a bidirectional RNN.
i2h_weight_initializer : str or Initializer
Initializer for the input weights matrix, used for the linear
transformation of the inputs.
h2h_weight_initializer : str or Initializer
Initializer for the recurrent weights matrix, used for the linear
transformation of the recurrent state.
i2h_bias_initializer : str or Initializer
Initializer for the bias vector.
h2h_bias_initializer : str or Initializer
Initializer for the bias vector.
input_size: int, default 0
The number of expected features in the input x.
If not specified, it will be inferred from input.
prefix : str or None
Prefix of this `Block`.
params : ParameterDict or None
Shared Parameters for this `Block`.
Inputs:
- **data**: input tensor with shape `(sequence_length, batch_size, input_size)`
when `layout` is "TNC". For other layouts, dimensions are permuted accordingly
using transpose() operator which adds performance overhead. Consider creating
batches in TNC layout during data batching step.
- **states**: initial recurrent state tensor with shape
`(num_layers, batch_size, num_hidden)`. If `bidirectional` is True,
shape will instead be `(2*num_layers, batch_size, num_hidden)`. If
`states` is None, zeros will be used as default begin states.
Outputs:
- **out**: output tensor with shape `(sequence_length, batch_size, num_hidden)`
when `layout` is "TNC". If `bidirectional` is True, output shape will instead
be `(sequence_length, batch_size, 2*num_hidden)`
- **out_states**: output recurrent state tensor with the same shape as `states`.
If `states` is None `out_states` will not be returned.
Examples
--------
>>> layer = mx.gluon.rnn.RNN(100, 3)
>>> layer.initialize()
>>> input = mx.nd.random.uniform(shape=(5, 3, 10))
>>> # by default zeros are used as begin state
>>> output = layer(input)
>>> # manually specify begin state.
>>> h0 = mx.nd.random.uniform(shape=(3, 3, 100))
>>> output, hn = layer(input, h0)
"""
def __init__(self, hidden_size, num_layers=1, activation='relu',
layout='TNC', dropout=0, bidirectional=False,
i2h_weight_initializer=None, h2h_weight_initializer=None,
i2h_bias_initializer='zeros', h2h_bias_initializer='zeros',
input_size=0, **kwargs):
super(RNN, self).__init__(hidden_size, num_layers, layout,
dropout, bidirectional, input_size,
i2h_weight_initializer, h2h_weight_initializer,
i2h_bias_initializer, h2h_bias_initializer,
'rnn_'+activation, None, None, None, None, False,
**kwargs)
def state_info(self, batch_size=0):
return [{'shape': (self._num_layers * self._dir, batch_size, self._hidden_size),
'__layout__': 'LNC'}]
[docs]class LSTM(_RNNLayer):
r"""Applies a multi-layer long short-term memory (LSTM) RNN to an input sequence.
For each element in the input sequence, each layer computes the following
function:
.. math::
\begin{array}{ll}
i_t = sigmoid(W_{ii} x_t + b_{ii} + W_{hi} h_{(t-1)} + b_{hi}) \\
f_t = sigmoid(W_{if} x_t + b_{if} + W_{hf} h_{(t-1)} + b_{hf}) \\
g_t = \tanh(W_{ig} x_t + b_{ig} + W_{hc} h_{(t-1)} + b_{hg}) \\
o_t = sigmoid(W_{io} x_t + b_{io} + W_{ho} h_{(t-1)} + b_{ho}) \\
c_t = f_t * c_{(t-1)} + i_t * g_t \\
h_t = o_t * \tanh(c_t)
\end{array}
where :math:`h_t` is the hidden state at time `t`, :math:`c_t` is the
cell state at time `t`, :math:`x_t` is the hidden state of the previous
layer at time `t` or :math:`input_t` for the first layer, and :math:`i_t`,
:math:`f_t`, :math:`g_t`, :math:`o_t` are the input, forget, cell, and
out gates, respectively.
Parameters
----------
hidden_size: int
The number of features in the hidden state h.
num_layers: int, default 1
Number of recurrent layers.
layout : str, default 'TNC'
The format of input and output tensors. T, N and C stand for
sequence length, batch size, and feature dimensions respectively.
dropout: float, default 0
If non-zero, introduces a dropout layer on the outputs of each
RNN layer except the last layer.
bidirectional: bool, default False
If `True`, becomes a bidirectional RNN.
i2h_weight_initializer : str or Initializer
Initializer for the input weights matrix, used for the linear
transformation of the inputs.
h2h_weight_initializer : str or Initializer
Initializer for the recurrent weights matrix, used for the linear
transformation of the recurrent state.
i2h_bias_initializer : str or Initializer, default 'lstmbias'
Initializer for the bias vector. By default, bias for the forget
gate is initialized to 1 while all other biases are initialized
to zero.
h2h_bias_initializer : str or Initializer
Initializer for the bias vector.
projection_size: int, default None
The number of features after projection.
h2r_weight_initializer : str or Initializer, default None
Initializer for the projected recurrent weights matrix, used for the linear
transformation of the recurrent state to the projected space.
state_clip_min : float or None, default None
Minimum clip value of LSTM states. This option must be used together with
state_clip_max. If None, clipping is not applied.
state_clip_max : float or None, default None
Maximum clip value of LSTM states. This option must be used together with
state_clip_min. If None, clipping is not applied.
state_clip_nan : boolean, default False
Whether to stop NaN from propagating in state by clipping it to min/max.
If the clipping range is not specified, this option is ignored.
input_size: int, default 0
The number of expected features in the input x.
If not specified, it will be inferred from input.
prefix : str or None
Prefix of this `Block`.
params : `ParameterDict` or `None`
Shared Parameters for this `Block`.
Inputs:
- **data**: input tensor with shape `(sequence_length, batch_size, input_size)`
when `layout` is "TNC". For other layouts, dimensions are permuted accordingly
using transpose() operator which adds performance overhead. Consider creating
batches in TNC layout during data batching step.
- **states**: a list of two initial recurrent state tensors. Each has shape
`(num_layers, batch_size, num_hidden)`. If `bidirectional` is True,
shape will instead be `(2*num_layers, batch_size, num_hidden)`. If
`states` is None, zeros will be used as default begin states.
Outputs:
- **out**: output tensor with shape `(sequence_length, batch_size, num_hidden)`
when `layout` is "TNC". If `bidirectional` is True, output shape will instead
be `(sequence_length, batch_size, 2*num_hidden)`
- **out_states**: a list of two output recurrent state tensors with the same
shape as in `states`. If `states` is None `out_states` will not be returned.
Examples
--------
>>> layer = mx.gluon.rnn.LSTM(100, 3)
>>> layer.initialize()
>>> input = mx.nd.random.uniform(shape=(5, 3, 10))
>>> # by default zeros are used as begin state
>>> output = layer(input)
>>> # manually specify begin state.
>>> h0 = mx.nd.random.uniform(shape=(3, 3, 100))
>>> c0 = mx.nd.random.uniform(shape=(3, 3, 100))
>>> output, hn = layer(input, [h0, c0])
"""
def __init__(self, hidden_size, num_layers=1, layout='TNC',
dropout=0, bidirectional=False, input_size=0,
i2h_weight_initializer=None, h2h_weight_initializer=None,
i2h_bias_initializer='zeros', h2h_bias_initializer='zeros',
projection_size=None, h2r_weight_initializer=None,
state_clip_min=None, state_clip_max=None, state_clip_nan=False,
**kwargs):
super(LSTM, self).__init__(hidden_size, num_layers, layout,
dropout, bidirectional, input_size,
i2h_weight_initializer, h2h_weight_initializer,
i2h_bias_initializer, h2h_bias_initializer,
'lstm', projection_size, h2r_weight_initializer,
state_clip_min, state_clip_max, state_clip_nan,
**kwargs)
def state_info(self, batch_size=0):
if self._projection_size is None:
return [{'shape': (self._num_layers * self._dir, batch_size, self._hidden_size),
'__layout__': 'LNC'},
{'shape': (self._num_layers * self._dir, batch_size, self._hidden_size),
'__layout__': 'LNC'}]
else:
return [{'shape': (self._num_layers * self._dir, batch_size, self._projection_size),
'__layout__': 'LNC'},
{'shape': (self._num_layers * self._dir, batch_size, self._hidden_size),
'__layout__': 'LNC'}]
[docs]class GRU(_RNNLayer):
r"""Applies a multi-layer gated recurrent unit (GRU) RNN to an input sequence.
Note: this is an implementation of the cuDNN version of GRUs
(slight modification compared to Cho et al. 2014; the reset gate :math:`r_t`
is applied after matrix multiplication).
For each element in the input sequence, each layer computes the following
function:
.. math::
\begin{array}{ll}
r_t = sigmoid(W_{ir} x_t + b_{ir} + W_{hr} h_{(t-1)} + b_{hr}) \\
i_t = sigmoid(W_{ii} x_t + b_{ii} + W_{hi} h_{(t-1)} + b_{hi}) \\
n_t = \tanh(W_{in} x_t + b_{in} + r_t * (W_{hn} h_{(t-1)} + b_{hn})) \\
h_t = (1 - i_t) * n_t + i_t * h_{(t-1)} \\
\end{array}
where :math:`h_t` is the hidden state at time `t`, :math:`x_t` is the hidden
state of the previous layer at time `t` or :math:`input_t` for the first layer,
and :math:`r_t`, :math:`i_t`, :math:`n_t` are the reset, input, and new gates, respectively.
Parameters
----------
hidden_size: int
The number of features in the hidden state h
num_layers: int, default 1
Number of recurrent layers.
layout : str, default 'TNC'
The format of input and output tensors. T, N and C stand for
sequence length, batch size, and feature dimensions respectively.
dropout: float, default 0
If non-zero, introduces a dropout layer on the outputs of each
RNN layer except the last layer
bidirectional: bool, default False
If True, becomes a bidirectional RNN.
i2h_weight_initializer : str or Initializer
Initializer for the input weights matrix, used for the linear
transformation of the inputs.
h2h_weight_initializer : str or Initializer
Initializer for the recurrent weights matrix, used for the linear
transformation of the recurrent state.
i2h_bias_initializer : str or Initializer
Initializer for the bias vector.
h2h_bias_initializer : str or Initializer
Initializer for the bias vector.
input_size: int, default 0
The number of expected features in the input x.
If not specified, it will be inferred from input.
prefix : str or None
Prefix of this `Block`.
params : ParameterDict or None
Shared Parameters for this `Block`.
Inputs:
- **data**: input tensor with shape `(sequence_length, batch_size, input_size)`
when `layout` is "TNC". For other layouts, dimensions are permuted accordingly
using transpose() operator which adds performance overhead. Consider creating
batches in TNC layout during data batching step.
- **states**: initial recurrent state tensor with shape
`(num_layers, batch_size, num_hidden)`. If `bidirectional` is True,
shape will instead be `(2*num_layers, batch_size, num_hidden)`. If
`states` is None, zeros will be used as default begin states.
Outputs:
- **out**: output tensor with shape `(sequence_length, batch_size, num_hidden)`
when `layout` is "TNC". If `bidirectional` is True, output shape will instead
be `(sequence_length, batch_size, 2*num_hidden)`
- **out_states**: output recurrent state tensor with the same shape as `states`.
If `states` is None `out_states` will not be returned.
Examples
--------
>>> layer = mx.gluon.rnn.GRU(100, 3)
>>> layer.initialize()
>>> input = mx.nd.random.uniform(shape=(5, 3, 10))
>>> # by default zeros are used as begin state
>>> output = layer(input)
>>> # manually specify begin state.
>>> h0 = mx.nd.random.uniform(shape=(3, 3, 100))
>>> output, hn = layer(input, h0)
"""
def __init__(self, hidden_size, num_layers=1, layout='TNC',
dropout=0, bidirectional=False, input_size=0,
i2h_weight_initializer=None, h2h_weight_initializer=None,
i2h_bias_initializer='zeros', h2h_bias_initializer='zeros',
**kwargs):
super(GRU, self).__init__(hidden_size, num_layers, layout,
dropout, bidirectional, input_size,
i2h_weight_initializer, h2h_weight_initializer,
i2h_bias_initializer, h2h_bias_initializer,
'gru', None, None, None, None, False,
**kwargs)
def state_info(self, batch_size=0):
return [{'shape': (self._num_layers * self._dir, batch_size, self._hidden_size),
'__layout__': 'LNC'}]