Preprocessing
This module contains tools for preprocessing data, including:
- filters
- processors for feature transforms
Filters
Select or remove data by some criteria.
- class replay.preprocessing.filters.ConsecutiveDuplicatesFilter(keep='first', query_column='query_id', item_column='item_id', timestamp_column='timestamp')
Removes consecutive duplicate items from a sequential dataset.
>>> import datetime as dt
>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> interactions = pd.DataFrame({
...     "user_id": ["u0", "u1", "u1", "u0", "u0", "u0", "u1", "u0"],
...     "item_id": ["i0", "i1", "i1", "i2", "i0", "i1", "i2", "i1"],
...     "timestamp": [dt.datetime(2024, 1, 1) + dt.timedelta(days=i) for i in range(8)]
... })
>>> interactions = convert2spark(interactions)
>>> interactions.show()
+-------+-------+-------------------+
|user_id|item_id|          timestamp|
+-------+-------+-------------------+
|     u0|     i0|2024-01-01 00:00:00|
|     u1|     i1|2024-01-02 00:00:00|
|     u1|     i1|2024-01-03 00:00:00|
|     u0|     i2|2024-01-04 00:00:00|
|     u0|     i0|2024-01-05 00:00:00|
|     u0|     i1|2024-01-06 00:00:00|
|     u1|     i2|2024-01-07 00:00:00|
|     u0|     i1|2024-01-08 00:00:00|
+-------+-------+-------------------+
>>> ConsecutiveDuplicatesFilter(query_column="user_id").transform(interactions).show()
+-------+-------+-------------------+
|user_id|item_id|          timestamp|
+-------+-------+-------------------+
|     u0|     i0|2024-01-01 00:00:00|
|     u0|     i2|2024-01-04 00:00:00|
|     u0|     i0|2024-01-05 00:00:00|
|     u0|     i1|2024-01-06 00:00:00|
|     u1|     i1|2024-01-02 00:00:00|
|     u1|     i2|2024-01-07 00:00:00|
+-------+-------+-------------------+
- class replay.preprocessing.filters.EntityDaysFilter(days=10, first=True, entity_column='user_id', timestamp_column='timestamp')
Get first/last days of interactions by entity.
>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> log_pd = pd.DataFrame({"user_id": ["u1", "u2", "u2", "u3", "u3", "u3"],
...                        "item_id": ["i1", "i2", "i3", "i1", "i2", "i3"],
...                        "rating": [1., 0.5, 3, 1, 0, 1],
...                        "timestamp": ["2020-01-01 23:59:59", "2020-02-01 00:00:00",
...                                      "2020-02-01 00:00:01", "2020-01-01 00:04:15",
...                                      "2020-01-02 00:04:14", "2020-01-05 23:59:59"]},
...                       )
>>> log_pd["timestamp"] = pd.to_datetime(log_pd["timestamp"], format="ISO8601")
>>> log_sp = convert2spark(log_pd)
>>> log_sp.orderBy('user_id', 'item_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+
Get the first day for each user:
>>> EntityDaysFilter(1, True, entity_column='user_id').transform(log_sp).orderBy('user_id', 'item_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
+-------+-------+------+-------------------+
Get the last day for each item:
>>> EntityDaysFilter(1, False, entity_column='item_id').transform(log_sp).orderBy('item_id', 'user_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
+-------+-------+------+-------------------+
- class replay.preprocessing.filters.GlobalDaysFilter(days=10, first=True, timestamp_column='timestamp')
Select the first/last days from interactions.
>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> log_pd = pd.DataFrame({"user_id": ["u1", "u2", "u2", "u3", "u3", "u3"],
...                        "item_id": ["i1", "i2", "i3", "i1", "i2", "i3"],
...                        "rating": [1., 0.5, 3, 1, 0, 1],
...                        "timestamp": ["2020-01-01 23:59:59", "2020-02-01 00:00:00",
...                                      "2020-02-01 00:00:01", "2020-01-01 00:04:15",
...                                      "2020-01-02 00:04:14", "2020-01-05 23:59:59"]},
...                       )
>>> log_pd["timestamp"] = pd.to_datetime(log_pd["timestamp"], format="ISO8601")
>>> log_sp = convert2spark(log_pd)
>>> log_sp.show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+
>>> GlobalDaysFilter(1).transform(log_sp).show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
+-------+-------+------+-------------------+
>>> GlobalDaysFilter(1, first=False).transform(log_sp).show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
+-------+-------+------+-------------------+
- class replay.preprocessing.filters.InteractionEntriesFilter(query_column='user_id', item_column='item_id', min_inter_per_user=None, max_inter_per_user=None, min_inter_per_item=None, max_inter_per_item=None, allow_caching=True)
Remove interactions of users and items whose number of interactions is less than the minimum constraint value or greater than the maximum constraint value for the corresponding column.
>>> import pandas as pd
>>> interactions = pd.DataFrame({
...     "user_id": [1, 1, 1, 2, 2, 2, 3, 3, 3, 3],
...     "item_id": [3, 7, 10, 5, 8, 11, 4, 9, 2, 5],
...     "rating": [1, 2, 3, 3, 2, 1, 3, 12, 1, 4]
... })
>>> interactions
   user_id  item_id  rating
0        1        3       1
1        1        7       2
2        1       10       3
3        2        5       3
4        2        8       2
5        2       11       1
6        3        4       3
7        3        9      12
8        3        2       1
9        3        5       4
>>> filtered_interactions = InteractionEntriesFilter(min_inter_per_user=4).transform(interactions)
>>> filtered_interactions
   user_id  item_id  rating
6        3        4       3
7        3        9      12
8        3        2       1
9        3        5       4
- class replay.preprocessing.filters.LowRatingFilter(value, rating_column='rating')
Remove records with rating less than value in rating_column.
>>> import pandas as pd
>>> data_frame = pd.DataFrame({"rating": [1, 5, 3.5, 4]})
>>> LowRatingFilter(3.5).transform(data_frame)
   rating
1     5.0
2     3.5
3     4.0
- class replay.preprocessing.filters.MinCountFilter(num_entries, groupby_column='user_id')
Remove entries for entities (e.g. users, items) that are present in interactions fewer than num_entries times. The interactions are grouped by groupby_column, which is the entity column name, to calculate the counts.
>>> import pandas as pd
>>> data_frame = pd.DataFrame({"user_id": [1, 1, 2]})
>>> MinCountFilter(2).transform(data_frame)
   user_id
0        1
1        1
- class replay.preprocessing.filters.NumInteractionsFilter(num_interactions=10, first=True, query_column='user_id', timestamp_column='timestamp', item_column=None)
Get first/last num_interactions interactions for each query.
>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> log_pd = pd.DataFrame({"user_id": ["u1", "u2", "u2", "u3", "u3", "u3"],
...                        "item_id": ["i1", "i2", "i3", "i1", "i2", "i3"],
...                        "rating": [1., 0.5, 3, 1, 0, 1],
...                        "timestamp": ["2020-01-01 23:59:59", "2020-02-01 00:00:00",
...                                      "2020-02-01 00:00:01", "2020-01-01 00:04:15",
...                                      "2020-01-02 00:04:14", "2020-01-05 23:59:59"]},
...                       )
>>> log_pd["timestamp"] = pd.to_datetime(log_pd["timestamp"], format="ISO8601")
>>> log_sp = convert2spark(log_pd)
>>> log_sp.show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+
Only first interaction:
>>> NumInteractionsFilter(1, True, item_column='item_id').transform(log_sp).orderBy('user_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
+-------+-------+------+-------------------+
Only last interaction:
>>> NumInteractionsFilter(1, False).transform(log_sp).orderBy('user_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+
>>> NumInteractionsFilter(1, False, item_column='item_id').transform(log_sp).orderBy('user_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+
- class replay.preprocessing.filters.QuantileItemsFilter(alpha_quantile=0.99, items_proportion=0.5, query_column='query_id', item_column='item_id')
This filter is aimed at undersampling the interactions dataset.
The algorithm removes items_proportion of interactions for each item whose interaction count exceeds the alpha_quantile value of the counts distribution. The most popular items (those with the most interactions) are filtered first. The filter also preserves the original relative popularity of items by removing interactions only in the range between the current item count and the quantile count (specified by alpha_quantile).
>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> log_pd = pd.DataFrame({
...     "user_id": [0, 0, 1, 2, 2, 2, 2],
...     "item_id": [0, 2, 1, 1, 2, 2, 2]
... })
>>> log_spark = convert2spark(log_pd)
>>> log_spark.show()
+-------+-------+
|user_id|item_id|
+-------+-------+
|      0|      0|
|      0|      2|
|      1|      1|
|      2|      1|
|      2|      2|
|      2|      2|
|      2|      2|
+-------+-------+
>>> QuantileItemsFilter(query_column="user_id").transform(log_spark).show()
+-------+-------+
|user_id|item_id|
+-------+-------+
|      0|      0|
|      1|      1|
|      2|      1|
|      2|      2|
|      2|      2|
|      0|      2|
+-------+-------+
- class replay.preprocessing.filters.TimePeriodFilter(start_date=None, end_date=None, timestamp_column='timestamp', time_column_format='%Y-%m-%d %H:%M:%S')
Select a part of data between [start_date, end_date).
>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> log_pd = pd.DataFrame({"user_id": ["u1", "u2", "u2", "u3", "u3", "u3"],
...                        "item_id": ["i1", "i2", "i3", "i1", "i2", "i3"],
...                        "rating": [1., 0.5, 3, 1, 0, 1],
...                        "timestamp": ["2020-01-01 23:59:59", "2020-02-01 00:00:00",
...                                      "2020-02-01 00:00:01", "2020-01-01 00:04:15",
...                                      "2020-01-02 00:04:14", "2020-01-05 23:59:59"]},
...                       )
>>> log_pd["timestamp"] = pd.to_datetime(log_pd["timestamp"], format="ISO8601")
>>> log_sp = convert2spark(log_pd)
>>> log_sp.show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+
>>> from datetime import datetime
>>> TimePeriodFilter(
...     start_date="2020-01-01 14:00:00",
...     end_date=datetime(2020, 1, 3, 0, 0, 0)
... ).transform(log_sp).show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
+-------+-------+------+-------------------+
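All of the filters above expose the same transform method, so they can be applied one after another. A minimal sketch using classes documented above (the column names, thresholds and toy data are illustrative):
>>> import pandas as pd
>>> from replay.preprocessing.filters import LowRatingFilter, MinCountFilter
>>> log = pd.DataFrame({
...     "user_id": [1, 1, 2, 2, 3],
...     "item_id": [10, 20, 10, 30, 20],
...     "rating": [5.0, 1.0, 4.0, 3.0, 5.0],
... })
>>> # keep users with at least two interactions, then drop low ratings
>>> filtered = MinCountFilter(2, groupby_column="user_id").transform(log)
>>> filtered = LowRatingFilter(3.0).transform(filtered)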
CSRConverter
Convert input data to csr sparse matrix.
- class replay.preprocessing.converter.CSRConverter(first_dim_column, second_dim_column, data_column=None, row_count=None, column_count=None, allow_collect_to_master=False)
Convert input data to csr sparse matrix, where data_column, first_dim_column and second_dim_column satisfy the relationship matrix[first_dim_column[i], second_dim_column[i]] = data_column[i].
>>> import pandas as pd
>>> interactions = pd.DataFrame({
...     "user_id": [1, 1, 1, 2, 2, 2, 3, 3, 3, 3],
...     "item_id": [3, 7, 10, 5, 8, 11, 4, 9, 2, 5],
...     "rating": [1, 2, 3, 3, 2, 1, 3, 12, 1, 4]
... })
>>> interactions
   user_id  item_id  rating
0        1        3       1
1        1        7       2
2        1       10       3
3        2        5       3
4        2        8       2
5        2       11       1
6        3        4       3
7        3        9      12
8        3        2       1
9        3        5       4
>>> csr_interactions = CSRConverter(
...     first_dim_column="user_id",
...     second_dim_column="item_id",
...     data_column="rating",
... ).transform(interactions)
>>> csr_interactions.todense()
matrix([[ 0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0],
        [ 0,  0,  0,  1,  0,  0,  0,  2,  0,  0,  3,  0],
        [ 0,  0,  0,  0,  0,  3,  0,  0,  2,  0,  0,  1],
        [ 0,  0,  1,  0,  3,  4,  0,  0,  0, 12,  0,  0]])
- transform(data)
Transform a Spark or Pandas DataFrame to csr.
- Parameters
data (Union[DataFrame, DataFrame, DataFrame]) – Spark or Pandas DataFrame containing columns first_dim_column, second_dim_column, and optional data_column.
- Return type
csr_matrix
- Returns
Sparse interactions.
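The returned object is a regular scipy.sparse.csr_matrix, so standard SciPy/NumPy operations apply directly. A short sketch continuing the csr_interactions example above (the aggregations shown are illustrative, not part of the converter API):
>>> import numpy as np
>>> n_stored = csr_interactions.nnz                                 # number of stored (non-zero) interactions
>>> item_totals = np.asarray(csr_interactions.sum(axis=0)).ravel()  # per-item-column sum of ratings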
Sessionizer
Create and filter sessions from given interactions.
- class replay.preprocessing.sessionizer.Sessionizer(user_column='user_id', time_column='timestamp', session_column='session_id', session_gap=86400, time_column_format='yyyy-MM-dd HH:mm:ss', min_inter_per_session=None, max_inter_per_session=None, min_sessions_per_user=None, max_sessions_per_user=None)
Create and filter sessions from the given interactions. Session ids are formed as the subtraction between the cumulative sum over unique users and the number of entries inside each user's history that are greater than the session gap.
>>> import pandas as pd
>>> time_interactions = pd.DataFrame({
...     "user_id": [1, 1, 1, 2, 2, 2, 3, 3, 3, 3],
...     "item_id": [3, 7, 10, 5, 8, 11, 4, 9, 2, 5],
...     "timestamp": [1, 2, 3, 3, 2, 1, 3, 12, 1, 4]
... })
>>> time_interactions
   user_id  item_id  timestamp
0        1        3          1
1        1        7          2
2        1       10          3
3        2        5          3
4        2        8          2
5        2       11          1
6        3        4          3
7        3        9         12
8        3        2          1
9        3        5          4
>>> Sessionizer(session_gap=5).transform(time_interactions)
   user_id  item_id  timestamp  session_id
0        1        3          1           2
1        1        7          2           2
2        1       10          3           2
3        2        5          3           5
4        2        8          2           5
5        2       11          1           5
6        3        4          3           9
7        3        9         12           8
8        3        2          1           9
9        3        5          4           9
- transform(interactions)
Create and filter sessions from given interactions.
- Parameters
interactions (Union[DataFrame, DataFrame, DataFrame]) – DataFrame containing columns user_column, time_column.
- Return type
Union[DataFrame, DataFrame, DataFrame]
- Returns
DataFrame with created and filtered sessions.
Padder (Experimental)
Pad array columns in dataframe.
- class replay.experimental.preprocessing.padder.Padder(pad_columns, padding_side='right', padding_value=0, array_size=None, cut_array=True, cut_side='right')
Pad array columns in dataframe.
>>> import pandas as pd
>>> pad_interactions = pd.DataFrame({
...     "user_id": [1, 1, 1, 1, 2, 2, 3, 3, 3],
...     "timestamp": [[1], [1, 2], [1, 2, 4], [1, 2, 4, 6], [4, 7, 12],
...                   [4, 7, 12, 126], [1, 2, 3, 4, 5], [1, 2, 3, 4, 5, 6],
...                   [1, 2, 3, 4, 5, 6, 7]],
...     "item_id": [['a'], ['a', 'b'], ['a', 'b', 'd'], ['a', 'b', 'd', 'f'], ['d', 'e', 'm'],
...                 ['d', 'e', 'm', 'g'], ['a', 'b', 'c', 'd', 'a'], ['a', 'b', 'c', 'd', 'a', 'f'],
...                 ['a', 'b', 'c', 'd', 'a', 'f', 'e']]
... })
>>> pad_interactions
   user_id              timestamp                item_id
0        1                    [1]                    [a]
1        1                 [1, 2]                 [a, b]
2        1              [1, 2, 4]              [a, b, d]
3        1           [1, 2, 4, 6]           [a, b, d, f]
4        2             [4, 7, 12]              [d, e, m]
5        2        [4, 7, 12, 126]           [d, e, m, g]
6        3        [1, 2, 3, 4, 5]        [a, b, c, d, a]
7        3     [1, 2, 3, 4, 5, 6]     [a, b, c, d, a, f]
8        3  [1, 2, 3, 4, 5, 6, 7]  [a, b, c, d, a, f, e]
>>> Padder(
...     pad_columns=["item_id", "timestamp"],
...     padding_side="right",
...     padding_value=["[PAD]", 0],
...     array_size=5,
...     cut_array=True,
...     cut_side="right"
... ).transform(pad_interactions)
   user_id           timestamp                          item_id
0        1     [1, 0, 0, 0, 0]  [a, [PAD], [PAD], [PAD], [PAD]]
1        1     [1, 2, 0, 0, 0]      [a, b, [PAD], [PAD], [PAD]]
2        1     [1, 2, 4, 0, 0]          [a, b, d, [PAD], [PAD]]
3        1     [1, 2, 4, 6, 0]              [a, b, d, f, [PAD]]
4        2    [4, 7, 12, 0, 0]          [d, e, m, [PAD], [PAD]]
5        2  [4, 7, 12, 126, 0]              [d, e, m, g, [PAD]]
6        3     [1, 2, 3, 4, 5]                  [a, b, c, d, a]
7        3     [2, 3, 4, 5, 6]                  [b, c, d, a, f]
8        3     [3, 4, 5, 6, 7]                  [c, d, a, f, e]
- transform(interactions)
Pad dataframe.
- Parameters
interactions (Union[DataFrame, DataFrame, DataFrame]) – DataFrame with array columns with names pad_columns.
- Return type
Union[DataFrame, DataFrame, DataFrame]
- Returns
DataFrame with padded array columns.
SequenceGenerator (Experimental)
Creating sequences for sequential models.
- class replay.experimental.preprocessing.sequence_generator.SequenceGenerator(groupby_column, orderby_column=None, transform_columns=None, len_window=50, sequence_prefix=None, sequence_suffix='_list', label_prefix='label_', label_suffix=None, get_list_len=False, list_len_column='list_len')
Creating sequences for sequential models.
E.g., if u1 has the purchase sequence <i1, i2, i3, i4>, then after processing three cases will be generated:
u1, <i1> | i2
(which means that, given user_id u1 and item_seq <i1>, the model needs to predict the next item i2.)
The other cases are:
u1, <i1, i2> | i3
u1, <i1, i2, i3> | i4
>>> import pandas as pd
>>> time_interactions = pd.DataFrame({
...     "user_id": [1, 1, 1, 2, 2, 2, 3, 3, 3, 3],
...     "item_id": [3, 7, 10, 5, 8, 11, 4, 9, 2, 5],
...     "timestamp": [1, 2, 3, 3, 2, 1, 3, 12, 1, 4]
... })
>>> time_interactions
   user_id  item_id  timestamp
0        1        3          1
1        1        7          2
2        1       10          3
3        2        5          3
4        2        8          2
5        2       11          1
6        3        4          3
7        3        9         12
8        3        2          1
9        3        5          4
>>> sequences = (
...     SequenceGenerator(
...         groupby_column="user_id", transform_columns=["item_id", "timestamp"]
...     ).transform(time_interactions)
... )
>>> sequences
   user_id  item_id_list  timestamp_list  label_item_id  label_timestamp
0        1           [3]             [1]              7                2
1        1        [3, 7]          [1, 2]             10                3
2        2           [5]             [3]              8                2
3        2        [5, 8]          [3, 2]             11                1
4        3           [4]             [3]              9               12
5        3        [4, 9]         [3, 12]              2                1
6        3     [4, 9, 2]      [3, 12, 1]              5                4
- transform(interactions)
Create sequences from given interactions.
- Parameters
interactions (Union[DataFrame, DataFrame, DataFrame]) – DataFrame.
- Return type
Union[DataFrame, DataFrame, DataFrame]
- Returns
DataFrame with transformed interactions. Sequential interactions are collected into lists.
Data Preparation (Experimental)
Replay has a number of requirements for input data.
We expect the input columns to be in the form [user_id, item_id, timestamp, relevance].
The internal format is a Spark DataFrame with indexed integer values for [user_idx, item_idx].
You can convert the indexes of your Spark DataFrame with the Indexer class.
- class replay.experimental.preprocessing.data_preparator.Indexer(user_col='user_id', item_col='item_id')
This class is used to convert arbitrary id to numerical idx and back.
- fit(users, items)
Creates indexers to map raw id to numerical idx so that Spark can handle them.
- Parameters
users (DataFrame) – SparkDataFrame containing user column
items (DataFrame) – SparkDataFrame containing item column
- Return type
None
- inverse_transform(df)
Convert SparkDataFrame to the initial indexes.
- Parameters
df (DataFrame) – SparkDataFrame with numerical user_idx/item_idx columns
- Return type
DataFrame
- Returns
SparkDataFrame with original user/item columns
- transform(df)
Convert raw user_col and item_col to numerical user_idx and item_idx.
- Parameters
df (DataFrame) – dataframe with raw indexes
- Return type
Optional[DataFrame]
- Returns
dataframe with converted indexes
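A minimal end-to-end sketch of the Indexer (the toy data is illustrative; the calls follow the signatures documented above):
>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> from replay.experimental.preprocessing.data_preparator import Indexer
>>> log = convert2spark(pd.DataFrame({"user_id": ["u1", "u2"], "item_id": ["i1", "i2"]}))
>>> indexer = Indexer(user_col="user_id", item_col="item_id")
>>> indexer.fit(users=log.select("user_id"), items=log.select("item_id"))
>>> indexed = indexer.transform(log)               # user_idx/item_idx are numerical
>>> restored = indexer.inverse_transform(indexed)  # back to the original ids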
If your DataFrame is a Pandas DataFrame with different column names, you can either preprocess it yourself with the convert2spark function or use the DataPreparator class.
- class replay.experimental.preprocessing.data_preparator.DataPreparator
- Transforms data to a library format:
read as a Spark DataFrame / convert a Pandas DataFrame to Spark
check for nulls
create relevance/timestamp columns if absent
convert dates to TimestampType
Examples:
Loading log DataFrame
>>> import pandas as pd
>>> from replay.experimental.preprocessing.data_preparator import DataPreparator
>>>
>>> log = pd.DataFrame({"user": [2, 2, 2, 1],
...                     "item_id": [1, 2, 3, 3],
...                     "rel": [5, 5, 5, 5]}
...                    )
>>> dp = DataPreparator()
>>> correct_log = dp.transform(data=log,
...                            columns_mapping={"user_id": "user",
...                                             "item_id": "item_id",
...                                             "relevance": "rel"}
...                            )
>>> correct_log.show(2)
+-------+-------+---------+-------------------+
|user_id|item_id|relevance|          timestamp|
+-------+-------+---------+-------------------+
|      2|      1|      5.0|2099-01-01 00:00:00|
|      2|      2|      5.0|2099-01-01 00:00:00|
+-------+-------+---------+-------------------+
only showing top 2 rows
Loading user features
>>> import pandas as pd
>>> from replay.experimental.preprocessing.data_preparator import DataPreparator
>>>
>>> log = pd.DataFrame({"user": ["user1", "user1", "user2"],
...                     "f0": ["feature1", "feature2", "feature1"],
...                     "f1": ["left", "left", "center"],
...                     "ts": ["2019-01-01", "2019-01-01", "2019-01-01"]}
...                    )
>>> dp = DataPreparator()
>>> correct_log = dp.transform(data=log,
...                            columns_mapping={"user_id": "user"},
...                            )
>>> correct_log.show(3)
+-------+--------+------+----------+
|user_id|      f0|    f1|        ts|
+-------+--------+------+----------+
|  user1|feature1|  left|2019-01-01|
|  user1|feature2|  left|2019-01-01|
|  user2|feature1|center|2019-01-01|
+-------+--------+------+----------+
- static add_absent_log_cols(dataframe, columns_mapping, default_relevance=1.0, default_ts='2099-01-01')
Add relevance and timestamp columns with default values if relevance or timestamp is absent among mapping keys.
- Parameters
dataframe (DataFrame) – interactions log to process
columns_mapping (dict[str, str]) – dictionary mapping “key: column name in input DataFrame”. Possible keys: [user_id, item_id, timestamp, relevance]
default_relevance (float) – default value for the generated relevance column
default_ts (str) – default value for the generated timestamp column
- Returns
spark DataFrame with generated timestamp and relevance columns if absent in the original dataframe
- check_df(dataframe, columns_mapping)
Check:
- if dataframe is not empty,
- if columns from columns_mapping are present in dataframe,
- warn about nulls in columns from columns_mapping,
- warn about absent timestamp/relevance columns for an interactions log,
- warn about wrong relevance DataType.
- Parameters
dataframe (DataFrame) – spark DataFrame to process
columns_mapping (dict[str, str]) – dictionary mapping “key: column name in input DataFrame”. Possible keys: [user_id, item_id, timestamp, relevance]. The columns_mapping keys specify the nature of the DataFrame: if both [user_id, item_id] are present, then the dataframe is a log of interactions (specify timestamp, relevance columns in the mapping if available); if either user_id or item_id is present, then the dataframe is a dataframe of user/item features.
- Return type
None
- property logger: Logger
- Returns
get library logger
- static read_as_spark_df(data=None, path=None, format_type=None, **kwargs)
Read a Spark DataFrame from a file or transform a Pandas DataFrame.
- Parameters
data (Union[DataFrame, DataFrame, DataFrame, None]) – DataFrame to process (either path or data should be defined)
path (Optional[str]) – path to data (either path or data should be defined)
format_type (Optional[str]) – file type, one of [csv, parquet, json, table]
kwargs – extra arguments passed to spark.read.<format>(path, **reader_kwargs)
- Return type
DataFrame
- Returns
spark DataFrame
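For example, a hedged sketch of reading a CSV log (the file path is hypothetical; extra keyword arguments are forwarded to the Spark reader):
>>> from replay.experimental.preprocessing.data_preparator import DataPreparator
>>> raw_log = DataPreparator.read_as_spark_df(
...     path="data/interactions.csv",  # hypothetical file
...     format_type="csv",
...     header=True,                   # forwarded to spark.read.csv
... )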
- transform(columns_mapping, data=None, path=None, format_type=None, date_format=None, reader_kwargs=None)
Transforms a log, user or item features into a Spark DataFrame
[user_id, item_id, timestamp, relevance], [user_id, *features], or [item_id, *features]. Input is either a file of format_type at path, or a pandas.DataFrame or spark.DataFrame. Transform performs:
- dataframe reading / conversion to spark DataFrame format
- dataframe check (nulls, columns_mapping)
- renaming columns from the mapping to standard names (user_id, item_id, timestamp, relevance)
- for an interactions log: creating absent columns, converting the timestamp column to TimestampType and relevance to DoubleType
- Parameters
columns_mapping (dict[str, str]) – dictionary mapping “key: column name in input DataFrame”. Possible keys: [user_id, item_id, timestamp, relevance]. The columns_mapping keys specify the nature of the DataFrame: if both [user_id, item_id] are present, then the dataframe is a log of interactions (specify timestamp, relevance columns in the mapping if present); if either user_id or item_id is present, then the dataframe is a dataframe of user/item features.
data (Union[DataFrame, DataFrame, DataFrame, None]) – DataFrame to process
path (Optional[str]) – path to data
format_type (Optional[str]) – file type, one of [csv, parquet, json, table]
date_format (Optional[str]) – format for the timestamp column
reader_kwargs (Optional[dict]) – extra arguments passed to spark.read.<format>(path, **reader_kwargs)
- Return type
DataFrame
- Returns
processed DataFrame
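A sketch of the file-based variant, mirroring the data= examples above (the path and raw column names are hypothetical):
>>> from replay.experimental.preprocessing.data_preparator import DataPreparator
>>> dp = DataPreparator()
>>> prepared_log = dp.transform(
...     columns_mapping={"user_id": "user", "item_id": "item", "relevance": "rel"},
...     path="data/interactions.csv",  # hypothetical file
...     format_type="csv",
... )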