KVStore API
Basic Push and Pull
Provides basic operations over multiple devices (GPUs) on a single machine.
Initialization
Let’s consider a simple example. It initializes a (int, NDArray) pair into the store, and then pulls the value out.
>>> import mxnet as mx
>>> kv = mx.kv.create('local') # create a local kv store.
>>> shape = (2,3)
>>> kv.init(3, mx.nd.ones(shape)*2)
>>> a = mx.nd.zeros(shape)
>>> kv.pull(3, out = a)
>>> print(a.asnumpy())
[[ 2. 2. 2.]
[ 2. 2. 2.]]
Push, Aggregation, and Updater
For any key that’s been initialized, you can push a new value with the same shape to the key, as follows:
>>> kv.push(3, mx.nd.ones(shape)*8)
>>> kv.pull(3, out = a) # pull out the value
>>> print(a.asnumpy())
[[ 8. 8. 8.]
[ 8. 8. 8.]]
The data that you want to push can be stored on any device. Furthermore, you can push multiple values into the same key, where KVStore first sums all of these values, and then pushes the aggregated value, as follows:
>>> gpus = [mx.gpu(i) for i in range(4)]
>>> b = [mx.nd.ones(shape, gpu) for gpu in gpus]
>>> kv.push(3, b)
>>> kv.pull(3, out = a)
>>> print(a.asnumpy())
[[ 4. 4. 4.]
[ 4. 4. 4.]]
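The summation step can be pictured with plain Python lists standing in for NDArrays. This is only an illustrative sketch: `aggregate` is a hypothetical helper, not an MXNet function, and it ignores devices entirely.

```python
def aggregate(values):
    """Element-wise sum of equally shaped 2-D lists (stand-ins for NDArrays)."""
    rows, cols = len(values[0]), len(values[0][0])
    total = [[0.0] * cols for _ in range(rows)]
    for value in values:
        for r in range(rows):
            for c in range(cols):
                total[r][c] += value[r][c]
    return total


# Four "devices", each contributing a 2x3 array of ones.
per_device = [[[1.0] * 3 for _ in range(2)] for _ in range(4)]
aggregated = aggregate(per_device)
print(aggregated)  # [[4.0, 4.0, 4.0], [4.0, 4.0, 4.0]]
```

With the default updater, the aggregated value then replaces the stored one, which is why the pull above returns 4 rather than 8 + 4.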
For each push command, KVStore uses an updater to apply the pushed value to the stored value. The default updater is ASSIGN. You can replace the default to control how data is merged.
>>> def update(key, input, stored):
...     print("update on key: %d" % key)
...     stored += input * 2
...
>>> kv._set_updater(update)
>>> kv.pull(3, out=a)
>>> print(a.asnumpy())
[[ 4. 4. 4.]
[ 4. 4. 4.]]
>>> kv.push(3, mx.nd.ones(shape))
update on key: 3
>>> kv.pull(3, out=a)
>>> print(a.asnumpy())
[[ 6. 6. 6.]
[ 6. 6. 6.]]
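The difference between the default ASSIGN behaviour and a custom updater can be modelled without MXNet. The sketch below is a toy, single-process stand-in: `ToyStore` and its methods are hypothetical names chosen for illustration, and flat Python lists replace NDArrays.

```python
class ToyStore:
    """Toy model of push/pull with an optional updater. Not MXNet code."""

    def __init__(self):
        self._data = {}
        self._updater = None  # None models the default ASSIGN behaviour

    def init(self, key, value):
        self._data[key] = list(value)

    def set_updater(self, updater):
        self._updater = updater

    def push(self, key, value):
        if self._updater is None:
            self._data[key] = list(value)  # ASSIGN: overwrite the stored value
        else:
            # The updater mutates the stored value in place.
            self._updater(key, value, self._data[key])

    def pull(self, key):
        return list(self._data[key])


def update(key, pushed, stored):
    # Same rule as the updater above: stored += pushed * 2.
    for i, x in enumerate(pushed):
        stored[i] += x * 2


store = ToyStore()
store.init(3, [4.0, 4.0, 4.0])
store.push(3, [8.0, 8.0, 8.0])
assigned = store.pull(3)
print(assigned)  # [8.0, 8.0, 8.0]

store.init(3, [4.0, 4.0, 4.0])
store.set_updater(update)
store.push(3, [1.0, 1.0, 1.0])
merged = store.pull(3)
print(merged)  # [6.0, 6.0, 6.0]
```

The second result mirrors the walkthrough above: a stored value of 4 plus twice the pushed value of 1 gives 6.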
Pull
You’ve already seen how to pull a single key-value pair. Similar to the way that you use the push command, you can pull the value into several devices with a single call.
>>> b = [mx.nd.ones(shape, gpu) for gpu in gpus]
>>> kv.pull(3, out = b)
>>> print(b[1].asnumpy())
[[ 6. 6. 6.]
[ 6. 6. 6.]]
List Key-Value Pairs
All of the operations that we’ve discussed so far are performed on a single key. KVStore also provides an interface for operating on a list of key-value pairs. For a single device, use the following:
>>> keys = [5, 7, 9]
>>> kv.init(keys, [mx.nd.ones(shape)]*len(keys))
>>> kv.push(keys, [mx.nd.ones(shape)]*len(keys))
update on key: 5
update on key: 7
update on key: 9
>>> b = [mx.nd.zeros(shape)]*len(keys)
>>> kv.pull(keys, out = b)
>>> print(b[1].asnumpy())
[[ 3. 3. 3.]
[ 3. 3. 3.]]
For multiple devices:
>>> b = [[mx.nd.ones(shape, gpu) for gpu in gpus]] * len(keys)
>>> kv.push(keys, b)
update on key: 5
update on key: 7
update on key: 9
>>> kv.pull(keys, out = b)
>>> print(b[1][1].asnumpy())
[[ 11. 11. 11.]
[ 11. 11. 11.]]
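One detail of the list examples is worth keeping in mind: in Python, `[x] * n` builds a list of n references to the same object, not n copies. In the examples above this is harmless because every key holds the same value, but with distinct values per key each output would need its own buffer. The sketch below shows the general Python behaviour with nested lists standing in for NDArrays; `zeros` is a hypothetical helper.

```python
def zeros(shape):
    """Build a fresh 2-D list of zeros (stand-in for mx.nd.zeros)."""
    return [[0.0] * shape[1] for _ in range(shape[0])]


shape = (2, 3)
aliased = [zeros(shape)] * 3                    # three references to ONE buffer
independent = [zeros(shape) for _ in range(3)]  # three separate buffers

aliased[0][0][0] = 7.0
independent[0][0][0] = 7.0

print(aliased[1][0][0])      # 7.0: the write is visible through every element
print(independent[1][0][0])  # 0.0: the other buffers are untouched
shares_buffer = aliased[0] is aliased[1]
```

A list comprehension is the usual way to get one independent output buffer per key.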
API Reference
Key-value store interface of MXNet for parameter synchronization.
class mxnet.kvstore.KVStore(handle)
A key-value store for synchronization of values over multiple devices.
init(key, value)
Initializes a single key-value pair or a sequence of key-value pairs into the store.
For each key, you must call init before calling push or pull. When multiple workers invoke init for the same key, only the value supplied by the worker with rank 0 is used. This function returns after the data has been initialized successfully.
Parameters:
- key (str, int, or sequence of str or int) – The keys.
- value (NDArray, RowSparseNDArray or sequence of NDArray or RowSparseNDArray) – Values corresponding to the keys.
Examples
>>> # init a single key-value pair
>>> shape = (2,3)
>>> kv = mx.kv.create('local')
>>> kv.init('3', mx.nd.ones(shape)*2)
>>> a = mx.nd.zeros(shape)
>>> kv.pull('3', out=a)
>>> print(a.asnumpy())
[[ 2. 2. 2.]
[ 2. 2. 2.]]
>>> # init a list of key-value pairs
>>> keys = ['5', '7', '9']
>>> kv.init(keys, [mx.nd.ones(shape)]*len(keys))
>>> # init a row_sparse value
>>> kv.init('4', mx.nd.ones(shape).tostype('row_sparse'))
>>> b = mx.nd.sparse.zeros('row_sparse', shape)
>>> kv.row_sparse_pull('4', row_ids=mx.nd.array([0, 1]), out=b)
>>> print(b)
push(key, value, priority=0)
Pushes a single key-value pair or a sequence of key-value pairs into the store.
This function returns immediately after adding an operator to the engine. The actual operation is executed asynchronously. If there are consecutive pushes to the same key, there is no guarantee on the serialization of pushes. The execution of a push does not guarantee that all previous pushes are finished. There is no synchronization between workers. One can use _barrier() to sync all workers.
Parameters:
- key (str, int, or sequence of str or int) – Keys.
- value (NDArray, RowSparseNDArray, list of NDArray or RowSparseNDArray, or list of list of NDArray or RowSparseNDArray) – Values corresponding to the keys.
- priority (int, optional) – The priority of the push operation. Higher priority push operations are likely to be executed before other push actions.
Examples
>>> # push a single key-value pair
>>> kv.push('3', mx.nd.ones(shape)*8)
>>> kv.pull('3', out=a) # pull out the value
>>> print(a.asnumpy())
[[ 8. 8. 8.]
[ 8. 8. 8.]]
>>> # aggregate the values and then push
>>> gpus = [mx.gpu(i) for i in range(4)]
>>> b = [mx.nd.ones(shape, gpu) for gpu in gpus]
>>> kv.push('3', b)
>>> kv.pull('3', out=a)
>>> print(a.asnumpy())
[[ 4. 4. 4.]
[ 4. 4. 4.]]
>>> # push a list of keys.
>>> # single device
>>> keys = ['4', '5', '6']
>>> kv.push(keys, [mx.nd.ones(shape)]*len(keys))
>>> b = [mx.nd.zeros(shape)]*len(keys)
>>> kv.pull(keys, out=b)
>>> print(b[1].asnumpy())
[[ 1. 1. 1.]
[ 1. 1. 1.]]
>>> # multiple devices:
>>> keys = ['7', '8', '9']
>>> b = [[mx.nd.ones(shape, gpu) for gpu in gpus]] * len(keys)
>>> kv.push(keys, b)
>>> kv.pull(keys, out=b)
>>> print(b[1][1].asnumpy())
[[ 4. 4. 4.]
[ 4. 4. 4.]]
>>> # push a row_sparse value
>>> b = mx.nd.sparse.zeros('row_sparse', shape)
>>> kv.init('10', mx.nd.sparse.zeros('row_sparse', shape))
>>> kv.push('10', mx.nd.ones(shape).tostype('row_sparse'))
>>> # pull out the value
>>> kv.row_sparse_pull('10', row_ids=mx.nd.array([0, 1]), out=b)
>>> print(b)
pull(key, out=None, priority=0)
Pulls a single value or a sequence of values from the store.
This function returns immediately after adding an operator to the engine. Subsequent attempts to read from the out variable will be blocked until the pull operation completes.
pull is executed asynchronously, after all previous pull calls and only the last push call for the same input key(s) have finished.
The returned values are guaranteed to be the latest values in the store.
For RowSparseNDArray values, this call is ignored; use row_sparse_pull instead.
Parameters:
- key (str, int, or sequence of str or int) – Keys.
- out (NDArray or list of NDArray or list of list of NDArray) – Values corresponding to the keys.
- priority (int, optional) – The priority of the pull operation. Higher priority pull operations are likely to be executed before other pull actions.
Examples
>>> # pull a single key-value pair
>>> a = mx.nd.zeros(shape)
>>> kv.pull('3', out=a)
>>> print(a.asnumpy())
[[ 2. 2. 2.]
[ 2. 2. 2.]]
>>> # pull into multiple devices
>>> b = [mx.nd.ones(shape, gpu) for gpu in gpus]
>>> kv.pull('3', out=b)
>>> print(b[1].asnumpy())
[[ 2. 2. 2.]
[ 2. 2. 2.]]
>>> # pull a list of key-value pairs.
>>> # On single device
>>> keys = ['5', '7', '9']
>>> b = [mx.nd.zeros(shape)]*len(keys)
>>> kv.pull(keys, out=b)
>>> print(b[1].asnumpy())
[[ 2. 2. 2.]
[ 2. 2. 2.]]
>>> # On multiple devices
>>> keys = ['6', '8', '10']
>>> b = [[mx.nd.ones(shape, gpu) for gpu in gpus]] * len(keys)
>>> kv.pull(keys, out=b)
>>> print(b[1][1].asnumpy())
[[ 2. 2. 2.]
[ 2. 2. 2.]]
row_sparse_pull(key, out=None, priority=0, row_ids=None)
Pulls a single RowSparseNDArray value or a sequence of RowSparseNDArray values from the store with specified row_ids.
row_sparse_pull is executed asynchronously, after all previous pull/row_sparse_pull calls and the last push call for the same input key(s) have finished.
The returned values are guaranteed to be the latest values in the store.
Parameters:
- key (str, int, or sequence of str or int) – Keys.
- out (RowSparseNDArray or list of RowSparseNDArray or list of list of RowSparseNDArray) – Values corresponding to the keys. The stype is expected to be row_sparse.
- priority (int, optional) – The priority of the pull operation. Higher priority pull operations are likely to be executed before other pull actions.
- row_ids (NDArray or list of NDArray) – The row_ids to pull for each value. Each row_id is a 1-D NDArray whose values need not be unique or sorted.
Examples
>>> shape = (3, 3)
>>> kv.init('3', mx.nd.ones(shape).tostype('row_sparse'))
>>> a = mx.nd.sparse.zeros('row_sparse', shape)
>>> row_ids = mx.nd.array([0, 2], dtype='int64')
>>> kv.row_sparse_pull('3', out=a, row_ids=row_ids)
>>> print(a.asnumpy())
[[ 1. 1. 1.]
[ 0. 0. 0.]
[ 1. 1. 1.]]
>>> duplicate_row_ids = mx.nd.array([2, 2], dtype='int64')
>>> kv.row_sparse_pull('3', out=a, row_ids=duplicate_row_ids)
>>> print(a.asnumpy())
[[ 0. 0. 0.]
[ 0. 0. 0.]
[ 1. 1. 1.]]
>>> unsorted_row_ids = mx.nd.array([1, 0], dtype='int64')
>>> kv.row_sparse_pull('3', out=a, row_ids=unsorted_row_ids)
>>> print(a.asnumpy())
[[ 1. 1. 1.]
[ 1. 1. 1.]
[ 0. 0. 0.]]
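The row selection seen in these outputs can be reproduced with a small pure-Python model. This is a sketch of the observable behaviour only, assuming a dense view of the result; `toy_row_sparse_pull` is a hypothetical function and says nothing about how MXNet stores or transfers sparse rows.

```python
def toy_row_sparse_pull(stored, row_ids):
    """Return a dense copy of `stored` that keeps only the requested rows."""
    wanted = set(row_ids)  # duplicates collapse; ordering is irrelevant
    return [
        list(row) if index in wanted else [0.0] * len(row)
        for index, row in enumerate(stored)
    ]


stored = [[1.0, 1.0, 1.0] for _ in range(3)]
first = toy_row_sparse_pull(stored, [0, 2])       # rows 0 and 2 are kept
duplicates = toy_row_sparse_pull(stored, [2, 2])  # duplicate ids: only row 2
unsorted_ids = toy_row_sparse_pull(stored, [1, 0])  # unsorted ids: rows 0 and 1
print(first)
print(duplicates)
print(unsorted_ids)
```

The three results correspond to the three dense arrays printed in the example above.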
set_optimizer(optimizer)
Registers an optimizer with the kvstore.
When using a single machine, this function updates the local optimizer. If using multiple machines and this operation is invoked from a worker node, it serializes the optimizer with pickle and sends it to all servers. The function returns after all servers have been updated.
Parameters: optimizer (Optimizer) – The new optimizer for the store.
Examples
>>> kv = mx.kv.create()
>>> shape = (2, 2)
>>> weight = mx.nd.zeros(shape)
>>> kv.init(3, weight)
>>> # set the optimizer for kvstore as the default SGD optimizer
>>> kv.set_optimizer(mx.optimizer.SGD())
>>> grad = mx.nd.ones(shape)
>>> kv.push(3, grad)
>>> kv.pull(3, out = weight)
>>> # weight is updated via gradient descent
>>> weight.asnumpy()
array([[-0.01, -0.01],
       [-0.01, -0.01]], dtype=float32)
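The value -0.01 in this output is consistent with one plain gradient descent step, weight = weight - learning_rate * grad. The sketch below works through that arithmetic with nested lists. It assumes a learning rate of 0.01 (inferred from the output above) and ignores momentum, weight decay, and gradient rescaling; `sgd_step` is a hypothetical helper, not the MXNet optimizer.

```python
def sgd_step(weight, grad, learning_rate=0.01):
    """One plain SGD step on 2-D lists: w - learning_rate * g, element-wise."""
    return [
        [w - learning_rate * g for w, g in zip(w_row, g_row)]
        for w_row, g_row in zip(weight, grad)
    ]


weight = [[0.0, 0.0], [0.0, 0.0]]  # stored value after init
grad = [[1.0, 1.0], [1.0, 1.0]]    # pushed gradient
updated = sgd_step(weight, grad)
print(updated)  # [[-0.01, -0.01], [-0.01, -0.01]]
```

In other words, once an optimizer is registered, a push is treated as a gradient and the stored value is the weight being updated.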
type
Returns the type of this kvstore.
Returns: type – the string type
Return type: str
rank
Returns the rank of this worker node.
Returns: rank – The rank of this node, which is in range [0, num_workers())
Return type: int
num_workers
Returns the number of worker nodes.
Returns: size – The number of worker nodes.
Return type: int
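Because rank always falls in the half-open range from 0 to the number of workers, the two values together can be used to split work among workers. The following sketch shows one common pattern, strided sharding of a sample list. It is an illustration of what the range allows, not something the KVStore does itself; `shard_for_worker` is a hypothetical helper, and the rank and worker count are passed in as plain integers.

```python
def shard_for_worker(items, rank, num_workers):
    """Return every num_workers-th item, starting at index `rank`."""
    if not 0 <= rank < num_workers:
        raise ValueError("rank must be in [0, num_workers)")
    return items[rank::num_workers]


samples = list(range(10))
shards = [shard_for_worker(samples, r, 3) for r in range(3)]
print(shards)  # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
```

Each sample lands in exactly one shard, so the workers cover the data without overlap.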
save_optimizer_states(fname, dump_optimizer=False)
Saves the optimizer (updater) state to a file. This is often used when checkpointing the model during training.
Parameters:
- fname (str) – Path to the output states file.
- dump_optimizer (bool, default False) – Whether to also save the optimizer itself. This would also save optimizer information such as learning rate and weight decay schedules.
mxnet.kvstore.create(name='local')
Creates a new KVStore.
For single machine training, there are two commonly used types:
- local: Copies all gradients to CPU memory and updates weights there.
- device: Aggregates gradients and updates weights on GPUs. With this setting, the KVStore also attempts to use GPU peer-to-peer communication, potentially accelerating the communication.
For distributed training, KVStore also supports a number of types:
- dist_sync: Behaves similarly to local but with one major difference. With dist_sync, batch-size now means the batch size used on each machine. So if there are n machines and we use batch size b, then dist_sync behaves like local with batch size n * b.
- dist_device_sync: Identical to dist_sync with the difference similar to device vs local.
- dist_async: Performs asynchronous updates. The weights are updated whenever gradients are received from any machine. No two updates happen on the same weight at the same time. However, the order is not guaranteed.
Parameters: name ({'local', 'device', 'dist_sync', 'dist_device_sync', 'dist_async'}) – The type of KVStore.
Returns: kv – The created KVStore.
Return type: KVStore
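The batch-size remark for dist_sync is simple arithmetic, spelled out below as a small sketch. `effective_batch_size` is a hypothetical helper used only to make the n * b relationship concrete.

```python
def effective_batch_size(num_machines, per_machine_batch):
    """Batch size that a 'local' store would need to match dist_sync."""
    return num_machines * per_machine_batch


# Four machines, each using a batch size of 32.
equivalent = effective_batch_size(4, 32)
print(equivalent)  # 128
```

When moving a single-machine setup to dist_sync, this is the number to keep in mind if hyperparameters such as the learning rate were tuned for a particular batch size.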