Source code for datareactor.atoms.aggregation

import logging

import pandas as pd

from datareactor import DerivedColumn
from datareactor.atoms.base import Atom

logger = logging.getLogger(__name__)


[docs]class AggregationAtom(Atom): """Apply aggregation functions to child rows. The `AggregationAtom` generates derived columns which are the resultt of applying aggregation functions to groups of child rows. """
[docs] def derive(self, dataset, table_name): """Apply pandas aggregation functions to groups of rows. Returns: (:obj:`list` of :obj:`DerivedColumn`): The derived columns. """ seen = set() for fk in dataset.metadata.get_foreign_keys(table_name): if fk["table"] == table_name: # Skip this relationship if the target table is the child continue for op, op_name in [ (lambda x: x.sum(), "sum"), (lambda x: x.max(), "max"), (lambda x: x.min(), "min"), ]: logger.info("Applying aggregator %s to foreign key %s -> %s." % ( op_name, fk["ref_table"], fk["table"] )) # Count the number of rows for each key. child_table = dataset.tables[fk["table"]].copy() if len(child_table.columns) <= 1: continue child_table = child_table.set_index(fk["field"]).select_dtypes("number") child_counts = op(child_table.groupby(fk["field"])) column_names = list(child_counts.columns) child_counts.columns = ["%s(%s)" % (op_name, col) for col in column_names] # Merge the counts into the parent table parent_table = dataset.tables[fk["ref_table"]].copy() parent_table = pd.merge( parent_table.reset_index(), child_counts.reset_index(), how='left', left_on=fk["ref_field"], right_on=fk["field"] ).set_index(fk["ref_field"]) for column_name, derived_name in zip(column_names, child_counts.columns): if parent_table[derived_name].dtype.kind != "f": continue if derived_name in seen: continue seen.add(derived_name) values = parent_table[derived_name].fillna(0.0).values column = DerivedColumn() column.table_name = table_name column.values = values column.field = { "name": derived_name, "data_type": "numerical" } column.constraint = { "constraint_type": "lineage", "related_fields": [ {"table": fk["table"], "field": column_name} ], "fields_under_consideration": [ {"table": fk["ref_table"], "field": derived_name} ], "expression": "datareactor.atoms.AggregationAtom" } yield column