- # -*- coding: utf-8 -*-
- # MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
- #
- # Copyright (c) 2014-2021 Megvii Inc. All rights reserved.
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- import math
- import numbers
- from abc import abstractmethod
- from typing import Optional, Tuple
-
- import numpy as np
-
- from ..core._imperative_rt.core2 import apply
- from ..core.ops import builtin
- from ..core.ops.builtin import BatchNorm
- from ..functional import stack, zeros
- from ..tensor import Parameter, Tensor
- from . import init
- from .module import Module
-
-
- class RNNCellBase(Module):
- def __init__(
- self, input_size: int, hidden_size: int, bias: bool, num_chunks: int,
- ) -> None:
- # num_chunks indicates the number of gates
- super(RNNCellBase, self).__init__()
-
- self.input_size = input_size
- self.hidden_size = hidden_size
- self.bias = bias
-
- # initialize weights
- self.gate_hidden_size = num_chunks * hidden_size
- self.weight_ih = Parameter(
- np.zeros((self.gate_hidden_size, input_size), dtype=np.float32)
- )
- self.weight_hh = Parameter(
- np.zeros((self.gate_hidden_size, hidden_size), dtype=np.float32)
- )
- if bias:
-             self.bias_ih = Parameter(
-                 np.zeros((self.gate_hidden_size,), dtype=np.float32)
-             )
-             self.bias_hh = Parameter(
-                 np.zeros((self.gate_hidden_size,), dtype=np.float32)
-             )
-         else:
-             self.bias_ih = zeros(shape=(self.gate_hidden_size,))
-             self.bias_hh = zeros(shape=(self.gate_hidden_size,))
- self.reset_parameters()
-         # when bias is False, bias_ih/bias_hh stay plain zero tensors (not Parameters),
-         # so reset_parameters leaves them at zero
-
- def reset_parameters(self) -> None:
- stdv = 1.0 / math.sqrt(self.hidden_size)
- for weight in self.parameters():
- init.uniform_(weight, -stdv, stdv)
-
- @abstractmethod
- def forward(self, input: Tensor, hx: Optional[Tensor] = None) -> Tensor:
-         raise NotImplementedError("forward not implemented!")
-
-
- class RNNCell(RNNCellBase):
-
- r"""An Elman RNN cell with tanh or ReLU non-linearity.
-
- .. math::
-
- h' = \tanh(W_{ih} x + b_{ih} + W_{hh} h + b_{hh})
-
- If :attr:`nonlinearity` is `'relu'`, then ReLU is used in place of tanh.
-
- Args:
- input_size: The number of expected features in the input `x`
- hidden_size: The number of features in the hidden state `h`
- bias: If ``False``, then the layer does not use bias weights `b_ih` and `b_hh`.
- Default: ``True``
- nonlinearity: The non-linearity to use. Can be either ``'tanh'`` or ``'relu'``. Default: ``'tanh'``
-
- Inputs: input, hidden
- - **input** of shape `(batch, input_size)`: tensor containing input features
- - **hidden** of shape `(batch, hidden_size)`: tensor containing the initial hidden
- state for each element in the batch.
- Defaults to zero if not provided.
-
- Outputs: h'
- - **h'** of shape `(batch, hidden_size)`: tensor containing the next hidden state
- for each element in the batch
-
- Shape:
- - Input1: :math:`(N, H_{in})` tensor containing input features where
- :math:`H_{in}` = `input_size`
- - Input2: :math:`(N, H_{out})` tensor containing the initial hidden
- state for each element in the batch where :math:`H_{out}` = `hidden_size`
- Defaults to zero if not provided.
- - Output: :math:`(N, H_{out})` tensor containing the next hidden state
- for each element in the batch
-
-
- Examples:
-
- .. code-block::
-
- import numpy as np
- import megengine as mge
- import megengine.module as M
-
- m = M.RNNCell(10, 20)
- inp = mge.tensor(np.random.randn(3, 10), dtype=np.float32)
- hx = mge.tensor(np.random.randn(3, 20), dtype=np.float32)
- out = m(inp, hx)
- print(out.numpy().shape)
-
- Outputs:
-
- .. code-block::
-
- (3, 20)
-
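-     A short sketch of the default-hidden-state case (``hx`` may be omitted, in which
-     case it defaults to zeros as noted above; the printed shape follows the shape
-     rules above):
-
-     .. code-block::
-
-         out = m(inp)  # same module and input as above, hx omitted
-         print(out.numpy().shape)  # expected (3, 20)
-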
- """
-
- def __init__(
- self,
- input_size: int,
- hidden_size: int,
- bias: bool = True,
- nonlinearity: str = "tanh",
- ) -> None:
- self.nonlinearity = nonlinearity
- super(RNNCell, self).__init__(input_size, hidden_size, bias, num_chunks=1)
-
- def forward(self, input: Tensor, hx: Optional[Tensor] = None) -> Tensor:
- if hx is None:
- hx = zeros(shape=(input.shape[0], self.gate_hidden_size),)
- op = builtin.RNNCell(nonlineMode=self.nonlinearity)
- return apply(
- op, input, self.weight_ih, self.bias_ih, hx, self.weight_hh, self.bias_hh
- )[0]
-
-
- class LSTMCell(RNNCellBase):
-
- r"""A long short-term memory (LSTM) cell.
-
- .. math::
-
- \begin{array}{ll}
- i = \sigma(W_{ii} x + b_{ii} + W_{hi} h + b_{hi}) \\
- f = \sigma(W_{if} x + b_{if} + W_{hf} h + b_{hf}) \\
- g = \tanh(W_{ig} x + b_{ig} + W_{hg} h + b_{hg}) \\
- o = \sigma(W_{io} x + b_{io} + W_{ho} h + b_{ho}) \\
- c' = f * c + i * g \\
- h' = o * \tanh(c') \\
- \end{array}
-
- where :math:`\sigma` is the sigmoid function, and :math:`*` is the Hadamard product.
-
- Args:
- input_size: The number of expected features in the input `x`
- hidden_size: The number of features in the hidden state `h`
- bias: If ``False``, then the layer does not use bias weights `b_ih` and
- `b_hh`. Default: ``True``
-
- Inputs: input, (h_0, c_0)
- - **input** of shape `(batch, input_size)`: tensor containing input features
- - **h_0** of shape `(batch, hidden_size)`: tensor containing the initial hidden
- state for each element in the batch.
- - **c_0** of shape `(batch, hidden_size)`: tensor containing the initial cell state
- for each element in the batch.
-
- If `(h_0, c_0)` is not provided, both **h_0** and **c_0** default to zero.
-
- Outputs: (h_1, c_1)
- - **h_1** of shape `(batch, hidden_size)`: tensor containing the next hidden state
- for each element in the batch
- - **c_1** of shape `(batch, hidden_size)`: tensor containing the next cell state
- for each element in the batch
-
- Examples:
-
- .. code-block::
-
- import numpy as np
- import megengine as mge
- import megengine.module as M
-
- m = M.LSTMCell(10, 20)
- inp = mge.tensor(np.random.randn(3, 10), dtype=np.float32)
- hx = mge.tensor(np.random.randn(3, 20), dtype=np.float32)
- cx = mge.tensor(np.random.randn(3, 20), dtype=np.float32)
- hy, cy = m(inp, (hx, cx))
- print(hy.numpy().shape)
- print(cy.numpy().shape)
-
- Outputs:
-
- .. code-block::
-
- (3, 20)
- (3, 20)
-
- """
-
- def __init__(self, input_size: int, hidden_size: int, bias: bool = True,) -> None:
- super(LSTMCell, self).__init__(input_size, hidden_size, bias, num_chunks=4)
-
- def forward(
- self, input: Tensor, hx: Optional[Tuple[Tensor, Tensor]] = None
- ) -> Tuple[Tensor, Tensor]:
- # hx: (h, c)
- if hx is None:
- h = zeros(shape=(input.shape[0], self.hidden_size))
- c = zeros(shape=(input.shape[0], self.hidden_size))
- else:
- h, c = hx
- op = builtin.LSTMCell()
- return apply(
- op, input, self.weight_ih, self.bias_ih, h, self.weight_hh, self.bias_hh, c
- )[:2]
-
-
- class RNNBase(Module):
- def __init__(
- self,
- input_size: int,
- hidden_size: int,
- num_layers: int = 1,
- bias: bool = True,
- batch_first: bool = False,
- dropout: float = 0,
- bidirectional: bool = False,
- proj_size: int = 0,
- ) -> None:
- super(RNNBase, self).__init__()
- self.input_size = input_size
- self.hidden_size = hidden_size
- self.num_layers = num_layers
- self.bias = bias
- self.batch_first = batch_first
- self.dropout = float(dropout)
- self.bidirectional = bidirectional
- self.num_directions = 2 if self.bidirectional else 1
- self.proj_size = proj_size
-
- # check validity of dropout
- if (
- not isinstance(dropout, numbers.Number)
- or not 0 <= dropout <= 1
- or isinstance(dropout, bool)
- ):
- raise ValueError(
- "Dropout should be a float in [0, 1], which indicates the probability "
- "of an element to be zero"
- )
-
- if proj_size < 0:
- raise ValueError(
- "proj_size should be a positive integer or zero to disable projections"
- )
- elif proj_size >= hidden_size:
- raise ValueError("proj_size has to be smaller than hidden_size")
-
- self.cells = []
- for layer in range(self.num_layers):
- self.cells.append([])
- for _ in range(self.num_directions):
- self.cells[layer].append(self.create_cell(layer))
-         # parameters were initialized when the cells were created;
-         # the flattened buffer built below is what the forward op consumes
- self._flatten_parameters(self.cells)
-
- def _flatten_parameters(self, cells):
- gate_hidden_size = cells[0][0].gate_hidden_size
- size_dim1 = 0
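-         # size_dim1 accumulates the second dimension of one flat buffer big enough to
-         # hold every weight_ih, weight_hh and (when bias=True) both bias vectors of all
-         # layers and directions; the exact packing order inside this buffer is defined
-         # by the underlying RNN/LSTM op, this method only reserves enough space for it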
- for layer in range(self.num_layers):
- for direction in range(self.num_directions):
- size_dim1 += cells[layer][direction].weight_ih.shape[1]
- size_dim1 += cells[layer][direction].weight_hh.shape[1]
- if self.bias:
- size_dim1 += 2 * self.num_directions * self.num_layers
-
- self._flatten_weights = Parameter(
- np.zeros((gate_hidden_size, size_dim1), dtype=np.float32)
- )
- self.reset_parameters()
-
- def reset_parameters(self) -> None:
- stdv = 1.0 / math.sqrt(self.hidden_size)
- for weight in self.parameters():
- init.uniform_(weight, -stdv, stdv)
-
- @abstractmethod
- def create_cell(self, layer):
-         raise NotImplementedError("create_cell not implemented!")
-
- @abstractmethod
- def init_hidden(self):
-         raise NotImplementedError("init_hidden not implemented!")
-
- @abstractmethod
- def get_output_from_hidden(self, hx):
-         raise NotImplementedError("get_output_from_hidden not implemented!")
-
- @abstractmethod
- def apply_op(self, input, hx):
-         raise NotImplementedError("apply_op not implemented!")
-
- def _apply_fn_to_hx(self, hx, fn):
- return fn(hx)
-
- def _stack_h_n(self, h_n):
- return stack(h_n, axis=0)
-
- def forward(self, input: Tensor, hx=None):
- if self.batch_first:
- batch_size = input.shape[0]
- input = input.transpose((1, 0, 2)) # [seq_len, batch_size, dim]
- else:
- batch_size = input.shape[1]
- if hx is None:
- hx = self.init_hidden(batch_size)
-
- output, h = self.apply_op(input, hx)
- if self.batch_first:
- output = output.transpose((1, 0, 2))
- return output, h
-
-
- class RNN(RNNBase):
-
- r"""Applies a multi-layer Elman RNN with :math:`\tanh` or :math:`\text{ReLU}` non-linearity to an
- input sequence.
-
-
- For each element in the input sequence, each layer computes the following
- function:
-
- .. math::
- h_t = \tanh(W_{ih} x_t + b_{ih} + W_{hh} h_{(t-1)} + b_{hh})
-
-     where :math:`h_t` is the hidden state at time `t`, :math:`x_t` is
-     the input at time `t`, and :math:`h_{(t-1)}` is the hidden state at
-     time `t-1` or the initial hidden state at time `0`.
- If :attr:`nonlinearity` is ``'relu'``, then :math:`\text{ReLU}` is used instead of :math:`\tanh`.
-
- Args:
- input_size: The number of expected features in the input `x`
- hidden_size: The number of features in the hidden state `h`
- num_layers: Number of recurrent layers. E.g., setting ``num_layers=2``
- would mean stacking two RNNs together to form a `stacked RNN`,
- with the second RNN taking in outputs of the first RNN and
- computing the final results. Default: 1
- nonlinearity: The non-linearity to use. Can be either ``'tanh'`` or ``'relu'``. Default: ``'tanh'``
- bias: If ``False``, then the layer does not use bias weights `b_ih` and `b_hh`.
- Default: ``True``
- batch_first: If ``True``, then the input and output tensors are provided
- as `(batch, seq, feature)` instead of `(seq, batch, feature)`.
- Note that this does not apply to hidden or cell states. See the
- Inputs/Outputs sections below for details. Default: ``False``
- dropout: If non-zero, introduces a `Dropout` layer on the outputs of each
- RNN layer except the last layer, with dropout probability equal to
- :attr:`dropout`. Default: 0
- bidirectional: If ``True``, becomes a bidirectional RNN. Default: ``False``
-
- Inputs: input, h_0
-         * **input**: tensor of shape :math:`(L, N, H_{in})` when ``batch_first=False`` or
-           :math:`(N, L, H_{in})` when ``batch_first=True`` containing the features of
-           the input sequence.
- * **h_0**: tensor of shape :math:`(D * \text{num\_layers}, N, H_{out})` containing the initial hidden
- state for each element in the batch. Defaults to zeros if not provided.
-
- where:
-
- .. math::
- \begin{aligned}
- N ={} & \text{batch size} \\
- L ={} & \text{sequence length} \\
- D ={} & 2 \text{ if bidirectional=True otherwise } 1 \\
- H_{in} ={} & \text{input\_size} \\
- H_{out} ={} & \text{hidden\_size}
- \end{aligned}
-
- Outputs: output, h_n
-         * **output**: tensor of shape :math:`(L, N, D * H_{out})` when ``batch_first=False`` or
-           :math:`(N, L, D * H_{out})` when ``batch_first=True`` containing the output features
-           `(h_t)` from the last layer of the RNN, for each `t`.
- * **h_n**: tensor of shape :math:`(D * \text{num\_layers}, N, H_{out})` containing the final hidden state
- for each element in the batch.
-
-
- Examples:
-
- .. code-block::
-
- import numpy as np
- import megengine as mge
- import megengine.module as M
-
-             m = M.RNN(10, 20, 2, batch_first=False, nonlinearity="relu", bias=True, bidirectional=True)
- inp = mge.tensor(np.random.randn(6, 30, 10), dtype=np.float32)
- hx = mge.tensor(np.random.randn(4, 30, 20), dtype=np.float32)
- out, hn = m(inp, hx)
- print(out.numpy().shape)
-
- Outputs:
-
- .. code-block::
-
- (6, 30, 40)
-
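-     A minimal sketch of the same kind of module with ``batch_first=True`` (the
-     expected shape in the comment follows the formulas above):
-
-     .. code-block::
-
-         import numpy as np
-         import megengine as mge
-         import megengine.module as M
-
-         m = M.RNN(10, 20, 2, batch_first=True)
-         inp = mge.tensor(np.random.randn(30, 6, 10), dtype=np.float32)  # (N, L, H_in)
-         out, hn = m(inp)  # h_0 defaults to zeros
-         print(out.numpy().shape)  # expected (30, 6, 20), i.e. (N, L, D * H_out)
-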
- """
-
- def __init__(self, *args, **kwargs) -> None:
- self.nonlinearity = kwargs.pop("nonlinearity", "tanh")
- super(RNN, self).__init__(*args, **kwargs)
-
- def create_cell(self, layer):
- if layer == 0:
- input_size = self.input_size
- else:
- input_size = self.num_directions * self.hidden_size
- return RNNCell(input_size, self.hidden_size, self.bias, self.nonlinearity)
-
- def init_hidden(self, batch_size):
- hidden_shape = (
- self.num_directions * self.num_layers,
- batch_size,
- self.hidden_size,
- )
- return zeros(shape=hidden_shape)
-
- def get_output_from_hidden(self, hx):
- return hx
-
- def apply_op(self, input, hx):
-         # the RNN op reuses BatchNorm's FwdMode enum to mark training vs. inference
-         fwd_mode = (
-             BatchNorm.FwdMode.TRAINING if self.training else BatchNorm.FwdMode.INFERENCE
-         )
-
- op = builtin.RNN(
- num_layers=self.num_layers,
- bidirectional=self.bidirectional,
- bias=self.bias,
- hidden_size=self.hidden_size,
- dropout=self.dropout,
- nonlineMode=self.nonlinearity,
- fwd_mode=fwd_mode,
- )
- output, h = apply(op, input, hx, self._flatten_weights)[:2]
-         # zero-valued cross terms tie output and h together in the autodiff graph
-         output = output + h.sum() * 0
-         h = h + output.sum() * 0
- return output, h
-
-
- class LSTM(RNNBase):
-
-     r"""Applies a multi-layer long short-term memory (LSTM) RNN to an input
-     sequence.
-
-
- For each element in the input sequence, each layer computes the following
- function:
-
- .. math::
- \begin{array}{ll} \\
- i_t = \sigma(W_{ii} x_t + b_{ii} + W_{hi} h_{t-1} + b_{hi}) \\
- f_t = \sigma(W_{if} x_t + b_{if} + W_{hf} h_{t-1} + b_{hf}) \\
- g_t = \tanh(W_{ig} x_t + b_{ig} + W_{hg} h_{t-1} + b_{hg}) \\
- o_t = \sigma(W_{io} x_t + b_{io} + W_{ho} h_{t-1} + b_{ho}) \\
- c_t = f_t \odot c_{t-1} + i_t \odot g_t \\
- h_t = o_t \odot \tanh(c_t) \\
- \end{array}
-
- where :math:`h_t` is the hidden state at time `t`, :math:`c_t` is the cell
- state at time `t`, :math:`x_t` is the input at time `t`, :math:`h_{t-1}`
- is the hidden state of the layer at time `t-1` or the initial hidden
- state at time `0`, and :math:`i_t`, :math:`f_t`, :math:`g_t`,
- :math:`o_t` are the input, forget, cell, and output gates, respectively.
- :math:`\sigma` is the sigmoid function, and :math:`\odot` is the Hadamard product.
-
- In a multilayer LSTM, the input :math:`x^{(l)}_t` of the :math:`l` -th layer
- (:math:`l >= 2`) is the hidden state :math:`h^{(l-1)}_t` of the previous layer multiplied by
- dropout :math:`\delta^{(l-1)}_t` where each :math:`\delta^{(l-1)}_t` is a Bernoulli random
- variable which is :math:`0` with probability :attr:`dropout`.
-
-     If ``proj_size > 0`` is specified, an LSTM with projections will be used. This changes
-     the LSTM cell in the following way. First, the dimension of :math:`h_t` will be changed from
-     ``hidden_size`` to ``proj_size`` (the dimensions of :math:`W_{hi}` will be changed accordingly).
-     Second, the output hidden state of each layer will be multiplied by a learnable projection
-     matrix: :math:`h_t = W_{hr}h_t`. Note that as a consequence, the outputs of the LSTM
-     network will have a different shape as well (see the second example below). See the
-     Inputs/Outputs sections below for exact dimensions of all variables. You can find more
-     details in https://arxiv.org/abs/1402.1128.
-
- Args:
- input_size: The number of expected features in the input `x`
- hidden_size: The number of features in the hidden state `h`
- num_layers: Number of recurrent layers. E.g., setting ``num_layers=2``
- would mean stacking two LSTMs together to form a `stacked LSTM`,
- with the second LSTM taking in outputs of the first LSTM and
- computing the final results. Default: 1
- bias: If ``False``, then the layer does not use bias weights `b_ih` and `b_hh`.
- Default: ``True``
- batch_first: If ``True``, then the input and output tensors are provided
- as `(batch, seq, feature)` instead of `(seq, batch, feature)`.
- Note that this does not apply to hidden or cell states. See the
- Inputs/Outputs sections below for details. Default: ``False``
- dropout: If non-zero, introduces a `Dropout` layer on the outputs of each
- LSTM layer except the last layer, with dropout probability equal to
- :attr:`dropout`. Default: 0
- bidirectional: If ``True``, becomes a bidirectional LSTM. Default: ``False``
- proj_size: If ``> 0``, will use LSTM with projections of corresponding size. Default: 0
-
- Inputs: input, (h_0, c_0)
-         * **input**: tensor of shape :math:`(L, N, H_{in})` when ``batch_first=False`` or
-           :math:`(N, L, H_{in})` when ``batch_first=True`` containing the features of
-           the input sequence.
- * **h_0**: tensor of shape :math:`(D * \text{num\_layers}, N, H_{out})` containing the
- initial hidden state for each element in the batch.
- Defaults to zeros if (h_0, c_0) is not provided.
- * **c_0**: tensor of shape :math:`(D * \text{num\_layers}, N, H_{cell})` containing the
- initial cell state for each element in the batch.
- Defaults to zeros if (h_0, c_0) is not provided.
-
- where:
-
- .. math::
- \begin{aligned}
- N ={} & \text{batch size} \\
- L ={} & \text{sequence length} \\
- D ={} & 2 \text{ if bidirectional=True otherwise } 1 \\
- H_{in} ={} & \text{input\_size} \\
- H_{cell} ={} & \text{hidden\_size} \\
- H_{out} ={} & \text{proj\_size if } \text{proj\_size}>0 \text{ otherwise hidden\_size} \\
- \end{aligned}
-
- Outputs: output, (h_n, c_n)
-         * **output**: tensor of shape :math:`(L, N, D * H_{out})` when ``batch_first=False`` or
-           :math:`(N, L, D * H_{out})` when ``batch_first=True`` containing the output features
-           `(h_t)` from the last layer of the LSTM, for each `t`.
- * **h_n**: tensor of shape :math:`(D * \text{num\_layers}, N, H_{out})` containing the
- final hidden state for each element in the batch.
- * **c_n**: tensor of shape :math:`(D * \text{num\_layers}, N, H_{cell})` containing the
- final cell state for each element in the batch.
-
- Examples:
-
- .. code-block::
-
- import numpy as np
- import megengine as mge
- import megengine.module as M
-
- m = M.LSTM(10, 20, 2, batch_first=False, bidirectional=True, bias=True)
- inp = mge.tensor(np.random.randn(6, 30, 10), dtype=np.float32)
- hx = mge.tensor(np.random.randn(4, 30, 20), dtype=np.float32)
- cx = mge.tensor(np.random.randn(4, 30, 20), dtype=np.float32)
-             out, (hn, cn) = m(inp, (hx, cx))
- print(out.numpy().shape)
-
- Outputs:
-
- .. code-block::
-
- (6, 30, 40)
-
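-     A minimal sketch of an LSTM with projections. It assumes ``proj_size`` is fully
-     supported by the installed MegEngine build; the shapes in the comments follow the
-     formulas above rather than a verified run:
-
-     .. code-block::
-
-         m = M.LSTM(10, 20, 2, bidirectional=True, proj_size=13)
-         inp = mge.tensor(np.random.randn(6, 30, 10), dtype=np.float32)
-         hx = mge.tensor(np.random.randn(4, 30, 13), dtype=np.float32)  # H_out = proj_size
-         cx = mge.tensor(np.random.randn(4, 30, 20), dtype=np.float32)  # H_cell = hidden_size
-         out, (hn, cn) = m(inp, (hx, cx))
-         # expected per the formulas above: out (6, 30, 26), hn (4, 30, 13), cn (4, 30, 20)
-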
- """
-
- def __init__(self, *args, **kwargs) -> None:
- super(LSTM, self).__init__(*args, **kwargs)
-
- def create_cell(self, layer):
- if layer == 0:
- input_size = self.input_size
- else:
- input_size = self.num_directions * self.hidden_size
- return LSTMCell(input_size, self.hidden_size, self.bias)
-
- def init_hidden(self, batch_size):
- hidden_shape = (
- self.num_directions * self.num_layers,
- batch_size,
- self.hidden_size,
- )
- h = zeros(shape=hidden_shape)
- c = zeros(shape=hidden_shape)
- return (h, c)
-
- def get_output_from_hidden(self, hx):
- return hx[0]
-
- def apply_op(self, input, hx):
- fwd_mode = (
- BatchNorm.FwdMode.TRAINING if self.training else BatchNorm.FwdMode.INFERENCE
- )
- op = builtin.LSTM(
- num_layers=self.num_layers,
- bidirectional=self.bidirectional,
- bias=self.bias,
- hidden_size=self.hidden_size,
- proj_size=self.proj_size,
- dropout=self.dropout,
- fwd_mode=fwd_mode,
- )
- output, h, c = apply(op, input, hx[0], hx[1], self._flatten_weights)[:3]
-         # zero-valued cross terms tie output, h and c together in the autodiff graph
-         placeholders = [output.sum() * 0, h.sum() * 0, c.sum() * 0]
-         output = output + placeholders[1] + placeholders[2]
-         h = h + placeholders[0] + placeholders[2]
-         c = c + placeholders[0] + placeholders[1]
- return output, (h, c)
-
- def _apply_fn_to_hx(self, hx, fn):
- return (fn(hx[0]), fn(hx[1]))
-
- def _stack_h_n(self, h_n):
- h = [tup[0] for tup in h_n]
- c = [tup[1] for tup in h_n]
- return (stack(h, axis=0), stack(c, axis=0))