Source code for lymixture.utils

"""Module with utilities for the mixture model package."""

import logging
import warnings

import numpy as np
import pandas as pd
from scipy.special import factorial, softmax

warnings.filterwarnings("ignore", category=pd.errors.PerformanceWarning)
logger = logging.getLogger(__name__)

RESP_COLS = ("_mixture", "responsibility")

[docs] def binom_pmf(k: np.ndarray, n: int, p: float) -> np.ndarray: """Compute binomial PMF fast.""" if p > 1.0 or p < 0.0: msg = "Binomial prob must be btw. 0 and 1" raise ValueError(msg) q = 1.0 - p binom_coeff = factorial(n) / (factorial(k) * factorial(n - k)) return binom_coeff * p**k * q ** (n - k)
[docs] def late_binomial(support: np.ndarray, p: float = 0.5) -> np.ndarray: """Parametrized binomial distribution.""" return binom_pmf(k=support, n=support[-1], p=p)
[docs] def map_to_simplex(from_real: np.ndarray | list[float]) -> np.ndarray: """Map from real numbers to simplex. The result has one entry more than ``values``. The method creates a simplex by adding a dimension which is fixed to zero Then the values are run through a softmax function to normalize them. >>> real = [4, 3.5] >>> map_to_simplex(real) array([0.01127223, 0.61544283, 0.37328494]) """ non_normalized = np.array([0.0, *from_real]) return softmax(non_normalized, axis=0)
[docs] def map_to_real(from_simplex: np.ndarray | list[float]) -> np.ndarray: """Map from simplex to real numbers. >>> simplex = [0.01127223, 0.61544283, 0.37328494] >>> np.allclose(map_to_real(simplex), [4, 3.5]) True """ from_simplex = np.array(from_simplex) normalizer = 1 / from_simplex[0] return np.log(from_simplex[1:] * normalizer)
[docs] def normalize( values: np.ndarray, axis: int, **isclose_kwargs: float | bool, ) -> np.ndarray: """Normalize ``values`` to sum to 1 along ``axis``. Beyond normalizing, this function also sets values that are close to zero to the exact value of zero. For this, it passes all extra keyword arguments to numpy's ``isclose`` function. >>> normalize(np.array([0.1, 0.2, 0.7]), axis=0) # doctest: +NORMALIZE_WHITESPACE array([0.1, 0.2, 0.7]) >>> normalize(np.array([1e-20, 0.3, 0.7]), axis=0) # doctest: +NORMALIZE_WHITESPACE array([0. , 0.3, 0.7]) >>> normalize(np.array([1e-20, 0.3, 0.3]), axis=0) # doctest: +NORMALIZE_WHITESPACE array([0. , 0.5, 0.5]) """ normalized = values / np.sum(values, axis=axis) small_idx = np.isclose(normalized, 0.0, **isclose_kwargs) normalized[small_idx] = 0.0 return normalized / np.sum(normalized, axis=axis)
[docs] def log_normalize( log_values: np.ndarray, axis: int, ) -> np.ndarray: """Log-normalize ``log_values`` to sum to 1 along ``axis``. >>> log_norm = log_normalize(np.log([0.1, 0.2, 0.7]), axis=0) >>> np.exp(np.logaddexp.reduce(log_norm, axis=0)) # doctest: +NORMALIZE_WHITESPACE np.float64(1.0) """ return log_values - np.logaddexp.reduce(log_values, axis=axis)
[docs] def harden(values: np.ndarray, axis: int) -> np.ndarray: """Harden ``values`` to become a one-hot-encoding along the given ``axis``. >>> values = np.array( ... [[0.1, 0.2, 0.7], ... [0.3, 0.4, 0.3]] ... ) >>> harden(values, axis=1) # doctest: +NORMALIZE_WHITESPACE array([[0, 0, 1], [0, 1, 0]]) >>> arr = np.array([[[0.84, 0.64, 0.3 , 0.23], ... [0.18, 0.31, 0.23, 0.54], ... [0.08, 0.05, 0.72, 0.09]], ... [[0.33, 0.43, 0.28, 0.54], ... [0.26, 0.48, 0.8 , 0.01], ... [0.45, 0.09, 0.64, 0.11]]]) >>> harden(arr, axis=2) # doctest: +NORMALIZE_WHITESPACE array([[[1, 0, 0, 0], [0, 0, 0, 1], [0, 0, 1, 0]], [[0, 0, 0, 1], [0, 0, 1, 0], [0, 0, 1, 0]]]) >>> harden(np.array([0.1, 0.2, 0.3, 0.1]), axis=0) array([0, 0, 1, 0]) """ maxdim = len(values.shape) - 1 idx = np.argmax(values, axis=axis) # one dim less than `values` one_hot = np.eye(values.shape[axis], dtype=int)[idx] # right dim, but wrong order dim_sort = (*range(axis), maxdim, *range(axis, maxdim)) return one_hot.transpose(*dim_sort) # right order
[docs] def join_with_resps( patient_data: pd.DataFrame, num_components: int, resps: np.ndarray | None = None, ) -> pd.DataFrame: """Join patient data with empty responsibilities (and reset index).""" mixture_columns = pd.MultiIndex.from_tuples( [(*RESP_COLS, i) for i in range(num_components)], ) if resps is None: resps = np.empty(shape=(len(patient_data), num_components)) resps.fill(np.nan) resps = pd.DataFrame(resps, columns=mixture_columns) if RESP_COLS in patient_data: patient_data = patient_data.drop(columns=RESP_COLS) return patient_data.join(resps).reset_index(drop=True)
[docs] def one_slice(idx: int) -> slice: """Return a slice that selects only one element at the given index. Helpful if one wants to select one element, but return it as a list. >>> l = [1, 2, 3, 4, 5] >>> l[one_slice(2)] [3] """ return slice(idx, idx + 1)