Models

This module contains recommender system algorithms including:

  • distributed models built in PySpark

  • neural networks built in PyTorch with distributed inference in PySpark

  • wrappers for commonly used recommender systems libraries and models with non-distributed training and distributed inference in PySpark.

RePlay Recommenders

Algorithm                                     Implementation
Popular Recommender                           PySpark
Popular By Queries                            PySpark
Wilson Recommender                            PySpark
Random Recommender                            PySpark
UCB                                           PySpark
KL-UCB                                        PySpark/Python CPU
LinUCB                                        PySpark/Python CPU
Thompson Sampling                             PySpark
K-Nearest Neighbours                          PySpark
Alternating Least Squares                     PySpark
SLIM                                          PySpark
Word2Vec Recommender                          PySpark
Association Rules Item-to-Item Recommender    PySpark
Cluster Recommender                           PySpark
Neural Matrix Factorization (Experimental)    Python CPU/GPU
MultVAE (Experimental)                        Python CPU/GPU
DDPG (Experimental)                           Python CPU
DT4Rec (Experimental)                         Python CPU/GPU
NeuralTS (Experimental)                       Python CPU/GPU
ADMM SLIM (Experimental)                      Python CPU
Wrapper for implicit (Experimental)           Python CPU
Wrapper for LightFM (Experimental)            Python CPU
RL-based CQL Recommender (Experimental)       PySpark
ULinUCB (Experimental)                        Python CPU
Hierarchical Recommender (Experimental)       PySpark

For more information on how to choose a base model, please see this page.

Recommender interface

Recommender

class replay.models.Recommender

Usual recommender class for models without features.

fit(dataset)

Fit a recommendation model

Parameters

dataset (Dataset) – historical interactions with query/item features [user_idx, item_idx, timestamp, rating]

Return type

None

Returns

fit_predict(dataset, k, queries=None, items=None, filter_seen_items=True, recs_file_path=None)

Fit model and get recommendations

Parameters
  • dataset (Dataset) – historical interactions with query/item features [user_idx, item_idx, timestamp, rating]

  • k (int) – number of recommendations for each query

  • queries (Union[DataFrame, Iterable, None]) – queries to create recommendations for, a dataframe containing [user_idx] or array-like; if None, recommend to all queries from interactions

  • items (Union[DataFrame, Iterable, None]) – candidate items for recommendations, a dataframe containing [item_idx] or array-like; if None, take all items from interactions. If it contains new items, their rating will be 0.

  • filter_seen_items (bool) – flag to remove seen items from recommendations based on interactions.

  • recs_file_path (Optional[str]) – save recommendations at the given absolute path as parquet file. If None, cached and materialized recommendations dataframe will be returned

Return type

Optional[DataFrame]

Returns

cached recommendation dataframe with columns [user_idx, item_idx, rating] or None if recs_file_path is provided

get_features(ids)

Returns query or item feature vectors as a Column with type ArrayType

Parameters

ids (DataFrame) – Spark DataFrame with unique ids

Return type

Optional[Tuple[DataFrame, int]]

Returns

feature vectors. If a model does not have a vector for some ids, they are not present in the final result.

predict(dataset, k, queries=None, items=None, filter_seen_items=True, recs_file_path=None)

Get recommendations

Parameters
  • dataset (Dataset) – historical interactions with query/item features [user_idx, item_idx, timestamp, rating]

  • k (int) – number of recommendations for each query

  • queries (Union[DataFrame, Iterable, None]) – queries to create recommendations for, a dataframe containing [user_idx] or array-like; if None, recommend to all queries from interactions

  • items (Union[DataFrame, Iterable, None]) – candidate items for recommendations, a dataframe containing [item_idx] or array-like; if None, take all items from interactions. If it contains new items, their rating will be 0.

  • filter_seen_items (bool) – flag to remove seen items from recommendations based on interactions.

  • recs_file_path (Optional[str]) – save recommendations at the given absolute path as parquet file. If None, cached and materialized recommendations dataframe will be returned

Return type

Optional[DataFrame]

Returns

cached recommendation dataframe with columns [user_idx, item_idx, rating] or None if recs_file_path is provided

predict_pairs(pairs, dataset=None, recs_file_path=None, k=None)

Get recommendations for specific query-item pairs. If a model can’t produce a recommendation for a particular pair, that pair is removed from the resulting dataframe.

Parameters
  • pairs (DataFrame) – dataframe with pairs to calculate rating for, [user_idx, item_idx].

  • dataset (Optional[Dataset]) – historical interactions with query/item features [user_idx, item_idx, timestamp, rating]

  • recs_file_path (Optional[str]) – save recommendations at the given absolute path as parquet file. If None, cached and materialized recommendations dataframe will be returned

  • k (Optional[int]) – top-k items for each query from pairs.

Return type

Optional[DataFrame]

Returns

cached recommendation dataframe with columns [user_idx, item_idx, rating] or None if recs_file_path is provided
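
A rough sketch of the typical workflow with this interface (assuming the PopRec class, which implements the Popular Recommender from the table above, a Dataset built as in the model examples below, and pairs as an illustrative Spark DataFrame of query-item pairs):

>>> from replay.models import PopRec
>>> model = PopRec()
>>> model.fit(dataset)
>>> recs = model.predict(dataset, k=10, filter_seen_items=True)
>>> pair_scores = model.predict_pairs(pairs, dataset)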

BaseRecommender

class replay.models.base_rec.BaseRecommender

Base recommender

optimize(train_dataset, test_dataset, param_borders=None, criterion=<class 'replay.metrics.ndcg.NDCG'>, k=10, budget=10, new_study=True)

Searches for the best parameters with optuna.

Parameters
  • train_dataset (Dataset) – train data

  • test_dataset (Dataset) – test data

  • param_borders (Optional[Dict[str, List[Any]]]) – a dictionary with search borders, where key is the parameter name and value is the range of possible values {param: [low, high]}. In case of categorical parameters it is all possible values: {cat_param: [cat_1, cat_2, cat_3]}.

  • criterion (Metric) – metric to use for optimization

  • k (int) – recommendation list length

  • budget (int) – number of points to try

  • new_study (bool) – flag to start a new study or keep searching within the previous one

Return type

Optional[Dict[str, Any]]

Returns

dictionary with best parameters
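
A hedged sketch of hyperparameter search (assuming the ItemKNN model described below, pre-built train_dataset/test_dataset objects, and an illustrative search range):

>>> from replay.models import ItemKNN
>>> model = ItemKNN()
>>> best_params = model.optimize(
...     train_dataset,
...     test_dataset,
...     param_borders={"num_neighbours": [5, 100]},
...     budget=20,
... )
>>> model = ItemKNN(**best_params)
>>> model.fit(train_dataset)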

Distributed models

Models with both training and inference implemented in pyspark.

Wilson Recommender

The confidence interval for a binomial distribution can be calculated as:

\[WilsonScore = \frac{\widehat{p}+\frac{z_{ \frac{\alpha}{2}}^{2}}{2n}\pm z_ {\frac{\alpha}{2}}\sqrt{\frac{\widehat{p}(1-\widehat{p})+\frac{z_ {\frac{\alpha}{2}}^{2}}{4n}}{n}} }{1+\frac{z_{ \frac{\alpha}{2}}^{2}}{n}}\]

where \(\hat{p}\) is the observed fraction of positive ratings,

\(z_{\frac{\alpha}{2}}\) is the \(1 - \frac{\alpha}{2}\) quantile of the standard normal distribution.

class replay.models.Wilson(alpha=0.05, add_cold_items=True, cold_weight=0.5, sample=False, seed=None)

Calculates lower confidence bound for the confidence interval of true fraction of positive ratings.

rating must be converted to binary 0-1 form.

>>> import pandas as pd
>>> from replay.data.dataset import Dataset, FeatureSchema, FeatureInfo, FeatureHint, FeatureType
>>> from replay.utils.spark_utils import convert2spark
>>> data_frame = pd.DataFrame({"user_id": [1, 2], "item_id": [1, 2], "rating": [1, 1]})
>>> interactions = convert2spark(data_frame)
>>> feature_schema = FeatureSchema(
...     [
...         FeatureInfo(
...             column="user_id",
...             feature_type=FeatureType.CATEGORICAL,
...             feature_hint=FeatureHint.QUERY_ID,
...         ),
...         FeatureInfo(
...             column="item_id",
...             feature_type=FeatureType.CATEGORICAL,
...             feature_hint=FeatureHint.ITEM_ID,
...         ),
...         FeatureInfo(
...             column="rating",
...             feature_type=FeatureType.NUMERICAL,
...             feature_hint=FeatureHint.RATING,
...         ),
...     ]
... )
>>> dataset = Dataset(feature_schema, interactions)
>>> model = Wilson()
>>> model.fit_predict(dataset, k=1).toPandas()
    user_id   item_id     rating
0         1         2   0.206549
1         2         1   0.206549

Random Recommender

class replay.models.RandomRec(distribution='uniform', alpha=0.0, seed=None, add_cold_items=True, cold_weight=0.5)

Recommend random items, either weighted by item popularity or uniform.

\[P\left(i\right)\propto N_i + \alpha\]

\(N_i\) — number of users who rated item \(i\)

\(\alpha\) — bigger \(\alpha\) values increase amount of rare items in recommendations.

Must be bigger than -1. Default value is \(\alpha = 0\).

A model without a seed provides non-deterministic recommendations, a model with a fixed seed provides reproducible recommendations.

As the recommendations from predict are cached, save them to disk or create a checkpoint, and then unpersist them to get different recommendations from another predict call.
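
A minimal sketch of the refresh procedure described above (plain Spark DataFrame API; model stands for a fitted RandomRec and the path is illustrative):

>>> recs = model.predict(dataset, k=2)
>>> recs.write.parquet("/tmp/random_recs")   # persist this draw to disk
>>> recs = recs.unpersist()                  # drop the cached dataframe
>>> new_recs = model.predict(dataset, k=2)   # this call may now produce a different draw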

>>> from replay.utils.session_handler import get_spark_session, State
>>> spark = get_spark_session(1, 1)
>>> state = State(spark)
>>> import pandas as pd
>>> from replay.data.dataset import Dataset, FeatureSchema, FeatureInfo, FeatureHint, FeatureType
>>> from replay.utils.spark_utils import convert2spark
>>>
>>> interactions = convert2spark(pd.DataFrame({
...     "user_id": [1, 1, 2, 2, 3, 4],
...     "item_id": [1, 2, 2, 3, 3, 3]
... }))
>>> interactions.show()
+-------+-------+
|user_id|item_id|
+-------+-------+
|      1|      1|
|      1|      2|
|      2|      2|
|      2|      3|
|      3|      3|
|      4|      3|
+-------+-------+

>>> random_pop = RandomRec(distribution="popular_based", alpha=-1)
Traceback (most recent call last):
 ...
ValueError: alpha must be bigger than -1
>>> random_pop = RandomRec(distribution="abracadabra")
Traceback (most recent call last):
 ...
ValueError: distribution can be one of [popular_based, relevance, uniform]
>>> feature_schema = FeatureSchema(
...     [
...         FeatureInfo(
...             column="user_id",
...             feature_type=FeatureType.CATEGORICAL,
...             feature_hint=FeatureHint.QUERY_ID,
...         ),
...         FeatureInfo(
...             column="item_id",
...             feature_type=FeatureType.CATEGORICAL,
...             feature_hint=FeatureHint.ITEM_ID,
...         ),
...     ]
... )
>>> dataset = Dataset(feature_schema, interactions)
>>> random_pop = RandomRec(distribution="popular_based", alpha=1.0, seed=777)
>>> random_pop.fit(dataset)
>>> random_pop.item_popularity.show()
+-------+------------------+
|item_id|            rating|
+-------+------------------+
|      1|0.2222222222222222|
|      2|0.3333333333333333|
|      3|0.4444444444444444|
+-------+------------------+

>>> recs = random_pop.predict(dataset, 2)
>>> recs.show()
+-------+-------+-------------------+
|user_id|item_id|             rating|
+-------+-------+-------------------+
|      1|      3| 0.5403400662326382|
|      2|      1| 0.8999281214873164|
|      3|      1|  0.909252584485465|
|      3|      2|0.14871615041196307|
|      4|      2| 0.9923148665602072|
|      4|      1| 0.4814574953690183|
+-------+-------+-------------------+

>>> recs = random_pop.predict(dataset, 2, queries=[1], items=[7, 8])
>>> recs.show()
+-------+-------+-------------------+
|user_id|item_id|             rating|
+-------+-------+-------------------+
|      1|      7|0.25547770543750425|
|      1|      8|0.12755340075617772|
+-------+-------+-------------------+

>>> random_pop = RandomRec(seed=555)
>>> random_pop.fit(dataset)
>>> random_pop.item_popularity.show()
+-------+------------------+
|item_id|            rating|
+-------+------------------+
|      1|0.3333333333333333|
|      2|0.3333333333333333|
|      3|0.3333333333333333|
+-------+------------------+
__init__(distribution='uniform', alpha=0.0, seed=None, add_cold_items=True, cold_weight=0.5)
Parameters
  • distribution (str) – recommendation strategy: “uniform” – all items are sampled uniformly; “popular_based” – popular items are recommended more often

  • alpha (float) – bigger values adjust model towards less popular items

  • seed (Optional[int]) – random seed

  • add_cold_items (bool) – flag to consider cold items when building recommendations if they are present in the items parameter of the predict method or the pairs parameter of the predict_pairs method. If True, cold items are assigned a relevance equal to the relevance of the least relevant item multiplied by cold_weight and may appear among the top-K recommendations. Otherwise cold items are filtered out. Can be changed after model training by setting the add_cold_items attribute.

  • cold_weight (float) – if add_cold_items is True, cold items are added with reduced relevance. The relevance of a cold item equals the relevance of the least relevant item multiplied by cold_weight. The cold_weight value should be in the interval (0, 1].

UCB Recommender

class replay.models.UCB(exploration_coef=2, sample=False, seed=None)

Simple bandit model that calculates item rating as the upper confidence bound (UCB) for the confidence interval of the true fraction of positive ratings. Should be used in iterative (online) mode to achieve proper recommendation quality.

rating from interactions must be converted to binary 0-1 form.

\[pred_i = ctr_i + \sqrt{\frac{c\ln{n}}{n_i}}\]

\(pred_i\) – predicted rating of item \(i\),

\(c\) – exploration coefficient,

\(n\) – number of interactions in the log,

\(n_i\) – number of interactions with item \(i\).

>>> import pandas as pd
>>> from replay.data.dataset import Dataset, FeatureSchema, FeatureInfo, FeatureHint, FeatureType
>>> from replay.utils.spark_utils import convert2spark
>>> data_frame = pd.DataFrame({"user_id": [1, 2, 3, 3], "item_id": [1, 2, 1, 2], "rating": [1, 0, 0, 0]})
>>> interactions = convert2spark(data_frame)
>>> feature_schema = FeatureSchema(
...     [
...         FeatureInfo(
...             column="user_id",
...             feature_type=FeatureType.CATEGORICAL,
...             feature_hint=FeatureHint.QUERY_ID,
...         ),
...         FeatureInfo(
...             column="item_id",
...             feature_type=FeatureType.CATEGORICAL,
...             feature_hint=FeatureHint.ITEM_ID,
...         ),
...         FeatureInfo(
...             column="rating",
...             feature_type=FeatureType.NUMERICAL,
...             feature_hint=FeatureHint.RATING,
...         ),
...     ]
... )
>>> dataset = Dataset(feature_schema, interactions)
>>> model = UCB()
>>> model.fit(dataset)
>>> model.predict(dataset, k=2, queries=[1,2,3,4], items=[1,2,3]
... ).toPandas().sort_values(["user_id","rating","item_id"],
... ascending=[True,False,True]).reset_index(drop=True)
    user_id   item_id     rating
0         1         3   2.665109
1         1         2   1.177410
2         2         3   2.665109
3         2         1   1.677410
4         3         3   2.665109
5         4         3   2.665109
6         4         1   1.677410
__init__(exploration_coef=2, sample=False, seed=None)
Parameters
  • exploration_coef (float) – exploration coefficient

  • sample (bool) – flag to choose recommendation strategy. If True, items are sampled with a probability proportional to the calculated predicted rating. Could be changed after model training by setting the sample attribute.

  • seed (Optional[int]) – random seed. Provides reproducibility if fixed

KL-UCB Recommender

class replay.models.KLUCB(exploration_coef=0.0, sample=False, seed=None)

Bernoulli bandit model. Like UCB, it computes item relevance as an upper confidence bound of the true fraction of positive interactions.

In a nutshell, KL-UCB considers the data as the history of interactions with items. An interaction may be either positive or negative. For each item the model computes the empirical frequency of positive interactions and estimates the true frequency with an upper confidence bound. The higher the bound for an item, the more relevant it is presumed to be.

The upper bound below is what differs from the classical UCB. It is computed according to the original article, where it is proven to produce asymptotically better results.

\[u_i = \max\left\{ q \in [0,1] : n_i \cdot \operatorname{KL}\left(\frac{p_i}{n_i}, q \right) \leqslant \log(n) + c \log(\log(n)) \right\},\]

where

\(u_i\) – upper bound for item \(i\),

\(c\) – exploration coefficient,

\(n\) – number of interactions in log,

\(n_i\) – number of interactions with item \(i\),

\(p_i\) – number of positive interactions with item \(i\),

and

\[\operatorname{KL}(p, q) = p \log\frac{p}{q} + (1-p)\log\frac{1-p}{1-q}\]

is the KL-divergence of Bernoulli distribution with parameter \(p\) from Bernoulli distribution with parameter \(q\).

Being a bit trickier, though, the bound shares with UCB the same exploration-exploitation tradeoff dilemma. You may increase the c coefficient to shift the tradeoff towards exploration, or decrease it to make the model more sceptical of items with a small volume of collected statistics. The authors of the article, however, claim that c = 0 is the best choice in practice.

Like any other RePlay model, KL-UCB takes a log to fit on as a DataFrame with columns [user_idx, item_idx, timestamp, relevance]. Following the procedure above, KL-UCB sees each row as a record of an interaction with item_idx with a positive (relevance = 1) or negative (relevance = 0) outcome. user_idx and timestamp are ignored, i.e. the model treats the log as non-personalized: item scores are the same for all users.

If the relevance column is not binary initially, you have to decide which kind of relevance should be considered positive and convert the column to binary form during preprocessing.

To produce a prediction, KL-UCB samples a set of recommended items for each user with probabilities proportional to the obtained relevances.

>>> import pandas as pd
>>> from replay.data.dataset import Dataset, FeatureSchema, FeatureInfo, FeatureHint, FeatureType
>>> from replay.utils.spark_utils import convert2spark
>>> data_frame = pd.DataFrame({"user_id": [1, 2, 3, 3], "item_id": [1, 2, 1, 2], "rating": [1, 0, 0, 0]})
>>> interactions = convert2spark(data_frame)
>>> feature_schema = FeatureSchema(
...     [
...         FeatureInfo(
...             column="user_id",
...             feature_type=FeatureType.CATEGORICAL,
...             feature_hint=FeatureHint.QUERY_ID,
...         ),
...         FeatureInfo(
...             column="item_id",
...             feature_type=FeatureType.CATEGORICAL,
...             feature_hint=FeatureHint.ITEM_ID,
...         ),
...         FeatureInfo(
...             column="rating",
...             feature_type=FeatureType.NUMERICAL,
...             feature_hint=FeatureHint.RATING,
...         ),
...     ]
... )
>>> dataset = Dataset(feature_schema, interactions)
>>> model = KLUCB()
>>> model.fit(dataset)
>>> model.predict(dataset, k=2, queries=[1,2,3,4], items=[1,2,3]
... ).toPandas().sort_values(["user_id","rating","item_id"],
... ascending=[True,False,True]).reset_index(drop=True)
    user_id   item_id     rating
0         1         3   1.000000
1         1         2   0.750000
2         2         3   1.000000
3         2         1   0.933013
4         3         3   1.000000
5         4         3   1.000000
6         4         1   0.933013
__init__(exploration_coef=0.0, sample=False, seed=None)
Parameters
  • exploration_coef (float) – exploration coefficient

  • sample (bool) – flag to choose recommendation strategy. If True, items are sampled with a probability proportional to the calculated predicted relevance. Could be changed after model training by setting the sample attribute.

  • seed (Optional[int]) – random seed. Provides reproducibility if fixed

LinUCB Recommender

class replay.models.LinUCB(eps, alpha=1.0, is_hybrid=False)

A recommender algorithm for contextual bandit problems.

Implicitly proposed by Li et al. The model assumes a linear relationship between user context, item features and action rewards, making it efficient for high-dimensional contexts.

Note:

It’s recommended to scale features to a similar range (e.g., using StandardScaler or MinMaxScaler) to ensure proper convergence and prevent numerical instability (since relationships to learn are linear).
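
A small sketch of such scaling applied to the pandas feature frames used in the example below (scikit-learn is used here purely for illustration):

>>> from sklearn.preprocessing import MinMaxScaler
>>> scaler = MinMaxScaler()
>>> feat_cols = ["usr_feat_1", "usr_feat_2", "usr_feat_3"]
>>> user_features[feat_cols] = scaler.fit_transform(user_features[feat_cols])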

>>> import pandas as pd
>>> from replay.data.dataset import (
...     Dataset, FeatureHint, FeatureInfo, FeatureSchema, FeatureSource, FeatureType
... )
>>> interactions = pd.DataFrame({"user_id": [0, 1, 2, 2], "item_id": [0, 1, 0, 1], "rating": [1, 0, 0, 0]})
>>> user_features = pd.DataFrame(
...     {"user_id": [0, 1, 2], "usr_feat_1": [1, 2, 3], "usr_feat_2": [4, 5, 6], "usr_feat_3": [7, 8, 9]}
... )
>>> item_features = pd.DataFrame(
...     {
...         "item_id": [0, 1, 2, 3, 4, 5],
...         "itm_feat_1": [1, 2, 3, 4, 5, 6],
...         "itm_feat_2": [7, 8, 9, 10, 11, 12],
...         "itm_feat_3": [13, 14, 15, 16, 17, 18]
...     }
... )
>>> feature_schema = FeatureSchema(
...    [
...        FeatureInfo(
...            column="user_id",
...            feature_type=FeatureType.CATEGORICAL,
...            feature_hint=FeatureHint.QUERY_ID,
...        ),
...        FeatureInfo(
...            column="item_id",
...            feature_type=FeatureType.CATEGORICAL,
...            feature_hint=FeatureHint.ITEM_ID,
...        ),
...        FeatureInfo(
...            column="rating",
...            feature_type=FeatureType.NUMERICAL,
...            feature_hint=FeatureHint.RATING,
...        ),
...        *[
...            FeatureInfo(
...                column=name, feature_type=FeatureType.NUMERICAL, feature_source=FeatureSource.ITEM_FEATURES,
...            )
...            for name in ["itm_feat_1", "itm_feat_2", "itm_feat_3"]
...        ],
...        *[
...            FeatureInfo(
...                column=name, feature_type=FeatureType.NUMERICAL, feature_source=FeatureSource.QUERY_FEATURES
...            )
...            for name in ["usr_feat_1", "usr_feat_2", "usr_feat_3"]
...        ],
...    ]
... )
>>> dataset = Dataset(
...     feature_schema=feature_schema,
...     interactions=interactions,
...     item_features=item_features,
...     query_features=user_features,
...     categorical_encoded=True,
... )
>>> dataset.to_spark()
>>> model = LinUCB(eps=-10.0, alpha=1.0, is_hybrid=False)
>>> model.fit(dataset)
>>> model.predict(dataset, k=2, queries=[0,1,2]).toPandas().sort_values(["user_id","rating","item_id"],
... ascending=[True,False,True]).reset_index(drop=True)
    user_id   item_id     rating
0         0         1   -11.073741
1         0         2   -81.240384
2         1         0   -6.555529
3         1         2   -96.436508
4         2         2   -112.249722
5         2         3   -112.249722
__init__(eps, alpha=1.0, is_hybrid=False)
Parameters
  • eps (float) – exploration coefficient

  • alpha (float) – ridge parameter

  • is_hybrid (bool) – flag to choose model type. If True, model is hybrid.

Thompson Sampling

class replay.models.ThompsonSampling(sample=False, seed=None)

Thompson Sampling recommender.

Bandit model with efficient exploration-exploitation balance. The reward probability of each of the K arms is modeled by a Beta distribution which is updated after an arm is selected. The initial prior distribution is Beta(1,1).

__init__(sample=False, seed=None)
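
A usage sketch mirroring the UCB example above (assuming a Dataset with a binary rating column built the same way; output omitted):

>>> model = ThompsonSampling(seed=42)
>>> model.fit(dataset)                  # the rating column is expected to be binary 0/1
>>> recs = model.predict(dataset, k=2)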

K Nearest Neighbours

class replay.models.ItemKNN(num_neighbours=10, use_rating=False, shrink=0.0, weighting=None, index_builder=None)

Item-based KNN recommender with a modified cosine similarity measure.

__init__(num_neighbours=10, use_rating=False, shrink=0.0, weighting=None, index_builder=None)
Parameters
  • num_neighbours (int) – number of neighbours

  • use_rating (bool) – flag to use rating values as is or to treat them as 1

  • shrink (float) – term added to the denominator when calculating similarity

  • weighting (Optional[str]) – item reweighting type, one of [None, ‘tf_idf’, ‘bm25’]

  • index_builder (Optional[IndexBuilder]) – IndexBuilder instance that adds ANN functionality. If not set, then ann will not be used.
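
A short usage sketch (assuming a Dataset built as in the examples above; the parameter values are illustrative):

>>> model = ItemKNN(num_neighbours=20, weighting="bm25")
>>> model.fit(dataset)
>>> recs = model.predict(dataset, k=10)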

Alternating Least Squares

class replay.models.ALSWrap(rank=10, implicit_prefs=True, seed=None, num_item_blocks=None, num_query_blocks=None)

Wrapper for Spark ALS.

__init__(rank=10, implicit_prefs=True, seed=None, num_item_blocks=None, num_query_blocks=None)
Parameters
  • rank (int) – hidden dimension for the approximate matrix

  • implicit_prefs (bool) – flag to use implicit feedback

  • seed (Optional[int]) – random seed

  • num_item_blocks (Optional[int]) – number of blocks the items will be partitioned into in order to parallelize computation. If None, it is set to the number of partitions of interactions.

  • num_query_blocks (Optional[int]) – number of blocks the queries will be partitioned into in order to parallelize computation. If None, it is set to the number of partitions of interactions.
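
A short usage sketch (assuming a Dataset built as in the examples above; queries is an illustrative Spark DataFrame of unique query ids for get_features):

>>> model = ALSWrap(rank=64, seed=42)
>>> model.fit(dataset)
>>> recs = model.predict(dataset, k=10)
>>> features, dim = model.get_features(queries)  # learned query embeddings and their dimension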

Alternating Least Squares on Scala (Experimental)

class replay.experimental.models.ScalaALSWrap(rank=10, implicit_prefs=True, seed=None, num_item_blocks=None, num_user_blocks=None, index_builder=None)

Wrapper for Spark ALS.

__init__(rank=10, implicit_prefs=True, seed=None, num_item_blocks=None, num_user_blocks=None, index_builder=None)
Parameters
  • rank (int) – hidden dimension for the approximate matrix

  • implicit_prefs (bool) – flag to use implicit feedback

  • seed (Optional[int]) – random seed

  • num_item_blocks (Optional[int]) – number of blocks the items will be partitioned into in order to parallelize computation. If None, it is set to the number of partitions of the log.

  • num_user_blocks (Optional[int]) – number of blocks the users will be partitioned into in order to parallelize computation. If None, it is set to the number of partitions of the log.

SLIM

The SLIM Recommender calculates an item similarity matrix \(W\) which is used to produce recommendations.

Loss function is:

\[L = \frac 12||A - A W||^2_F + \frac \beta 2 ||W||_F^2+ \lambda ||W||_1\]

\(W\) – item similarity matrix

\(A\) – interaction matrix

Finding \(W\) can be split into solving separate linear regressions with ElasticNet regularization. Thus each column \(w_j\) of \(W\) is optimized with

\[l = \frac 12||a_j - A w_j||^2_2 + \frac \beta 2 ||w_j||_2^2+ \lambda ||w_j||_1\]

To exclude the trivial solution, we add extra constraints \(w_{jj}=0\) and \(w_{ij}\ge 0\).

class replay.models.SLIM(beta=0.01, lambda_=0.01, seed=None, index_builder=None, allow_collect_to_master=False)

SLIM: Sparse Linear Methods for Top-N Recommender Systems

__init__(beta=0.01, lambda_=0.01, seed=None, index_builder=None, allow_collect_to_master=False)
Parameters
  • beta (float) – l2 regularization

  • lambda_ (float) – l1 regularization

  • seed (Optional[int]) – random seed

  • index_builder (Optional[IndexBuilder]) – IndexBuilder instance that adds ANN functionality. If not set, then ann will not be used.

  • allow_collect_to_master (bool) – Flag allowing spark to make a collection to the master node, Default: False.

Word2Vec Recommender

class replay.models.Word2VecRec(rank=100, min_count=5, step_size=0.025, max_iter=1, window_size=1, use_idf=False, seed=None, num_partitions=None, index_builder=None)

Trains word2vec model where items are treated as words and queries as sentences.

__init__(rank=100, min_count=5, step_size=0.025, max_iter=1, window_size=1, use_idf=False, seed=None, num_partitions=None, index_builder=None)
Parameters
  • rank (int) – embedding size

  • min_count (int) – the minimum number of times a token must appear to be included in the word2vec model’s vocabulary

  • step_size (float) – step size to be used for each iteration of optimization

  • max_iter (int) – max number of iterations

  • window_size (int) – window size

  • use_idf (bool) – flag to use inverse document frequency

  • seed (Optional[int]) – random seed

  • index_builder (Optional[IndexBuilder]) – IndexBuilder instance that adds ANN functionality. If not set, then ann will not be used.

Association Rules Item-to-Item Recommender

class replay.models.AssociationRulesItemRec(session_column, min_item_count=5, min_pair_count=5, num_neighbours=1000, use_rating=False, similarity_metric='confidence', index_builder=None)

Item-to-item recommender based on association rules. It calculates pair confidence, lift and confidence_gain (defined as confidence(a, b)/confidence(!a, b)) to get top-k associated items, and predicts items for queries using the lift, confidence or confidence_gain metric.

During class initialization you can explicitly specify which metric is used as a similarity when calculating predictions.

You can change your selection before calling .predict() or .predict_pairs() by setting a new value of the similarity_metric parameter.

ANN functionality supports only sparse indices, and only the single similarity_metric defined in __init__ will be available during inference.

>>> import pandas as pd
>>> from replay.data.dataset import Dataset, FeatureSchema, FeatureInfo, FeatureHint, FeatureType
>>> from replay.utils.spark_utils import convert2spark
>>> data_frame = pd.DataFrame({"user_id": [1, 1, 2, 3], "item_id": [1, 2, 2, 3], "rating": [2, 1, 4, 1]})
>>> data_frame_for_predict = pd.DataFrame({"user_id": [2], "item_id": [1]})
>>> data_frame
   user_id  item_id  rating
0        1        1       2
1        1        2       1
2        2        2       4
3        3        3       1
>>> interactions = convert2spark(data_frame)
>>> pred_interactions = convert2spark(data_frame_for_predict)
>>> feature_schema = FeatureSchema(
...     [
...         FeatureInfo(
...             column="user_id",
...             feature_type=FeatureType.CATEGORICAL,
...             feature_hint=FeatureHint.QUERY_ID,
...         ),
...         FeatureInfo(
...             column="item_id",
...             feature_type=FeatureType.CATEGORICAL,
...             feature_hint=FeatureHint.ITEM_ID,
...         ),
...         FeatureInfo(
...             column="rating",
...             feature_type=FeatureType.NUMERICAL,
...             feature_hint=FeatureHint.RATING,
...         ),
...     ]
... )
>>> train_dataset = Dataset(feature_schema, interactions)
>>> pred_dataset = Dataset(feature_schema.subset(["user_id", "item_id"]), pred_interactions)
>>> model = AssociationRulesItemRec(min_item_count=1, min_pair_count=0, session_column="user_id")
>>> res = model.fit(train_dataset)
>>> model.similarity.orderBy("item_idx_one").show()
+------------+------------+----------+----------+----+---------------+
|item_idx_one|item_idx_two|similarity|confidence|lift|confidence_gain|
+------------+------------+----------+----------+----+---------------+
|           1|           2|       1.0|       1.0| 1.5|            2.0|
|           2|           1|       0.5|       0.5| 1.5|       Infinity|
+------------+------------+----------+----------+----+---------------+
>>> model.similarity_metric = "confidence"
>>> model.predict_pairs(pred_interactions, train_dataset).show()
+-------+-------+------+
|user_id|item_id|rating|
+-------+-------+------+
|      2|      1|   0.5|
+-------+-------+------+
>>> model.similarity_metric = "lift"
>>> model.predict_pairs(pred_interactions, train_dataset).show()
+-------+-------+------+
|user_id|item_id|rating|
+-------+-------+------+
|      2|      1|   1.5|
+-------+-------+------+

The classical model uses item co-occurrence in sessions to calculate confidence, lift and confidence_gain, but a rating can also be passed to the model, e.g. if you want to apply time smoothing and treat old sessions as less important. In this case all items in a session should have the same rating.

__init__(session_column, min_item_count=5, min_pair_count=5, num_neighbours=1000, use_rating=False, similarity_metric='confidence', index_builder=None)
Parameters
  • session_column (str) – name of column to group sessions. Items are combined by the user_id column if session_column is not defined.

  • min_item_count (int) – items with fewer sessions will be filtered out

  • min_pair_count (int) – pairs with fewer sessions will be filtered out

  • num_neighbours (Optional[int]) – maximal number of neighbours to save for each item

  • use_rating (bool) – flag to use rating values instead of the co-occurrence count. If True, the pair rating within a session is the minimal rating of the items in the pair, and the item rating is the sum of its ratings over all sessions.

  • similarity_metric (str) – the metric used as a similarity to calculate the prediction, one of [lift, confidence, confidence_gain]

  • index_builder (Optional[IndexBuilder]) – IndexBuilder instance that adds ANN functionality. If not set, then ann will not be used.

get_nearest_items(items, k, metric='lift', candidates=None)

Get the k most similar items by the metric for each of the given items.

Parameters
  • items (Union[DataFrame, Iterable]) – spark dataframe or list of item ids to find neighbors

  • k (int) – number of neighbors

  • metric (str) – ‘lift’ or ‘confidence_gain’

  • candidates (Union[DataFrame, Iterable, None]) – spark dataframe or list of items to consider as similar, e.g. popular/new items. If None, all items presented during model training are used.

Return type

DataFrame

Returns

Spark dataframe with the most similar items and their similarity values, where a bigger value means greater similarity; columns are [item_id, neighbour_item_id, similarity]

Cluster Recommender

class replay.models.ClusterRec(num_clusters=10)

Generate recommendations for cold queries using k-means clusters

__init__(num_clusters=10)
Parameters

num_clusters (int) – number of clusters
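
A rough usage sketch (it assumes the Dataset also carries query features used for clustering; that requirement is an assumption here, not stated above):

>>> model = ClusterRec(num_clusters=5)
>>> model.fit(dataset)                  # dataset is assumed to include query features
>>> recs = model.predict(dataset, k=10)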

Neural models with distributed inference

Models implemented in pytorch with distributed inference in pyspark.

Neural Matrix Factorization (Experimental)

class replay.experimental.models.NeuroMF(learning_rate=0.05, epochs=20, embedding_gmf_dim=None, embedding_mlp_dim=None, hidden_mlp_dims=None, l2_reg=0, count_negative_sample=1, factor=0.2, patience=3)

Neural Matrix Factorization model (NeuMF, NCF).

In this implementation MLP and GMF modules are optional.

__init__(learning_rate=0.05, epochs=20, embedding_gmf_dim=None, embedding_mlp_dim=None, hidden_mlp_dims=None, l2_reg=0, count_negative_sample=1, factor=0.2, patience=3)

MLP or GMF model can be ignored if its embedding size (embedding_mlp_dim or embedding_gmf_dim) is set to None. Default variant is MLP + GMF with embedding size 128.

Parameters
  • learning_rate (float) – learning rate

  • epochs (int) – number of epochs to train model

  • embedding_gmf_dim (Optional[int]) – embedding size for gmf

  • embedding_mlp_dim (Optional[int]) – embedding size for mlp

  • hidden_mlp_dims (Optional[List[int]]) – list of hidden dimension sizes for mlp

  • l2_reg (float) – l2 regularization term

  • count_negative_sample (int) – number of negative samples to use

  • factor (float) – ReduceLROnPlateau reducing factor. new_lr = lr * factor

  • patience (int) – number of non-improved epochs before reducing lr

Mult-VAE (Experimental)

Variational AutoEncoder

[Figure: variational autoencoder with a Gaussian latent variable (vae-gaussian.png)]

Problem formulation

We have a sample of independent, identically distributed random values from the true distribution \(x_i \sim p_d(x)\), \(i = 1, \dots, N\).

We build a probability model \(p_\theta(x)\) of the true distribution \(p_d(x)\).

The distribution \(p_\theta(x)\) allows us both to estimate the probability density for a given item \(x\) and to sample \(x \sim p_\theta(x)\).

Probability model

\(z \in \mathbb{R}^d\) is a local latent variable, one for each item \(x\).

Generative process for variational autoencoder:

  1. Sample \(z \sim p(z)\).

  2. Sample \(x \sim p_\theta(x | z)\).

The parameters of the distribution \(p_\theta(x | z)\) are defined by a neural network with weights \(\theta\) that takes \(z\) as input.

The probability density of item \(x\):

\[p_\theta(x) = \mathbb{E}_{z \sim p(z)} p_\theta(x | z)\]

We use a lower bound estimate for the log-likelihood.

\[\begin{split}\log p_\theta(x) = \mathbb{E}_{z \sim q_\phi(z | x)} \log p_\theta( x) = \mathbb{E}_{z \sim q_\phi(z | x)} \log \frac{p_\theta(x, z) q_\phi(z | x)} {q_\phi(z | x) p_\theta(z | x)} = \\ = \mathbb{E}_{z \sim q_\phi(z | x)} \log \frac{p_\theta(x, z)}{q_\phi(z | x)} + KL( q_\phi(z | x) || p_\theta(z | x))\end{split}\]
\[\begin{split}\log p_\theta(x) \geqslant \mathbb{E}_{z \sim q_\phi(z | x)} \log \frac{p_\theta(x | z)p(z)}{q_\phi(z | x)} = \mathbb{E}_{z \sim q_\phi(z | x)} \log p_\theta(x | z) - KL(q_\phi(z | x) || p(z)) = \\ = L(x; \phi, \theta) \to \max\limits_{\phi, \theta}\end{split}\]

\(q_\phi(z | x)\) is a proposal or a recognition distribution. It is a gaussian with weights \(\phi\): \(q_\phi(z | x) = \mathcal{N}(z | \mu_\phi(x), \sigma^2_\phi(x)I)\).

The difference between the lower bound \(L(x; \phi, \theta)\) and the log-likelihood \(\log p_\theta(x)\) is the KL-divergence between the proposal and the posterior distribution over \(z\): \(KL(q_\phi(z | x) || p_\theta(z | x))\). The maximum value of \(L(x; \phi, \theta)\) for fixed model parameters \(\theta\) is reached with \(q_\phi(z | x) = p_\theta(z | x)\), but \(p_\theta(z | x)\) cannot be computed efficiently in explicit form, so the bound is optimized over \(\phi\) as well. The closer \(q_\phi(z | x)\) is to \(p_\theta(z | x)\), the better the estimate.

We usually take the standard normal distribution for \(p(z)\) and reparameterize \(z\):

\[\varepsilon \sim \mathcal{N}(\varepsilon | 0, I)\]
\[z = \mu + \sigma \varepsilon \Rightarrow z \sim \mathcal{N}(z | \mu, \sigma^2I)\]
\[\frac{\partial}{\partial \phi} L(x; \phi, \theta) = \mathbb{E}_{ \varepsilon \sim \mathcal{N}(\varepsilon | 0, I)} \frac{\partial} {\partial \phi} \log p_\theta(x | \mu_\phi(x) + \sigma_\phi(x) \varepsilon) - \frac{\partial}{\partial \phi} KL(q_\phi(z | x) || p(z))\]
\[\frac{\partial}{\partial \theta} L(x; \phi, \theta) = \mathbb{E}_{z \sim q_\phi(z | x)} \frac{\partial}{\partial \theta} \log p_\theta(x | z)\]

In this case

\[KL(q_\phi(z | x) || p(z)) = -\frac{1}{2}\sum_{i=1}^{dimZ}(1+ log(\sigma_i^2) - \mu_i^2-\sigma_i^2)\]

The KL-divergence coefficient may also differ from one; in this case:

\[L(x; \phi, \theta) = \mathbb{E}_{z \sim q_\phi(z | x)} \log p_\theta(x | z) - \beta \cdot KL(q_\phi(z | x) || p(z)) \to \max\limits_{\phi, \theta}\]

With \(\beta = 0\) VAE is the same as the Denoising AutoEncoder.

class replay.experimental.models.MultVAE(learning_rate=0.01, epochs=100, latent_dim=200, hidden_dim=600, dropout=0.3, anneal=0.1, l2_reg=0, factor=0.2, patience=3)

Variational Autoencoders for Collaborative Filtering

__init__(learning_rate=0.01, epochs=100, latent_dim=200, hidden_dim=600, dropout=0.3, anneal=0.1, l2_reg=0, factor=0.2, patience=3)
Parameters
  • learning_rate (float) – learning rate

  • epochs (int) – number of epochs to train model

  • latent_dim (int) – latent dimension size for user vectors

  • hidden_dim (int) – hidden dimension size for encoder and decoder

  • dropout (float) – dropout coefficient

  • anneal (float) – anneal coefficient [0,1]

  • l2_reg (float) – l2 regularization term

  • factor (float) – ReduceLROnPlateau reducing factor. new_lr = lr * factor

  • patience (int) – number of non-improved epochs before reducing lr
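
A rough usage sketch, assuming the experimental log-based interface illustrated in the implicit wrapper example further below (a Spark log with [user_idx, item_idx, relevance]); parameter values are illustrative:

>>> model = MultVAE(epochs=10, latent_dim=64)
>>> model.fit(log)
>>> recs = model.predict(log, k=10)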

DDPG (Experimental)

class replay.experimental.models.DDPG(noise_sigma=0.2, noise_theta=0.05, noise_type='gauss', seed=9, user_num=10, item_num=10, log_dir='logs/tmp', exact_embeddings_size=True, n_critics_head=10, env_gamma_alpha=0.2, critic_heads_q=0.15, n_jobs=None, use_gpu=False, user_batch_size=8, min_trajectory_len=10)

Deep Deterministic Policy Gradient

This implementation is enhanced with a more advanced noise strategy.

__init__(noise_sigma=0.2, noise_theta=0.05, noise_type='gauss', seed=9, user_num=10, item_num=10, log_dir='logs/tmp', exact_embeddings_size=True, n_critics_head=10, env_gamma_alpha=0.2, critic_heads_q=0.15, n_jobs=None, use_gpu=False, user_batch_size=8, min_trajectory_len=10)
Parameters
  • noise_sigma (float) – Ornstein-Uhlenbeck noise sigma value

  • noise_theta (float) – Ornstein-Uhlenbeck noise theta value

  • noise_type (str) – type of action noise, one of [“ou”, “gauss”]

  • seed (int) – random seed

  • user_num (int) – number of users, specify when using exact_embeddings_size

  • item_num (int) – number of items, specify when using exact_embeddings_size

  • log_dir (str) – dir to save models

  • exact_embeddings_size – flag whether to set user_num and item_num from the training log

DT4Rec (Experimental)

class replay.experimental.models.dt4rec.dt4rec.DT4Rec(item_num, user_num, seed=123, trajectory_len=30, epochs=1, batch_size=64, use_cuda=True)

Decision Transformer for Recommendations

General Idea:

Decision Transformer: Reinforcement Learning via Sequence Modeling.

Ideas for improvements:

User Retention-oriented Recommendation with Decision Transformer.

Also, some sources are listed in their respective classes

__init__(item_num, user_num, seed=123, trajectory_len=30, epochs=1, batch_size=64, use_cuda=True)

NeuralTS (Experimental)

class replay.experimental.models.NeuralTS(user_cols={'cat_embed_cols': [], 'continuous_cols': [], 'wide_cols': []}, item_cols={'cat_embed_cols': [], 'continuous_cols': [], 'wide_cols': []}, embedding_sizes=[32, 32, 64], hidden_layers=[32, 20], wide_out_dim=1, deep_out_dim=20, head_dropout=0.8, deep_dropout=0.4, dim_head=20, n_epochs=2, opt_lr=0.0003, lr_min=1e-05, use_gpu=False, use_warp_loss=True, cnt_neg_samples=100, cnt_samples_for_predict=10, exploration_coef=1.0, cnt_users=None, cnt_items=None, plot_dir=None)

Neural Thompson sampling recommender (https://dl.acm.org/doi/pdf/10.1145/3383313.3412214) based on the Wide&Deep model.

Parameters
  • user_cols (Dict[str, List[str]]) – user_cols = {‘continuous_cols’:List[str], ‘cat_embed_cols’:List[str], ‘wide_cols’: List[str]}, where List[str] – some column names from user_features dataframe, which is input to the fit method, or empty List

  • item_cols (Dict[str, List[str]]) – item_cols = {‘continuous_cols’:List[str], ‘cat_embed_cols’:List[str], ‘wide_cols’: List[str]}, where List[str] – some column names from item_features dataframe, which is input to the fit method, or empty List

  • embedding_sizes (List[int]) – list of length three in which embedding_sizes[0] = embedding size for users, embedding_sizes[1] = embedding size for items, embedding_sizes[2] = embedding size for pair (users, items)

  • hidden_layers (List[int]) – list of hidden layer sizes for Deep model

  • wide_out_dim (int) – output size for the Wide model

  • deep_out_dim (int) – output size for the Deep model

  • head_dropout (float) – probability of an element to be zeroed for WideDeep model head

  • deep_dropout (float) – probability of an element to be zeroed for Deep model

  • dim_head (int) – output size for WideDeep model head

  • n_epochs (int) – number of epochs for model training

  • opt_lr (float) – learning rate for the AdamW optimizer

  • lr_min (float) – minimum learning rate value for the CosineAnnealingLR learning rate scheduler

  • use_gpu (bool) – if true, the model will be trained on the GPU

  • use_warp_loss (bool) – if true, WARP loss will be used, otherwise weighted logistic loss

  • cnt_neg_samples (int) – number of additional negative examples for each user

  • cnt_samples_for_predict (int) – number of sampled predictions for one user, which are used to estimate the mean and variance of relevance

  • exploration_coef (float) – exploration coefficient

  • plot_dir (Optional[str]) – file name where the training graphs will be saved, if None, the graphs will not be saved

  • cnt_users (Optional[int]) – number of users, used in Wide&Deep model initialization

  • cnt_items (Optional[int]) – number of items, used in Wide&Deep model initialization

__init__(user_cols={'cat_embed_cols': [], 'continuous_cols': [], 'wide_cols': []}, item_cols={'cat_embed_cols': [], 'continuous_cols': [], 'wide_cols': []}, embedding_sizes=[32, 32, 64], hidden_layers=[32, 20], wide_out_dim=1, deep_out_dim=20, head_dropout=0.8, deep_dropout=0.4, dim_head=20, n_epochs=2, opt_lr=0.0003, lr_min=1e-05, use_gpu=False, use_warp_loss=True, cnt_neg_samples=100, cnt_samples_for_predict=10, exploration_coef=1.0, cnt_users=None, cnt_items=None, plot_dir=None)

ULinUCB Recommender (Experimental)

class replay.experimental.models.ULinUCB(alpha=-2.0)

A recommender implicitly proposed by Song et al. It is used as the default node recommender in HierarchicalRecommender. It shares all the logic with the classical item-disjoint LinUCB but is user-disjoint instead. It may be useful in problems with a fixed number of users and item-oriented data.

__init__(alpha=-2.0)
Parameters

alpha (float) – exploration coefficient

CQL Recommender (Experimental)

The Conservative Q-Learning (CQL) algorithm is a SAC-based, data-driven deep reinforcement learning algorithm that achieves state-of-the-art performance in offline RL problems.

[Figure: CQL performance comparison (cql_comparison.png)]

class replay.experimental.models.cql.CQL(mdp_dataset_builder, actor_learning_rate=0.0001, critic_learning_rate=0.0003, temp_learning_rate=0.0001, alpha_learning_rate=0.0001, actor_optim_factory=AdamFactory(betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False), critic_optim_factory=AdamFactory(betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False), temp_optim_factory=AdamFactory(betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False), alpha_optim_factory=AdamFactory(betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False), actor_encoder_factory=DefaultEncoderFactory(activation='relu', use_batch_norm=False, dropout_rate=None), critic_encoder_factory=DefaultEncoderFactory(activation='relu', use_batch_norm=False, dropout_rate=None), q_func_factory=MeanQFunctionFactory(share_encoder=False), batch_size=64, n_steps=1, gamma=0.99, tau=0.005, n_critics=2, initial_temperature=1.0, initial_alpha=1.0, alpha_threshold=10.0, conservative_weight=5.0, n_action_samples=10, soft_q_backup=False, use_gpu=False, observation_scaler=None, action_scaler=None, reward_scaler=None, **params)

Conservative Q-Learning algorithm.

CQL is a SAC-based data-driven deep reinforcement learning algorithm, which achieves state-of-the-art performance in offline RL problems.

CQL mitigates overestimation error by minimizing action-values under the current policy and maximizing values under data distribution for underestimation issue.

\[L(\theta_i) = \alpha\, \mathbb{E}_{s_t \sim D} \left[\log{\sum_a \exp{Q_{\theta_i}(s_t, a)}} - \mathbb{E}_{a \sim D} \big[Q_{\theta_i}(s_t, a)\big] - \tau\right] + L_\mathrm{SAC}(\theta_i)\]

where \(\alpha\) is an automatically adjustable value via Lagrangian dual gradient descent and \(\tau\) is a threshold value. If the action-value difference is smaller than \(\tau\), \(\alpha\) will become smaller. Otherwise, \(\alpha\) will become larger to aggressively penalize action-values.

In continuous control, \(\log{\sum_a \exp{Q(s, a)}}\) is computed as follows.

\[\log{\sum_a \exp{Q(s, a)}} \approx \log{\left( \frac{1}{2N} \sum_{a_i \sim \text{Unif}(a)}^N \left[\frac{\exp{Q(s, a_i)}}{\text{Unif}(a)}\right] + \frac{1}{2N} \sum_{a_i \sim \pi_\phi(a|s)}^N \left[\frac{\exp{Q(s, a_i)}}{\pi_\phi(a_i|s)}\right]\right)}\]

where \(N\) is the number of sampled actions.

An implementation of this algorithm is heavily based on the corresponding implementation in the d3rlpy library (see https://github.com/takuseno/d3rlpy/blob/master/d3rlpy/algos/cql.py)

The rest of the optimization is exactly the same as in d3rlpy.algos.SAC.

Parameters
  • mdp_dataset_builder (MdpDatasetBuilder) – the MDP dataset builder from users’ log

  • actor_learning_rate (float) – learning rate for the policy function

  • critic_learning_rate (float) – learning rate for Q functions

  • temp_learning_rate (float) – learning rate for the temperature parameter of SAC

  • alpha_learning_rate (float) – learning rate for \(\alpha\)

  • actor_optim_factory (d3rlpy.models.optimizers.OptimizerFactory) – optimizer factory for the actor. The available options are [SGD, Adam or RMSprop]

  • critic_optim_factory (d3rlpy.models.optimizers.OptimizerFactory) – optimizer factory for the critic. The available options are [SGD, Adam or RMSprop]

  • temp_optim_factory (d3rlpy.models.optimizers.OptimizerFactory) – optimizer factory for the temperature. The available options are [SGD, Adam or RMSprop]

  • alpha_optim_factory (d3rlpy.models.optimizers.OptimizerFactory) – optimizer factory for \(\alpha\). The available options are [SGD, Adam or RMSprop]

  • actor_encoder_factory (d3rlpy.models.encoders.EncoderFactory or str) – encoder factory for the actor. The available options are [‘pixel’, ‘dense’, ‘vector’, ‘default’]. See d3rlpy.models.encoders.EncoderFactory for details

  • critic_encoder_factory (d3rlpy.models.encoders.EncoderFactory or str) – encoder factory for the critic. The available options are [‘pixel’, ‘dense’, ‘vector’, ‘default’]. See d3rlpy.models.encoders.EncoderFactory for details

  • q_func_factory (d3rlpy.models.q_functions.QFunctionFactory or str) – Q function factory. The available options are [‘mean’, ‘qr’, ‘iqn’, ‘fqf’]. See d3rlpy.models.q_functions.QFunctionFactory for details

  • batch_size (int) – mini-batch size

  • n_steps (int) – number of training steps

  • gamma (float) – discount factor

  • tau (float) – target network synchronization coefficient

  • n_critics (int) – the number of Q functions for the ensemble

  • initial_temperature (float) – initial temperature value

  • initial_alpha (float) – initial \(\alpha\) value

  • alpha_threshold (float) – threshold value described as \(\tau\)

  • conservative_weight (float) – constant weight to scale the conservative loss

  • n_action_samples (int) – the number of sampled actions to compute \(\log{\sum_a \exp{Q(s, a)}}\)

  • soft_q_backup (bool) – flag to use SAC-style backup

  • use_gpu (Union[int, str, bool]) – device option. If the value is boolean and True, cuda:0 will be used. If the value is an integer, cuda:<device> will be used. If the value is a string in torch device style, the specified device will be used

  • observation_scaler (d3rlpy.preprocessing.Scaler or str) – preprocessor. The available options are [‘pixel’, ‘min_max’, ‘standard’]

  • action_scaler (d3rlpy.preprocessing.ActionScaler or str) – action preprocessor. The available options are [‘min_max’]

  • reward_scaler (d3rlpy.preprocessing.RewardScaler or str) – reward preprocessor. The available options are [‘clip’, ‘min_max’, ‘standard’]

  • impl (d3rlpy.algos.torch.cql_impl.CQLImpl) – algorithm implementation

__init__(mdp_dataset_builder, actor_learning_rate=0.0001, critic_learning_rate=0.0003, temp_learning_rate=0.0001, alpha_learning_rate=0.0001, actor_optim_factory=AdamFactory(betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False), critic_optim_factory=AdamFactory(betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False), temp_optim_factory=AdamFactory(betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False), alpha_optim_factory=AdamFactory(betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False), actor_encoder_factory=DefaultEncoderFactory(activation='relu', use_batch_norm=False, dropout_rate=None), critic_encoder_factory=DefaultEncoderFactory(activation='relu', use_batch_norm=False, dropout_rate=None), q_func_factory=MeanQFunctionFactory(share_encoder=False), batch_size=64, n_steps=1, gamma=0.99, tau=0.005, n_critics=2, initial_temperature=1.0, initial_alpha=1.0, alpha_threshold=10.0, conservative_weight=5.0, n_action_samples=10, soft_q_backup=False, use_gpu=False, observation_scaler=None, action_scaler=None, reward_scaler=None, **params)

Hierarchical models

Hierarchical Recommender (Experimental)

class replay.experimental.models.HierarchicalRecommender(depth, cluster_model, recommender_class=<class 'replay.experimental.models.u_lin_ucb.ULinUCB'>, recommender_params={})

The Hierarchical Recommender class is inspired by the article by Song et al. and is a generalization of their method. By default it works as the HCB algorithm proposed there.

The model sequentially clusters the item space, constructing a tree of the given depth. Clustering is performed by the cluster_model: any sklearn clusterer instance provided by the user.

At each node of the tree a node recommender instance is mounted. All of them are produced from the recommender_class object (not an instance!) and are initialized with recommender_params.

To predict an item, the model goes down the tree, each time selecting the next node as the one predicted by the parent node's recommender. A leaf node recommender yields an item itself.

The log is treated as the history of user-item interactions. To fit the model, each interaction is counted in all node recommenders on the path from the root to the item, as if that path were traversed during prediction.

The Hierarchical Recommender may be useful to enhance the performance of simple models that are not suitable for large item spaces (such as many contextual bandits) and to reduce prediction time for models that need to iterate through all of the items to make a recommendation.

In this version the Hierarchical Recommender is implemented as a HybridRecommender and, apart from the log, requires both item_features and user_features in the fit() method. For the same reason only HybridRecommender classes may be passed as recommender_class. Whether features are needed at predict() depends on the recommender_class itself.

Note that the current implementation relies mostly on Python rather than PySpark.

__init__(depth, cluster_model, recommender_class=<class 'replay.experimental.models.u_lin_ucb.ULinUCB'>, recommender_params={})
Parameters
  • depth – depth of the item tree

  • cluster_model – an sklearn.cluster object (or any with similar API) that would perform clustering on the item space

  • recommender_class – a RePlay hybrid recommender class object (not an instance!) instances of which would be mounted at each tree node

  • recommender_params – initialization parameters for the recommenders
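
A rough construction sketch (assuming sklearn's KMeans as the clusterer and the experimental hybrid fit interface with a log plus user_features and item_features, as described above; all names are illustrative):

>>> from sklearn.cluster import KMeans
>>> from replay.experimental.models import HierarchicalRecommender
>>> model = HierarchicalRecommender(depth=2, cluster_model=KMeans(n_clusters=8))
>>> model.fit(log, user_features=user_features, item_features=item_features)
>>> recs = model.predict(log, k=10, user_features=user_features, item_features=item_features)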

Wrappers and other models with distributed inference

Wrappers for popular recommendation libraries and algorithms implemented in python with distributed inference in pyspark.

ADMM SLIM (Experimental)

class replay.experimental.models.ADMMSLIM(lambda_1=5, lambda_2=5000, seed=None, index_builder=None)

ADMM SLIM: Sparse Recommendations for Many Users

This is a modification of the basic SLIM model. Recommendations are improved with the Alternating Direction Method of Multipliers.

__init__(lambda_1=5, lambda_2=5000, seed=None, index_builder=None)
Parameters
  • lambda_1 (float) – l1 regularization term

  • lambda_2 (float) – l2 regularization term

  • seed (Optional[int]) – random seed

  • index_builder (Optional[IndexBuilder]) – IndexBuilder instance that adds ANN functionality. If not set, then ann will not be used.

LightFM (Experimental)

class replay.experimental.models.LightFMWrap(no_components=128, loss='warp', random_state=None)

Wrapper for LightFM.

__init__(no_components=128, loss='warp', random_state=None)

implicit (Experimental)

class replay.experimental.models.ImplicitWrap(model)

Wrapper for implicit

Example:

>>> import implicit
>>> model = implicit.als.AlternatingLeastSquares(factors=5)
>>> als = ImplicitWrap(model)

This way you can use implicit models like any other model in RePlay, with conversions made under the hood.

>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> df = pd.DataFrame({"user_idx": [1, 1, 2, 2], "item_idx": [1, 2, 2, 3], "relevance": [1, 1, 1, 1]})
>>> df = convert2spark(df)
>>> als.fit_predict(df, 1, users=[1])[["user_idx", "item_idx"]].toPandas()
   user_idx  item_idx
0         1         3
__init__(model)

Provide initialized implicit model.

Neural Networks recommenders

Neural network recommenders are Lightning-compatible: they can be trained with a Trainer from the Lightning module.
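
A minimal training sketch (using Bert4Rec, documented below, as the example; it assumes you already have a suitable tensor_schema and a torch DataLoader yielding the training batches the model expects, whose construction is outside the scope of this page):

>>> import pytorch_lightning as pl
>>> from replay.models.nn import Bert4Rec
>>> model = Bert4Rec(tensor_schema, max_seq_len=50)
>>> trainer = pl.Trainer(max_epochs=1)
>>> trainer.fit(model, train_dataloaders=train_dataloader)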

Bert4Rec

class replay.models.nn.Bert4Rec(tensor_schema, block_count=2, head_count=4, hidden_size=256, max_seq_len=100, dropout_rate=0.1, pass_per_transformer_block_count=1, enable_positional_embedding=True, enable_embedding_tying=False, loss_type='CE', loss_sample_count=None, negative_sampling_strategy='global_uniform', negatives_sharing=False, optimizer_factory=<replay.models.nn.optimizer_utils.optimizer_factory.FatOptimizerFactory object>, lr_scheduler_factory=None)

Implements BERT training-validation loop

__init__(tensor_schema, block_count=2, head_count=4, hidden_size=256, max_seq_len=100, dropout_rate=0.1, pass_per_transformer_block_count=1, enable_positional_embedding=True, enable_embedding_tying=False, loss_type='CE', loss_sample_count=None, negative_sampling_strategy='global_uniform', negatives_sharing=False, optimizer_factory=<replay.models.nn.optimizer_utils.optimizer_factory.FatOptimizerFactory object>, lr_scheduler_factory=None)
Parameters
  • tensor_schema (TensorSchema) – Tensor schema of features.

  • block_count (int) – Number of Transformer blocks. Default: 2.

  • head_count (int) – Number of Attention heads. Default: 4.

  • hidden_size (int) – Hidden size of transformer. Default: 256.

  • max_seq_len (int) – Max length of sequence. Default: 100.

  • dropout_rate (float) – Dropout rate. Default: 0.1.

  • pass_per_transformer_block_count (int) – Number of times to pass data over each Transformer block. Default: 1.

  • enable_positional_embedding (bool) – Add positional embedding to the result. Default: True.

  • enable_embedding_tying (bool) – Use embedding tying head. If True - result scores are calculated by dot product of input and output embeddings, if False - default linear layer is applied to calculate logits for each item. Default: False.

  • loss_type (str) – Loss type. Possible values: "CE", "BCE". Default: CE.

  • loss_sample_count (Optional[int]) – Sample count to calculate loss. Default: None.

  • negative_sampling_strategy (str) – Negative sampling strategy used to compute the loss on sampled negatives; useful when the dataset contains a large number of items. Possible values: "global_uniform", "inbatch". Default: global_uniform.

  • negatives_sharing (bool) – Apply negative sharing in calculating sampled logits. Default: False.

  • optimizer_factory (OptimizerFactory) – Optimizer factory. Default: FatOptimizerFactory.

  • lr_scheduler_factory (Optional[LRSchedulerFactory]) – Learning rate schedule factory. Default: None.

predict(batch, candidates_to_score=None)
Parameters
  • batch (Bert4RecPredictionBatch) – Batch of prediction data.

  • candidates_to_score (Optional[LongTensor]) – Item ids to calculate scores. Default: None.

Return type

Tensor

Returns

Calculated scores on prediction batch.
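
A construction sketch using only the documented arguments (tensor_schema is assumed to be a TensorSchema prepared elsewhere; the remaining values are illustrative):

>>> from replay.models.nn import Bert4Rec
>>> model = Bert4Rec(
...     tensor_schema,
...     block_count=2,
...     head_count=4,
...     hidden_size=256,
...     max_seq_len=100,
...     loss_type="BCE",
...     loss_sample_count=500,
... )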

SasRec

class replay.models.nn.SasRec(tensor_schema, block_count=2, head_count=1, hidden_size=50, max_seq_len=200, dropout_rate=0.2, ti_modification=False, time_span=256, loss_type='CE', loss_sample_count=None, negative_sampling_strategy='global_uniform', negatives_sharing=False, optimizer_factory=<replay.models.nn.optimizer_utils.optimizer_factory.FatOptimizerFactory object>, lr_scheduler_factory=None)

SASRec Lightning module.

Initialization parameters can be accessed via the hparams attribute of a SasRec instance.

__init__(tensor_schema, block_count=2, head_count=1, hidden_size=50, max_seq_len=200, dropout_rate=0.2, ti_modification=False, time_span=256, loss_type='CE', loss_sample_count=None, negative_sampling_strategy='global_uniform', negatives_sharing=False, optimizer_factory=<replay.models.nn.optimizer_utils.optimizer_factory.FatOptimizerFactory object>, lr_scheduler_factory=None)
Parameters
  • tensor_schema (TensorSchema) – Tensor schema of features.

  • block_count (int) – Number of Transformer blocks. Default: 2.

  • head_count (int) – Number of Attention heads. Default: 1.

  • hidden_size (int) – Hidden size of transformer. Default: 50.

  • max_seq_len (int) – Max length of sequence. Default: 200.

  • dropout_rate (float) – Dropout rate. Default: 0.2.

  • ti_modification (bool) – Enable time relation. Default: False.

  • time_span (int) – Time span value. Default: 256.

  • loss_type (str) – Loss type. Possible values: "CE", "BCE". Default: CE.

  • loss_sample_count (Optional[int]) – Sample count to calculate loss. Default: None.

  • negative_sampling_strategy (str) – Negative sampling strategy used to compute the loss on sampled negatives; useful when the dataset contains a large number of items. Possible values: "global_uniform", "inbatch". Default: global_uniform.

  • negatives_sharing (bool) – Apply negative sharing in calculating sampled logits. Default: False.

  • optimizer_factory (OptimizerFactory) – Optimizer factory. Default: FatOptimizerFactory.

  • lr_scheduler_factory (Optional[LRSchedulerFactory]) – Learning rate schedule factory. Default: None.

predict(batch, candidates_to_score=None)
Parameters
  • batch (SasRecPredictionBatch) – Batch of prediction data.

  • candidates_to_score (Optional[LongTensor]) – Item ids to calculate scores. Default: None.

Return type

Tensor

Returns

Calculated scores.
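
A construction sketch; as noted above, the saved initialization parameters are available through hparams (tensor_schema is assumed to be prepared elsewhere):

>>> from replay.models.nn import SasRec
>>> model = SasRec(tensor_schema, max_seq_len=200, hidden_size=50)
>>> params = model.hparams  # initialization parameters saved by Lightning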

Compiled sequential models

Sequential models like SasRec and Bert4Rec can be converted to ONNX format for fast inference on CPU.

SasRecCompiled

class replay.models.nn.sequential.compiled.SasRecCompiled(compiled_model, schema)

SasRec model optimized for CPU inference via OpenVINO. It is recommended to create the model with the compile method, passing either a SasRec checkpoint or the model object itself. It is also possible to compile the model yourself and pass it to __init__ together with a TensorSchema.

Note that compilation requires disk write (and possibly delete) permissions.

classmethod compile(model, mode='one_query', batch_size=None, num_candidates_to_score=None, num_threads=None, onnx_path=None)

Model compilation.

Parameters
  • model (Union[SasRec, str, Path]) – Path to a Lightning SasRec model saved in .ckpt format, or the SasRec object itself.

  • mode (Literal['batch', 'one_query', 'dynamic_batch_size']) –

    Inference mode, defines shape of inputs. Could be one of [one_query, batch, dynamic_batch_size].

    one_query - sets input shape to [1, max_seq_len]

    batch - sets input shape to [batch_size, max_seq_len]

    dynamic_batch_size - sets batch_size to dynamic range [?, max_seq_len]

    Default: one_query.

  • batch_size (Optional[int]) – Batch size, required for batch mode. Default: None.

  • num_candidates_to_score (Optional[int]) –

    Number of item ids to calculate scores. Could be one of [None, -1, N].

    -1 - sets candidates_to_score shape to dynamic range [1, ?]

    N - sets candidates_to_score shape to [1, N]

    None - disables candidates_to_score usage

    Default: None.

  • num_threads (Optional[int]) – Number of CPU threads to use. Must be a natural number or None. If None, the compiler will set this parameter automatically. Default: None.

  • onnx_path (Optional[str]) – Save ONNX model to path, if defined. Default: None.

Return type

SasRecCompiled
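
A usage sketch (the checkpoint path is hypothetical):

>>> from replay.models.nn.sequential.compiled import SasRecCompiled
>>> compiled = SasRecCompiled.compile(
...     "checkpoints/sasrec.ckpt",
...     mode="one_query",
...     num_threads=4,
... )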

predict(batch, candidates_to_score=None)

Inference on one batch.

Parameters
  • batch (SasRecPredictionBatch) – Prediction input.

  • candidates_to_score (Optional[LongTensor]) – Item ids to calculate scores. Default: None.

Return type

Tensor

Returns

Tensor with scores.

Bert4RecCompiled

class replay.models.nn.sequential.compiled.Bert4RecCompiled(compiled_model, schema)

Bert4Rec model optimized for CPU inference via OpenVINO. It is recommended to create the model with the compile method, passing either a Bert4Rec checkpoint or the model object itself. It is also possible to compile the model yourself and pass it to __init__ together with a TensorSchema.

Note that compilation requires disk write (and possibly delete) permissions.

classmethod compile(model, mode='one_query', batch_size=None, num_candidates_to_score=None, num_threads=None, onnx_path=None)

Model compilation.

Parameters
  • model (Union[Bert4Rec, str, Path]) – Path to a Lightning Bert4Rec model saved in .ckpt format, or the Bert4Rec object itself.

  • mode (Literal['batch', 'one_query', 'dynamic_batch_size']) –

    Inference mode, defines shape of inputs. Could be one of [one_query, batch, dynamic_batch_size].

    one_query - sets input shape to [1, max_seq_len]

    batch - sets input shape to [batch_size, max_seq_len]

    dynamic_batch_size - sets batch_size to dynamic range [?, max_seq_len]

    Default: one_query.

  • batch_size (Optional[int]) – Batch size, required for batch mode. Default: None.

  • num_candidates_to_score (Optional[int]) –

    Number of item ids to calculate scores. Could be one of [None, -1, N].

    -1 - sets candidates_to_score shape to dynamic range [1, ?]

    N - sets candidates_to_score shape to [1, N]

    None - disables candidates_to_score usage

    Default: None.

  • num_threads (Optional[int]) – Number of CPU threads to use. Must be a natural number or None. If None, the compiler will set this parameter automatically. Default: None.

  • onnx_path (Optional[str]) – Save ONNX model to path, if defined. Default: None.

Return type

Bert4RecCompiled
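
A usage sketch for fixed-size batch inference with a restricted candidate set (bert4rec_model is assumed to be a trained Bert4Rec instance):

>>> from replay.models.nn.sequential.compiled import Bert4RecCompiled
>>> compiled = Bert4RecCompiled.compile(
...     bert4rec_model,
...     mode="batch",
...     batch_size=32,
...     num_candidates_to_score=100,
... )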

predict(batch, candidates_to_score=None)

Inference on one batch.

Parameters
  • batch (Bert4RecPredictionBatch) – Prediction input.

  • candidates_to_score (Optional[LongTensor]) – Item ids to calculate scores. Default: None.

Return type

Tensor

Returns

Tensor with scores.

Features for easy training and validation with Lightning

RePlay provides callbacks and postprocessors to make model training and validation as convenient as possible.

During training:

You can define a list of validation metrics; the model is considered the best and is saved whenever a tracked metric improves during validation.

During inference:

You can get the recommendations in four formats: PySpark DataFrame, Pandas DataFrame, Polars DataFrame, or PyTorch tensors. Each format corresponds to a callback. You can filter the results using postprocessors.

For a better understanding, you should look at examples of using neural network models.
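
For instance, a sketch of collecting top-10 recommendations as a Pandas DataFrame might look like this (the column names, sequential_dataset, model, and predict_loader are assumptions; depending on your setup the Trainer import may be pytorch_lightning):

>>> from lightning import Trainer
>>> from replay.models.nn.sequential.callbacks import PandasPredictionCallback
>>> from replay.models.nn.sequential.postprocessors.postprocessors import RemoveSeenItems
>>> callback = PandasPredictionCallback(
...     top_k=10,
...     query_column="user_id",
...     item_column="item_id",
...     postprocessors=[RemoveSeenItems(sequential_dataset)],
... )
>>> trainer = Trainer(callbacks=[callback], inference_mode=True)
>>> _ = trainer.predict(model, dataloaders=predict_loader)
>>> recs = callback.get_result()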

Callbacks

ValidationMetricsCallback

class replay.models.nn.sequential.callbacks.ValidationMetricsCallback(metrics=None, ks=None, postprocessors=None, item_count=None)

Callback for validation and testing stages.

If multiple validation/testing dataloaders are used, the suffix of the metric name will contain the serial number of the dataloader.

__init__(metrics=None, ks=None, postprocessors=None, item_count=None)
Parameters
  • metrics (Optional[List[Literal['recall', 'precision', 'ndcg', 'map', 'mrr', 'novelty', 'coverage']]]) – Sequence of metrics to calculate.

  • ks (Optional[List[int]]) – cutoff values k for the ranking metrics. Default: [1, 5, 10, 20].

  • postprocessors (Optional[List[BasePostProcessor]]) – postprocessors to apply at the validation stage.

  • item_count (Optional[int]) – the total number of items in the dataset, required only for Coverage calculations.
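
A construction sketch (num_items is an assumption: the total number of items in the dataset, needed here because coverage is requested):

>>> from replay.models.nn.sequential.callbacks import ValidationMetricsCallback
>>> val_callback = ValidationMetricsCallback(
...     metrics=["ndcg", "recall", "coverage"],
...     ks=[10, 20],
...     item_count=num_items,
... )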

SparkPredictionCallback

class replay.models.nn.sequential.callbacks.SparkPredictionCallback(top_k, query_column, item_column, rating_column, spark_session, postprocessors=None)

Callback for the prediction stage that collects results into a Spark DataFrame.

__init__(top_k, query_column, item_column, rating_column, spark_session, postprocessors=None)
Parameters
  • top_k (int) – Takes the highest k scores in the ranking.

  • query_column (str) – query column name.

  • item_column (str) – item column name.

  • rating_column (str) – rating column name.

  • spark_session (SparkSession) – Spark session used to create the resulting DataFrame.

  • postprocessors (Optional[List[BasePostProcessor]]) – postprocessors to apply.

get_result()
Return type

TypeVar(_T)

Returns

prediction result

PandasPredictionCallback

class replay.models.nn.sequential.callbacks.PandasPredictionCallback(top_k, query_column, item_column, rating_column='rating', postprocessors=None)

Callback for the prediction stage that collects results into a Pandas DataFrame.

__init__(top_k, query_column, item_column, rating_column='rating', postprocessors=None)
Parameters
  • top_k (int) – Takes the highest k scores in the ranking.

  • query_column (str) – query column name.

  • item_column (str) – item column name.

  • rating_column (str) – rating column name.

  • postprocessors (Optional[List[BasePostProcessor]]) – postprocessors to apply.

get_result()
Return type

TypeVar(_T)

Returns

prediction result

TorchPredictionCallback

class replay.models.nn.sequential.callbacks.TorchPredictionCallback(top_k, postprocessors=None)

Callback for the prediction stage that collects results into a tuple of tensors.

__init__(top_k, postprocessors=None)
Parameters
  • top_k (int) – Takes the highest k scores in the ranking.

  • postprocessors (Optional[List[BasePostProcessor]]) – postprocessors to apply.

get_result()
Return type

TypeVar(_T)

Returns

prediction result

QueryEmbeddingsPredictionCallback

class replay.models.nn.sequential.callbacks.QueryEmbeddingsPredictionCallback

Callback for prediction stage to get query embeddings.

__init__()
get_result()
Returns

Query embeddings through all batches.
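
A usage sketch (model and predict_loader follow the prediction sketch above and are assumptions):

>>> from lightning import Trainer
>>> from replay.models.nn.sequential.callbacks import QueryEmbeddingsPredictionCallback
>>> emb_callback = QueryEmbeddingsPredictionCallback()
>>> trainer = Trainer(callbacks=[emb_callback], inference_mode=True)
>>> _ = trainer.predict(model, dataloaders=predict_loader)
>>> query_embeddings = emb_callback.get_result()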

Postprocessors

RemoveSeenItems

class replay.models.nn.sequential.postprocessors.postprocessors.RemoveSeenItems(sequential)

Filters out items that have already been seen in the dataset.

__init__(sequential)

SampleItems

class replay.models.nn.sequential.postprocessors.postprocessors.SampleItems(grouped_validation_items, user_col, item_col, items_list, sample_count)

Generates negative samples to compute sampled metrics.

__init__(grouped_validation_items, user_col, item_col, items_list, sample_count)