# Source code for autoflow.manager.data_manager

# -*- encoding: utf-8 -*-
from copy import deepcopy
from typing import Union, Any, Dict, Sequence, List

import numpy as np
import pandas as pd

from autoflow.pipeline.components.utils import stack_Xs
from autoflow.pipeline.dataframe import GenericDataFrame
from autoflow.utils.data import is_nan, is_cat, is_highR_nan, to_array
from autoflow.utils.dataframe import pop_if_exists
from autoflow.utils.klass import StrSignatureMixin
from autoflow.utils.logging_ import get_logger
from autoflow.utils.ml_task import MLTask, get_ml_task_from_y


class DataManager(StrSignatureMixin):
    '''
    DataManager is a Dataset manager to store the pattern of dataset.
    '''

    def __init__(
            self,
            X_train: Union[pd.DataFrame, GenericDataFrame, np.ndarray, None] = None,
            y_train: Union[pd.Series, np.ndarray, str, None] = None,
            X_test: Union[pd.DataFrame, GenericDataFrame, np.ndarray, None] = None,
            y_test: Union[pd.Series, np.ndarray, str, None] = None,
            dataset_metadata: Dict[str, Any] = frozenset(),
            column_descriptions: Dict[str, Union[List[str], str]] = None,
            highR_nan_threshold: float = 0.5,
    ):
        '''
        Parameters
        ----------
        X_train: :class:`numpy.ndarray` or :class:`pandas.DataFrame`
        y_train: :class:`numpy.ndarray`
        X_test: :class:`numpy.ndarray` or :class:`pandas.DataFrame`
        y_test: :class:`numpy.ndarray`
        dataset_metadata: dict
        column_descriptions: dict
            ``column_descriptions`` is a dict, key is ``feature_group``,
            value is column (column name) or columns (list of column names).

            This is a list of some frequently-used built-in ``feature_group``:

            * ``id``       - id of this table.
            * ``ignore``   - some columns which contain irrelevant information.
            * ``target``   - column in the dataset is what your model will learn to predict.
            * ``nan``      - Not a Number, a column containing missing values.
            * ``num``      - numerical features, such as [1, 2, 3].
            * ``cat``      - categorical features, such as ["a", "b", "c"].
            * ``num_nan``  - numerical features containing missing values, such as [1, 2, NaN].
            * ``cat_nan``  - categorical features containing missing values, such as ["a", "b", NaN].
            * ``highR_nan`` - high-ratio NaN. You can find an explanation in
              :class:`autoflow.hdl.hdl_constructor.HDL_Constructor`
            * ``lowR_nan``  - low-ratio NaN. You can find an explanation in
              :class:`autoflow.hdl.hdl_constructor.HDL_Constructor`
            * ``highC_cat`` - high-cardinality-ratio categorical. You can find an explanation in
              :class:`autoflow.hdl.hdl_constructor.HDL_Constructor`
            * ``lowR_cat``  - low-cardinality-ratio categorical. You can find an explanation in
              :class:`autoflow.hdl.hdl_constructor.HDL_Constructor`
        highR_nan_threshold: float
            high-ratio NaN threshold; you can find examples and practice in
            :class:`autoflow.hdl.hdl_constructor.HDL_Constructor`
        '''
        self.logger = get_logger(self)
        # ``frozenset()`` is used as an immutable empty default; normalize it
        # (and any mapping the caller passes) into a real dict.
        dataset_metadata = dict(dataset_metadata)
        self.highR_nan_threshold = highR_nan_threshold
        self.dataset_metadata = dataset_metadata
        # Deep-copy all inputs so the manager never mutates caller-owned data
        # (parse_column_descriptions pops columns in place).
        X_train = deepcopy(X_train)
        y_train = deepcopy(y_train)
        X_test = deepcopy(X_test)
        y_test = deepcopy(y_test)
        X_train, y_train, X_test, y_test, feature_groups, column2feature_groups = self.parse_column_descriptions(
            column_descriptions, X_train, y_train, X_test, y_test
        )
        self.feature_groups = feature_groups
        self.column2feature_groups = column2feature_groups
        self.ml_task: MLTask = get_ml_task_from_y(y_train)
        self.X_train = GenericDataFrame(X_train, feature_groups=feature_groups)
        self.y_train = y_train
        self.X_test = GenericDataFrame(X_test, feature_groups=feature_groups) if X_test is not None else None
        self.y_test = y_test  # may be None when no test labels were provided
        # todo: user-defined validation sets could be specified via RandomShuffle or mlxtend
        # fixme: multilabel is not supported
        if len(y_train.shape) > 2:
            raise ValueError('y must not have more than two dimensions, '
                             'but has %d.' % len(y_train.shape))
        if X_train.shape[0] != y_train.shape[0]:
            raise ValueError('X and y must have the same number of '
                             'datapoints, but have %d and %d.' %
                             (X_train.shape[0], y_train.shape[0]))
[docs] def parse_feature_groups(self, series: pd.Series): if is_nan(series): if is_highR_nan(series, self.highR_nan_threshold): return "highR_nan" else: return "nan" elif is_cat(series): return "cat" else: return "num"
[docs] def type_check(self, X): if isinstance(X, GenericDataFrame): X = pd.DataFrame(X) elif isinstance(X, np.ndarray): X = pd.DataFrame(X) elif isinstance(X, pd.DataFrame): pass elif X is None: pass else: raise TypeError return X
[docs] def parse_column_descriptions(self, column_descriptions, X_train, y_train, X_test, y_test): # todo: 校验X是否存在重名列 X_train = self.type_check(X_train) X_test = self.type_check(X_test) both_set = False if X_train is not None and X_test is None: X = X_train y = y_train elif X_train is None and X_test is not None: X = X_test y = y_test elif X_train is not None and X_test is not None: both_set = True X = X_train y = y_train self.logger.info("X_train and X_test are both set.") else: self.logger.error("X_train and X_test are both None, it is invalide.") raise ValueError if column_descriptions is None: column_descriptions = {} # fixme : DataManager存在只托管X的情况 # assert y is not None # --确定target-- if isinstance(y, str) or "target" in column_descriptions: if isinstance(y, str): target_col = y elif "target" in column_descriptions: target_col = column_descriptions["target"] else: raise NotImplementedError y_train = pop_if_exists(X_train, target_col) y_test = pop_if_exists(X_test, target_col) # --确定id-- if "id" in column_descriptions: id_col = column_descriptions["id"] self.id_seq = pop_if_exists(X_train, id_col) self.test_id_seq = pop_if_exists(X_test, id_col) # --确定ignore-- if "ignore" in column_descriptions: ignore_cols = column_descriptions["ignore"] if not isinstance(ignore_cols, Sequence): ignore_cols = [ignore_cols] for ignore_col in ignore_cols: pop_if_exists(X_train, ignore_col) pop_if_exists(X_test, ignore_col) # --验证X与X_test的列应该相同 if both_set: assert X_train.shape[1] == X_test.shape[1] assert np.all(X_train.columns == X_test.columns) # --确定其他列-- column2feature_groups = {} for key, values in column_descriptions.items(): if key in ("id", "target", "ignore"): continue if isinstance(values, str): values = [values] for value in values: column2feature_groups[value] = key # ----尝试将X_train与X_test拼在一起,然后做解析--------- X = stack_Xs(X_train, None, X_test) # ----对于没有标注的列,打上nan,highR_nan,cat,num三种标记 for column in X.columns: if column not in column2feature_groups: feature_group = 
self.parse_feature_groups(X[column]) column2feature_groups[column] = feature_group feature_groups = [column2feature_groups[column] for column in X.columns] L1 = X_train.shape[0] if X_train is not None else 0 if X_test is not None: L2 = X_test.shape[0] X_test.index = range(L1, L1 + L2) X_train.index = range(L1) y_train = to_array(y_train) y_test = to_array(y_test) return X_train, y_train, X_test, y_test, feature_groups, column2feature_groups
[docs] def process_X(self, X): if X is None: return None X: pd.DataFrame = self.type_check(X) # delete id, ignore, target columns = [column for column in X.columns if column in self.column2feature_groups] if len(self.feature_groups) != len(columns): self.logger.error( "In DataManager.process_X, processed columns' length don't equal to feature_groups' length.") raise ValueError X = X[columns] X = GenericDataFrame(X, feature_groups=self.feature_groups) return X
[docs] def set_data(self, X_train=None, y_train=None, X_test=None, y_test=None): self.X_train = self.process_X(X_train) self.X_test = self.process_X(X_test) self.y_train = y_train self.y_test = y_test