From 1a24fb29c13da3ef57e5fc41add3fd60cf413996 Mon Sep 17 00:00:00 2001
From: Megvii Engine Team
Date: Tue, 22 Sep 2020 15:51:21 +0800
Subject: [PATCH] perf(mge/allreduce): put allreduce on another cuda stream

GitOrigin-RevId: 2e778dfa0444ac2c2870b9dcfa72cfe7271fbc1a
---
 imperative/python/megengine/distributed/helper.py    | 3 +++
 imperative/python/megengine/functional/param_pack.py | 2 +-
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/imperative/python/megengine/distributed/helper.py b/imperative/python/megengine/distributed/helper.py
index 05db40c1..81cfc77b 100644
--- a/imperative/python/megengine/distributed/helper.py
+++ b/imperative/python/megengine/distributed/helper.py
@@ -88,6 +88,7 @@ class AllreduceCallback:
         self._futures_dict = dict()
         self._packing_list = defaultdict(list)
         self._packing_size = defaultdict(int)
+        self._grad_origin_device = dict()

     def _pack(self, dtype):
         grad_list = [self._gradients_dict[p] for p in self._packing_list[dtype]]
@@ -109,6 +110,7 @@ class AllreduceCallback:
         self._params.append(param)
         self._futures_dict[param] = TensorFuture(ack=False)
         self._gradients_dict[param] = grad
+        self._grad_origin_device[param] = str(grad.device)

         dtype_str = str(np.dtype(param.dtype))
         dtype_size = np.dtype(param.dtype).itemsize
@@ -123,6 +125,7 @@ class AllreduceCallback:
             self._pack(dtype)
         for param in self._params:
             grad = self._gradients_dict[param]
+            grad = copy(grad, self._grad_origin_device[param])
             self._futures_dict[param].set(grad)
         self._reset()

diff --git a/imperative/python/megengine/functional/param_pack.py b/imperative/python/megengine/functional/param_pack.py
index d7d52085..0ad3a11b 100644
--- a/imperative/python/megengine/functional/param_pack.py
+++ b/imperative/python/megengine/functional/param_pack.py
@@ -27,7 +27,7 @@ def pack_allreduce_split(pack_list, shapes, group, reduce_method):
     offsets_val = get_offsets(shapes)
     offsets = Tensor(offsets_val)
     packed_grads = param_pack_concat(pack_list, offsets, offsets_val)
-    packed_grads = all_reduce_sum(packed_grads, group)
+    packed_grads = all_reduce_sum(packed_grads, group, group.comp_node)
     if reduce_method == "mean":
         packed_grads /= group.size
     grads = param_pack_split(packed_grads, offsets_val, shapes)