Extraction

ChangesDetector

Bases: TransformerStep

Detect changes in a categorical variable, comparing each sample with the previous one

['a', 'a', 'b', 'c'] -> [True, False, True, True]

Source code in ceruleo/transformation/features/extraction.py
class ChangesDetector(TransformerStep):
    """Detect changes in a categorical variable, comparing each sample with the previous one

    ['a', 'a', 'b', 'c'] -> [True, False, True, True]
    """

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Appply the transformation to the input life

        Parameters:
            X: The input life 

        Returns:
            A DataFrame with boolean values representing weather changes were applied to the input variable or not
        """
        return X != X.shift(axis=0)

transform(X)

Apply the transformation to the input life

Parameters:

    X (DataFrame): The input life. Required.

Returns:

    DataFrame: A DataFrame of boolean values indicating whether each value changed with respect to the previous sample

Source code in ceruleo/transformation/features/extraction.py
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
    """
    Appply the transformation to the input life

    Parameters:
        X: The input life 

    Returns:
        A DataFrame with boolean values representing weather changes were applied to the input variable or not
    """
    return X != X.shift(axis=0)
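
Example: a minimal usage sketch with a toy life (illustrative data; this assumes the step can be applied standalone, outside a full transformation pipeline):

import pandas as pd

from ceruleo.transformation.features.extraction import ChangesDetector

life = pd.DataFrame({"mode": ["a", "a", "b", "c"]})
step = ChangesDetector()

# Each row is compared with the previous one; the first row is compared
# against the NaN introduced by shift(), so it is reported as a change.
print(step.transform(life))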

ColumnWiseSum

Bases: TransformerStep

Compute the column-wise sum for each sample

Parameters:

    column_name (str): Name of the single output column. Required.
Source code in ceruleo/transformation/features/extraction.py
class ColumnWiseSum(TransformerStep):
    """
    Compute the column-wise sum each column

    Parameters:
        column_name: Name of the unique column which is returned 
    """

    def __init__(self, column_name: str, name: Optional[str] = None):
        super().__init__(name=name)
        self.column_name = column_name

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """ 
        Apply the transformation to the input life 

        Parameters:
            X: The input life 

        Returns:
            A single-column DataFrame containing the column-wise sum for each input sample
        """
        return pd.DataFrame(X.sum(axis=1), columns=[self.column_name])

transform(X)

Apply the transformation to the input life

Parameters:

    X (DataFrame): The input life. Required.

Returns:

    DataFrame: A single-column DataFrame containing the column-wise sum for each input sample

Source code in ceruleo/transformation/features/extraction.py
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
    """ 
    Apply the transformation to the input life 

    Parameters:
        X: The input life 

    Returns:
        A single-column DataFrame containing the column-wise sum for each input sample
    """
    return pd.DataFrame(X.sum(axis=1), columns=[self.column_name])
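
Example: a minimal usage sketch (illustrative data, standalone use assumed):

import pandas as pd

from ceruleo.transformation.features.extraction import ColumnWiseSum

life = pd.DataFrame({"a": [1.0, 2.0], "b": [3.0, 4.0]})
step = ColumnWiseSum(column_name="total")

# A single column named "total" holding the sum across columns of each row
print(step.transform(life))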

Difference

Bases: TransformerStep

Compute the difference between two sets of features

Example:

X[features1] - X[features2]

Parameters:

    feature_set1 (List[str]): Feature list of the first group to subtract. Required.
    feature_set2 (List[str]): Feature list of the second group to subtract. Required.
    name (Optional[str]): Name of the step, by default None
Source code in ceruleo/transformation/features/extraction.py
class Difference(TransformerStep):
    """Compute the difference between two set of features

    Example:

        X[features1] - X[features2]

    Parameters:
        feature_set1: Feature list of the first group to substract
        feature_set2:Feature list of the second group to substract
        name: Name of the step, by default None
    """

    def __init__(
        self, *, feature_set1: List[str], feature_set2: List[str], name: Optional[str] = None
    ):
        super().__init__(name=name)
        if len(feature_set1) != len(feature_set2):
            raise ValueError(
                "Feature set 1 and feature set 2 must have the same length"
            )
        self.feature_set1 = feature_set1
        self.feature_set2 = feature_set2
        self.feature_names_computed = False

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """ 
        Apply the transformation to the input life 

        Parameters:
            X: The input life

        Returns: 
            A DataFrame with two columns containing the result of the differences between the two sets of input features 
        """
        if not self.feature_names_computed:
            self.feature_set1 = [self.find_feature(X, c) for c in self.feature_set1]
            self.feature_set2 = [self.find_feature(X, c) for c in self.feature_set2]
            feature_names_computed = True
        new_X = X[self.feature_set1].copy()
        new_X = new_X - X[self.feature_set2].values
        return new_X

transform(X)

Apply the transformation to the input life

Parameters:

    X (DataFrame): The input life. Required.

Returns:

    DataFrame: A DataFrame with one column per feature pair, containing the differences between the two sets of input features

Source code in ceruleo/transformation/features/extraction.py
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
    """ 
    Apply the transformation to the input life 

    Parameters:
        X: The input life

    Returns: 
        A DataFrame with two columns containing the result of the differences between the two sets of input features 
    """
    if not self.feature_names_computed:
        self.feature_set1 = [self.find_feature(X, c) for c in self.feature_set1]
        self.feature_set2 = [self.find_feature(X, c) for c in self.feature_set2]
        feature_names_computed = True
    new_X = X[self.feature_set1].copy()
    new_X = new_X - X[self.feature_set2].values
    return new_X
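
Example: a minimal usage sketch (illustrative data; it assumes find_feature resolves the given column names directly when the step is used outside a pipeline):

import pandas as pd

from ceruleo.transformation.features.extraction import Difference

life = pd.DataFrame({"t_in": [10.0, 12.0], "t_out": [9.0, 11.5]})
step = Difference(feature_set1=["t_in"], feature_set2=["t_out"])

# One column per feature pair: t_in - t_out
print(step.transform(life))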

EMD

Bases: TransformerStep

Compute the empirical mode decomposition of each feature

Parameters:

    n (int): Number of modes to compute. Required.
    name (Optional[str]): Name of the step, by default 'EMD'
Source code in ceruleo/transformation/features/extraction.py
class EMD(TransformerStep):
    """Compute the empirical mode decomposition of each feature

    Parameters:
        n: Number of modes to compute
        name: Name of the step, by default None
    """

    def __init__(self, *, n: int, name: Optional[str] = "EMD"):
        super().__init__(name=name)
        self.n = n

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Apply the transformation to the input life

        Parameters:
            X: The input life

        Returns:
            A DataFrame with n times as many columns as the input life, since each feature is replaced by the n modes of its EMD
        """
        new_X = pd.DataFrame(index=X.index)
        for c in X.columns:
            try:
                imf = emd.sift.sift(X[c].values, max_imfs=self.n)
                for j in range(self.n):
                    if j < imf.shape[1]:
                        new_X[f"{c}_{j}"] = imf[:, j]
                    else:
                        new_X[f"{c}_{j}"] = np.nan
            except Exception:
                for j in range(self.n):
                    new_X[f"{c}_{j}"] = np.nan

        return new_X

transform(X)

Apply the transformation to the input life

Parameters:

    X (DataFrame): The input life. Required.

Returns:

    DataFrame: A DataFrame with n times as many columns as the input life, since each feature is replaced by the n modes of its EMD

Source code in ceruleo/transformation/features/extraction.py
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
    """
    Apply the transformation to the input life

    Parameters:
        X: The input life

    Returns:
        A DataFrame with n times as many columns as the input life, since each feature is replaced by the n modes of its EMD
    """
    new_X = pd.DataFrame(index=X.index)
    for c in X.columns:
        try:
            imf = emd.sift.sift(X[c].values, max_imfs=self.n)
            for j in range(self.n):
                if j < imf.shape[1]:
                    new_X[f"{c}_{j}"] = imf[:, j]
                else:
                    new_X[f"{c}_{j}"] = np.nan
        except Exception:
            for j in range(self.n):
                new_X[f"{c}_{j}"] = np.nan

    return new_X
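
Example: a minimal usage sketch on a synthetic signal (illustrative; the `emd` package must be installed, and standalone use is assumed):

import numpy as np
import pandas as pd

from ceruleo.transformation.features.extraction import EMD

rng = np.random.default_rng(0)
t = np.linspace(0, 10, 500)
life = pd.DataFrame({"vib": np.sin(5 * t) + rng.normal(0, 0.1, t.size)})

step = EMD(n=3)

# Columns vib_0, vib_1 and vib_2 hold the first three intrinsic mode functions
print(step.transform(life).head())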

EMDFilter

Bases: TransformerStep

Filter the signals using Empirical Mode Decomposition

Parameters:

    n (int): Number of modes to compute. Required.
    min_imf (int): Index of the first Intrinsic Mode Function to keep. Required.
    max_imf (int): End index (exclusive) of the Intrinsic Mode Functions to keep. Required.
Source code in ceruleo/transformation/features/extraction.py
class EMDFilter(TransformerStep):
    """
    Filter the signals using Empirical Mode decomposition

    Parameters:
        n: Number of modes
        min_imf: Min Intrinsic Mode Function
        max_imf: Max Intrinsic Mode Function
    """

    def __init__(
        self, *, n: int, min_imf: int, max_imf: int, name: Optional[str] = "EMD"
    ):
        super().__init__(name=name)
        self.n = n
        self.min_imf = min_imf
        self.max_imf = max_imf

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Apply the transformation to the input life

        Parameters:
            X: The input life

        Returns:
            A DataFrame with the same shape as the input life, containing the result of applying the EMD filter
        """
        new_X = pd.DataFrame(index=X.index)

        for c in X.columns:
            try:
                imf = emd.sift.sift(X[c].values, max_imfs=self.n)
                new_X[c] = np.sum(imf[:, self.min_imf : self.max_imf], axis=1)
            except Exception:
                new_X[c] = X[c]

        return new_X

transform(X)

Apply the transformation to the input life

Parameters:

    X (DataFrame): The input life. Required.

Returns:

    DataFrame: A DataFrame with the same shape as the input life, containing the result of applying the EMD filter

Source code in ceruleo/transformation/features/extraction.py
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
    """
    Apply the transformation to the input life

    Parameters:
        X: The input life

    Returns:
        A DataFrame with the same shape as the input life, containing the result of applying the EMD filter
    """
    new_X = pd.DataFrame(index=X.index)

    for c in X.columns:
        try:
            imf = emd.sift.sift(X[c].values, max_imfs=self.n)
            new_X[c] = np.sum(imf[:, self.min_imf : self.max_imf], axis=1)
        except Exception:
            new_X[c] = X[c]

    return new_X
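
Example: a minimal usage sketch on a synthetic signal (illustrative; the `emd` package must be installed, and standalone use is assumed):

import numpy as np
import pandas as pd

from ceruleo.transformation.features.extraction import EMDFilter

rng = np.random.default_rng(1)
t = np.linspace(0, 10, 500)
life = pd.DataFrame({"vib": np.sin(5 * t) + rng.normal(0, 0.1, t.size)})

# Decompose into up to 4 modes and keep only IMFs 1 and 2
step = EMDFilter(n=4, min_imf=1, max_imf=3)
print(step.transform(life).head())  # same shape as the input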

ExpandingStatistics

Bases: TransformerStep

Compute a diverse set of features using an expanding window

For each feature present in the life, a number of statistics are computed at each timestamp

The possible features are:

  • Kurtosis
  • Skewness
  • Max
  • Min
  • Std
  • Peak
  • Impulse
  • Clearance
  • RMS
  • Shape
  • Crest
  • Mean
  • Deviance
  • Std atan
  • Std acosh
  • Std asinh
  • Energy

By default all of them except Energy and Deviance are computed.

Parameters:

    min_points (int): The minimum number of points of the expanding window, by default 2
    to_compute (List[str]): List of the features to compute, by default None. Valid values are: 'kurtosis', 'skewness', 'max', 'min', 'std', 'peak', 'impulse', 'clearance', 'rms', 'shape', 'crest', 'mean', 'deviance', 'std_atan', 'std_acosh', 'std_asinh', 'energy'
    specific (Optional[Dict[str, List[str]]]): Mapping from feature name to the list of statistics to compute for it; mutually exclusive with to_compute, by default None
    name (Optional[str]): Name of the step, by default None
Source code in ceruleo/transformation/features/extraction.py
class ExpandingStatistics(TransformerStep):
    """Compute diverse number of features using an expandign window

    For each feature present in the life a number of feature will be computed for each time stamp

    The possible features are:

    - Kurtosis
    - Skewness
    - Max
    - Min
    - Std
    - Peak
    - Impulse
    - Clearance
    - RMS
    - Shape
    - Crest
    - Hurst


    Parameters:
        min_points: The minimun number of points of the expanding window, by default 2
        to_compute: List of the features to compute, by default None. Valid values are: 'kurtosis', 'skewness', 'max', 'min', 'std', 'peak', 'impulse','clearance', 'rms', 'shape', 'crest', 'hurst'
        name: Name of the step, by default None

    """

    def __init__(
        self,
        *,
        min_points: int=2,
        to_compute: List[str] = None,
        specific: Optional[Dict[str, List[str]]] = None,
        name: Optional[str] = None,
    ):
        super().__init__(name=name)
        self.min_points = min_points
        valid_stats = [
            "kurtosis",
            "skewness",
            "max",
            "min",
            "std",
            "peak",
            "impulse",
            "clearance",
            "rms",
            "shape",
            "crest",
            "mean",
            "deviance",
            "std_atan",
            "energy",
            "std_acosh",
            "std_asinh",
        ]
        not_default = ["energy", "deviance"]
        if to_compute is not None and specific is not None:
            raise ValueError("Only one of to_compute or specific should be used")
        self.specific = specific
        self.to_compute = to_compute
        if to_compute is None:
            if specific is None:
                self.to_compute = list(set(valid_stats) - set(not_default))
            else:
                self.specific = specific
        else:
            for f in to_compute:
                if f not in valid_stats:
                    raise ValueError(
                        f"Invalid feature to compute {f}. Valids are {valid_stats}"
                    )
            self.to_compute = to_compute

    def partial_fit(self, X, y=None):
        return self

    def fit(self, X, y=None):
        return self

    def _std_asinh(
        self,
        x: pd.Series,
        s: Expanding,
        s_abs: Expanding,
        s_abs_sqrt: Expanding,
        s_sq: Expanding,
    ):
        return x.apply(np.arcsinh).expanding(self.min_points).std(numeric_only=True)

    def _std_acosh(
        self,
        x: pd.Series,
        s: Expanding,
        s_abs: Expanding,
        s_abs_sqrt: Expanding,
        s_sq: Expanding,
    ):
        return x.apply(np.arccosh).expanding(self.min_points).std(numeric_only=True)

    def _energy(
        self,
        x: pd.Series,
        s: Expanding,
        s_abs: Expanding,
        s_abs_sqrt: Expanding,
        s_sq: Expanding,
    ):
        return x.pow(2).expanding(self.min_points).sum(numeric_only=True)

    def _std_atan(
        self,
        x: pd.Series,
        s: Expanding,
        s_abs: Expanding,
        s_abs_sqrt: Expanding,
        s_sq: Expanding,
    ):
        return x.apply(np.arctan).expanding(self.min_points).std(numeric_only=True)

    def _kurtosis(
        self,
        x: pd.Series,
        s: Expanding,
        s_abs: Expanding,
        s_abs_sqrt: Expanding,
        s_sq: Expanding,
    ):
        return s.kurt(numeric_only=True)

    def _skewness(
        self,
        x: pd.Series,
        s: Expanding,
        s_abs: Expanding,
        s_abs_sqrt: Expanding,
        s_sq: Expanding,
    ):
        return s.skew(numeric_only=True)

    def _max(
        self,
        x: pd.Series,
        s: Expanding,
        s_abs: Expanding,
        s_abs_sqrt: Expanding,
        s_sq: Expanding,
    ):
        return s.max(numeric_only=True)

    def _min(
        self,
        x: pd.Series,
        s: Expanding,
        s_abs: Expanding,
        s_abs_sqrt: Expanding,
        s_sq: Expanding,
    ):
        return s.min(numeric_only=True)

    def _std(
        self,
        x: pd.Series,
        s: Expanding,
        s_abs: Expanding,
        s_abs_sqrt: Expanding,
        s_sq: Expanding,
    ):
        return s.std(numeric_only=True)

    def _peak(
        self,
        x: pd.Series,
        s: Expanding,
        s_abs: Expanding,
        s_abs_sqrt: Expanding,
        s_sq: Expanding,
    ):
        return s.max(numeric_only=True) - s.min(numeric_only=True)

    def _impulse(
        self,
        x: pd.Series,
        s: Expanding,
        s_abs: Expanding,
        s_abs_sqrt: Expanding,
        s_sq: Expanding,
    ):
        return self._peak(x, s, s_abs, s_abs_sqrt, s_sq) / s_abs.mean()

    def _deviance(
        self,
        x: pd.Series,
        s: Expanding,
        s_abs: Expanding,
        s_abs_sqrt: Expanding,
        s_sq: Expanding,
    ):
        return (x - s.mean(numeric_only=True)) / (s.std(numeric_only=True) + 0.00000000001)

    def _clearance(
        self,
        x: pd.Series,
        s: Expanding,
        s_abs: Expanding,
        s_abs_sqrt: Expanding,
        s_sq: Expanding,
    ):
        return self._peak(x, s, s_abs, s_abs_sqrt, s_sq) / s_abs_sqrt.mean().pow(2)

    # def _hurst(
    #    self,
    #    x: pd.Series,
    #    s: Expanding,
    #    s_abs: Expanding,
    #    s_abs_sqrt: Expanding,
    #    s_sq: Expanding,
    # ):
    #    return s.apply(lambda s: hurst_exponent(s, method="RS"))

    def _rms(
        self,
        x: pd.Series,
        s: Expanding,
        s_abs: Expanding,
        s_abs_sqrt: Expanding,
        s_sq: Expanding,
    ):
        return s_sq.mean(numeric_only=True).pow(1 / 2.0)

    def _mean(
        self,
        x: pd.Series,
        s: Expanding,
        s_abs: Expanding,
        s_abs_sqrt: Expanding,
        s_sq: Expanding,
    ):
        return s.mean(numeric_only=True)

    def _shape(
        self,
        x: pd.Series,
        s: Expanding,
        s_abs: Expanding,
        s_abs_sqrt: Expanding,
        s_sq: Expanding,
    ):
        return self._rms(x, s, s_abs, s_abs_sqrt, s_sq) / s_abs.mean(numeric_only=True)

    def _crest(
        self,
        x: pd.Series,
        s: Expanding,
        s_abs: Expanding,
        s_abs_sqrt: Expanding,
        s_sq: Expanding,
    ):
        return self._peak(x, s, s_abs, s_abs_sqrt, s_sq) / self._rms(
            x, s, s_abs, s_abs_sqrt, s_sq
        )

    def _compute_column_names(self, X: pd.DataFrame):
        columns = []
        if self.to_compute is not None:
            for stats in self.to_compute:
                for c in X.columns:
                    columns.append(f"{c}_{stats}")
        else:
            for c in self.specific.keys():
                for stats in self.specific[c]:
                    columns.append(f"{c}_{stats}")
        return columns

    def _transform_all_features(
        self, X: pd.DataFrame, X_new: pd.DataFrame, expanding, s_abs, s_abs_sqrt, s_sq
    ):
        for stats in self.to_compute:
            columns_to_assign = [f"{c}_{stats}" for c in X.columns]
            out = getattr(self, f"_{stats}")(X, expanding, s_abs, s_abs_sqrt, s_sq)
            X_new.loc[:, columns_to_assign] = out.values

    def _transform_specific(
        self, X: pd.DataFrame, X_new: pd.DataFrame, expanding, s_abs, s_abs_sqrt, s_sq
    ):
        for c in self.specific.keys():
            for stats in self.specific[c]:
                feature = f"{c}_{stats}"
                out = getattr(self, f"_{stats}")(
                    X[c], expanding[c], s_abs[c], s_abs_sqrt[c], s_sq[c]
                )
                X_new.loc[:, feature] = out.values

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Compute features from the given life

        Parameters:
            X: The input life

        Returns:
            A new DataFrame with one row and with n columns. Let m be the number of features of the life and f the len(to_compute), then n = m x f,
        """

        columns = self._compute_column_names(X)

        X_new = pd.DataFrame(index=X.index, columns=columns)
        expanding = X.expanding(self.min_points)
        s_abs = X.abs().expanding(self.min_points)
        s_abs_sqrt = X.abs().pow(1.0 / 2).expanding(self.min_points)
        s_sq = X.pow(2).expanding(self.min_points)
        if self.to_compute is not None:
            self._transform_all_features(X, X_new, expanding, s_abs, s_abs_sqrt, s_sq)
        else:
            self._transform_specific(X, X_new, expanding, s_abs, s_abs_sqrt, s_sq)
        return X_new

transform(X)

Compute features from the given life

Parameters:

    X (DataFrame): The input life. Required.

Returns:

    DataFrame: A new DataFrame with the same index as the input and n columns, where n = m x f for m input features and f computed statistics

Source code in ceruleo/transformation/features/extraction.py
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
    """
    Compute features from the given life

    Parameters:
        X: The input life

    Returns:
        A new DataFrame with one row and with n columns. Let m be the number of features of the life and f the len(to_compute), then n = m x f,
    """

    columns = self._compute_column_names(X)

    X_new = pd.DataFrame(index=X.index, columns=columns)
    expanding = X.expanding(self.min_points)
    s_abs = X.abs().expanding(self.min_points)
    s_abs_sqrt = X.abs().pow(1.0 / 2).expanding(self.min_points)
    s_sq = X.pow(2).expanding(self.min_points)
    if self.to_compute is not None:
        self._transform_all_features(X, X_new, expanding, s_abs, s_abs_sqrt, s_sq)
    else:
        self._transform_specific(X, X_new, expanding, s_abs, s_abs_sqrt, s_sq)
    return X_new
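
Example: a minimal usage sketch (illustrative data, standalone use assumed):

import numpy as np
import pandas as pd

from ceruleo.transformation.features.extraction import ExpandingStatistics

life = pd.DataFrame({"temp": np.random.default_rng(2).normal(size=100)})
step = ExpandingStatistics(to_compute=["mean", "std", "rms"])

# Columns temp_mean, temp_std and temp_rms, each computed over the
# expanding window that ends at the corresponding row
print(step.transform(life).tail())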

HashingEncodingCategorical

Bases: TransformerStep

Compute a hash-based numerical encoding for a given feature

Parameters:

    nbins (int): Number of bins after the hash. Required.
    feature (Optional[str]): Feature name from which to compute the encoding; by default None (the first column is used)
    name (Optional[str]): Step name, by default None
Source code in ceruleo/transformation/features/extraction.py
class HashingEncodingCategorical(TransformerStep):
    """Compute a simple numerical encoding for a given feature

    Parameters:
        nbins: Number of bins after the hash
        feature: Feature name from which compute the simple encoding
        name: Step name
    """

    def __init__(
        self, *, nbins: int, feature: Optional[str] = None, name: Optional[str] = None
    ):
        super().__init__(name=name)
        self.nbins = nbins
        self.feature = feature
        self.categories = set()
        self.encoder = None

    def transform(self, X: pd.DataFrame, y: Optional[type] = None) -> pd.DataFrame:
        """
        Return a new DataFrame with the feature encoded with integer numbers

        Parameters:
            X: The input life
            y: Unused; kept for API compatibility

        Returns:
            A new DataFrame with the same index as the input and one column containing the encoding of the input feature
        """

        def hash(x):
            if isinstance(x, int):
                x = x.to_bytes((x.bit_length() + 7) // 8, "little")
            return (mmh3.hash(x) & 0xFFFFFFFF) % self.nbins

        if self.feature is None:
            self.feature = X.columns[0]
        X_new = pd.DataFrame(index=X.index)
        X_new["encoding"] = X[self.feature].map(hash)
        return X_new

transform(X, y=None)

Return a new DataFrame with the feature encoded with integer numbers

Parameters:

    X (DataFrame): The input life. Required.
    y: Unused; kept for API compatibility. Default None.

Returns:

    DataFrame: A new DataFrame with the same index as the input and one column containing the encoding of the input feature

Source code in ceruleo/transformation/features/extraction.py
def transform(self, X: pd.DataFrame, y: Optional[type] = None) -> pd.DataFrame:
    """
    Return a new DataFrame with the feature encoded with integer numbers

    Parameters:
        X: The input life
        y: Unused; kept for API compatibility

    Returns:
        A new DataFrame with the same index as the input and one column containing the encoding of the input feature
    """

    def hash(x):
        if isinstance(x, int):
            x = x.to_bytes((x.bit_length() + 7) // 8, "little")
        return (mmh3.hash(x) & 0xFFFFFFFF) % self.nbins

    if self.feature is None:
        self.feature = X.columns[0]
    X_new = pd.DataFrame(index=X.index)
    X_new["encoding"] = X[self.feature].map(hash)
    return X_new
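
Example: a minimal usage sketch (illustrative data; the mmh3 package must be installed, and standalone use is assumed):

import pandas as pd

from ceruleo.transformation.features.extraction import HashingEncodingCategorical

life = pd.DataFrame({"code": ["A1", "B2", "A1", "C3"]})
step = HashingEncodingCategorical(nbins=8, feature="code")

# A single "encoding" column with hashed values in [0, 8)
print(step.transform(life))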

Interactions

Bases: TransformerStep

Compute pairwise interactions between the features

Source code in ceruleo/transformation/features/extraction.py
class Interactions(TransformerStep):
    """Compute pairwise interactions between the features"""

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """ 
        Apply the transformation to one life 

        Parameters:
            X: The input life

        Returns:
            DataFrame containing the pairwise interaction values 

        """
        X_new = pd.DataFrame(index=X.index)
        for c1, c2 in itertools.combinations(X.columns, 2):
            X_new[f"{c1}_{c2}"] = X[c1] * X[c2]
        return X_new

transform(X)

Apply the transformation to one life

Parameters:

    X (DataFrame): The input life. Required.

Returns:

    DataFrame: DataFrame containing the pairwise interaction values

Source code in ceruleo/transformation/features/extraction.py
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
    """ 
    Apply the transformation to one life 

    Parameters:
        X: The input life

    Returns:
        DataFrame containing the pairwise interaction values 

    """
    X_new = pd.DataFrame(index=X.index)
    for c1, c2 in itertools.combinations(X.columns, 2):
        X_new[f"{c1}_{c2}"] = X[c1] * X[c2]
    return X_new
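
Example: a minimal usage sketch (illustrative data, standalone use assumed):

import pandas as pd

from ceruleo.transformation.features.extraction import Interactions

life = pd.DataFrame({"a": [1.0, 2.0], "b": [3.0, 4.0], "c": [5.0, 6.0]})

# Columns a_b, a_c and b_c holding the element-wise products
print(Interactions().transform(life))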

LifeStatistics

Bases: TransformerStep

Compute a diverse set of statistics for each life.

Returns a single-row DataFrame with the statistics computed for every feature

The possible features are:

  • Kurtosis
  • Skewness
  • Max
  • Min
  • Std
  • Peak
  • Impulse
  • Clearance
  • RMS
  • Shape
  • Crest

Parameters:

    to_compute (Optional[List[str]]): List of the features to compute, by default None (all of the above). Valid values are: 'kurtosis', 'skewness', 'max', 'min', 'std', 'peak', 'impulse', 'clearance', 'rms', 'shape', 'crest'
    name (Optional[str]): Name of the step, by default None
Source code in ceruleo/transformation/features/extraction.py
class LifeStatistics(TransformerStep):
    """Compute diverse number of features for each life.

    Returns a 1 row with the statistics computed for every feature


    The possible features are:

    - Kurtosis
    - Skewness
    - Max
    - Min
    - Std
    - Peak
    - Impulse
    - Clearance
    - RMS
    - Shape
    - Crest
    - Hurst


    Parameters:
        to_compute: List of the features to compute, by default None. Valid values are:'kurtosis', 'skewness', 'max', 'min', 'std', 'peak', 'impulse','clearance', 'rms', 'shape', 'crest', 'hurst'
        name: Name of the step, by default None
    """

    def __init__(
        self, *, to_compute: Optional[List[str]] = None, name: Optional[str] = None
    ):
        super().__init__(name=name)
        valid_stats = [
            "kurtosis",
            "skewness",
            "max",
            "min",
            "std",
            "peak",
            "impulse",
            "clearance",
            "rms",
            "shape",
            "crest",
        ]
        if to_compute is None:
            self.to_compute = valid_stats
        else:
            for f in to_compute:
                if f not in valid_stats:
                    raise ValueError(
                        f"Invalid feature to compute {f}. Valids are {valid_stats}"
                    )
            self.to_compute = to_compute

    def partial_fit(self, X, y=None):
        return self

    def fit(self, X, y=None):
        return self

    def _kurtosis(self, s: pd.Series):
        return s.kurt(skipna=True)

    def _skewness(self, s: pd.Series):
        return s.skew(skipna=True)

    def _max(self, s: pd.Series):
        return s.max(skipna=True)

    def _min(self, s: pd.Series):
        return s.min(skipna=True)

    def _std(self, s: pd.Series):
        return s.std(skipna=True)

    def _peak(self, s: pd.Series):
        return s.max(skipna=True) - s.min(skipna=True)

    def _impulse(self, s: pd.Series):
        m = s.abs().mean()
        if m > 0:
            return self._peak(s) / m
        else:
            return 0

    def _clearance(self, s: pd.Series):
        m = s.abs().pow(1.0 / 2).mean()
        if m > 0:
            return (self._peak(s) / m) ** 2
        else:
            return 0

    def _rms(self, s: pd.Series):
        return np.sqrt(s.pow(2).mean(skipna=True))

    def _shape(self, s: pd.Series):
        m = s.abs().mean(skipna=True)
        if m > 0:
            return self._rms(s) / m
        else:
            return 0

    def _crest(self, s: pd.Series):
        m = self._rms(s)
        if m > 0:
            return self._peak(s) / m
        else:
            return 0

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Compute features from the given life

        Parameters:
            X: The input life

        Returns:
            A new DataFrame with one row and n columns, where n = m x f for m input features and f computed statistics
        """
        X_new = pd.DataFrame(index=[0])
        for c in X.columns:
            for stats in self.to_compute:
                X_new[f"{c}_{stats}"] = getattr(self, f"_{stats}")(X[c])
        return X_new

transform(X)

Compute features from the given life

Parameters:

    X (DataFrame): The input life. Required.

Returns:

    DataFrame: A new DataFrame with one row and n columns, where n = m x f for m input features and f computed statistics

Source code in ceruleo/transformation/features/extraction.py
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
    """
    Compute features from the given life

    Parameters:
        X: The input life

    Returns:
        A new DataFrame with one row and n columns, where n = m x f for m input features and f computed statistics
    """
    X_new = pd.DataFrame(index=[0])
    for c in X.columns:
        for stats in self.to_compute:
            X_new[f"{c}_{stats}"] = getattr(self, f"_{stats}")(X[c])
    return X_new
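
Example: a minimal usage sketch (illustrative data, standalone use assumed):

import numpy as np
import pandas as pd

from ceruleo.transformation.features.extraction import LifeStatistics

life = pd.DataFrame({"temp": np.random.default_rng(3).normal(size=200)})
step = LifeStatistics(to_compute=["max", "min", "rms"])

# A single row with columns temp_max, temp_min and temp_rms
print(step.transform(life))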

OneHotCategorical

Bases: TransformerStep

Compute a one-hot encoding for a given feature

Parameters:

    feature (Optional[str]): Feature name from which to compute the one-hot encoding, by default None (the first column is used)
    categories (Optional[List[any]]): Fixed list of categories; if None, the categories are learned during fit, by default None
    name (Optional[str]): Step name, by default None
Source code in ceruleo/transformation/features/extraction.py
class OneHotCategorical(TransformerStep):
    """Compute a one-hot encoding for a given feature

    Parameters:
        feature: Feature name from which compute the one-hot encoding
        name: Step name, by default None
    """

    def __init__(
        self,
        *,
        feature: Optional[str] = None,
        categories: Optional[List[any]] = None,
        name: Optional[str] = None,
    ):
        super().__init__(name=name)
        self.feature = feature
        self.categories = categories
        self.fixed_categories = True
        if self.categories is None:
            self.categories = set()
            self.fixed_categories = False
        self.encoder = None

    def partial_fit(self, X: pd.DataFrame, y=None):
        if self.fixed_categories:
            return self
        if self.feature is None:
            self.feature = X.columns[0]
        self.categories.update(set(X[self.feature].unique()))
        return self

    def fit(self, X: pd.DataFrame, y=None):
        if self.fixed_categories:
            return self
        if self.feature is None:
            self.feature = X.columns[0]
        self.categories.update(set(X[self.feature].unique()))
        return self

    def transform(self, X: pd.DataFrame, y=None) -> pd.DataFrame:
        """
        Apply the transformation to the input life

        Parameters:
            X: The input life

        Returns:
            A DataFrame with shape (X.shape[0], n_categories) containing the one-hot encoding of the input feature
        """
        categories = sorted(list([c for c in self.categories if c is not None]))
        d = pd.Categorical(X[self.feature], categories=categories)

        df = pd.get_dummies(d)
        df.index = X.index
        return df

transform(X, y=None)

Apply the transformation to the input life

Parameters:

    X (DataFrame): The input life. Required.

Returns:

    DataFrame: A DataFrame with shape (X.shape[0], n_categories) containing the one-hot encoding of the input feature

Source code in ceruleo/transformation/features/extraction.py
def transform(self, X: pd.DataFrame, y=None) -> pd.DataFrame:
    """
    Apply the transformation to the input life

    Parameters:
        X: The input life

    Returns:
        A DataFrame with shape (X.shape[0], n_categories) containing the one-hot encoding of the input feature
    """
    categories = sorted(list([c for c in self.categories if c is not None]))
    d = pd.Categorical(X[self.feature], categories=categories)

    df = pd.get_dummies(d)
    df.index = X.index
    return df
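
Example: a minimal usage sketch (illustrative data; fit is called first so the step can gather the categories):

import pandas as pd

from ceruleo.transformation.features.extraction import OneHotCategorical

life = pd.DataFrame({"mode": ["idle", "load", "idle"]})
step = OneHotCategorical(feature="mode")

step.fit(life)               # gather the set of categories
print(step.transform(life))  # one indicator column per category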

RollingStatistics

Bases: TransformerStep

Compute a diverse set of features using a rolling window.

For each feature present in the life, a number of statistics are computed at each timestamp

The possible features are:

Time domain:

  • Mean
  • Kurtosis
  • Skewness
  • Max
  • Min
  • Std
  • Peak
  • Impulse
  • Clearance
  • RMS
  • Shape
  • Crest
  • Deviance
  • Std atan
  • Std acosh
  • Std asinh
  • Energy

Parameters:

    window (int): Size of the rolling window, by default 15
    min_points: The minimum number of points of the rolling window, by default 2
    to_compute (Optional[List[str]]): Names of the features to compute, by default None (all). Valid values are: 'mean', 'kurtosis', 'skewness', 'max', 'min', 'std', 'peak', 'impulse', 'clearance', 'rms', 'shape', 'crest', 'deviance', 'std_atan', 'std_acosh', 'std_asinh', 'energy'
    specific (Optional[Dict[str, List[str]]]): Mapping from feature name to the list of statistics to compute for it; mutually exclusive with to_compute, by default None
    name (Optional[str]): Name of the step, by default None
Source code in ceruleo/transformation/features/extraction.py
class RollingStatistics(TransformerStep):
    """Compute diverse number of features using an rolling window.

    For each feature present in the life a number of feature will be computed for each time stamp

    The possible features are:

    Time domain:

    - Kurtosis
    - Skewness
    - Max
    - Min
    - Std
    - Peak
    - Impulse
    - Clearance
    - RMS
    - Shape
    - Crest

    Parameters:
        window: Size of the rolling window, by default 15
        min_points: The minimun number of points of the expanding window
        to_compute: Name of features to compute. Possible values are: 'kurtosis', 'skewness', 'max', 'min', 'std', 'peak', 'impulse', 'clearance', 'rms', 'shape', 'crest'
        name: Name of the step, by default None

    """

    def __init__(
        self,
        *,
        window: int = 15,
        min_points=2,
        to_compute: Optional[List[str]] = None,
        specific: Optional[Dict[str, List[str]]] = None,
        name: Optional[str] = None,
    ):
        super().__init__(name=name)
        self.window = window
        self.min_points = min_points
        valid_stats = [
            "mean",
            "kurtosis",
            "skewness",
            "max",
            "min",
            "std",
            "peak",
            "impulse",
            "clearance",
            "rms",
            "shape",
            "crest",
            "deviance",
            "std_atan",
            "std_acosh",
            "std_asinh",
            "energy",
        ]

        if to_compute is not None and specific is not None:
            raise ValueError("Only one of to_compute or specific should be used")
        self.specific = specific
        self.to_compute = to_compute
        if to_compute is None:
            if specific is None:
                self.to_compute = valid_stats
            else:
                self.specific = specific
        else:
            for f in to_compute:
                if f not in valid_stats:
                    raise ValueError(
                        f"Invalid feature to compute {f}. Valids are {valid_stats}"
                    )
            self.to_compute = to_compute

    def partial_fit(self, X, y=None):
        return self

    def fit(self, X, y=None):
        return self

    def _std_asinh(self, X, rolling, abs_rolling):
        return (
            X.apply(np.arcsinh).rolling(self.window, self.min_points).std(numeric_only=True)
        )

    def _std_acosh(self, X, rolling, abs_rolling):
        return (
            X.apply(np.arccosh).rolling(self.window, self.min_points).std(numeric_only=True)
        )

    def _energy(self, X, rolling, abs_rolling):
        return X.pow(2).rolling(self.window, self.min_points).sum()

    def _std_atan(self, X, rolling, abs_rolling):
        return (
            X.apply(np.arctan)
            .rolling(self.window, self.min_points)
            .std(numeric_only=True)
        )

    def _mean(self, X, rolling: Rolling, abs_rolling: Rolling):
        return rolling.mean(numeric_only=True)

    def _kurtosis(self, X, rolling: Rolling, abs_rolling: Rolling):
        return rolling.kurt(numeric_only=True)

    def _skewness(self, X, rolling: Rolling, abs_rolling: Rolling):
        return rolling.skew(numeric_only=True)

    def _max(self, X, rolling: Rolling, abs_rolling: Rolling):
        return rolling.max(numeric_only=True)

    def _min(self, X, rolling: Rolling, abs_rolling: Rolling):
        return rolling.min(numeric_only=True)

    def _std(self, X, rolling: Rolling, abs_rolling: Rolling):
        return rolling.std(numeric_only=True)

    def _peak(self, X, rolling: Rolling, abs_rolling: Rolling):
        return rolling.max(numeric_only=True) - rolling.min(numeric_only=True)

    def _impulse(self, X, rolling: Rolling, abs_rolling: Rolling):
        return self._peak(X, rolling, abs_rolling) / abs_rolling.mean()

    def _deviance(self, X, rolling: Rolling, abs_rolling: Rolling):
        return (X - rolling.mean()) / rolling.std()

    def _clearance(self, X, rolling: Rolling, abs_rolling: Rolling):
        return self._peak(X, rolling, abs_rolling) / X.abs().pow(1.0 / 2).rolling(
            self.window, self.min_points
        ).mean().pow(2)

    def _rms(self, X, rolling: Rolling, abs_rolling: Rolling):
        return (
            X.pow(2)
            .rolling(self.window, self.min_points)
            .mean(numeric_only=True)
            .pow(1 / 2.0)
        )

    def _shape(self, X, rolling: Rolling, abs_rolling: Rolling):
        return self._rms(X, rolling, abs_rolling) / abs_rolling.mean(numeric_only=True)

    def _crest(self, X, rolling, abs_rolling):
        return self._peak(X, rolling, abs_rolling) / self._rms(X, rolling, abs_rolling)

    def _compute_column_names(self, X: pd.DataFrame):
        columns = []
        if self.to_compute is not None:
            for stats in self.to_compute:
                for c in X.columns:
                    columns.append(f"{c}_{stats}")
        else:
            for c in self.specific.keys():
                for stats in self.specific[c]:
                    columns.append(f"{c}_{stats}")
        return columns

    def _transform_all_features(
        self, X: pd.DataFrame, X_new: pd.DataFrame, rolling, abs_rolling
    ):
        for stats in self.to_compute:
            columns_to_assign = [f"{c}_{stats}" for c in X.columns]
            out = getattr(self, f"_{stats}")(X, rolling, abs_rolling)
            X_new.loc[:, columns_to_assign] = out.values

    def _transform_specific(
        self, X: pd.DataFrame, X_new: pd.DataFrame, rolling, abs_rolling
    ):
        for c in self.specific.keys():
            for stats in self.specific[c]:
                feature = f"{c}_{stats}"
                out = getattr(self, f"_{stats}")(X[c], rolling[c], abs_rolling[c])
                X_new.loc[:, feature] = out.values

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Compute features from the given life

        Parameters:
            X: The input life

        Returns:
            A new DataFrame with the same index as the input and n columns, where n = m x f for m input features and f computed statistics
        """
        columns = self._compute_column_names(X)

        X_new = pd.DataFrame(index=X.index, columns=columns)
        rolling = X.rolling(self.window, self.min_points)
        abs_rolling = X.abs().rolling(self.window, self.min_points)
        if self.to_compute is not None:
            self._transform_all_features(X, X_new, rolling, abs_rolling)
        else:
            self._transform_specific(X, X_new, rolling, abs_rolling)
        return X_new

transform(X)

Compute features from the given life

Parameters:

    X (DataFrame): The input life. Required.

Returns:

    DataFrame: A new DataFrame with the same index as the input and n columns, where n = m x f for m input features and f computed statistics

Source code in ceruleo/transformation/features/extraction.py
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
    """
    Compute features from the given life

    Parameters:
        X: The input life

    Returns:
        A new DataFrame with the same index as the input and n columns, where n = m x f for m input features and f computed statistics
    """
    columns = self._compute_column_names(X)

    X_new = pd.DataFrame(index=X.index, columns=columns)
    rolling = X.rolling(self.window, self.min_points)
    abs_rolling = X.abs().rolling(self.window, self.min_points)
    if self.to_compute is not None:
        self._transform_all_features(X, X_new, rolling, abs_rolling)
    else:
        self._transform_specific(X, X_new, rolling, abs_rolling)
    return X_new
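
Example: a minimal usage sketch (illustrative data, standalone use assumed):

import numpy as np
import pandas as pd

from ceruleo.transformation.features.extraction import RollingStatistics

life = pd.DataFrame({"vib": np.random.default_rng(4).normal(size=100)})
step = RollingStatistics(window=15, to_compute=["mean", "std"])

# Columns vib_mean and vib_std over a 15-sample rolling window
print(step.transform(life).tail())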

SampleNumber

Bases: TransformerStep

Return a column with an increasing sample number

Source code in ceruleo/transformation/features/extraction.py
class SampleNumber(TransformerStep):
    """Return a column with increasing number"""

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """ 
        Apply the transformation to the input life

        Parameters:
            X: The input life

        Returns:
            A DataFrame with increasing sample indexes. 
        """
        df = pd.DataFrame(index=X.index)
        df["sample_number"] = list(range(X.shape[0]))
        return df

transform(X)

Apply the transformation to the input life

Parameters:

    X (DataFrame): The input life. Required.

Returns:

    DataFrame: A DataFrame with increasing sample indexes

Source code in ceruleo/transformation/features/extraction.py
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
    """ 
    Apply the transformation to the input life

    Parameters:
        X: The input life

    Returns:
        A DataFrame with increasing sample indexes. 
    """
    df = pd.DataFrame(index=X.index)
    df["sample_number"] = list(range(X.shape[0]))
    return df

SimpleEncodingCategorical

Bases: TransformerStep

Compute a simple numerical encoding for a given feature

Parameters:

    feature (Optional[str]): Feature name from which to compute the simple encoding, by default None (the first column is used)
    name (Optional[str]): Step name, by default None
Source code in ceruleo/transformation/features/extraction.py
class SimpleEncodingCategorical(TransformerStep):
    """Compute a simple numerical encoding for a given feature

    Parameters:
        feature: Feature name from which compute the simple encoding
        name: Step name, by default None
    """

    def __init__(self, *, feature: Optional[str] = None, name: Optional[str] = None):
        super().__init__(name=name)
        self.feature = feature
        self.categories = set()
        self.encoder = None

    def partial_fit(self, X: pd.DataFrame, y=None) -> "SimpleEncodingCategorical":
        """Compute incrementally the set of possible categories

        Parameters:
            X: The input life

        Returns:
            Instance of class SimpleEncodingCategorical
        """
        if self.feature is None:
            self.feature = X.columns[0]

        self.categories.update(set(X[self.feature].unique()))

        return self

    def fit(self, X: pd.DataFrame, y=None) -> "SimpleEncodingCategorical":
        """
        Compute the set of possible categories

        Parameters:
            X: The input life

        Returns:
            Instance of class SimpleEncodingCategorical
        """
        if self.feature is None:
            self.feature = X.columns[0]

        self.categories.update(set(X[self.feature].unique()))
        return self

    def transform(self, X: pd.DataFrame, y=None) -> pd.DataFrame:
        """Return a new DataFrame with the feature  encoded with integer numbers

        Parameters:
            X: The input life

        Returns:
            A new dataframe with the same index as the input with 1 column with the Simple Encoding of the input feature. 
        """
        categories = sorted(list([c for c in self.categories if c is not None]))
        d = pd.Categorical(X[self.feature], categories=categories)
        return pd.DataFrame({"encoding": d.codes}, index=X.index)

fit(X, y=None)

Compute the set of possible categories

Parameters:

    X (DataFrame): The input life. Required.

Returns:

    SimpleEncodingCategorical: Instance of class SimpleEncodingCategorical

Source code in ceruleo/transformation/features/extraction.py
def fit(self, X: pd.DataFrame, y=None) -> "SimpleEncodingCategorical":
    """
    Compute the set of possible categories

    Parameters:
        X: The input life

    Returns:
        Instance of class SimpleEncodingCategorical
    """
    if self.feature is None:
        self.feature = X.columns[0]

    self.categories.update(set(X[self.feature].unique()))
    return self

partial_fit(X, y=None)

Compute incrementally the set of possible categories

Parameters:

    X (DataFrame): The input life. Required.

Returns:

    SimpleEncodingCategorical: Instance of class SimpleEncodingCategorical

Source code in ceruleo/transformation/features/extraction.py
def partial_fit(self, X: pd.DataFrame, y=None) -> "SimpleEncodingCategorical":
    """Compute incrementally the set of possible categories

    Parameters:
        X: The input life

    Returns:
        Instance of class SimpleEncodingCategorical
    """
    if self.feature is None:
        self.feature = X.columns[0]

    self.categories.update(set(X[self.feature].unique()))

    return self

transform(X, y=None)

Return a new DataFrame with the feature encoded with integer numbers

Parameters:

    X (DataFrame): The input life. Required.

Returns:

    DataFrame: A new DataFrame with the same index as the input and one column with the simple encoding of the input feature

Source code in ceruleo/transformation/features/extraction.py
def transform(self, X: pd.DataFrame, y=None) -> pd.DataFrame:
    """Return a new DataFrame with the feature  encoded with integer numbers

    Parameters:
        X: The input life

    Returns:
        A new dataframe with the same index as the input with 1 column with the Simple Encoding of the input feature. 
    """
    categories = sorted(list([c for c in self.categories if c is not None]))
    d = pd.Categorical(X[self.feature], categories=categories)
    return pd.DataFrame({"encoding": d.codes}, index=X.index)
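
Example: a minimal usage sketch (illustrative data; fit is called first so the step can collect the categories):

import pandas as pd

from ceruleo.transformation.features.extraction import SimpleEncodingCategorical

life = pd.DataFrame({"mode": ["b", "a", "b", "c"]})
step = SimpleEncodingCategorical(feature="mode")

step.fit(life)               # collect the categories
print(step.transform(life))  # integer codes from the sorted category list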

SlidingNonOverlappingWaveletDecomposition

Bases: TransformerStep

Compute a non-overlapping sliding-window wavelet decomposition of each feature. (Marked in the source as TODO TEST.)

The intended behaviour is illustrated by:

X = signal
coeffs = pywt.wavedec(X, 'db1', level=level)
A4 = wrcoef(X, 'a', coeffs, 'db1', level)
D4 = wrcoef(X, 'd', coeffs, 'db1', level)
D3 = wrcoef(X, 'd', coeffs, 'db1', 3)
D2 = wrcoef(X, 'd', coeffs, 'db1', 2)
D1 = wrcoef(X, 'd', coeffs, 'db1', 1)
r = A4 + D4 + D3 + D2 + D1
assert(np.mean(r-X) < 0.00000)

Parameters:

    window_size (int): Size of the non-overlapping sliding window. Required.
    level (int): Wavelet decomposition level. Required.
    wavelet (str): Wavelet name passed to pywt. Required.
    keep (List[str]): Components to reconstruct and keep, e.g. 'A2' or 'D1' (part letter followed by level). Required.

Source code in ceruleo/transformation/features/extraction.py
class SlidingNonOverlappingWaveletDecomposition(TransformerStep):
    """

    # TODO TEST
    X = signal
    coeffs = pywt.wavedec(X, 'db1', level=level)
    A4 = wrcoef(X, 'a', coeffs, 'db1', level)
    D4 = wrcoef(X, 'd', coeffs, 'db1', level)
    D3 = wrcoef(X, 'd', coeffs, 'db1', 3)
    D2 = wrcoef(X, 'd', coeffs, 'db1', 2)
    D1 = wrcoef(X, 'd', coeffs, 'db1', 1)
    r = A4 + D4 + D3 + D2 + D1
    assert(np.mean(r-X) < 0.00000)

    Parameters
    ----------
    TransformerStep : [type]
        [description]
    """

    def __init__(
        self, *, window_size: int, level: int, wavelet: str, keep: List[str], **kwargs
    ):
        super().__init__(**kwargs)
        self.wavelet = wavelet
        self.level = level
        self.keep = keep
        self.window_size = window_size
        self.strides = window_size

    def transform(self, X: pd.DataFrame):
        def _wavelet(values: np.ndarray):
            coeffs = pywt.wavedec(values, self.wavelet, level=self.level)
            out = np.zeros((values.shape[0], len(self.keep)))
            for i, s in enumerate(self.keep):
                part, level = s
                out[:, i] = wrcoef(
                    values, part.lower(), coeffs, self.wavelet, int(level)
                )
            return out

        column_list = []
        for c in X.columns:
            for name in self.keep:
                column_list.append(f"wavelet_{name}_{c}")
        out = pd.DataFrame(index=X.index, columns=column_list, dtype=np.float32)
        for c in X.columns:
            wv_computed = apply_rolling_data(
                X[c].values, _wavelet, self.window_size, self.strides
            )

            out.loc[:, [f"wavelet_{name}_{c}" for name in self.keep]] = wv_computed
        return out

TimeToPreviousBinaryValue

Bases: TransformerStep

Compute, for each binary column, the time elapsed since the previous positive value

Source code in ceruleo/transformation/features/extraction.py
class TimeToPreviousBinaryValue(TransformerStep):
    """Return a column with increasing number"""

    def time_to_previous_event(self, X: pd.DataFrame, c: str):
        def first_index(group):
            if group.iloc[0, 0] == 0:
                return np.nan
            else:
                return np.min(group.index)

        X_c_cumsum = X[[c]].cumsum()
        min_index = X_c_cumsum.groupby(c).apply(first_index)
        X_merged = X_c_cumsum.merge(
            pd.DataFrame(min_index, columns=["start"]), left_on=c, right_index=True
        )
        return X_merged.index - X_merged["start"]

    def transform(self, X: pd.DataFrame):
        new_X = pd.DataFrame(index=X.index)
        for c in X.columns:
            new_X[f"ttp_{c}"] = self.time_to_previous_event(X, c)
        return new_X
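
Example: a minimal usage sketch (illustrative data, standalone use assumed):

import pandas as pd

from ceruleo.transformation.features.extraction import TimeToPreviousBinaryValue

life = pd.DataFrame({"alarm": [0, 1, 0, 0, 1, 0]})
step = TimeToPreviousBinaryValue()

# Column ttp_alarm: samples elapsed since the event count last increased;
# NaN before the first event
print(step.transform(life))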