"""This module contains a collection of general constants and functions that are
used by several other functions and classes in the ShapFire library."""
import typing
import numpy
import pandas
import scipy.stats as stats
REPLACE: str = "replace"
"""The default string value used to indicate that NaN or None values should be \
replaced with another given value.""" # pylint: disable=W0105
DROP: str = "drop"
"""The default string value used to indicate that samples associated with a \
dataset (X) and target variable (y) should be dropped if NaN or None values \
are contained in a sample.
""" # pylint: disable=W0105
DROP_SAMPLES: str = "drop_samples"
"""The default string value used to indicate that a sample (row) in a dataset \
(X) should be dropped if it contains NaN or None values.
""" # pylint: disable=W0105
DROP_FEATURES: str = "drop_features"
"""The default string value used to indicate that a feature (column) in a \
dataset (X) should be dropped if it contains NaN or None values.
""" # pylint: disable=W0105
SKIP: str = "skip"
"""The default string value used to indicate that a value should be skipped \
whenever a NaN or None value is encountered.
""" # pylint: disable=W0105
DEFAULT_REPLACE_VALUE: float = 0.0
"""The default value that NaN or None values are replaced with.
""" # pylint: disable=W0105
DEFAULT_RANDOM_SEED: int = 123
"""The default random seed used across modules.""" # pylint: disable=W0105
def remove_incomplete_samples(
    x: typing.Union[list, numpy.ndarray],
    y: typing.Union[list, numpy.ndarray],
) -> typing.Union[tuple[list, list], tuple[numpy.ndarray, numpy.ndarray]]:
    """Drop every paired sample for which `x` or `y` holds a NaN or None
    value. Values must be numeric for the NaN check to work. The return
    type mirrors the input type of `x`: lists in, lists out."""
    # Remember the original input type before 'x' is rebuilt as a list below.
    input_is_list = isinstance(x, list)
    x = [v if v is not None else numpy.nan for v in x]
    y = [v if v is not None else numpy.nan for v in y]
    arr = numpy.array([x, y]).transpose()
    # Keep only the rows (paired samples) that contain no NaN values.
    arr = arr[~numpy.isnan(arr).any(axis=1)].transpose()
    if input_is_list:
        return arr[0].tolist(), arr[1].tolist()
    return arr[0], arr[1]
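# Usage sketch (the inputs below are invented for illustration): any pair
# with a missing value on either side is dropped, and lists come back as
# lists:
#
#     remove_incomplete_samples(x=[1.0, None, 3.0], y=[4.0, 5.0, None])
#     # -> ([1.0], [4.0])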
def replace_nan_with_value(
    x: numpy.ndarray, y: numpy.ndarray, value: float
) -> tuple[numpy.ndarray, numpy.ndarray]:
    """Replace every NaN or None entry in `x` and `y` with `value`. The
    check `v == v` is False only for NaN, so it doubles as a NaN test that
    also works on plain Python floats."""
    x = numpy.array([v if v == v and v is not None else value for v in x])
    y = numpy.array([v if v == v and v is not None else value for v in y])
    return x, y
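# Usage sketch (invented inputs): NaN entries are substituted element-wise:
#
#     replace_nan_with_value(
#         x=numpy.array([1.0, numpy.nan]),
#         y=numpy.array([numpy.nan, 2.0]),
#         value=0.0,
#     )
#     # -> (array([1., 0.]), array([0., 2.]))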
def convert(
    data: typing.Union[numpy.ndarray, pandas.Series, pandas.DataFrame, list],
    to: str,
    copy: bool = True,
) -> typing.Union[numpy.ndarray, pandas.Series, pandas.DataFrame, list]:
    """Convert `data` to the container type named by `to`, one of "array",
    "list" or "dataframe". An unknown `to` raises a ValueError; an
    unsupported input/output combination raises a TypeError."""
    converted: typing.Union[
        None, numpy.ndarray, pandas.Series, pandas.DataFrame, list
    ] = None
if to.strip().lower() == "array":
if isinstance(data, numpy.ndarray):
converted = data.copy() if copy else data
elif isinstance(data, pandas.Series):
converted = data.values
elif isinstance(data, list):
converted = numpy.array(data)
        elif isinstance(data, pandas.DataFrame):
            # '.values' is a property on DataFrame, not a method.
            converted = data.values
elif to.strip().lower() == "list":
if isinstance(data, list):
converted = data.copy() if copy else data
elif isinstance(data, pandas.Series):
converted = data.values.tolist()
elif isinstance(data, numpy.ndarray):
converted = data.tolist()
elif to.strip().lower() == "dataframe":
if isinstance(data, pandas.DataFrame):
converted = data.copy(deep=True) if copy else data
elif isinstance(data, numpy.ndarray):
converted = pandas.DataFrame(data)
else:
raise ValueError(f"Unknown data conversion: {to}")
if converted is None:
raise TypeError(
f"Cannot handle data conversion of type: {type(data)} to {to}"
)
else:
return converted
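# Usage sketch (invented inputs): 'to' names the target container:
#
#     convert(data=[1.0, 2.0], to="array")             # -> array([1., 2.])
#     convert(data=numpy.array([1.0, 2.0]), to="list")  # -> [1.0, 2.0]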
def cramers_v(
    # TODO: Allow 'x' to be a numpy.ndarray
    x: pandas.Series,
    # TODO: Allow 'y' to be a numpy.ndarray
    y: pandas.Series,
    bias_correction: bool = True,
    nan_strategy: str = REPLACE,
    nan_replace_value: float = DEFAULT_REPLACE_VALUE,
) -> float:
    """Calculate Cramer's V, a symmetric measure of association between two
    categorical variables that lies in [0, 1]. By default the bias-corrected
    estimate (Bergsma, 2013) is returned."""
if nan_strategy == REPLACE:
x, y = replace_nan_with_value(
x=x,
y=y,
value=nan_replace_value,
)
elif nan_strategy == DROP:
x, y = remove_incomplete_samples(x=x, y=y)
    # Build the contingency table of the two categorical variables.
    confusion_matrix = pandas.crosstab(index=x, columns=y)
chi2, _, _, _ = stats.chi2_contingency(confusion_matrix)
n = confusion_matrix.sum().sum()
phi2 = chi2 / n
r, k = confusion_matrix.shape
    if bias_correction:
        # Bias-corrected estimate (Bergsma, 2013).
        phi2corr = numpy.maximum(0, phi2 - ((k - 1) * (r - 1)) / (n - 1))
rcorr = r - ((r - 1) ** 2) / (n - 1)
kcorr = k - ((k - 1) ** 2) / (n - 1)
if numpy.minimum((kcorr - 1), (rcorr - 1)) == 0:
print(
"Unable to calculate Cramer's V using bias correction. "
+ "Consider using bias_correction=False"
)
return numpy.nan
else:
return numpy.sqrt(
phi2corr / numpy.minimum((kcorr - 1), (rcorr - 1))
)
else:
if numpy.minimum(k - 1, r - 1) == 0:
return numpy.nan
else:
return numpy.sqrt(phi2 / numpy.minimum(k - 1, r - 1))
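# Usage sketch (invented inputs): the returned association lies in [0, 1].
# Note that stats.chi2_contingency applies Yates' continuity correction to
# 2x2 tables, and the default bias correction can shrink small-sample
# estimates all the way down to 0:
#
#     x = pandas.Series(["a", "a", "b", "b"], dtype="category")
#     y = pandas.Series(["u", "u", "v", "v"], dtype="category")
#     v = cramers_v(x=x, y=y, bias_correction=False)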
def correlation_ratio(
    # TODO: Allow 'categories' to be a numpy.ndarray
    categories: pandas.Series,
    # TODO: Allow 'measurements' to be a numpy.ndarray
    measurements: pandas.Series,
    nan_strategy: str = REPLACE,
    nan_replace_value: float = DEFAULT_REPLACE_VALUE,
) -> float:
    """Calculate the correlation ratio (eta), a measure in [0, 1] of the
    association between a categorical and a numerical variable."""
if nan_strategy == REPLACE:
categories, measurements = replace_nan_with_value(
x=categories,
y=measurements,
value=nan_replace_value,
)
elif nan_strategy == DROP:
categories, measurements = remove_incomplete_samples(
x=categories, y=measurements
)
categories = convert(data=categories, to="array")
measurements = convert(data=measurements, to="array")
    # Encode the categories as integers and compute, per category, the
    # number of measurements and their mean.
    fcat, _ = pandas.factorize(categories)
    cat_num = numpy.max(fcat) + 1
    y_avg_array = numpy.zeros(cat_num)
    n_array = numpy.zeros(cat_num)
for i in range(0, cat_num):
cat_measures = measurements[numpy.argwhere(fcat == i).flatten()]
n_array[i] = len(cat_measures)
y_avg_array[i] = numpy.average(cat_measures)
    # Weighted overall mean of the measurements.
    y_total_avg = numpy.sum(numpy.multiply(y_avg_array, n_array)) / numpy.sum(
        n_array
    )
    # Between-category (weighted) sum of squared deviations from the
    # overall mean.
    numerator = numpy.sum(n_array * (y_avg_array - y_total_avg) ** 2)
    # Total sum of squared deviations from the overall mean.
    denominator = numpy.sum((measurements - y_total_avg) ** 2)
    if numerator == 0:
        eta = 0.0
    else:
        eta = numpy.sqrt(numerator / denominator)
    return eta
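# Usage sketch (invented inputs): measurements that are fully determined by
# their category yield the maximal ratio:
#
#     correlation_ratio(
#         categories=pandas.Series(["a", "a", "b", "b"]),
#         measurements=pandas.Series([1.0, 1.0, 2.0, 2.0]),
#     )
#     # -> 1.0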
def associations(
X: pandas.DataFrame,
nan_strategy: str = DROP_SAMPLES,
nan_replace_value: float = DEFAULT_REPLACE_VALUE,
) -> pandas.DataFrame:
"""
Calculate pairwise measures of association/correlation between numerical and
categorical features in a given dataset. Numerical-numerical association is
measured through Spearman's correlation coefficient, numerical-categorical
association is measured through the correlation ratio and categorical-
categorical association is measured through Cramer's V.
Args:
X: The input dataset that is assumed to contain features (columns) and \
corresponding observations (rows).
nan_strategy: The action to take in case the input dataset contains \
NaN or None values. Defaults to DROP_SAMPLES.
nan_replace_value: In case the :code:`nan_strategy` is \
:const:`shapfire.utils.REPLACE`, then this argument \
determines the value which NaN or None values are replaced by. \
Defaults to :const:`shapfire.utils.DEFAULT_REPLACE_VALUE`.
Raises:
ValueError: If the number of `category` and `float` features (columns) \
in the pandas dataframe do not add up to the total number of \
features (columns) contained in the dataframe.
    Returns:
        A symmetric pandas dataframe that contains all pairwise feature \
        correlation/association values.
"""
# Extract dataframe column labels
columns = X.columns
# Apply a strategy for handling NaN values in the given data
if nan_strategy == REPLACE:
_X = X.fillna(value=nan_replace_value, inplace=False)
elif nan_strategy == DROP_SAMPLES:
_X = X.dropna(axis=0, inplace=False)
elif nan_strategy == DROP_FEATURES:
_X = X.dropna(axis=1, inplace=False)
else:
_X = X.copy()
# Identify categorical features and columns
cat_columns = _X.select_dtypes(include=["category"]).columns
# Identify numerical features and columns
num_columns = _X.select_dtypes(include=["float"]).columns
if len(cat_columns) + len(num_columns) != _X.shape[1]:
# Make sure that columns are either of type 'category' or type 'float'
raise ValueError(
"The number of categorical and numerical features (columns) in "
+ "the dataframe do not add up to the total number of features "
+ "(columns) that are actually contained in the dataframe. Make "
+ "sure the data contained in the columns are either of 'dtype' "
+ "'float' or 'category'."
)
# Create dataframe for storing associations values
c = pandas.DataFrame(index=columns, columns=columns)
# Find single-value columns
single_value_columns_set = set()
for column in columns:
if _X[column].unique().size == 1:
single_value_columns_set.add(column)
# Compute feature associations
    for i in range(0, len(columns)):
        if columns[i] in single_value_columns_set:
            # A constant feature carries no information; zero out its row
            # and column and skip the pairwise computations.
            c.loc[:, columns[i]] = 0.0
            c.loc[columns[i], :] = 0.0
            continue
        for j in range(i, len(columns)):
if columns[j] in single_value_columns_set:
continue
elif i == j:
c.loc[columns[i], columns[j]] = 1.0
else:
if columns[i] in cat_columns:
if columns[j] in cat_columns:
cell = cramers_v(
_X[columns[i]],
_X[columns[j]],
bias_correction=False,
nan_strategy=SKIP,
)
ij, ji = cell, cell
else:
cell = correlation_ratio(
_X[columns[i]],
_X[columns[j]],
nan_strategy=SKIP,
)
ij, ji = cell, cell
else:
if columns[j] in cat_columns:
cell = correlation_ratio(
_X[columns[j]],
_X[columns[i]],
nan_strategy=SKIP,
)
ij, ji = cell, cell
else:
cell, _ = stats.spearmanr(
_X[columns[i]],
_X[columns[j]],
)
ij, ji = cell, cell
c.loc[columns[i], columns[j]] = (
ij if not numpy.isnan(ij) and abs(ij) < numpy.inf else 0.0
)
c.loc[columns[j], columns[i]] = (
ji if not numpy.isnan(ji) and abs(ji) < numpy.inf else 0.0
)
    # Safety net: normalize any cells that were never assigned to NaN.
    c.fillna(value=numpy.nan, inplace=True)
    return c
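# Usage sketch (the dataframe below is invented): every column must be of
# dtype 'float' or 'category':
#
#     df = pandas.DataFrame(
#         {
#             "num": [1.0, 2.0, 3.0, 4.0],
#             "cat": pandas.Series(["a", "a", "b", "b"], dtype="category"),
#         }
#     )
#     assoc = associations(X=df)  # 2x2 symmetric dataframe, 1.0 diagonal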
class ThresholdFinder:
    """Estimate, by simulation, how far feature ranks drift from an ideal
    ranking by pure chance, and derive interquartile-range (Tukey fence)
    thresholds from that empirical distribution."""

    def __init__(
        self,
        random_seed: int,
        ncols: int,
        n_batches: int = 250,
        n_samples: int = 1000,
    ) -> None:
self.ncols = ncols
self.n_batches = n_batches
self.n_samples = n_samples
self.random_seed = random_seed
self.data: typing.Union[None, list[float]] = None
self.lower_threshold: typing.Union[None, float] = None
self.iqr: typing.Union[None, float] = None
self.upper_threshold: typing.Union[None, float] = None
    def fit(self) -> dict[str, float]:
        """Simulate the rank-displacement distribution and derive its lower
        threshold, IQR and upper threshold."""
self.data = self.estimate_ranking_distribution()
a, b, c = self.find_quartiles(data=self.data)
self.lower_threshold = a
self.iqr = b
self.upper_threshold = c
return {
"lower_threshold": a,
"iqr": b,
"upper_threshold": c,
}
    def find_quartiles(
        self, data: list[float]
    ) -> tuple[float, float, float]:
        """Return the Tukey fences and IQR of `data`: the lower bound
        Q1 - 1.5 * IQR, the IQR itself and the upper bound Q3 + 1.5 * IQR."""
        q1, q3 = numpy.percentile(data, [25, 75])
        iqr = q3 - q1
        lower_bound = q1 - (1.5 * iqr)
        upper_bound = q3 + (1.5 * iqr)
        return lower_bound, iqr, upper_bound
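    # For example (invented data): for data=[1.0, 2.0, 3.0, 4.0], Q1=1.75
    # and Q3=3.25, so this method returns (-0.5, 1.5, 5.5).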
    def estimate_ranking_distribution(self) -> list[float]:
        """Build an empirical distribution of the mean absolute displacement
        between an ideal ranking and random rankings."""
        rng = numpy.random.default_rng(self.random_seed)
        # Generate the ideal ranking, with ranks starting from 1
        ideal_rank = numpy.arange(self.ncols) + 1
        vstacked_list: list[float] = []
for _ in range(self.n_batches):
rnd_order = rng.permuted(
numpy.tile(ideal_rank, self.n_samples).reshape(
self.n_samples, ideal_rank.size
),
axis=1,
)
            # Mean absolute rank displacement per rank position, averaged
            # over the 'n_samples' random rankings (rows)
            result = numpy.mean(numpy.abs(ideal_rank - rnd_order), axis=0)
vstacked_list.extend(result)
return vstacked_list
def plot_ranking_distribution(self) -> None:
# TODO
return None
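# Usage sketch (parameter values are invented): simulate the chance-level
# rank-displacement distribution for 10 features and read off the Tukey
# fences:
#
#     finder = ThresholdFinder(random_seed=DEFAULT_RANDOM_SEED, ncols=10)
#     thresholds = finder.fit()
#     upper = thresholds["upper_threshold"]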