numpy: How to mask variable-size inputs in a Transformer model when batches need different masking?

Asked by 0x6upsns on 2022-11-10

I am building a Transformer with tensorflow.keras and I am having trouble understanding how the attention_mask works for the MultiHeadAttention layer.
My input is 3-dimensional. For example, assume my whole dataset has 10 items, each of which is at most 4 timesteps long:


# whole data

[
  # first item
  [
    [     1,      2,      3],
    [     1,      2,      3],
    [np.nan, np.nan, np.nan],
    [np.nan, np.nan, np.nan],
  ],
  # second item
  [
    [     1,      2,      3],
    [     5,      8,      2],
    [     3,      7,      8],
    [     4,      6,      2],
  ],
  ... # 8 more items
]

So my mask currently looks like this:


# assume this is a numpy array

mask = [
  [
    [1, 1, 1],
    [1, 1, 1],
    [0, 0, 0],
    [0, 0, 0],
  ],
  [
    [1, 1, 1],
    [1, 1, 1],
    [1, 1, 1],
    [1, 1, 1],
  ],
  ...
]

So at this point the mask has shape [10, 4, 3]. Assume I use batch_size = 5. According to the documentation, the attention_mask shape should be [B, T, S] (batch_size, query_size, key_size), which in this example would be [5, 4, 4].
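
To make the shapes concrete: a [B, T, S] mask has one entry per (query timestep, key timestep) pair, not per feature, so derived from the NaN padding above it would be (10, 4, 4) before batching. For example (just a sketch; data stands for the (10, 4, 3) array shown above):

import numpy as np

# a timestep is "real" if none of its features is NaN -> shape (10, 4)
valid = ~np.isnan(data).any(axis=-1)

# query position i may attend to key position j only if both timesteps are
# real -> shape (10, 4, 4), i.e. [B, T, S] before splitting into batches of 5
attention_mask = valid[:, :, None] & valid[:, None, :]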

Questions

If the mask is computed only once, which 5 items should I pass as the mask? That sounds counter-intuitive to me. How should I build the mask?

According to this answer, HEAD_SIZE should also be taken into account, which is why they also do:

mask = mask[:, tf.newaxis, tf.newaxis, :]  # (batch, seq) -> (batch, 1, 1, seq), broadcast over heads and query positions

What I have tested

The only way I have managed to run the Transformer with an attention_mask is the following:

mask = np.ones((batch_size, data.shape[1], data.shape[2]))
mask = mask[:, tf.newaxis, tf.newaxis, :]

Obviously this mask is meaningless because it is all ones, but it was just to check that it had the correct shape.

The model

I am using the same code as the Keras example Transformer for timeseries classification:

from tensorflow import keras
from tensorflow.keras import layers

def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0.0, mask=None):
    # Normalization and Attention
    x = layers.LayerNormalization(epsilon=1e-6)(inputs)
    x = layers.MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )(x, x, attention_mask=mask)
    x = layers.Dropout(dropout)(x)
    res = x + inputs

    # Feed Forward Part
    x = layers.LayerNormalization(epsilon=1e-6)(res)
    x = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(x)
    x = layers.Dropout(dropout)(x)
    x = layers.Conv1D(filters=inputs.shape[-1], kernel_size=1)(x)
    return x + res

def build_model(
    n_classes,
    input_shape,
    head_size,
    num_heads,
    ff_dim,
    num_transformer_blocks,
    mlp_units,
    dropout=0.0,
    mlp_dropout=0.0,
    input_mask=None,
) -> keras.Model:
    inputs = keras.Input(shape=input_shape)
    x = inputs
    for _ in range(num_transformer_blocks):
        x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout, input_mask)

    x = layers.GlobalAveragePooling1D(data_format="channels_first")(x)
    for dim in mlp_units:
        x = layers.Dense(dim, activation="relu")(x)
        x = layers.Dropout(mlp_dropout)(x)
    outputs = layers.Dense(n_classes, activation="softmax")(x)
    return keras.Model(inputs, outputs)

Answer 1 (8zzbczxx)

First, a simpler example to understand the MultiHeadAttention mask.


# Crude self-attention implementation

import numpy as np
import tensorflow as tf

query = tf.constant([[1], [2], [3], [4]], dtype=tf.float32)  # Shape([4, 1])

# unnormalized, pre-softmax attention scores
scores = tf.matmul(query, query, transpose_b=True)  # Shape([4, 4])

The above are the attention scores for the given query. The attention_mask is used to prevent attending to certain positions in this score matrix, so the mask must have the same dimensions as the attention scores.
Suppose we decide that, in the example above, the current token should only attend to itself and the next token; then we can define the mask as:

mask = tf.constant([[1., 1., -np.inf, -np.inf],
                    [-np.inf, 1., 1., -np.inf],
                    [-np.inf, -np.inf, 1., 1.],
                    [-np.inf, -np.inf, -np.inf, 1.]])

# apply mask on the score

scores = scores*mask

# softmax

scores = tf.nn.softmax(scores)

# scores (0 indicates no attention)

[[0.26894143, 0.73105854, 0.        , 0.        ],
 [0.        , 0.11920292, 0.880797  , 0.        ],
 [0.        , 0.        , 0.04742587, 0.95257413],
 [0.        , 0.        , 0.        , 1.        ]]

# score weighted queries

value = tf.matmul(scores, query)

# value is a weighted average of the current and next token of ( [[1], [2], [3], [4]])

[[1.7310585], #weighted average of ([1], [2]) (current and next)
 [2.8807971],
 [3.9525743],
 [4.       ]]
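
A side note on the trick above: multiplying the raw scores by a mask of 1s and -np.inf only behaves as intended because all the raw scores here are positive. The more common formulation, and as far as I know what the Keras MultiHeadAttention layer does internally with its boolean attention_mask, is to add a large negative number to the disallowed positions before the softmax. Continuing the example above, a minimal sketch of that variant:

# recompute the raw (pre-softmax) scores
raw_scores = tf.matmul(query, query, transpose_b=True)

# boolean-style mask: 1 = attend, 0 = do not attend (same pattern as above)
bool_mask = tf.constant([[1., 1., 0., 0.],
                         [0., 1., 1., 0.],
                         [0., 0., 1., 1.],
                         [0., 0., 0., 1.]])

# push disallowed positions towards -inf, then normalize
weights = tf.nn.softmax(raw_scores + (1.0 - bool_mask) * -1e9)

# weights matches the matrix printed above (up to rounding)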

Can each item in a batch have a different mask?
Yes. One use case I can think of is padding: when different samples in the same batch are padded, the mask can be set up to ignore that padding.
Your particular case: the mask has to be of shape (batch_size, 4, 4). The mask can be the same for every item in the batch.

from tensorflow import keras

batch_size = 5
query = keras.Input(shape=(4, 3))
mask_tensor = keras.Input(shape=(4, 4))

# keras layer

mha = keras.layers.MultiHeadAttention(num_heads=1, key_dim=3)
output = mha(query=query, value=query, attention_mask=mask_tensor, return_attention_scores=True)

# Create a model

model = keras.Model([query, mask_tensor], output)

# random query and mask; mask entries must be 1 (attend) or 0 (do not attend)

queries = tf.random.normal(shape=(batch_size, 4, 3))
mask_data = tf.random.uniform(maxval=2, shape=(batch_size, 4, 4), dtype=tf.int32)

# calling the model

values, attn_weights = model.predict([queries, mask_data])

# attn_weights.shape

(5, 1, 4, 4)
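
To connect this to the padding use case mentioned earlier: instead of a random mask, a per-sample padding mask of shape (batch_size, 4, 4) can be built and passed in place of mask_data. A sketch, assuming seq_lengths is a hypothetical tensor holding the number of real timesteps of each sample (not part of the original post):

import tensorflow as tf

seq_lengths = tf.constant([2, 4, 3, 4, 1])       # real timesteps per sample, batch_size = 5
valid = tf.sequence_mask(seq_lengths, maxlen=4)  # (5, 4) boolean, True = real timestep

# query position i may attend to key position j only if both are real -> (5, 4, 4)
pad_mask = tf.cast(valid[:, :, None] & valid[:, None, :], tf.int32)

# pad_mask can now be fed to the model above in place of mask_data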

Answer 2 (oxalkeyp)

After a bit of research and looking at several Transformer model examples, this is what solved my problem:
1. Create a custom TransformerBlock layer that supports masking.
2. Add a mask argument to the TransformerBlock's call method and reshape it there.
3. Add a Masking layer before the first TransformerBlock.
Code:

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

class TransformerBlock(layers.Layer):
    def __init__(self, head_size, num_heads, ff_dim, ff_dim2, rate=0.1):
        super().__init__()
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=head_size)
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)
        self.conv1 = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")
        self.conv2 = layers.Conv1D(filters=ff_dim2, kernel_size=1)
        self.supports_masking = True

    def call(self, inputs, training=None, mask=None):
        padding_mask = None
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, tf.newaxis, :], dtype="int32")

        out_norm1 = self.layernorm1(inputs, training=training)
        out_att = self.att(
            out_norm1, out_norm1, training=training, attention_mask=padding_mask
        )
        out_drop1 = self.dropout1(out_att, training=training)
        res = out_drop1 + inputs
        out_norm2 = self.layernorm2(res, training=training)
        out_conv1 = self.conv1(out_norm2, training=training)
        out_drop2 = self.dropout2(out_conv1, training=training)
        out_conv2 = self.conv2(out_drop2, training=training)
        return out_conv2 + res

def build_model(
    n_classes,
    input_shape,
    head_size,
    num_heads,
    ff_dim,
    num_transformer_blocks,
    mlp_units,
    dropout=0.0,
    mlp_dropout=0.0,
    mask=None,
) -> keras.Model:
    inputs = keras.Input(shape=input_shape)
    _x = inputs
    if mask is not None:
        _x = layers.Masking(mask_value=mask)(_x)
    for _ in range(num_transformer_blocks):
        _x = TransformerBlock(
            head_size,
            num_heads,
            ff_dim,
            inputs.shape[-1],
            dropout,
        )(_x)

    _x = layers.GlobalAveragePooling1D(data_format="channels_first")(_x)
    for dim in mlp_units:
        _x = layers.Dense(dim, activation="relu")(_x)
        _x = layers.Dropout(mlp_dropout)(_x)
    outputs = layers.Dense(n_classes, activation="softmax")(_x)
    return keras.Model(inputs, outputs)
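
One caveat when wiring this up: layers.Masking marks a timestep as masked only when all of its values equal mask_value, and NaN never compares equal to anything, so the NaN padding from the question has to be replaced by a finite sentinel first. A hedged usage sketch (raw_data and all hyperparameter values below are made up for illustration):

import numpy as np

MASK_VALUE = 0.0                                # hypothetical sentinel that real data never uses
data = np.nan_to_num(raw_data, nan=MASK_VALUE)  # raw_data: the (10, 4, 3) array with NaN padding

model = build_model(
    n_classes=2,
    input_shape=data.shape[1:],                 # (4, 3)
    head_size=64,
    num_heads=2,
    ff_dim=4,
    num_transformer_blocks=2,
    mlp_units=[64],
    mask=MASK_VALUE,
)
model.compile(loss="sparse_categorical_crossentropy", optimizer="adam")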
