Models
This module contains recommender system algorithms including:
distributed models built in PySpark
neural networks build in PyTorch with distributed inference in PySpark
wrappers for commonly used recommender systems libraries andmodels with non-distributed training and distributed inference in PySpark.
RePlay Recommenders
Algorithm |
Implementation |
---|---|
Popular Recommender |
PySpark |
Popular By Queries |
PySpark |
Wilson Recommender |
PySpark |
Random Recommender |
PySpark |
UCB |
PySpark |
KL-UCB |
PySpark/Python CPU |
LinUCB |
PySpark/Python CPU |
Thompson Sampling |
PySpark |
K-Nearest Neighbours |
PySpark |
Alternating Least Squares |
PySpark |
SLIM |
PySpark |
Word2Vec Recommender |
PySpark |
Association Rules Item-to-Item Recommender |
PySpark |
Cluster Recommender |
PySpark |
Neural Matrix Factorization (Experimental) |
Python CPU/GPU |
MultVAE (Experimental) |
Python CPU/GPU |
DDPG (Experimental) |
Python CPU |
DT4Rec (Experimental) |
Python CPU/GPU |
NeuralTS (Experimental) |
Python CPU/GPU |
ADMM SLIM (Experimental) |
Python CPU |
Wrapper for implicit (Experimental) |
Python CPU |
Wrapper for LightFM (Experimental) |
Python CPU |
RL-based CQL Recommender (Experimental) |
PySpark |
ULinUCB (Experimental) |
Python CPU |
Hierarchical Recommender (Experimental) |
PySpark |
To get more info on how to choose base model, please see this page.
Recommender interface
Recommender
- class replay.models.Recommender
Usual recommender class for models without features.
- fit(dataset)
Fit a recommendation model
- Parameters
dataset (
Dataset
) – historical interactions with query/item features[user_idx, item_idx, timestamp, rating]
- Return type
None
- Returns
- fit_predict(dataset, k, queries=None, items=None, filter_seen_items=True, recs_file_path=None)
Fit model and get recommendations
- Parameters
dataset (
Dataset
) – historical interactions with query/item features[user_idx, item_idx, timestamp, rating]
k (
int
) – number of recommendations for each queryqueries (
Union
[DataFrame
,Iterable
,None
]) – queries to create recommendations for dataframe containing[user_idx]
orarray-like
; ifNone
, recommend to all queries frominteractions
items (
Union
[DataFrame
,Iterable
,None
]) – candidate items for recommendations dataframe containing[item_idx]
orarray-like
; ifNone
, take all items frominteractions
. If it contains new items,rating
for them will be0
.filter_seen_items (
bool
) – flag to remove seen items from recommendations based oninteractions
.recs_file_path (
Optional
[str
]) – save recommendations at the given absolute path as parquet file. If None, cached and materialized recommendations dataframe will be returned
- Return type
Optional
[DataFrame
]- Returns
cached recommendation dataframe with columns
[user_idx, item_idx, rating]
or None if file_path is provided
- get_features(ids)
Returns query or item feature vectors as a Column with type ArrayType
- Parameters
ids (
DataFrame
) – Spark DataFrame with unique ids- Return type
Optional
[Tuple
[DataFrame
,int
]]- Returns
feature vectors. If a model does not have a vector for some ids they are not present in the final result.
- predict(dataset, k, queries=None, items=None, filter_seen_items=True, recs_file_path=None)
Get recommendations
- Parameters
dataset (
Dataset
) – historical interactions with query/item features[user_idx, item_idx, timestamp, rating]
k (
int
) – number of recommendations for each queryqueries (
Union
[DataFrame
,Iterable
,None
]) – queries to create recommendations for dataframe containing[user_idx]
orarray-like
; ifNone
, recommend to all queries frominteractions
items (
Union
[DataFrame
,Iterable
,None
]) – candidate items for recommendations dataframe containing[item_idx]
orarray-like
; ifNone
, take all items frominteractions
. If it contains new items,rating
for them will be0
.filter_seen_items (
bool
) – flag to remove seen items from recommendations based oninteractions
.recs_file_path (
Optional
[str
]) – save recommendations at the given absolute path as parquet file. If None, cached and materialized recommendations dataframe will be returned
- Return type
Optional
[DataFrame
]- Returns
cached recommendation dataframe with columns
[user_idx, item_idx, rating]
or None if file_path is provided
- predict_pairs(pairs, dataset=None, recs_file_path=None, k=None)
Get recommendations for specific query-item
pairs
. If a model can’t produce recommendation for specific pair it is removed from the resulting dataframe.- Parameters
pairs (
DataFrame
) – dataframe with pairs to calculate rating for,[user_idx, item_idx]
.dataset (
Optional
[Dataset
]) – historical interactions with query/item features[user_idx, item_idx, timestamp, rating]
recs_file_path (
Optional
[str
]) – save recommendations at the given absolute path as parquet file. If None, cached and materialized recommendations dataframe will be returnedk (
Optional
[int
]) – top-k items for each query from pairs.
- Return type
Optional
[DataFrame
]- Returns
cached recommendation dataframe with columns
[user_idx, item_idx, rating]
or None if file_path is provided
BaseRecommender
- class replay.models.base_rec.BaseRecommender
Base recommender
- optimize(train_dataset, test_dataset, param_borders=None, criterion=<class 'replay.metrics.ndcg.NDCG'>, k=10, budget=10, new_study=True)
Searches the best parameters with optuna.
- Parameters
train_dataset (
Dataset
) – train datatest_dataset (
Dataset
) – test dataparam_borders (
Optional
[Dict
[str
,List
[Any
]]]) – a dictionary with search borders, where key is the parameter name and value is the range of possible values{param: [low, high]}
. In case of categorical parameters it is all possible values:{cat_param: [cat_1, cat_2, cat_3]}
.criterion (
Metric
) – metric to use for optimizationk (
int
) – recommendation list lengthbudget (
int
) – number of points to trynew_study (
bool
) – keep searching with previous study or start a new study
- Return type
Optional
[Dict
[str
,Any
]]- Returns
dictionary with best parameters
Distributed models
Models with both training and inference implemented in pyspark.
Popular Recommender
- class replay.models.PopRec(use_rating=False, add_cold_items=True, cold_weight=0.5)
Recommend objects using their popularity.
Popularity of an item is a probability that random user rated this item.
\[Popularity(i) = \dfrac{N_i}{N}\]\(N_i\) - number of users who rated item \(i\)
\(N\) - total number of users
>>> import pandas as pd >>> from replay.data.dataset import Dataset, FeatureSchema, FeatureInfo, FeatureHint, FeatureType >>> from replay.utils.spark_utils import convert2spark >>> data_frame = pd.DataFrame( ... {"user_id": [1, 1, 2, 2, 3, 4], ... "item_id": [1, 2, 2, 3, 3, 3], ... "rating": [0.5, 1, 0.1, 0.8, 0.7, 1]} ... ) >>> data_frame user_id item_id rating 0 1 1 0.5 1 1 2 1.0 2 2 2 0.1 3 2 3 0.8 4 3 3 0.7 5 4 3 1.0
>>> feature_schema = FeatureSchema( ... [ ... FeatureInfo( ... column="user_id", ... feature_type=FeatureType.CATEGORICAL, ... feature_hint=FeatureHint.QUERY_ID, ... ), ... FeatureInfo( ... column="item_id", ... feature_type=FeatureType.CATEGORICAL, ... feature_hint=FeatureHint.ITEM_ID, ... ), ... FeatureInfo( ... column="rating", ... feature_type=FeatureType.NUMERICAL, ... feature_hint=FeatureHint.RATING, ... ), ... ] ... ) >>> interactions = convert2spark(data_frame) >>> dataset = Dataset(feature_schema, interactions) >>> res = PopRec().fit_predict(dataset, 1) >>> res.toPandas().sort_values("user_id", ignore_index=True) user_id item_id rating 0 1 3 0.75 1 2 1 0.25 2 3 2 0.50 3 4 2 0.50
>>> res = PopRec().fit_predict(dataset, 1, filter_seen_items=False) >>> res.toPandas().sort_values("user_id", ignore_index=True) user_id item_id rating 0 1 3 0.75 1 2 3 0.75 2 3 3 0.75 3 4 3 0.75
>>> res = PopRec(use_rating=True).fit_predict(dataset, 1) >>> res.toPandas().sort_values("user_id", ignore_index=True) user_id item_id rating 0 1 3 0.625 1 2 1 0.125 2 3 2 0.275 3 4 2 0.275
Query Popular Recommender
- class replay.models.QueryPopRec
Recommends old objects from each query’s personal top. Input is the number of interactions between queries and items.
Popularity for item \(i\) and query \(u\) is defined as the fraction of actions with item \(i\) among all interactions of query \(u\):
\[Popularity(i_u) = \dfrac{N_iu}{N_u}\]\(N_iu\) - number of interactions of query \(u\) with item \(i\). \(N_u\) - total number of interactions of query \(u\).
>>> import pandas as pd >>> from replay.data.dataset import Dataset, FeatureSchema, FeatureInfo, FeatureHint, FeatureType >>> from replay.utils.spark_utils import convert2spark >>> data_frame = pd.DataFrame({"user_id": [1, 1, 3], "item_id": [1, 2, 3], "rating": [2, 1, 1]}) >>> data_frame user_id item_id rating 0 1 1 2 1 1 2 1 2 3 3 1
>>> interactions = convert2spark(data_frame) >>> feature_schema = FeatureSchema( ... [ ... FeatureInfo( ... column="user_id", ... feature_type=FeatureType.CATEGORICAL, ... feature_hint=FeatureHint.QUERY_ID, ... ), ... FeatureInfo( ... column="item_id", ... feature_type=FeatureType.CATEGORICAL, ... feature_hint=FeatureHint.ITEM_ID, ... ), ... FeatureInfo( ... column="rating", ... feature_type=FeatureType.NUMERICAL, ... feature_hint=FeatureHint.RATING, ... ), ... ] ... ) >>> dataset = Dataset(feature_schema, interactions) >>> model = QueryPopRec() >>> res = model.fit_predict(dataset, 1, filter_seen_items=False) >>> model.query_item_popularity.count() 3 >>> res.toPandas().sort_values("user_id", ignore_index=True) user_id item_id rating 0 1 1 0.666667 1 3 3 1.000000
Wilson Recommender
Confidence interval for binomial distribution can be calculated as:
Where \(\hat{p}\) – is an observed fraction of positive ratings.
\(z_{\alpha}\) 1-alpha quantile of normal distribution.
- class replay.models.Wilson(alpha=0.05, add_cold_items=True, cold_weight=0.5, sample=False, seed=None)
Calculates lower confidence bound for the confidence interval of true fraction of positive ratings.
rating
must be converted to binary 0-1 form.>>> import pandas as pd >>> from replay.data.dataset import Dataset, FeatureSchema, FeatureInfo, FeatureHint, FeatureType >>> from replay.utils.spark_utils import convert2spark >>> data_frame = pd.DataFrame({"user_id": [1, 2], "item_id": [1, 2], "rating": [1, 1]}) >>> interactions = convert2spark(data_frame) >>> feature_schema = FeatureSchema( ... [ ... FeatureInfo( ... column="user_id", ... feature_type=FeatureType.CATEGORICAL, ... feature_hint=FeatureHint.QUERY_ID, ... ), ... FeatureInfo( ... column="item_id", ... feature_type=FeatureType.CATEGORICAL, ... feature_hint=FeatureHint.ITEM_ID, ... ), ... FeatureInfo( ... column="rating", ... feature_type=FeatureType.NUMERICAL, ... feature_hint=FeatureHint.RATING, ... ), ... ] ... ) >>> dataset = Dataset(feature_schema, interactions) >>> model = Wilson() >>> model.fit_predict(dataset, k=1).toPandas() user_id item_id rating 0 1 2 0.206549 1 2 1 0.206549
Random Recommender
- class replay.models.RandomRec(distribution='uniform', alpha=0.0, seed=None, add_cold_items=True, cold_weight=0.5)
Recommend random items, either weighted by item popularity or uniform.
\[P\left(i\right)\propto N_i + \alpha\]\(N_i\) — number of users who rated item \(i\)
- \(\alpha\) — bigger \(\alpha\) values increase amount of rare items in recommendations.
Must be bigger than -1. Default value is \(\alpha = 0\).
Model without seed provides non-determenistic recommendations, model with fixed seed provides reproducible recommendataions.
As the recommendations from predict are cached, save them to disk, or create a checkpoint and unpersist them to get different recommendations after another predict call.
>>> from replay.utils.session_handler import get_spark_session, State >>> spark = get_spark_session(1, 1) >>> state = State(spark)
>>> import pandas as pd >>> from replay.data.dataset import Dataset, FeatureSchema, FeatureInfo, FeatureHint, FeatureType >>> from replay.utils.spark_utils import convert2spark >>> >>> interactions = convert2spark(pd.DataFrame({ ... "user_id": [1, 1, 2, 2, 3, 4], ... "item_id": [1, 2, 2, 3, 3, 3] ... })) >>> interactions.show() +-------+-------+ |user_id|item_id| +-------+-------+ | 1| 1| | 1| 2| | 2| 2| | 2| 3| | 3| 3| | 4| 3| +-------+-------+ >>> random_pop = RandomRec(distribution="popular_based", alpha=-1) Traceback (most recent call last): ... ValueError: alpha must be bigger than -1
>>> random_pop = RandomRec(distribution="abracadabra") Traceback (most recent call last): ... ValueError: distribution can be one of [popular_based, relevance, uniform]
>>> feature_schema = FeatureSchema( ... [ ... FeatureInfo( ... column="user_id", ... feature_type=FeatureType.CATEGORICAL, ... feature_hint=FeatureHint.QUERY_ID, ... ), ... FeatureInfo( ... column="item_id", ... feature_type=FeatureType.CATEGORICAL, ... feature_hint=FeatureHint.ITEM_ID, ... ), ... ] ... ) >>> dataset = Dataset(feature_schema, interactions) >>> random_pop = RandomRec(distribution="popular_based", alpha=1.0, seed=777) >>> random_pop.fit(dataset) >>> random_pop.item_popularity.show() +-------+------------------+ |item_id| rating| +-------+------------------+ | 1|0.2222222222222222| | 2|0.3333333333333333| | 3|0.4444444444444444| +-------+------------------+ >>> recs = random_pop.predict(dataset, 2) >>> recs.show() +-------+-------+-------------------+ |user_id|item_id| rating| +-------+-------+-------------------+ | 1| 3| 0.5403400662326382| | 2| 1| 0.8999281214873164| | 3| 1| 0.909252584485465| | 3| 2|0.14871615041196307| | 4| 2| 0.9923148665602072| | 4| 1| 0.4814574953690183| +-------+-------+-------------------+ >>> recs = random_pop.predict(dataset, 2, queries=[1], items=[7, 8]) >>> recs.show() +-------+-------+-------------------+ |user_id|item_id| rating| +-------+-------+-------------------+ | 1| 7|0.25547770543750425| | 1| 8|0.12755340075617772| +-------+-------+-------------------+ >>> random_pop = RandomRec(seed=555) >>> random_pop.fit(dataset) >>> random_pop.item_popularity.show() +-------+------------------+ |item_id| rating| +-------+------------------+ | 1|0.3333333333333333| | 2|0.3333333333333333| | 3|0.3333333333333333| +-------+------------------+
- __init__(distribution='uniform', alpha=0.0, seed=None, add_cold_items=True, cold_weight=0.5)
- Parameters
distribution (
str
) – recommendation strategy: “uniform” - all items are sampled uniformly “popular_based” - recommend popular items morealpha (
float
) – bigger values adjust model towards less popular itemsseed (
Optional
[int
]) – random seedadd_cold_items (
bool
) – flag to consider cold items in recommendations building if present in items parameter of predict method or pairs parameter of predict_pairs methods. If true, cold items are assigned relevance equals to the less relevant item relevance multiplied by cold_weight and may appear among top-K recommendations. Otherwise cold items are filtered out. Could be changed after model training by setting the add_cold_items attribute.cold_weight (
float
) – if add_cold_items is True, cold items are added with reduced relevance. The relevance for cold items is equal to the relevance of a least relevant item multiplied by a cold_weight value. Cold_weight value should be in interval (0, 1].
UCB Recommender
- class replay.models.UCB(exploration_coef=2, sample=False, seed=None)
Simple bandit model, which caclulate item rating as upper confidence bound (UCB) for the confidence interval of true fraction of positive ratings. Should be used in iterative (online) mode to achive proper recommendation quality.
rating
from interactions must be converted to binary 0-1 form.\[pred_i = ctr_i + \sqrt{\frac{c\ln{n}}{n_i}}\]\(pred_i\) – predicted rating of item \(i\) \(c\) – exploration coeficient \(n\) – number of interactions in log \(n_i\) – number of interactions with item \(i\)
>>> import pandas as pd >>> from replay.data.dataset import Dataset, FeatureSchema, FeatureInfo, FeatureHint, FeatureType >>> from replay.utils.spark_utils import convert2spark >>> data_frame = pd.DataFrame({"user_id": [1, 2, 3, 3], "item_id": [1, 2, 1, 2], "rating": [1, 0, 0, 0]}) >>> interactions = convert2spark(data_frame) >>> feature_schema = FeatureSchema( ... [ ... FeatureInfo( ... column="user_id", ... feature_type=FeatureType.CATEGORICAL, ... feature_hint=FeatureHint.QUERY_ID, ... ), ... FeatureInfo( ... column="item_id", ... feature_type=FeatureType.CATEGORICAL, ... feature_hint=FeatureHint.ITEM_ID, ... ), ... FeatureInfo( ... column="rating", ... feature_type=FeatureType.NUMERICAL, ... feature_hint=FeatureHint.RATING, ... ), ... ] ... ) >>> dataset = Dataset(feature_schema, interactions) >>> model = UCB() >>> model.fit(dataset) >>> model.predict(dataset, k=2, queries=[1,2,3,4], items=[1,2,3] ... ).toPandas().sort_values(["user_id","rating","item_id"], ... ascending=[True,False,True]).reset_index(drop=True) user_id item_id rating 0 1 3 2.665109 1 1 2 1.177410 2 2 3 2.665109 3 2 1 1.677410 4 3 3 2.665109 5 4 3 2.665109 6 4 1 1.677410
- __init__(exploration_coef=2, sample=False, seed=None)
- Parameters
exploration_coef (
float
) – exploration coefficientsample (
bool
) – flag to choose recommendation strategy. If True, items are sampled with a probability proportional to the calculated predicted rating. Could be changed after model training by setting the sample attribute.seed (
Optional
[int
]) – random seed. Provides reproducibility if fixed
KL-UCB Recommender
- class replay.models.KLUCB(exploration_coef=0.0, sample=False, seed=None)
Bernoulli bandit model. Same to
UCB
computes item relevance as an upper confidence bound of true fraction of positive interactions.In a nutshell, KL-UCB considers the data as the history of interactions with items. The interaction may be either positive or negative. For each item the model computes empirical frequency of positive interactions and estimates the true frequency with an upper confidence bound. The higher the bound for an item is the more relevant it is presumed.
The upper bound below is what differs from the classical UCB. It is computed according to the original article where is proven to produce assymptotically better results.
\[u_i = \max q \in [0,1] : n_i \cdot \operatorname{KL}\left(\frac{p_i}{n_i}, q \right) \leqslant \log(n) + c \log(\log(n)),\]where
\(u_i\) – upper bound for item \(i\),
\(c\) – exploration coeficient,
\(n\) – number of interactions in log,
\(n_i\) – number of interactions with item \(i\),
\(p_i\) – number of positive interactions with item \(i\),
and
\[\operatorname{KL}(p, q) = p \log\frac{p}{q} + (1-p)\log\frac{1-p}{1-q}\]is the KL-divergence of Bernoulli distribution with parameter \(p\) from Bernoulli distribution with parameter \(q\).
Being a bit trickier though, the bound shares with UCB the same exploration-exploitation tradeoff dilemma. You may increase the c coefficient in order to shift the tradeoff towards exploration or decrease it to set the model to be more sceptical of items with small volume of collected statistics. The authors of the article though claim c = 0 to be of the best choice in practice.
As any other RePlay model, KL-UCB takes a log to fit on as a
DataFrame
with columns[user_idx, item_idx, timestamp, relevance]
. Following the procedure above, KL-UCB would see each row as a record of an interaction withitem_idx
with positive (relevance = 1) or negative (relevance = 0) outcome.user_idx
andtimestamp
are ignored i.e. the model treats log as non-personalized - item scores are same for all users.If
relevance
column is not of 0/1 initially, then you have to decide what kind of relevance has to be considered as positive and convertrelevance
to binary format during preprocessing.To provide a prediction, KL-UCB would sample a set of recommended items for each user with probabilites proportional to obtained relevances.
>>> import pandas as pd >>> from replay.data.dataset import Dataset, FeatureSchema, FeatureInfo, FeatureHint, FeatureType >>> from replay.utils.spark_utils import convert2spark >>> data_frame = pd.DataFrame({"user_id": [1, 2, 3, 3], "item_id": [1, 2, 1, 2], "rating": [1, 0, 0, 0]}) >>> interactions = convert2spark(data_frame) >>> feature_schema = FeatureSchema( ... [ ... FeatureInfo( ... column="user_id", ... feature_type=FeatureType.CATEGORICAL, ... feature_hint=FeatureHint.QUERY_ID, ... ), ... FeatureInfo( ... column="item_id", ... feature_type=FeatureType.CATEGORICAL, ... feature_hint=FeatureHint.ITEM_ID, ... ), ... FeatureInfo( ... column="rating", ... feature_type=FeatureType.NUMERICAL, ... feature_hint=FeatureHint.RATING, ... ), ... ] ... ) >>> dataset = Dataset(feature_schema, interactions) >>> model = KLUCB() >>> model.fit(dataset) >>> model.predict(dataset, k=2, queries=[1,2,3,4], items=[1,2,3] ... ).toPandas().sort_values(["user_id","rating","item_id"], ... ascending=[True,False,True]).reset_index(drop=True) user_id item_id rating 0 1 3 1.000000 1 1 2 0.750000 2 2 3 1.000000 3 2 1 0.933013 4 3 3 1.000000 5 4 3 1.000000 6 4 1 0.933013
- __init__(exploration_coef=0.0, sample=False, seed=None)
- Parameters
exploration_coef (
float
) – exploration coefficientsample (
bool
) – flag to choose recommendation strategy. If True, items are sampled with a probability proportional to the calculated predicted relevance. Could be changed after model training by setting the sample attribute.seed (
Optional
[int
]) – random seed. Provides reproducibility if fixed
LinUCB Recommender
- class replay.models.LinUCB(eps, alpha=1.0, is_hybrid=False)
A recommender algorithm for contextual bandit problems.
Implicitly proposed by Li et al. The model assumes a linear relationship between user context, item features and action rewards, making it efficient for high-dimensional contexts.
- Note:
It’s recommended to scale features to a similar range (e.g., using StandardScaler or MinMaxScaler) to ensure proper convergence and prevent numerical instability (since relationships to learn are linear).
>>> import pandas as pd >>> from replay.data.dataset import ( ... Dataset, FeatureHint, FeatureInfo, FeatureSchema, FeatureSource, FeatureType ... ) >>> interactions = pd.DataFrame({"user_id": [0, 1, 2, 2], "item_id": [0, 1, 0, 1], "rating": [1, 0, 0, 0]}) >>> user_features = pd.DataFrame( ... {"user_id": [0, 1, 2], "usr_feat_1": [1, 2, 3], "usr_feat_2": [4, 5, 6], "usr_feat_3": [7, 8, 9]} ... ) >>> item_features = pd.DataFrame( ... { ... "item_id": [0, 1, 2, 3, 4, 5], ... "itm_feat_1": [1, 2, 3, 4, 5, 6], ... "itm_feat_2": [7, 8, 9, 10, 11, 12], ... "itm_feat_3": [13, 14, 15, 16, 17, 18] ... } ... ) >>> feature_schema = FeatureSchema( ... [ ... FeatureInfo( ... column="user_id", ... feature_type=FeatureType.CATEGORICAL, ... feature_hint=FeatureHint.QUERY_ID, ... ), ... FeatureInfo( ... column="item_id", ... feature_type=FeatureType.CATEGORICAL, ... feature_hint=FeatureHint.ITEM_ID, ... ), ... FeatureInfo( ... column="rating", ... feature_type=FeatureType.NUMERICAL, ... feature_hint=FeatureHint.RATING, ... ), ... *[ ... FeatureInfo( ... column=name, feature_type=FeatureType.NUMERICAL, feature_source=FeatureSource.ITEM_FEATURES, ... ) ... for name in ["itm_feat_1", "itm_feat_2", "itm_feat_3"] ... ], ... *[ ... FeatureInfo( ... column=name, feature_type=FeatureType.NUMERICAL, feature_source=FeatureSource.QUERY_FEATURES ... ) ... for name in ["usr_feat_1", "usr_feat_2", "usr_feat_3"] ... ], ... ] ... ) >>> dataset = Dataset( ... feature_schema=feature_schema, ... interactions=interactions, ... item_features=item_features, ... query_features=user_features, ... categorical_encoded=True, ... ) >>> dataset.to_spark() >>> model = LinUCB(eps=-10.0, alpha=1.0, is_hybrid=False) >>> model.fit(dataset) >>> model.predict(dataset, k=2, queries=[0,1,2]).toPandas().sort_values(["user_id","rating","item_id"], ... ascending=[True,False,True]).reset_index(drop=True) user_id item_id rating 0 0 1 -11.073741 1 0 2 -81.240384 2 1 0 -6.555529 3 1 2 -96.436508 4 2 2 -112.249722 5 2 3 -112.249722
- __init__(eps, alpha=1.0, is_hybrid=False)
- Parameters
eps (
float
) – exploration coefficientalpha (
float
) – ridge parameteris_hybrid (
bool
) – flag to choose model type. If True, model is hybrid.
Thompson Sampling
- class replay.models.ThompsonSampling(sample=False, seed=None)
Thompson Sampling recommender.
Bandit model with efficient exploration-exploitation balance. The reward probability of each of the K arms is modeled by a Beta distribution which is updated after an arm is selected. The initial prior distribution is Beta(1,1).
- __init__(sample=False, seed=None)
K Nearest Neighbours
- class replay.models.ItemKNN(num_neighbours=10, use_rating=False, shrink=0.0, weighting=None, index_builder=None)
Item-based ItemKNN with modified cosine similarity measure.
- __init__(num_neighbours=10, use_rating=False, shrink=0.0, weighting=None, index_builder=None)
- Parameters
num_neighbours (
int
) – number of neighboursuse_rating (
bool
) – flag to use rating values as is or to treat them as 1shrink (
float
) – term added to the denominator when calculating similarityweighting (
Optional
[str
]) – item reweighting type, one of [None, ‘tf_idf’, ‘bm25’]index_builder (
Optional
[IndexBuilder
]) – IndexBuilder instance that adds ANN functionality. If not set, then ann will not be used.
Alternating Least Squares
- class replay.models.ALSWrap(rank=10, implicit_prefs=True, seed=None, num_item_blocks=None, num_query_blocks=None)
Wrapper for Spark ALS.
- __init__(rank=10, implicit_prefs=True, seed=None, num_item_blocks=None, num_query_blocks=None)
- Parameters
rank (
int
) – hidden dimension for the approximate matriximplicit_prefs (
bool
) – flag to use implicit feedbackseed (
Optional
[int
]) – random seednum_item_blocks (
Optional
[int
]) – number of blocks the items will be partitioned into in order to parallelize computation. if None then will be init with number of partitions of interactions.num_query_blocks (
Optional
[int
]) – number of blocks the queries will be partitioned into in order to parallelize computation. if None then will be init with number of partitions of interactions.
Alternating Least Squares on Scala (Experimental)
- class replay.experimental.models.ScalaALSWrap(rank=10, implicit_prefs=True, seed=None, num_item_blocks=None, num_user_blocks=None, index_builder=None)
Wrapper for Spark ALS.
- __init__(rank=10, implicit_prefs=True, seed=None, num_item_blocks=None, num_user_blocks=None, index_builder=None)
- Parameters
rank (
int
) – hidden dimension for the approximate matriximplicit_prefs (
bool
) – flag to use implicit feedbackseed (
Optional
[int
]) – random seednum_item_blocks (
Optional
[int
]) – number of blocks the items will be partitioned into in order to parallelize computation. if None then will be init with number of partitions of log.num_user_blocks (
Optional
[int
]) – number of blocks the users will be partitioned into in order to parallelize computation. if None then will be init with number of partitions of log.
SLIM
SLIM Recommender calculates similarity between objects to produce recommendations \(W\).
Loss function is:
\(W\) – item similarity matrix
\(A\) – interaction matrix
Finding \(W\) can be splitted into solving separate linear regressions with ElasticNet regularization. Thus each row in \(W\) is optimized with
To remove trivial solution, we add an extra requirements \(w_{jj}=0\), and \(w_{ij}\ge 0\)
- class replay.models.SLIM(beta=0.01, lambda_=0.01, seed=None, index_builder=None, allow_collect_to_master=False)
SLIM: Sparse Linear Methods for Top-N Recommender Systems
- __init__(beta=0.01, lambda_=0.01, seed=None, index_builder=None, allow_collect_to_master=False)
- Parameters
beta (
float
) – l2 regularizationlambda – l1 regularization
seed (
Optional
[int
]) – random seedindex_builder (
Optional
[IndexBuilder
]) – IndexBuilder instance that adds ANN functionality. If not set, then ann will not be used.allow_collect_to_master (
bool
) – Flag allowing spark to make a collection to the master node, Default:False
.
Word2Vec Recommender
- class replay.models.Word2VecRec(rank=100, min_count=5, step_size=0.025, max_iter=1, window_size=1, use_idf=False, seed=None, num_partitions=None, index_builder=None)
Trains word2vec model where items are treated as words and queries as sentences.
- __init__(rank=100, min_count=5, step_size=0.025, max_iter=1, window_size=1, use_idf=False, seed=None, num_partitions=None, index_builder=None)
- Parameters
rank (
int
) – embedding sizemin_count (
int
) – the minimum number of times a token must appear to be included in the word2vec model’s vocabularystep_size (
int
) – step size to be used for each iteration of optimizationmax_iter (
int
) – max number of iterationswindow_size (
int
) – window sizeuse_idf (
bool
) – flag to use inverse document frequencyseed (
Optional
[int
]) – random seedindex_builder (
Optional
[IndexBuilder
]) – IndexBuilder instance that adds ANN functionality. If not set, then ann will not be used.
Association Rules Item-to-Item Recommender
- class replay.models.AssociationRulesItemRec(session_column, min_item_count=5, min_pair_count=5, num_neighbours=1000, use_rating=False, similarity_metric='confidence', index_builder=None)
Item-to-item recommender based on association rules. Calculate pairs confidence, lift and confidence_gain defined as confidence(a, b)/confidence(!a, b) to get top-k associated items. Predict items for queries using lift, confidence or confidence_gain metrics.
Forecasts will be based on indicators: lift, confidence or confidence_gain. It all depends on your choice.
During class initialization, you can explicitly specify the metric to be used as a similarity for calculating the predict.
You can change your selection before calling .predict() or .predict_pairs() if you set a new value for the similarity_metric parameter.
Usage of ANN functionality requires only sparse indices and only one similarity_metric, defined in __init__ will be available during inference.
>>> import pandas as pd >>> from replay.data.dataset import Dataset, FeatureSchema, FeatureInfo, FeatureHint, FeatureType >>> from replay.utils.spark_utils import convert2spark >>> data_frame = pd.DataFrame({"user_id": [1, 1, 2, 3], "item_id": [1, 2, 2, 3], "rating": [2, 1, 4, 1]}) >>> data_frame_for_predict = pd.DataFrame({"user_id": [2], "item_id": [1]}) >>> data_frame user_id item_id rating 0 1 1 2 1 1 2 1 2 2 2 4 3 3 3 1 >>> interactions = convert2spark(data_frame) >>> pred_interactions = convert2spark(data_frame_for_predict) >>> feature_schema = FeatureSchema( ... [ ... FeatureInfo( ... column="user_id", ... feature_type=FeatureType.CATEGORICAL, ... feature_hint=FeatureHint.QUERY_ID, ... ), ... FeatureInfo( ... column="item_id", ... feature_type=FeatureType.CATEGORICAL, ... feature_hint=FeatureHint.ITEM_ID, ... ), ... FeatureInfo( ... column="rating", ... feature_type=FeatureType.NUMERICAL, ... feature_hint=FeatureHint.RATING, ... ), ... ] ... ) >>> train_dataset = Dataset(feature_schema, interactions) >>> pred_dataset = Dataset(feature_schema.subset(["user_id", "item_id"]), pred_interactions) >>> model = AssociationRulesItemRec(min_item_count=1, min_pair_count=0, session_column="user_id") >>> res = model.fit(train_dataset) >>> model.similarity.orderBy("item_idx_one").show() +------------+------------+----------+----------+----+---------------+ |item_idx_one|item_idx_two|similarity|confidence|lift|confidence_gain| +------------+------------+----------+----------+----+---------------+ | 1| 2| 1.0| 1.0| 1.5| 2.0| | 2| 1| 0.5| 0.5| 1.5| Infinity| +------------+------------+----------+----------+----+---------------+ >>> model.similarity_metric = "confidence" >>> model.predict_pairs(pred_interactions, train_dataset).show() +-------+-------+------+ |user_id|item_id|rating| +-------+-------+------+ | 2| 1| 0.5| +-------+-------+------+ >>> model.similarity_metric = "lift" >>> model.predict_pairs(pred_interactions, train_dataset).show() +-------+-------+------+ |user_id|item_id|rating| +-------+-------+------+ | 2| 1| 1.5| +-------+-------+------+
Classical model uses items co-occurrence in sessions for confidence, lift and confidence_gain calculation but rating could also be passed to the model, e.g. if you want to apply time smoothing and treat old sessions as less important. In this case all items in sessions should have the same rating.
- __init__(session_column, min_item_count=5, min_pair_count=5, num_neighbours=1000, use_rating=False, similarity_metric='confidence', index_builder=None)
- Parameters
session_column (
str
) – name of column to group sessions. Items are combined by theuser_id
column ifsession_column
is not defined.min_item_count (
int
) – items with fewer sessions will be filtered outmin_pair_count (
int
) – pairs with fewer sessions will be filtered outnum_neighbours (
Optional
[int
]) – maximal number of neighbours to save for each itemuse_rating (
bool
) – flag to use rating values instead of co-occurrence count If true, pair rating in session is minimal rating of item in pair. Item rating is sum of rating in all sessions.similarity_metric (
str
) – lift of ‘confidence’ The metric used as a similarity to calculate the prediction, one of [lift
,confidence
,confidence_gain
]index_builder (
Optional
[IndexBuilder
]) – IndexBuilder instance that adds ANN functionality. If not set, then ann will not be used.
- get_nearest_items(items, k, metric='lift', candidates=None)
Get k most similar items be the metric for each of the items.
- Parameters
items (
Union
[DataFrame
,Iterable
]) – spark dataframe or list of item ids to find neighborsk (
int
) – number of neighborsmetric (
str
) – lift of ‘confidence_gain’candidates (
Union
[DataFrame
,Iterable
,None
]) – spark dataframe or list of items to consider as similar, e.g. popular/new items. If None, all items presented during model training are used.
- Return type
DataFrame
- Returns
dataframe with the most similar items an distance, where bigger value means greater similarity. spark-dataframe with columns
[item_id, neighbour_item_id, similarity]
Neural models with distributed inference
Models implemented in pytorch with distributed inference in pyspark.
Neural Matrix Factorization (Experimental)
- class replay.experimental.models.NeuroMF(learning_rate=0.05, epochs=20, embedding_gmf_dim=None, embedding_mlp_dim=None, hidden_mlp_dims=None, l2_reg=0, count_negative_sample=1, factor=0.2, patience=3)
Neural Matrix Factorization model (NeuMF, NCF).
In this implementation MLP and GMF modules are optional.
- __init__(learning_rate=0.05, epochs=20, embedding_gmf_dim=None, embedding_mlp_dim=None, hidden_mlp_dims=None, l2_reg=0, count_negative_sample=1, factor=0.2, patience=3)
MLP or GMF model can be ignored if its embedding size (embedding_mlp_dim or embedding_gmf_dim) is set to
None
. Default variant is MLP + GMF with embedding size 128.- Parameters
learning_rate (
float
) – learning rateepochs (
int
) – number of epochs to train modelembedding_gmf_dim (
Optional
[int
]) – embedding size for gmfembedding_mlp_dim (
Optional
[int
]) – embedding size for mlphidden_mlp_dims (
Optional
[List
[int
]]) – list of hidden dimension sized for mlpl2_reg (
float
) – l2 regularization termcount_negative_sample (
int
) – number of negative samples to usefactor (
float
) – ReduceLROnPlateau reducing factor. new_lr = lr * factorpatience (
int
) – number of non-improved epochs before reducing lr
Mult-VAE (Experimental)
Variation AutoEncoder

Problem formulation
We have a sample of independent equally distributed random values from true distribution \(x_i \sim p_d(x)\), \(i = 1, \dots, N\).
Build a probability model \(p_\theta(x)\) for true distribution \(p_d(x)\).
Distribution \(p_\theta(x)\) allows both to estimate probability density for a given item \(x\), and to sample \(x \sim p_\theta(x)\).
Probability model
\(z \in \mathbb{R}^d\) - is a local latent variable, one for each item \(x\).
Generative process for variational autoencoder:
Sample \(z \sim p(z)\).
Sample \(x \sim p_\theta(x | z)\).
Distribution parameters \(p_\theta(x | z)\) are defined with neural net weights \(\theta\), with input \(z\).
Item probability density \(x\):
Use lower estimate bound for the log likelihood.
\(q_\phi(z | x)\) is a proposal or a recognition distribution. It is a gaussian with weights \(\phi\): \(q_\phi(z | x) = \mathcal{N}(z | \mu_\phi(x), \sigma^2_\phi(x)I)\).
Difference between lower estimate bound \(L(x; \phi, \theta)\) and log likelihood \(\log p_\theta(x)\) - is a KL-divergence between a proposal and aposteriory distribution on \(z\): \(KL(q_\phi(z | x) || p_\theta(z | x))\). Maximum value \(L(x; \phi, \theta)\) for fixed model parameters \(\theta\) is reached with \(q_\phi(z | x) = p_\theta(z | x)\), but explicit calculation of \(p_\theta(z | x)\) is not efficient to calculate, so it is also optimized by \(\phi\). The closer \(q_\phi(z | x)\) to \(p_\theta(z | x)\), the better the estimate.
We usually take normal distribution for \(p(z)\):
In this case
KL-divergence coefficient can also not be equal to one, in this case:
With \(\beta = 0\) VAE is the same as the Denoising AutoEncoder.
- class replay.experimental.models.MultVAE(learning_rate=0.01, epochs=100, latent_dim=200, hidden_dim=600, dropout=0.3, anneal=0.1, l2_reg=0, factor=0.2, patience=3)
Variational Autoencoders for Collaborative Filtering
- __init__(learning_rate=0.01, epochs=100, latent_dim=200, hidden_dim=600, dropout=0.3, anneal=0.1, l2_reg=0, factor=0.2, patience=3)
- Parameters
learning_rate (
float
) – learning rateepochs (
int
) – number of epochs to train modellatent_dim (
int
) – latent dimension size for user vectorshidden_dim (
int
) – hidden dimension size for encoder and decoderdropout (
float
) – dropout coefficientanneal (
float
) – anneal coefficient [0,1]l2_reg (
float
) – l2 regularization termfactor (
float
) – ReduceLROnPlateau reducing factor. new_lr = lr * factorpatience (
int
) – number of non-improved epochs before reducing lr
DDPG (Experimental)
- class replay.experimental.models.DDPG(noise_sigma=0.2, noise_theta=0.05, noise_type='gauss', seed=9, user_num=10, item_num=10, log_dir='logs/tmp', exact_embeddings_size=True, n_critics_head=10, env_gamma_alpha=0.2, critic_heads_q=0.15, n_jobs=None, use_gpu=False, user_batch_size=8, min_trajectory_len=10)
Deep Deterministic Policy Gradient
This implementation enhanced by more advanced noise strategy.
- __init__(noise_sigma=0.2, noise_theta=0.05, noise_type='gauss', seed=9, user_num=10, item_num=10, log_dir='logs/tmp', exact_embeddings_size=True, n_critics_head=10, env_gamma_alpha=0.2, critic_heads_q=0.15, n_jobs=None, use_gpu=False, user_batch_size=8, min_trajectory_len=10)
- Parameters
noise_sigma (
float
) – Ornstein-Uhlenbeck noise sigma valuenoise_theta (
float
) – Ornstein-Uhlenbeck noise theta valuenoise_type (
str
) – type of action noise, one of [“ou”, “gauss”]seed (
int
) – random seeduser_num (
int
) – number of users, specify when usingexact_embeddings_size
item_num (
int
) – number of items, specify when usingexact_embeddings_size
log_dir (
str
) – dir to save models
- Exact_embeddings_size
flag whether to set user/item_num from training log
DT4Rec (Experimental)
- class replay.experimental.models.dt4rec.dt4rec.DT4Rec(item_num, user_num, seed=123, trajectory_len=30, epochs=1, batch_size=64, use_cuda=True)
Decision Transformer for Recommendations
- General Idea:
Decision Transformer: Reinforcement Learning via Sequence Modeling.
- Ideas for improvements:
User Retention-oriented Recommendation with Decision Transformer.
Also, some sources are listed in their respective classes
- __init__(item_num, user_num, seed=123, trajectory_len=30, epochs=1, batch_size=64, use_cuda=True)
NeuralTS (Experimental)
- class replay.experimental.models.NeuralTS(user_cols={'cat_embed_cols': [], 'continuous_cols': [], 'wide_cols': []}, item_cols={'cat_embed_cols': [], 'continuous_cols': [], 'wide_cols': []}, embedding_sizes=[32, 32, 64], hidden_layers=[32, 20], wide_out_dim=1, deep_out_dim=20, head_dropout=0.8, deep_dropout=0.4, dim_head=20, n_epochs=2, opt_lr=0.0003, lr_min=1e-05, use_gpu=False, use_warp_loss=True, cnt_neg_samples=100, cnt_samples_for_predict=10, exploration_coef=1.0, cnt_users=None, cnt_items=None, plot_dir=None)
‘Neural Thompson sampling recommender <https://dl.acm.org/doi/pdf/10.1145/3383313.3412214>`_ based on Wide&Deep model.
- Parameters
user_cols (
Dict
[str
,List
[str
]]) – user_cols = {‘continuous_cols’:List[str], ‘cat_embed_cols’:List[str], ‘wide_cols’: List[str]}, where List[str] – some column names from user_features dataframe, which is input to the fit method, or empty Listitem_cols (
Dict
[str
,List
[str
]]) – item_cols = {‘continuous_cols’:List[str], ‘cat_embed_cols’:List[str], ‘wide_cols’: List[str]}, where List[str] – some column names from item_features dataframe, which is input to the fit method, or empty Listembedding_sizes (
List
[int
]) – list of length three in which embedding_sizes[0] = embedding size for users, embedding_sizes[1] = embedding size for items, embedding_sizes[2] = embedding size for pair (users, items)hidden_layers (
List
[int
]) – list of hidden layer sizes for Deep modelwide_out_dim (
int
) – output size for the Wide modeldeep_out_dim (
int
) – output size for the Deep modelhead_dropout (
float
) – probability of an element to be zeroed for WideDeep model headdeep_dropout (
float
) – probability of an element to be zeroed for Deep modeldim_head (
int
) – output size for WideDeep model headn_epochs (
int
) – number of epochs for model trainingopt_lr (
float
) – learning rate for the AdamW optimizerlr_min (
float
) – minimum learning rate value for the CosineAnnealingLR learning rate scheduleruse_gpu (
bool
) – if true, the model will be trained on the GPUuse_warp_loss (
bool
) – if true, then warp loss will be used otherwise weighted logistic loss.cnt_neg_samples (
int
) – number of additional negative examples for each usercnt_samples_for_predict (
int
) – number of sampled predictions for one user, which are used to estimate the mean and variance of relevanceexploration_coef (
float
) – exploration coefficientplot_dir (
Optional
[str
]) – file name where the training graphs will be saved, if None, the graphs will not be savedcnt_users (
Optional
[int
]) – number of users, used in Wide&Deep model initializationcnt_items (
Optional
[int
]) – number of items, used in Wide&Deep model initialization
- __init__(user_cols={'cat_embed_cols': [], 'continuous_cols': [], 'wide_cols': []}, item_cols={'cat_embed_cols': [], 'continuous_cols': [], 'wide_cols': []}, embedding_sizes=[32, 32, 64], hidden_layers=[32, 20], wide_out_dim=1, deep_out_dim=20, head_dropout=0.8, deep_dropout=0.4, dim_head=20, n_epochs=2, opt_lr=0.0003, lr_min=1e-05, use_gpu=False, use_warp_loss=True, cnt_neg_samples=100, cnt_samples_for_predict=10, exploration_coef=1.0, cnt_users=None, cnt_items=None, plot_dir=None)
ULinUCB Recommender (Experimental)
- class replay.experimental.models.ULinUCB(alpha=-2.0)
A recommender implicitly proposed by Song et al. Is used as the default node recommender in
HierarchicalRecommender
. Shares all the logic with classical item-disjointLinUCB
but is user-disjoint instead. May be useful in problems with fixed number of users and item-oriented data.- __init__(alpha=-2.0)
- Parameters
alpha (
float
) – exploration coefficient
CQL Recommender (Experimental)
Conservative Q-Learning (CQL) algorithm is a SAC-based data-driven deep reinforcement learning algorithm, which achieves state-of-the-art performance in offline RL problems.

- class replay.experimental.models.cql.CQL(mdp_dataset_builder, actor_learning_rate=0.0001, critic_learning_rate=0.0003, temp_learning_rate=0.0001, alpha_learning_rate=0.0001, actor_optim_factory=AdamFactory(betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False), critic_optim_factory=AdamFactory(betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False), temp_optim_factory=AdamFactory(betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False), alpha_optim_factory=AdamFactory(betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False), actor_encoder_factory=DefaultEncoderFactory(activation='relu', use_batch_norm=False, dropout_rate=None), critic_encoder_factory=DefaultEncoderFactory(activation='relu', use_batch_norm=False, dropout_rate=None), q_func_factory=MeanQFunctionFactory(share_encoder=False), batch_size=64, n_steps=1, gamma=0.99, tau=0.005, n_critics=2, initial_temperature=1.0, initial_alpha=1.0, alpha_threshold=10.0, conservative_weight=5.0, n_action_samples=10, soft_q_backup=False, use_gpu=False, observation_scaler=None, action_scaler=None, reward_scaler=None, **params)
Conservative Q-Learning algorithm.
CQL is a SAC-based data-driven deep reinforcement learning algorithm, which achieves state-of-the-art performance in offline RL problems.
CQL mitigates overestimation error by minimizing action-values under the current policy and maximizing values under data distribution for underestimation issue.
\[L(\theta_i) = \alpha\, \mathbb{E}_{s_t \sim D} \left[\log{\sum_a \exp{Q_{\theta_i}(s_t, a)}} - \mathbb{E}_{a \sim D} \big[Q_{\theta_i}(s_t, a)\big] - \tau\right] + L_\mathrm{SAC}(\theta_i)\]where \(lpha\) is an automatically adjustable value via Lagrangian dual gradient descent and :math:` au` is a threshold value. If the action-value difference is smaller than :math:` au`, the \(lpha\) will become smaller. Otherwise, the \(lpha\) will become larger to aggressively penalize action-values.
In continuous control, \(\log{\sum_a \exp{Q(s, a)}}\) is computed as follows.
\[\log{\sum_a \exp{Q(s, a)}} \approx \log{\left( \frac{1}{2N} \sum_{a_i \sim \text{Unif}(a)}^N \left[\frac{\exp{Q(s, a_i)}}{\text{Unif}(a)}\right] + \frac{1}{2N} \sum_{a_i \sim \pi_\phi(a|s)}^N \left[\frac{\exp{Q(s, a_i)}}{\pi_\phi(a_i|s)}\right]\right)}\]where \(N\) is the number of sampled actions.
An implementation of this algorithm is heavily based on the corresponding implementation in the d3rlpy library (see https://github.com/takuseno/d3rlpy/blob/master/d3rlpy/algos/cql.py)
The rest of optimization is exactly same as
d3rlpy.algos.SAC
.- References:
- Args:
mdp_dataset_builder (MdpDatasetBuilder): the MDP dataset builder from users’ log. actor_learning_rate (float): learning rate for policy function. critic_learning_rate (float): learning rate for Q functions. temp_learning_rate (float): learning rate for temperature parameter of SAC. alpha_learning_rate (float): learning rate for \(lpha\). actor_optim_factory (d3rlpy.models.optimizers.OptimizerFactory): optimizer factory for the actor. The available options are [SGD, Adam or RMSprop]. critic_optim_factory (d3rlpy.models.optimizers.OptimizerFactory): optimizer factory for the critic. The available options are [SGD, Adam or RMSprop]. temp_optim_factory (d3rlpy.models.optimizers.OptimizerFactory): optimizer factory for the temperature. The available options are [SGD, Adam or RMSprop]. alpha_optim_factory (d3rlpy.models.optimizers.OptimizerFactory): optimizer factory for \(lpha\). The available options are [SGD, Adam or RMSprop]. actor_encoder_factory (d3rlpy.models.encoders.EncoderFactory or str): encoder factory for the actor. The available options are [‘pixel’, ‘dense’, ‘vector’, ‘default’]. See d3rlpy.models.encoders.EncoderFactory for details. critic_encoder_factory (d3rlpy.models.encoders.EncoderFactory or str): encoder factory for the critic. The available options are [‘pixel’, ‘dense’, ‘vector’, ‘default’]. See d3rlpy.models.encoders.EncoderFactory for details. q_func_factory (d3rlpy.models.q_functions.QFunctionFactory or str): Q function factory. The available options are [‘mean’, ‘qr’, ‘iqn’, ‘fqf’]. See d3rlpy.models.q_functions.QFunctionFactory for details. batch_size (int): mini-batch size. n_steps (int): Number of training steps. gamma (float): discount factor. tau (float): target network synchronization coefficient. n_critics (int): the number of Q functions for ensemble. initial_temperature (float): initial temperature value. initial_alpha (float): initial \(lpha\) value. alpha_threshold (float): threshold value described as :math:` au`. conservative_weight (float): constant weight to scale conservative loss. n_action_samples (int): the number of sampled actions to compute \(\log{\sum_a \exp{Q(s, a)}}\). soft_q_backup (bool): flag to use SAC-style backup. use_gpu (Union[int, str, bool]): device option. If the value is boolean and True, cuda:0 will be used. If the value is integer, cuda:<device> will be used. If the value is string in torch device style, the specified device will be used. observation_scaler (d3rlpy.preprocessing.Scaler or str): preprocessor. The available options are [‘pixel’, ‘min_max’, ‘standard’]. action_scaler (d3rlpy.preprocessing.ActionScaler or str): action preprocessor. The available options are [‘min_max’]. reward_scaler (d3rlpy.preprocessing.RewardScaler or str): reward preprocessor. The available options are [‘clip’, ‘min_max’, ‘standard’]. impl (d3rlpy.algos.torch.cql_impl.CQLImpl): algorithm implementation.
- __init__(mdp_dataset_builder, actor_learning_rate=0.0001, critic_learning_rate=0.0003, temp_learning_rate=0.0001, alpha_learning_rate=0.0001, actor_optim_factory=AdamFactory(betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False), critic_optim_factory=AdamFactory(betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False), temp_optim_factory=AdamFactory(betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False), alpha_optim_factory=AdamFactory(betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False), actor_encoder_factory=DefaultEncoderFactory(activation='relu', use_batch_norm=False, dropout_rate=None), critic_encoder_factory=DefaultEncoderFactory(activation='relu', use_batch_norm=False, dropout_rate=None), q_func_factory=MeanQFunctionFactory(share_encoder=False), batch_size=64, n_steps=1, gamma=0.99, tau=0.005, n_critics=2, initial_temperature=1.0, initial_alpha=1.0, alpha_threshold=10.0, conservative_weight=5.0, n_action_samples=10, soft_q_backup=False, use_gpu=False, observation_scaler=None, action_scaler=None, reward_scaler=None, **params)
Hierarchical models
Hierarchical Recommender (Experimental)
- class replay.experimental.models.HierarchicalRecommender(depth, cluster_model, recommender_class=<class 'replay.experimental.models.u_lin_ucb.ULinUCB'>, recommender_params={})
Hierarchical Recommender class is inspired by the article of Song et al and is a generalization of the method. By default it works as HCB proposed there.
The model sequentially clusterizes the item space constructing a tree of given
depth
. The clusterization is performed according to thecluster_model
- any sklearn clusterer instance provided by the user.At each node of a tree a node recommender instance is mounted. All of them are produced by
recommender_class
object (not an instance!) and are initialized withrecommender_params
.To predict an item the model goes down the tree each time selecting the next node as the one predicted by the parent node recommender. A leaf node recommender would give an item itself.
The log is considered as the history of user-item interactions. To fit the model each interaction is counted in all node recommenders on the path from the root to the item as if such path would be traversed through the prediction process.
Hierarchical Recommender may be useful to enhance the perforamance of simple models not suitable for large item space problems (such as many contextual bandits) and to reduce prediction time in models that need to iterate through all of the items to make a recommendation.
In this version Hierarchical Recommender is implemented as
HybridRecommender
and apart fromlog
requires bothitem_features
anduser_features
infit()
method. By the same reason onlyHybridRecommender
classess may be passed as arecommender_class
. Need in features atpredict()
depends on therecommender_class
itself.Note that current implementation relies mostly on python rather than pyspark.
- __init__(depth, cluster_model, recommender_class=<class 'replay.experimental.models.u_lin_ucb.ULinUCB'>, recommender_params={})
- Parameters
depth – depth of the item tree
cluster_model – an sklearn.cluster object (or any with similar API) that would perform clustering on the item space
recommender_class – a RePlay hybrid recommender class object (not an instance!) instances of which would be mounted at each tree node
recommender_params – initialization parameters for the recommenders
Wrappers and other models with distributed inference
Wrappers for popular recommendation libraries and algorithms implemented in python with distributed inference in pyspark.
ADMM SLIM (Experimental)
- class replay.experimental.models.ADMMSLIM(lambda_1=5, lambda_2=5000, seed=None, index_builder=None)
ADMM SLIM: Sparse Recommendations for Many Users
This is a modification for the basic SLIM model. Recommendations are improved with Alternating Direction Method of Multipliers.
- __init__(lambda_1=5, lambda_2=5000, seed=None, index_builder=None)
- Parameters
lambda_1 (
float
) – l1 regularization termlambda_2 (
float
) – l2 regularization termseed (
Optional
[int
]) – random seedindex_builder (
Optional
[IndexBuilder
]) – IndexBuilder instance that adds ANN functionality. If not set, then ann will not be used.
LightFM (Experimental)
implicit (Experimental)
- class replay.experimental.models.ImplicitWrap(model)
Wrapper for implicit
Example:
>>> import implicit >>> model = implicit.als.AlternatingLeastSquares(factors=5) >>> als = ImplicitWrap(model)
This way you can use implicit models as any other in replay with conversions made under the hood.
>>> import pandas as pd >>> from replay.utils.spark_utils import convert2spark >>> df = pd.DataFrame({"user_idx": [1, 1, 2, 2], "item_idx": [1, 2, 2, 3], "relevance": [1, 1, 1, 1]}) >>> df = convert2spark(df) >>> als.fit_predict(df, 1, users=[1])[["user_idx", "item_idx"]].toPandas() user_idx item_idx 0 1 3
- __init__(model)
Provide initialized
implicit
model.
Neural Networks recommenders
Neural Networks recommenders are Lightning-compatible. They can be trained using a Trainer from Lightning module.
Bert4Rec
- class replay.models.nn.Bert4Rec(tensor_schema, block_count=2, head_count=4, hidden_size=256, max_seq_len=100, dropout_rate=0.1, pass_per_transformer_block_count=1, enable_positional_embedding=True, enable_embedding_tying=False, loss_type='CE', loss_sample_count=None, negative_sampling_strategy='global_uniform', negatives_sharing=False, optimizer_factory=<replay.models.nn.optimizer_utils.optimizer_factory.FatOptimizerFactory object>, lr_scheduler_factory=None)
Implements BERT training-validation loop
- __init__(tensor_schema, block_count=2, head_count=4, hidden_size=256, max_seq_len=100, dropout_rate=0.1, pass_per_transformer_block_count=1, enable_positional_embedding=True, enable_embedding_tying=False, loss_type='CE', loss_sample_count=None, negative_sampling_strategy='global_uniform', negatives_sharing=False, optimizer_factory=<replay.models.nn.optimizer_utils.optimizer_factory.FatOptimizerFactory object>, lr_scheduler_factory=None)
- Parameters
(TensorSchema) (tensor_schema) – Tensor schema of features.
block_count (
int
) – Number of Transformer blocks. Default:2
.head_count (
int
) – Number of Attention heads. Default:4
.hidden_size (
int
) – Hidden size of transformer. Default:256
.max_seq_len (
int
) – Max length of sequence. Default:100
.(float) (dropout_rate) – Dropout rate. Default:
0.1
.pass_per_transformer_block_count (
int
) – Number of times to pass data over each Transformer block. Default:1
.enable_positional_embedding (
bool
) – Add positional embedding to the result. Default:True
.enable_embedding_tying (
bool
) – Use embedding tying head. If True - result scores are calculated by dot product of input and output embeddings, if False - default linear layer is applied to calculate logits for each item. Default:False
.loss_type (
str
) – Loss type. Possible values:"CE"
,"BCE"
. Default:CE
.(Optional[int]) (loss_sample_count) – Sample count to calculate loss. Default:
None
.negative_sampling_strategy (
str
) – Negative sampling strategy to calculate loss on sampled negatives. Is used when large count of items in dataset. Possible values:"global_uniform"
,"inbatch"
Default:global_uniform
.negatives_sharing (
bool
) – Apply negative sharing in calculating sampled logits. Default:False
.optimizer_factory (
OptimizerFactory
) – Optimizer factory. Default:FatOptimizerFactory
.lr_scheduler_factory (
Optional
[LRSchedulerFactory
]) – Learning rate schedule factory. Default:None
.
- predict(batch, candidates_to_score=None)
- Parameters
(Bert4RecPredictionBatch) (batch) – Batch of prediction data.
candidates_to_score (
Optional
[LongTensor
]) – Item ids to calculate scores. Default:None
.
- Return type
Tensor
- Returns
Calculated scores on prediction batch.
SasRec
- class replay.models.nn.SasRec(tensor_schema, block_count=2, head_count=1, hidden_size=50, max_seq_len=200, dropout_rate=0.2, ti_modification=False, time_span=256, loss_type='CE', loss_sample_count=None, negative_sampling_strategy='global_uniform', negatives_sharing=False, optimizer_factory=<replay.models.nn.optimizer_utils.optimizer_factory.FatOptimizerFactory object>, lr_scheduler_factory=None)
SASRec Lightning module.
You can get initialization parameters with attribute hparams for object of SasRec instance.
- __init__(tensor_schema, block_count=2, head_count=1, hidden_size=50, max_seq_len=200, dropout_rate=0.2, ti_modification=False, time_span=256, loss_type='CE', loss_sample_count=None, negative_sampling_strategy='global_uniform', negatives_sharing=False, optimizer_factory=<replay.models.nn.optimizer_utils.optimizer_factory.FatOptimizerFactory object>, lr_scheduler_factory=None)
- Parameters
tensor_schema (
TensorSchema
) – Tensor schema of features.block_count (
int
) – Number of Transformer blocks. Default:2
.head_count (
int
) – Number of Attention heads. Default:1
.hidden_size (
int
) – Hidden size of transformer. Default:50
.max_seq_len (
int
) – Max length of sequence. Default:200
.dropout_rate (
float
) – Dropout rate. Default:0.2
.ti_modification (
bool
) – Enable time relation. Default:False
.time_span (
int
) – Time span value. Default:256
.loss_type (
str
) – Loss type. Possible values:"CE"
,"BCE"
. Default:CE
.(Optional[int]) (loss_sample_count) – Sample count to calculate loss. Default:
None
.negative_sampling_strategy (
str
) – Negative sampling strategy to calculate loss on sampled negatives. Is used when large count of items in dataset. Possible values:"global_uniform"
,"inbatch"
Default:global_uniform
.negatives_sharing (
bool
) – Apply negative sharing in calculating sampled logits. Default:False
.optimizer_factory (
OptimizerFactory
) – Optimizer factory. Default:FatOptimizerFactory
.lr_scheduler_factory (
Optional
[LRSchedulerFactory
]) – Learning rate schedule factory. Default:None
.
- predict(batch, candidates_to_score=None)
- Parameters
batch (
SasRecPredictionBatch
) – Batch of prediction data.candidates_to_score (
Optional
[LongTensor
]) – Item ids to calculate scores. Default:None
.
- Return type
Tensor
- Returns
Calculated scores.
Compiled sequential models
Sequential models like SasRec and Bert4Rec can be converted to ONNX format for fast inference on CPU.
SasRecCompiled
- class replay.models.nn.sequential.compiled.SasRecCompiled(compiled_model, schema)
SasRec CPU-optimized model for inference via OpenVINO. It is recommended to compile model with
compile
method and passSasRec
checkpoint or the model object itself into it. It is also possible to compile model by yourself and pass it to the__init__
withTensorSchema
.Note that compilation requires disk write (and maybe delete) permission.
- classmethod compile(model, mode='one_query', batch_size=None, num_candidates_to_score=None, num_threads=None, onnx_path=None)
Model compilation.
- Parameters
model (
Union
[SasRec
,str
,Path
]) – Path to lightning SasRec model saved in .ckpt format or the SasRec object itself.mode (
Literal
['batch'
,'one_query'
,'dynamic_batch_size'
]) –Inference mode, defines shape of inputs. Could be one of [
one_query
,batch
,dynamic_batch_size
].one_query
- sets input shape to [1, max_seq_len]batch
- sets input shape to [batch_size, max_seq_len]dynamic_batch_size
- sets batch_size to dynamic range [?, max_seq_len]Default:
one_query
.batch_size (
Optional
[int
]) – Batch size, required forbatch
mode. Default:None
.num_candidates_to_score (
Optional
[int
]) –Number of item ids to calculate scores. Could be one of [
None
,-1
,N
].-1
- sets candidates_to_score shape to dynamic range [1, ?]N
- sets candidates_to_score shape to [1, N]None
- disable candidates_to_score usageDefault:
None
.num_threads (
Optional
[int
]) – Number of CPU threads to use. Must be a natural number orNone
. IfNone
, then compiler will set this parameter automatically. Default:None
.onnx_path (
Optional
[str
]) – Save ONNX model to path, if defined. Default:None
.
- Return type
- predict(batch, candidates_to_score=None)
Inference on one batch.
- Parameters
batch (
SasRecPredictionBatch
) – Prediction input.candidates_to_score (
Optional
[LongTensor
]) – Item ids to calculate scores. Default:None
.
- Return type
Tensor
- Returns
Tensor with scores.
Bert4RecCompiled
- class replay.models.nn.sequential.compiled.Bert4RecCompiled(compiled_model, schema)
Bert4Rec CPU-optimized model for inference via OpenVINO. It is recommended to compile model with
compile
method and passBert4Rec
checkpoint or the model object itself into it. It is also possible to compile model by yourself and pass it to the__init__
withTensorSchema
.Note that compilation requires disk write (and maybe delete) permission.
- classmethod compile(model, mode='one_query', batch_size=None, num_candidates_to_score=None, num_threads=None, onnx_path=None)
Model compilation.
- Parameters
model (
Union
[Bert4Rec
,str
,Path
]) – Path to lightning Bert4Rec model saved in .ckpt format or the Bert4Rec object itself.mode (
Literal
['batch'
,'one_query'
,'dynamic_batch_size'
]) –Inference mode, defines shape of inputs. Could be one of [
one_query
,batch
,dynamic_batch_size
].one_query
- sets input shape to [1, max_seq_len]batch
- sets input shape to [batch_size, max_seq_len]dynamic_batch_size
- sets batch_size to dynamic range [?, max_seq_len]Default:
one_query
.batch_size (
Optional
[int
]) – Batch size, required forbatch
mode. Default:None
.num_candidates_to_score (
Optional
[int
]) –Number of item ids to calculate scores. Could be one of [
None
,-1
,N
].-1
- sets candidates_to_score shape to dynamic range [1, ?]N
- sets candidates_to_score shape to [1, N]None
- disables candidates_to_score usageDefault:
None
.num_threads (
Optional
[int
]) – Number of CPU threads to use. Must be a natural number orNone
. IfNone
, then compiler will set this parameter automatically. Default:None
.onnx_path (
Optional
[str
]) – Save ONNX model to path, if defined. Default:None
.
- Return type
- predict(batch, candidates_to_score=None)
Inference on one batch.
- Parameters
batch (
Bert4RecPredictionBatch
) – Prediction input.candidates_to_score (
Optional
[LongTensor
]) – Item ids to calculate scores. Default:None
.
- Return type
Tensor
- Returns
Tensor with scores.
Features for easy training and validation with Lightning
Replay provides Callbacks and Postprocessors to make the model training and validation process as convenient as possible.
During training:
You can define the list of validation metrics and the model is determined to be the best and is saved if the metric updates its value during validation.
During inference:
You can get the recommendations in four formats: PySpark DataFrame, Pandas DataFrame, Polars DataFrame, PyTorch tensors. Each of the types corresponds a callback. You can filter the results using postprocessors strategy.
For a better understanding, you should look at examples of using neural network models.
Callbacks
ValidationMetricsCallback
- class replay.models.nn.sequential.callbacks.ValidationMetricsCallback(metrics=None, ks=None, postprocessors=None, item_count=None)
Callback for validation and testing stages.
If multiple validation/testing dataloaders are used, the suffix of the metric name will contain the serial number of the dataloader.
- __init__(metrics=None, ks=None, postprocessors=None, item_count=None)
- Parameters
metrics (
Optional
[List
[Literal
['recall'
,'precision'
,'ndcg'
,'map'
,'mrr'
,'novelty'
,'coverage'
]]]) – Sequence of metrics to calculate.ks (
Optional
[List
[int
]]) – highest k scores in ranking. Default: will be [1, 5, 10, 20].postprocessors (
Optional
[List
[BasePostProcessor
]]) – postprocessors to validation stage.item_count (
Optional
[int
]) – the total number of items in the dataset, required only for Coverage calculations.
SparkPredictionCallback
- class replay.models.nn.sequential.callbacks.SparkPredictionCallback(top_k, query_column, item_column, rating_column, spark_session, postprocessors=None)
Callback for prediction stage with spark data frame
- __init__(top_k, query_column, item_column, rating_column, spark_session, postprocessors=None)
- Parameters
top_k (
int
) – Takes the highest k scores in the ranking.query_column (
str
) – query column name.item_column (
str
) – item column name.rating_column (
str
) – rating column name.postprocessors (
Optional
[List
[BasePostProcessor
]]) – postprocessors to apply.
- get_result()
- Return type
TypeVar
(_T
)- Returns
prediction result
PandasPredictionCallback
- class replay.models.nn.sequential.callbacks.PandasPredictionCallback(top_k, query_column, item_column, rating_column='rating', postprocessors=None)
Callback for predition stage with pandas data frame
- __init__(top_k, query_column, item_column, rating_column='rating', postprocessors=None)
- Parameters
top_k (
int
) – Takes the highest k scores in the ranking.query_column (
str
) – query column name.item_column (
str
) – item column name.rating_column (
str
) – rating column name.postprocessors (
Optional
[List
[BasePostProcessor
]]) – postprocessors to apply.
- get_result()
- Return type
TypeVar
(_T
)- Returns
prediction result
TorchPredictionCallback
- class replay.models.nn.sequential.callbacks.TorchPredictionCallback(top_k, postprocessors=None)
Callback for predition stage with tuple of tensors
- __init__(top_k, postprocessors=None)
- Parameters
top_k (
int
) – Takes the highest k scores in the ranking.postprocessors (
Optional
[List
[BasePostProcessor
]]) – postprocessors to apply.
- get_result()
- Return type
TypeVar
(_T
)- Returns
prediction result