Source code for autoflow.utils.pipeline

from typing import Optional, Dict

from sklearn.pipeline import Pipeline, FeatureUnion

from autoflow.workflow.ml_workflow import ML_Workflow


def concat_pipeline(*args) -> Optional[ML_Workflow]:
    """Chain the steps of several workflows into a single ``ML_Workflow``.

    Every positional argument that is an ``ML_Workflow`` contributes its
    ``steps`` (in argument order) to the combined step list; arguments of
    any other type (including ``None``) are silently skipped.

    The ``resource_manager`` and ``should_store_intermediate_result``
    settings of the result are taken from the *last* ``ML_Workflow`` seen
    among the arguments.

    Returns
    -------
    Optional[ML_Workflow]
        A new workflow built from the combined steps, or ``None`` when no
        steps were collected.
    """
    merged_steps = []
    manager = None
    keep_intermediate = False

    for candidate in args:
        if not isinstance(candidate, ML_Workflow):
            continue
        merged_steps += candidate.steps
        # Later workflows override the settings picked up from earlier ones.
        manager = candidate.resource_manager
        keep_intermediate = candidate.should_store_intermediate_result

    if not merged_steps:
        return None
    return ML_Workflow(merged_steps, keep_intermediate, manager)
def purify_node(node):
    """Unwrap a ``Pipeline`` that contains exactly one step.

    If ``node`` is a ``Pipeline`` whose length is 1, the result of
    ``node[0]`` is returned; any other object (including longer or empty
    pipelines) is returned unchanged.
    """
    is_single_step_pipeline = isinstance(node, Pipeline) and len(node) == 1
    return node[0] if is_single_step_pipeline else node
def union_pipeline(preprocessors: Dict) -> Optional[Pipeline]:
    """Combine several preprocessing pipelines into one ``FeatureUnion``.

    Parameters
    ----------
    preprocessors : Dict
        Mapping from a name to a candidate transformer. Only values that
        are ``Pipeline`` instances are used; all other values are ignored.

    Returns
    -------
    Optional[Pipeline]
        A one-step ``Pipeline`` whose single step, named
        ``"feature_union"``, is a ``FeatureUnion`` (``n_jobs=1``) over the
        selected entries, each passed through ``purify_node`` first.
        ``None`` is returned when no entry qualifies.
    """
    branches = [
        (label, purify_node(candidate))
        for label, candidate in preprocessors.items()
        if isinstance(candidate, Pipeline)
    ]
    if not branches:
        return None

    union = FeatureUnion(branches, n_jobs=1)
    return Pipeline([("feature_union", union)])