"""Source code for datareactor.dataset."""

import logging
import os

import pandas as pd

from metad import MetaData

logger = logging.getLogger(__name__)


def _read_csv_dtypes(table_meta):
    """Get the dtypes specification that needs to be passed to read_csv."""
    dtypes = dict()
    for field in table_meta['fields']:
        if 'data_type' not in field:
            continue
        field_type = field['data_type']
        if field_type == 'categorical':
            dtypes[field['name']] = str
        elif field_type == 'id' and field.get('data_subtype', 'integer') == 'string':
            dtypes[field['name']] = str
    return dtypes


def _parse_dtypes(data, table_meta):
    """Convert the data columns to the right dtype after loading the CSV."""
    for field in table_meta['fields']:
        if 'data_type' not in field:
            continue
        field_type = field['data_type']
        if field_type == 'datetime':
            datetime_format = field.get('format')
            data[field['name']] = pd.to_datetime(
                data[field['name']], format=datetime_format, exact=False)
        elif field_type == 'numerical' and field.get('data_subtype') == 'integer':
            data[field['name']] = data[field['name']].dropna().astype(int)
        elif field_type == 'id' and field.get('data_subtype', 'integer') == 'integer':
            data[field['name']] = data[field['name']].dropna().astype(int)
    return data


class DerivedColumn():
    """Derived column with known lineage.

    The `DerivedColumn` represents a derived column which belongs to the
    specified table.

    Attributes:
        table_name (str): The name of the table the column will belong to.
        values (list): A series of values which can be added to the table.
        field (dict): A field object which specifies the name and data type.
        constraint (:obj:`dict`, None): An optional constraint object.
    """

    # Class-level defaults; instances are expected to overwrite these.
    table_name = None
    values = None
    field = None
    constraint = None
class Dataset():
    """Read and write `metad` datasets.

    The `Dataset` object provides methods for reading datasets, adding
    derived columns, and writing datasets.

    Attributes:
        path_to_dataset (str): The path to the dataset files.
        metadata (metad.MetaData): The dataset metadata object.
        tables (dict): A dictionary mapping table names to dataframes.
    """

    def __init__(self, path_to_dataset):
        """Read the dataset from disk.

        Args:
            path_to_dataset (str): The path to the dataset.
        """
        self.path_to_dataset = os.path.expanduser(path_to_dataset)
        path_to_metadata = os.path.join(self.path_to_dataset, "metadata.json")
        self.metadata = MetaData.from_json(path_to_metadata)
        self.tables = {}
        for table_name in self.metadata.get_table_names():
            path_to_csv = os.path.join(
                self.path_to_dataset, "%s.csv" % table_name)
            table_metadata = self.metadata.get_table(table_name)
            # Force declared string columns to str at read time, then coerce
            # the remaining columns (datetimes, integers) after loading.
            dtypes = _read_csv_dtypes(table_metadata)
            self.tables[table_name] = pd.read_csv(path_to_csv, dtype=dtypes)
            _parse_dtypes(self.tables[table_name], table_metadata)
            # Lazy %-args: the message is only formatted if INFO is enabled.
            logger.info(
                "Loaded table %s (%s rows, %s cols)",
                table_name,
                len(self.tables[table_name]),
                len(self.tables[table_name].columns),
            )

    def export(self, path_to_dataset):
        """Write the dataset to disk.

        This writes the dataset to the target directory. It creates the
        tables as CSV files and writes the metadata as a JSON file.

        Args:
            path_to_dataset (str): The path to the dataset.
        """
        path_to_dataset = os.path.expanduser(path_to_dataset)
        os.makedirs(path_to_dataset, exist_ok=True)
        self.metadata.to_json(os.path.join(path_to_dataset, "metadata.json"))
        for table_name, dataframe in self.tables.items():
            path_to_csv = os.path.join(path_to_dataset, "%s.csv" % table_name)
            dataframe.to_csv(path_to_csv, index=False)
            # Use the loop variable directly instead of re-indexing
            # self.tables; lazy %-args as in __init__.
            logger.info(
                "Exported table %s (%s rows, %s cols)",
                table_name,
                len(dataframe),
                len(dataframe.columns),
            )

    def add_columns(self, columns):
        """Add the derived columns to the dataset.

        This modifies the tables and metadata in-place and adds the
        specified derived columns.

        Args:
            columns (:obj:`list` of :obj:`DerivedColumn`): The derived
                columns.

        Raises:
            ValueError: If a column with the same name already exists in
                the target table.
        """
        for col in columns:
            table = self.tables[col.table_name]
            column_name = col.field["name"]
            if column_name in table.columns:
                # Fixed typo in the original message ("aleady").
                raise ValueError("Column already exists!")
            table[column_name] = col.values
            self.metadata.add_field(col.table_name, col.field)
            if col.constraint:
                self.metadata.add_constraint(col.constraint)