Preprocessing
This module contains tools for preprocessing data, including:
- filters
- processors for feature transforms
Filters
Select or remove data by some criteria
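Filters can be combined by applying their transform calls one after another. Below is a minimal illustrative sketch using classes documented in this section; the toy data is an assumption for demonstration and is not part of the library's doctests.

>>> import pandas as pd
>>> from replay.preprocessing.filters import MinCountFilter, LowRatingFilter
>>> interactions = pd.DataFrame({
...     "user_id": [1, 1, 2, 3],
...     "item_id": [10, 20, 10, 30],
...     "rating": [5.0, 1.0, 4.0, 3.0],
... })
>>> # keep only users with at least 2 interactions, then drop ratings below 3.0
>>> dense = MinCountFilter(num_entries=2, groupby_column="user_id").transform(interactions)
>>> filtered = LowRatingFilter(3.0).transform(dense)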
- class replay.preprocessing.filters.ConsecutiveDuplicatesFilter(keep='first', query_column='query_id', item_column='item_id', timestamp_column='timestamp')
Removes consecutive duplicate items from sequential dataset.
>>> import datetime as dt
>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> interactions = pd.DataFrame({
...     "user_id": ["u0", "u1", "u1", "u0", "u0", "u0", "u1", "u0"],
...     "item_id": ["i0", "i1", "i1", "i2", "i0", "i1", "i2", "i1"],
...     "timestamp": [dt.datetime(2024, 1, 1) + dt.timedelta(days=i) for i in range(8)]
... })
>>> interactions = convert2spark(interactions)
>>> interactions.show()
+-------+-------+-------------------+
|user_id|item_id|          timestamp|
+-------+-------+-------------------+
|     u0|     i0|2024-01-01 00:00:00|
|     u1|     i1|2024-01-02 00:00:00|
|     u1|     i1|2024-01-03 00:00:00|
|     u0|     i2|2024-01-04 00:00:00|
|     u0|     i0|2024-01-05 00:00:00|
|     u0|     i1|2024-01-06 00:00:00|
|     u1|     i2|2024-01-07 00:00:00|
|     u0|     i1|2024-01-08 00:00:00|
+-------+-------+-------------------+

>>> ConsecutiveDuplicatesFilter(query_column="user_id").transform(interactions).show()
+-------+-------+-------------------+
|user_id|item_id|          timestamp|
+-------+-------+-------------------+
|     u0|     i0|2024-01-01 00:00:00|
|     u0|     i2|2024-01-04 00:00:00|
|     u0|     i0|2024-01-05 00:00:00|
|     u0|     i1|2024-01-06 00:00:00|
|     u1|     i1|2024-01-02 00:00:00|
|     u1|     i2|2024-01-07 00:00:00|
+-------+-------+-------------------+
- class replay.preprocessing.filters.EntityDaysFilter(days=10, first=True, entity_column='user_id', timestamp_column='timestamp')
Get first/last days of interactions by entity.

>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> log_pd = pd.DataFrame({"user_id": ["u1", "u2", "u2", "u3", "u3", "u3"],
...                        "item_id": ["i1", "i2", "i3", "i1", "i2", "i3"],
...                        "rating": [1., 0.5, 3, 1, 0, 1],
...                        "timestamp": ["2020-01-01 23:59:59", "2020-02-01 00:00:00",
...                                      "2020-02-01 00:00:01", "2020-01-01 00:04:15",
...                                      "2020-01-02 00:04:14", "2020-01-05 23:59:59"]},
... )
>>> log_pd["timestamp"] = pd.to_datetime(log_pd["timestamp"], format="ISO8601")
>>> log_sp = convert2spark(log_pd)
>>> log_sp.orderBy('user_id', 'item_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+
Get first day by users:
>>> EntityDaysFilter(1, True, entity_column='user_id').transform(log_sp).orderBy('user_id', 'item_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
+-------+-------+------+-------------------+
Get last day by item:
>>> EntityDaysFilter(1, False, entity_column='item_id').transform(log_sp).orderBy('item_id', 'user_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
+-------+-------+------+-------------------+
- class replay.preprocessing.filters.GlobalDaysFilter(days=10, first=True, timestamp_column='timestamp')
Select first/last days from interactions.

>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> log_pd = pd.DataFrame({"user_id": ["u1", "u2", "u2", "u3", "u3", "u3"],
...                        "item_id": ["i1", "i2", "i3", "i1", "i2", "i3"],
...                        "rating": [1., 0.5, 3, 1, 0, 1],
...                        "timestamp": ["2020-01-01 23:59:59", "2020-02-01 00:00:00",
...                                      "2020-02-01 00:00:01", "2020-01-01 00:04:15",
...                                      "2020-01-02 00:04:14", "2020-01-05 23:59:59"]},
... )
>>> log_pd["timestamp"] = pd.to_datetime(log_pd["timestamp"], format="ISO8601")
>>> log_sp = convert2spark(log_pd)
>>> log_sp.show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+
>>> GlobalDaysFilter(1).transform(log_sp).show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
+-------+-------+------+-------------------+

>>> GlobalDaysFilter(1, first=False).transform(log_sp).show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
+-------+-------+------+-------------------+
- class replay.preprocessing.filters.InteractionEntriesFilter(query_column='user_id', item_column='item_id', min_inter_per_user=None, max_inter_per_user=None, min_inter_per_item=None, max_inter_per_item=None, allow_caching=True)
Remove interactions of users/items whose number of interactions is below the minimum or above the maximum constraint value for the corresponding column.
>>> import pandas as pd
>>> interactions = pd.DataFrame({
...     "user_id": [1, 1, 1, 2, 2, 2, 3, 3, 3, 3],
...     "item_id": [3, 7, 10, 5, 8, 11, 4, 9, 2, 5],
...     "rating": [1, 2, 3, 3, 2, 1, 3, 12, 1, 4]
... })
>>> interactions
   user_id  item_id  rating
0        1        3       1
1        1        7       2
2        1       10       3
3        2        5       3
4        2        8       2
5        2       11       1
6        3        4       3
7        3        9      12
8        3        2       1
9        3        5       4
>>> filtered_interactions = InteractionEntriesFilter(min_inter_per_user=4).transform(interactions)
>>> filtered_interactions
   user_id  item_id  rating
6        3        4       3
7        3        9      12
8        3        2       1
9        3        5       4
- class replay.preprocessing.filters.LowRatingFilter(value, rating_column='rating')
Remove records with rating less than value in rating_column.

>>> import pandas as pd
>>> data_frame = pd.DataFrame({"rating": [1, 5, 3.5, 4]})
>>> LowRatingFilter(3.5).transform(data_frame)
   rating
1     5.0
2     3.5
3     4.0
- class replay.preprocessing.filters.MinCountFilter(num_entries, groupby_column='user_id')
Remove entries for entities (e.g. users or items) that appear in the interactions fewer than num_entries times. The interactions are grouped by groupby_column (the entity column name) to calculate the counts.
>>> import pandas as pd
>>> data_frame = pd.DataFrame({"user_id": [1, 1, 2]})
>>> MinCountFilter(2).transform(data_frame)
   user_id
0        1
1        1
- class replay.preprocessing.filters.NumInteractionsFilter(num_interactions=10, first=True, query_column='user_id', timestamp_column='timestamp', item_column=None)
Get first/last num_interactions interactions for each query.

>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> log_pd = pd.DataFrame({"user_id": ["u1", "u2", "u2", "u3", "u3", "u3"],
...                        "item_id": ["i1", "i2", "i3", "i1", "i2", "i3"],
...                        "rating": [1., 0.5, 3, 1, 0, 1],
...                        "timestamp": ["2020-01-01 23:59:59", "2020-02-01 00:00:00",
...                                      "2020-02-01 00:00:01", "2020-01-01 00:04:15",
...                                      "2020-01-02 00:04:14", "2020-01-05 23:59:59"]},
... )
>>> log_pd["timestamp"] = pd.to_datetime(log_pd["timestamp"], format="ISO8601")
>>> log_sp = convert2spark(log_pd)
>>> log_sp.show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+
Only first interaction:
>>> NumInteractionsFilter(1, True, item_column='item_id').transform(log_sp).orderBy('user_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
+-------+-------+------+-------------------+
Only last interaction:
>>> NumInteractionsFilter(1, False).transform(log_sp).orderBy('user_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+

>>> NumInteractionsFilter(1, False, item_column='item_id').transform(log_sp).orderBy('user_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+
- class replay.preprocessing.filters.QuantileItemsFilter(alpha_quantile=0.99, items_proportion=0.5, query_column='query_id', item_column='item_id')
This filter undersamples the interactions dataset.
The algorithm removes items_proportion of interactions for each item whose interaction count exceeds the alpha_quantile value of the distribution. Popular items (those with the most interactions) are filtered first. The filter also preserves the relative popularity of items by removing interactions only within the range between the current item count and the quantile count (specified by alpha_quantile).
>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> log_pd = pd.DataFrame({
...     "user_id": [0, 0, 1, 2, 2, 2, 2],
...     "item_id": [0, 2, 1, 1, 2, 2, 2]
... })
>>> log_spark = convert2spark(log_pd)
>>> log_spark.show()
+-------+-------+
|user_id|item_id|
+-------+-------+
|      0|      0|
|      0|      2|
|      1|      1|
|      2|      1|
|      2|      2|
|      2|      2|
|      2|      2|
+-------+-------+

>>> QuantileItemsFilter(query_column="user_id").transform(log_spark).show()
+-------+-------+
|user_id|item_id|
+-------+-------+
|      0|      0|
|      1|      1|
|      2|      1|
|      2|      2|
|      2|      2|
|      0|      2|
+-------+-------+
- class replay.preprocessing.filters.TimePeriodFilter(start_date=None, end_date=None, timestamp_column='timestamp', time_column_format='%Y-%m-%d %H:%M:%S')
Select a part of data between [start_date, end_date).

>>> from datetime import datetime
>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> log_pd = pd.DataFrame({"user_id": ["u1", "u2", "u2", "u3", "u3", "u3"],
...                        "item_id": ["i1", "i2", "i3", "i1", "i2", "i3"],
...                        "rating": [1., 0.5, 3, 1, 0, 1],
...                        "timestamp": ["2020-01-01 23:59:59", "2020-02-01 00:00:00",
...                                      "2020-02-01 00:00:01", "2020-01-01 00:04:15",
...                                      "2020-01-02 00:04:14", "2020-01-05 23:59:59"]},
... )
>>> log_pd["timestamp"] = pd.to_datetime(log_pd["timestamp"], format="ISO8601")
>>> log_sp = convert2spark(log_pd)
>>> log_sp.show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+
>>> TimePeriodFilter(
...     start_date="2020-01-01 14:00:00",
...     end_date=datetime(2020, 1, 3, 0, 0, 0)
... ).transform(log_sp).show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
+-------+-------+------+-------------------+
- replay.preprocessing.filters.filter_cold(target, reference, mode='items', query_column='query_id', item_column='item_id')
Filter rows in target, keeping only users/items that exist in reference.
This function works with pandas, Polars and Spark DataFrames. target and reference must be of the same backend type. Depending on mode, it removes rows whose item_column and/or query_column values are not present in the corresponding columns of reference.
Parameters
- target : DataFrameLike
Dataset to be filtered (pandas/Polars/Spark).
- reference : DataFrameLike
Dataset that defines the allowed universe of users/items.
- mode : {“items”, “users”, “both”}, default “items”
What to filter: only items, only users, or both.
- query_column : str, default “query_id”
Name of the user (query) column.
- item_column : str, default “item_id”
Name of the item column.
Returns
- DataFrameLike
Filtered target of the same backend type as the input.
Raises
- ValueError
If mode is not one of {“items”, “users”, “both”}.
- TypeError
If target and reference are of different backend types.
- KeyError
If required columns are missing in either dataset.
- NotImplementedError
If the input dataframe type is not supported.
- Return type
DataFrameLike (a pandas, Polars, or Spark DataFrame)
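A minimal sketch of cold filtering with pandas frames (the toy data below is an illustrative assumption, not one of the library's doctests; column names are the documented defaults):

>>> import pandas as pd
>>> from replay.preprocessing.filters import filter_cold
>>> train = pd.DataFrame({"query_id": [1, 1, 2], "item_id": [10, 20, 30]})
>>> test = pd.DataFrame({"query_id": [1, 2, 3], "item_id": [10, 30, 40]})
>>> # keep only test rows whose query AND item appear in train;
>>> # the row with query_id=3 / item_id=40 should be removed
>>> warm_test = filter_cold(test, reference=train, mode="both")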
Utils
- replay.preprocessing.utils.merge_subsets(dfs, columns=None, check_columns=True, subset_for_duplicates=None, on_duplicate='error')
Merge multiple dataframes of the same backend into a single one.
All inputs must be of the same dataframe type (pandas/Polars/Spark). Before concatenation, each dataframe is aligned to a common set of columns: either the provided columns or the columns of the first dataframe. Duplicate rows are handled according to on_duplicate.
Parameters
- dfs : Sequence[DataFrameLike]
Dataframes to merge.
- columns : Optional[Sequence[str]]
Columns to align to. If None, columns of the first dataframe are used.
- check_columns : bool
Whether to validate that all inputs have the same column set.
- subset_for_duplicates : Optional[Sequence[str]]
Columns subset used to detect duplicates. If None, all aligned columns are used.
- on_duplicate : {“error”, “drop”, “ignore”}
How to handle duplicates: raise an error, drop them, or ignore.
Returns
- DataFrameLike
Merged dataframe of the same backend as the inputs.
Raises
- ValueError
If dfs is empty, if duplicates are found with on_duplicate='error', or if column sets differ when validation is enabled.
- TypeError
If inputs are of different dataframe types.
- Return type
DataFrameLike (a pandas, Polars, or Spark DataFrame)
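A minimal sketch with pandas frames (the toy splits are illustrative assumptions; see the parameter descriptions above for the exact behaviour):

>>> import pandas as pd
>>> from replay.preprocessing.utils import merge_subsets
>>> part1 = pd.DataFrame({"user_id": [1, 2], "item_id": [10, 20]})
>>> part2 = pd.DataFrame({"user_id": [2, 3], "item_id": [20, 30]})
>>> # the (2, 20) row appears in both parts; drop the duplicate instead of raising
>>> merged = merge_subsets([part1, part2], on_duplicate="drop")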
CSRConverter
Convert input data to csr sparse matrix.
- class replay.preprocessing.converter.CSRConverter(first_dim_column, second_dim_column, data_column=None, row_count=None, column_count=None, allow_collect_to_master=False)
Convert input data to a CSR sparse matrix, where data_column, first_dim_column and second_dim_column satisfy the relationship matrix[first_dim_column[i], second_dim_column[i]] = data_column[i].

>>> import pandas as pd
>>> interactions = pd.DataFrame({
...     "user_id": [1, 1, 1, 2, 2, 2, 3, 3, 3, 3],
...     "item_id": [3, 7, 10, 5, 8, 11, 4, 9, 2, 5],
...     "rating": [1, 2, 3, 3, 2, 1, 3, 12, 1, 4]
... })
>>> interactions
   user_id  item_id  rating
0        1        3       1
1        1        7       2
2        1       10       3
3        2        5       3
4        2        8       2
5        2       11       1
6        3        4       3
7        3        9      12
8        3        2       1
9        3        5       4
>>> csr_interactions = CSRConverter(
...     first_dim_column="user_id",
...     second_dim_column="item_id",
...     data_column="rating",
... ).transform(interactions)
>>> csr_interactions.todense()
matrix([[ 0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0],
        [ 0,  0,  0,  1,  0,  0,  0,  2,  0,  0,  3,  0],
        [ 0,  0,  0,  0,  0,  3,  0,  0,  2,  0,  0,  1],
        [ 0,  0,  1,  0,  3,  4,  0,  0,  0, 12,  0,  0]])
- transform(data)
Transform a Spark or Pandas DataFrame to a CSR sparse matrix.
- Parameters
data (Union[DataFrame, DataFrame, DataFrame]) – Spark or Pandas DataFrame containing columns first_dim_column, second_dim_column, and optional data_column.
- Returns
Sparse interactions.
- Return type
csr_matrix
Sessionizer
Create and filter sessions from given interactions.
- class replay.preprocessing.sessionizer.Sessionizer(user_column='user_id', time_column='timestamp', session_column='session_id', session_gap=86400, time_column_format='yyyy-MM-dd HH:mm:ss', min_inter_per_session=None, max_inter_per_session=None, min_sessions_per_user=None, max_sessions_per_user=None)
Create and filter sessions from given interactions. Session ids are formed as the difference between the cumulative sum over unique users and the number of entries inside each user's history whose gap exceeds the session gap.
>>> import pandas as pd
>>> time_interactions = pd.DataFrame({
...     "user_id": [1, 1, 1, 2, 2, 2, 3, 3, 3, 3],
...     "item_id": [3, 7, 10, 5, 8, 11, 4, 9, 2, 5],
...     "timestamp": [1, 2, 3, 3, 2, 1, 3, 12, 1, 4]
... })
>>> time_interactions
   user_id  item_id  timestamp
0        1        3          1
1        1        7          2
2        1       10          3
3        2        5          3
4        2        8          2
5        2       11          1
6        3        4          3
7        3        9         12
8        3        2          1
9        3        5          4
>>> Sessionizer(session_gap=5).transform(time_interactions)
   user_id  item_id  timestamp  session_id
0        1        3          1           2
1        1        7          2           2
2        1       10          3           2
3        2        5          3           5
4        2        8          2           5
5        2       11          1           5
6        3        4          3           9
7        3        9         12           8
8        3        2          1           9
9        3        5          4           9
- transform(interactions)
Create and filter sessions from given interactions.
- Parameters
interactions (Union[DataFrame, DataFrame, DataFrame]) – DataFrame containing columns user_column and time_column.
- Returns
DataFrame with created and filtered sessions.
- Return type
Union[DataFrame, DataFrame, DataFrame]
Padder (Experimental)
Pad array columns in dataframe.
- class replay.experimental.preprocessing.padder.Padder(pad_columns, padding_side='right', padding_value=0, array_size=None, cut_array=True, cut_side='right')
Pad array columns in dataframe.
>>> import pandas as pd
>>> pad_interactions = pd.DataFrame({
...     "user_id": [1, 1, 1, 1, 2, 2, 3, 3, 3],
...     "timestamp": [[1], [1, 2], [1, 2, 4], [1, 2, 4, 6], [4, 7, 12],
...                   [4, 7, 12, 126], [1, 2, 3, 4, 5], [1, 2, 3, 4, 5, 6],
...                   [1, 2, 3, 4, 5, 6, 7]],
...     "item_id": [['a'], ['a', 'b'], ['a', 'b', 'd'], ['a', 'b', 'd', 'f'], ['d', 'e', 'm'],
...                 ['d', 'e', 'm', 'g'], ['a', 'b', 'c', 'd', 'a'], ['a', 'b', 'c', 'd', 'a', 'f'],
...                 ['a', 'b', 'c', 'd', 'a', 'f', 'e']]
... })
>>> pad_interactions
   user_id              timestamp                item_id
0        1                    [1]                    [a]
1        1                 [1, 2]                 [a, b]
2        1              [1, 2, 4]              [a, b, d]
3        1           [1, 2, 4, 6]           [a, b, d, f]
4        2             [4, 7, 12]              [d, e, m]
5        2        [4, 7, 12, 126]           [d, e, m, g]
6        3        [1, 2, 3, 4, 5]        [a, b, c, d, a]
7        3     [1, 2, 3, 4, 5, 6]     [a, b, c, d, a, f]
8        3  [1, 2, 3, 4, 5, 6, 7]  [a, b, c, d, a, f, e]
>>> Padder(
...     pad_columns=["item_id", "timestamp"],
...     padding_side="right",
...     padding_value=["[PAD]", 0],
...     array_size=5,
...     cut_array=True,
...     cut_side="right"
... ).transform(pad_interactions)
   user_id           timestamp                          item_id
0        1     [1, 0, 0, 0, 0]  [a, [PAD], [PAD], [PAD], [PAD]]
1        1     [1, 2, 0, 0, 0]      [a, b, [PAD], [PAD], [PAD]]
2        1     [1, 2, 4, 0, 0]          [a, b, d, [PAD], [PAD]]
3        1     [1, 2, 4, 6, 0]              [a, b, d, f, [PAD]]
4        2    [4, 7, 12, 0, 0]          [d, e, m, [PAD], [PAD]]
5        2  [4, 7, 12, 126, 0]              [d, e, m, g, [PAD]]
6        3     [1, 2, 3, 4, 5]                  [a, b, c, d, a]
7        3     [2, 3, 4, 5, 6]                  [b, c, d, a, f]
8        3     [3, 4, 5, 6, 7]                  [c, d, a, f, e]
- transform(interactions)
Pad dataframe.
- Parameters
interactions (Union[DataFrame, DataFrame, DataFrame]) – DataFrame with array columns with names pad_columns.
- Returns
DataFrame with padded array columns.
- Return type
Union[DataFrame, DataFrame, DataFrame]
SequenceGenerator (Experimental)
Creating sequences for sequential models.
- class replay.experimental.preprocessing.sequence_generator.SequenceGenerator(groupby_column, orderby_column=None, transform_columns=None, len_window=50, sequence_prefix=None, sequence_suffix='_list', label_prefix='label_', label_suffix=None, get_list_len=False, list_len_column='list_len')
Creating sequences for sequential models.
E.g., if u1 has the purchase sequence <i1, i2, i3, i4>, then after processing three cases will be generated:

u1, <i1> | i2

(which means that, given user_id u1 and item_seq <i1>, the model needs to predict the next item i2.)

The other cases are:

u1, <i1, i2> | i3
u1, <i1, i2, i3> | i4

>>> import pandas as pd
>>> time_interactions = pd.DataFrame({
...     "user_id": [1, 1, 1, 2, 2, 2, 3, 3, 3, 3],
...     "item_id": [3, 7, 10, 5, 8, 11, 4, 9, 2, 5],
...     "timestamp": [1, 2, 3, 3, 2, 1, 3, 12, 1, 4]
... })
>>> time_interactions
   user_id  item_id  timestamp
0        1        3          1
1        1        7          2
2        1       10          3
3        2        5          3
4        2        8          2
5        2       11          1
6        3        4          3
7        3        9         12
8        3        2          1
9        3        5          4
>>> sequences = (
...     SequenceGenerator(
...         groupby_column="user_id", transform_columns=["item_id", "timestamp"]
...     ).transform(time_interactions)
... )
>>> sequences
   user_id  item_id_list  timestamp_list  label_item_id  label_timestamp
0        1           [3]             [1]              7                2
1        1        [3, 7]          [1, 2]             10                3
2        2           [5]             [3]              8                2
3        2        [5, 8]          [3, 2]             11                1
4        3           [4]             [3]              9               12
5        3        [4, 9]         [3, 12]              2                1
6        3     [4, 9, 2]      [3, 12, 1]              5                4
- transform(interactions)
Create sequences from given interactions.
- Parameters
interactions (Union[DataFrame, DataFrame, DataFrame]) – DataFrame.
- Returns
DataFrame with transformed interactions; sequential interactions are stored as lists.
- Return type
Union[DataFrame, DataFrame, DataFrame]
Data Preparation (Experimental)
RePlay has a number of requirements for input data.
Input columns are expected to be in the form [user_id, item_id, timestamp, relevance].
The internal format is a Spark DataFrame with indexed integer values for [user_idx, item_idx].
You can convert the indexes of your Spark DataFrame with the Indexer class.
- class replay.experimental.preprocessing.data_preparator.Indexer(user_col='user_id', item_col='item_id')
This class is used to convert arbitrary id to numerical idx and back.
- fit(users, items)
Creates indexers to map raw ids to numerical idx so that Spark can handle them.
- Parameters
users – SparkDataFrame containing the user column
items – SparkDataFrame containing the item column
- inverse_transform(df)
Convert SparkDataFrame to the initial indexes.
- Parameters
df (DataFrame) – SparkDataFrame with numerical user_idx/item_idx columns
- Returns
SparkDataFrame with original user/item columns
- Return type
DataFrame
- transform(df)
Convert raw user_col and item_col to numerical user_idx and item_idx.
- Parameters
df (DataFrame) – dataframe with raw indexes
- Returns
dataframe with converted indexes
- Return type
Optional[DataFrame]
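A minimal usage sketch based on the methods above (the toy log is an illustrative assumption, not one of the library's doctests):

>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> from replay.experimental.preprocessing.data_preparator import Indexer
>>> log = convert2spark(pd.DataFrame({"user_id": ["u1", "u2"], "item_id": ["i1", "i2"]}))
>>> indexer = Indexer(user_col="user_id", item_col="item_id")
>>> indexer.fit(users=log.select("user_id"), items=log.select("item_id"))
>>> log_idx = indexer.transform(log)               # numerical user_idx / item_idx columns
>>> log_back = indexer.inverse_transform(log_idx)  # back to the original ids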
If your DataFrame is a Pandas DataFrame with different column names, you can either preprocess it yourself with the convert2spark function or use the DataPreparator class.
- class replay.experimental.preprocessing.data_preparator.DataPreparator
- Transforms data to the library format:
- read as a Spark dataframe / convert a Pandas dataframe to Spark
- check for nulls
- create relevance/timestamp columns if absent
- convert dates to TimestampType
Examples:
Loading log DataFrame
>>> import pandas as pd
>>> from replay.experimental.preprocessing.data_preparator import DataPreparator
>>>
>>> log = pd.DataFrame({"user": [2, 2, 2, 1],
...                     "item_id": [1, 2, 3, 3],
...                     "rel": [5, 5, 5, 5]}
... )
>>> dp = DataPreparator()
>>> correct_log = dp.transform(data=log,
...                            columns_mapping={"user_id": "user",
...                                             "item_id": "item_id",
...                                             "relevance": "rel"}
... )
>>> correct_log.show(2)
+-------+-------+---------+-------------------+
|user_id|item_id|relevance|          timestamp|
+-------+-------+---------+-------------------+
|      2|      1|      5.0|2099-01-01 00:00:00|
|      2|      2|      5.0|2099-01-01 00:00:00|
+-------+-------+---------+-------------------+
only showing top 2 rows
Loading user features
>>> import pandas as pd
>>> from replay.experimental.preprocessing.data_preparator import DataPreparator
>>>
>>> log = pd.DataFrame({"user": ["user1", "user1", "user2"],
...                     "f0": ["feature1", "feature2", "feature1"],
...                     "f1": ["left", "left", "center"],
...                     "ts": ["2019-01-01", "2019-01-01", "2019-01-01"]}
... )
>>> dp = DataPreparator()
>>> correct_log = dp.transform(data=log,
...                            columns_mapping={"user_id": "user"},
... )
>>> correct_log.show(3)
+-------+--------+------+----------+
|user_id|      f0|    f1|        ts|
+-------+--------+------+----------+
|  user1|feature1|  left|2019-01-01|
|  user1|feature2|  left|2019-01-01|
|  user2|feature1|center|2019-01-01|
+-------+--------+------+----------+
- static add_absent_log_cols(dataframe, columns_mapping, default_relevance=1.0, default_ts='2099-01-01')
Add relevance and timestamp columns with default values if relevance or timestamp is absent among mapping keys.
- Parameters
dataframe (DataFrame) – interactions log to process
columns_mapping (dict[str, str]) – dictionary mapping “key: column name in input DataFrame”. Possible keys: [user_id, item_id, timestamp, relevance]
default_relevance (float) – default value for the generated relevance column
default_ts (str) – default value for the generated timestamp column
- Returns
Spark DataFrame with generated timestamp and relevance columns if absent in the original dataframe
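A hedged sketch of how the defaults are filled in (the input frame below is an illustrative assumption):

>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> from replay.experimental.preprocessing.data_preparator import DataPreparator
>>> log_spark = convert2spark(pd.DataFrame({"user_id": [1, 2], "item_id": [10, 20]}))
>>> with_defaults = DataPreparator.add_absent_log_cols(
...     dataframe=log_spark,
...     columns_mapping={"user_id": "user_id", "item_id": "item_id"},
... )  # adds relevance=1.0 and timestamp='2099-01-01' columns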
- check_df(dataframe, columns_mapping)
Check:
- if the dataframe is not empty,
- if columns from columns_mapping are present in the dataframe,
- warn about nulls in columns from columns_mapping,
- warn about absent timestamp/relevance columns for an interactions log,
- warn about wrong relevance DataType.
- Parameters
dataframe (DataFrame) – spark DataFrame to process
columns_mapping (dict[str, str]) – dictionary mapping “key: column name in input DataFrame”. Possible keys: [user_id, item_id, timestamp, relevance]. The columns_mapping specifies the nature of the DataFrame:
- if both [user_id, item_id] are present, the dataframe is a log of interactions; specify timestamp, relevance columns in the mapping if available,
- if either user_id or item_id is present, the dataframe is a dataframe of user/item features.
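A hedged sketch of a validation call (the frame is the illustrative log_spark from the snippet above; on a log without timestamp/relevance the method is expected to emit warnings):

>>> dp = DataPreparator()
>>> dp.check_df(
...     dataframe=log_spark,
...     columns_mapping={"user_id": "user_id", "item_id": "item_id"},
... )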
- property logger: Logger
- Returns
get library logger
- static read_as_spark_df(data=None, path=None, format_type=None, **kwargs)
Read a Spark dataframe from a file or convert a Pandas dataframe to Spark.
- Parameters
data (Optional[Union[DataFrame, DataFrame, DataFrame]]) – DataFrame to process (either data or path should be defined)
path (Optional[str]) – path to data (either data or path should be defined)
format_type (Optional[str]) – file type, one of [csv, parquet, json, table]
kwargs – extra arguments passed to spark.read.<format>(path, **reader_kwargs)
- Returns
spark DataFrame
- Return type
DataFrame
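A hedged sketch of reading a file (the path and the CSV option are illustrative assumptions, not part of the library's doctests):

>>> from replay.experimental.preprocessing.data_preparator import DataPreparator
>>> raw_log = DataPreparator.read_as_spark_df(
...     path="interactions.csv",   # hypothetical file path
...     format_type="csv",
...     header=True,               # extra kwargs are forwarded to spark.read.csv
... )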
- transform(columns_mapping, data=None, path=None, format_type=None, date_format=None, reader_kwargs=None)
Transforms a log, user features, or item features into a Spark DataFrame [user_id, item_id, timestamp, relevance], [user_id, *features], or [item_id, *features]. Input is either a file of format_type at path, a pandas.DataFrame, or a spark.DataFrame. Transform performs:
- dataframe reading / conversion to Spark DataFrame format
- dataframe check (nulls, columns_mapping)
- renaming of columns from the mapping to standard names (user_id, item_id, timestamp, relevance)
- for an interactions log: creation of absent columns, conversion of the timestamp column to TimestampType and relevance to DoubleType
- Parameters
columns_mapping (dict[str, str]) – dictionary mapping “key: column name in input DataFrame”. Possible keys: [user_id, item_id, timestamp, relevance]. The columns_mapping specifies the nature of the DataFrame:
- if both [user_id, item_id] are present, the dataframe is a log of interactions; specify timestamp, relevance columns in the mapping if present,
- if either user_id or item_id is present, the dataframe is a dataframe of user/item features.
data (Optional[Union[DataFrame, DataFrame, DataFrame]]) – DataFrame to process
path (Optional[str]) – path to data
format_type (Optional[str]) – file type, one of [csv, parquet, json, table]
date_format (Optional[str]) – format for the timestamp column
reader_kwargs (Optional[dict]) – extra arguments passed to spark.read.<format>(path, **reader_kwargs)
- Returns
processed DataFrame
- Return type
DataFrame