Source code for pqagent.preprocessor
from dataclasses import dataclass, field
from typing import Optional
import pandas as pd
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.preprocessing import OrdinalEncoder, OneHotEncoder
from dwrappr import DataSet, DataSetMeta
import logging
logger = logging.getLogger(__name__)
[docs]
class Encoder:
    """
    Encode the categorical columns of a DataFrame with a scikit-learn encoder.

    Columns are treated as categorical when their dtype is ``object`` or
    ``category``. The set of columns is determined once, in
    :meth:`fit_transform`, and reused by :meth:`transform` and
    :meth:`inverse_transform`.

    None of the methods modify the DataFrame passed in. When there is nothing
    to encode the input object itself is returned; otherwise a new DataFrame
    is returned.

    Attributes:
        encoder (OneHotEncoder or OrdinalEncoder): Wrapped scikit-learn encoder.
        encoder_type (str): Lower-cased encoder type ('onehot' or 'ordinal').
        categorical_columns (pd.Index): Columns selected for encoding; ``None``
            until :meth:`fit_transform` has been called.
        new_columns: Column names produced by the OneHotEncoder; stays ``None``
            for the ordinal encoder.
    """

    def __init__(self, encoder_type: str):
        """
        Initialize the Encoder with the specified encoder type.

        :param encoder_type: Type of encoder to use ('onehot' or 'ordinal'), case-insensitive.
        :raises NameError: If the encoder type is unknown.
        """
        self.encoder = self.initialize_encoder(encoder_type=encoder_type)
        self.encoder_type = encoder_type.lower()
        self.categorical_columns = None
        self.new_columns = None

    def initialize_encoder(self, encoder_type: str):
        """
        Create the scikit-learn encoder for the given encoder type.

        :param encoder_type: Type of encoder ('onehot' or 'ordinal'), case-insensitive.
        :return: A new OneHotEncoder (dense output, no dropped category,
            unknown categories ignored) or a new OrdinalEncoder.
        :raises NameError: If the encoder type is unknown.
        """
        kind = encoder_type.lower()
        if kind == 'onehot':
            logger.info("Setting up %s Encoder", encoder_type)
            return OneHotEncoder(sparse_output=False, drop=None, handle_unknown='ignore')
        if kind == 'ordinal':
            logger.info("Setting up %s Encoder", encoder_type)
            return OrdinalEncoder()
        raise NameError("Unknown encoder type")

    def fit_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Fit the encoder on the categorical columns of ``df`` and encode them.

        :param df: DataFrame containing the data to be encoded.
        :return: ``df`` itself when it has no categorical columns, otherwise a
            new DataFrame with the categorical columns encoded.
        """
        self.categorical_columns = df.select_dtypes(include=['object', 'category']).columns
        # ``Index.empty`` asks "are there any columns?"; ``Index.any()`` would
        # instead test the truthiness of the labels and misfire for a column
        # named 0 or "".
        if self.categorical_columns.empty:
            logger.info("No categorical columns, encoding skipped")
            return df
        encoded = self.encoder.fit_transform(df[self.categorical_columns])
        if self.encoder_type == 'onehot':
            self.new_columns = self.encoder.get_feature_names_out(self.categorical_columns)
        return self._merge_encoded(df, encoded)

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Encode ``df`` with the encoder fitted in :meth:`fit_transform`.

        :param df: DataFrame containing the data to be transformed.
        :return: ``df`` itself when no categorical columns were found during
            fitting, otherwise a new DataFrame with those columns encoded.
        :raises AttributeError: If :meth:`fit_transform` has not been called yet.
        """
        if not self._has_categorical_columns():
            # The wrapped encoder was never fitted in this case, so calling it
            # would fail; mirror the skip done by fit_transform.
            return df
        encoded = self.encoder.transform(df[self.categorical_columns])
        return self._merge_encoded(df, encoded)

    def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Revert encoded columns back to their original categorical values.

        :param df: DataFrame containing the encoded data.
        :return: ``df`` itself when no categorical columns were found during
            fitting, otherwise a new DataFrame with the original categorical
            columns restored.
        :raises AttributeError: If :meth:`fit_transform` has not been called yet.
        """
        if not self._has_categorical_columns():
            return df
        if self.encoder_type == 'onehot':
            decoded = self.encoder.inverse_transform(df[self.new_columns])
            df_decoded = pd.DataFrame(decoded, columns=self.categorical_columns, index=df.index)
            df_remaining = df.drop(columns=self.new_columns)
            return pd.concat([df_remaining, df_decoded], axis=1)
        # Work on a copy so the caller's DataFrame is left untouched.
        result = df.copy()
        result[self.categorical_columns] = self.encoder.inverse_transform(df[self.categorical_columns])
        return result

    def _has_categorical_columns(self) -> bool:
        """Return whether fitting found any categorical column; raise if not fitted."""
        if self.categorical_columns is None:
            raise AttributeError("Encoder has not been fitted yet. Call fit_transform first.")
        return not self.categorical_columns.empty

    def _merge_encoded(self, df: pd.DataFrame, encoded) -> pd.DataFrame:
        """Combine the encoder output with the non-categorical columns of ``df``."""
        if self.encoder_type == 'onehot':
            # One-hot output replaces the categorical columns and is appended
            # after the remaining columns, keeping the original row index.
            df_encoded = pd.DataFrame(encoded, columns=self.new_columns, index=df.index)
            df_remaining = df.drop(columns=self.categorical_columns)
            return pd.concat([df_remaining, df_encoded], axis=1)
        # Ordinal output keeps the column layout; write it into a copy so the
        # caller's DataFrame is left untouched.
        result = df.copy()
        result[self.categorical_columns] = encoded
        return result
[docs]
class Scaler:
    """
    Scale the numerical columns of a DataFrame with a scikit-learn scaler.

    Columns are treated as numerical when their dtype is ``float64`` or
    ``int64``. The set of columns is determined once, in
    :meth:`fit_transform`, and reused by :meth:`transform` and
    :meth:`inverse_transform`.

    None of the methods modify the DataFrame passed in. When there is nothing
    to scale the input object itself is returned; otherwise a copy with the
    numerical columns replaced is returned.

    Attributes:
        scaler (StandardScaler or MinMaxScaler): Wrapped scikit-learn scaler.
        numerical_columns (pd.Index): Columns selected for scaling; ``None``
            until :meth:`fit_transform` has been called.
    """

    def __init__(self, scaler_type: str):
        """
        Initialize the Scaler with the specified scaler type.

        :param scaler_type: Type of scaler to use ('standardization' or 'minmax'), case-insensitive.
        :raises NameError: If the scaler type is unknown.
        """
        self.scaler = self.initialize_scaler(scaler_type=scaler_type)
        self.numerical_columns = None

    def initialize_scaler(self, scaler_type: str):
        """
        Create the scikit-learn scaler for the given scaler type.

        :param scaler_type: Type of scaler ('standardization' or 'minmax'), case-insensitive.
        :return: A new StandardScaler or MinMaxScaler.
        :raises NameError: If the scaler type is unknown.
        """
        kind = scaler_type.lower()
        if kind == 'standardization':
            return StandardScaler()
        if kind == 'minmax':
            return MinMaxScaler()
        raise NameError("Unknown scaler type")

    def fit_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Fit the scaler on the numerical columns of ``df`` and scale them.

        :param df: DataFrame containing the data to be scaled.
        :return: ``df`` itself when it has no numerical columns, otherwise a
            copy of ``df`` with the numerical columns scaled.
        """
        self.numerical_columns = df.select_dtypes(include=['float64', 'int64']).columns
        if self.numerical_columns.empty:
            # Nothing to fit on; skip instead of handing the scaler a
            # zero-column frame.
            return df
        scaled = self.scaler.fit_transform(df[self.numerical_columns])
        return self._with_numerical(df, scaled)

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Scale the numerical columns using the scaler fitted in :meth:`fit_transform`.

        :param df: DataFrame containing the data to be transformed.
        :return: ``df`` itself when no numerical columns were found during
            fitting, otherwise a copy of ``df`` with those columns scaled.
        :raises AttributeError: If :meth:`fit_transform` has not been called yet.
        """
        if not self._has_numerical_columns():
            return df
        scaled = self.scaler.transform(df[self.numerical_columns])
        return self._with_numerical(df, scaled)

    def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Revert the scaled numerical columns back to their original values.

        :param df: DataFrame containing the scaled data.
        :return: ``df`` itself when no numerical columns were found during
            fitting, otherwise a copy of ``df`` with the original numerical
            values restored.
        :raises AttributeError: If :meth:`fit_transform` has not been called yet.
        """
        if not self._has_numerical_columns():
            return df
        restored = self.scaler.inverse_transform(df[self.numerical_columns])
        return self._with_numerical(df, restored)

    def _has_numerical_columns(self) -> bool:
        """Return whether fitting found any numerical column; raise if not fitted."""
        if self.numerical_columns is None:
            raise AttributeError("Scaler has not been fitted yet. Call fit_transform first.")
        return not self.numerical_columns.empty

    def _with_numerical(self, df: pd.DataFrame, values) -> pd.DataFrame:
        """Return a copy of ``df`` whose numerical columns are replaced by ``values``."""
        # Copy first so the caller's DataFrame is left untouched.
        result = df.copy()
        result[self.numerical_columns] = values
        return result
[docs]
@dataclass
class Preprocessor:
    """
    Apply an optional Encoder and an optional Scaler to the features of a DataSet.

    On ``fit_transform`` and ``transform`` the encoder runs first and the
    scaler second; ``inverse_transform`` undoes them in the opposite order.
    Only the feature columns (``x_as_df``) are transformed; target and
    auxiliary columns (``y_as_df``, ``z_as_df``) are concatenated back
    unchanged.

    At least one of scaler and encoder must be configured: the transformation
    methods raise when both are ``None``.

    Attributes:
        encoder (Encoder): Encoder instance, or ``None`` when no encoder type was given.
        scaler (Scaler): Scaler instance, or ``None`` when no scaler type was given.
        fitted (bool): Set to ``True`` once :meth:`fit_transform` has completed.
    """
    # NOTE: the hand-written __init__ below is kept by @dataclass (it does not
    # overwrite an __init__ defined on the class); the field declarations only
    # drive the generated __repr__ and __eq__.
    encoder: Encoder = field(init=False)
    scaler: Scaler = field(init=False)
    fitted: bool = False

    def __init__(self, scaler_type: str, encoder_type: Optional[str] = None):
        """
        Initialize Preprocessor with specified scaler and encoder types.

        :param scaler_type: Type of scaler to use ('standardization' or 'minmax'). If falsy, no scaling is applied.
        :param encoder_type: Type of encoder to use ('onehot' or 'ordinal'). If falsy, no encoding is applied.
        """
        self.scaler = Scaler(scaler_type) if scaler_type else None
        self.encoder = Encoder(encoder_type) if encoder_type else None
        self.fitted = False

    @classmethod
    def from_config(cls, config: dict):
        """
        Instantiate Preprocessor using a configuration dictionary.

        :param config: A dictionary with key 'scaler_type' and, optionally, 'encoder_type'.
        :return: Preprocessor instance.
        :raises KeyError: If 'scaler_type' is missing from ``config``.
        """
        # encoder_type is optional in __init__, so a missing key means "no encoder".
        return cls(scaler_type=config["scaler_type"],
                   encoder_type=config.get("encoder_type"))

    def fit_transform(self, dataset: DataSet, inplace=False) -> Optional[DataSet]:
        """
        Fit the scaler and encoder to the provided dataset and transform it.

        :param dataset: DataSet object to preprocess.
        :param inplace: If False, create and return a new DataSet. Inplace modification is not supported yet.
        :return: Preprocessed DataSet.
        :raises NotImplementedError: If ``inplace`` is True.
        :raises ValueError: If neither scaler nor encoder is defined.
        """
        # Reject the unsupported mode before fitting, so a failed call does not
        # leave the encoder/scaler fitted as a side effect.
        if inplace:
            raise NotImplementedError
        x_transformed = self._fit_transform_x(dataset.x_as_df)
        self.fitted = True
        return self._rebuild_dataset(dataset, x_transformed)

    def _fit_transform_x(self, x: pd.DataFrame) -> pd.DataFrame:
        """
        Fit the encoder and scaler on the feature data and transform it.

        :param x: DataFrame containing feature data.
        :return: Transformed DataFrame.
        :raises ValueError: If neither scaler nor encoder is defined.
        """
        # Both attributes always exist (set in __init__), so test their values
        # rather than their presence, and do so before any work is done.
        if self.encoder is None and self.scaler is None:
            raise ValueError("Neither scaler nor encoder defined. Transformation not possible")
        if self.encoder is not None:
            x = self.encoder.fit_transform(x)
        if self.scaler is not None:
            x = self.scaler.fit_transform(x)
        return x

    def transform(self, dataset: DataSet, inplace: bool = False) -> DataSet:
        """
        Transform the dataset using the pre-fitted scaler and encoder.

        :param dataset: DataSet object to preprocess.
        :param inplace: If False, create and return a new DataSet. Inplace modification is not supported yet.
        :return: Transformed DataSet.
        :raises AttributeError: If neither scaler nor encoder is defined.
        :raises NotImplementedError: If ``inplace`` is True.
        """
        self._check_preprocessor_defined()
        if inplace:
            raise NotImplementedError
        return self._rebuild_dataset(dataset, self._transform_X(dataset.x_as_df))

    def _transform_X(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Apply the pre-fitted encoder and scaler to the feature data.

        :param X: DataFrame containing feature data.
        :return: Transformed DataFrame.
        """
        # Skip the encoder when fitting found no categorical columns. Use
        # ``Index.empty`` rather than ``Index.any()``: the latter tests the
        # truthiness of the column labels (a column named 0 or "" is falsy).
        if self.encoder is not None and not self.encoder.categorical_columns.empty:
            X = self.encoder.transform(X)
        if self.scaler is not None:
            X = self.scaler.transform(X)
        return X

    def inverse_transform(self, dataset: DataSet, inplace: bool = False) -> DataSet:
        """
        Apply inverse transformation to the dataset using the pre-fitted scaler and encoder.

        :param dataset: DataSet object to revert.
        :param inplace: If False, create and return a new DataSet. Inplace modification is not supported yet.
        :return: Inversely transformed DataSet.
        :raises AttributeError: If neither scaler nor encoder is defined.
        :raises NotImplementedError: If ``inplace`` is True.
        """
        self._check_preprocessor_defined()
        if inplace:
            raise NotImplementedError
        return self._rebuild_dataset(dataset, self._inverse_transform_x(dataset.x_as_df))

    def _inverse_transform_x(self, x: pd.DataFrame) -> pd.DataFrame:
        """
        Undo scaling and then encoding on the feature data.

        :param x: DataFrame containing transformed feature data.
        :return: Inversely transformed DataFrame.
        """
        # Reverse of the forward order (encoder, then scaler).
        if self.scaler is not None:
            x = self.scaler.inverse_transform(x)
        if self.encoder is not None:
            x = self.encoder.inverse_transform(x)
        return x

    def _rebuild_dataset(self, dataset: DataSet, x: pd.DataFrame) -> DataSet:
        """Join processed features with the dataset's targets/auxiliaries into a new DataSet."""
        df_transformed = pd.concat([x, dataset.y_as_df, dataset.z_as_df], axis=1)
        return self._build_transformed_dataset(
            df_transformed=df_transformed,
            # Feature names come from the processed frame because encoding can
            # add or remove columns.
            feature_names=x.columns.tolist(),
            target_names=dataset.target_names,
            auxiliary_names=dataset.auxiliary_names,
        )

    @staticmethod
    def _build_transformed_dataset(df_transformed: pd.DataFrame,
                                   feature_names: list[str],
                                   target_names: list[str],
                                   auxiliary_names: list[str]) -> DataSet:
        """
        Build a DataSet named "transformed dataset" from a DataFrame and column roles.

        :param df_transformed: DataFrame holding feature, target and auxiliary columns.
        :param feature_names: Names of the feature columns.
        :param target_names: Names of the target columns.
        :param auxiliary_names: Names of the auxiliary columns.
        :return: New DataSet created via ``DataSet.from_dataframe``.
        """
        meta = DataSetMeta(
            name="transformed dataset",
            feature_names=feature_names,
            target_names=target_names,
            auxiliary_names=auxiliary_names,
        )
        return DataSet.from_dataframe(df=df_transformed, meta=meta)

    def _check_feature_consistency(self, datasets: list[DataSet]):
        """
        Ensure that all datasets have the same feature, target and auxiliary names.

        The first dataset is used as the reference.

        :param datasets: List of DataSet objects to check.
        :return: Tuple of (feature names, target names) of the first dataset.
        :raises ValueError: If feature, target or auxiliary names are inconsistent across datasets.
        """
        reference = datasets[0]
        features = reference.feature_names
        targets = reference.target_names
        auxiliaries = reference.auxiliary_names
        for ds in datasets:
            if (ds.feature_names != features
                    or ds.target_names != targets
                    or ds.auxiliary_names != auxiliaries):
                raise ValueError("Inconsistent feature or target structure detected in datasets.")
        return features, targets

    def _check_preprocessor_defined(self):
        """
        Check if either a scaler or encoder is defined.

        :raises AttributeError: If neither scaler nor encoder is defined.
        """
        # Both attributes always exist (set in __init__), so a hasattr() test
        # could never fail; check their values instead.
        if self.encoder is None and self.scaler is None:
            raise AttributeError("No preprocessor defined yet (scaler and/or encoder). Fit-transform first.")
[docs]
def to_list(input: object) -> list:
    """
    Ensure that the input is a list. If not, wrap it in a single-element list.

    Only ``list`` instances are passed through; any other value (including
    tuples, strings and ``None``) is wrapped as-is.

    :param input: Any value or list. The name shadows the builtin but is kept
        so existing keyword callers continue to work.
    :return: The input itself if it is already a list, otherwise ``[input]``.
    """
    if isinstance(input, list):
        return input
    return [input]