SasRec

SasRec

class replay.nn.sequential.SasRec(body, loss)

A model using the SasRec architecture as a hidden state generator. The hidden states are multiplied by the item embeddings, resulting in logits for each of the items.

Source paper: https://arxiv.org/pdf/1808.09781.

Example:

import torch

from replay.data import FeatureHint, FeatureSource, FeatureType
from replay.data.nn import TensorFeatureInfo, TensorFeatureSource, TensorSchema
from replay.nn.agg import SumAggregator
from replay.nn.embedding import SequenceEmbedding
from replay.nn.mask import DefaultAttentionMask
from replay.nn.loss import CESampled
from replay.nn.sequential import PositionAwareAggregator, SasRec, SasRecBody, SasRecTransformerLayer

tensor_schema = TensorSchema(
    [
        TensorFeatureInfo(
            "item_id",
            is_seq=True,
            feature_type=FeatureType.CATEGORICAL,
            embedding_dim=256,
            padding_value=NUM_UNIQUE_ITEMS,
            cardinality=NUM_UNIQUE_ITEMS+1,
            feature_hint=FeatureHint.ITEM_ID,
            feature_sources=[TensorFeatureSource(FeatureSource.INTERACTIONS, "item_id")]
        ),
    ]
)

body = SasRecBody(
    embedder=SequenceEmbedding(
        schema=tensor_schema,
    ),
    embedding_aggregator=PositionAwareAggregator(
        embedding_aggregator=SumAggregator(embedding_dim=256),
        max_sequence_length=100,
        dropout=0.2,
    ),
    attn_mask_builder=DefaultAttentionMask(
        reference_feature_name=tensor_schema.item_id_feature_name,
        num_heads=2,
    ),
    encoder=SasRecTransformerLayer(
        embedding_dim=256,
        num_heads=2,
        num_blocks=2,
        dropout=0.3,
        activation="relu",
    ),
    output_normalization=torch.nn.LayerNorm(256),
)
sasrec = SasRec(
    body=body,
    loss=CESampled(ignore_index=tensor_schema.item_id_features.item().padding_value)
)
__init__(body, loss)
Parameters
  • body (SasRecBody) – An instance of SasRecBody.

  • loss (LossProto) – An object of a class that performs loss calculation based on hidden states from the model, positive and optionally negative labels.

forward(feature_tensors, padding_mask, candidates_to_score=None, positive_labels=None, negative_labels=None, target_padding_mask=None)
Parameters
  • feature_tensors (Mapping[str, Tensor]) – a dictionary of tensors to generate embeddings.

  • padding_mask (BoolTensor) – A mask of shape (batch_size, sequence_length) indicating which elements within key to ignore for the purpose of attention (i.e. treat as “padding”). False value indicates that the corresponding key value will be ignored.

  • candidates_to_score (Optional[LongTensor]) –

    a tensor containing item IDs for which you need to get logits at the inference stage.

    Note: you must take into account the padding value when creating the tensor.

    The tensor is used only at the inference stage. It is not required at the training stage; if it is passed during training, it has no effect.

    Default: None.

  • positive_labels (Optional[LongTensor]) –

    a tensor containing positive labels for calculating the loss.

    This argument is not required at the inference stage; if it is passed there, it has no effect.

    Default: None.

  • negative_labels (Optional[LongTensor]) –

    a tensor containing negative labels for calculating the loss.

    Note: Before run make sure that your loss supports calculations with negative labels.

    This argument is not required at the inference stage; if it is passed there, it has no effect.

    Default: None.

  • target_padding_mask (Optional[BoolTensor]) –

    A mask of shape (batch_size, sequence_length, num_positives) indicating elements from positive_labels to ignore during loss calculation. False value indicates that the corresponding value will be ignored.

    This argument is not required at the inference stage; if it is passed there, it has no effect.

    Default: None.

Returns

During training, the model will return an object of the TrainOutput container class. At the inference stage, the InferenceOutput class will be returned.

Return type

Union[TrainOutput, InferenceOutput]
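
A minimal inference sketch, assuming the sasrec instance and the NUM_UNIQUE_ITEMS placeholder from the example above, and assuming the model switches between training and inference outputs via its training flag; tensor contents are illustrative:

import torch

sasrec.eval()
feature_tensors = {"item_id": torch.randint(0, NUM_UNIQUE_ITEMS, (4, 100))}
padding_mask = torch.ones(4, 100, dtype=torch.bool)
with torch.no_grad():
    output = sasrec(feature_tensors=feature_tensors, padding_mask=padding_mask)
# output is expected to be an InferenceOutput instance with logits for the items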

SasRec Building Blocks

SasRecBody

class replay.nn.sequential.SasRecBody(embedder, embedding_aggregator, attn_mask_builder, encoder, output_normalization)

Implementation of the architecture of the SasRec model.

It can include various self-written blocks for modifying the model, but the sequence of applying layers is fixed in accordance with the original architecture.

__init__(embedder, embedding_aggregator, attn_mask_builder, encoder, output_normalization)
Parameters
  • embedder (EmbedderProto) – An object of a class that performs the logic of generating embeddings from an input set of tensors.

  • embedding_aggregator (AggregatorProto) –

    An object of a class that performs the logic of aggregating multiple embeddings.

    For example, it can be a sum, a mean, or a concatenation.

  • attn_mask_builder (AttentionMaskProto) – An object of a class that performs the logic of generating an attention mask based on the features and padding mask given to the model.

  • encoder (EncoderProto) – An object of a class that performs the logic of generating a hidden embedding representation based on features, padding masks, attention mask, and aggregated embedding.

  • output_normalization (NormalizerProto) –

    An object of a class that performs the logic of normalization of the hidden state obtained from the encoder.

    For example, it may be a torch.nn.LayerNorm or torch.nn.RMSNorm.

forward(feature_tensors, padding_mask)
Parameters
  • feature_tensors (Mapping[str, Tensor]) – a dictionary of tensors to generate embeddings.

  • padding_mask (BoolTensor) – A mask of shape (batch_size, sequence_length) indicating which elements within key to ignore for the purpose of attention (i.e. treat as “padding”). False value indicates that the corresponding key value will be ignored.

Returns

The final hidden state.

Expected shape: (batch_size, sequence_length, embedding_dim)

Return type

Tensor
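
A short usage sketch, assuming the body instance and the NUM_UNIQUE_ITEMS placeholder from the SasRec example above; shapes are illustrative:

import torch

hidden = body(
    feature_tensors={"item_id": torch.randint(0, NUM_UNIQUE_ITEMS, (4, 100))},
    padding_mask=torch.ones(4, 100, dtype=torch.bool),
)
# hidden is expected to have shape (4, 100, 256)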

SasRecTransformerLayer

class replay.nn.sequential.SasRecTransformerLayer(embedding_dim, num_heads, num_blocks, dropout, activation='gelu')

Vanilla SasRec layer. The layer consists of Multi-Head Attention followed by a Point-Wise Feed-Forward Network.

Source paper: https://arxiv.org/pdf/1808.09781.pdf

__init__(embedding_dim, num_heads, num_blocks, dropout, activation='gelu')
Parameters
  • embedding_dim (int) – Total dimension of the model. Must be divisible by num_heads.

  • num_heads (int) – Number of parallel attention heads.

  • num_blocks (int) – Number of Transformer blocks.

  • dropout (float) – probability of an element to be zeroed.

  • activation (Literal['relu', 'gelu']) – the name of the activation function. Default: "gelu".

forward(feature_tensors, input_embeddings, padding_mask, attention_mask)
Parameters
  • input_embeddings (Tensor) – Input tensor of shape (batch_size, sequence_length, embedding_dim).

  • padding_mask (BoolTensor) – A mask of shape (batch_size, sequence_length) indicating which elements within key to ignore for the purpose of attention (i.e. treat as “padding”). False value indicates that the corresponding key value will be ignored.

  • attention_mask (FloatTensor) –

    Causal-like mask for attention pattern, where -inf for PAD, 0 - otherwise.

    Possible shapes:

    1. (batch_size * num_heads, sequence_length, sequence_length)

    2. (batch_size, num_heads, sequence_length, sequence_length)

Returns

torch.Tensor: Output tensor after processing through the layer.

Return type

Tensor

PositionAwareAggregator

class replay.nn.sequential.PositionAwareAggregator(embedding_aggregator, max_sequence_length, dropout)

The layer for aggregating embeddings and adding positional encoding.

__init__(embedding_aggregator, max_sequence_length, dropout)
Parameters
  • embedding_aggregator (AggregatorProto) –

    An object of a class that performs the logic of aggregating multiple embeddings.

    For example, it can be a sum, a mean, or a concatenation.

  • max_sequence_length (int) – Max length of sequence.

  • dropout (float) – probability of an element to be zeroed.

forward(feature_tensors)
Parameters

feature_tensors (Mapping[str, Tensor]) – a dictionary of tensors to pass into embedding_aggregator.

Returns

Aggregated embeddings with positional encoding.

Return type

Tensor
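
A minimal sketch of aggregating a single embedding tensor and adding positional encoding; all shapes are illustrative:

import torch
from replay.nn.agg import SumAggregator
from replay.nn.sequential import PositionAwareAggregator

aggregator = PositionAwareAggregator(
    embedding_aggregator=SumAggregator(embedding_dim=8),
    max_sequence_length=5,
    dropout=0.0,
)
out = aggregator({"item_id": torch.randn(2, 5, 8)})
# out is expected to have shape (2, 5, 8)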

MultiHead Differential Attention

MultiHeadDifferentialAttention

class replay.nn.attention.MultiHeadDifferentialAttention(embedding_dim, num_heads, lambda_init, bias=False, kdim=None, vdim=None)

Multi-Head Differential Attention Mechanism. Replaces the conventional softmax attention with differential attention. Incorporates a causal mask (unless another mask is specified) to ensure autoregressive behavior.

Source paper: https://arxiv.org/pdf/2410.05258

__init__(embedding_dim, num_heads, lambda_init, bias=False, kdim=None, vdim=None)
Parameters
  • embedding_dim (int) – Total dimension of the model. Must be divisible by num_heads.

  • num_heads (int) – Number of parallel attention heads.

  • lambda_init (float) – Initial value for lambda.

  • bias (bool) – If specified, adds bias to input / output projection layers. Default: False.

  • kdim (Optional[int]) – Total number of features for keys. Default: None (uses kdim=embedding_dim).

  • vdim (Optional[int]) – Total number of features for values. Default: None (uses vdim=embedding_dim).

forward(query, key, value, attn_mask)

Forward pass for Multi-Head Differential Attention.

Parameters
  • query (Tensor) – Query sequence of shape (batch_size, sequence_length, embedding_dim).

  • key (Tensor) – Key sequence of shape (batch_size, sequence_length, embedding_dim).

  • value (Tensor) – Value sequence of shape (batch_size, sequence_length, embedding_dim).

  • attn_mask (FloatTensor) –

    attention mask, where -inf for PAD, 0 - otherwise.

    Possible shapes:

    1. (batch_size * num_heads, sequence_length, sequence_length)

    2. (batch_size, num_heads, sequence_length, sequence_length)

Returns

torch.Tensor: Output tensor after applying differential attention.

Return type

Tensor
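
A self-attention call sketch with an explicitly built causal float mask (0 / -inf); the lambda_init value and shapes are illustrative:

import torch
from replay.nn.attention import MultiHeadDifferentialAttention

attn = MultiHeadDifferentialAttention(embedding_dim=16, num_heads=2, lambda_init=0.8)
x = torch.randn(2, 4, 16)
causal = torch.triu(torch.full((4, 4), float("-inf")), diagonal=1)
attn_mask = causal.unsqueeze(0).expand(2 * 2, -1, -1)  # (batch_size * num_heads, seq_len, seq_len)
out = attn(query=x, key=x, value=x, attn_mask=attn_mask)
# out is expected to have shape (2, 4, 16)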

DiffTransformerBlock

class replay.nn.sequential.DiffTransformerBlock(embedding_dim, num_heads, lambda_init)

Single Block of the DiffTransformer Architecture. Consists of Multi-Head Differential Attention followed by a SwiGLU Feed-Forward Network.

Source paper: https://arxiv.org/pdf/2410.05258

__init__(embedding_dim, num_heads, lambda_init)
Parameters
  • embedding_dim (int) – Total dimension of the model. Must be divisible by num_heads.

  • num_heads (int) – Number of parallel attention heads.

  • lambda_init (float) – Initial value for lambda.

forward(input_embeddings, attention_mask)

Forward pass for a single differential transformer block.

Parameters
  • input_embeddings (Tensor) – Input tensor of shape (batch_size, sequence_length, embedding_dim).

  • attention_mask (FloatTensor) –

    Causal-like mask for attention pattern, where -inf for PAD, 0 - otherwise.

    Possible shapes:

    1. (batch_size * num_heads, sequence_length, sequence_length)

    2. (batch_size, num_heads, sequence_length, sequence_length)

Returns

Output tensor after processing through the block.

Return type

Tensor

DiffTransformerLayer

class replay.nn.sequential.DiffTransformerLayer(embedding_dim, num_heads, num_blocks)

Stacked blocks of the DiffTransformer Architecture. Single block consists of Multi-Head Differential Attention followed by a SwiGLU Feed-Forward Network.

Source paper: https://arxiv.org/pdf/2410.05258

Reference: https://github.com/nanowell/Differential-Transformer-PyTorch/blob/main/DiffTransformer.py

__init__(embedding_dim, num_heads, num_blocks)
Parameters
  • embedding_dim (int) – Total dimension of the model. Must be divisible by num_heads.

  • num_heads (int) – Number of parallel attention heads.

  • num_blocks (int) – Number of Transformer blocks.

forward(input_embeddings, attention_mask)
Parameters
  • input_embeddings (Tensor) – Input tensor of shape (batch_size, sequence_length, embedding_dim).

  • attention_mask (FloatTensor) –

    Causal-like mask for attention pattern, where -inf for PAD, 0 - otherwise.

    Possible shapes:

    1. (batch_size * num_heads, sequence_length, sequence_length)

    2. (batch_size, num_heads, sequence_length, sequence_length)

Returns

Output tensor after processing through the layer.

Return type

Tensor

SasRec Transforms

replay.nn.transform.template.make_default_sasrec_transforms(tensor_schema, query_column='query_id')

Creates a valid transformation pipeline for SasRec data batches.

Generated pipeline expects input dataset to contain the following columns:
  1. Query ID column, specified by query_column.

  2. Item ID column, specified in the tensor schema.

Parameters
  • tensor_schema (TensorSchema) – TensorSchema used to infer feature columns.

  • query_column (str) – Name of the column containing query IDs. Default: "query_id".

Returns

dict of transforms specified for every dataset split (train, validation, test, predict).

Return type

dict[str, list[torch.nn.modules.module.Module]]
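
A usage sketch, assuming the tensor_schema from the SasRec example above and a query column named "query_id":

from replay.nn.transform.template import make_default_sasrec_transforms

transforms = make_default_sasrec_transforms(tensor_schema, query_column="query_id")
# transforms is expected to be a dict with one entry per split (train, validation, test, predict),
# each mapping to a list of transform modules for that split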

TwoTower

TwoTower

class replay.nn.sequential.TwoTower(body, loss, context_merger=None)

Implementation of a generic Two-Tower architecture with two independent “towers” (encoders) that encode separate inputs. In recommender systems these are typically a query tower and an item tower. The output hidden states of the two “towers” are fused via dot product in the model head.

Source paper: https://doi.org/10.1145/3366424.3386195

Example:

import torch

from replay.data import FeatureHint, FeatureSource, FeatureType
from replay.data.nn import TensorFeatureInfo, TensorFeatureSource, TensorSchema
from replay.nn.agg import SumAggregator
from replay.nn.embedding import SequenceEmbedding
from replay.nn.ffn import SwiGLUEncoder
from replay.nn.mask import DefaultAttentionMask
from replay.nn.loss import CESampled
from replay.nn.sequential import PositionAwareAggregator, SasRecTransformerLayer, TwoTower, TwoTowerBody
from replay.nn.sequential.twotower import FeaturesReader

tensor_schema = TensorSchema(
    [
        TensorFeatureInfo(
            "item_id",
            is_seq=True,
            feature_type=FeatureType.CATEGORICAL,
            embedding_dim=256,
            padding_value=NUM_UNIQUE_ITEMS,
            cardinality=NUM_UNIQUE_ITEMS,
            feature_hint=FeatureHint.ITEM_ID,
            feature_sources=[TensorFeatureSource(FeatureSource.INTERACTIONS, "item_id")]
        ),
    ]
)

common_aggregator = SumAggregator(embedding_dim=256)

body = TwoTowerBody(
    schema=tensor_schema,
    embedder=SequenceEmbedding(schema=tensor_schema),
    attn_mask_builder=DefaultAttentionMask(
        reference_feature_name=tensor_schema.item_id_feature_name,
        num_heads=2,
    ),
    query_tower_feature_names=tensor_schema.names,
    item_tower_feature_names=tensor_schema.names,
    query_embedding_aggregator=PositionAwareAggregator(
        embedding_aggregator=common_aggregator,
        max_sequence_length=100,
        dropout=0.2,
    ),
    item_embedding_aggregator=common_aggregator,
    query_encoder=SasRecTransformerLayer(
       embedding_dim=256,
       num_heads=2,
       num_blocks=2,
       dropout=0.3,
       activation="relu",
    ),
    query_tower_output_normalization=torch.nn.LayerNorm(256),
    item_encoder=SwiGLUEncoder(embedding_dim=256, hidden_dim=2*256),
    item_features_reader=FeaturesReader(
        schema=tensor_schema,
        metadata={"item_id": {}},
        path="item_features.parquet",
    ),
)
twotower = TwoTower(
    body=body,
    loss=CESampled(ignore_index=tensor_schema["item_id"].padding_value),
)
__init__(body, loss, context_merger=None)
Parameters
  • body (TwoTowerBody) – An instance of TwoTowerBody.

  • loss (LossProto) – An object of a class that performs loss calculation based on hidden states from the model, positive and optionally negative labels.

  • context_merger (Optional[ContextMergerProto]) – An object of a class that fuses the query encoder hidden state with the input feature tensors. Default: None.

forward(feature_tensors, padding_mask, candidates_to_score=None, positive_labels=None, negative_labels=None, target_padding_mask=None)
Parameters
  • feature_tensors (Mapping[str, Tensor]) – a dictionary of tensors to generate embeddings.

  • padding_mask (BoolTensor) – A mask of shape (batch_size, sequence_length) indicating which elements within key to ignore for the purpose of attention (i.e. treat as “padding”). False value indicates that the corresponding key value will be ignored.

  • candidates_to_score (Optional[LongTensor]) –

    a tensor containing item IDs for which you need to get logits at the inference stage.

    Note: you must take into account the padding value when creating the tensor.

    The tensor is used only at the inference stage. It is not required at the training stage; if it is passed during training, it has no effect.

    Default: None.

  • positive_labels (Optional[LongTensor]) –

    a tensor containing positive labels for calculating the loss.

    This argument is not required at the inference stage; if it is passed there, it has no effect.

    Default: None.

  • negative_labels (Optional[LongTensor]) –

    a tensor containing negative labels for calculating the loss.

    Note: Before run make sure that your loss supports calculations with negative labels.

    This argument is not required at the inference stage; if it is passed there, it has no effect.

    Default: None.

  • target_padding_mask (Optional[BoolTensor]) –

    A mask of shape (batch_size, sequence_length, num_positives) indicating elements from positive_labels to ignore during loss calculation. False value indicates that the corresponding value will be ignored.

    This argument is not required at the inference stage; if it is passed there, it has no effect.

    Default: None.

Returns

During training, the model will return an object of the TrainOutput container class. At the inference stage, the InferenceOutput class will be returned.

Return type

Union[TrainOutput, InferenceOutput]

classmethod from_params(schema, item_features_reader, embedding_dim=192, num_heads=4, num_blocks=2, max_sequence_length=50, dropout=0.3, excluded_features=None, categorical_list_feature_aggregation_method='sum')

Class method for quickly creating an instance of TwoTower with typical block types and user-provided parameters.

The item “tower” is a SwiGLU encoder (an MLP with SwiGLU activation), the user “tower” is a stack of SasRec transformer layers, and the loss is a Cross-Entropy loss.

Embeddings of every feature in both “towers” are aggregated via sum. The same features are used in both “towers”, namely the features specified in the tensor schema, with the exception of excluded_features.

To create an instance of TwoTower with other types of blocks, please use the class constructor.

Parameters
  • schema (TensorSchema) – tensor schema object with metainformation about features.

  • item_features_reader (FeaturesReaderProtocol) –

    A class that implements reading features, processing them, and converting them to torch.Tensor for ItemTower. You can use replay.nn.sequential.twotower.FeaturesReader as a standard class.

    But you can implement your own feature processing, just follow the replay.nn.sequential.twotower.FeaturesReaderProtocol protocol.

  • embedding_dim (int) – embeddings dimension in both towers. Default: 192.

  • num_heads (int) – number of heads in user tower SasRec layers. Default: 4.

  • num_blocks (int) – number of blocks in user tower SasRec layers. Default: 2.

  • max_sequence_length (int) – maximum length of sequence in user tower SasRec layers. Default: 50.

  • dropout (float) – dropout value in both towers. Default: 0.3

  • excluded_features (Optional[list[str]]) – A list containing the names of features for which you do not need to generate an embedding. The names in this list are expected to be present in schema. Default: None.

  • categorical_list_feature_aggregation_method (str) – Mode to aggregate tokens in token item representation (categorical list only). Default: "sum".

Returns

an instance of TwoTower class.

Return type

TwoTower
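
A construction sketch via from_params, assuming the tensor_schema from the example above; the reader metadata and parquet path are placeholders:

from replay.nn.sequential import TwoTower
from replay.nn.sequential.twotower import FeaturesReader

reader = FeaturesReader(schema=tensor_schema, metadata={"item_id": {}}, path="item_features.parquet")
twotower = TwoTower.from_params(
    schema=tensor_schema,
    item_features_reader=reader,
    embedding_dim=256,
    num_heads=2,
    num_blocks=2,
    max_sequence_length=100,
    dropout=0.2,
)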

TwoTower Building Blocks

TwoTowerBody

class replay.nn.sequential.TwoTowerBody(schema, embedder, attn_mask_builder, query_tower_feature_names, item_tower_feature_names, query_embedding_aggregator, item_embedding_aggregator, query_encoder, query_tower_output_normalization, item_encoder, item_features_reader)

Foundation for Two-Tower model which creates query “tower” and item “tower”.

To use the Two-Tower model, an instance of this class should be passed into TwoTower together with any loss from Losses.

QueryTower

class replay.nn.sequential.QueryTower(feature_names, embedder, embedding_aggregator, attn_mask_builder, encoder, output_normalization)

Query Tower of Two-Tower model.

__init__(feature_names, embedder, embedding_aggregator, attn_mask_builder, encoder, output_normalization)
Parameters
  • feature_names (Sequence[str]) – sequence of names used in query tower.

  • embedder (EmbedderProto) – An object of a class that performs the logic of generating embeddings from an input batch.

  • embedding_aggregator (AggregatorProto) – An object of a class that performs the logic of aggregating multiple embeddings of query tower.

  • attn_mask_builder (AttentionMaskProto) – An object of a class that performs the logic of generating an attention mask based on the features and padding mask given to the model.

  • encoder (QueryEncoderProto) – An object of a class that performs the logic of generating a query hidden embedding representation based on features, padding masks, attention mask, and aggregated embedding of query_tower_feature_names. It’s supposed to be a transformer.

  • output_normalization (NormalizerProto) –

    An object of a class that performs the logic of normalization of the hidden state obtained from the query encoder.

    For example, it can be a torch.nn.LayerNorm or torch.nn.RMSNorm.

forward(feature_tensors, padding_mask)
Parameters
  • feature_tensors (Mapping[str, Tensor]) – a dictionary of tensors to generate embeddings.

  • padding_mask (BoolTensor) – A mask of shape (batch_size, sequence_length) indicating which elements within key to ignore for the purpose of attention (i.e. treat as “padding”). False value indicates that the corresponding key value will be ignored.

Returns

The final hidden state.

Expected shape: (batch_size, sequence_length, embedding_dim)

Return type

Tensor

ItemTower

class replay.nn.sequential.ItemTower(schema, item_features_reader, feature_names, embedder, embedding_aggregator, encoder)

Item Tower of Two-Tower model.

Note: ItemTower loads feature tensors of all items into memory.

__init__(schema, item_features_reader, feature_names, embedder, embedding_aggregator, encoder)
Parameters
  • schema (TensorSchema) – tensor schema object with metainformation about features.

  • item_features_reader (FeaturesReaderProtocol) –

    A class that implements reading features, processing them, and converting them to torch.Tensor for ItemTower. You can use replay.nn.sequential.twotower.FeaturesReader as a standard class.

    But you can implement your own feature processing, just follow the replay.nn.sequential.twotower.FeaturesReaderProtocol protocol.

  • feature_names (Sequence[str]) – sequence of names used in item tower.

  • embedder (EmbedderProto) – An object of a class that performs the logic of generating embeddings from an input batch.

  • embedding_aggregator (AggregatorProto) – An object of a class that performs the logic of aggregating multiple embeddings of item tower.

  • encoder (ItemEncoderProto) – An object of a class that performs the logic of generating an item hidden embedding representation based on features and aggregated embeddings of item_tower_feature_names. The item encoder uses the item reference created from the features read by item_features_reader.

forward(candidates_to_score=None)
Parameters

candidates_to_score (Optional[LongTensor]) – IDs of items for which to obtain item embeddings from the item tower. If set to None, embeddings for all items are returned. Default: None.

Returns

item embeddings.

Expected shape:

  • (len(candidates_to_score), embedding_dim) if candidates_to_score is provided,

  • (items_num, embedding_dim) if candidates_to_score is None.

TwoTower Transforms

replay.nn.transform.template.make_default_twotower_transforms(tensor_schema, query_column='query_id')

Creates a valid transformation pipeline for TwoTower data batches.

Generated pipeline expects input dataset to contain the following columns:
  1. Query ID column, specified by query_column.

  2. Item ID column, specified in the tensor schema.

Parameters
  • tensor_schema (TensorSchema) – TensorSchema used to infer feature columns.

  • query_column (str) – Name of the column containing query IDs. Default: "query_id".

Returns

dict of transforms specified for every dataset split (train, validation, test, predict).

Return type

dict[str, list[torch.nn.modules.module.Module]]

Losses

BCE, BCESampled, CESampled, LogInCE, LogInCESampled, LogOutCE support the calculation of logits for the case of multi-positive labels (there are several labels for each position in the sequence). Source of multi-positive labels: https://arxiv.org/abs/2205.04507

BCE

class replay.nn.loss.BCE(**kwargs)

Pointwise Binary Cross-Entropy loss. Calculates the loss over the whole item catalog.

The loss supports the calculation of logits for the case of multi-positive labels (there are several labels for each position in the sequence).

forward(model_embeddings, positive_labels, target_padding_mask)
Parameters
  • model_embeddings (Tensor) – model output of shape (batch_size, sequence_length, embedding_dim).

  • positive_labels (LongTensor) – labels of positive events of shape (batch_size, sequence_length, num_positives).

  • target_padding_mask (BoolTensor) – padding mask corresponding to positive_labels, of shape (batch_size, sequence_length, num_positives).

Returns

computed loss value.

Return type

Tensor

BCESampled

class replay.nn.loss.BCESampled(log_epsilon=1e-06, clamp_border=100.0, negative_labels_ignore_index=-100)

Sampled Pointwise Binary Cross-Entropy loss (BCE with negative sampling). Calculates loss between one positive item and K negatively sampled items.

The loss supports the calculation of logits for the case of multi-positive labels (there are several labels for each position in the sequence).

__init__(log_epsilon=1e-06, clamp_border=100.0, negative_labels_ignore_index=-100)
Parameters
  • log_epsilon (float) – correction to avoid zero in the logarithm during loss calculation. Default: 1e-6.

  • clamp_border (float) – upper bound for clamping the loss tensor; the lower bound will be set to -clamp_border. Default: 100.0.

  • negative_labels_ignore_index (int) – padding value for negative labels. This may be the case when negative labels are formed at the preprocessing level, rather than the negative sampler. The index is ignored and does not contribute to the loss. Default: -100.

forward(model_embeddings, positive_labels, negative_labels, target_padding_mask)
Parameters
  • model_embeddings (Tensor) – model output of shape (batch_size, sequence_length, embedding_dim).

  • positive_labels (LongTensor) – labels of positive events of shape (batch_size, sequence_length, num_positives).

  • negative_labels (LongTensor) –

    labels of sampled negative events. Expected shape:

    • (batch_size, sequence_length, num_negatives)

    • (batch_size, num_negatives)

    • (num_negatives) - a case where the same negative events are used for the entire batch.

  • target_padding_mask (BoolTensor) – padding mask corresponding to positive_labels, of shape (batch_size, sequence_length, num_positives).

Returns

computed loss value.

Return type

Tensor

CE

class replay.nn.loss.CE(**kwargs)

Full Cross-Entropy loss. Calculates the loss over the whole item catalog.

__init__(**kwargs)

To calculate the loss, torch.nn.CrossEntropyLoss is used. You can pass all parameters for initializing the object via kwargs.

forward(model_embeddings, positive_labels, target_padding_mask)
Parameters
  • model_embeddings (Tensor) – model output of shape (batch_size, sequence_length, embedding_dim).

  • positive_labels (LongTensor) – labels of positive events of shape (batch_size, sequence_length, num_positives).

  • target_padding_mask (BoolTensor) – padding mask corresponding to positive_labels, of shape (batch_size, sequence_length, num_positives).

Returns

computed loss value.

Return type

Tensor

CESampled

class replay.nn.loss.CESampled(negative_labels_ignore_index=-100, **kwargs)

Sampled Cross-Entropy loss (Cross-Entropy with negative sampling). Calculates loss between one positive item and K negatively sampled items.

The loss supports the calculation of logits for the case of multi-positive labels (there are several labels for each position in the sequence).

__init__(negative_labels_ignore_index=-100, **kwargs)

To calculate the loss, torch.nn.CrossEntropyLoss is used. You can pass all parameters for initializing the object via kwargs.

Parameters

negative_labels_ignore_index (int) – padding value for negative labels. This may be the case when negative labels are formed at the preprocessing level, rather than the negative sampler. The index is ignored and does not contribute to the loss. Default: -100.

forward(model_embeddings, positive_labels, negative_labels, target_padding_mask)
Parameters
  • model_embeddings (Tensor) – model output of shape (batch_size, sequence_length, embedding_dim).

  • positive_labels (LongTensor) – labels of positive events of shape (batch_size, sequence_length, num_positives).

  • negative_labels (LongTensor) –

    labels of sampled negative events.

    Expected shape:
    • (batch_size, sequence_length, num_negatives)

    • (batch_size, num_negatives)

    • (num_negatives) - a case where the same negative events are used for the entire batch.

  • target_padding_mask (BoolTensor) – padding mask corresponding to positive_labels, of shape (batch_size, sequence_length, num_positives).

Returns

computed loss value.

Return type

Tensor

LogInCE

class replay.nn.loss.LogInCE(cardinality, log_epsilon=1e-06, clamp_border=100.0, negative_labels_ignore_index=-100)

LogInCE loss.

\[L_{\text{InfoNCE}} = -\log \frac{\sum_{p \in P} \exp(\mathrm{sim}(q, p))}{\sum_{p \in P} \exp(\mathrm{sim}(q, p)) + \sum_{n \in N} \exp(\mathrm{sim}(q, n))},\]

where q – query embedding, P – set of positive logits, N – set of negative logits, \(\mathrm{sim}(\cdot, \cdot)\) – similarity function.

The loss supports the calculation of logits for the case of multi-positive labels (there are several labels for each position in the sequence).

__init__(cardinality, log_epsilon=1e-06, clamp_border=100.0, negative_labels_ignore_index=-100)
Parameters
  • cardinality (int) – number of unique items in vocabulary (catalog). The specified cardinality value must not take into account the padding value.

  • log_epsilon (float) – correction to avoid zero in the logarithm during loss calculation. Default: 1e-6.

  • clamp_border (float) – upper bound for clamping the loss tensor; the lower bound will be set to -clamp_border. Default: 100.0.

  • negative_labels_ignore_index (int) – padding value for negative labels. This may be the case when negative labels are formed at the preprocessing level, rather than the negative sampler. The index is ignored and does not contribute to the loss. Default: -100.

forward(model_embeddings, positive_labels, target_padding_mask)

Note: during the forward pass, the whole item catalog is used as negatives. Negative logits at positions where negative labels coincide with positive ones are then masked.

Parameters
  • model_embeddings (Tensor) – model output of shape (batch_size, sequence_length, embedding_dim).

  • positive_labels (LongTensor) – ground truth labels of positive events of shape (batch_size, sequence_length, num_positives).

  • target_padding_mask (BoolTensor) – padding mask corresponding to positive_labels, of shape (batch_size, sequence_length, num_positives).

Returns

computed loss value.

Return type

Tensor

LogInCESampled

class replay.nn.loss.LogInCESampled(log_epsilon=1e-06, clamp_border=100.0, negative_labels_ignore_index=-100)

Sampled version of the LogInCE (Log InfoNCE) loss, with negatively sampled items.

\[L_{\text{InfoNCE}} = -\log \frac{\sum_{p \in P} \exp(\mathrm{sim}(q, p))}{\sum_{p \in P} \exp(\mathrm{sim}(q, p)) + \sum_{n \in N_{\text{sampled}}} \exp(\mathrm{sim}(q, n))},\]

where q – query embedding, P – set of positive logits, \(N_{\text{sampled}}\) – set of sampled negative logits, \(\mathrm{sim}(\cdot, \cdot)\) – similarity function.

Same as LogInCE; the difference is in the set of negatives.

The loss supports the calculation of logits for the case of multi-positive labels (there are several labels for each position in the sequence).

__init__(log_epsilon=1e-06, clamp_border=100.0, negative_labels_ignore_index=-100)
Parameters
  • log_epsilon (float) – correction to avoid zero in the logarithm during loss calculation. Default: 1e-6.

  • clamp_border (float) – upper bound for clamping the loss tensor; the lower bound will be set to -clamp_border. Default: 100.0.

  • negative_labels_ignore_index (int) – padding value for negative labels. This may be the case when negative labels are formed at the preprocessing level, rather than the negative sampler. The index is ignored and does not contribute to the loss. Default: -100.

forward(model_embeddings, positive_labels, negative_labels, target_padding_mask)
Parameters
  • model_embeddings (Tensor) – model output of shape (batch_size, sequence_length, embedding_dim).

  • positive_labels (LongTensor) – labels of positive events of shape (batch_size, sequence_length, num_positives).

  • negative_labels (LongTensor) –

    labels of sampled negative events.

    Expected shape:
    • (batch_size, sequence_length, num_negatives)

    • (batch_size, num_negatives)

    • (num_negatives) - a case where the same negative events are used for the entire batch.

  • target_padding_mask (BoolTensor) – padding mask corresponding to positive_labels, of shape (batch_size, sequence_length, num_positives).

Returns

computed loss value.

Return type

Tensor

LogOutCE

class replay.nn.loss.LogOutCE(cardinality, negative_labels_ignore_index=-100, **kwargs)

LogOutCE loss.

\[L_{\text{InfoNCE}} = - \sum_{p \in P} \log \frac{ \exp(\mathrm{sim}(q, p))} {\exp(\mathrm{sim}(q, p)) + \sum_{n \in N} \exp(\mathrm{sim}(q, n))}.\]

where q – query embedding, P – set of positive logits, N – set of negative logits, \(\mathrm{sim}(\cdot, \cdot)\) – similarity function.

The loss supports the calculation of logits for the case of multi-positive labels (there are several labels for each position in the sequence).

__init__(cardinality, negative_labels_ignore_index=-100, **kwargs)

To calculate the loss, torch.nn.CrossEntropyLoss is used. You can pass all parameters for initializing the object via kwargs.

Parameters
  • cardinality (int) – number of unique items in vocabulary (catalog). The specified cardinality value must not take into account the padding value.

  • negative_labels_ignore_index (int) – padding value for negative labels. This may be the case when negative labels are formed at the preprocessing level, rather than the negative sampler. The index is ignored and does not contribute to the loss. Default: -100.

forward(model_embeddings, positive_labels, target_padding_mask)

Note: during the forward pass, the whole item catalog is used as negatives. Negative logits at positions where negative labels coincide with positive ones are then masked.

Parameters
  • model_embeddings (Tensor) – model output of shape (batch_size, sequence_length, embedding_dim).

  • positive_labels (LongTensor) – ground truth labels of positive events of shape (batch_size, sequence_length, num_positives).

  • target_padding_mask (BoolTensor) – padding mask corresponding to positive_labels, of shape (batch_size, sequence_length, num_positives).

Returns

computed loss value.

Return type

Tensor

Scalable Cross Entropy

SCEParams

class replay.models.nn.loss.SCEParams(n_buckets, bucket_size_x, bucket_size_y, mix_x=False)

Set of parameters for ScalableCrossEntropyLoss.

Parameters
  • n_buckets (int) – Number of buckets into which samples will be distributed.

  • bucket_size_x (int) – Number of item hidden representations that will be in each bucket.

  • bucket_size_y (int) – Number of item embeddings that will be in each bucket.

  • mix_x (bool) – Whether a randomly generated matrix will be multiplied by the model output matrix or not. Default: False.

ScalableCrossEntropyLoss

class replay.models.nn.loss.ScalableCrossEntropyLoss(sce_params)
__call__(embeddings, positive_labels, all_embeddings, padding_mask, tokens_mask=None)

ScalableCrossEntropyLoss computation.

Parameters
  • embeddings (Tensor) – Matrix of the last transformer block outputs.

  • positive_labels (LongTensor) – Positive labels.

  • all_embeddings (Tensor) – Matrix of all item embeddings.

  • padding_mask (BoolTensor) – Padding mask.

  • tokens_mask (Optional[BoolTensor]) – Tokens mask (need only for Bert4Rec). Default: None.

Return type

Tensor

__init__(sce_params)

ScalableCrossEntropyLoss for Sequential Recommendations with Large Item Catalogs. Reference article may be found at https://arxiv.org/pdf/2409.18721.

Parameters

sce_params (SCEParams) – Dataclass with ScalableCrossEntropyLoss parameters (n_buckets, bucket_size_x, bucket_size_y, mix_x; see SCEParams above).
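
A minimal construction sketch; the bucket parameters are illustrative and should be tuned to the catalog size:

from replay.models.nn.loss import SCEParams, ScalableCrossEntropyLoss

sce_params = SCEParams(n_buckets=64, bucket_size_x=128, bucket_size_y=128, mix_x=True)
loss = ScalableCrossEntropyLoss(sce_params=sce_params)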

Model Building Blocks

Building blocks for neural network models.

Embeddings

SequenceEmbedding

class replay.nn.embedding.SequenceEmbedding(schema, excluded_features=None, categorical_list_feature_aggregation_method='sum')

The embedding generation class for all types of features given into the sequential models.

The embedding size for each feature will be taken from TensorSchema (from field named embedding_dim). For numerical features, it is expected that the last dimension of the tensor will be equal to tensor_dim field in TensorSchema.

Keep in mind that the first dimension of every categorical embedding table (its size) will be equal to cardinality + 1. This is necessary to take the padding value into account.

__init__(schema, excluded_features=None, categorical_list_feature_aggregation_method='sum')
Parameters
  • schema (TensorSchema) – TensorSchema containing meta information about all the features for which you need to generate an embedding.

  • excluded_features (Optional[list[str]]) – A list containing the names of features for which you do not need to generate an embedding. The names in this list are expected to be present in schema. Default: None.

  • categorical_list_feature_aggregation_method (Literal['sum', 'mean', 'max']) – Mode to aggregate tokens in token item representation (categorical list only). Default: "sum".

property embeddings_dim: dict[str, int]

Returns the embedding dimensions for each of the features in the schema.

forward(feature_tensor, feature_names=None)
Parameters
  • feature_tensor (Mapping[str, Tensor]) – a dictionary of tensors to generate embedding. It is expected that the keys from this dictionary match the names of the features in the given schema.

  • feature_names (Optional[Sequence[str]]) –

    A custom list of features for which embeddings need to be generated. It is expected that the values from this list match the names of the features in the given schema.

    Default: None. This means that the names of the features from the schema will be used.

Returns

a dictionary with tensors that contains embeddings.

Return type

Mapping[str, Tensor]
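
A minimal sketch, assuming the tensor_schema from the SasRec example above; the input IDs are illustrative and must fit the schema cardinality:

import torch
from replay.nn.embedding import SequenceEmbedding

embedder = SequenceEmbedding(schema=tensor_schema)
embeddings = embedder({"item_id": torch.LongTensor([[3, 7, 1]])})
# embeddings["item_id"] is expected to have shape (1, 3, 256)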

get_item_weights(indices=None)

Returns the embedding weights for the feature that matches the item ID feature name specified in the schema. Embeddings for this feature are expected to exist. Note: the row corresponding to the padding is excluded from the returned weights; this applies only when indices is None.

Parameters

indices (Optional[LongTensor]) – Items indices.

Returns

Embeddings for specific items.

Return type

Tensor

CategoricalEmbedding

class replay.nn.embedding.CategoricalEmbedding(feature_info, categorical_list_feature_aggregation_method='sum')

The embedding generation class for categorical features. It supports working with single features for each event in sequence, as well as several (categorical list).

When using this class, keep in mind that the first dimension of the embedding table (its size) will be equal to cardinality + 1. This is necessary to take the padding value into account.

__init__(feature_info, categorical_list_feature_aggregation_method='sum')
Parameters
  • feature_info (TensorFeatureInfo) – Meta information about the feature.

  • categorical_list_feature_aggregation_method (Literal['sum', 'mean', 'max']) – Mode to aggregate tokens in token item representation (categorical list only). One of {sum, mean, max} Default: "sum".

property embedding_dim: int

Embedding dimension after applying the layer

forward(indices)
Parameters

indices (LongTensor) – Items indices.

Returns

Embeddings for specific items.

Return type

Tensor

property weight: Tensor

Returns the weights of the embedding layer, excluding the row that corresponds to the padding.

NumericalEmbedding

class replay.nn.embedding.NumericalEmbedding(feature_info)

The embedding generation class for numerical features. It supports working with single features for each event in sequence, as well as several (numerical list).

Note: if the embedding_dim field in TensorFeatureInfo for an incoming feature matches its last dimension (tensor_dim field in TensorFeatureInfo), then transformation will not be applied.

__init__(feature_info)
Parameters

feature_info (TensorFeatureInfo) – Meta information about the feature.

property embedding_dim: int

Embedding dimension after applying the layer

forward(values)

Numerical embedding forward pass.

Note: if the embedding_dim for an incoming feature matches its last dimension (tensor_dim), then transformation will not be applied.

Parameters

values (FloatTensor) – feature values.

Returns

Embeddings for specific items.

Return type

Tensor

property weight: Tensor

Returns the weight of the applied layer. If embedding_dim matches tensor_dim, then the identity matrix will be returned.

Aggregators

The main purpose of these modules is to aggregate embeddings, but in general they can be used to aggregate tensors of any kind.

SumAggregator

class replay.nn.agg.SumAggregator(embedding_dim)

The class sums the incoming embeddings. Note that for successful aggregation, the dimensions of all embeddings must match.

__init__(embedding_dim)
Parameters

embedding_dim (int) – The last dimension of incoming and outcoming embeddings.

property embedding_dim: int

The dimension of the output embedding

forward(feature_tensors)
Parameters

feature_tensors (Mapping[str, Tensor]) – a dictionary of tensors to sum up. The dimensions of all tensors in the dictionary must match.

Returns

torch.Tensor. The last dimension of the tensor is embedding_dim.

Return type

Tensor
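
A minimal sketch of summing two equally shaped embeddings; shapes are illustrative:

import torch
from replay.nn.agg import SumAggregator

aggregator = SumAggregator(embedding_dim=8)
out = aggregator({"item_id": torch.randn(2, 5, 8), "category": torch.randn(2, 5, 8)})
# out is the element-wise sum with shape (2, 5, 8)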

ConcatAggregator

class replay.nn.agg.ConcatAggregator(input_embedding_dims, output_embedding_dim)

The class concatenates incoming embeddings by the last dimension.

If you need to concatenate several embeddings, then a linear layer will be applied to get the last dimension equal to embedding_dim.

If only one embedding comes to the input, then its last dimension is expected to be equal to embedding_dim.

__init__(input_embedding_dims, output_embedding_dim)
Parameters
  • input_embedding_dims (list[int]) – Dimensions of incoming embeddings.

  • output_embedding_dim (int) – The dimension of the output embedding after concatenation.

property embedding_dim: int

The dimension of the output embedding

forward(feature_tensors)

To ensure a deterministic result, the embeddings are concatenated in ascending order of the dictionary keys.

Parameters

feature_tensors (Mapping[str, Tensor]) – a dictionary of tensors to concatenate.

Returns

The last dimension of the tensor is embedding_dim.

Return type

Tensor
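
A minimal sketch of concatenating two embeddings of different widths and projecting to the output width; shapes are illustrative:

import torch
from replay.nn.agg import ConcatAggregator

aggregator = ConcatAggregator(input_embedding_dims=[8, 4], output_embedding_dim=16)
out = aggregator({"item_id": torch.randn(2, 5, 8), "category": torch.randn(2, 5, 4)})
# out is expected to have shape (2, 5, 16)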

Feed Forward Networks

PointWiseFeedForward

class replay.nn.ffn.PointWiseFeedForward(embedding_dim, dropout, activation='gelu')

Point wise feed forward network layer.

Source paper: https://arxiv.org/pdf/1808.09781.pdf

__init__(embedding_dim, dropout, activation='gelu')
Parameters
  • embedding_dim (int) – Dimension of the input features.

  • dropout (float) – probability of an element to be zeroed.

  • activation (Literal['relu', 'gelu']) – the name of the activation function. Default: "gelu".

forward(input_embeddings)
Parameters

input_embeddings (LongTensor) – Query feature tensor.

Returns

Output tensors.

Return type

LongTensor

SwiGLU

class replay.nn.ffn.SwiGLU(embedding_dim, hidden_dim)

SwiGLU Activation Function. Combines the Swish activation with Gated Linear Units.

__init__(embedding_dim, hidden_dim)
Parameters
  • embedding_dim (int) – Dimension of the input features.

  • hidden_dim (int) – Dimension of hidden layer. According to the original source, it is recommended to set the size of the hidden layer as \(2 \cdot \text{embedding_dim}\).

forward(input_embeddings)

Forward pass for SwiGLU.

Parameters

input_embeddings (Tensor) – Input tensor of shape (batch_size, sequence_length, embedding_dim).

Returns

Output tensor of shape (batch_size, sequence_length, embedding_dim).

Return type

Tensor
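
A common formulation, which this layer is assumed to follow up to projection details, is

\[\mathrm{SwiGLU}(x) = \big(\mathrm{SiLU}(x W_1) \odot x W_2\big) W_3,\]

where \(W_1, W_2 \in \mathbb{R}^{d \times h}\), \(W_3 \in \mathbb{R}^{h \times d}\), \(d\) is embedding_dim and \(h\) is hidden_dim.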

SwiGLUEncoder

class replay.nn.ffn.SwiGLUEncoder(embedding_dim, hidden_dim)

MLP block consists of SwiGLU Feed-Forward network followed by a RMSNorm layer with skip connection.

__init__(embedding_dim, hidden_dim)
Parameters
  • embedding_dim (int) – Dimension of the input features.

  • hidden_dim (int) – Dimension of the hidden layer.

forward(input_embeddings)
Parameters

input_embeddings (Tensor) – Input tensor of shape (batch_size, sequence_length, embedding_dim).

Returns

Output tensor of shape (batch_size, sequence_length, embedding_dim).

Return type

Tensor

Attention Masks

DefaultAttentionMask

class replay.nn.mask.DefaultAttentionMask(reference_feature_name, num_heads)

Constructs a float lower-triangular attention mask of shape (batch_size * num_heads, sequence_length, sequence_length), where -inf for <PAD>, 0 - otherwise.

__call__(feature_tensor, padding_mask)
Parameters
  • feature_tensor (Mapping[str, Tensor]) – dict of features tensors.

  • padding_mask (BoolTensor) – Padding mask where 0 - <PAD>, 1 - otherwise.

Returns

Float attention mask of shape (B * num_heads, L, L), where -inf for <PAD>, 0 - otherwise.

Return type

FloatTensor

__init__(reference_feature_name, num_heads)
Parameters
  • reference_feature_name (str) – To build a mask, you need a reference tensor. So you need to pass the name of the tensor, which will definitely be in the dictionary of feature tensors. The second dimension (1 in zero indexing) of the tensor will be used to construct the attention mask.

  • num_heads (int) – Number of attention heads.
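
A minimal sketch of building the mask for a batch of two sequences of length 4, where the first position of the first sequence is padding; the feature values are illustrative:

import torch
from replay.nn.mask import DefaultAttentionMask

mask_builder = DefaultAttentionMask(reference_feature_name="item_id", num_heads=2)
feature_tensor = {"item_id": torch.zeros(2, 4, dtype=torch.long)}
padding_mask = torch.BoolTensor([[0, 1, 1, 1], [1, 1, 1, 1]])
attn_mask = mask_builder(feature_tensor, padding_mask)
# attn_mask is expected to have shape (2 * 2, 4, 4) with -inf at masked positions and 0 elsewhere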

Transformer Heads

EmbeddingTyingHead

class replay.nn.head.EmbeddingTyingHead

The model head for calculating the output logits as a dot product between the model hidden state and the item embeddings. The module supports both 2-d and 3-d tensors for the hidden state and the item embeddings.

As a result of the work, the scores for each item will be obtained.

forward(hidden_states, item_embeddings)
Parameters
  • hidden_states (Tensor) – hidden state of shape (batch_size, embedding_dim) or (batch_size, sequence_length, embedding_dim).

  • item_embeddings (Tensor) – item embeddings of shape (num_items, embedding_dim) or (batch_size, num_items, embedding_dim).

Returns

logits of shape (batch_size, num_items) or (batch_size, sequence_length, num_items).

Return type

Tensor
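
A minimal sketch of scoring 10 items against the hidden states of a batch of sequences; shapes are illustrative:

import torch
from replay.nn.head import EmbeddingTyingHead

head = EmbeddingTyingHead()
hidden_states = torch.randn(2, 5, 8)   # (batch_size, sequence_length, embedding_dim)
item_embeddings = torch.randn(10, 8)   # (num_items, embedding_dim)
logits = head(hidden_states, item_embeddings)
# logits is expected to have shape (2, 5, 10)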

Universal Lighting module

LightningModule

class replay.nn.lightning.LightningModule(model, optimizer_factory=None, lr_scheduler_factory=None)

A universal wrapper class above the PyTorch model for working with Lightning library.

Pay attention to the format of the forward function’s return value.

__init__(model, optimizer_factory=None, lr_scheduler_factory=None)
Parameters
  • model (Module) –

    Initialized model.

    Expected result of the model’s forward function is an object of the TrainOutput class after training stage and InferenceOutput after inference stage.

  • optimizer_factory (Optional[BaseOptimizerFactory]) – Optimizer factory. Default: None.

  • lr_scheduler_factory (Optional[BaseLRSchedulerFactory]) – Learning rate schedule factory. Default: None.

property candidates_to_score: Optional[LongTensor]
Getter

Returns a tensor containing the candidate IDs. The tensor will be used during the inference stage of the model.

If the parameter was not previously set, None will be returned.

Setter

A one-dimensional tensor containing candidate IDs is expected.

forward(batch)

Implementation of the forward function.

Parameters

batch (dict) – A dictionary containing all the necessary information to run the forward function on the model. The dictionary keys must match the names of the arguments in the model’s forward function. Keys that do not match the arguments of the model’s forward function are filtered out. If the model supports calculating logits for custom candidates on the inference stage, then you can submit them inside the batch or using the candidates_to_score field.

Returns

During training, the model will return an object of the TrainOutput container class or its successor. At the inference stage, the InferenceOutput class or its successor will be returned.

Return type

Union[TrainOutput, InferenceOutput]
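
A minimal wrapping sketch, assuming the sasrec model from the SasRec example above; the optimizer and scheduler factories are left at their defaults:

from replay.nn.lightning import LightningModule

module = LightningModule(model=sasrec)
# The module can then be passed to a Lightning Trainer together with a data module
# (e.g. ParquetModule) whose batches match the arguments of the model's forward function.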

TrainOutput

class replay.nn.output.TrainOutput

Storing outputs from models training stage.

Parameters
  • loss

    a tensor containing the calculated loss.

    It is important that the tensor contains a gradient to call back propagation from the outside.

  • hidden_states

    Tuple of torch.Tensor.

    One tensor for the output of the embeddings (if the model has an embedding layer), plus one for the output of each layer.

    Expected shape: (batch_size, sequence_length, hidden_size).

InferenceOutput

class replay.nn.output.InferenceOutput

Storing outputs from models inference stage.

Parameters
  • logits

    Sequence of hidden-states at the output of the last layer of the model.

    Expected shape: (batch_size, sequence_length, hidden_size).

  • hidden_states

    Tuple of torch.Tensor: one for the output of the embeddings (if the model has an embedding layer), plus one for the output of each layer.

    Expected shape: (batch_size, sequence_length, hidden_size).

Transforms for ParquetModule

This submodule contains a set of standard PyTorch tensor transformations needed by the neural network models. These transforms are intended for use with the ParquetModule (a Lightning DataModule). To apply them, specify a sequence of transformations for every data split via the ParquetModule’s transforms parameter. The specified transformations are applied per batch on device, and the resulting batch is used as the model input.

CopyTransform

class replay.nn.transform.CopyTransform(mapping)

Copies a set of columns according to the provided mapping. All copied columns are detached from the graph to prevent erroneous differentiation.

Example:

>>> input_batch = {"item_id_mask": torch.BoolTensor([False, True, True])}
>>> transform = CopyTransform({"item_id_mask" : "padding_id"})
>>> output_batch = transform(input_batch)
>>> output_batch
{'item_id_mask': tensor([False,  True,  True]),
'padding_id': tensor([False,  True,  True])}
__init__(mapping)
Parameters

mapping (dict[str, str]) – A dictionary that maps source tensor names to the new names under which copies will be added to the batch. The tensors with new names are copies of the originals; the original tensors stay in the batch.

GroupTransform

class replay.nn.transform.GroupTransform(mapping)

Combines existing tensors from a batch moving them to the common groups. The name of the shared keys and the keys to be moved are specified in mapping.

Example:

>>> input_batch = {
...     "item_id": torch.LongTensor([[30, 22, 1]]),
...     "item_feature": torch.LongTensor([[1, 11, 11]])
... }
>>> transform = GroupTransform({"feature_tensors" : ["item_id", "item_feature"]})
>>> output_batch = transform(input_batch)
>>> output_batch
{'feature_tensors': {'item_id': tensor([[30, 22,  1]]),
'item_feature': tensor([[ 1, 11, 11]])}}
__init__(mapping)
Parameters

mapping (dict[str, list[str]]) – A dict mapping new names to a list of existing names for grouping.

RenameTransform

class replay.nn.transform.RenameTransform(mapping)

Renames specific feature columns into new ones. The names are changed in the original dict; a new dict is not created.

Example:

>>> input_batch = {"item_id_mask": torch.BoolTensor([False, True, True])}
>>> transform = RenameTransform({"item_id_mask" : "padding_id"})
>>> output_batch = transform(input_batch)
>>> output_batch
{'padding_id': tensor([False,  True,  True])}
__init__(mapping)
Parameters

mapping (dict[str, str]) – A dict mapping existing names into new ones.

UnsqueezeTransform

class replay.nn.transform.UnsqueezeTransform(column_name, dim)

Unsqueeze specified tensor along specified dimension.

Example:

>>> input_batch = {"padding_id": torch.BoolTensor([False, True, True])}
>>> transform = UnsqueezeTransform("padding_id", dim=0)
>>> output_batch = transform(input_batch)
>>> output_batch
{'padding_id': tensor([[False,  True,  True]])}
__init__(column_name, dim)
Parameters
  • column_name (str) – Name of tensor to be unsqueezed.

  • dim (int) – Dimension along which tensor will be unsqueezed.

NextTokenTransform

class replay.nn.transform.NextTokenTransform(label_field, shift=1, query_features=['query_id', 'query_id_mask'], out_feature_name='positive_labels', mask_postfix='_mask')

For the tensor specified by the key label_field (typically “item_id”) in the batch, this transform creates a corresponding “labels” tensor under the key out_feature_name, shifted forward by the specified shift value. This “labels” tensor is the target that the model predicts. A padding mask for the “labels” is also created. For all other features except query_features, the last shift elements are truncated.

This transform is required for sequential models optimizing the next-token prediction task.

WARNING: In order to facilitate the shifting, this transform requires extra elements in the sequence. Therefore, when utilizing this transform, ensure you’re reading at least sequence_length + shift elements from your dataset. The resulting batch will have the relevant fields trimmed to sequence_length.

Example:

>>> input_batch = {
...     "user_id": torch.LongTensor([111]),
...     "item_id": torch.LongTensor([[5, 0, 7, 4]]),
...     "item_id_mask": torch.BoolTensor([[0, 1, 1, 1]])
... }
>>> transform = NextTokenTransform(label_field="item_id", shift=1, query_features="user_id")
>>> output_batch = transform(input_batch)
>>> output_batch
{'user_id': tensor([111]),
'item_id': tensor([[5, 0, 7]]),
'item_id_mask': tensor([[False,  True,  True]]),
'positive_labels': tensor([[0, 7, 4]]),
'positive_labels_mask': tensor([[True, True, True]])}
__init__(label_field, shift=1, query_features=['query_id', 'query_id_mask'], out_feature_name='positive_labels', mask_postfix='_mask')
Parameters
  • label_field (str) – Name of target feature tensor to convert into labels.

  • shift (int) – Number of sequence items to shift by. Default: 1.

  • query_features (Union[List[str], str]) – Name of the query column or list of user features. These columns will be excepted from the shifting and will be stayed unchanged. Default: ["query_id", "query_id_mask"].

  • out_feature_name (str) – The name of result feature in batch. Default: "positive_labels".

  • mask_postfix (str) – Postfix to append to the mask feature corresponding to resulting feature. Default: "_mask".

TokenMaskTransform

class replay.nn.transform.TokenMaskTransform(token_field, out_feature_name='token_mask', mask_prob=0.15, generator=None)

For the feature tensor specified by token_field, randomly masks items in the sequence based on a uniform distribution with specified probability of masking. In fact, this transform creates mask for the Masked Language Modeling (MLM) task analog in the recommendations.

Example:

>>> _ = torch.manual_seed(0)
>>> input_tensor = {"padding_id": torch.BoolTensor([0, 1, 1])}
>>> transform = TokenMaskTransform("padding_id")
>>> output_tensor = transform(input_tensor)
>>> output_tensor
{'padding_id': tensor([False,  True,  True]),
'token_mask': tensor([False,  True, False])}
__init__(token_field, out_feature_name='token_mask', mask_prob=0.15, generator=None)
Parameters
  • token_field (str) – Name of the column containing the unmasked tokens.

  • out_feature_name (str) – Name of the resulting mask column. Default: token_mask.

  • mask_prob (float) – Probability of masking the item, i.e. setting it to 0. Default: 0.15.

  • generator (Optional[Generator]) – Random number generator to be used for generating the uniform distribution. Default: None.

TrimTransform

class replay.nn.transform.TrimTransform(seq_len, feature_names)

Trims the sequences specified by feature_names, keeping the last seq_len elements (the right side) of each sequence.

Example:

>>> input_batch = {
...     "user_id": torch.LongTensor([111]),
...     "item_id": torch.LongTensor([[5, 4, 0, 7, 4]]),
...     "seen_ids": torch.LongTensor([[5, 4, 0, 7, 4]]),
... }
>>> transform = TrimTransform(seq_len=3, feature_names="item_id")
>>> output_batch = transform(input_batch)
>>> output_batch
{'user_id': tensor([111]),
'item_id': tensor([[0, 7, 4]]),
'seen_ids': tensor([[5, 4, 0, 7, 4]])}
__init__(seq_len, feature_names)
Parameters
  • seq_len (int) – Maximum sequence length used in the model. Must be positive.

  • feature_names – Name of the feature (or features) in the batch to be trimmed.

SequenceRollTransform

class replay.nn.transform.SequenceRollTransform(field_name, roll=-1, padding_value=0)

Rolls the data along axis 1 by the specified amount and fills the vacated positions with the specified padding value.

Example:

>>> input_tensor = {"item_id": torch.LongTensor([[2, 3, 1]])}
>>> transform = SequenceRollTransform("item_id", roll=-1, padding_value=10)
>>> output_tensor = transform(input_tensor)
>>> output_tensor
{'item_id': tensor([[ 3,  1, 10]])}
__init__(field_name, roll=-1, padding_value=0)
Parameters
  • field_name (str) – Name of the target column from the batch to be rolled.

  • roll (int) – Number of positions to roll by. Default: -1.

  • padding_value (int) – The value to use as padding for the sequence. Default: 0.

UniformNegativeSamplingTransform

class replay.nn.transform.UniformNegativeSamplingTransform(cardinality, num_negative_samples, *, out_feature_name='negative_labels', sample_distribution=None, generator=None)

Transform for global negative sampling.

For every batch, the transform generates a vector of size (num_negative_samples,) consisting of random indices sampled from the range [0, cardinality - 1]. Unless a custom sample distribution is provided, all indices are weighted equally.

Example:

>>> _ = torch.manual_seed(0)
>>> input_batch = {"item_id": torch.LongTensor([[1, 0, 4]])}
>>> transform = UniformNegativeSamplingTransform(cardinality=4, num_negative_samples=2)
>>> output_batch = transform(input_batch)
>>> output_batch
{'item_id': tensor([[1, 0, 4]]), 'negative_labels': tensor([2, 1])}
__init__(cardinality, num_negative_samples, *, out_feature_name='negative_labels', sample_distribution=None, generator=None)
Parameters
  • cardinality (int) – Number of unique items in the vocabulary (catalog). The specified cardinality must not include the padding value.

  • num_negative_samples (int) – The size of the negatives vector to generate.

  • out_feature_name (Optional[str]) – The name of the resulting feature in the batch.

  • sample_distribution (Optional[Tensor]) – The weights of the indices in the vocabulary. If specified, its length must match cardinality. Default: None.

  • generator (Optional[Generator]) – Random number generator to be used for sampling from the distribution. Default: None.

MultiClassNegativeSamplingTransform

class replay.nn.transform.MultiClassNegativeSamplingTransform(num_negative_samples, sample_mask, *, negative_selector_name='negative_selector', out_feature_name='negative_labels', generator=None)

Transform for generating negatives using a fixed class-assignment matrix.

For every batch, the transform generates a tensor of size (N, num_negative_samples), where N is the number of classes. This tensor consists of random indices sampled using the specified fixed class-assignment matrix.

The transform also reads from the batch a tensor under the negative_selector_name key, of shape (batch_size,), whose i-th element (a value in [0, N-1]) specifies which of the N classes supplies the sampled negatives for the i-th batch row (the user’s history sequence).

The resulting negatives tensor has shape (batch_size, num_negative_samples).

Example:

>>> _ = torch.manual_seed(0)
>>> sample_mask = torch.tensor([
...     [1, 0, 1, 0, 0, 0],
...     [0, 0, 0, 1, 1, 0],
...     [0, 1, 0, 0, 0, 1],
... ])
>>> input_batch = {"negative_selector": torch.tensor([0, 2, 1, 1, 0])}
>>> transform = MultiClassNegativeSamplingTransform(
...                 num_negative_samples=2,
...                 sample_mask=sample_mask
... )
>>> output_batch = transform(input_batch)
>>> output_batch
{'negative_selector': tensor([0, 2, 1, 1, 0]),
 'negative_labels': tensor([[2, 0],
         [5, 1],
         [3, 4],
         [3, 4],
         [2, 0]])}
__init__(num_negative_samples, sample_mask, *, negative_selector_name='negative_selector', out_feature_name='negative_labels', generator=None)
Parameters
  • num_negative_samples (int) – The size of negatives vector to generate.

  • sample_mask (Tensor) – The class-assignment (indicator) matrix of shape: (N, number of items in catalog), where sample_mask[n, i] is a weight (or binary indicator) of assigning item i to class n.

  • negative_selector_name (Optional[str]) – Name of the tensor in the batch of shape (batch_size,), whose i-th element (a value in [0, N-1]) specifies which of the N classes is used to get negatives for the i-th query_id in the batch.

  • out_feature_name (Optional[str]) – The name of the resulting feature in the batch.

  • generator (Optional[Generator]) – Random number generator to be used for sampling from the distribution. Default: None.

Standard set of transforms for models

SasRec Transforms

replay.nn.transform.template.make_default_sasrec_transforms(tensor_schema, query_column='query_id')

Creates a valid transformation pipeline for SasRec data batches.

The generated pipeline expects the input dataset to contain the following columns:
  1. Query ID column, specified by query_column.

  2. Item ID column, specified in the tensor schema.

Parameters
  • tensor_schema (TensorSchema) – TensorSchema used to infer feature columns.

  • query_column (str) – Name of the column containing query IDs. Default: "query_id".

Returns

dict of transforms specified for every dataset split (train, validation, test, predict).

Return type

dict[str, list[torch.nn.modules.module.Module]]
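
Example (a minimal usage sketch; tensor_schema and raw_train_batch are assumed to exist, and the split keys "train", "validation", "test", "predict" follow from the description above):

from replay.nn.transform.template import make_default_sasrec_transforms

transforms = make_default_sasrec_transforms(tensor_schema, query_column="query_id")

def apply_transforms(batch, split):
    # Every transform maps a batch dict to a batch dict, so they are applied in order.
    for transform in transforms[split]:
        batch = transform(batch)
    return batch

train_batch = apply_transforms(raw_train_batch, split="train")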

TwoTower Transforms

replay.nn.transform.template.make_default_twotower_transforms(tensor_schema, query_column='query_id')

Creates a valid transformation pipeline for TwoTower data batches.

The generated pipeline expects the input dataset to contain the following columns:
  1. Query ID column, specified by query_column.

  2. Item ID column, specified in the tensor schema.

Parameters
  • tensor_schema (TensorSchema) – TensorSchema used to infer feature columns.

  • query_column (str) – Name of the column containing query IDs. Default: "query_id".

Returns

dict of transforms specified for every dataset split (train, validation, test, predict).

Return type

dict[str, list[torch.nn.modules.module.Module]]

Easy training, validation and inference with Lightning

Replay provides Callbacks and Postprocessors to make the model training, validation and inference process as convenient as possible.

During training/validation:

You can define a list of validation metrics; the model checkpoint is saved as the best one whenever the monitored metric improves during validation.

During inference:

You can get the recommendations in the following formats: PySpark DataFrame, Pandas DataFrame, Polars DataFrame, or PyTorch tensors. Each format corresponds to a callback. You can filter the results using postprocessors. In addition to the logits (scores) output by the model, you can output any hidden states using HiddenStatesCallback.

For a better understanding, see the examples of using the neural network models.
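
Example (a minimal inference sketch; the lightning import alias, lightning_module, predict_dataloader and NUM_UNIQUE_ITEMS are assumptions; the callback and postprocessor used here are documented below):

import lightning as L  # or pytorch_lightning, depending on your environment

from replay.nn.lightning.callback import PandasTopItemsCallback
from replay.nn.lightning.postprocessor import SeenItemsFilter

top_items_callback = PandasTopItemsCallback(
    top_k=10,
    query_column="query_id",
    item_column="item_id",
    postprocessors=[SeenItemsFilter(item_count=NUM_UNIQUE_ITEMS)],  # hide already seen items
)
trainer = L.Trainer(callbacks=[top_items_callback])
trainer.predict(lightning_module, dataloaders=predict_dataloader)
recommendations = top_items_callback.get_result()  # Pandas DataFrame with the top-k items per query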

Callbacks

ComputeMetricsCallback

class replay.nn.lightning.callback.ComputeMetricsCallback(metrics=None, ks=None, postprocessors=None, item_count=None, ground_truth_column='ground_truth', train_column='train')

Callback for validation and testing stages.

If multiple validation/testing dataloaders are used, the metric name suffix will contain the index of the dataloader.

For the correct calculation of metrics inside the callback, the batch must contain the ground_truth_column key. The padding value of this tensor can be arbitrary; the only requirement is that it does not overlap with existing item ID values (negative values, for example).

To calculate the coverage and novelty metrics, the batch must additionally contain the train_column key. Again, the padding value of this tensor can be arbitrary as long as it does not overlap with existing item ID values.
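
Example (a minimal sketch; NUM_UNIQUE_ITEMS is an assumption, and the callback is attached to a Lightning Trainer as shown in the overview above):

from replay.nn.lightning.callback import ComputeMetricsCallback
from replay.nn.lightning.postprocessor import SeenItemsFilter

metrics_callback = ComputeMetricsCallback(
    metrics=["map", "ndcg", "recall", "coverage"],
    ks=[1, 10, 20],
    item_count=NUM_UNIQUE_ITEMS,  # needed only because "coverage" is requested
    postprocessors=[SeenItemsFilter(item_count=NUM_UNIQUE_ITEMS)],
)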

__init__(metrics=None, ks=None, postprocessors=None, item_count=None, ground_truth_column='ground_truth', train_column='train')
Parameters
  • metrics (Optional[list[Literal['recall', 'precision', 'ndcg', 'map', 'mrr', 'novelty', 'coverage']]]) –

    Sequence of metrics to calculate.

    Default: None. This means that the default metrics will be used: MAP, NDCG, Recall.

  • ks (Optional[list[int]]) –

    highest k scores in ranking.

    Default: None. This means that the default ks will be [1, 5, 10, 20].

  • postprocessors (Optional[list[replay.nn.lightning.postprocessor._base.PostprocessorBase]]) – A list of postprocessors for modifying the logits from the model, for example applying softmax to the logits or setting the logits of some IDs to -inf. Default: None.

  • item_count (Optional[int]) – The total number of items in the dataset; required only for Coverage calculations. Default: None.

  • ground_truth_column (str) – Name of the key in the batch that contains the ground-truth items.

  • train_column (str) – Name of the key in the batch that contains the items the model was trained on.

PandasTopItemsCallback

class replay.nn.lightning.callback.PandasTopItemsCallback(top_k, query_column, item_column, rating_column='rating', postprocessors=None)

A callback that records the result of the model’s forward function at the inference stage in a Pandas DataFrame.

__init__(top_k, query_column, item_column, rating_column='rating', postprocessors=None)
Parameters
  • top_k (int) – Take the top_k IDs with the highest logit values.

  • query_column (str) – The name of the query column in the resulting dataframe.

  • item_column (str) – The name of the item column in the resulting dataframe.

  • rating_column (str) – The name of the rating column in the resulting dataframe. This column will contain the logit values of the top_k recommended items.

  • postprocessors (Optional[list[replay.nn.lightning.postprocessor._base.PostprocessorBase]]) – A list of postprocessors for modifying the logits from the model before sorting and taking the top K, for example applying softmax to the logits or setting the logits of some IDs to -inf. Default: None.

get_result()
Returns

prediction result

Return type

_T

PolarsTopItemsCallback

class replay.nn.lightning.callback.PolarsTopItemsCallback(top_k, query_column, item_column, rating_column='rating', postprocessors=None)

A callback that records the result of the model’s forward function at the inference stage in a Polars DataFrame.

__init__(top_k, query_column, item_column, rating_column='rating', postprocessors=None)
Parameters
  • top_k (int) – Take the top_k IDs with the highest logit values.

  • query_column (str) – The name of the query column in the resulting dataframe.

  • item_column (str) – The name of the item column in the resulting dataframe.

  • rating_column (str) – The name of the rating column in the resulting dataframe. This column will contain the logit values of the top_k recommended items.

  • postprocessors (Optional[list[replay.nn.lightning.postprocessor._base.PostprocessorBase]]) – A list of postprocessors for modifying the logits from the model before sorting and taking the top K, for example applying softmax to the logits or setting the logits of some IDs to -inf. Default: None.

get_result()
Returns

prediction result

Return type

_T

SparkTopItemsCallback

class replay.nn.lightning.callback.SparkTopItemsCallback(top_k, query_column, item_column, rating_column, spark_session, postprocessors=None)

A callback that records the result of the model’s forward function at the inference stage in a Spark DataFrame.

__init__(top_k, query_column, item_column, rating_column, spark_session, postprocessors=None)
Parameters
  • top_k (int) – Take the top_k IDs with the highest logit values.

  • query_column (str) – The name of the query column in the resulting dataframe.

  • item_column (str) – The name of the item column in the resulting dataframe.

  • rating_column (str) – The name of the rating column in the resulting dataframe. This column will contain the logit values of the top_k recommended items.

  • spark_session (SparkSession) – Spark session. Required to create a Spark DataFrame.

  • postprocessors (Optional[list[replay.nn.lightning.postprocessor._base.PostprocessorBase]]) – A list of postprocessors for modifying the logits from the model before sorting and taking the top K, for example applying softmax to the logits or setting the logits of some IDs to -inf. Default: None.

get_result()
Returns

prediction result

Return type

_T

TorchTopItemsCallback

class replay.nn.lightning.callback.TorchTopItemsCallback(top_k, postprocessors=None)

A callback that records the result of the model’s forward function at the inference stage as PyTorch tensors.

__init__(top_k, postprocessors=None)
Parameters
  • top_k (int) – Take the top_k IDs with the highest logit values.

  • postprocessors (Optional[list[replay.nn.lightning.postprocessor._base.PostprocessorBase]]) – A list of postprocessors for modifying the logits from the model before sorting and taking the top K, for example applying softmax to the logits or setting the logits of some IDs to -inf. Default: None.

get_result()
Returns

prediction result

Return type

_T

HiddenStatesCallback

class replay.nn.lightning.callback.HiddenStatesCallback(hidden_state_index)

A callback for getting any hidden state from the model.

When applying this callback, it is expected that the result of the model’s forward function contains the hidden_states key.
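
Example (a minimal sketch; the lightning import alias, lightning_module and predict_dataloader are assumptions):

import lightning as L  # or pytorch_lightning, depending on your environment

from replay.nn.lightning.callback import HiddenStatesCallback

hidden_callback = HiddenStatesCallback(hidden_state_index=0)  # collect the first tensor from hidden_states
trainer = L.Trainer(callbacks=[hidden_callback])
trainer.predict(lightning_module, dataloaders=predict_dataloader)
hidden_states = hidden_callback.get_result()  # hidden states gathered across all predict batches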

__init__(hidden_state_index)
Parameters

hidden_state_index (int) – The result of the model’s forward function is expected to contain the hidden_states key, which holds a tuple of PyTorch tensors. hidden_state_index selects which element of this tuple to collect.

get_result()
Returns

Hidden states collected across all batches.

Postprocessors

PostprocessorBase

class replay.nn.lightning.postprocessor.PostprocessorBase

Abstract base class for postprocessors.

__init__()
abstract on_prediction(batch, logits)

The method is called externally inside the callback at the prediction (inference) stage.

Parameters
  • batch (dict) – the batch sent to the model from the dataloader

  • logits (Tensor) – logits from the model

Returns

modified logits

Return type

Tensor

abstract on_validation(batch, logits)

The method is called externally inside the callback at the validation stage.

Parameters
  • batch (dict) – the batch sent to the model from the dataloader

  • logits (Tensor) – logits from the model

Returns

modified logits

Return type

Tensor
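
Example (a minimal sketch of a custom postprocessor; the softmax behavior is illustrative and not part of the library):

import torch

from replay.nn.lightning.postprocessor import PostprocessorBase


class SoftmaxPostprocessor(PostprocessorBase):
    # Hypothetical postprocessor that turns logits into probabilities
    # at both the validation and prediction stages.

    def on_prediction(self, batch, logits):
        return torch.softmax(logits, dim=-1)

    def on_validation(self, batch, logits):
        return torch.softmax(logits, dim=-1)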

SeenItemsFilter

class replay.nn.lightning.postprocessor.SeenItemsFilter(item_count, seen_items_column='seen_ids')

Masks (sets the logit value to -inf) the items that have already been seen in the given dataset (i.e. in the sequence of items for which the logits are calculated).

Should be used in Lightning callbacks for inference or metrics computation.

Input example:

logits [B=2 users, I=3 items]:

logits =
[[ 0.1,  0.2,  0.3],    # user0
 [-0.1, -0.2, -0.3]]    # user1

Seen items per user:

seen_items =
user0: [1, 0]
user1: [1, 2, 1]

Output example:

SeenItemsFilter sets logits of seen items to -inf:

processed_logits =
[[   -inf,    -inf,  0.3000],  # user0
 [-0.1000,    -inf,    -inf]]  # user1
__init__(item_count, seen_items_column='seen_ids')
Parameters
  • item_count (int) –

    Total number of items that the model knows about (cardinality). It is recommended to take this value from TensorSchema.

    Please note that values outside the range [0, item_count-1] are filtered out (considered as padding).

  • seen_items_column (str) – Name of the column in the batch that contains users’ interactions (seen item IDs).

on_prediction(batch, logits)

The method is called externally inside the callback at the prediction (inference) stage.

Parameters
  • batch (dict) – the batch sent to the model from the dataloader

  • logits (Tensor) – logits from the model

Returns

modified logits

Return type

Tensor

on_validation(batch, logits)

The method is called externally inside the callback at the validation stage.

Parameters
  • batch (dict) – the batch sent to the model from the dataloader

  • logits (Tensor) – logits from the model

Returns

modified logits

Return type

Tensor