Preprocessing
This module contains tools for preprocessing data, including:
- filters
- processors for feature transforms
Filters
Select or remove data by some criteria. All filters below expose the same transform method, so they can be chained one after another; see the sketch below.
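A minimal sketch of chaining two of the filters documented in this section (this is not one of the library's doctests; the toy data is made up for illustration):

>>> import pandas as pd
>>> from replay.preprocessing.filters import LowRatingFilter, MinCountFilter
>>> interactions = pd.DataFrame({
...     "user_id": [1, 1, 1, 2, 2, 3],
...     "item_id": [3, 7, 10, 5, 8, 4],
...     "rating": [1.0, 4.0, 3.0, 5.0, 2.0, 4.5],
... })
>>> # keep users with at least two interactions, then drop records rated below 3
>>> filtered = MinCountFilter(num_entries=2, groupby_column="user_id").transform(interactions)
>>> filtered = LowRatingFilter(value=3, rating_column="rating").transform(filtered)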
- class replay.preprocessing.filters.EntityDaysFilter(days=10, first=True, entity_column='user_id', timestamp_column='timestamp')
Get first/last days of interactions by entity.

>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> log_pd = pd.DataFrame({"user_id": ["u1", "u2", "u2", "u3", "u3", "u3"],
...                        "item_id": ["i1", "i2", "i3", "i1", "i2", "i3"],
...                        "rating": [1., 0.5, 3, 1, 0, 1],
...                        "timestamp": ["2020-01-01 23:59:59", "2020-02-01 00:00:00",
...                                      "2020-02-01 00:00:01", "2020-01-01 00:04:15",
...                                      "2020-01-02 00:04:14", "2020-01-05 23:59:59"]},
...                       )
>>> log_pd["timestamp"] = pd.to_datetime(log_pd["timestamp"], format="ISO8601")
>>> log_sp = convert2spark(log_pd)
>>> log_sp.orderBy('user_id', 'item_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+
Get first day by users:
>>> EntityDaysFilter(1, True, entity_column='user_id').transform(log_sp).orderBy('user_id', 'item_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
+-------+-------+------+-------------------+
Get last day by item:
>>> EntityDaysFilter(1, False, entity_column='item_id').transform(log_sp).orderBy('item_id', 'user_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
+-------+-------+------+-------------------+
- class replay.preprocessing.filters.GlobalDaysFilter(days=10, first=True, timestamp_column='timestamp')
Select first/last days from interactions.

>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> log_pd = pd.DataFrame({"user_id": ["u1", "u2", "u2", "u3", "u3", "u3"],
...                        "item_id": ["i1", "i2", "i3", "i1", "i2", "i3"],
...                        "rating": [1., 0.5, 3, 1, 0, 1],
...                        "timestamp": ["2020-01-01 23:59:59", "2020-02-01 00:00:00",
...                                      "2020-02-01 00:00:01", "2020-01-01 00:04:15",
...                                      "2020-01-02 00:04:14", "2020-01-05 23:59:59"]},
...                       )
>>> log_pd["timestamp"] = pd.to_datetime(log_pd["timestamp"], format="ISO8601")
>>> log_sp = convert2spark(log_pd)
>>> log_sp.show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+
>>> GlobalDaysFilter(1).transform(log_sp).show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
+-------+-------+------+-------------------+
>>> GlobalDaysFilter(1, first=False).transform(log_sp).show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
+-------+-------+------+-------------------+
- class replay.preprocessing.filters.InteractionEntriesFilter(query_column='user_id', item_column='item_id', min_inter_per_user=None, max_inter_per_user=None, min_inter_per_item=None, max_inter_per_item=None, allow_caching=True)
Remove interactions for which the number of interactions per user or per item is less than the minimum constraint value or greater than the maximum constraint value for the corresponding column.
>>> import pandas as pd
>>> interactions = pd.DataFrame({
...     "user_id": [1, 1, 1, 2, 2, 2, 3, 3, 3, 3],
...     "item_id": [3, 7, 10, 5, 8, 11, 4, 9, 2, 5],
...     "rating": [1, 2, 3, 3, 2, 1, 3, 12, 1, 4]
... })
>>> interactions
   user_id  item_id  rating
0        1        3       1
1        1        7       2
2        1       10       3
3        2        5       3
4        2        8       2
5        2       11       1
6        3        4       3
7        3        9      12
8        3        2       1
9        3        5       4
>>> filtered_interactions = InteractionEntriesFilter(min_inter_per_user=4).transform(interactions)
>>> filtered_interactions
   user_id  item_id  rating
6        3        4       3
7        3        9      12
8        3        2       1
9        3        5       4
- class replay.preprocessing.filters.LowRatingFilter(value, rating_column='rating')
Remove records with rating less than value in rating_column.

>>> import pandas as pd
>>> data_frame = pd.DataFrame({"rating": [1, 5, 3.5, 4]})
>>> LowRatingFilter(3.5).transform(data_frame)
   rating
1     5.0
2     3.5
3     4.0
- class replay.preprocessing.filters.MinCountFilter(num_entries, groupby_column='user_id')
Remove entries for entities (e.g. users, items) that appear in the interactions fewer than num_entries times. The interactions are grouped by groupby_column (the entity column name) to calculate the counts.
>>> import pandas as pd
>>> data_frame = pd.DataFrame({"user_id": [1, 1, 2]})
>>> MinCountFilter(2).transform(data_frame)
   user_id
0        1
1        1
- class replay.preprocessing.filters.NumInteractionsFilter(num_interactions=10, first=True, query_column='user_id', timestamp_column='timestamp', item_column=None)
Get first/last num_interactions interactions for each query.

>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> log_pd = pd.DataFrame({"user_id": ["u1", "u2", "u2", "u3", "u3", "u3"],
...                        "item_id": ["i1", "i2", "i3", "i1", "i2", "i3"],
...                        "rating": [1., 0.5, 3, 1, 0, 1],
...                        "timestamp": ["2020-01-01 23:59:59", "2020-02-01 00:00:00",
...                                      "2020-02-01 00:00:01", "2020-01-01 00:04:15",
...                                      "2020-01-02 00:04:14", "2020-01-05 23:59:59"]},
...                       )
>>> log_pd["timestamp"] = pd.to_datetime(log_pd["timestamp"], format="ISO8601")
>>> log_sp = convert2spark(log_pd)
>>> log_sp.show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+
Only first interaction:
>>> NumInteractionsFilter(1, True, item_column='item_id').transform(log_sp).orderBy('user_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
+-------+-------+------+-------------------+
Only last interaction:
>>> NumInteractionsFilter(1, False).transform(log_sp).orderBy('user_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+
>>> NumInteractionsFilter(1, False, item_column='item_id').transform(log_sp).orderBy('user_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+
- class replay.preprocessing.filters.QuantileItemsFilter(alpha_quantile=0.99, items_proportion=0.5, query_column='query_id', item_column='item_id')
This filter is aimed at undersampling the interactions dataset.
The algorithm removes items_proportion of the interactions of every item whose interaction count exceeds the alpha_quantile value of the count distribution, so the most popular items (those with the most interactions) are trimmed first. The filter also keeps the original relative popularity of items by removing interactions only in the range between the current item count and the quantile count (specified by alpha_quantile).
>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> log_pd = pd.DataFrame({
...     "user_id": [0, 0, 1, 2, 2, 2, 2],
...     "item_id": [0, 2, 1, 1, 2, 2, 2]
... })
>>> log_spark = convert2spark(log_pd)
>>> log_spark.show()
+-------+-------+
|user_id|item_id|
+-------+-------+
|      0|      0|
|      0|      2|
|      1|      1|
|      2|      1|
|      2|      2|
|      2|      2|
|      2|      2|
+-------+-------+
>>> QuantileItemsFilter(query_column="user_id").transform(log_spark).show()
+-------+-------+
|user_id|item_id|
+-------+-------+
|      0|      0|
|      1|      1|
|      2|      1|
|      2|      2|
|      2|      2|
|      0|      2|
+-------+-------+
- class replay.preprocessing.filters.TimePeriodFilter(start_date=None, end_date=None, timestamp_column='timestamp', time_column_format='%Y-%m-%d %H:%M:%S')
Select a part of data between [start_date, end_date).

>>> import pandas as pd
>>> from datetime import datetime
>>> from replay.utils.spark_utils import convert2spark
>>> log_pd = pd.DataFrame({"user_id": ["u1", "u2", "u2", "u3", "u3", "u3"],
...                        "item_id": ["i1", "i2", "i3", "i1", "i2", "i3"],
...                        "rating": [1., 0.5, 3, 1, 0, 1],
...                        "timestamp": ["2020-01-01 23:59:59", "2020-02-01 00:00:00",
...                                      "2020-02-01 00:00:01", "2020-01-01 00:04:15",
...                                      "2020-01-02 00:04:14", "2020-01-05 23:59:59"]},
...                       )
>>> log_pd["timestamp"] = pd.to_datetime(log_pd["timestamp"], format="ISO8601")
>>> log_sp = convert2spark(log_pd)
>>> log_sp.show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+
>>> TimePeriodFilter(
...     start_date="2020-01-01 14:00:00",
...     end_date=datetime(2020, 1, 3, 0, 0, 0)
... ).transform(log_sp).show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
+-------+-------+------+-------------------+
CSRConverter
Convert input data to csr sparse matrix.
- class replay.preprocessing.converter.CSRConverter(first_dim_column, second_dim_column, data_column=None, row_count=None, column_count=None, allow_collect_to_master=False)
Convert input data to a csr sparse matrix, where data_column, first_dim_column and second_dim_column satisfy the relationship matrix[first_dim_column[i], second_dim_column[i]] = data_column[i].

>>> import pandas as pd
>>> interactions = pd.DataFrame({
...     "user_id": [1, 1, 1, 2, 2, 2, 3, 3, 3, 3],
...     "item_id": [3, 7, 10, 5, 8, 11, 4, 9, 2, 5],
...     "rating": [1, 2, 3, 3, 2, 1, 3, 12, 1, 4]
... })
>>> interactions
   user_id  item_id  rating
0        1        3       1
1        1        7       2
2        1       10       3
3        2        5       3
4        2        8       2
5        2       11       1
6        3        4       3
7        3        9      12
8        3        2       1
9        3        5       4
>>> csr_interactions = CSRConverter(
...     first_dim_column="user_id",
...     second_dim_column="item_id",
...     data_column="rating",
... ).transform(interactions)
>>> csr_interactions.todense()
matrix([[ 0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0],
        [ 0,  0,  0,  1,  0,  0,  0,  2,  0,  0,  3,  0],
        [ 0,  0,  0,  0,  0,  3,  0,  0,  2,  0,  0,  1],
        [ 0,  0,  1,  0,  3,  4,  0,  0,  0, 12,  0,  0]])
- transform(data)
Transform Spark or Pandas Data Frame to csr.
- Parameters
  data (DataFrame) – Spark or Pandas DataFrame containing columns first_dim_column, second_dim_column, and optional data_column.
- Return type
  csr_matrix
- Returns
  Sparse interactions.
Sessionizer
Create and filter sessions from given interactions.
- class replay.preprocessing.sessionizer.Sessionizer(user_column='user_id', time_column='timestamp', session_column='session_id', session_gap=86400, time_column_format='yyyy-MM-dd HH:mm:ss', min_inter_per_session=None, max_inter_per_session=None, min_sessions_per_user=None, max_sessions_per_user=None)
Create and filter sessions from given interactions. Session ids are formed as the subtraction between the unique users' cumulative sum and the number of entries inside each user's history that are greater than the session gap.
>>> import pandas as pd
>>> time_interactions = pd.DataFrame({
...     "user_id": [1, 1, 1, 2, 2, 2, 3, 3, 3, 3],
...     "item_id": [3, 7, 10, 5, 8, 11, 4, 9, 2, 5],
...     "timestamp": [1, 2, 3, 3, 2, 1, 3, 12, 1, 4]
... })
>>> time_interactions
   user_id  item_id  timestamp
0        1        3          1
1        1        7          2
2        1       10          3
3        2        5          3
4        2        8          2
5        2       11          1
6        3        4          3
7        3        9         12
8        3        2          1
9        3        5          4
>>> Sessionizer(session_gap=5).transform(time_interactions)
   user_id  item_id  timestamp  session_id
0        1        3          1           2
1        1        7          2           2
2        1       10          3           2
3        2        5          3           5
4        2        8          2           5
5        2       11          1           5
6        3        4          3           9
7        3        9         12           8
8        3        2          1           9
9        3        5          4           9
- transform(interactions)
Create and filter sessions from given interactions.
- Parameters
  interactions (DataFrame) – DataFrame containing columns user_column, time_column.
- Return type
  DataFrame
- Returns
  DataFrame with created and filtered sessions.
Padder (Experimental)
Pad array columns in dataframe.
- class replay.experimental.preprocessing.padder.Padder(pad_columns, padding_side='right', padding_value=0, array_size=None, cut_array=True, cut_side='right')
Pad array columns in dataframe.
>>> import pandas as pd
>>> pad_interactions = pd.DataFrame({
...     "user_id": [1, 1, 1, 1, 2, 2, 3, 3, 3],
...     "timestamp": [[1], [1, 2], [1, 2, 4], [1, 2, 4, 6], [4, 7, 12],
...                   [4, 7, 12, 126], [1, 2, 3, 4, 5], [1, 2, 3, 4, 5, 6],
...                   [1, 2, 3, 4, 5, 6, 7]],
...     "item_id": [['a'], ['a', 'b'], ['a', 'b', 'd'], ['a', 'b', 'd', 'f'], ['d', 'e', 'm'],
...                 ['d', 'e', 'm', 'g'], ['a', 'b', 'c', 'd', 'a'], ['a', 'b', 'c', 'd', 'a', 'f'],
...                 ['a', 'b', 'c', 'd', 'a', 'f', 'e']]
... })
>>> pad_interactions
   user_id              timestamp                item_id
0        1                    [1]                    [a]
1        1                 [1, 2]                 [a, b]
2        1              [1, 2, 4]              [a, b, d]
3        1           [1, 2, 4, 6]           [a, b, d, f]
4        2             [4, 7, 12]              [d, e, m]
5        2        [4, 7, 12, 126]           [d, e, m, g]
6        3        [1, 2, 3, 4, 5]        [a, b, c, d, a]
7        3     [1, 2, 3, 4, 5, 6]     [a, b, c, d, a, f]
8        3  [1, 2, 3, 4, 5, 6, 7]  [a, b, c, d, a, f, e]
>>> Padder(
...     pad_columns=["item_id", "timestamp"],
...     padding_side="right",
...     padding_value=["[PAD]", 0],
...     array_size=5,
...     cut_array=True,
...     cut_side="right"
... ).transform(pad_interactions)
   user_id           timestamp                          item_id
0        1     [1, 0, 0, 0, 0]  [a, [PAD], [PAD], [PAD], [PAD]]
1        1     [1, 2, 0, 0, 0]      [a, b, [PAD], [PAD], [PAD]]
2        1     [1, 2, 4, 0, 0]          [a, b, d, [PAD], [PAD]]
3        1     [1, 2, 4, 6, 0]              [a, b, d, f, [PAD]]
4        2    [4, 7, 12, 0, 0]          [d, e, m, [PAD], [PAD]]
5        2  [4, 7, 12, 126, 0]              [d, e, m, g, [PAD]]
6        3     [1, 2, 3, 4, 5]                  [a, b, c, d, a]
7        3     [2, 3, 4, 5, 6]                  [b, c, d, a, f]
8        3     [3, 4, 5, 6, 7]                  [c, d, a, f, e]
- transform(interactions)
Pad dataframe.
- Parameters
  interactions (DataFrame) – DataFrame with array columns with names pad_columns.
- Return type
  DataFrame
- Returns
  DataFrame with padded array columns.
SequenceGenerator (Experimental)
Creating sequences for sequential models.
- class replay.experimental.preprocessing.sequence_generator.SequenceGenerator(groupby_column, orderby_column=None, transform_columns=None, len_window=50, sequence_prefix=None, sequence_suffix='_list', label_prefix='label_', label_suffix=None, get_list_len=False, list_len_column='list_len')
Creating sequences for sequential models.
E.g., if u1 has the purchase sequence <i1, i2, i3, i4>, then after processing three cases will be generated:

u1, <i1> | i2

(which means that, given user_id u1 and item_seq <i1>, the model needs to predict the next item i2). The other cases are:

u1, <i1, i2> | i3
u1, <i1, i2, i3> | i4
>>> import pandas as pd
>>> time_interactions = pd.DataFrame({
...     "user_id": [1, 1, 1, 2, 2, 2, 3, 3, 3, 3],
...     "item_id": [3, 7, 10, 5, 8, 11, 4, 9, 2, 5],
...     "timestamp": [1, 2, 3, 3, 2, 1, 3, 12, 1, 4]
... })
>>> time_interactions
   user_id  item_id  timestamp
0        1        3          1
1        1        7          2
2        1       10          3
3        2        5          3
4        2        8          2
5        2       11          1
6        3        4          3
7        3        9         12
8        3        2          1
9        3        5          4
>>> sequences = (
...     SequenceGenerator(
...         groupby_column="user_id", transform_columns=["item_id", "timestamp"]
...     ).transform(time_interactions)
... )
>>> sequences
   user_id item_id_list timestamp_list  label_item_id  label_timestamp
0        1          [3]            [1]              7                2
1        1       [3, 7]         [1, 2]             10                3
2        2          [5]            [3]              8                2
3        2       [5, 8]         [3, 2]             11                1
4        3          [4]            [3]              9               12
5        3       [4, 9]        [3, 12]              2                1
6        3    [4, 9, 2]     [3, 12, 1]              5                4
- transform(interactions)
Create sequences from given interactions.
- Parameters
  interactions (DataFrame) – DataFrame.
- Return type
  DataFrame
- Returns
  DataFrame with transformed interactions; sequential interactions are collected into lists.
Data Preparation (Experimental)
Replay has a number of requirements for input data. We expect the input columns to be in the form [user_id, item_id, timestamp, relevance], and the internal format is a Spark DataFrame with indexed integer values for [user_idx, item_idx]. You can convert the indexes of your Spark DataFrame with the Indexer class (see the sketch after the class description below).
- class replay.experimental.preprocessing.data_preparator.Indexer(user_col='user_id', item_col='item_id')
This class is used to convert arbitrary id to numerical idx and back.
- fit(users, items)
Creates indexers to map raw id to numerical idx so that Spark can handle them.
- Parameters
  users (DataFrame) – SparkDataFrame containing user column
  items (DataFrame) – SparkDataFrame containing item column
- Return type
  None
- inverse_transform(df)
Convert SparkDataFrame to the initial indexes.
- Parameters
  df (DataFrame) – SparkDataFrame with numerical user_idx/item_idx columns
- Return type
  DataFrame
- Returns
  SparkDataFrame with original user/item columns
- transform(df)
Convert raw user_col and item_col to numerical user_idx and item_idx.
- Parameters
  df (DataFrame) – dataframe with raw indexes
- Return type
  Optional[DataFrame]
- Returns
  dataframe with converted indexes
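A minimal sketch of the Indexer workflow described above (this is not one of the library's doctests; it assumes a small Spark log built with convert2spark, as in the filter examples earlier in this section):

>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> from replay.experimental.preprocessing.data_preparator import Indexer
>>> log_sp = convert2spark(pd.DataFrame({"user_id": ["u1", "u2", "u2"],
...                                      "item_id": ["i1", "i2", "i3"]}))
>>> indexer = Indexer(user_col="user_id", item_col="item_id")
>>> # fit on all known users/items, then replace raw ids with integer indexes
>>> indexer.fit(users=log_sp.select("user_id"), items=log_sp.select("item_id"))
>>> log_idx = indexer.transform(log_sp)           # columns: user_idx, item_idx
>>> log_raw = indexer.inverse_transform(log_idx)  # back to the original ids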
If your DataFrame is a Pandas DataFrame and has different column names, you can either preprocess it yourself with the convert2spark function or use the DataPreparator class.
- class replay.experimental.preprocessing.data_preparator.DataPreparator
- Transforms data to a library format:
  - read as a Spark DataFrame / convert a Pandas DataFrame to Spark
  - check for nulls
  - create relevance/timestamp columns if absent
  - convert dates to TimestampType
Examples:
Loading log DataFrame
>>> import pandas as pd
>>> from replay.experimental.preprocessing.data_preparator import DataPreparator
>>>
>>> log = pd.DataFrame({"user": [2, 2, 2, 1],
...                     "item_id": [1, 2, 3, 3],
...                     "rel": [5, 5, 5, 5]}
...                    )
>>> dp = DataPreparator()
>>> correct_log = dp.transform(data=log,
...                            columns_mapping={"user_id": "user",
...                                             "item_id": "item_id",
...                                             "relevance": "rel"}
...                            )
>>> correct_log.show(2)
+-------+-------+---------+-------------------+
|user_id|item_id|relevance|          timestamp|
+-------+-------+---------+-------------------+
|      2|      1|      5.0|2099-01-01 00:00:00|
|      2|      2|      5.0|2099-01-01 00:00:00|
+-------+-------+---------+-------------------+
only showing top 2 rows
Loading user features
>>> import pandas as pd
>>> from replay.experimental.preprocessing.data_preparator import DataPreparator
>>>
>>> log = pd.DataFrame({"user": ["user1", "user1", "user2"],
...                     "f0": ["feature1", "feature2", "feature1"],
...                     "f1": ["left", "left", "center"],
...                     "ts": ["2019-01-01", "2019-01-01", "2019-01-01"]}
...                    )
>>> dp = DataPreparator()
>>> correct_log = dp.transform(data=log,
...                            columns_mapping={"user_id": "user"},
...                            )
>>> correct_log.show(3)
+-------+--------+------+----------+
|user_id|      f0|    f1|        ts|
+-------+--------+------+----------+
|  user1|feature1|  left|2019-01-01|
|  user1|feature2|  left|2019-01-01|
|  user2|feature1|center|2019-01-01|
+-------+--------+------+----------+
- static add_absent_log_cols(dataframe, columns_mapping, default_relevance=1.0, default_ts='2099-01-01')
Add relevance and timestamp columns with default values if relevance or timestamp is absent among mapping keys.
- Parameters
  dataframe (DataFrame) – interactions log to process
  columns_mapping (Dict[str, str]) – dictionary mapping "key: column name in input DataFrame". Possible keys: [user_id, item_id, timestamp, relevance]
  default_relevance (float) – default value for the generated relevance column
  default_ts (str) – default value for the generated timestamp column
- Returns
  spark DataFrame with generated timestamp and relevance columns if they are absent in the original dataframe
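A hedged usage sketch of this helper (not from the library's doctests; log_sp stands for a Spark log that already has user_id/item_id but no timestamp or relevance columns):

>>> from replay.experimental.preprocessing.data_preparator import DataPreparator
>>> log_full = DataPreparator.add_absent_log_cols(
...     dataframe=log_sp,
...     columns_mapping={"user_id": "user_id", "item_id": "item_id"},
...     default_relevance=1.0,
...     default_ts="2099-01-01",
... )
>>> # log_full now also carries relevance (1.0) and timestamp ("2099-01-01") columns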
- check_df(dataframe, columns_mapping)
Check:
  - if dataframe is not empty,
  - if columns from columns_mapping are present in dataframe,
  - warn about nulls in columns from columns_mapping,
  - warn about absent timestamp/relevance columns for an interactions log,
  - warn about wrong relevance DataType.
- Parameters
  dataframe (DataFrame) – spark DataFrame to process
  columns_mapping (Dict[str, str]) – dictionary mapping "key: column name in input DataFrame". Possible keys: [user_id, item_id, timestamp, relevance]. The columns_mapping values specify the nature of the DataFrame: if both [user_id, item_id] are present, then the dataframe is a log of interactions (specify timestamp, relevance columns in the mapping if available); if either user_id or item_id is present, then the dataframe is a dataframe of user/item features.
- Return type
  None
- property logger: Logger
- Returns
get library logger
- static read_as_spark_df(data=None, path=None, format_type=None, **kwargs)
Read a Spark DataFrame from a file or transform a Pandas DataFrame to Spark.
- Parameters
  data (Optional[DataFrame]) – DataFrame to process (path or data should be defined)
  path (Optional[str]) – path to data (path or data should be defined)
  format_type (Optional[str]) – file type, one of [csv, parquet, json, table]
  kwargs – extra arguments passed to spark.read.<format>(path, **reader_kwargs)
- Return type
  DataFrame
- Returns
  spark DataFrame
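A hedged usage sketch (not from the library's doctests; the file path and the header keyword are illustrative assumptions, forwarded to spark.read.csv as described above, and log_pd stands for an in-memory pandas DataFrame):

>>> from replay.experimental.preprocessing.data_preparator import DataPreparator
>>> # read a csv file into a Spark DataFrame
>>> log_spark = DataPreparator.read_as_spark_df(path="interactions.csv",
...                                             format_type="csv",
...                                             header=True)
>>> # or convert an in-memory pandas DataFrame
>>> log_spark = DataPreparator.read_as_spark_df(data=log_pd)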
- transform(columns_mapping, data=None, path=None, format_type=None, date_format=None, reader_kwargs=None)
Transforms a log, user features, or item features into a Spark DataFrame [user_id, item_id, timestamp, relevance], [user_id, *features], or [item_id, *features]. Input is either a file of format_type at path, or a pandas.DataFrame, or a spark.DataFrame. Transform performs:
  - dataframe reading / conversion to spark DataFrame format
  - dataframe checks (nulls, columns_mapping)
  - renaming columns from the mapping to standard names (user_id, item_id, timestamp, relevance)
  - for an interactions log: creating absent columns, converting the timestamp column to TimestampType and relevance to DoubleType
- Parameters
  columns_mapping (Dict[str, str]) – dictionary mapping "key: column name in input DataFrame". Possible keys: [user_id, item_id, timestamp, relevance]. The columns_mapping values specify the nature of the DataFrame: if both [user_id, item_id] are present, then the dataframe is a log of interactions (specify timestamp, relevance columns in the mapping if present); if either user_id or item_id is present, then the dataframe is a dataframe of user/item features.
  data (Optional[DataFrame]) – DataFrame to process
  path (Optional[str]) – path to data
  format_type (Optional[str]) – file type, one of [csv, parquet, json, table]
  date_format (Optional[str]) – format for the timestamp column
  reader_kwargs (Optional[Dict]) – extra arguments passed to spark.read.<format>(path, **reader_kwargs)
- Return type
  DataFrame
- Returns
  processed DataFrame