Source code for ads.dataset.dataset

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2020, 2023 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

from __future__ import print_function, absolute_import, division

import copy
import datetime
import fsspec
import numpy as np
import os
import pandas as pd
import uuid

from collections import Counter
from sklearn.preprocessing import FunctionTransformer
from typing import Iterable, Tuple, Union

from ads import set_documentation_mode
from ads.common import utils
from ads.common.decorator.deprecate import deprecated
from ads.dataset import helper, logger
from ads.dataset.dataframe_transformer import DataFrameTransformer
from ads.dataset.exception import ValidationError
from ads.dataset.helper import (
    convert_columns,
    fix_column_names,
    generate_sample,
    DatasetDefaults,
    deprecate_default_value,
    deprecate_variable,
    get_dataset,
    infer_target_type,
)
from ads.dataset.label_encoder import DataFrameLabelEncoder
from ads.dataset.pipeline import TransformerPipeline
from ads.dataset.progress import DummyProgressBar
from ads.dataset.sampled_dataset import PandasDataset
from ads.type_discovery.type_discovery_driver import TypeDiscoveryDriver
from ads.dataset.helper import get_feature_type
from ads.dataset.correlation_plot import plot_correlation_heatmap
from ads.dataset.correlation import (
    _cat_vs_cts,
    _cat_vs_cat,
    _get_columns_by_type,
    _validate_correlation_methods,
)
from ads.common.decorator.runtime_dependency import (
    runtime_dependency,
    OptionalDependency,
)

N_Features_Wide_Dataset = 64


pd.set_option("display.max_colwidth", None)


class ADSDataset(PandasDataset):
    """
    An ADSDataset Object.

    The ADSDataset object cannot be used for classification or regression problems until a
    target has been set using `set_target`. To see some rows in the data use any of the usual
    Pandas functions like `head()`. There are also a variety of converters: to_dask,
    to_pandas, to_h2o, to_xgb, to_csv, to_parquet, to_json and to_hdf.
    """

    df_read_functions = ["head", "describe", "_get_numeric_data"]

    def __init__(
        self,
        df,
        sampled_df=None,
        shape=None,
        name="",
        description=None,
        type_discovery=True,
        types={},
        metadata=None,
        progress=DummyProgressBar(),
        transformer_pipeline=None,
        interactive=False,
        **kwargs,
    ):
        #
        # to keep performance high and linear no matter the size of the distributed dataset we
        # create a pandas df that's used internally because this has a fixed upper size.
        #
        if shape is None:
            shape = df.shape

        if sampled_df is None:
            sampled_df = generate_sample(
                df,
                shape[0],
                DatasetDefaults.sampling_confidence_level,
                DatasetDefaults.sampling_confidence_interval,
                **kwargs,
            )
        super().__init__(
            sampled_df,
            type_discovery=type_discovery,
            types=types,
            metadata=metadata,
            progress=progress,
        )
        self.df = fix_column_names(df)

        self.name = name
        self.description = description
        self.shape = shape

        # store these args to reapply when building a new dataset for delegate operations on dataframe
        self.init_kwargs = {**kwargs, "type_discovery": type_discovery}

        if transformer_pipeline is None:
            # Update transformer pipeline to convert column types and fix names
            self.transformer_pipeline = TransformerPipeline(
                steps=[
                    (
                        "prepare",
                        FunctionTransformer(func=fix_column_names, validate=False),
                    )
                ]
            )
            self.transformer_pipeline = self._update_transformer_pipeline(
                steps=[
                    (
                        "type_discovery",
                        FunctionTransformer(
                            func=convert_columns,
                            validate=False,
                            kw_args={"dtypes": self.sampled_df.dtypes},
                        ),
                    )
                ]
            )
        else:
            self.transformer_pipeline = transformer_pipeline

    def __repr__(self):
        rows, cols = self.shape
        return f"{rows:,} rows, {cols:,} columns"

    def __len__(self):
        return self.shape[0]
    @staticmethod
    def from_dataframe(
        df,
        sampled_df=None,
        shape=None,
        name="",
        description=None,
        type_discovery=True,
        types={},
        metadata=None,
        progress=DummyProgressBar(),
        transformer_pipeline=None,
        interactive=False,
        **kwargs,
    ) -> "ADSDataset":
        return ADSDataset(
            df=df,
            sampled_df=sampled_df,
            shape=shape,
            name=name,
            description=description,
            type_discovery=type_discovery,
            types=types,
            metadata=metadata,
            progress=progress,
            transformer_pipeline=transformer_pipeline,
            interactive=interactive,
            **kwargs,
        )
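    # Illustrative usage sketch (not part of the library source): building an
    # ADSDataset from an in-memory pandas DataFrame via `from_dataframe`. The
    # file name "train.csv" is a hypothetical placeholder.
    #
    #     import pandas as pd
    #     from ads.dataset.dataset import ADSDataset
    #
    #     df = pd.read_csv("train.csv")
    #     ds = ADSDataset.from_dataframe(df, name="training data")
    #     print(ds)            # e.g. "10,000 rows, 12 columns"
    #     print(len(ds))       # row count, delegated to __len__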
    @property
    @deprecated(
        "2.5.2", details="The ddf attribute is deprecated. Use the df attribute."
    )
    def ddf(self):
        return self.df
    @deprecated(
        "2.5.2", details="The compute method is deprecated. Use the df attribute."
    )
    def compute(self):
        return self.df
    @runtime_dependency(
        module="ipywidgets", object="HTML", install_from=OptionalDependency.NOTEBOOK
    )
    @runtime_dependency(module="IPython", install_from=OptionalDependency.NOTEBOOK)
    def _repr_html_(self):
        from IPython.core.display import display, HTML

        display(
            HTML(
                utils.horizontal_scrollable_div(
                    self.sampled_df.head(5)
                    .style.set_table_styles(utils.get_dataframe_styles())
                    .set_table_attributes("class=table")
                    .hide_index()
                    .render()
                )
            )
        )

    def _head(self, n=5):
        """
        Return the first `n` rows of the dataset.

        Parameters
        ----------
        n : int, default 5
            Number of rows to select.

        Returns
        -------
        dataset_head : pandas.DataFrame
            The first `n` rows of the dataset.

        Examples
        --------
        >>> import pandas as pd
        >>> ds = ADSDataset.from_dataframe(pd.read_csv("classification_data.csv"))
        >>> ds.head()
        * displays the first 5 rows of the dataset, just as the traditional head() function would *
        """
        df = self.df.head(n=n)

        #
        # we could just return the above but jupyterlab doesn't render these well
        # when the width exceeds the screen area. To address that we wrap the dataframe
        # with a class that has an optimized _repr_html_ handler; this object
        # extends the pandas dataframe so it can still be used as a dataframe.
        #
        class FormattedDataFrame(pd.DataFrame):
            def __init__(self, *args, **kwargs):
                super(FormattedDataFrame, self).__init__(*args, **kwargs)

            @property
            def _constructor(self):
                return FormattedDataFrame

            @runtime_dependency(
                module="ipywidgets",
                object="HTML",
                install_from=OptionalDependency.NOTEBOOK,
            )
            @runtime_dependency(
                module="IPython", install_from=OptionalDependency.NOTEBOOK
            )
            def _repr_html_(self):
                from IPython.core.display import display, HTML

                display(
                    HTML(
                        utils.horizontal_scrollable_div(
                            self.style.set_table_styles(utils.get_dataframe_styles())
                            .set_table_attributes("class=table")
                            .hide_index()
                            .render()
                        )
                    )
                )
                return None

            def __repr__(self):
                return "{} rows, {} columns".format(*self.shape)

        return FormattedDataFrame(df)
    def call(self, func, *args, sample_size=None, **kwargs):
        r"""
        Runs a custom function on the dataframe.

        func will receive the pandas dataframe (which represents the dataset) as an argument
        named 'df' by default. This can be overridden by specifying the dataframe argument
        name in a tuple (func, dataframe_name).

        Parameters
        ----------
        func: Union[callable, tuple]
            Custom function that takes a pandas dataframe as input.
            Alternatively a (callable, data) tuple where data is a string indicating the
            keyword of the callable that expects the dataframe name.
        args: iterable, optional
            Positional arguments passed into func.
        sample_size: int, optional
            To use a sampled dataframe.
        kwargs: mapping, optional
            A dictionary of keyword arguments passed into func.

        Returns
        -------
        func: function
            a plotting function that contains `*args` and `**kwargs`

        Examples
        --------
        >>> import pandas as pd
        >>> ds = ADSDataset.from_dataframe(pd.read_csv("classification_data.csv"))
        >>> def f1(df):
        ...     return df.sum(axis=0)
        >>> sum_ds = ds.call(f1)
        """
        data = "df"
        if isinstance(func, tuple):
            func, data = func
            if data in kwargs:
                raise ValueError(
                    "'%s' is both the data argument and a keyword argument" % data
                )

        if sample_size is None:
            # user has asked not to do sampling
            df = self.df.copy()
        else:
            df = self.df.sample(n=sample_size)
        kwargs[data] = df
        return func(*args, **kwargs)
    def set_target(self, target, type_discovery=True, target_type=None):
        """
        Returns a dataset tagged based on the type of target.

        Parameters
        ----------
        target: str
            name of the feature to use as target.
        type_discovery: bool
            This is set as True by default.
        target_type: type
            If provided, then the target will be typed with the provided value.

        Returns
        -------
        ds: ADSDataset
            tagged according to the type of the target column.

        Examples
        --------
        >>> import pandas as pd
        >>> ds = ADSDataset.from_dataframe(pd.read_csv("classification_data.csv"))
        >>> ds_with_target = ds.set_target("target_class")
        """
        if target_type:
            target_series = self.sampled_df[target].astype(target_type)
        else:
            target_series = self.sampled_df[target]
        return get_dataset(
            self.df,
            self.sampled_df,
            target,
            infer_target_type(target, target_series, type_discovery),
            self.shape,
            **self.init_kwargs,
        )
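    # Illustrative sketch (not part of the library source): once a target is set,
    # the returned object is a problem-specific dataset (for example a
    # classification dataset) that unlocks target-aware APIs such as
    # get_recommendations(). The column name "target_class" is a hypothetical
    # placeholder.
    #
    #     ds = ADSDataset.from_dataframe(df)
    #     ds_clf = ds.set_target("target_class", type_discovery=True)
    #     # force the target to a specific dtype instead of relying on discovery:
    #     ds_clf = ds.set_target("target_class", target_type="category")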
[docs] @deprecated("2.5.2", details="Instead use `to_pandas`.") def to_pandas_dataframe( self, filter=None, frac=None, include_transformer_pipeline=False ): return self.to_pandas( filter=filter, frac=frac, include_transformer_pipeline=include_transformer_pipeline, )
    def to_pandas(self, filter=None, frac=None, include_transformer_pipeline=False):
        """
        Returns a copy of the data as pandas.DataFrame, and optionally a sklearn pipeline
        that holds the transformations run so far on the data.

        The pipeline returned can be updated with the transformations done offline and passed
        along with the dataframe to the Dataset.open API if the transformations need to be
        reproduced at the time of scoring.

        Parameters
        ----------
        filter: str, optional
            The query string to filter the dataframe, for example
            ds.to_pandas(filter="age > 50 and location == 'san francisco'")
            See also https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.query.html
        frac: float, optional
            fraction of original data to return.
        include_transformer_pipeline: bool, default: False
            If True, (dataframe, transformer_pipeline) is returned as a tuple.

        Returns
        -------
        dataframe : pandas.DataFrame
            if include_transformer_pipeline is False.
        (data, transformer_pipeline): tuple of pandas.DataFrame and dataset.pipeline.TransformerPipeline
            if include_transformer_pipeline is True.

        Examples
        --------
        >>> import pandas as pd
        >>> ds = ADSDataset.from_dataframe(pd.read_csv("data.csv"))
        >>> ds_as_df = ds.to_pandas()

        Notes
        -----
        See also https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html#sklearn.pipeline.Pipeline
        """
        df = self.df.query(filter) if filter is not None else self.df.copy()
        if frac is not None:
            df = df.sample(frac=frac)
        return (
            (df, copy.deepcopy(self.transformer_pipeline))
            if include_transformer_pipeline
            else df
        )
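    # Illustrative sketch (not part of the library source): exporting the data
    # together with the transformer pipeline so the same preparation steps can
    # be replayed on new data at scoring time. "new_data.csv" is a hypothetical
    # placeholder, and replaying via pipeline.transform() assumes the returned
    # TransformerPipeline behaves like an sklearn Pipeline.
    #
    #     df, pipeline = ds.to_pandas(include_transformer_pipeline=True)
    #     scoring_df = pd.read_csv("new_data.csv")
    #     prepared = pipeline.transform(scoring_df)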
[docs] @deprecated("2.5.2", details="Instead use `to_dask`.") def to_dask_dataframe( self, filter=None, frac=None, npartitions=None, include_transformer_pipeline=False, ): return self.to_dask( filter=filter, frac=frac, npartitions=npartitions, include_transformer_pipeline=include_transformer_pipeline, )
[docs] @runtime_dependency(module="dask.dataframe", short_name="dd") def to_dask( self, filter=None, frac=None, npartitions=None, include_transformer_pipeline=False, ): """ Returns a copy of the data as dask.dataframe.core.DataFrame, and a sklearn pipeline optionally that holds the transformations run so far on the data. The pipeline returned can be updated with the transformations done offline and passed along with the dataframe to Dataset.open API if the transformations need to be reproduced at the time of scoring. Parameters ---------- filter: str, optional The query string to filter the dataframe, for example ds.to_dask(filter="age > 50 and location == 'san francisco") See also https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.query.html frac: float, optional fraction of original data to return. include_transformer_pipeline: bool, default: False If True, (dataframe, transformer_pipeline) is returned as a tuple. Returns ------- dataframe : dask.dataframe.core.DataFrame if include_transformer_pipeline is False. (data, transformer_pipeline): tuple of dask.dataframe.core.DataFrame and dataset.pipeline.TransformerPipeline if include_transformer_pipeline is True. Examples -------- >>> import pandas as pd >>> ds = ADSDataset.from_dataframe(pd.read_csv("data.csv")) >>> ds_dask = ds.to_dask() Notes ----- See also http://docs.dask.org/en/latest/dataframe-api.html#dataframe and https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html#sklearn.pipeline.Pipeline """ res = self.to_pandas( filter=filter, frac=frac, include_transformer_pipeline=include_transformer_pipeline, ) return ( (dd.from_pandas(res[0], npartitions=npartitions), res[1]) if include_transformer_pipeline else dd.from_pandas(res, npartitions=npartitions) )
[docs] @deprecated("2.5.2", details="Instead use `to_h2o`.") def to_h2o_dataframe( self, filter=None, frac=None, include_transformer_pipeline=False ): return self.to_h2o( filter=filter, frac=frac, include_transformer_pipeline=include_transformer_pipeline, )
[docs] @runtime_dependency(module="h2o") def to_h2o(self, filter=None, frac=None, include_transformer_pipeline=False): """ Returns a copy of the data as h2o.H2OFrame, and a sklearn pipeline optionally that holds the transformations run so far on the data. The pipeline returned can be updated with the transformations done offline and passed along with the dataframe to Dataset.open API if the transformations need to be reproduced at the time of scoring. Parameters ---------- filter: str, optional The query string to filter the dataframe, for example ds.to_h2o(filter="age > 50 and location == 'san francisco") See also https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.query.html frac: float, optional fraction of original data to return. include_transformer_pipeline: bool, default: False If True, (dataframe, transformer_pipeline) is returned as a tuple. Returns ------- dataframe : h2o.H2OFrame if include_transformer_pipeline is False. (data, transformer_pipeline): tuple of h2o.H2OFrame and dataset.pipeline.TransformerPipeline if include_transformer_pipeline is True. Examples -------- >>> import pandas as pd >>> ds = ADSDataset.from_dataframe(pd.read_csv("data.csv")) >>> ds_as_h2o = ds.to_h2o() Notes ----- See also https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html#sklearn.pipeline.Pipeline """ res = self.to_pandas( filter=filter, frac=frac, include_transformer_pipeline=include_transformer_pipeline, ) return ( (h2o.H2OFrame(res[0]), res[1]) if include_transformer_pipeline else h2o.H2OFrame(res) )
    @deprecated("2.5.2", details="Instead use `to_xgb`.")
    def to_xgb_dmatrix(
        self, filter=None, frac=None, include_transformer_pipeline=False
    ):
        return self.to_xgb(
            filter=filter,
            frac=frac,
            include_transformer_pipeline=include_transformer_pipeline,
        )
[docs] @runtime_dependency(module="xgboost", install_from=OptionalDependency.BOOSTED) def to_xgb(self, filter=None, frac=None, include_transformer_pipeline=False): """ Returns a copy of the data as xgboost.DMatrix, and a sklearn pipeline optionally that holds the transformations run so far on the data. The pipeline returned can be updated with the transformations done offline and passed along with the dataframe to Dataset.open API if the transformations need to be reproduced at the time of scoring. Parameters ---------- filter: str, optional The query string to filter the dataframe, for example ds.to_xgb(filter="age > 50 and location == 'san francisco") See also https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.query.html frac: float, optional fraction of original data to return. include_transformer_pipeline: bool, default: False If True, (dataframe, transformer_pipeline) is returned as a tuple. Returns ------- dataframe : xgboost.DMatrix if include_transformer_pipeline is False. (data, transformer_pipeline): tuple of xgboost.DMatrix and dataset.pipeline.TransformerPipeline if include_transformer_pipeline is True. Examples -------- >>> import pandas as pd >>> ds = ADSDataset.from_dataframe(pd.read_csv("data.csv")) >>> xgb_dmat = ds.to_xgb() Notes ----- See also https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html#sklearn.pipeline.Pipeline """ res = self.to_pandas( filter=filter, frac=frac, include_transformer_pipeline=include_transformer_pipeline, ) df = res[0] if include_transformer_pipeline else res le = DataFrameLabelEncoder() df = le.fit_transform(df) if include_transformer_pipeline: res[1].add(le) xgb_matrix = xgboost.DMatrix(df) return (xgb_matrix, res[1]) if include_transformer_pipeline else xgb_matrix
    def sample(self, frac=None, random_state=utils.random_state):
        """
        Returns a random sample of the dataset.

        Parameters
        ----------
        frac : float, optional
            Fraction of axis items to return.
        random_state : int or ``np.random.RandomState``
            If int, we create a new RandomState with this as the seed;
            otherwise we draw from the passed RandomState.

        Returns
        -------
        sampled_dataset: ADSDataset
            An ADSDataset which was randomly sampled.

        Examples
        --------
        >>> import pandas as pd
        >>> ds = ADSDataset.from_dataframe(pd.read_csv("data.csv"))
        >>> ds_sample = ds.sample()
        """
        df = self.df.sample(frac=frac, random_state=random_state)
        return self._build_new_dataset(df)
    def drop_columns(self, columns):
        """
        Return a new dataset with the specified columns removed.

        Parameters
        ----------
        columns : str or list
            columns to drop.

        Returns
        -------
        dataset: same type as the caller
            a dataset with the specified columns dropped.

        Raises
        ------
        ValidationError
            If any of the feature names is not found in the dataset.

        Examples
        --------
        >>> import pandas as pd
        >>> ds = ADSDataset.from_dataframe(pd.read_csv("data.csv"))
        >>> ds_smaller = ds.drop_columns(['col1', 'col2'])
        """
        self._validate_feature(columns)
        return self.drop(columns, axis=1)
    def assign_column(self, column, arg):
        """
        Return a new dataset with a new column, or with the values of an existing column mapped
        according to the input correspondence.

        Used for adding a new column or substituting each value in a column with another value,
        that may be derived from a function, a :class:`pandas.Series` or a :class:`pandas.DataFrame`.

        Parameters
        ----------
        column : str
            Name of the feature to update.
        arg : function, dict, Series or DataFrame
            Mapping correspondence.

        Returns
        -------
        dataset: same type as the caller
            a dataset with the specified column assigned.

        Examples
        --------
        >>> import pandas as pd
        >>> ds = ADSDataset.from_dataframe(pd.read_csv("data.csv"))
        >>> ds_same_size = ds.assign_column('target', lambda x: x > 15 if x is not None else None)
        >>> ds_bigger = ds.assign_column('new_col', np.arange(ds.shape[0]))
        """
        target_name = (
            self.target.name if not utils.is_same_class(self, ADSDataset) else None
        )
        if isinstance(arg, Iterable) or isinstance(arg, ADSDataset):
            df = self.df.copy()
            if type(arg) == pd.DataFrame:
                col_to_add = arg
            elif type(arg) == ADSDataset:
                col_to_add = arg.df
            elif type(arg) == dict:
                col_to_add = pd.DataFrame.from_dict(arg)
            elif type(arg) in [list, np.ndarray]:
                col_to_add = pd.DataFrame(arg, columns=["new_col"])
            elif type(arg) == pd.Series:
                col_to_add = arg.rename("new_col").to_frame()
            elif utils._is_dask_dataframe(arg):
                col_to_add = arg.compute()
            elif utils._is_dask_series(arg):
                col_to_add = arg.compute().rename("new_col").to_frame()
            else:
                raise ValueError(
                    f"assign_column currently does not support arg of type {type(arg)}. Reformat "
                    f"as types: Pandas, numpy, list, or dict"
                )
            if column in df.columns:
                df = df.drop(columns=column)
            new_df = pd.concat([df, col_to_add], axis=1).rename(
                columns={"new_col": column}
            )
            return self._build_new_dataset(new_df)
        else:
            sampled_df = self.sampled_df.copy()
            df = self.df.copy()
            sampled_df[column] = sampled_df[column].apply(arg)
            df[column] = df[column].apply(arg)

            if column == target_name:
                target_type = get_feature_type(target_name, sampled_df[target_name])
                return self._build_new_dataset(
                    df, sampled_df, target=target_name, target_type=target_type
                )
            else:
                return self._build_new_dataset(
                    df,
                    sampled_df,
                    target=target_name,
                    target_type=self.target.type
                    if target_name != column and target_name is not None
                    else None,
                )
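    # Illustrative sketch (not part of the library source): adding a derived
    # column and re-mapping an existing one. Column names ("row_id", "age") are
    # hypothetical placeholders.
    #
    #     ds2 = ds.assign_column("row_id", np.arange(ds.shape[0]))
    #     ds3 = ds2.assign_column("age", lambda x: x if x is not None else 0)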
    def rename_columns(self, columns):
        """
        Returns a new dataset with altered column names.

        dict values must be unique (1-to-1). Labels not contained in a dict will be left as-is.
        Extra labels listed don't throw an error.

        Parameters
        ----------
        columns: dict-like or function or list of str
            dict to rename columns selectively, or list of names to rename all columns, or a
            function like str.upper

        Returns
        -------
        dataset: same type as the caller
            A dataset with the specified columns renamed.

        Examples
        --------
        >>> import pandas as pd
        >>> ds = ADSDataset.from_dataframe(pd.read_csv("data.csv"))
        >>> ds_renamed = ds.rename_columns({'col1': 'target'})
        """
        if isinstance(columns, list):
            assert len(columns) == len(
                self.columns.values
            ), "the length of columns does not match the dataset"
            columns = dict(zip(self.columns.values, columns))
        return self.rename(columns=columns)
    def set_name(self, name):
        """
        Sets the name for the dataset.

        This name will be used to filter the datasets returned by the ds.list() API. Calling
        this API is optional. By default the name of the dataset is set to empty.

        Parameters
        ----------
        name: str
            Name of the dataset.

        Examples
        --------
        >>> import pandas as pd
        >>> ds = ADSDataset.from_dataframe(pd.read_csv("data1.csv"))
        >>> ds_renamed = ds.set_name("dataset1")
        """
        self.name = name
    def set_description(self, description):
        """
        Sets the description for the dataset.

        Give your dataset a description.

        Parameters
        ----------
        description: str
            Description of the dataset.

        Examples
        --------
        >>> import pandas as pd
        >>> ds = ADSDataset.from_dataframe(pd.read_csv("data1.csv"))
        >>> ds_described = ds.set_description("dataset1 is from data1.csv")
        """
        self.description = description
    def snapshot(self, snapshot_dir=None, name="", storage_options=None):
        """
        Snapshot the dataset with the modifications made so far.

        Optionally the caller can invoke ds.set_name() before saving to identify the dataset
        uniquely at the time of using ds.list().

        The snapshot can be reloaded by providing the URI returned by this API to
        DatasetFactory.open().

        Parameters
        ----------
        snapshot_dir: str, optional
            Directory path under which the dataset snapshot will be created.
            Defaults to snapshots_dir set using DatasetFactory.set_default_storage().
        name: str, optional, default: ""
            Name to uniquely identify the snapshot using DatasetFactory.list_snapshots().
            If not provided, an auto-generated name is used.
        storage_options: dict, optional
            Parameters passed on to the backend filesystem class.
            Defaults to storage_options set using DatasetFactory.set_default_storage().

        Returns
        -------
        p_str: str
            the URI to access the snapshotted dataset.

        Examples
        --------
        >>> import pandas as pd
        >>> ds = ADSDataset.from_dataframe(pd.read_csv("data.csv"))
        >>> ds_uri = ds.snapshot()
        """
        if snapshot_dir is None:
            import ads.dataset.factory as factory

            snapshot_dir = factory.default_snapshots_dir
            if snapshot_dir is None:
                raise ValueError(
                    "Specify snapshot_dir or use DatasetFactory.set_default_storage() "
                    "to set default storage options"
                )
            else:
                logger.info("Using default snapshots dir %s" % snapshot_dir)
        name = self._get_unique_name(name)
        if not snapshot_dir.endswith("/"):
            snapshot_dir = snapshot_dir + "/"
        parquet_file = "%s%s.parquet" % (snapshot_dir, name)
        os.makedirs(snapshot_dir, exist_ok=True)
        if storage_options is None and parquet_file[:3] == "oci":
            import ads.dataset.factory as factory

            storage_options = factory.default_storage_options
            logger.info("Using default storage options.")
        return helper.write_parquet(
            path=parquet_file,
            data=self.df,
            metadata_dict={
                "metadata": self.feature_types,
                "transformer": self.transformer_pipeline,
            },
            storage_options=storage_options,
        )
    def to_csv(self, path, storage_options=None, **kwargs):
        """
        Save the materialized dataframe to a csv file.

        Parameters
        ----------
        path: str
            Location to write to. If there is more than one partition in df, the path should
            include a glob character to expand into a set of file names, or provide a
            `name_function=` parameter. Supports protocol specifications such as `"oci://"`,
            `"s3://"`.
        storage_options: dict, optional
            Parameters passed on to the backend filesystem class.
            Defaults to storage_options set using DatasetFactory.set_default_storage().
        kwargs: dict, optional

        Examples
        --------
        >>> import pandas as pd
        >>> ds = ADSDataset.from_dataframe(pd.read_csv("data.csv"))
        >>> [ds_link] = ds.to_csv("my/path.csv")
        """
        if storage_options is None:
            import ads.dataset.factory as factory

            storage_options = factory.default_storage_options
            logger.info("Using default storage options")
        return self.df.to_csv(path, storage_options=storage_options, **kwargs)
    def to_parquet(self, path, storage_options=None, **kwargs):
        """
        Save the data to a parquet file.

        Parameters
        ----------
        path: str
            Location to write to. If there is more than one partition in df, the path should
            include a glob character to expand into a set of file names, or provide a
            `name_function=` parameter. Supports protocol specifications such as `"oci://"`,
            `"s3://"`.
        storage_options: dict, optional
            Parameters passed on to the backend filesystem class.
            Defaults to storage_options set using DatasetFactory.set_default_storage().
        kwargs: dict, optional

        Examples
        --------
        >>> import pandas as pd
        >>> ds = ADSDataset.from_dataframe(pd.read_csv("data.csv"))
        >>> ds.to_parquet("my/path")
        """
        if storage_options is None:
            import ads.dataset.factory as factory

            storage_options = factory.default_storage_options
            logger.info("Using default storage options")
        return self.df.to_parquet(path, storage_options=storage_options, **kwargs)
    def to_json(self, path, storage_options=None, **kwargs):
        """
        Save the data to JSON files.

        Parameters
        ----------
        path: str
            Location to write to. If there is more than one partition in df, the path should
            include a glob character to expand into a set of file names, or provide a
            `name_function=` parameter. Supports protocol specifications such as `"oci://"`,
            `"s3://"`.
        storage_options: dict, optional
            Parameters passed on to the backend filesystem class.
            Defaults to storage_options set using DatasetFactory.set_default_storage().
        kwargs: dict, optional

        Examples
        --------
        >>> import pandas as pd
        >>> ds = ADSDataset.from_dataframe(pd.read_csv("data.csv"))
        >>> ds.to_json("my/path.json")
        """
        if storage_options is None:
            import ads.dataset.factory as factory

            storage_options = factory.default_storage_options
            logger.info("Using default storage options")
        return self.df.to_json(path, storage_options=storage_options, **kwargs)
    def to_hdf(
        self, path: str, key: str, storage_options: dict = None, **kwargs
    ) -> str:
        """
        Save the data to Hierarchical Data Format (HDF) files.

        Parameters
        ----------
        path : string
            Path to a target filename.
        key : string
            Datapath within the files.
        storage_options: dict, optional
            Parameters passed to the backend filesystem class.
            Defaults to storage_options set using DatasetFactory.set_default_storage().
        kwargs: dict, optional

        Returns
        -------
        str
            The filename of the HDF5 file created.

        Examples
        --------
        >>> import pandas as pd
        >>> ds = ADSDataset.from_dataframe(pd.read_csv("data.csv"))
        >>> ds.to_hdf(path="my/path.h5", key="df")
        """
        if storage_options is None:
            import ads.dataset.factory as factory

            storage_options = factory.default_storage_options
            logger.info("Using default storage options")

        with pd.HDFStore(
            "memory",
            mode="w",
            driver="H5FD_CORE",
            driver_core_backing_store=0,
        ) as hdf_store:
            hdf_store.put(key, self.df, format=kwargs.get("hdf5_format", "fixed"))
            data = hdf_store._handle.get_file_image()

        new_path = (
            path.replace("*", "0")
            if path[-3:] == ".h5"
            else path.replace("*", "0") + ".h5"
        )

        with fsspec.open(
            urlpath=new_path, mode="wb", storage_options=storage_options, **kwargs
        ) as fo:
            fo.write(data)

        return new_path
[docs] @runtime_dependency(module="fastavro", install_from=OptionalDependency.DATA) def to_avro(self, path, schema=None, storage_options=None, **kwargs): """ Save data to Avro files. Avro is a remote procedure call and data serialization framework developed within Apache's Hadoop project. It uses JSON for defining data types and protocols, and serializes data in a compact binary format. Parameters ---------- path : string Path to a target filename. May contain a ``*`` to denote many filenames. schema : dict Avro schema dictionary, see below. storage_options: dict, optional Parameters passed to the backend filesystem class. Defaults to storage_options set using DatasetFactory.set_default_storage(). kwargs: dict, optional See https://fastavro.readthedocs.io/en/latest/writer.html Notes ----- Avro schema is a complex dictionary describing the data, see https://avro.apache.org/docs/1.8.2/gettingstartedpython.html#Defining+a+schema and https://fastavro.readthedocs.io/en/latest/writer.html. Its structure is as follows:: {'name': 'Test', 'namespace': 'Test', 'doc': 'Descriptive text', 'type': 'record', 'fields': [ {'name': 'a', 'type': 'int'}, ]} where the "name" field is required, but "namespace" and "doc" are optional descriptors; "type" must always be "record". The list of fields should have an entry for every key of the input records, and the types are like the primitive, complex or logical types of the Avro spec (https://avro.apache.org/docs/1.8.2/spec.html). Examples -------- >>> import pandas >>> import fastavro >>> with open("data.avro", "rb") as fp: >>> reader = fastavro.reader(fp) >>> records = [r for r in reader] >>> df = pandas.DataFrame.from_records(records) >>> ds = ADSDataset.from_dataframe(df) >>> ds.to_avro("my/path.avro") """ # Get the row by row formatting data_row_by_row = [] for i, row in self.df.iterrows(): data_row_by_row.append(row.to_dict()) # Try to auto-generate schema if schema is None: avro_types = self._convert_dtypes_to_avro_types() schema = {"name": self.name, "doc": self.description, "type": "record"} fields = [] ## Add vars for col, dtype in avro_types: fields.append({"name": col, "type": ["null", dtype]}) schema["fields"] = fields parsed_schema = fastavro.parse_schema(schema=schema) new_path = ( path.replace("*", "0") if path[-5:] == ".avro" else path.replace("*", "0") + ".avro" ) with fsspec.open( new_path, "wb", storage_options=storage_options, **kwargs ) as fo: fastavro.writer(fo, parsed_schema, data_row_by_row) return new_path
    def _convert_dtypes_to_avro_types(self):
        avro_types = []
        for name, dtype in zip(self.dtypes.index, self.dtypes.values):
            if dtype == np.int64:
                avro_dtype = "long"
            elif "int" in str(dtype):
                avro_dtype = "int"
            elif dtype == np.float64:
                avro_dtype = "double"
            elif "float" in str(dtype):
                avro_dtype = "float"
            elif dtype == np.bool_:
                avro_dtype = "boolean"
            else:
                avro_dtype = "string"
            avro_types.append((name, avro_dtype))
        return avro_types
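    # Illustrative sketch (not part of the library source): for a hypothetical
    # frame, the mapping above produces Avro field types as follows.
    #
    #     df = pd.DataFrame({"a": [1], "b": [1.5], "c": [True], "d": ["x"]})
    #     # int64 -> "long", float64 -> "double", bool -> "boolean", object -> "string"
    #     # so _convert_dtypes_to_avro_types() would return:
    #     # [("a", "long"), ("b", "double"), ("c", "boolean"), ("d", "string")]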
    def astype(self, types):
        """
        Convert the data type of features.

        Parameters
        ----------
        types: dict
            key is the existing feature name;
            value is the data type to which the values of the feature should be converted.
            Valid data types: all numpy datatypes (Example: np.float64, np.int64, ...)
            or one of categorical, continuous, ordinal or datetime.

        Returns
        -------
        updated_dataset: `ADSDataset`
            an ADSDataset with the new data types

        Examples
        --------
        >>> import pandas as pd
        >>> ds = ADSDataset.from_dataframe(pd.read_csv("data.csv"))
        >>> ds_reformatted = ds.astype({"target": "categorical"})
        """
        return self.__getattr__("astype")(helper.map_types(types))
    def merge(self, data, **kwargs):
        """
        Merges this dataset with another ADSDataset or pandas dataframe.

        Parameters
        ----------
        data : Union[ADSDataset, pandas.DataFrame]
            Data to merge.
        kwargs : dict, optional
            additional keyword arguments that would be passed to the underlying dataframe's
            merge API.

        Examples
        --------
        >>> import pandas as pd
        >>> df1 = pd.read_csv("data1.csv")
        >>> df2 = pd.read_csv("data2.csv")
        >>> ds1 = ADSDataset.from_dataframe(df1)
        >>> ds2 = ADSDataset.from_dataframe(df2)
        >>> ds_12 = ds1.merge(ds2)
        """
        assert isinstance(data, pd.DataFrame) or isinstance(
            data, ADSDataset
        ), "Can only merge datasets if they are of the types pandas or ads"
        df = self.df.merge(data.df if isinstance(data, ADSDataset) else data, **kwargs)
        return self._build_new_dataset(df, progress=utils.get_progress_bar(3))
""" Internal methods """ def __getattr__(self, item): attr = getattr(self.df, item) if callable(attr): return self._apply(attr) else: return attr def __getitem__(self, key): if isinstance(key, str) or isinstance(key, (tuple, str)): return self.df[key] else: return self._build_new_dataset(self.df[key]) def _apply(self, func): def df_func(*args, _new_target=None, **kwargs): has_dataframe_arg = False args = list(args) for i, arg in enumerate(args): if isinstance(arg, ADSDataset) or isinstance(arg, pd.DataFrame): has_dataframe_arg = True # convert any argument that is of type ADSDataset to dataframe. This is useful in delegate calls # like dataset1.concat(dataset2) args[i] = arg.df if isinstance(arg, ADSDataset) else arg result = func(*args, **kwargs) # return the response as such if the the result is not a dataframe and it is a read function such as head if ( isinstance(result, pd.DataFrame) and func.__name__ not in self.df_read_functions ): target_name = None target_sample_val = None if not utils.is_same_class(self, ADSDataset): target_name = ( self.target.name if _new_target is None else _new_target ) target_sample_val = ( self.sampled_df[self.target.name].dropna().values[0] ) df = result n = len(df) trans_df = None transformed = False transformers = [] # The sampled dataframe needs to be re-generated when this operation involves another dataframe. # Also, this kind of transformations cannot be reproduced at the time of scoring if not has_dataframe_arg: ft = DataFrameTransformer( func_name=func.__name__, target_name=target_name, target_sample_val=target_sample_val, args=args, kw_args=kwargs, ).fit(result) # transformed is set to false if the method fails to run on pandas dataframe. In this case a new # sampled dataframe is added trans_df, transformed = ft._transform(self.sampled_df.copy()) # if the dataset length changes as a result of transformation, these operations need not be added to # pipeline as they do not need to be reproduced at the time of scoring. 
transformers = (func.__name__, ft) if n == self.shape[0] else [] init_kwargs = self.init_kwargs.copy() if func.__name__ == "astype": if "types" in init_kwargs: init_kwargs["types"] = init_kwargs["types"] + args[0] else: init_kwargs["types"] = args[0] # if the transforming function is not supported by pandas dataframe, we need to sample the dask # dataframe again to get a new representation return self._build_new_dataset( df, sampled_df=df, target=target_name, target_type=TypeDiscoveryDriver().discover( target_name, df[target_name] ) if target_name is not None and target_name in df else None, sample=not transformed, transformers=transformers, **init_kwargs, ) return result return df_func def _handle_key_error(self, args): raise ValidationError("Column %s does not exist in data" % str(args)) def _build_new_dataset( self, df, sampled_df=None, target=None, target_type=None, transformers=[], sample=False, progress=DummyProgressBar(), n=None, **init_kwargs, ): prev_doc_mode = utils.is_documentation_mode() set_documentation_mode(False) init_kwargs = ( self.init_kwargs if init_kwargs is None or len(init_kwargs) == 0 else init_kwargs.copy() ) n = len(df) if n is None else n # re-calculate sample df if not provided if sampled_df is None or sample: if progress: progress.update("Sampling data") sampled_df = generate_sample( df, n, DatasetDefaults.sampling_confidence_level, DatasetDefaults.sampling_confidence_interval, **init_kwargs, ) else: if progress: progress.update() shape = (n, len(df.columns)) if not utils.is_same_class(self, ADSDataset) and target is None: target = self.target.name set_documentation_mode(prev_doc_mode) # return a ADSDataset object if the target has been removed from the dataframe if target in sampled_df.columns: if progress: progress.update("Building new dataset") target_type = self.target.type if target_type is None else target_type new_ds = get_dataset( df, sampled_df, target, target_type, shape, progress=progress, **init_kwargs, ) new_ds.transformer_pipeline = self._update_transformer_pipeline( transformers ) return new_ds else: if target is not None and not isinstance(progress, DummyProgressBar): logger.info( "The target variable does not exist. Use `set_target()` to specify the target." ) if progress: progress.update("Building the dataset with no target.") dsp = ADSDataset( df, sampled_df, shape, progress=progress, interactive=False, **init_kwargs, ) dsp.transformer_pipeline = self._update_transformer_pipeline(transformers) return dsp def _validate_feature(self, feature_names): if np.isscalar(feature_names): feature_names = [feature_names] for feature in feature_names: if feature not in self.df.columns: self._handle_key_error(feature) def _update_transformer_pipeline(self, steps=[]): if isinstance(steps, tuple): steps = [steps] if steps is None or len(steps) == 0: return copy.deepcopy(self.transformer_pipeline) if self.transformer_pipeline is not None: transformer_pipeline = TransformerPipeline( steps=self.transformer_pipeline.steps + steps ) else: transformer_pipeline = TransformerPipeline(steps=steps) return transformer_pipeline def _get_unique_name(self, name): id = ( uuid.uuid4().hex + "_" + datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S") ) if name == "": return id return name + "_" + id
    def corr(
        self,
        correlation_methods: Union[list, str] = "pearson",
        frac: float = 1.0,
        sample_size: float = 1.0,
        nan_threshold: float = 0.8,
        overwrite: bool = None,
        force_recompute: bool = False,
    ):
        """
        Compute the pairwise correlation of numeric and categorical columns, and output a matrix
        or a list of matrices computed using the correlation methods passed in.

        Parameters
        ----------
        correlation_methods: Union[list, str], default to 'pearson'
            - 'pearson': Use Pearson's Correlation between continuous features,
            - 'cramers v': Use Cramer's V correlations between categorical features,
            - 'correlation ratio': Use Correlation Ratio Correlation between categorical and continuous features,
            - 'all': Is equivalent to ['pearson', 'cramers v', 'correlation ratio'].

            Or a list containing any combination of these methods, for example, ['pearson', 'cramers v'].
        frac:
            Is deprecated and replaced by sample_size.
        sample_size: float, defaults to 1.0. Float, Range -> (0, 1]
            What fraction of the data should be used in the calculation?
        nan_threshold: float, default to 0.8, Range -> [0, 1]
            Only compute a correlation when the proportion of missing values in a column is less
            than or equal to nan_threshold.
        overwrite:
            Is deprecated and replaced by force_recompute.
        force_recompute: bool, default to be False
            - If False, it calculates the correlation matrix if there is no cached correlation matrix.
              Otherwise, it returns the cached correlation matrix.
            - If True, it calculates the correlation matrix regardless of whether there is a cached result or not.

        Returns
        -------
        correlation: Union[list, pandas.DataFrame]
            The pairwise correlations as a matrix (DataFrame) or list of matrices
        """
        frac = deprecate_default_value(
            frac,
            None,
            1,
            "<code>frac=None</code> is superseded by <code>sample_size=1.0</code>.",
            FutureWarning,
        )

        if frac != 1.0:
            deprecate_frac = deprecate_variable(
                frac,
                sample_size,
                "<code>frac</code> is superseded by <code>sample_size</code>.",
                DeprecationWarning,
            )
            if sample_size == 1.0:
                sample_size = deprecate_frac

        force_recompute = deprecate_variable(
            overwrite,
            force_recompute,
            f"<code>overwrite=None</code> is deprecated. Use <code>force_recompute</code> instead.",
            DeprecationWarning,
        )

        if sample_size > 1 or sample_size <= 0:
            logger.error("`sample_size` must be in the range (0, 1].")
            return
        if nan_threshold > 1 or nan_threshold < 0:
            logger.error("`nan_threshold` must be between 0 and 1 (inclusive).")
            return
        return self._compute_correlation(
            frac=sample_size,
            threshold=nan_threshold,
            force_recompute=force_recompute,
            correlation_methods=correlation_methods,
        )
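    # Illustrative sketch (not part of the library source): computing several
    # correlation matrices in one call. When more than one method is requested,
    # a list of DataFrames is returned in the same order as the methods.
    #
    #     pearson_only = ds.corr()                                  # single DataFrame
    #     all_corrs = ds.corr(correlation_methods="all",
    #                         sample_size=0.5, nan_threshold=0.6)   # list of DataFrames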
    def _compute_correlation(
        self,
        frac=1.0,
        threshold=0.8,
        include_n_features=16,
        correlation_methods="pearson",
        force_recompute=False,
    ):
        """
        returns a list of correlation matrix/matrices
        """
        # validate the correlation methods
        correlation_methods = _validate_correlation_methods(correlation_methods)

        # if users choose to sample a frac of the data
        corr_df = self.df if not frac else self.df.sample(frac=frac)

        # return columns by type and filter by threshold
        threshold = threshold * 100
        feature_types_df = pd.DataFrame.from_dict(self.feature_types).T

        # reduce the dimensionality of wide data
        n_rows, n_columns = self.shape
        is_wide_dataset = n_columns >= N_Features_Wide_Dataset

        if is_wide_dataset and include_n_features:
            corr_df, feature_types_df = self._reduce_dim_for_wide_dataset(
                corr_df, feature_types_df, include_n_features
            )

        categorical_columns, continuous_columns, _ = _get_columns_by_type(
            feature_types_df, threshold=threshold
        )

        # get the correlation
        correlation_list = []
        for method in correlation_methods:
            correlation_list.append(
                self._return_correlation(
                    corr_df,
                    method,
                    categorical_columns,
                    continuous_columns,
                    force_recompute,
                )
            )
        return correlation_list[0] if len(correlation_list) == 1 else correlation_list

    def _calc_pearson(self, df: pd.DataFrame, continuous_columns: list) -> pd.DataFrame:
        self._pearson = (
            df[continuous_columns].corr()
            if len(continuous_columns) > 1
            else pd.DataFrame()
        )
        return self._pearson

    def _calc_cramers_v(
        self, df: pd.DataFrame, categorical_columns: list
    ) -> pd.DataFrame:
        self._cramers_v = _cat_vs_cat(df, categorical_columns)
        return self._cramers_v

    def _calc_correlation_ratio(
        self,
        df: pd.core.frame.DataFrame,
        categorical_columns: list,
        continuous_columns: list,
    ) -> pd.DataFrame:
        self._correlation_ratio = _cat_vs_cts(
            df, categorical_columns, continuous_columns
        )
        return self._correlation_ratio

    def _return_correlation(
        self,
        corr_df,
        method,
        categorical_columns,
        continuous_columns,
        force_recompute,
    ):
        if not force_recompute and hasattr(self, "_" + "_".join(method.split())):
            logger.info(
                f"Using cached results for {method} correlation. Use"
                " `force_recompute=True` to override."
            )
            return getattr(self, "_" + "_".join(method.split()))
        else:
            if method == "pearson":
                self._calc_pearson(corr_df, continuous_columns)
                return self._pearson
            elif method == "cramers v":
                self._calc_cramers_v(corr_df, categorical_columns)
                return self._cramers_v
            elif method == "correlation ratio":
                self._calc_correlation_ratio(
                    corr_df, categorical_columns, continuous_columns
                )
                return self._correlation_ratio
            else:
                raise ValueError(f"The {method} method is not supported.")

    @runtime_dependency(module="IPython", install_from=OptionalDependency.NOTEBOOK)
    def _reduce_dim_for_wide_dataset(
        self, corr_df: pd.DataFrame, feature_types_df: pd.DataFrame, include_n_features
    ):
        min_cores_for_correlation = 2
        n_rows, n_columns = self.shape

        from IPython.core.display import display, HTML

        if utils.get_cpu_count() <= min_cores_for_correlation:
            msg = (
                f"Not attempting to calculate correlations, too few cores ({utils.get_cpu_count()}) "
                f"for wide dataset ({n_columns} columns)"
            )
            display(HTML(f"<li>{msg}</li>"))
            return None, None

        display(HTML(f"<li>detected wide dataset ({n_columns} columns)</li>"))

        if "target" in self.__dict__:
            display(
                HTML(
                    f"<li>feature reduction using mutual information (max {include_n_features} columns)</li>"
                )
            )
            logger.info("Set `include_n_features=None` to include all features.")
            corr_sampled_df = self._find_feature_subset(
                self.sampled_df, self.target.name, include_n_features=include_n_features
            )
            corr_df, feature_types_df = self._update_dataframes(
                corr_sampled_df, corr_df, feature_types_df
            )
        else:
            #
            # in the absence of a target we simply use the first n
            #
            logger.info(
                f"To include the first {include_n_features} features based on the feature "
                f"importance, use `set_target()`."
            )
            feature_types_df = feature_types_df[
                (feature_types_df.index.isin(corr_df.columns.values))
                & feature_types_df.type.isin(
                    ["categorical", "ordinal", "continuous", "zipcode"]
                )
            ]
            corr_df = corr_df[feature_types_df.index[:include_n_features]]
            feature_types_df = feature_types_df.iloc[:include_n_features, :]
        return corr_df, feature_types_df

    def _update_dataframes(self, corr_sampled_df, corr_df, feature_types_df):
        """
        update the dataframe and feature types based on the reduced dataframe
        """
        cols = corr_sampled_df.columns.tolist()
        cols.insert(0, cols.pop(cols.index(self.target.name)))
        corr_df_reduced = corr_df[[*cols]]
        feature_types_df_reduced = feature_types_df[feature_types_df.index.isin(cols)]
        return corr_df_reduced, feature_types_df_reduced
    def show_corr(
        self,
        frac: float = 1.0,
        sample_size: float = 1.0,
        nan_threshold: float = 0.8,
        overwrite: bool = None,
        force_recompute: bool = False,
        correlation_target: str = None,
        plot_type: str = "heatmap",
        correlation_threshold: float = -1,
        correlation_methods="pearson",
        **kwargs,
    ):
        """
        Show a heatmap or barplot of the pairwise correlation of numeric and categorical columns.

        The output has three tabs: a heatmap or barplot of the correlation matrix of numeric
        columns vs numeric columns using the Pearson correlation method, categorical columns vs
        categorical columns using the Cramer's V method, and numeric vs categorical columns,
        excluding NA/null values and columns which have more than 80% of NA/null values. By
        default, only 'pearson' correlation is calculated and shown in the first tab. Set
        correlation_methods='all' to show all correlation charts.

        Parameters
        ----------
        frac:
            Is superseded by sample_size.
        sample_size: float, defaults to 1.0. Float, Range -> (0, 1]
            What fraction of the data should be used in the calculation?
        nan_threshold: float, defaults to 0.8, Range -> [0, 1]
            In the default case, it will only calculate the correlation of the columns which have
            less than or equal to 80% missing values.
        overwrite:
            Is deprecated and replaced by force_recompute.
        force_recompute: bool, default to be False.
            - If False, it calculates the correlation matrix if there is no cached correlation matrix.
              Otherwise, it returns the cached correlation matrix.
            - If True, it calculates the correlation matrix regardless of whether there is a cached result or not.
        plot_type: str, default to "heatmap"
            It can only be "heatmap" or "bar". Note that if "bar" is chosen, correlation_target
            also has to be set and the bar chart will only show the correlation values of the
            pairs which have the target in them.
        correlation_target: str, default to None
            It can be any column of type continuous, ordinal, categorical or zipcode. When
            correlation_target is set, only pairs that contain correlation_target will show.
        correlation_threshold: float, default to -1
            It can be any number between -1 and 1.
        correlation_methods: Union[list, str], defaults to 'pearson'
            - 'pearson': Use Pearson's Correlation between continuous features,
            - 'cramers v': Use Cramer's V correlations between categorical features,
            - 'correlation ratio': Use Correlation Ratio Correlation between categorical and continuous features,
            - 'all': Is equivalent to ['pearson', 'cramers v', 'correlation ratio'].

            Or a list containing any combination of these methods, for example, ['pearson', 'cramers v'].

        Returns
        -------
        None
        """
        frac = deprecate_default_value(
            frac,
            None,
            1,
            "<code>frac=None</code> is superseded by <code>sample_size=1.0</code>.",
            FutureWarning,
        )
        if frac != 1.0:
            deprecate_frac = deprecate_variable(
                frac,
                sample_size,
                "<code>frac</code> is deprecated. Use <code>sample_size</code> instead.",
                DeprecationWarning,
            )
            if sample_size == 1.0:
                sample_size = deprecate_frac

        feature_types_df = pd.DataFrame.from_dict(self.feature_types).loc["type", :]
        features_list = list(
            feature_types_df[
                feature_types_df.isin(
                    ["categorical", "zipcode", "continuous", "ordinal"]
                )
            ].index
        )

        if plot_type not in ["heatmap", "bar"]:
            raise ValueError('plot_type has to be "heatmap" or "bar"')

        if plot_type == "bar" and correlation_target is None:
            raise ValueError('correlation_target has to be set when plot_type="bar".')

        if correlation_target:
            if correlation_target not in features_list:
                raise ValueError(
                    "correlation_target has to be in {}.".format(features_list)
                )

        force_recompute = deprecate_variable(
            overwrite,
            force_recompute,
            f"<code>overwrite=None</code> is deprecated. Use <code>force_recompute</code> instead.",
            DeprecationWarning,
        )

        plot_correlation_heatmap(
            ds=self,
            frac=sample_size,
            force_recompute=force_recompute,
            correlation_target=correlation_target,
            plot_type=plot_type,
            correlation_threshold=correlation_threshold,
            nan_threshold=nan_threshold,
            correlation_methods=correlation_methods,
            **kwargs,
        )
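    # Illustrative sketch (not part of the library source): rendering a bar chart
    # of correlations against a single column in a notebook. "age" is a
    # hypothetical column name.
    #
    #     ds.show_corr(plot_type="bar", correlation_target="age",
    #                  correlation_methods="all", nan_threshold=0.5)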
[docs] @runtime_dependency(module="IPython", install_from=OptionalDependency.NOTEBOOK) @runtime_dependency(module="ipywidgets", install_from=OptionalDependency.NOTEBOOK) def show_in_notebook( self, correlation_threshold=-1, selected_index=0, sample_size=0, visualize_features=True, correlation_methods="pearson", **kwargs, ): """ Provide visualization of dataset. - Display feature distribution. The data table display will show a maximum of 8 digits, - Plot the correlation between the dataset features (as a heatmap) only when all the features are continuous or ordinal, - Display data head. Parameters ---------- correlation_threshold : int, default -1 The correlation threshold to select, which only show features that have larger or equal correlation values than the threshold. selected_index: int, str, default 0 The displayed output is stacked into an accordion widget, use selected_index to force the display to open a specific element, use the (zero offset) index or any prefix string of the name (eg, 'corr' for correlations) sample_size: int, default 0 The size (in rows) to sample for visualizations visualize_features: bool default False For the "Features" section control if feature visualizations are shown or not. If not only a summary of the numeric statistics is shown. The numeric statistics are also always shows for wide (>64 features) datasets correlation_methods: Union[list, str], default to 'pearson' - 'pearson': Use Pearson's Correlation between continuous features, - 'cramers v': Use Cramer's V correlations between categorical features, - 'correlation ratio': Use Correlation Ratio Correlation between categorical and continuous features, - 'all': Is equivalent to ['pearson', 'cramers v', 'correlation ratio']. Or a list containing any combination of these methods, for example, ['pearson', 'cramers v']. """ if not utils.is_notebook(): print("show_in_notebook called but not in notebook environment") return n_rows, n_columns = self.shape min_sample_size = 10000 if sample_size == 0: sub_samp_size = len(self.sampled_df) sub_samp_df = self.sampled_df else: sub_samp_size = max(min(sample_size, len(self.sampled_df)), min_sample_size) sub_samp_df = self.sampled_df.sample(n=sub_samp_size) html_summary = "" if self.name: html_summary += "<h1>Name: %s</h1>" % (self.name) # dataset type (problem type) html_summary += "<h3>Type: %s</h3>" % self.__class__.__name__ if self.description: html_summary += "<pre>%s</pre>" % self.description html_summary += "<hr>" html_summary += "<h3>{:,} Rows, {:,} Columns</h3>".format(n_rows, n_columns) html_summary += "<h4>Column Types:</h4><UL>" for group in Counter( [self.feature_types[k].meta_data["type"] for k in self.feature_types] ).most_common(): html_summary += "<LI><b>%s:</b> %d features" % (group[0], group[1]) html_summary += "</UL>" html_summary += """ <p><b> Note: Visualizations use a sampled subset of the dataset, this is to improve plotting performance. The sample size is calculated to be statistically significant within the confidence level: {} and confidence interval: {}. The sampled data has {:,} rows </b> </p> <ul> <li>The confidence <i>level</i> refers to the long-term success rate of the method, that is, how often this type of interval will capture the parameter of interest. 
</li> <li>A specific confidence <i>interval</i> gives a range of plausible values for the parameter of interest </li> </ul> """.format( DatasetDefaults.sampling_confidence_level, DatasetDefaults.sampling_confidence_interval, sub_samp_df.shape[0], ) html_summary += "</UL>" from ipywidgets import widgets summary = widgets.HTML(html_summary) features = widgets.HTML() correlations = widgets.Output() warningz = widgets.HTML() warningz.value = "Analyzing for warnings..." features.value = "Calculating full statistical info..." # with correlations: # display(HTML("<li>calculating...</li>")) accordion = widgets.Accordion( children=[summary, features, correlations, warningz] ) accordion.set_title(0, "Summary") accordion.set_title(1, "Features") accordion.set_title(2, "Correlations") accordion.set_title(3, "Warnings") if isinstance(selected_index, str): # lookup by title possible_titles = [ accordion.get_title(i) for i in range(len(accordion.children)) ] for i, title in enumerate(possible_titles): if title.lower().startswith(selected_index.lower()): selected_index = i break if isinstance(selected_index, str): # failed to match a title logger.info( "`selected_index` should be one of: {}.".format( ", ".join(possible_titles) ) ) selected_index = 0 accordion.selected_index = selected_index is_wide_dataset = n_columns >= N_Features_Wide_Dataset # # set up dataframe to use for correlation calculations # self.df_stats = self._calculate_dataset_statistics( is_wide_dataset, [features, warningz] ) with correlations: feature_types_df = pd.DataFrame.from_dict(self.feature_types).loc["type", :] if not is_wide_dataset: feature_types_df = feature_types_df[ self.df_stats["missing"] < len(self.df) ] frac = kwargs.pop("frac", 1.0) overwrite = kwargs.pop("overwrite", None) force_recompute = kwargs.pop("force_recompute", False) force_recompute = deprecate_variable( overwrite, force_recompute, f"<code>overwrite=None</code> is deprecated. Use <code>force_recompute</code> instead.", DeprecationWarning, ) plot_type = kwargs.pop("plot_type", "heatmap") correlation_target = kwargs.pop("correlation_target", None) nan_threshold = kwargs.pop("nan_threshold", 0.8) self.show_corr( correlation_threshold=correlation_threshold, sample_size=frac, force_recompute=force_recompute, plot_type=plot_type, correlation_target=correlation_target, nan_threshold=nan_threshold, correlation_methods=correlation_methods, **kwargs, ) from IPython.core.display import display display(accordion) # generate html for feature_distribution & warnings accordion.set_title( 1, f"Features ({n_columns})" ) # adjust for datasets with target # # compute missing value statistics # not done for wide datasets # features.value = self._generate_features_html( is_wide_dataset, n_columns, self.df_stats, visualizations_follow=bool(visualize_features), ) warningz.value = self._generate_warnings_html( is_wide_dataset, n_rows, n_columns, self.df_stats, warningz, accordion ) if visualize_features and not is_wide_dataset: self._visualize_feature_distribution(features)
    def get_recommendations(self, *args, **kwargs):  # real signature may change
        """
        Returns a user-friendly error message to set the target variable before invoking this API.

        Parameters
        ----------
        kwargs

        Returns
        -------
        NotImplementedError
            raises NotImplementedError, if the target parameter value is not provided
        """
        raise NotImplementedError(
            "Please set the target using set_target() before invoking this API. See "
            "https://accelerated-data-science.readthedocs.io/en/latest/ads.dataset.html#ads.dataset.dataset.ADSDataset.set_target "
            "for the API usage."
        )
    def suggest_recommendations(self, *args, **kwargs):  # real signature may change
        """
        Returns a user-friendly error message to set the target variable before invoking this API.

        Parameters
        ----------
        kwargs

        Returns
        -------
        NotImplementedError
            raises NotImplementedError, if the target parameter value is not provided
        """
        raise NotImplementedError(
            "Please set the target using set_target() before invoking this API. See "
            "https://accelerated-data-science.readthedocs.io/en/latest/ads.dataset.html#ads.dataset.dataset.ADSDataset.set_target "
            "for the API usage."
        )