Source code for datareactor.atoms.featuretools
import functools
import logging
import featuretools as ft
from datareactor.atoms.base import Atom
from datareactor.dataset import DerivedColumn
logger = logging.getLogger(__name__)
[docs]class FeatureToolsAtom(Atom):
"""Generate derived columns with featuretools.
The `FeatureToolsAtom` generates derived columns based on the features
generated by the `featuretools` library.
"""
[docs] def derive(self, dataset, table_name):
"""Generate features with featuretools.
Note that featuretools does not support all of the types of
relational datasets supported by Metadata.JSON.
Args:
dataset (Dataset): The dataset.
table_name (str): The name of the target table.
Returns:
(:obj:`list` of :obj:`DerivedColumn`): The derived columns.
"""
es = self._to_entityset(dataset)
try:
es[table_name]
except BaseException:
return
feature_matrix, feature_defs = ft.dfs(entityset=es, target_entity=table_name, max_depth=1)
for (feature_name, feature_column), feature_def in zip(
feature_matrix.iteritems(), feature_defs):
derived_column = DerivedColumn()
derived_column.table_name = table_name
derived_column.values = feature_column.values
derived_column.field = {
"name": feature_name,
"data_type": "numerical"
}
arguments = feature_def.get_arguments()
if "base_features" in arguments:
skip_feature = False
related_fields = []
for feature in arguments["base_features"]:
related_table_name, related_field_name = map(
lambda x: x.strip(), feature.split(":"))
if related_field_name == "_ft_id":
skip_feature = True
related_fields.append({
"table": related_table_name,
"field": related_field_name
})
derived_column.constraint = {
"constraint_type": "lineage",
"related_fields": related_fields,
"fields_under_consideration": [
{"table": table_name, "field": feature_name}
],
"expression": "datareactor.atoms.FeatureToolsAtom"
}
if not skip_feature:
yield derived_column
@functools.lru_cache()
def _to_entityset(self, dataset):
es = ft.EntitySet()
for table_name, df in dataset.tables.items():
if len(df.columns) == 1:
continue # skipping single column tables
table = dataset.metadata.get_table(table_name)
primary_key = table["primary_key"] if "primary_key" in table else None
if isinstance(primary_key, str):
es = es.entity_from_dataframe(
entity_id=table_name, dataframe=df.copy(), index=primary_key)
else:
es = es.entity_from_dataframe(
entity_id=table_name,
dataframe=df.copy(),
make_index=True,
index="_ft_id")
if not primary_key:
logger.warning("Table %s has no primary key.", table_name)
else:
logger.warning(
"Table %s has a composite primary key, it will be ignored.",
table_name)
for foreign_key in dataset.metadata.get_foreign_keys():
if foreign_key["table"] not in es.entity_dict:
continue
if foreign_key["ref_table"] not in es.entity_dict:
continue
if not isinstance(foreign_key["ref_field"], str):
logger.warning(
"Tables %s and %s have a composite foreign key, it will be ignored.",
foreign_key["ref_table"],
foreign_key["table"])
continue
try:
es = es.add_relationship(ft.Relationship(
es[foreign_key["ref_table"]][foreign_key["ref_field"]],
es[foreign_key["table"]][foreign_key["field"]]
))
except ValueError as err:
logger.warning(err)
return es