# Source code for rfwtools.extractor.tsf
"""This module provides tsfresh-based time series statistical feature extraction tools.
Typically, these will be used by DataSet.produce_feature_set(). However, there is no reason why these can't be run
externally.
Basic Usage Example:
::

    from rfwtools.data_set import DataSet
    from rfwtools.extractor.tsf import tsfresh_extractor

    # Setup a DataSet object and get some example data to work with
    ds = DataSet()
    ds.load_example_set_csv("my_example_set.csv")

    # Get a single example to work on
    ex = ds.example_set.loc[0, 'example']

    # Run on one example with defaults
    tsfresh_extractor(ex)

    # Run on one example with only 2 signals being processed
    tsfresh_extractor(ex, signals=['1_GMES', '1_PMES'])

    # Run on one example, but only include values before the fault onset.
    tsfresh_extractor(ex, query="Time < 0")

    # Run this on every example in the example set and produce a corresponding feature set for pre-fault signal data.
    ds.produce_feature_set(tsfresh_extractor, query="Time < 0")
"""
import re
import pandas as pd
from typing import Union, List
from .utils import get_example_data
from tsfresh import extract_features
from tsfresh.feature_extraction import EfficientFCParameters
from tsfresh.utilities.dataframe_functions import impute
from ..example import Example
from ..utils import get_signal_names
def tsfresh_extractor(example: Example, signals: Union[List[str], None] = None, query: str = None,
                      impute_function: Union[callable, None] = impute, disable_progress_bar: bool = True,
                      n_jobs: int = 0, default_fc_parameters: Union[dict, None] = None, **kwargs) -> pd.DataFrame:
    """Use tsfresh to extract features from the requested signals of a single Example.

    This is a thin wrapper over tsfresh.feature_extraction.extraction.extract_features.  See that method for more
    details.

    Arguments:
        example:
            The Example for which features are extracted
        signals:
            The signal (column) names to extract features from.  If None, defaults to the combination of cavities
            1-8 and waveforms ['GMES', 'GASK', 'CRFP', 'DETA2'] as produced by get_signal_names.
        query:
            Passed to get_example_data to filter the Example's data prior to feature extraction, e.g. "Time <= 0".
        impute_function:
            The function tsfresh uses to impute missing values in the extracted features.  None disables imputing.
        disable_progress_bar:
            If True (the default), the tsfresh progress bar is suppressed.
        n_jobs:
            The number of jobs that should be run concurrently.  Defaults to zero, which disables parallelization.
        default_fc_parameters:
            Mapping of feature calculator names to parameters.  If None, defaults to EfficientFCParameters().  See
            tsfresh.feature_extraction.extraction.extract_features for more details.
        **kwargs:
            All other key word arguments are passed directly to tsfresh.extract_features

    Returns:
        A DataFrame of the calculated features with a fresh 0-based index.
    """
    # Get the Example's data, optionally filtered by the query
    event_df = get_example_data(example, query)

    # List of signals for feature extraction
    if signals is None:
        sel_col = get_signal_names(cavities=['1', '2', '3', '4', '5', '6', '7', '8'],
                                   waveforms=["GMES", "GASK", "CRFP", "DETA2"])
    else:
        sel_col = signals

    # Set the default feature parameters
    if default_fc_parameters is None:
        default_fc_parameters = EfficientFCParameters()

    # Get the data that matches the request.  Unpacking lets callers pass any iterable of names (list, tuple, ...).
    event_df = event_df[["Time", *sel_col]]

    # Add the ID column tsfresh wants.  Mostly useless here since we only give tsfresh a single example at a time.
    event_df.insert(loc=0, column='id', value=1)

    # Do the feature extraction
    feature_df = extract_features(event_df.astype('float64'),
                                  column_id="id",
                                  column_sort="Time",
                                  impute_function=impute_function,
                                  default_fc_parameters=default_fc_parameters,
                                  disable_progressbar=disable_progress_bar,
                                  n_jobs=n_jobs,
                                  **kwargs)

    # The index only holds the dummy id added above.  Discard it directly instead of reset_index() followed by
    # drop(columns='index'), which raises a KeyError whenever the index carries a name.
    return feature_df.reset_index(drop=True)
def tsfresh_extractor_faulted_cavity(example: Example, waveforms: Union[List[str], None] = None, query: str = None,
                                     impute_function: Union[callable, None] = impute,
                                     disable_progress_bar: bool = True, n_jobs: int = 0,
                                     default_fc_parameters: Union[dict, None] = None,
                                     **kwargs) -> Union[pd.DataFrame, None]:
    """Use tsfresh to extract features for only the cavity that faulted.  Returns None if cavity_label=='0'.

    This is a thin wrapper over tsfresh.feature_extraction.extraction.extract_features.  See that method for more
    details.

    Arguments:
        example:
            The Example for which features are extracted
        waveforms:
            The waveform names to extract features from.  If None, defaults to ['GMES', 'GASK', 'CRFP', 'DETA2'].
        query:
            Passed to get_example_data to filter the Example's data prior to feature extraction, e.g. "Time <= 0".
        impute_function:
            The function tsfresh uses to impute missing values in the extracted features.  None disables imputing.
        disable_progress_bar:
            If True (the default), the tsfresh progress bar is suppressed.
        n_jobs:
            The number of jobs that should be run concurrently.  Defaults to zero, which disables parallelization.
        default_fc_parameters:
            Mapping of feature calculator names to parameters.  If None, defaults to EfficientFCParameters().  See
            tsfresh.feature_extraction.extraction.extract_features for more details.
        **kwargs:
            All other key word arguments are passed directly to tsfresh.extract_features

    Returns:
        A DataFrame of the calculated features with a fresh 0-based index, or None if cavity_label=='0'.
    """
    # A cavity label of "0" means there is no single cavity to extract features for
    if example.cavity_label == "0":
        return None

    # Get the Example's data, optionally filtered by the query
    event_df = get_example_data(example, query)

    # List of signals for feature extraction, limited to the cavity named by the Example's cavity_label
    if waveforms is None:
        waveforms = ["GMES", "GASK", "CRFP", "DETA2"]
    sel_col = get_signal_names(cavities=example.cavity_label, waveforms=waveforms)

    # Set the default feature parameters
    if default_fc_parameters is None:
        default_fc_parameters = EfficientFCParameters()

    # Get the requested columns for the cavity that faulted.  Then drop the cavity id from the column name so features
    # for all examples will have same column names.  The pattern is anchored so only the leading "<digit>_" prefix is
    # removed; an unanchored match would also eat later digit/underscore pairs (e.g. "1_DETA2_X" -> "DETAX").
    # NOTE(review): assumes signal names look like "<cavity digit>_<waveform>" (e.g. "1_GMES") - confirm against
    # get_signal_names.
    event_df = event_df[["Time", *sel_col]]
    event_df = event_df.rename(lambda name: re.sub(r'^\d_', '', name), axis='columns')

    # Add the ID column tsfresh wants.  Mostly useless here since we only give tsfresh a single example at a time.
    event_df.insert(loc=0, column='id', value=1)

    # Do the feature extraction
    feature_df = extract_features(event_df.astype('float64'),
                                  column_id="id",
                                  column_sort="Time",
                                  impute_function=impute_function,
                                  default_fc_parameters=default_fc_parameters,
                                  disable_progressbar=disable_progress_bar,
                                  n_jobs=n_jobs,
                                  **kwargs)

    # The index only holds the dummy id added above.  Discard it directly instead of reset_index() followed by
    # drop(columns='index'), which raises a KeyError whenever the index carries a name.
    return feature_df.reset_index(drop=True)