SasRec
SasRec
- class replay.nn.sequential.SasRec(body, loss)
A model using the SasRec architecture as a hidden state generator. The hidden states are multiplied by the item embeddings, resulting in logits for each of the items.
Source paper: https://arxiv.org/pdf/1808.09781.
Example:
import torch

from replay.data import FeatureHint, FeatureSource, FeatureType
from replay.data.nn import TensorFeatureInfo, TensorFeatureSource, TensorSchema
from replay.nn.agg import SumAggregator
from replay.nn.embedding import SequenceEmbedding
from replay.nn.mask import DefaultAttentionMask
from replay.nn.loss import CESampled
from replay.nn.sequential import PositionAwareAggregator, SasRec, SasRecBody, SasRecTransformerLayer

# NUM_UNIQUE_ITEMS is the number of unique items in your catalog
tensor_schema = TensorSchema(
    [
        TensorFeatureInfo(
            "item_id",
            is_seq=True,
            feature_type=FeatureType.CATEGORICAL,
            embedding_dim=256,
            padding_value=NUM_UNIQUE_ITEMS,
            cardinality=NUM_UNIQUE_ITEMS + 1,
            feature_hint=FeatureHint.ITEM_ID,
            feature_sources=[TensorFeatureSource(FeatureSource.INTERACTIONS, "item_id")],
        ),
    ]
)

body = SasRecBody(
    embedder=SequenceEmbedding(schema=tensor_schema),
    embedding_aggregator=PositionAwareAggregator(
        embedding_aggregator=SumAggregator(embedding_dim=256),
        max_sequence_length=100,
        dropout=0.2,
    ),
    attn_mask_builder=DefaultAttentionMask(
        reference_feature_name=tensor_schema.item_id_feature_name,
        num_heads=2,
    ),
    encoder=SasRecTransformerLayer(
        embedding_dim=256,
        num_heads=2,
        num_blocks=2,
        dropout=0.3,
        activation="relu",
    ),
    output_normalization=torch.nn.LayerNorm(256),
)

sasrec = SasRec(
    body=body,
    loss=CESampled(ignore_index=tensor_schema.item_id_features.item().padding_value),
)
- __init__(body, loss)
- Parameters
body (SasRecBody) – An instance of SasRecBody.
loss (LossProto) – An object of a class that performs loss calculation based on hidden states from the model, positive and optionally negative labels.
- forward(feature_tensors, padding_mask, candidates_to_score=None, positive_labels=None, negative_labels=None, target_padding_mask=None)
- Parameters
feature_tensors (Mapping[str, Tensor]) – a dictionary of tensors to generate embeddings.
padding_mask (BoolTensor) – A mask of shape (batch_size, sequence_length) indicating which elements within key to ignore for the purpose of attention (i.e. treat as "padding"). A False value indicates that the corresponding key value will be ignored.
candidates_to_score (Optional[LongTensor]) – A tensor containing the item IDs for which logits are computed at the inference stage. Note: take the padding value into account when creating the tensor. The tensor participates in calculations only at the inference stage; it is not required during training and has no effect if provided there. Default: None.
positive_labels (Optional[LongTensor]) – A tensor containing positive labels for calculating the loss. It is not required at the inference stage and has no effect if provided there. Default: None.
negative_labels (Optional[LongTensor]) – A tensor containing negative labels for calculating the loss. Note: before running, make sure that your loss supports calculations with negative labels. It is not required at the inference stage and has no effect if provided there. Default: None.
target_padding_mask (Optional[BoolTensor]) – A mask of shape (batch_size, sequence_length, num_positives) indicating which elements of positive_labels to ignore during loss calculation. A False value indicates that the corresponding value will be ignored. It is not required at the inference stage and has no effect if provided there. Default: None.
- Returns
During training, the model returns an object of the TrainOutput container class. At the inference stage, an InferenceOutput object is returned.
- Return type
Union[TrainOutput, InferenceOutput]
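A minimal inference sketch (assuming the sasrec model from the example above; the item IDs and sizes are hypothetical):
import torch

sasrec.eval()
feature_tensors = {"item_id": torch.randint(0, NUM_UNIQUE_ITEMS, (1, 10))}  # (batch_size, sequence_length)
padding_mask = torch.ones(1, 10, dtype=torch.bool)
candidates = torch.LongTensor([3, 15, 42])  # items to score
with torch.no_grad():
    output = sasrec(feature_tensors, padding_mask, candidates_to_score=candidates)
# output is an InferenceOutput; output.logits holds the scores for the requested candidates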
SasRec Building Blocks
SasRecBody
- class replay.nn.sequential.SasRecBody(embedder, embedding_aggregator, attn_mask_builder, encoder, output_normalization)
Implementation of the SasRec model architecture.
It can include custom blocks to modify the model, but the order in which the layers are applied is fixed, following the original architecture.
- __init__(embedder, embedding_aggregator, attn_mask_builder, encoder, output_normalization)
- Parameters
embedder (EmbedderProto) – An object of a class that performs the logic of generating embeddings from an input set of tensors.
embedding_aggregator (AggregatorProto) – An object of a class that performs the logic of aggregating multiple embeddings. For example, it can be a sum, a mean, or a concatenation.
attn_mask_builder (AttentionMaskProto) – An object of a class that performs the logic of generating an attention mask based on the features and padding mask given to the model.
encoder (EncoderProto) – An object of a class that performs the logic of generating a hidden embedding representation based on features, padding masks, attention mask, and aggregated embedding.
output_normalization (NormalizerProto) – An object of a class that performs the logic of normalizing the hidden state obtained from the encoder. For example, it may be torch.nn.LayerNorm or torch.nn.RMSNorm.
- forward(feature_tensors, padding_mask)
- Parameters
feature_tensors (Mapping[str, Tensor]) – a dictionary of tensors to generate embeddings.
padding_mask (BoolTensor) – A mask of shape (batch_size, sequence_length) indicating which elements within key to ignore for the purpose of attention (i.e. treat as "padding"). A False value indicates that the corresponding key value will be ignored.
- Returns
The final hidden state.
Expected shape: (batch_size, sequence_length, embedding_dim)
- Return type
Tensor
SasRecTransformerLayer
- class replay.nn.sequential.SasRecTransformerLayer(embedding_dim, num_heads, num_blocks, dropout, activation='gelu')
Vanilla SasRec layer. The layer consists of Multi-Head Attention followed by a Point-Wise Feed-Forward Network.
Source paper: https://arxiv.org/pdf/1808.09781.pdf
- __init__(embedding_dim, num_heads, num_blocks, dropout, activation='gelu')
- Parameters
embedding_dim (int) – Total dimension of the model. Must be divisible by num_heads.
num_heads (int) – Number of parallel attention heads.
num_blocks (int) – Number of Transformer blocks.
dropout (float) – probability of an element to be zeroed.
activation (Literal['relu', 'gelu']) – the name of the activation function. Default:
"gelu".
- forward(feature_tensors, input_embeddings, padding_mask, attention_mask)
- Parameters
input_embeddings (Tensor) – Input tensor of shape (batch_size, sequence_length, embedding_dim).
padding_mask (BoolTensor) – A mask of shape (batch_size, sequence_length) indicating which elements within key to ignore for the purpose of attention (i.e. treat as "padding"). A False value indicates that the corresponding key value will be ignored.
attention_mask (FloatTensor) – Causal-like mask for the attention pattern, with -inf for PAD and 0 otherwise. Possible shapes: (batch_size * num_heads, sequence_length, sequence_length) or (batch_size, num_heads, sequence_length, sequence_length).
- Returns
torch.Tensor: Output tensor after processing through the layer.
- Return type
Tensor
PositionAwareAggregator
- class replay.nn.sequential.PositionAwareAggregator(embedding_aggregator, max_sequence_length, dropout)
The layer for aggregating embeddings and adding positional encoding.
- __init__(embedding_aggregator, max_sequence_length, dropout)
- Parameters
embedding_aggregator (AggregatorProto) – An object of a class that performs the logic of aggregating multiple embeddings. For example, it can be a sum, a mean, or a concatenation.
max_sequence_length (int) – Max length of the sequence.
dropout (float) – probability of an element to be zeroed.
- forward(feature_tensors)
- Parameters
feature_tensors (Mapping[str, Tensor]) – a dictionary of tensors to pass into embedding_aggregator.
- Returns
Aggregated embeddings with positional encoding.
- Return type
Tensor
MultiHead Differential Attention
MultiHeadDifferentialAttention
- class replay.nn.attention.MultiHeadDifferentialAttention(embedding_dim, num_heads, lambda_init, bias=False, kdim=None, vdim=None)
Multi-Head Differential Attention Mechanism. Replaces the conventional softmax attention with a differential attention. Incorporates a causal mask (if no other mask is specified) to ensure autoregressive behavior.
Source paper: https://arxiv.org/pdf/2410.05258
- __init__(embedding_dim, num_heads, lambda_init, bias=False, kdim=None, vdim=None)
- Parameters
embedding_dim (int) – Total dimension of the model. Must be divisible by num_heads.
num_heads (int) – Number of parallel attention heads.
lambda_init (float) – Initial value for lambda.
bias (bool) – If specified, adds bias to the input / output projection layers. Default: False.
kdim (Optional[int]) – Total number of features for keys. Default: None (uses kdim=embedding_dim).
vdim (Optional[int]) – Total number of features for values. Default: None (uses vdim=embedding_dim).
- forward(query, key, value, attn_mask)
Forward pass for Multi-Head Differential Attention.
- Parameters
query (Tensor) – Query sequence of shape (batch_size, sequence_length, embedding_dim).
key (Tensor) – Key sequence of shape (batch_size, sequence_length, embedding_dim).
value (Tensor) – Value sequence of shape (batch_size, sequence_length, embedding_dim).
attn_mask (FloatTensor) – Attention mask with -inf for PAD and 0 otherwise. Possible shapes: (batch_size * num_heads, sequence_length, sequence_length) or (batch_size, num_heads, sequence_length, sequence_length).
- Returns
torch.Tensor: Output tensor after applying differential attention.
- Return type
Tensor
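Usage sketch (hypothetical sizes; the lambda_init value and the causal mask construction are illustrative assumptions):
import torch
from replay.nn.attention import MultiHeadDifferentialAttention

batch_size, seq_len, dim, heads = 2, 8, 64, 4
attention = MultiHeadDifferentialAttention(embedding_dim=dim, num_heads=heads, lambda_init=0.8)
x = torch.randn(batch_size, seq_len, dim)
# causal float mask: 0 on and below the diagonal, -inf above it
causal = torch.triu(torch.full((seq_len, seq_len), float("-inf")), diagonal=1)
attn_mask = causal.repeat(batch_size * heads, 1, 1)
out = attention(x, x, x, attn_mask)  # expected shape: (batch_size, seq_len, dim)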
DiffTransformerBlock
- class replay.nn.sequential.DiffTransformerBlock(embedding_dim, num_heads, lambda_init)
Single Block of the DiffTransformer Architecture. Consists of Multi-Head Differential Attention followed by a SwiGLU Feed-Forward Network.
Source paper: https://arxiv.org/pdf/2410.05258
- __init__(embedding_dim, num_heads, lambda_init)
- Parameters
embedding_dim (int) – Total dimension of the model. Must be divisible by num_heads.
num_heads (int) – Number of parallel attention heads.
lambda_init (float) – Initial value for lambda.
- forward(input_embeddings, attention_mask)
Forward pass for a single differential transformer block.
- Parameters
input_embeddings (Tensor) – Input tensor of shape (batch_size, sequence_length, embedding_dim).
attention_mask (FloatTensor) – Causal-like mask for the attention pattern, with -inf for PAD and 0 otherwise. Possible shapes: (batch_size * num_heads, sequence_length, sequence_length) or (batch_size, num_heads, sequence_length, sequence_length).
- Returns
Output tensor after processing through the block.
- Return type
Tensor
DiffTransformerLayer
- class replay.nn.sequential.DiffTransformerLayer(embedding_dim, num_heads, num_blocks)
Stacked blocks of the DiffTransformer Architecture. Single block consists of Multi-Head Differential Attention followed by a SwiGLU Feed-Forward Network.
Source paper: https://arxiv.org/pdf/2410.05258
Reference: https://github.com/nanowell/Differential-Transformer-PyTorch/blob/main/DiffTransformer.py
- __init__(embedding_dim, num_heads, num_blocks)
- Parameters
embedding_dim (int) – Total dimension of the model. Must be divisible by num_heads.
num_heads (int) – Number of parallel attention heads.
num_blocks (int) – Number of Transformer blocks.
- forward(input_embeddings, attention_mask)
- Parameters
input_embeddings (Tensor) – Input tensor of shape (batch_size, sequence_length, embedding_dim).
attention_mask (FloatTensor) – Causal-like mask for the attention pattern, with -inf for PAD and 0 otherwise. Possible shapes: (batch_size * num_heads, sequence_length, sequence_length) or (batch_size, num_heads, sequence_length, sequence_length).
- Returns
Output tensor after processing through the layer.
- Return type
Tensor
SasRec Transforms
- replay.nn.transform.template.make_default_sasrec_transforms(tensor_schema, query_column='query_id')
Creates a valid transformation pipeline for SasRec data batches.
- The generated pipeline expects the input dataset to contain the following columns:
Query ID column, specified by query_column.
Item ID column, specified in the tensor schema.
- Parameters
tensor_schema (TensorSchema) – TensorSchema used to infer feature columns.
query_column (str) – Name of the column containing query IDs. Default:
"query_id".
- Returns
dict of transforms specified for every dataset split (train, validation, test, predict).
- Return type
dict[str, list[torch.nn.modules.module.Module]]
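Usage sketch (assuming the tensor_schema from the SasRec example above):
from replay.nn.transform.template import make_default_sasrec_transforms

transforms = make_default_sasrec_transforms(tensor_schema, query_column="query_id")
# dict with "train", "validation", "test" and "predict" keys,
# each mapping to a list of torch.nn.Module transforms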
TwoTower
TwoTower
- class replay.nn.sequential.TwoTower(body, loss, context_merger=None)
Implementation of a generic Two-Tower architecture with two independent "towers" (encoders) that encode separate inputs. In recommender systems these are typically a query tower and an item tower. The output hidden states of the two "towers" are fused via a dot product in the model head.
Source paper: https://doi.org/10.1145/3366424.3386195
Example:
import torch

from replay.data import FeatureHint, FeatureSource, FeatureType
from replay.data.nn import TensorFeatureInfo, TensorFeatureSource, TensorSchema
from replay.nn.agg import SumAggregator
from replay.nn.embedding import SequenceEmbedding
from replay.nn.ffn import SwiGLUEncoder
from replay.nn.mask import DefaultAttentionMask
from replay.nn.loss import CESampled
from replay.nn.sequential import PositionAwareAggregator, SasRecTransformerLayer, TwoTower, TwoTowerBody
from replay.nn.sequential.twotower import FeaturesReader

# NUM_UNIQUE_ITEMS is the number of unique items in your catalog
tensor_schema = TensorSchema(
    [
        TensorFeatureInfo(
            "item_id",
            is_seq=True,
            feature_type=FeatureType.CATEGORICAL,
            embedding_dim=256,
            padding_value=NUM_UNIQUE_ITEMS,
            cardinality=NUM_UNIQUE_ITEMS,
            feature_hint=FeatureHint.ITEM_ID,
            feature_sources=[TensorFeatureSource(FeatureSource.INTERACTIONS, "item_id")],
        ),
    ]
)

common_aggregator = SumAggregator(embedding_dim=256)

body = TwoTowerBody(
    schema=tensor_schema,
    embedder=SequenceEmbedding(schema=tensor_schema),
    attn_mask_builder=DefaultAttentionMask(
        reference_feature_name=tensor_schema.item_id_feature_name,
        num_heads=2,
    ),
    query_tower_feature_names=tensor_schema.names,
    item_tower_feature_names=tensor_schema.names,
    query_embedding_aggregator=PositionAwareAggregator(
        embedding_aggregator=common_aggregator,
        max_sequence_length=100,
        dropout=0.2,
    ),
    item_embedding_aggregator=common_aggregator,
    query_encoder=SasRecTransformerLayer(
        embedding_dim=256,
        num_heads=2,
        num_blocks=2,
        dropout=0.3,
        activation="relu",
    ),
    query_tower_output_normalization=torch.nn.LayerNorm(256),
    item_encoder=SwiGLUEncoder(embedding_dim=256, hidden_dim=2 * 256),
    item_features_reader=FeaturesReader(
        schema=tensor_schema,
        metadata={"item_id": {}},
        path="item_features.parquet",
    ),
)

twotower = TwoTower(
    body=body,
    loss=CESampled(ignore_index=tensor_schema["item_id"].padding_value),
)
- __init__(body, loss, context_merger=None)
- Parameters
body (TwoTowerBody) – An instance of TwoTowerBody.
loss (LossProto) – An object of a class that performs loss calculation based on hidden states from the model, positive and optionally negative labels.
context_merger (Optional[ContextMergerProto]) – An object of a class that fuses the query encoder hidden state with the input feature tensors. Default:
None.
- forward(feature_tensors, padding_mask, candidates_to_score=None, positive_labels=None, negative_labels=None, target_padding_mask=None)
- Parameters
feature_tensors (Mapping[str, Tensor]) – a dictionary of tensors to generate embeddings.
padding_mask (BoolTensor) – A mask of shape (batch_size, sequence_length) indicating which elements within key to ignore for the purpose of attention (i.e. treat as "padding"). A False value indicates that the corresponding key value will be ignored.
candidates_to_score (Optional[LongTensor]) – A tensor containing the item IDs for which logits are computed at the inference stage. Note: take the padding value into account when creating the tensor. The tensor participates in calculations only at the inference stage; it is not required during training and has no effect if provided there. Default: None.
positive_labels (Optional[LongTensor]) – A tensor containing positive labels for calculating the loss. It is not required at the inference stage and has no effect if provided there. Default: None.
negative_labels (Optional[LongTensor]) – A tensor containing negative labels for calculating the loss. Note: before running, make sure that your loss supports calculations with negative labels. It is not required at the inference stage and has no effect if provided there. Default: None.
target_padding_mask (Optional[BoolTensor]) – A mask of shape (batch_size, sequence_length, num_positives) indicating which elements of positive_labels to ignore during loss calculation. A False value indicates that the corresponding value will be ignored. It is not required at the inference stage and has no effect if provided there. Default: None.
- Returns
During training, the model returns an object of the TrainOutput container class. At the inference stage, an InferenceOutput object is returned.
- Return type
Union[TrainOutput, InferenceOutput]
- classmethod from_params(schema, item_features_reader, embedding_dim=192, num_heads=4, num_blocks=2, max_sequence_length=50, dropout=0.3, excluded_features=None, categorical_list_feature_aggregation_method='sum')
Class method for quickly creating an instance of TwoTower with typical block types and user-provided parameters.
The item "tower" is a SwiGLU encoder (an MLP with SwiGLU activation),
the user "tower" is a stack of SasRec transformer layers, and the loss is Cross-Entropy.
Embeddings of every feature in both "towers" are aggregated via sum. The same features are used in both "towers", that is, the features specified in the tensor schema except those listed in excluded_features.
To create an instance of TwoTower with other types of blocks, please use the class constructor.
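A minimal usage sketch (assuming the tensor_schema from the TwoTower example above; the parquet path and metadata are placeholders):
from replay.nn.sequential import TwoTower
from replay.nn.sequential.twotower import FeaturesReader

item_features_reader = FeaturesReader(
    schema=tensor_schema,
    metadata={"item_id": {}},
    path="item_features.parquet",
)
twotower = TwoTower.from_params(
    schema=tensor_schema,
    item_features_reader=item_features_reader,
    embedding_dim=192,
    num_heads=4,
    num_blocks=2,
    max_sequence_length=50,
    dropout=0.3,
)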
- Parameters
schema (TensorSchema) – tensor schema object with metainformation about features.
item_features_reader (FeaturesReaderProtocol) – A class that implements reading features, processing them, and converting them to torch.Tensor for the ItemTower. You can use replay.nn.sequential.twotower.FeaturesReader as a standard class, or implement your own feature processing by following the replay.nn.sequential.twotower.FeaturesReaderProtocol protocol.
embedding_dim (int) – embedding dimension in both towers. Default: 192.
num_heads (int) – number of heads in the user tower SasRec layers. Default: 4.
num_blocks (int) – number of blocks in the user tower SasRec layers. Default: 2.
max_sequence_length (int) – maximum sequence length in the user tower SasRec layers. Default: 50.
dropout (float) – dropout value in both towers. Default: 0.3.
excluded_features (Optional[list[str]]) – A list containing the names of features for which you do not need to generate an embedding. Entries in this list are expected to be contained in schema. Default: None.
categorical_list_feature_aggregation_method (str) – Mode to aggregate tokens in the token item representation (categorical list features only). Default: "sum".
- Returns
an instance of TwoTower class.
- Return type
TwoTower
TwoTower Building Blocks
TwoTowerBody
- class replay.nn.sequential.TwoTowerBody(schema, embedder, attn_mask_builder, query_tower_feature_names, item_tower_feature_names, query_embedding_aggregator, item_embedding_aggregator, query_encoder, query_tower_output_normalization, item_encoder, item_features_reader)
Foundation for the Two-Tower model, which creates the query "tower" and the item "tower".
To use the Two-Tower model, pass an instance of this class into TwoTower together with any loss from Losses.
QueryTower
- class replay.nn.sequential.QueryTower(feature_names, embedder, embedding_aggregator, attn_mask_builder, encoder, output_normalization)
Query Tower of Two-Tower model.
- __init__(feature_names, embedder, embedding_aggregator, attn_mask_builder, encoder, output_normalization)
- Parameters
feature_names (Sequence[str]) – sequence of names used in query tower.
embedder (EmbedderProto) – An object of a class that performs the logic of generating embeddings from an input batch.
embedding_aggregator (AggregatorProto) – An object of a class that performs the logic of aggregating multiple embeddings of query tower.
attn_mask_builder (AttentionMaskProto) – An object of a class that performs the logic of generating an attention mask based on the features and padding mask given to the model.
encoder (QueryEncoderProto) – An object of a class that performs the logic of generating a query hidden embedding representation based on features, padding masks, attention mask, and the aggregated embedding of query_tower_feature_names. It is supposed to be a transformer.
output_normalization (NormalizerProto) – An object of a class that performs the logic of normalizing the hidden state obtained from the query encoder. For example, it can be torch.nn.LayerNorm or torch.nn.RMSNorm.
- forward(feature_tensors, padding_mask)
- Parameters
feature_tensors (Mapping[str, Tensor]) – a dictionary of tensors to generate embeddings.
padding_mask (BoolTensor) – A mask of shape (batch_size, sequence_length) indicating which elements within key to ignore for the purpose of attention (i.e. treat as "padding"). A False value indicates that the corresponding key value will be ignored.
- Returns
The final hidden state.
Expected shape: (batch_size, sequence_length, embedding_dim)
- Return type
Tensor
ItemTower
- class replay.nn.sequential.ItemTower(schema, item_features_reader, feature_names, embedder, embedding_aggregator, encoder)
Item Tower of Two-Tower model.
Note: ItemTower loads feature tensors of all items into memory.
- __init__(schema, item_features_reader, feature_names, embedder, embedding_aggregator, encoder)
- Parameters
schema (TensorSchema) – tensor schema object with metainformation about features.
item_features_reader (FeaturesReaderProtocol) – A class that implements reading features, processing them, and converting them to torch.Tensor for the ItemTower. You can use replay.nn.sequential.twotower.FeaturesReader as a standard class, or implement your own feature processing by following the replay.nn.sequential.twotower.FeaturesReaderProtocol protocol.
feature_names (Sequence[str]) – sequence of feature names used in the item tower.
embedder (EmbedderProto) – An object of a class that performs the logic of generating embeddings from an input batch.
embedding_aggregator (AggregatorProto) – An object of a class that performs the logic of aggregating multiple embeddings of item tower.
encoder (ItemEncoderProto) – An object of a class that performs the logic of generating an item hidden embedding representation based on features and the aggregated embeddings of item_tower_feature_names. The item encoder uses an item reference which is created based on item_features_path.
- forward(candidates_to_score=None)
- Parameters
candidates_to_score (Optional[LongTensor]) – IDs of items used for obtaining item embeddings from the item tower. If set to None, all item embeddings from the item tower will be returned. Default: None.
- Returns
item embeddings.
Expected shape: (len(candidates_to_score), embedding_dim), or (items_num, embedding_dim) if candidates_to_score is None.
TwoTower Transforms
- replay.nn.transform.template.make_default_twotower_transforms(tensor_schema, query_column='query_id')
Creates a valid transformation pipeline for TwoTower data batches.
- The generated pipeline expects the input dataset to contain the following columns:
Query ID column, specified by query_column.
Item ID column, specified in the tensor schema.
- Parameters
tensor_schema (TensorSchema) – TensorSchema used to infer feature columns.
query_column (str) – Name of the column containing query IDs. Default:
"query_id".
- Returns
dict of transforms specified for every dataset split (train, validation, test, predict).
- Return type
dict[str, list[torch.nn.modules.module.Module]]
Losses
BCE, BCESampled, CESampled, LogInCE, LogInCESampled, LogOutCE support the calculation of logits for the case of multi-positive labels (there are several labels for each position in the sequence). Source of multi-positive labels: https://arxiv.org/abs/2205.04507
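For illustration, multi-positive labels and their padding mask have the following shapes (hypothetical sizes):
import torch

batch_size, sequence_length, num_positives = 4, 10, 3
# several positive labels for each position in the sequence
positive_labels = torch.randint(0, 100, (batch_size, sequence_length, num_positives))
# False marks label slots that should be ignored during loss calculation
target_padding_mask = torch.ones(batch_size, sequence_length, num_positives, dtype=torch.bool)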
BCE
- class replay.nn.loss.BCE(**kwargs)
Pointwise Binary Cross-Entropy loss. Calculates the loss over the whole item catalog.
The loss supports the calculation of logits for the case of multi-positive labels (there are several labels for each position in the sequence).
- forward(model_embeddings, positive_labels, target_padding_mask)
- Parameters
model_embeddings (Tensor) – model output of shape (batch_size, sequence_length, embedding_dim).
positive_labels (LongTensor) – labels of positive events of shape (batch_size, sequence_length, num_positives).
target_padding_mask (BoolTensor) – padding mask corresponding to positive_labels, of shape (batch_size, sequence_length, num_positives).
- Returns
computed loss value.
- Return type
Tensor
BCESampled
- class replay.nn.loss.BCESampled(log_epsilon=1e-06, clamp_border=100.0, negative_labels_ignore_index=-100)
Sampled Pointwise Binary Cross-Entropy loss (BCE with negative sampling). Calculates loss between one positive item and K negatively sampled items.
The loss supports the calculation of logits for the case of multi-positive labels (there are several labels for each position in the sequence).
- __init__(log_epsilon=1e-06, clamp_border=100.0, negative_labels_ignore_index=-100)
- Parameters
log_epsilon (float) – correction to avoid zero in the logarithm during loss calculation. Default: 1e-6.
clamp_border (float) – upper bound for clamping the loss tensor; the lower bound will be set to -clamp_border. Default: 100.0.
negative_labels_ignore_index (int) – padding value for negative labels. This may be the case when negative labels are formed at the preprocessing level rather than by the negative sampler. The index is ignored and does not contribute to the loss. Default: -100.
- forward(model_embeddings, positive_labels, negative_labels, target_padding_mask)
- Parameters
model_embeddings (Tensor) – model output of shape (batch_size, sequence_length, embedding_dim).
positive_labels (LongTensor) – labels of positive events of shape (batch_size, sequence_length, num_positives).
negative_labels (LongTensor) – labels of sampled negative events. Expected shape: (batch_size, sequence_length, num_negatives), (batch_size, num_negatives), or (num_negatives) - the last case is when the same negative events are used for the entire batch.
target_padding_mask (BoolTensor) – padding mask corresponding to positive_labels, of shape (batch_size, sequence_length, num_positives).
- Returns
computed loss value.
- Return type
Tensor
CE
- class replay.nn.loss.CE(**kwargs)
Full Cross-Entropy loss. Calculates the loss over the whole item catalog.
- __init__(**kwargs)
To calculate the loss, torch.nn.CrossEntropyLoss is used. You can pass all parameters for initializing the object via kwargs.
- forward(model_embeddings, positive_labels, target_padding_mask)
- Parameters
model_embeddings (Tensor) – model output of shape (batch_size, sequence_length, embedding_dim).
positive_labels (LongTensor) – labels of positive events of shape (batch_size, sequence_length, num_positives).
target_padding_mask (BoolTensor) – padding mask corresponding to positive_labels, of shape (batch_size, sequence_length, num_positives).
- Returns
computed loss value.
- Return type
Tensor
CESampled
- class replay.nn.loss.CESampled(negative_labels_ignore_index=-100, **kwargs)
Sampled Cross-Entropy loss (Cross-Entropy with negative sampling). Calculates loss between one positive item and K negatively sampled items.
The loss supports the calculation of logits for the case of multi-positive labels (there are several labels for each position in the sequence).
- __init__(negative_labels_ignore_index=-100, **kwargs)
To calculate the loss, torch.nn.CrossEntropyLoss is used. You can pass all parameters for initializing the object via kwargs.
- Parameters
negative_labels_ignore_index (int) – padding value for negative labels. This may be the case when negative labels are formed at the preprocessing level rather than by the negative sampler. The index is ignored and does not contribute to the loss. Default: -100.
- forward(model_embeddings, positive_labels, negative_labels, target_padding_mask)
- Parameters
model_embeddings (Tensor) – model output of shape (batch_size, sequence_length, embedding_dim).
positive_labels (LongTensor) – labels of positive events of shape (batch_size, sequence_length, num_positives).
negative_labels (LongTensor) – labels of sampled negative events. Expected shape: (batch_size, sequence_length, num_negatives), (batch_size, num_negatives), or (num_negatives) - the last case is when the same negative events are used for the entire batch.
target_padding_mask (BoolTensor) – padding mask corresponding to positive_labels, of shape (batch_size, sequence_length, num_positives).
- Returns
computed loss value.
- Return type
Tensor
LogInCE
- class replay.nn.loss.LogInCE(cardinality, log_epsilon=1e-06, clamp_border=100.0, negative_labels_ignore_index=-100)
LogInCE loss.
\[L_{\text{InfoNCE}} = -\log \frac{\sum_{p \in P} \exp(\mathrm{sim}(q, p))}{\sum_{p \in P} \exp(\mathrm{sim}(q, p)) + \sum_{n \in N} \exp(\mathrm{sim}(q, n))},\]where q – query embedding, P – set of positive logits, N – set of negative logits, \(\mathrm{sim}(\cdot, \cdot)\) – similarity function.
The loss supports the calculation of logits for the case of multi-positive labels (there are several labels for each position in the sequence).
- __init__(cardinality, log_epsilon=1e-06, clamp_border=100.0, negative_labels_ignore_index=-100)
- Parameters
cardinality (int) – number of unique items in vocabulary (catalog). The specified cardinality value must not take into account the padding value.
log_epsilon (float) – correction to avoid zero in the logarithm during loss calculation. Default: 1e-6.
clamp_border (float) – upper bound for clamping the loss tensor; the lower bound will be set to -clamp_border. Default: 100.0.
negative_labels_ignore_index (int) – padding value for negative labels. This may be the case when negative labels are formed at the preprocessing level rather than by the negative sampler. The index is ignored and does not contribute to the loss. Default: -100.
- forward(model_embeddings, positive_labels, target_padding_mask)
Note: At forward pass, the whole catalog of items is used as negatives. Next, negative logits, corresponding to positions where negative labels coincide with positive ones, are masked.
- Parameters
model_embeddings (Tensor) – model output of shape (batch_size, sequence_length, embedding_dim).
positive_labels (LongTensor) – ground truth labels of positive events of shape (batch_size, sequence_length, num_positives).
target_padding_mask (BoolTensor) – padding mask corresponding to positive_labels, of shape (batch_size, sequence_length, num_positives).
- Returns
computed loss value.
- Return type
Tensor
LogInCESampled
- class replay.nn.loss.LogInCESampled(log_epsilon=1e-06, clamp_border=100.0, negative_labels_ignore_index=-100)
Sampled version of LogInCE (Log InfoNCE) loss (with negative sampling items).
\[L_{\text{InfoNCE}} = -\log \frac{\sum_{p \in P} \exp(\mathrm{sim}(q, p))}{\sum_{p \in P} \exp(\mathrm{sim}(q, p)) + \sum_{n \in N_{\text{sampled}}} \exp(\mathrm{sim}(q, n))},\]where q – query embedding, P – set of positive logits, \(N_{\text{sampled}}\) – set of sampled negative logits, \(\mathrm{sim}(\cdot, \cdot)\) – similarity function.
Same as LogInCE; the difference is in the set of negatives.
The loss supports the calculation of logits for the case of multi-positive labels (there are several labels for each position in the sequence).
- __init__(log_epsilon=1e-06, clamp_border=100.0, negative_labels_ignore_index=-100)
- Parameters
log_epsilon (float) – correction to avoid zero in the logarithm during loss calculation. Default: 1e-6.
clamp_border (float) – upper bound for clamping the loss tensor; the lower bound will be set to -clamp_border. Default: 100.0.
negative_labels_ignore_index (int) – padding value for negative labels. This may be the case when negative labels are formed at the preprocessing level rather than by the negative sampler. The index is ignored and does not contribute to the loss. Default: -100.
- forward(model_embeddings, positive_labels, negative_labels, target_padding_mask)
- Parameters
model_embeddings (Tensor) – model output of shape (batch_size, sequence_length, embedding_dim).
positive_labels (LongTensor) – labels of positive events of shape (batch_size, sequence_length, num_positives).
negative_labels (LongTensor) – labels of sampled negative events. Expected shape: (batch_size, sequence_length, num_negatives), (batch_size, num_negatives), or (num_negatives) - the last case is when the same negative events are used for the entire batch.
target_padding_mask (BoolTensor) – padding mask corresponding to positive_labels, of shape (batch_size, sequence_length, num_positives).
- Returns
computed loss value.
- Return type
Tensor
LogOutCE
- class replay.nn.loss.LogOutCE(cardinality, negative_labels_ignore_index=-100, **kwargs)
LogOutCE loss.
\[L_{\text{InfoNCE}} = - \sum_{p \in P} \log \frac{\exp(\mathrm{sim}(q, p))}{\exp(\mathrm{sim}(q, p)) + \sum_{n \in N} \exp(\mathrm{sim}(q, n))},\]where q – query embedding, P – set of positive logits, N – set of negative logits, \(\mathrm{sim}(\cdot, \cdot)\) – similarity function.
The loss supports the calculation of logits for the case of multi-positive labels (there are several labels for each position in the sequence).
- __init__(cardinality, negative_labels_ignore_index=-100, **kwargs)
To calculate the loss, torch.nn.CrossEntropyLoss is used. You can pass all parameters for initializing the object via kwargs.
- Parameters
cardinality (int) – number of unique items in vocabulary (catalog). The specified cardinality value must not take into account the padding value.
negative_labels_ignore_index (int) – padding value for negative labels. This may be the case when negative labels are formed at the preprocessing level, rather than the negative sampler. The index is ignored and does not contribute to the loss. Default:
-100.
- forward(model_embeddings, positive_labels, target_padding_mask)
Note: At forward pass, the whole catalog of items is used as negatives. Next, negative logits, corresponding to positions where negative labels coincide with positive ones, are masked.
- Parameters
model_embeddings (Tensor) – model output of shape (batch_size, sequence_length, embedding_dim).
positive_labels (LongTensor) – ground truth labels of positive events of shape (batch_size, sequence_length, num_positives).
target_padding_mask (BoolTensor) – padding mask corresponding to positive_labels, of shape (batch_size, sequence_length, num_positives).
- Returns
computed loss value.
- Return type
Tensor
Scalable Cross Entropy
SCEParams
- class replay.models.nn.loss.SCEParams(n_buckets, bucket_size_x, bucket_size_y, mix_x=False)
Set of parameters for ScalableCrossEntropyLoss.
- Parameters
n_buckets (int) – Number of buckets into which samples will be distributed.
bucket_size_x (int) – Number of item hidden representations that will be in each bucket.
bucket_size_y (int) – Number of item embeddings that will be in each bucket.
mix_x (bool) – Whether a randomly generated matrix will be multiplied by the model output matrix or not. Default:
False.
ScalableCrossEntropyLoss
- class replay.models.nn.loss.ScalableCrossEntropyLoss(sce_params)
- __call__(embeddings, positive_labels, all_embeddings, padding_mask, tokens_mask=None)
ScalableCrossEntropyLoss computation.
- Parameters
embeddings (Tensor) – Matrix of the last transformer block outputs.
positive_labels (LongTensor) – Positive labels.
all_embeddings (Tensor) – Matrix of all item embeddings.
padding_mask (BoolTensor) – Padding mask.
tokens_mask (Optional[BoolTensor]) – Tokens mask (needed only for Bert4Rec). Default:
None.
- Return type
Tensor
- __init__(sce_params)
ScalableCrossEntropyLoss for Sequential Recommendations with Large Item Catalogs. Reference article may be found at https://arxiv.org/pdf/2409.18721.
- Parameters
sce_params (SCEParams) – Dataclass with ScalableCrossEntropyLoss parameters. The dataclass fields are described in SCEParams above.
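Usage sketch (the bucket sizes are placeholders; tune them for your catalog and batch size):
from replay.models.nn.loss import SCEParams, ScalableCrossEntropyLoss

sce_params = SCEParams(n_buckets=64, bucket_size_x=256, bucket_size_y=256, mix_x=False)
sce_loss = ScalableCrossEntropyLoss(sce_params=sce_params)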
Model Building Blocks
Building blocks for neural network models.
Embeddings
SequenceEmbedding
- class replay.nn.embedding.SequenceEmbedding(schema, excluded_features=None, categorical_list_feature_aggregation_method='sum')
The embedding generation class for all types of features given into the sequential models.
The embedding size for each feature will be taken from the TensorSchema (from the field named embedding_dim). For numerical features, it is expected that the last dimension of the tensor will be equal to the tensor_dim field in the TensorSchema.
Keep in mind that the first dimension of every categorical embedding (the size of the embedding table) will be equal to cardinality + 1. This is necessary to take the padding value into account.
- __init__(schema, excluded_features=None, categorical_list_feature_aggregation_method='sum')
- Parameters
schema (TensorSchema) – TensorSchema containing meta information about all the features for which you need to generate an embedding.
excluded_features (Optional[list[str]]) – A list containing the names of features for which you do not need to generate an embedding. Entries in this list are expected to be contained in schema. Default: None.
categorical_list_feature_aggregation_method (Literal['sum', 'mean', 'max']) – Mode to aggregate tokens in the token item representation (categorical list features only). Default: "sum".
- property embeddings_dim: dict[str, int]
Returns the embedding dimensions for each of the features in the schema.
- forward(feature_tensor, feature_names=None)
- Parameters
feature_tensor (Mapping[str, Tensor]) – a dictionary of tensors to generate embeddings. It is expected that the keys of this dictionary match the names of the features in the given schema.
feature_names (Optional[Sequence[str]]) – A custom list of features for which embeddings need to be generated. It is expected that the values in this list match the names of the features in the given schema. Default: None, which means that the feature names from the schema will be used.
- Returns
a dictionary with tensors that contains embeddings.
- Return type
Mapping[str, Tensor]
- get_item_weights(indices=None)
Returns the embedding weights for the feature that matches the item ID feature with the name specified in the schema. It is expected that embeddings for this feature exist. Note: the row corresponding to the padding will be excluded from the returned weights. This logic applies only if the given indices is None.
- Parameters
indices (Optional[LongTensor]) – Items indices.
- Returns
Embeddings for specific items.
- Return type
Tensor
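Usage sketch (assuming the tensor_schema from the SasRec example above; batch sizes are hypothetical):
import torch
from replay.nn.embedding import SequenceEmbedding

embedder = SequenceEmbedding(schema=tensor_schema)
batch = {"item_id": torch.randint(0, NUM_UNIQUE_ITEMS, (2, 10))}  # (batch_size, sequence_length)
embeddings = embedder(batch)                # {"item_id": tensor of shape (2, 10, 256)}
item_weights = embedder.get_item_weights()  # item embedding table without the padding row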
CategoricalEmbedding
- class replay.nn.embedding.CategoricalEmbedding(feature_info, categorical_list_feature_aggregation_method='sum')
The embedding generation class for categorical features. It supports working with single features for each event in sequence, as well as several (categorical list).
When using this class, keep in mind that the first dimension of the embedding (the size of the embedding table) will be equal to cardinality + 1. This is necessary to take the padding value into account.
- __init__(feature_info, categorical_list_feature_aggregation_method='sum')
- Parameters
feature_info (TensorFeatureInfo) – Meta information about the feature.
categorical_list_feature_aggregation_method (Literal['sum', 'mean', 'max']) – Mode to aggregate tokens in the token item representation (categorical list features only). One of {sum, mean, max}. Default:
"sum".
- property embedding_dim: int
Embedding dimension after applying the layer
- forward(indices)
- Parameters
indices (LongTensor) – Items indices.
- Returns
Embeddings for specific items.
- Return type
Tensor
- property weight: Tensor
Returns the weights of the embedding layer, excluding the row that corresponds to the padding.
NumericalEmbedding
- class replay.nn.embedding.NumericalEmbedding(feature_info)
The embedding generation class for numerical features. It supports working with single features for each event in sequence, as well as several (numerical list).
Note: if the embedding_dim field in TensorFeatureInfo for an incoming feature matches its last dimension (the tensor_dim field in TensorFeatureInfo), then no transformation will be applied.
- __init__(feature_info)
- Parameters
feature_info (TensorFeatureInfo) – Meta information about the feature.
- property embedding_dim: int
Embedding dimension after applying the layer
- forward(values)
Numerical embedding forward pass.
Note: if the embedding_dim for an incoming feature matches its last dimension (tensor_dim), then no transformation will be applied.
- Parameters
values (FloatTensor) – feature values.
- Returns
Embeddings for specific items.
- Return type
Tensor
- property weight: Tensor
Returns the weight of the applied layer. If embedding_dim matches tensor_dim, then the identity matrix will be returned.
Aggregators
The main purpose of these modules is to aggregate embeddings. But in general, you can use them to aggregate any type of tensors.
SumAggregator
- class replay.nn.agg.SumAggregator(embedding_dim)
The class sums the incoming embeddings. Note that for successful aggregation, the dimensions of all embeddings must match.
- __init__(embedding_dim)
- Parameters
embedding_dim (int) – The last dimension of incoming and outcoming embeddings.
- property embedding_dim: int
The dimension of the output embedding
- forward(feature_tensors)
- Parameters
feature_tensors (Mapping[str, Tensor]) – a dictionary of tensors to sum up. The dimensions of all tensors in the dictionary must match.
- Returns
torch.Tensor. The last dimension of the tensor is embedding_dim.
- Return type
Tensor
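Usage sketch (hypothetical feature names and sizes):
import torch
from replay.nn.agg import SumAggregator

aggregator = SumAggregator(embedding_dim=64)
out = aggregator({
    "item_id": torch.randn(2, 10, 64),
    "category": torch.randn(2, 10, 64),
})
# out.shape == (2, 10, 64): the element-wise sum of both embeddings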
ConcatAggregator
- class replay.nn.agg.ConcatAggregator(input_embedding_dims, output_embedding_dim)
The class concatenates incoming embeddings by the last dimension.
If you need to concatenate several embeddings, then a linear layer will be applied to make the last dimension equal to embedding_dim.
If only one embedding comes to the input, then its last dimension is expected to be equal to embedding_dim.
- __init__(input_embedding_dims, output_embedding_dim)
- Parameters
input_embedding_dims (list[int]) – Dimensions of incoming embeddings.
output_embedding_dim (int) – The dimension of the output embedding after concatenation.
- property embedding_dim: int
The dimension of the output embedding
- forward(feature_tensors)
To ensure the deterministic nature of the result, the embeddings are concatenated in the ascending order of the keys in the dictionary.
- Parameters
feature_tensors (Mapping[str, Tensor]) – a dictionary of tensors to concatenate.
- Returns
The last dimension of the tensor is embedding_dim.
- Return type
Tensor
Feed Forward Networks
PointWiseFeedForward
- class replay.nn.ffn.PointWiseFeedForward(embedding_dim, dropout, activation='gelu')
Point-wise feed-forward network layer.
Source paper: https://arxiv.org/pdf/1808.09781.pdf
- __init__(embedding_dim, dropout, activation='gelu')
- Parameters
embedding_dim (int) – Dimension of the input features.
dropout (float) – probability of an element to be zeroed.
activation (Literal['relu', 'gelu']) – the name of the activation function. Default:
"gelu".
- forward(input_embeddings)
- Parameters
input_embeddings (LongTensor) – Query feature tensor.
- Returns
Output tensors.
- Return type
LongTensor
SwiGLU
- class replay.nn.ffn.SwiGLU(embedding_dim, hidden_dim)
SwiGLU Activation Function. Combines the Swish activation with Gated Linear Units.
- __init__(embedding_dim, hidden_dim)
- Parameters
embedding_dim (int) – Dimension of the input features.
hidden_dim (int) – Dimension of hidden layer. According to the original source, it is recommended to set the size of the hidden layer as \(2 \cdot \text{embedding_dim}\).
- forward(input_embeddings)
Forward pass for SwiGLU.
- Parameters
input_embeddings (Tensor) – Input tensor of shape (batch_size, sequence_length, embedding_dim).
- Returns
Output tensor of shape (batch_size, sequence_length, embedding_dim).
- Return type
Tensor
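For reference, a common SwiGLU feed-forward formulation (an assumption about this implementation, inferred from the constructor arguments and the output shape) is:
\[\mathrm{SwiGLU}(x) = \left(\mathrm{SiLU}(x W_{1}) \odot x W_{2}\right) W_{3},\]where \(W_{1}, W_{2} \in \mathbb{R}^{d \times h}\), \(W_{3} \in \mathbb{R}^{h \times d}\), \(d\) is the embedding_dim, and \(h\) is the hidden_dim.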
SwiGLUEncoder
- class replay.nn.ffn.SwiGLUEncoder(embedding_dim, hidden_dim)
MLP block consisting of a SwiGLU feed-forward network followed by an RMSNorm layer with a skip connection.
- __init__(embedding_dim, hidden_dim)
- Parameters
embedding_dim (int) – Dimension of the input features.
hidden_dim (int) – Dimension of the hidden layer.
- forward(input_embeddings)
- Parameters
input_embeddings (Tensor) – Input tensor of shape (batch_size, sequence_length, embedding_dim).
- Returns
Output tensor of shape (batch_size, sequence_length, embedding_dim).
- Return type
Tensor
Attention Masks
DefaultAttentionMask
- class replay.nn.mask.DefaultAttentionMask(reference_feature_name, num_heads)
Constructs a float lower-triangular attention mask of shape (batch_size * num_heads, sequence_length, sequence_length), with -inf for <PAD> and 0 otherwise.
- __call__(feature_tensor, padding_mask)
- Parameters
feature_tensor (Mapping[str, Tensor]) – dict of features tensors.
padding_mask (BoolTensor) – Padding mask where 0 marks <PAD> and 1 otherwise.
- Returns
Float attention mask of shape (B * num_heads, L, L), with -inf for <PAD> and 0 otherwise.
- Return type
FloatTensor
- __init__(reference_feature_name, num_heads)
- Parameters
reference_feature_name (str) – A reference tensor is needed to build the mask, so pass the name of a tensor that is guaranteed to be present in the dictionary of feature tensors. The second dimension (index 1, zero-based) of this tensor will be used to construct the attention mask.
num_heads (int) – Number of attention heads.
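Usage sketch (hypothetical feature name and a tiny batch; the exact mask values depend on the padding):
import torch
from replay.nn.mask import DefaultAttentionMask

mask_builder = DefaultAttentionMask(reference_feature_name="item_id", num_heads=2)
feature_tensor = {"item_id": torch.LongTensor([[0, 5, 7]])}  # (batch_size=1, seq_len=3)
padding_mask = torch.BoolTensor([[False, True, True]])       # first position is padding
attn_mask = mask_builder(feature_tensor, padding_mask)
# attn_mask.shape == (1 * 2, 3, 3); disallowed positions hold -inf, allowed positions hold 0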
Transformer Heads
EmbeddingTyingHead
- class replay.nn.head.EmbeddingTyingHead
The model head for calculating the output logits as a dot product between the model hidden state and the item embeddings. The module supports both 2-d and 3-d tensors for the hidden state and the item embeddings.
As a result, scores for each item are obtained.
- forward(hidden_states, item_embeddings)
- Parameters
hidden_states (Tensor) – hidden state of shape (batch_size, embedding_dim) or (batch_size, sequence_length, embedding_dim).
item_embeddings (Tensor) – item embeddings of shape (num_items, embedding_dim) or (batch_size, num_items, embedding_dim).
- Returns
logits of shape (batch_size, num_items) or (batch_size, sequence_length, num_items).
- Return type
Tensor
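Usage sketch (hypothetical sizes):
import torch
from replay.nn.head import EmbeddingTyingHead

head = EmbeddingTyingHead()
hidden_states = torch.randn(2, 10, 64)   # (batch_size, sequence_length, embedding_dim)
item_embeddings = torch.randn(1000, 64)  # (num_items, embedding_dim)
logits = head(hidden_states, item_embeddings)
# logits.shape == (2, 10, 1000): a dot product of each hidden state with every item embedding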
Universal Lightning module
LightningModule
- class replay.nn.lightning.LightningModule(model, optimizer_factory=None, lr_scheduler_factory=None)
A universal wrapper class above the PyTorch model for working with Lightning library.
Pay attention to the format of the forward function's return value.
- __init__(model, optimizer_factory=None, lr_scheduler_factory=None)
- Parameters
model (Module) – Initialized model. The expected result of the model's forward function is an object of the TrainOutput class at the training stage and InferenceOutput at the inference stage.
optimizer_factory (Optional[BaseOptimizerFactory]) – Optimizer factory. Default: None.
lr_scheduler_factory (Optional[BaseLRSchedulerFactory]) – Learning rate scheduler factory. Default: None.
- property candidates_to_score: Optional[LongTensor]
- Getter
Returns a tensor containing the candidate IDs. The tensor will be used during the inference stage of the model.
If the parameter was not previously set, None will be returned.
- Setter
A one-dimensional tensor containing candidate IDs is expected.
- forward(batch)
Implementation of the forward function.
- Parameters
batch (dict) – A dictionary containing all the information necessary to run the model's forward function. The dictionary keys must match the names of the arguments of the model's forward function; keys that do not match are filtered out. If the model supports calculating logits for custom candidates at the inference stage, you can submit them inside the batch or via the candidates_to_score field.
- Returns
During training, the model returns an object of the TrainOutput container class or its subclass. At the inference stage, an InferenceOutput object or its subclass is returned.
- Return type
Union[TrainOutput, InferenceOutput]
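A minimal training sketch (assumptions: sasrec is the model from the SasRec example above, and datamodule is a ParquetModule or any Lightning DataModule yielding batches whose keys match the model's forward arguments):
import lightning as L  # or: import pytorch_lightning, depending on your environment
from replay.nn.lightning import LightningModule

lightning_module = LightningModule(model=sasrec)
trainer = L.Trainer(max_epochs=1)
trainer.fit(lightning_module, datamodule=datamodule)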
TrainOutput
- class replay.nn.output.TrainOutput
Storing outputs from models training stage.
- Parameters
loss – a tensor containing the calculated loss. It is important that the tensor retains a gradient so that backpropagation can be called from the outside.
hidden_states – Tuple of torch.Tensor: one for the output of the embeddings (if the model has an embedding layer) plus one for the output of each layer. Expected shape: (batch_size, sequence_length, hidden_size).
InferenceOutput
- class replay.nn.output.InferenceOutput
Storing outputs from models inference stage.
- Parameters
logits – Sequence of hidden states at the output of the last layer of the model. Expected shape: (batch_size, sequence_length, hidden_size).
hidden_states – Tuple of torch.Tensor (one for the output of the embeddings, if the model has an embedding layer, plus one for the output of each layer). Expected shape: (batch_size, sequence_length, hidden_size).
Transforms for ParquetModule
This submodule contains a set of standard PyTorch tensor transformations necessary for neural network models.
These transforms are intended for use with the ParquetModule (a Lightning DataModule). To apply them, specify a sequence of transformations for every data split in the ParquetModule's transforms parameter.
The specified transformations are applied per batch on the device, and the resulting batch is then used as the model input.
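A minimal sketch of a hand-built pipeline for one split (the column names and the exact set of transforms are illustrative; make_default_sasrec_transforms and make_default_twotower_transforms build complete pipelines for you):
from replay.nn.transform import GroupTransform, NextTokenTransform, RenameTransform

train_transforms = [
    NextTokenTransform(label_field="item_id"),           # build next-token targets and their mask
    RenameTransform({"item_id_mask": "padding_mask"}),   # hypothetical key expected by the model
    GroupTransform({"feature_tensors": ["item_id"]}),    # group features for the model's forward
]
transforms = {"train": train_transforms}  # pass as the ParquetModule's transforms parameter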
CopyTransform
- class replay.nn.transform.CopyTransform(mapping)
Copies a set of columns according to the provided mapping. All copied columns are detached from the graph to prevent erroneous differentiation.
Example:
>>> input_batch = {"item_id_mask": torch.BoolTensor([False, True, True])}
>>> transform = CopyTransform({"item_id_mask" : "padding_id"})
>>> output_batch = transform(input_batch)
>>> output_batch
{'item_id_mask': tensor([False, True, True]), 'padding_id': tensor([False, True, True])}
- __init__(mapping)
- Parameters
mapping (dict[str, str]) – A dictionary that maps which source tensors will be copied into the batch under new names. The tensors with new names are copies of the original ones; the original tensors remain in the batch.
GroupTransform
- class replay.nn.transform.GroupTransform(mapping)
Combines existing tensors from a batch by moving them into common groups. The names of the shared keys and the keys to be moved are specified in mapping.
Example:
>>> input_batch = {
...     "item_id": torch.LongTensor([[30, 22, 1]]),
...     "item_feature": torch.LongTensor([[1, 11, 11]])
... }
>>> transform = GroupTransform({"feature_tensors" : ["item_id", "item_feature"]})
>>> output_batch = transform(input_batch)
>>> output_batch
{'feature_tensors': {'item_id': tensor([[30, 22, 1]]), 'item_feature': tensor([[ 1, 11, 11]])}}
- __init__(mapping)
- Parameters
mapping (dict[str, list[str]]) – A dict mapping new names to a list of existing names for grouping.
RenameTransform
- class replay.nn.transform.RenameTransform(mapping)
Renames specific feature columns into new ones. Changes the names in the original dict rather than creating a new dict.
Example:
>>> input_batch = {"item_id_mask": torch.BoolTensor([False, True, True])}
>>> transform = RenameTransform({"item_id_mask" : "padding_id"})
>>> output_batch = transform(input_batch)
>>> output_batch
{'padding_id': tensor([False, True, True])}
- __init__(mapping)
- Parameters
mapping (dict[str, str]) – A dict mapping existing names into new ones.
UnsqueezeTransform
- class replay.nn.transform.UnsqueezeTransform(column_name, dim)
Unsqueezes the specified tensor along the specified dimension.
Example:
>>> input_batch = {"padding_id": torch.BoolTensor([False, True, True])}
>>> transform = UnsqueezeTransform("padding_id", dim=0)
>>> output_batch = transform(input_batch)
>>> output_batch
{'padding_id': tensor([[False, True, True]])}
- __init__(column_name, dim)
- Parameters
column_name (str) – Name of tensor to be unsqueezed.
dim (int) – Dimension along which tensor will be unsqueezed.
NextTokenTransform
- class replay.nn.transform.NextTokenTransform(label_field, shift=1, query_features=['query_id', 'query_id_mask'], out_feature_name='positive_labels', mask_postfix='_mask')
For the tensor specified by key label_field (typically "item_id") in the batch, this transform creates a corresponding "labels" tensor under the key out_feature_name, shifted forward by the specified shift value. This "labels" tensor is the target that the model predicts. A padding mask for the "labels" is also created. For all the other features except query_features, the last shift elements are truncated.
This transform is required for sequential models optimizing the next-token prediction task.
WARNING: In order to facilitate the shifting, this transform requires extra elements in the sequence. Therefore, when utilizing this transform, ensure you're reading at least sequence_length + shift elements from your dataset. The resulting batch will have the relevant fields trimmed to sequence_length.
Example:
>>> input_batch = {
...     "user_id": torch.LongTensor([111]),
...     "item_id": torch.LongTensor([[5, 0, 7, 4]]),
...     "item_id_mask": torch.BoolTensor([[0, 1, 1, 1]])
... }
>>> transform = NextTokenTransform(label_field="item_id", shift=1, query_features="user_id")
>>> output_batch = transform(input_batch)
>>> output_batch
{'user_id': tensor([111]), 'item_id': tensor([[5, 0, 7]]), 'item_id_mask': tensor([[False, True, True]]), 'positive_labels': tensor([[0, 7, 4]]), 'positive_labels_mask': tensor([[True, True, True]])}
- __init__(label_field, shift=1, query_features=['query_id', 'query_id_mask'], out_feature_name='positive_labels', mask_postfix='_mask')
- Parameters
label_field (str) – Name of target feature tensor to convert into labels.
shift (int) – Number of sequence items to shift by. Default: 1.
query_features (Union[List[str], str]) – Name of the query column or a list of user features. These columns are excluded from the shifting and remain unchanged. Default: ["query_id", "query_id_mask"].
out_feature_name (str) – The name of the resulting feature in the batch. Default: "positive_labels".
mask_postfix (str) – Postfix to append to the mask feature corresponding to the resulting feature. Default: "_mask".
TokenMaskTransform
- class replay.nn.transform.TokenMaskTransform(token_field, out_feature_name='token_mask', mask_prob=0.15, generator=None)
For the feature tensor specified by token_field, randomly masks items in the sequence based on a uniform distribution with the specified masking probability. In effect, this transform creates a mask for the recommendation analogue of the Masked Language Modeling (MLM) task.
Example:
>>> _ = torch.manual_seed(0)
>>> input_tensor = {"padding_id": torch.BoolTensor([0, 1, 1])}
>>> transform = TokenMaskTransform("padding_id")
>>> output_tensor = transform(input_tensor)
>>> output_tensor
{'padding_id': tensor([False, True, True]), 'token_mask': tensor([False, True, False])}
- __init__(token_field, out_feature_name='token_mask', mask_prob=0.15, generator=None)
- Parameters
token_field (str) – Name of the column containing the unmasked tokens.
out_feature_name (str) – Name of the resulting mask column. Default: token_mask.
mask_prob (float) – Probability of masking an item, i.e. setting it to 0. Default: 0.15.
generator (Optional[Generator]) – Random number generator to be used for generating the uniform distribution. Default: None.
TrimTransform
- class replay.nn.transform.TrimTransform(seq_len, feature_names)
Trims the sequences specified by feature_names, keeping the last seq_len elements (the right side of the sequence).
Example:
>>> input_batch = {
...     "user_id": torch.LongTensor([111]),
...     "item_id": torch.LongTensor([[5, 4, 0, 7, 4]]),
...     "seen_ids": torch.LongTensor([[5, 4, 0, 7, 4]]),
... }
>>> transform = TrimTransform(seq_len=3, feature_names="item_id")
>>> output_batch = transform(input_batch)
>>> output_batch
{'user_id': tensor([111]), 'item_id': tensor([[0, 7, 4]]), 'seen_ids': tensor([[5, 4, 0, 7, 4]])}
- __init__(seq_len, feature_names)
- Parameters
seq_len (int) – max sequence length used in model. Must be positive.
feature_names – name of the feature (or features) in the batch to be trimmed.
SequenceRollTransform
- class replay.nn.transform.SequenceRollTransform(field_name, roll=-1, padding_value=0)
Rolls the data along axis 1 by the specified amount and fills the remaining positions with the specified padding value.
Example:
>>> input_tensor = {"item_id": torch.LongTensor([[2, 3, 1]])}
>>> transform = SequenceRollTransform("item_id", roll=-1, padding_value=10)
>>> output_tensor = transform(input_tensor)
>>> output_tensor
{'item_id': tensor([[ 3, 1, 10]])}
- __init__(field_name, roll=-1, padding_value=0)
- Parameters
field_name (str) – Name of the target column from the batch to be rolled.
roll (int) – Number of positions to roll by. Default: -1.
padding_value (int) – The value to use as padding for the sequence. Default: 0.
UniformNegativeSamplingTransform
- class replay.nn.transform.UniformNegativeSamplingTransform(cardinality, num_negative_samples, *, out_feature_name='negative_labels', sample_distribution=None, generator=None)
Transform for global negative sampling.
For every batch, the transform generates a vector of size (num_negative_samples) consisting of random indices sampled from a range of cardinality. Unless a custom sample distribution is provided, the indices are weighted equally.
Example:
>>> _ = torch.manual_seed(0)
>>> input_batch = {"item_id": torch.LongTensor([[1, 0, 4]])}
>>> transform = UniformNegativeSamplingTransform(cardinality=4, num_negative_samples=2)
>>> output_batch = transform(input_batch)
>>> output_batch
{'item_id': tensor([[1, 0, 4]]), 'negative_labels': tensor([2, 1])}
- __init__(cardinality, num_negative_samples, *, out_feature_name='negative_labels', sample_distribution=None, generator=None)
- Parameters
cardinality (int) – Number of unique items in the vocabulary (catalog). The specified cardinality must not include the padding value.
num_negative_samples (int) – The size of the negatives vector to generate.
out_feature_name (Optional[str]) – The name of the resulting feature in the batch.
sample_distribution (Optional[Tensor]) – The weights of the indices in the vocabulary. If specified, its size must match cardinality. Default: None.
generator (Optional[Generator]) – Random number generator to be used for sampling from the distribution. Default: None.
MultiClassNegativeSamplingTransform
- class replay.nn.transform.MultiClassNegativeSamplingTransform(num_negative_samples, sample_mask, *, negative_selector_name='negative_selector', out_feature_name='negative_labels', generator=None)
Transform for generating negatives using a fixed class-assignment matrix.
For every batch, the transform generates a tensor of size (N, num_negative_samples), where N is the number of classes. This tensor consists of random indices sampled using the specified fixed class-assignment matrix.
The transform also reads a tensor from the batch by the key negative_selector_name, of shape (batch_size,), where the i-th element (in [0, N-1]) specifies which of the N classes is used to select, from the sampled negatives, those corresponding to the i-th batch row (the user's history sequence).
The resulting negatives tensor has shape (batch_size, num_negative_samples).
Example:
>>> _ = torch.manual_seed(0)
>>> sample_mask = torch.tensor([
...     [1, 0, 1, 0, 0, 0],
...     [0, 0, 0, 1, 1, 0],
...     [0, 1, 0, 0, 0, 1],
... ])
>>> input_batch = {"negative_selector": torch.tensor([0, 2, 1, 1, 0])}
>>> transform = MultiClassNegativeSamplingTransform(
...     num_negative_samples=2,
...     sample_mask=sample_mask
... )
>>> output_batch = transform(input_batch)
>>> output_batch
{'negative_selector': tensor([0, 2, 1, 1, 0]), 'negative_labels': tensor([[2, 0], [5, 1], [3, 4], [3, 4], [2, 0]])}
- __init__(num_negative_samples, sample_mask, *, negative_selector_name='negative_selector', out_feature_name='negative_labels', generator=None)
- Parameters
num_negative_samples (int) – The size of negatives vector to generate.
sample_mask (Tensor) – The class-assignment (indicator) matrix of shape (N, number of items in catalog), where sample_mask[n, i] is a weight (or binary indicator) of assigning item i to class n.
negative_selector_name (Optional[str]) – Name of the tensor in the batch of shape (batch_size,), where the i-th element (in [0, N-1]) specifies which of the N classes is used to get negatives corresponding to the i-th query_id in the batch.
out_feature_name (Optional[str]) – The name of the resulting feature in the batch.
generator (Optional[Generator]) – Random number generator to be used for sampling from the distribution. Default: None.
Standard set of transforms for models
SasRec Transforms
- replay.nn.transform.template.make_default_sasrec_transforms(tensor_schema, query_column='query_id')
Creates a valid transformation pipeline for SasRec data batches.
- The generated pipeline expects the input dataset to contain the following columns:
Query ID column, specified by query_column.
Item ID column, specified in the tensor schema.
- Parameters
tensor_schema (TensorSchema) – TensorSchema used to infer feature columns.
query_column (str) – Name of the column containing query IDs. Default: "query_id".
- Returns
dict of transforms specified for every dataset split (train, validation, test, predict).
- Return type
dict[str, list[torch.nn.modules.module.Module]]
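A minimal usage sketch (tensor_schema is assumed to be an already constructed TensorSchema with an item ID feature, batch is assumed to be a dictionary of feature tensors containing the columns listed above, and the split keys follow the return description):

from replay.nn.transform.template import make_default_sasrec_transforms

# `tensor_schema` is an assumed, already constructed TensorSchema
transforms_per_split = make_default_sasrec_transforms(tensor_schema, query_column="query_id")

# each split maps to a list of torch.nn.Module transforms that take a batch
# dictionary and return a (possibly extended) batch dictionary
for transform in transforms_per_split["train"]:
    batch = transform(batch)

make_default_twotower_transforms, documented below, returns a dictionary with the same structure.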
TwoTower Transforms
- replay.nn.transform.template.make_default_twotower_transforms(tensor_schema, query_column='query_id')
Creates a valid transformation pipeline for TwoTower data batches.
- The generated pipeline expects the input dataset to contain the following columns:
Query ID column, specified by query_column.
Item ID column, specified in the tensor schema.
- Parameters
tensor_schema (TensorSchema) – TensorSchema used to infer feature columns.
query_column (str) – Name of the column containing query IDs. Default: "query_id".
- Returns
dict of transforms specified for every dataset split (train, validation, test, predict).
- Return type
dict[str, list[torch.nn.modules.module.Module]]
Easy training, validation and inference with Lightning
Replay provides Callbacks and Postprocessors to make the model training, validation and inference process as convenient as possible.
During training/validation:
You can define a list of validation metrics; the model is considered the best and is saved whenever the tracked metric improves during validation.
During inference:
You can get the recommendations in the following formats:
PySpark DataFrame, Pandas DataFrame, Polars DataFrame, or PyTorch tensors. Each of these types corresponds to a callback. You can filter the results using postprocessors. In addition to outputting logits (scores) from the model, you can output any hidden states using HiddenStateCallback.
For a better understanding, you should look at examples of using neural network models.
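A hedged end-to-end sketch of wiring these pieces into a Lightning Trainer (the model, datamodule, and prediction dataloader are assumed to exist; depending on your installation, Trainer is imported from lightning or pytorch_lightning; ITEM_COUNT is a placeholder for the catalog size):

from lightning import Trainer  # or: from pytorch_lightning import Trainer

from replay.nn.lightning.callback import ComputeMetricsCallback, PandasTopItemsCallback
from replay.nn.lightning.postprocessor import SeenItemsFilter

# mask already-seen items before computing metrics and taking the top-k
seen_filter = SeenItemsFilter(item_count=ITEM_COUNT, seen_items_column="seen_ids")

metrics_callback = ComputeMetricsCallback(
    metrics=["ndcg", "recall"],
    ks=[10, 20],
    postprocessors=[seen_filter],
)
top_items_callback = PandasTopItemsCallback(
    top_k=10,
    query_column="query_id",
    item_column="item_id",
    postprocessors=[seen_filter],
)

trainer = Trainer(max_epochs=10, callbacks=[metrics_callback, top_items_callback])
trainer.fit(model, datamodule=datamodule)                # `model` and `datamodule` are assumed
trainer.predict(model, dataloaders=predict_dataloader)   # `predict_dataloader` is assumed

recommendations = top_items_callback.get_result()        # Pandas DataFrame with top-k items per query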
Callbacks
ComputeMetricsCallback
- class replay.nn.lightning.callback.ComputeMetricsCallback(metrics=None, ks=None, postprocessors=None, item_count=None, ground_truth_column='ground_truth', train_column='train')
Callback for validation and testing stages.
If multiple validation/testing dataloaders are used, the suffix of the metric name will contain the serial number of the dataloader.
For the correct calculation of metrics inside the callback, the batch must contain the ground_truth_column key. The padding value of this tensor can be arbitrary; the only requirement is that it does not overlap with existing item ID values (for example, negative values can be used).
To calculate the coverage and novelty metrics, the batch must additionally contain the train_column key. The padding value of this tensor can likewise be arbitrary, as long as it does not overlap with existing item ID values.
- __init__(metrics=None, ks=None, postprocessors=None, item_count=None, ground_truth_column='ground_truth', train_column='train')
- Parameters
metrics (Optional[list[Literal['recall', 'precision', 'ndcg', 'map', 'mrr', 'novelty', 'coverage']]]) – Sequence of metrics to calculate. Default: None. This means that the default metrics will be used: Map, NDCG, Recall.
ks (Optional[list[int]]) – Highest k scores in ranking. Default: None. This means that the default ks will be [1, 5, 10, 20].
postprocessors (Optional[list[replay.nn.lightning.postprocessor._base.PostprocessorBase]]) – A list of postprocessors for modifying logits from the model. For example, this can be a softmax operation on the logits or setting the -inf value for some IDs. Default: None.
item_count (Optional[int]) – The total number of items in the dataset, required only for Coverage calculations. Default: None.
ground_truth_column (str) – Name of the key in the batch that contains ground truth items.
train_column (str) – Name of the key in the batch that contains the items on which the model is trained.
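For illustration only, a validation batch satisfying these requirements might look like the sketch below (the model-input keys are omitted, and the -1 padding value is an arbitrary choice that does not collide with real item IDs):

import torch

validation_batch = {
    # ...model input tensors defined by the tensor schema...
    "ground_truth": torch.LongTensor([[42, 17, -1], [3, -1, -1]]),  # padded with -1
    "train": torch.LongTensor([[5, 0, 7, 4], [1, 2, -1, -1]]),      # only needed for coverage/novelty
}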
PandasTopItemsCallback
- class replay.nn.lightning.callback.PandasTopItemsCallback(top_k, query_column, item_column, rating_column='rating', postprocessors=None)
A callback that records the result of the model’s forward function at the inference stage in a Pandas Dataframe.
- __init__(top_k, query_column, item_column, rating_column='rating', postprocessors=None)
- Parameters
top_k (int) – Take the top_k IDs with the highest logit values.
query_column (str) – The name of the query column in the resulting dataframe.
item_column (str) – The name of the item column in the resulting dataframe.
rating_column (str) – The name of the rating column in the resulting dataframe. This column will contain the logit values of the top_k selected items.
postprocessors (Optional[list[replay.nn.lightning.postprocessor._base.PostprocessorBase]]) – A list of postprocessors for modifying logits from the model before sorting and taking the top K. For example, this can be a softmax operation on the logits or setting the -inf value for some IDs. Default: None.
- get_result()
- Returns
prediction result
- Return type
_T
PolarsTopItemsCallback
- class replay.nn.lightning.callback.PolarsTopItemsCallback(top_k, query_column, item_column, rating_column='rating', postprocessors=None)
A callback that records the result of the model’s forward function at the inference stage in a Polars Dataframe.
- __init__(top_k, query_column, item_column, rating_column='rating', postprocessors=None)
- Parameters
top_k (int) – Take the top_k IDs with the highest logit values.
query_column (str) – The name of the query column in the resulting dataframe.
item_column (str) – The name of the item column in the resulting dataframe.
rating_column (str) – The name of the rating column in the resulting dataframe. This column will contain the logit values of the top_k selected items.
postprocessors (Optional[list[replay.nn.lightning.postprocessor._base.PostprocessorBase]]) – A list of postprocessors for modifying logits from the model before sorting and taking the top K. For example, this can be a softmax operation on the logits or setting the -inf value for some IDs. Default: None.
- get_result()
- Returns
prediction result
- Return type
_T
SparkTopItemsCallback
- class replay.nn.lightning.callback.SparkTopItemsCallback(top_k, query_column, item_column, rating_column, spark_session, postprocessors=None)
A callback that records the result of the model’s forward function at the inference stage in a Spark Dataframe.
- __init__(top_k, query_column, item_column, rating_column, spark_session, postprocessors=None)
- Parameters
top_k (int) – Take the top_k IDs with the highest logit values.
query_column (str) – The name of the query column in the resulting dataframe.
item_column (str) – The name of the item column in the resulting dataframe.
rating_column (str) – The name of the rating column in the resulting dataframe. This column will contain the logit values of the top_k selected items.
spark_session (SparkSession) – Spark session. Required to create a Spark DataFrame.
postprocessors (Optional[list[replay.nn.lightning.postprocessor._base.PostprocessorBase]]) – A list of postprocessors for modifying logits from the model before sorting and taking the top K. For example, this can be a softmax operation on the logits or setting the -inf value for some IDs. Default: None.
- get_result()
- Returns
prediction result
- Return type
_T
TorchTopItemsCallback
- class replay.nn.lightning.callback.TorchTopItemsCallback(top_k, postprocessors=None)
A callback that records the result of the model’s forward function at the inference stage in PyTorch tensors.
- __init__(top_k, postprocessors=None)
- Parameters
top_k (int) – Take the top_k IDs with the highest logit values.
postprocessors (Optional[list[replay.nn.lightning.postprocessor._base.PostprocessorBase]]) – A list of postprocessors for modifying logits from the model before sorting and taking the top K. For example, this can be a softmax operation on the logits or setting the -inf value for some IDs. Default: None.
- get_result()
- Returns
prediction result
- Return type
_T
Postprocessors
PostprocessorBase
- class replay.nn.lightning.postprocessor.PostprocessorBase
Abstract base class for postprocessors.
- __init__()
- abstract on_prediction(batch, logits)
The method is called externally inside the callback at the prediction (inference) stage.
- Parameters
batch (dict) – the batch sent to the model from the dataloader
logits (Tensor) – logits from the model
- Returns
modified logits
- Return type
Tensor
- abstract on_validation(batch, logits)
The method is called externally inside the callback at the validation stage.
- Parameters
batch (dict) – the batch sent to the model from the dataloader
logits (Tensor) – logits from the model
- Returns
modified logits
- Return type
Tensor
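As a minimal sketch, a custom postprocessor only needs to implement these two methods. The SoftmaxPostprocessor below is a hypothetical example (not part of the library) that converts logits into probabilities:

import torch
from replay.nn.lightning.postprocessor import PostprocessorBase


class SoftmaxPostprocessor(PostprocessorBase):
    """Hypothetical postprocessor that turns logits into probabilities."""

    def on_prediction(self, batch: dict, logits: torch.Tensor) -> torch.Tensor:
        # apply softmax over the item dimension at the inference stage
        return torch.softmax(logits, dim=-1)

    def on_validation(self, batch: dict, logits: torch.Tensor) -> torch.Tensor:
        # same behaviour at the validation stage
        return torch.softmax(logits, dim=-1)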
SeenItemsFilter
- class replay.nn.lightning.postprocessor.SeenItemsFilter(item_count, seen_items_column='seen_ids')
Masks (sets the logit value to -inf) the items that have already been seen in the given dataset (i.e. in the sequence of items for which the logits are calculated).
Should be used in Lightning callbacks for inference or metric computation.
Input example:
logits [B=2 users, I=3 items]:
logits = [[ 0.1,  0.2,  0.3],   # user0
          [-0.1, -0.2, -0.3]]   # user1
Seen items per user:
seen_items = user0: [1, 0]
             user1: [1, 2, 1]
Output example:
SeenItemsFilter sets logits of seen items to -inf:
processed_logits = [[   -inf,    -inf,  0.3000],   # user0
                    [-0.1000,    -inf,    -inf]]   # user1
- __init__(item_count, seen_items_column='seen_ids')
- Parameters
item_count (int) – Total number of items that the model knows about (cardinality). It is recommended to take this value from the TensorSchema. Please note that values outside the range [0, item_count-1] are filtered out (considered as padding).
seen_items_column – Name of the column in the batch that contains the users’ interactions (seen item IDs).
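A runnable sketch reproducing the example above (the -1 values in seen_ids are an assumed padding choice; they fall outside [0, item_count-1] and are therefore ignored):

import torch
from replay.nn.lightning.postprocessor import SeenItemsFilter

logits = torch.tensor([[ 0.1,  0.2,  0.3],    # user0
                       [-0.1, -0.2, -0.3]])   # user1
batch = {"seen_ids": torch.tensor([[1, 0, -1],    # user0 has seen items 1 and 0
                                   [1, 2,  1]])}  # user1 has seen items 1 and 2

postprocessor = SeenItemsFilter(item_count=3, seen_items_column="seen_ids")
processed_logits = postprocessor.on_prediction(batch, logits)
# seen items are expected to get -inf, so only 0.3 survives for user0
# and only -0.1 survives for user1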
- on_prediction(batch, logits)
The method is called externally inside the callback at the prediction (inference) stage.
- Parameters
batch (dict) – the batch sent to the model from the dataloader
logits (Tensor) – logits from the model
- Returns
modified logits
- Return type
Tensor
- on_validation(batch, logits)
The method is called externally inside the callback at the validation stage.
- Parameters
batch (dict) – the batch sent to the model from the dataloader
logits (Tensor) – logits from the model
- Returns
modified logits
- Return type
Tensor