Preprocessing

This module contains tools for preprocessing data including:

  • filters

  • processors for feature transforms

Filters

Select or remove data by some criteria.

class replay.preprocessing.filters.EntityDaysFilter(days=10, first=True, entity_column='user_id', timestamp_column='timestamp')

Get first/last days of interactions by entity.

>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> log_pd = pd.DataFrame({"user_id": ["u1", "u2", "u2", "u3", "u3", "u3"],
...                     "item_id": ["i1", "i2","i3", "i1", "i2","i3"],
...                     "rating": [1., 0.5, 3, 1, 0, 1],
...                     "timestamp": ["2020-01-01 23:59:59", "2020-02-01 00:00:00",
...                                   "2020-02-01 00:00:01", "2020-01-01 00:04:15",
...                                   "2020-01-02 00:04:14", "2020-01-05 23:59:59"]},
...             )
>>> log_pd["timestamp"] = pd.to_datetime(log_pd["timestamp"], format="ISO8601")
>>> log_sp = convert2spark(log_pd)
>>> log_sp.orderBy('user_id', 'item_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+

Get first day by users:

>>> EntityDaysFilter(1, True, entity_column='user_id').transform(log_sp).orderBy('user_id', 'item_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
+-------+-------+------+-------------------+

Get last day by item:

>>> EntityDaysFilter(1, False, entity_column='item_id').transform(log_sp).orderBy('item_id', 'user_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
+-------+-------+------+-------------------+
class replay.preprocessing.filters.GlobalDaysFilter(days=10, first=True, timestamp_column='timestamp')

Select first/last days from interactions.

>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> log_pd = pd.DataFrame({"user_id": ["u1", "u2", "u2", "u3", "u3", "u3"],
...                     "item_id": ["i1", "i2","i3", "i1", "i2","i3"],
...                     "rating": [1., 0.5, 3, 1, 0, 1],
...                     "timestamp": ["2020-01-01 23:59:59", "2020-02-01 00:00:00",
...                                   "2020-02-01 00:00:01", "2020-01-01 00:04:15",
...                                   "2020-01-02 00:04:14", "2020-01-05 23:59:59"]},
...             )
>>> log_pd["timestamp"] = pd.to_datetime(log_pd["timestamp"], format="ISO8601")
>>> log_sp = convert2spark(log_pd)
>>> log_sp.show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+
>>> GlobalDaysFilter(1).transform(log_sp).show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
+-------+-------+------+-------------------+
>>> GlobalDaysFilter(1, first=False).transform(log_sp).show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
+-------+-------+------+-------------------+
class replay.preprocessing.filters.InteractionEntriesFilter(query_column='user_id', item_column='item_id', min_inter_per_user=None, max_inter_per_user=None, min_inter_per_item=None, max_inter_per_item=None, allow_caching=True)

Remove interactions of users and items whose number of interactions is below the minimum or above the maximum constraint value for the corresponding column.

>>> import pandas as pd
>>> interactions = pd.DataFrame({
...    "user_id": [1, 1, 1, 2, 2, 2, 3, 3, 3, 3],
...    "item_id": [3, 7, 10, 5, 8, 11, 4, 9, 2, 5],
...    "rating": [1, 2, 3, 3, 2, 1, 3, 12, 1, 4]
... })
>>> interactions
    user_id  item_id  rating
0        1        3       1
1        1        7       2
2        1       10       3
3        2        5       3
4        2        8       2
5        2       11       1
6        3        4       3
7        3        9      12
8        3        2       1
9        3        5       4
>>> filtered_interactions = InteractionEntriesFilter(min_inter_per_user=4).transform(interactions)
>>> filtered_interactions
    user_id  item_id  rating
6        3        4       3
7        3        9      12
8        3        2       1
9        3        5       4
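
The constraints can also be applied per item; an illustrative sketch (the output assumes the filter preserves the original row indices, as in the example above):

>>> InteractionEntriesFilter(min_inter_per_item=2).transform(interactions)
    user_id  item_id  rating
3        2        5       3
9        3        5       4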
class replay.preprocessing.filters.LowRatingFilter(value, rating_column='rating')

Remove records whose value in rating_column is less than value.

>>> import pandas as pd
>>> data_frame = pd.DataFrame({"rating": [1, 5, 3.5, 4]})
>>> LowRatingFilter(3.5).transform(data_frame)
     rating
1       5.0
2       3.5
3       4.0
class replay.preprocessing.filters.MinCountFilter(num_entries, groupby_column='user_id')

Remove entries for entities (e.g. users, items) that appear in the interactions fewer than num_entries times. The interactions are grouped by groupby_column, the entity column name, to calculate the counts.

>>> import pandas as pd
>>> data_frame = pd.DataFrame({"user_id": [1, 1, 2]})
>>> MinCountFilter(2).transform(data_frame)
    user_id
0         1
1         1
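
The grouping column can be changed to filter by items instead; an illustrative sketch mirroring the example above:

>>> data_frame = pd.DataFrame({"item_id": [1, 1, 2]})
>>> MinCountFilter(2, groupby_column="item_id").transform(data_frame)
    item_id
0         1
1         1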
class replay.preprocessing.filters.NumInteractionsFilter(num_interactions=10, first=True, query_column='user_id', timestamp_column='timestamp', item_column=None)

Get first/last num_interactions interactions for each query.

>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> log_pd = pd.DataFrame({"user_id": ["u1", "u2", "u2", "u3", "u3", "u3"],
...                     "item_id": ["i1", "i2","i3", "i1", "i2","i3"],
...                     "rating": [1., 0.5, 3, 1, 0, 1],
...                     "timestamp": ["2020-01-01 23:59:59", "2020-02-01 00:00:00",
...                                   "2020-02-01 00:00:01", "2020-01-01 00:04:15",
...                                   "2020-01-02 00:04:14", "2020-01-05 23:59:59"]},
...             )
>>> log_pd["timestamp"] = pd.to_datetime(log_pd["timestamp"], format="ISO8601")
>>> log_sp = convert2spark(log_pd)
>>> log_sp.show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+

Only first interaction:

>>> NumInteractionsFilter(1, True, item_column='item_id').transform(log_sp).orderBy('user_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
+-------+-------+------+-------------------+

Only last interaction:

>>> NumInteractionsFilter(1, False).transform(log_sp).orderBy('user_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+
>>> NumInteractionsFilter(1, False, item_column='item_id').transform(log_sp).orderBy('user_id').show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+
class replay.preprocessing.filters.QuantileItemsFilter(alpha_quantile=0.99, items_proportion=0.5, query_column='query_id', item_column='item_id')

This filter is aimed at undersampling the interactions dataset.

The algorithm removes items_proportion of the interactions for each item whose interaction count exceeds the alpha_quantile value of the count distribution. It starts with the most popular items (those with the most interactions) and preserves the relative popularity of items, since interactions are removed only in the range between the current item count and the quantile count (specified by alpha_quantile).

>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> log_pd = pd.DataFrame({
...        "user_id": [0, 0, 1, 2, 2, 2, 2],
...        "item_id": [0, 2, 1, 1, 2, 2, 2]
... })
>>> log_spark = convert2spark(log_pd)
>>> log_spark.show()
+-------+-------+
|user_id|item_id|
+-------+-------+
|      0|      0|
|      0|      2|
|      1|      1|
|      2|      1|
|      2|      2|
|      2|      2|
|      2|      2|
+-------+-------+
>>> QuantileItemsFilter(query_column="user_id").transform(log_spark).show()
+-------+-------+
|user_id|item_id|
+-------+-------+
|      0|      0|
|      1|      1|
|      2|      1|
|      2|      2|
|      2|      2|
|      0|      2|
+-------+-------+
class replay.preprocessing.filters.TimePeriodFilter(start_date=None, end_date=None, timestamp_column='timestamp', time_column_format='%Y-%m-%d %H:%M:%S')

Select a part of data between [start_date, end_date).

>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> log_pd = pd.DataFrame({"user_id": ["u1", "u2", "u2", "u3", "u3", "u3"],
...                     "item_id": ["i1", "i2","i3", "i1", "i2","i3"],
...                     "rating": [1., 0.5, 3, 1, 0, 1],
...                     "timestamp": ["2020-01-01 23:59:59", "2020-02-01 00:00:00",
...                                   "2020-02-01 00:00:01", "2020-01-01 00:04:15",
...                                   "2020-01-02 00:04:14", "2020-01-05 23:59:59"]},
...             )
>>> log_pd["timestamp"] = pd.to_datetime(log_pd["timestamp"], format="ISO8601")
>>> log_sp = convert2spark(log_pd)
>>> log_sp.show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u2|     i2|   0.5|2020-02-01 00:00:00|
|     u2|     i3|   3.0|2020-02-01 00:00:01|
|     u3|     i1|   1.0|2020-01-01 00:04:15|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
|     u3|     i3|   1.0|2020-01-05 23:59:59|
+-------+-------+------+-------------------+
>>> from datetime import datetime
>>> TimePeriodFilter(
...    start_date="2020-01-01 14:00:00",
...    end_date=datetime(2020, 1, 3, 0, 0, 0)
... ).transform(log_sp).show()
+-------+-------+------+-------------------+
|user_id|item_id|rating|          timestamp|
+-------+-------+------+-------------------+
|     u1|     i1|   1.0|2020-01-01 23:59:59|
|     u3|     i2|   0.0|2020-01-02 00:04:14|
+-------+-------+------+-------------------+

CSRConverter

Convert input data to csr sparse matrix.

class replay.preprocessing.converter.CSRConverter(first_dim_column, second_dim_column, data_column=None, row_count=None, column_count=None, allow_collect_to_master=False)

Convert input data to a csr sparse matrix, where data_column, first_dim_column and second_dim_column satisfy the relationship matrix[first_dim_column[i], second_dim_column[i]] = data_column[i].

>>> import pandas as pd
>>> interactions = pd.DataFrame({
...    "user_id": [1, 1, 1, 2, 2, 2, 3, 3, 3, 3],
...    "item_id": [3, 7, 10, 5, 8, 11, 4, 9, 2, 5],
...    "rating": [1, 2, 3, 3, 2, 1, 3, 12, 1, 4]
... })
>>> interactions
    user_id  item_id  rating
0        1        3       1
1        1        7       2
2        1       10       3
3        2        5       3
4        2        8       2
5        2       11       1
6        3        4       3
7        3        9      12
8        3        2       1
9        3        5       4
>>> csr_interactions = CSRConverter(
...    first_dim_column="user_id",
...    second_dim_column="item_id",
...    data_column="rating",
... ).transform(interactions)
>>> csr_interactions.todense()
matrix([[ 0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0],
        [ 0,  0,  0,  1,  0,  0,  0,  2,  0,  0,  3,  0],
        [ 0,  0,  0,  0,  0,  3,  0,  0,  2,  0,  0,  1],
        [ 0,  0,  1,  0,  3,  4,  0,  0,  0, 12,  0,  0]])
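
The resulting dimensions can also be set explicitly; a minimal sketch, assuming that row_count and column_count fix the shape of the output matrix:

>>> CSRConverter(
...    first_dim_column="user_id",
...    second_dim_column="item_id",
...    data_column="rating",
...    row_count=5,
...    column_count=14,
... ).transform(interactions).shape
(5, 14)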
transform(data)

Transform a Spark or Pandas DataFrame to csr format.

Parameters

data (Union[DataFrame, DataFrame, DataFrame]) – Spark or Pandas DataFrame containing columns first_dim_column, second_dim_column, and optionally data_column.

Return type

csr_matrix

Returns

Sparse interactions.

Sessionizer

Create and filter sessions from given interactions.

class replay.preprocessing.sessionizer.Sessionizer(user_column='user_id', time_column='timestamp', session_column='session_id', session_gap=86400, time_column_format='yyyy-MM-dd HH:mm:ss', min_inter_per_session=None, max_inter_per_session=None, min_sessions_per_user=None, max_sessions_per_user=None)

Create and filter sessions from the given interactions. Session ids are formed as the difference between the cumulative sum over unique users and the number of entries inside each user's history that exceed the session gap.

>>> import pandas as pd
>>> time_interactions = pd.DataFrame({
...    "user_id": [1, 1, 1, 2, 2, 2, 3, 3, 3, 3],
...    "item_id": [3, 7, 10, 5, 8, 11, 4, 9, 2, 5],
...    "timestamp": [1, 2, 3, 3, 2, 1, 3, 12, 1, 4]
... })
>>> time_interactions
   user_id  item_id  timestamp
0        1        3          1
1        1        7          2
2        1       10          3
3        2        5          3
4        2        8          2
5        2       11          1
6        3        4          3
7        3        9         12
8        3        2          1
9        3        5          4
>>> Sessionizer(session_gap=5).transform(time_interactions)
   user_id  item_id  timestamp  session_id
0        1        3          1           2
1        1        7          2           2
2        1       10          3           2
3        2        5          3           5
4        2        8          2           5
5        2       11          1           5
6        3        4          3           9
7        3        9         12           8
8        3        2          1           9
9        3        5          4           9
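
Sessions can also be filtered by size; an illustrative sketch, assuming that sessions with fewer than min_inter_per_session interactions are dropped while the remaining rows keep their session ids and order:

>>> Sessionizer(session_gap=5, min_inter_per_session=2).transform(time_interactions)
   user_id  item_id  timestamp  session_id
0        1        3          1           2
1        1        7          2           2
2        1       10          3           2
3        2        5          3           5
4        2        8          2           5
5        2       11          1           5
6        3        4          3           9
8        3        2          1           9
9        3        5          4           9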
transform(interactions)

Create and filter sessions from given interactions.

Parameters

interactions (Union[DataFrame, DataFrame, DataFrame]) – DataFrame containing columns user_column, time_column.

Return type

Union[DataFrame, DataFrame, DataFrame]

Returns

DataFrame with created and filtered sessions.

Padder (Experimental)

Pad array columns in dataframe.

class replay.experimental.preprocessing.padder.Padder(pad_columns, padding_side='right', padding_value=0, array_size=None, cut_array=True, cut_side='right')

Pad array columns in dataframe.

>>> import pandas as pd
>>> pad_interactions = pd.DataFrame({
...    "user_id": [1, 1, 1, 1, 2, 2, 3, 3, 3],
...    "timestamp": [[1], [1, 2], [1, 2, 4], [1, 2, 4, 6], [4, 7, 12],
...                  [4, 7, 12, 126], [1, 2, 3, 4, 5], [1, 2, 3, 4, 5, 6],
...                  [1, 2, 3, 4, 5, 6, 7]],
...    "item_id": [['a'], ['a', 'b'], ['a', 'b', 'd'], ['a', 'b', 'd', 'f'], ['d', 'e', 'm'],
...                ['d', 'e', 'm', 'g'], ['a', 'b', 'c', 'd', 'a'], ['a', 'b', 'c', 'd', 'a', 'f'],
...                ['a', 'b', 'c', 'd', 'a', 'f', 'e']]
... })
>>> pad_interactions
    user_id              timestamp                item_id
0        1                    [1]                    [a]
1        1                 [1, 2]                 [a, b]
2        1              [1, 2, 4]              [a, b, d]
3        1           [1, 2, 4, 6]           [a, b, d, f]
4        2             [4, 7, 12]              [d, e, m]
5        2        [4, 7, 12, 126]           [d, e, m, g]
6        3        [1, 2, 3, 4, 5]        [a, b, c, d, a]
7        3     [1, 2, 3, 4, 5, 6]     [a, b, c, d, a, f]
8        3  [1, 2, 3, 4, 5, 6, 7]  [a, b, c, d, a, f, e]
>>> Padder(
...    pad_columns=["item_id", "timestamp"],
...    padding_side="right",
...    padding_value=["[PAD]", 0],
...    array_size=5,
...    cut_array=True,
...    cut_side="right"
... ).transform(pad_interactions)
   user_id           timestamp                          item_id
0        1     [1, 0, 0, 0, 0]  [a, [PAD], [PAD], [PAD], [PAD]]
1        1     [1, 2, 0, 0, 0]      [a, b, [PAD], [PAD], [PAD]]
2        1     [1, 2, 4, 0, 0]          [a, b, d, [PAD], [PAD]]
3        1     [1, 2, 4, 6, 0]              [a, b, d, f, [PAD]]
4        2    [4, 7, 12, 0, 0]          [d, e, m, [PAD], [PAD]]
5        2  [4, 7, 12, 126, 0]              [d, e, m, g, [PAD]]
6        3     [1, 2, 3, 4, 5]                  [a, b, c, d, a]
7        3     [2, 3, 4, 5, 6]                  [b, c, d, a, f]
8        3     [3, 4, 5, 6, 7]                  [c, d, a, f, e]
transform(interactions)

Pad dataframe.

Parameters

interactions (Union[DataFrame, DataFrame, DataFrame]) – DataFrame with array columns with names pad_columns.

Return type

Union[DataFrame, DataFrame, DataFrame]

Returns

DataFrame with padded array columns.

SequenceGenerator (Experimental)

Creating sequences for sequential models.

class replay.experimental.preprocessing.sequence_generator.SequenceGenerator(groupby_column, orderby_column=None, transform_columns=None, len_window=50, sequence_prefix=None, sequence_suffix='_list', label_prefix='label_', label_suffix=None, get_list_len=False, list_len_column='list_len')

Creating sequences for sequential models.

E.g., if u1 has the purchase sequence <i1, i2, i3, i4>, then after processing three cases will be generated.

u1, <i1> | i2

(Which means that given user_id u1 and the item sequence <i1>, the model needs to predict the next item, i2.)

The other cases are below:

u1, <i1, i2> | i3

u1, <i1, i2, i3> | i4

>>> import pandas as pd
>>> time_interactions = pd.DataFrame({
...    "user_id": [1, 1, 1, 2, 2, 2, 3, 3, 3, 3],
...    "item_id": [3, 7, 10, 5, 8, 11, 4, 9, 2, 5],
...    "timestamp": [1, 2, 3, 3, 2, 1, 3, 12, 1, 4]
... })
>>> time_interactions
   user_id  item_id  timestamp
0        1        3          1
1        1        7          2
2        1       10          3
3        2        5          3
4        2        8          2
5        2       11          1
6        3        4          3
7        3        9         12
8        3        2          1
9        3        5          4
>>> sequences = (
...    SequenceGenerator(
...        groupby_column="user_id", transform_columns=["item_id", "timestamp"]
...    ).transform(time_interactions)
... )
>>> sequences
   user_id item_id_list timestamp_list  label_item_id  label_timestamp
0        1          [3]            [1]              7                2
1        1       [3, 7]         [1, 2]             10                3
2        2          [5]            [3]              8                2
3        2       [5, 8]         [3, 2]             11                1
4        3          [4]            [3]              9               12
5        3       [4, 9]        [3, 12]              2                1
6        3    [4, 9, 2]     [3, 12, 1]              5                4
transform(interactions)

Create sequences from given interactions.

Parameters

interactions (Union[DataFrame, DataFrame, DataFrame]) – DataFrame.

Return type

Union[DataFrame, DataFrame, DataFrame]

Returns

DataFrame with transformed interactions; sequential interactions are collected into lists.

Data Preparation (Experimental)

RePlay has a number of requirements for input data. We expect the input columns to be in the form [user_id, item_id, timestamp, relevance]. The internal format is a Spark DataFrame with indexed integer values for [user_idx, item_idx]. You can convert the indexes of your Spark DataFrame with the Indexer class.

class replay.experimental.preprocessing.data_preparator.Indexer(user_col='user_id', item_col='item_id')

This class is used to convert arbitrary ids to numerical idx and back.
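
A minimal usage sketch; the concrete integer indexes assigned depend on the fitted mapping, so only the resulting column names are checked here:

>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> log = convert2spark(pd.DataFrame({"user_id": ["u1", "u2", "u2"],
...                                   "item_id": ["i1", "i1", "i2"]}))
>>> indexer = Indexer()
>>> indexer.fit(users=log.select("user_id"), items=log.select("item_id"))
>>> log_idx = indexer.transform(log)
>>> sorted(log_idx.columns)
['item_idx', 'user_idx']
>>> sorted(indexer.inverse_transform(log_idx).columns)
['item_id', 'user_id']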

fit(users, items)

Creates indexers to map raw id to numerical idx so that Spark can handle them.

Parameters
  • users (DataFrame) – SparkDataFrame containing user column

  • items (DataFrame) – SparkDataFrame containing item column

Return type

None

inverse_transform(df)

Convert SparkDataFrame to the initial indexes.

Parameters

df (DataFrame) – SparkDataFrame with numerical user_idx/item_idx columns

Return type

DataFrame

Returns

SparkDataFrame with original user/item columns

transform(df)

Convert raw user_col and item_col to numerical user_idx and item_idx

Parameters

df (DataFrame) – dataframe with raw indexes

Return type

Optional[DataFrame]

Returns

dataframe with converted indexes

If your DataFrame is a Pandas DataFrame with different column names, you can either preprocess it yourself with the convert2spark function or use the DataPreparator class.

class replay.experimental.preprocessing.data_preparator.DataPreparator

Transforms data to a library format:
  • read as a Spark DataFrame / convert a Pandas DataFrame to Spark

  • check for nulls

  • create relevance/timestamp columns if absent

  • convert dates to TimestampType

Examples:

Loading log DataFrame

>>> import pandas as pd
>>> from replay.experimental.preprocessing.data_preparator import DataPreparator
>>>
>>> log = pd.DataFrame({"user": [2, 2, 2, 1],
...                     "item_id": [1, 2, 3, 3],
...                     "rel": [5, 5, 5, 5]}
...                    )
>>> dp = DataPreparator()
>>> correct_log = dp.transform(data=log,
...                            columns_mapping={"user_id": "user",
...                                           "item_id": "item_id",
...                                           "relevance": "rel"}
...                             )
>>> correct_log.show(2)
+-------+-------+---------+-------------------+
|user_id|item_id|relevance|          timestamp|
+-------+-------+---------+-------------------+
|      2|      1|      5.0|2099-01-01 00:00:00|
|      2|      2|      5.0|2099-01-01 00:00:00|
+-------+-------+---------+-------------------+
only showing top 2 rows

Loading user features

>>> import pandas as pd
>>> from replay.experimental.preprocessing.data_preparator import DataPreparator
>>>
>>> log = pd.DataFrame({"user": ["user1", "user1", "user2"],
...                     "f0": ["feature1","feature2","feature1"],
...                     "f1": ["left","left","center"],
...                     "ts": ["2019-01-01","2019-01-01","2019-01-01"]}
...             )
>>> dp = DataPreparator()
>>> correct_log = dp.transform(data=log,
...                            columns_mapping={"user_id": "user"},
...                             )
>>> correct_log.show(3)
+-------+--------+------+----------+
|user_id|      f0|    f1|        ts|
+-------+--------+------+----------+
|  user1|feature1|  left|2019-01-01|
|  user1|feature2|  left|2019-01-01|
|  user2|feature1|center|2019-01-01|
+-------+--------+------+----------+
static add_absent_log_cols(dataframe, columns_mapping, default_relevance=1.0, default_ts='2099-01-01')

Add relevance and timestamp columns with default values if relevance or timestamp is absent among mapping keys.

Parameters
  • dataframe (DataFrame) – interactions log to process

  • columns_mapping (Dict[str, str]) – dictionary mapping “key: column name in input DataFrame”. Possible keys: [user_id, item_id, timestamp, relevance]

  • default_relevance (float) – default value for generated relevance column

  • default_ts (str) – str, default value for generated timestamp column

Returns

Spark DataFrame with timestamp and relevance columns generated if they were absent in the original dataframe
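
An illustrative sketch, assuming that the generated default columns are named timestamp and relevance:

>>> import pandas as pd
>>> from replay.utils.spark_utils import convert2spark
>>> raw_log = convert2spark(pd.DataFrame({"user": [1, 2], "item_id": [1, 3]}))
>>> with_defaults = DataPreparator.add_absent_log_cols(
...     dataframe=raw_log,
...     columns_mapping={"user_id": "user", "item_id": "item_id"}
... )
>>> sorted(with_defaults.columns)
['item_id', 'relevance', 'timestamp', 'user']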

check_df(dataframe, columns_mapping)

Check:
  • if dataframe is not empty

  • if columns from columns_mapping are present in dataframe

  • warn about nulls in columns from columns_mapping

  • warn about absence of timestamp/relevance columns in the interactions log

  • warn about wrong relevance DataType

Parameters
  • dataframe (DataFrame) – spark DataFrame to process

  • columns_mapping (Dict[str, str]) – dictionary mapping “key: column name in input DataFrame”. Possible keys: [user_id, item_id, timestamp, relevance]. The columns_mapping values specify the nature of the DataFrame: if both user_id and item_id are present, the dataframe is an interactions log (specify timestamp and relevance columns in the mapping if available); if either user_id or item_id is present, the dataframe is a dataframe of user/item features.

Return type

None

property logger: Logger
Returns

get library logger

static read_as_spark_df(data=None, path=None, format_type=None, **kwargs)

Read a Spark DataFrame from a file or convert a Pandas DataFrame to Spark.

Parameters
  • data (Union[DataFrame, DataFrame, DataFrame, None]) – DataFrame to process (either data or path should be defined)

  • path (Optional[str]) – path to data (either data or path should be defined)

  • format_type (Optional[str]) – file type, one of [csv, parquet, json, table]

  • kwargs – extra arguments passed to spark.read.<format>(path, **reader_kwargs)

Return type

DataFrame

Returns

spark DataFrame

transform(columns_mapping, data=None, path=None, format_type=None, date_format=None, reader_kwargs=None)

Transforms an interactions log or user/item features into a Spark DataFrame [user_id, item_id, timestamp, relevance], [user_id, *features], or [item_id, *features]. Input is either a file of format_type at path, a pandas.DataFrame, or a spark.DataFrame. Transform performs:
  • dataframe reading / conversion to Spark DataFrame format

  • dataframe check (nulls, columns_mapping)

  • renaming columns from the mapping to standard names (user_id, item_id, timestamp, relevance)

  • for an interactions log: creating absent columns, converting the timestamp column to TimestampType and relevance to DoubleType

Parameters
  • columns_mapping (Dict[str, str]) – dictionary mapping “key: column name in input DataFrame”. Possible keys: [user_id, item_id, timestamp, relevance]. The columns_mapping values specify the nature of the DataFrame: if both user_id and item_id are present, the dataframe is an interactions log (specify timestamp and relevance columns in the mapping if present); if either user_id or item_id is present, the dataframe is a dataframe of user/item features.

  • data (Union[DataFrame, DataFrame, DataFrame, None]) – DataFrame to process

  • path (Optional[str]) – path to data

  • format_type (Optional[str]) – file type, one of [csv, parquet, json, table]

  • date_format (Optional[str]) – format for the timestamp column

  • reader_kwargs (Optional[Dict]) – extra arguments passed to spark.read.<format>(path, **reader_kwargs)

Return type

DataFrame

Returns

processed DataFrame