"""Source code for ``datareactor.atoms.featuretools``."""

import functools
import logging

import featuretools as ft

from datareactor.atoms.base import Atom
from datareactor.dataset import DerivedColumn

logger = logging.getLogger(__name__)


class FeatureToolsAtom(Atom):
    """Generate derived columns with featuretools.

    The `FeatureToolsAtom` generates derived columns based on the features
    generated by the `featuretools` library.
    """

    def derive(self, dataset, table_name):
        """Generate features with featuretools.

        Note that featuretools does not support all of the types of
        relational datasets supported by Metadata.JSON.

        Args:
            dataset (Dataset): The dataset.
            table_name (str): The name of the target table.

        Returns:
            (:obj:`list` of :obj:`DerivedColumn`): The derived columns.
        """
        es = self._to_entityset(dataset)
        try:
            es[table_name]
        except KeyError:
            # The target table was skipped while building the entityset
            # (e.g. a single-column table), so there is nothing to derive.
            # NOTE(review): narrowed from `except BaseException`, which also
            # swallowed KeyboardInterrupt/SystemExit and real errors.
            return

        feature_matrix, feature_defs = ft.dfs(
            entityset=es, target_entity=table_name, max_depth=1)

        # `DataFrame.items()` replaces `iteritems()`, which was removed in
        # pandas 2.0; both yield (column_name, Series) pairs.
        for (feature_name, feature_column), feature_def in zip(
                feature_matrix.items(), feature_defs):
            derived_column = DerivedColumn()
            derived_column.table_name = table_name
            derived_column.values = feature_column.values
            derived_column.field = {
                "name": feature_name,
                "data_type": "numerical"
            }

            # Initialized up front so the final check below cannot hit an
            # unbound name when the feature has no base features. Features
            # built on the synthetic "_ft_id" index are meaningless and
            # are skipped.
            skip_feature = False
            arguments = feature_def.get_arguments()
            if "base_features" in arguments:
                related_fields = []
                for feature in arguments["base_features"]:
                    # Base features are serialized as "table: field".
                    related_table_name, related_field_name = (
                        part.strip() for part in feature.split(":"))
                    if related_field_name == "_ft_id":
                        skip_feature = True
                    related_fields.append({
                        "table": related_table_name,
                        "field": related_field_name
                    })
                derived_column.constraint = {
                    "constraint_type": "lineage",
                    "related_fields": related_fields,
                    "fields_under_consideration": [
                        {"table": table_name, "field": feature_name}
                    ],
                    "expression": "datareactor.atoms.FeatureToolsAtom"
                }

            if not skip_feature:
                yield derived_column

    # NOTE(review): lru_cache on an instance method keys on (self, dataset)
    # and keeps both alive for the cache's lifetime; it also requires the
    # dataset to be hashable. Kept as-is to preserve behavior — consider a
    # per-instance cache instead.
    @functools.lru_cache()
    def _to_entityset(self, dataset):
        """Build a featuretools ``EntitySet`` from the dataset.

        Tables with a single column are skipped; tables without a usable
        (string) primary key get a synthetic "_ft_id" index. Composite
        primary/foreign keys are ignored with a warning.

        Args:
            dataset (Dataset): The dataset to convert.

        Returns:
            featuretools.EntitySet: The populated entityset.
        """
        es = ft.EntitySet()
        for table_name, df in dataset.tables.items():
            if len(df.columns) == 1:
                continue  # skipping single column tables

            table = dataset.metadata.get_table(table_name)
            primary_key = table.get("primary_key")
            if isinstance(primary_key, str):
                es = es.entity_from_dataframe(
                    entity_id=table_name,
                    dataframe=df.copy(),
                    index=primary_key)
            else:
                # No usable primary key: let featuretools create a
                # synthetic "_ft_id" index column.
                es = es.entity_from_dataframe(
                    entity_id=table_name,
                    dataframe=df.copy(),
                    make_index=True,
                    index="_ft_id")
                if not primary_key:
                    logger.warning("Table %s has no primary key.", table_name)
                else:
                    logger.warning(
                        "Table %s has a composite primary key, it will be ignored.",
                        table_name)

        for foreign_key in dataset.metadata.get_foreign_keys():
            # Both endpoints must have been loaded as entities above.
            if foreign_key["table"] not in es.entity_dict:
                continue
            if foreign_key["ref_table"] not in es.entity_dict:
                continue
            if not isinstance(foreign_key["ref_field"], str):
                logger.warning(
                    "Tables %s and %s have a composite foreign key, it will be ignored.",
                    foreign_key["ref_table"], foreign_key["table"])
                continue

            try:
                es = es.add_relationship(ft.Relationship(
                    es[foreign_key["ref_table"]][foreign_key["ref_field"]],
                    es[foreign_key["table"]][foreign_key["field"]]
                ))
            except ValueError as err:
                # featuretools rejects e.g. duplicate or cyclic
                # relationships; log and keep going.
                logger.warning(err)

        return es