Skip to content

Pipeline

Pipeline

Pipeline

Bases: BaseEstimator, TransformerMixin

Transformation pipeline

Parameters:

Name Type Description Default
final_step

The final step of the transformation

required
cache_type CacheStoreType

Cache storage mode

MEMORY
Source code in ceruleo/transformation/functional/pipeline/pipeline.py
class Pipeline(BaseEstimator, TransformerMixin):
    """Transformation pipeline

    Parameters:
        final_step: The final step of the transformation
        cache_type: Cache storage mode
    """

    final_step: TransformerStep
    fitted_: bool
    cache_type: CacheStoreType
    runner: CachedPipelineRunner

    def __init__(self, final_step, cache_type: CacheStoreType = CacheStoreType.MEMORY):
        self.final_step = final_step
        self.fitted_ = False
        self.cache_type = cache_type
        self.runner = CachedPipelineRunner(final_step, cache_type)

    def find_node(
        self, name: str
    ) -> Union[List[TransformerStep], TransformerStep, None]:
        """Find a transformation node given a name

        Parameters:
            name: Name of the step to find

        Returns:
            The single matching step, a list of steps when several
            share the name, or None when no step matches.
        """
        matches = [
            node for node in dfs_iterator(self.final_step) if node.name == name
        ]
        if not matches:
            return None
        return matches[0] if len(matches) == 1 else matches

    def fit(
        self,
        dataset: Union[AbstractPDMDataset, pd.DataFrame],
        show_progress: bool = False,
    ):
        """Fit a pipeline using a dataset

        The CachedPipelineRunner is called to fit

        Parameters:
            dataset: A dataset of run-to-failure cycles, or a single cycle
            show_progress: Whether to show the progress when fitting

        Returns:
            self: The fitted Pipeline
        """
        if isinstance(dataset, pd.DataFrame):
            # A single cycle is handled as a one-element dataset
            dataset = [dataset]
        c = self.runner.fit(dataset, show_progress=show_progress)
        self.column_names = c.columns
        self.fitted_ = True

        return self

    def partial_fit(
        self,
        dataset: Union[AbstractPDMDataset, pd.DataFrame],
        show_progress: bool = False,
    ):
        """Incrementally fit the pipeline

        Delegates to fit; the runner already performs partial fitting
        on the steps that support it.

        Parameters:
            dataset: A dataset of run-to-failure cycles, or a single cycle
            show_progress: Whether to show the progress when fitting

        Returns:
            self: The fitted Pipeline
        """
        # Return self for consistency with fit and the sklearn convention
        # (previously the fitted pipeline was silently discarded).
        return self.fit(dataset, show_progress=show_progress)

    def transform(self, df: Union[pd.DataFrame, Iterable[pd.DataFrame]]):
        """Transform a run-to-failure cycle or a dataset

        The CachedPipelineRunner is called to transform

        Parameters:
            df: A run-to-failure cycle or a dataset of cycles

        Returns:
            The transformed data frame(s)
        """
        return self.runner.transform(df)

    def description(self):
        """Return the description of every node in topological order"""
        return [node.description() for node in topological_sort_iterator(self)]

    def get_params(self, deep: bool = False):
        """Get the parameters of the pipeline (sklearn-compatible)

        Parameters:
            deep: Whether to also include each node's parameters,
                  namespaced as ``<node name>__<param>``

        Returns:
            params: Dictionary of parameters
        """
        params = {"cache_type": self.cache_type, "final_step": self.final_step}
        if deep:
            for node in topological_sort_iterator(self):
                p = node.get_params(deep)
                for k in p.keys():
                    params[f"{node.name}__{k}"] = p[k]
        return params

    def __sklearn_clone__(self):
        """Clone the pipeline by cloning every node and re-wiring the graph"""
        # Clone every node, sever the clones' connections, then rebuild
        # the edges between the clones mirroring the original graph.
        g = {node: sklearn_clone(node) for node in dfs_iterator(self.final_step)}
        for cloned in g.values():
            cloned.clear_connections()
        for node in dfs_iterator(self.final_step):
            for next_node in node.next:
                g[node].add_next(g[next_node])

        return Pipeline(
            final_step=g[self.final_step],
            cache_type=self.cache_type,
        )

find_node(name)

Find a transformation node given a name

Parameters:

Name Type Description Default
name str

Name of the step to find

required
steps: Steps located in the pipeline.
Source code in ceruleo/transformation/functional/pipeline/pipeline.py
def find_node(
    self, name: str
) -> Union[List[TransformerStep], TransformerStep, None]:
    """Locate pipeline step(s) by name

    Parameters:
        name: Name of the step to find

    Returns:

        steps: The single matching step, a list when several steps share
            the name, or None when nothing matches.

    """
    found = [
        node for node in dfs_iterator(self.final_step) if node.name == name
    ]
    if not found:
        return None
    return found[0] if len(found) == 1 else found

fit(dataset, show_progress=False)

Fit a pipeline using a dataset

The CachedPipelineRunner is called to fit

Parameters:

dataset: A dataset of a run-to-failure cycle
show_progress: Whether to show the progress when fitting

Returns:

Name Type Description
s

Pipeline

Source code in ceruleo/transformation/functional/pipeline/pipeline.py
def fit(
    self,
    dataset: Union[AbstractPDMDataset, pd.DataFrame],
    show_progress: bool = False,
):
    """Fit the pipeline on a dataset

    Delegates the fitting to the CachedPipelineRunner.

    Parameters:

        dataset: A dataset of run-to-failure cycles, or a single cycle
        show_progress: Whether to display progress while fitting

    Returns:
        self: The fitted Pipeline
    """
    # A lone DataFrame is treated as a dataset containing one cycle
    data = [dataset] if isinstance(dataset, pd.DataFrame) else dataset
    transformed = self.runner.fit(data, show_progress=show_progress)
    self.column_names = transformed.columns
    self.fitted_ = True
    return self

transform(df)

Transform a run-to-failure cycle or a dataset

The CachedPipelineRunner is called to transform

Parameters:

df: A dataset of a run-to-failure cycle

Returns:

Name Type Description
s

list of data frames

Source code in ceruleo/transformation/functional/pipeline/pipeline.py
def transform(self, df: Union[pd.DataFrame, Iterable[pd.DataFrame]]):
    """Transform a run-to-failure cycle or a dataset of cycles

    Delegates the transformation to the CachedPipelineRunner.

    Parameters:

        df: A single run-to-failure cycle or an iterable of cycles

    Returns:
        The transformed data frame(s)
    """
    return self.runner.transform(df)

make_pipeline(*steps, cache_type=CacheStoreType.MEMORY)

Build a pipeline

Example:

make_pipeline(
    ByNameFeatureSelector(features=FEATURES),
    Clip(lower=-2, upper=2),
    IndexMeanResampler(rule='500s')
)

Parameters:

steps: List of steps
cache_type: Where to store the pipeline intermediate steps

Returns:

Pipeline: The created pipeline
Source code in ceruleo/transformation/functional/pipeline/pipeline.py
def make_pipeline(
    *steps, cache_type: CacheStoreType = CacheStoreType.MEMORY
) -> Pipeline:
    """Build a pipeline by chaining the given steps in order

    Example:

        make_pipeline(
            ByNameFeatureSelector(features=FEATURES),
            Clip(lower=-2, upper=2),
            IndexMeanResampler(rule='500s')
        )

    Parameters:

        steps: Steps of the pipeline, in execution order
        cache_type: Where to store the pipeline intermediate steps

    Returns:

        Pipeline: The created pipeline

    Raises:
        ValueError: If no steps are provided
    """
    # Fail with a clear message instead of an opaque IndexError on steps[0]
    if not steps:
        raise ValueError("make_pipeline requires at least one step")
    step = steps[0]
    # Calling a step with the previous one links them in the graph
    for next_step in steps[1:]:
        step = next_step(step)

    return Pipeline(step, cache_type=cache_type)

Cache

CacheStoreType

Bases: Enum

Cache store modes

Values:

SHELVE = 1
MEMORY = 2
Source code in ceruleo/transformation/functional/pipeline/cache_store.py
class CacheStoreType(Enum):
    """Available modes for storing the pipeline cache

    Values:

        SHELVE = 1
        MEMORY = 2
    """

    SHELVE = 1
    MEMORY = 2

GraphTraversalAbstractStore

Abstract Cache for the graph traversal

Source code in ceruleo/transformation/functional/pipeline/cache_store.py
class GraphTraversalAbstractStore:
    """Abstract cache for the graph traversal

    Concrete stores must implement item access (``store[key]``) plus
    close/reset/keys/pop.  The store is a context manager: leaving the
    ``with`` block closes it.
    """

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Always release resources, even when the body raised
        self.close()

    def __getitem__(self, k):
        # Declared here for interface completeness: both concrete stores
        # implement item access and the traversal relies on it.
        raise NotImplementedError

    def __setitem__(self, k, v):
        raise NotImplementedError

    def close(self):
        raise NotImplementedError

    def reset(self):
        raise NotImplementedError

    def keys(self):
        raise NotImplementedError

    def pop(self, k):
        raise NotImplementedError

GraphTraversalCacheMemory

Cache all the intermediate steps in RAM

Source code in ceruleo/transformation/functional/pipeline/cache_store.py
class GraphTraversalCacheMemory:
    """In-memory cache for the intermediate transformation steps

    Entries are kept in a plain dict; closing or resetting the store
    simply discards every cached entry.
    """

    def __init__(self):
        self.store = {}

    def __getitem__(self, key):
        return self.store[key]

    def __setitem__(self, key, value):
        self.store[key] = value

    def close(self):
        # Closing an in-memory store just drops its contents
        self.store = {}

    def reset(self):
        self.store = {}

    def keys(self):
        return self.store.keys()

    def pop(self, key):
        return self.store.pop(key)

GraphTraversalCacheShelveStore

Cache all the intermediate steps in a Shelve Store

Parameters:

cache_path: Path where the cache is stored
Source code in ceruleo/transformation/functional/pipeline/cache_store.py
class GraphTraversalCacheShelveStore:
    """Disk-backed cache (shelve) for the intermediate transformation steps

    Parameters:

        cache_path: Path where the cache is stored
    """

    def __init__(self, cache_path: Path = CACHE_PATH):
        # A fresh dash-free UUID keeps concurrent stores from colliding
        unique_name = uuid.uuid4().hex
        self.cache_path = cache_path / "GraphTraversalCache" / unique_name / "data"
        self.cache_path.parent.mkdir(exist_ok=True, parents=True)
        self.transformed_cache = shelve.open(str(self.cache_path))

    def __getitem__(self, key):
        return self.transformed_cache[key]

    def __setitem__(self, key, value):
        self.transformed_cache[key] = value

    def close(self):
        # Only clean up while the backing directory still exists
        if self.cache_path.parent.is_dir():
            self.transformed_cache.close()
            shutil.rmtree(self.cache_path.parent)

    def reset(self):
        # Close and reopen the same shelf file.
        # NOTE(review): unlike the in-memory store, this does not clear
        # previously stored entries — confirm that is intended.
        self.transformed_cache.close()
        self.transformed_cache = shelve.open(str(self.cache_path))

    def keys(self):
        return self.transformed_cache.keys()

    def pop(self, key):
        return self.transformed_cache.pop(key)

Cache

CachedPipelineRunner

Performs an execution of the transformation graph caching the intermediate results

Parameters:

final_step: Last step of the graph
cache_type: Mode for storing the cache
Source code in ceruleo/transformation/functional/pipeline/runner.py
class CachedPipelineRunner:
    """Performs an execution of the transformation graph caching the intermediate results

    Parameters:

        final_step: Last step of the graph
        cache_type: Mode for storing the cache
    """

    def __init__(
        self,
        final_step: TransformerStep,
        cache_type: CacheStoreType = CacheStoreType.SHELVE,
    ):
        self.final_step = final_step
        self.root_nodes = root_nodes(final_step)
        self.cache_type = cache_type

    def _run(
        self,
        dataset: Iterable[pd.DataFrame],
        fit: bool = True,
        show_progress: bool = False,
    ):
        """Run the graph over the dataset, optionally fitting each step

        Visits the nodes in topological order; for each node it (optionally)
        fits it and then transforms every dataset element through it.

        Returns:
            The dataset transformed by the final step.
        """
        dataset_size = len(dataset)

        with CachedGraphTraversal(
            self.root_nodes, dataset, cache_type=self.cache_type
        ) as cache:
            for node in topological_sort_iterator(self.final_step):
                if isinstance(node, TransformerStep) and fit:
                    if node.prefer_partial_fit:
                        # Feed the step one cycle at a time to bound memory
                        for dataset_element in range(dataset_size):
                            d = cache.state_up_to(node, dataset_element)
                            node.partial_fit(d)
                    else:
                        # This step needs the whole dataset concatenated
                        data = pd.concat(
                            [
                                cache.state_up_to(node, dataset_element)
                                for dataset_element in range(dataset_size)
                            ]
                        )
                        node.fit(data)

                # NOTE: the parallel path (_parallel_transform_step) is
                # currently disabled; all transforms run sequentially.
                self._transform_step(cache, node, dataset_size, show_progress)

            # The final step stores its output under the `None` key
            last_state_key = cache.get_keys_of(None)[0]
            return cache.transformed_cache[last_state_key]

    def fit(self, dataset: Iterable[pd.DataFrame], show_progress: bool = False):
        """Fit every step of the graph with the given dataset"""
        return self._run(dataset, fit=True, show_progress=show_progress)

    def _update_step(self, cache, node, dataset_element, new_element):
        """Drop the inputs of `node` for this element and store its output"""
        cache.clean_state_up_to(node, dataset_element)

        if len(node.next) > 0:
            for n in node.next:
                cache.store(n, node, dataset_element, new_element)
        else:
            # Terminal node: its output is stored under the `None` key
            cache.store(None, node, dataset_element, new_element)

    def _parallel_transform_step(
        self, cache: CachedGraphTraversal, node, dataset_size: int, show_progress: bool
    ):
        """Transform every dataset element through `node` in worker processes

        Spawns one process per element; results are collected through a
        joinable queue. Currently not called from _run.
        """
        if show_progress:
            bar = tqdm(range(dataset_size))
            bar.set_description(node.name)
        else:
            bar = range(dataset_size)

        producers = []

        queue = JoinableQueue(dataset_size)
        for dataset_element in range(dataset_size):
            old_element = cache.state_up_to(
                node,
                dataset_element,
            )
            producers.append(
                Process(
                    target=_transform, args=(node, old_element, dataset_element, queue)
                )
            )
        for p in producers:
            p.start()

        for _ in bar:
            dataset_element, new_element = queue.get()
            queue.task_done()
            self._update_step(cache, node, dataset_element, new_element)
        cache.remove_state(node)
        for p in producers:
            p.join()
        queue.join()

    def _transform_step(
        self, cache: CachedGraphTraversal, node, dataset_size: int, show_progress: bool
    ):
        """Transform every dataset element through `node` sequentially"""
        if show_progress:
            bar = tqdm(range(dataset_size))
            bar.set_description(node.name)
        else:
            bar = range(dataset_size)
        try:
            for dataset_element in bar:
                old_element = cache.state_up_to(node, dataset_element)
                new_element = node.transform(old_element)
                self._update_step(cache, node, dataset_element, new_element)
        except Exception:
            # logger.exception records the traceback; re-raise so callers
            # still see the original error
            logger.exception(f"There was an error when transforming with {node.name}")
            raise

    def transform(self, df: Union[pd.DataFrame, Iterable[pd.DataFrame]]):
        """Transform a single cycle or a dataset of cycles without fitting"""
        if isinstance(df, pd.DataFrame):
            return self._run([df], fit=False)
        else:
            return self._run(df, fit=False)

Traversal

CachedGraphTraversal

Iterator for a graph nodes.

The cache data structures has the following form Current Node -> Previous Nodes -> [Transformed Dataset]

  • cache[n]: contains a dict with one key for each previous node
  • cache[n][n.previous[0]] A list with each element of the dataset transformed in up to n.previous[0]

Parameters:

root_nodes: Initial nodes of the graph
dataset: Each node visit the dataset
cache_path: Where to store the cache
cache_type: Mode for storing the intermediate steps
Source code in ceruleo/transformation/functional/pipeline/traversal.py
class CachedGraphTraversal:
    """Graph-traversal helper that caches the intermediate results

    The cache has the following structure:
    Current Node -> Previous Nodes -> [Transformed Dataset]

    * cache[n]:
        contains a dict with one key for each previous node
    * cache[n][n.previous[0]]:
        A list with each element of the dataset transformed
        up to n.previous[0]

    Parameters:

        root_nodes: Initial nodes of the graph
        dataset: Dataset visited by each node
        cache_path: Where to store the cache
        cache_type: Mode for storing the intermediate steps

    """

    def __init__(
        self,
        root_nodes,
        dataset,
        cache_path: Optional[Path] = CACHE_PATH,
        cache_type: CacheStoreType = CacheStoreType.SHELVE,
    ):
        if cache_type == CacheStoreType.SHELVE:
            self.transformed_cache = GraphTraversalCacheShelveStore(cache_path)
        elif cache_type == CacheStoreType.MEMORY:
            self.transformed_cache = GraphTraversalCacheMemory()

        # Seed the cache: every root node sees the raw dataset elements
        for r in root_nodes:
            for i, df in enumerate(dataset):
                self.transformed_cache[encode_tuple((r, None, i))] = df

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Release the backing store (and its disk files, if any)
        self.transformed_cache.close()

    def clear_cache(self):
        """Empty the cache store"""
        # BUG FIX: previously called the non-existent method `claer()`,
        # which raised AttributeError on every invocation.
        self.transformed_cache.reset()

    def state_up_to(self, current_node: TransformerStep, dataset_element: int):
        """Return the cached inputs of `current_node` for one dataset element

        Returns a list (one entry per predecessor) when the node has
        several predecessors, otherwise the single transformed element.
        """
        previous_nodes = current_node.previous

        if len(previous_nodes) > 1:
            return [
                self.transformed_cache[encode_tuple((current_node, p, dataset_element))]
                for p in previous_nodes
            ]

        # Root nodes have no predecessor: their input is keyed on None
        previous = previous_nodes[0] if len(previous_nodes) == 1 else None
        return self.transformed_cache[
            encode_tuple((current_node, previous, dataset_element))
        ]

    def clean_state_up_to(self, current_node: TransformerStep, dataset_element: int):
        """Drop the cached inputs of `current_node` for one dataset element"""
        for p in current_node.previous:
            self.transformed_cache[
                encode_tuple((current_node, p, dataset_element))
            ] = None

    def store(
        self,
        next_node: Optional[TransformerStep],
        node: TransformerStep,
        dataset_element: int,
        new_element: pd.DataFrame,
    ):
        """Store the output of `node` as input of `next_node`

        `next_node` is None for the terminal node of the graph.
        """
        self.transformed_cache[
            encode_tuple((next_node, node, dataset_element))
        ] = new_element

    def remove_state(self, nodes: Union[TransformerStep, List[TransformerStep]]):
        """Remove every cached entry keyed on the given node(s)"""
        if not isinstance(nodes, list):
            nodes = [nodes]
        for n in nodes:
            for k in self.get_keys_of(n):
                self.transformed_cache.pop(k)

    def get_keys_of(self, n):
        """Return the cache keys whose first component refers to node `n`"""
        return [
            k
            for k in self.transformed_cache.keys()
            if decode_tuple(k)[0] == str(hash(n))
        ]

Utils

plot_pipeline(pipe, name)

Plot the transformation pipeline

Parameters:

pipe: The pipeline
name: Title of the graphic

Returns:

graphic: the diagram
Source code in ceruleo/transformation/functional/pipeline/utils.py
def plot_pipeline(pipe: "TemporisPipeline", name: str):
    """Plot the transformation pipeline

    Parameters:

        pipe: The pipeline
        name: Title of the graphic

    Returns:

        graphic: the diagram

    """
    import graphviz
    from ceruleo.transformation.functional.pipeline.pipeline import \
        Pipeline

    dot = graphviz.Digraph(name, comment="Transformation graph")

    # Prefix each name with its index so equally-named steps stay distinct
    identifiers = {}
    for index, step in enumerate(nodes(pipe)):
        identifier = f"{index}{step.name}"
        identifiers[step] = identifier
        dot.node(identifier, label=str(step))

    for source, target in edges(pipe):
        dot.edge(identifiers[source], identifiers[target])

    return dot