Source code for yaw.core.math

"""This module implements some math and array related functions.
"""

from __future__ import annotations

from collections.abc import Sequence
from typing import TYPE_CHECKING, TypeVar

import numpy as np
from numpy.exceptions import AxisError
from numpy.typing import NDArray

from ._math import _rebin

if TYPE_CHECKING:  # pragma: no cover
    from numpy.typing import ArrayLike

    from yaw.core.containers import SampledData

__all__ = [
    "array_equal",
    "outer_triu_sum",
    "apply_bool_mask_ndim",
    "apply_slice_ndim",
    "sgn",
    "cov_from_samples",
    "global_covariance",
    "corr_from_cov",
    "rebin",
    "shift_histogram",
]


_Tarr = TypeVar("_Tarr", bound=NDArray)


[docs] def array_equal(arr1: NDArray, arr2: NDArray) -> bool: """Check if the shape and array elements of two numpy array are equal.""" return ( isinstance(arr1, np.ndarray) and isinstance(arr2, np.ndarray) and arr1.shape == arr2.shape and (arr1 == arr2).all() )
[docs] def outer_triu_sum( a: ArrayLike, b: ArrayLike, *, k: int = 0, axis: int | None = None ) -> NDArray: """Compute the sum over the upper triangle of the outer product. Shapes of input array must be identical. Equivalent to >>> np.triu(np.outer(a, b), k).sum(axis) but supports extra dimensions in a and b and does not construct the full outer product matrix in memory. Args: a (:obj:`NDArray`): First input array. b (:obj:`NDArray`): Second input array. Keyword args: k (:obj:`int`, optional): Diagonal above which to zero elements. `k = 0` (the default) is the main diagonal, `k < 0` is below it and `k > 0` is above. axis (:obj:`int`, optional): Array axis over which the outer product is summed. All by default. """ a = np.atleast_1d(a) b = np.atleast_1d(b) if a.shape != b.shape: raise IndexError("shape of 'a' and 'b' does not match") # allocate output array dtype = (a[0] * b[0]).dtype # correct dtype for product N = len(a) # sum all elements if axis is None: result = np.zeros_like(a[0], dtype=dtype) for i in range(min(N, N - k)): result += (a[i] * b[max(0, i + k) :]).sum(axis=0) # sum row-wise elif axis == 1: result = np.zeros_like(b, dtype=dtype) for i in range(min(N, N - k)): result[i] = (a[i] * b[max(0, i + k) :]).sum(axis=0) # sum column-wise elif axis == 0: result = np.zeros_like(a, dtype=dtype) for i in range(max(0, k), N): result[i] = (a[: min(N, max(0, i - k + 1))] * b[i]).sum(axis=0) return result[()]
[docs] def apply_bool_mask_ndim( array: _Tarr, mask: NDArray[np.bool_], axis: int | Sequence[int] | None = None ) -> _Tarr: """Apply a boolean mask (``mask``) to one or many axes of a numpy array.""" if axis is None: axis = list(range(array.ndim)) elif isinstance(axis, int): axis = [axis] result = array for ax in axis: if result.shape[ax] != len(mask): raise IndexError( f"boolean index did not match indexed array along dimension " f"{ax}; dimension is {result.shape[ax]} but corresponding " f"boolean dimension is {len(mask)}" ) result = np.compress(mask, result, axis=ax) return result
[docs] def apply_slice_ndim( array: _Tarr, item: int | slice | Sequence, axis: int | Sequence[int] | None = None ) -> _Tarr: """Apply an integer subset or slice (``item``) to one or many axes of a numpy array.""" if axis is None: axis = list(range(array.ndim)) elif isinstance(axis, int): axis = [axis] if isinstance(item, slice): slices = [slice(None) for _ in range(array.ndim)] for ax in axis: slices[ax] = item return array[tuple(slices)] else: if isinstance(item, int): item = [item] indices = [range(n) for n in array.shape] for ax in axis: indices[ax] = item mesh_indices = np.ix_(*indices) return array[mesh_indices]
[docs] def sgn(val: ArrayLike) -> ArrayLike: """Compute the sign of a (array of) numbers, with positive numbers and 0 returning 1, negative number returning -1.""" return np.where(val == 0, 1.0, np.sign(val))
[docs] def cov_from_samples( samples: NDArray | Sequence[NDArray], method: str, rowvar: bool = False, kind: str = "full", # full, diag, var ) -> NDArray: """Compute a joint covariance from a sequence of data samples. These samples can be jackknife or bootstrap samples (etc.). If more than one set of samples is provided, the samples are concatenated along the second axis (default) or along the first axis if ``rowvar=True``. Args: samples (:obj`:NDArray`, :obj:`Sequence[NDArray]`): One or many sets of data samples. The number of samples must be identical. method (:obj:`str`, optional): The resampling method that generated the samples, see :obj:`~yaw.config.options.Options.method`. rowvar (:obj:`bool`, optional): Whether the each row represents an observable. Determines the concatenation for multiple input sample sets. kind (:obj:`str`, optional): Determines the kind of covariance computed, see :obj:`~yaw.config.options.Options.kind`. """ ax_samples = 1 if rowvar else 0 ax_observ = 0 if rowvar else 1 # if many samples are provided, concatenate them try: concat_samples = np.concatenate(samples, axis=ax_observ) except AxisError: concat_samples = samples # np.cov will produce a scalar value instead of matrix with shape (N,N) # for a single sample with shape (1,N) if concat_samples.shape[ax_samples] == 1: n_obs = concat_samples.shape[ax_observ] return np.full((n_obs, n_obs), np.nan) if method == "bootstrap": covmat = np.cov(concat_samples, rowvar=rowvar, ddof=1) elif method == "jackknife": n_samples = concat_samples.shape[ax_samples] covmat = np.cov(concat_samples, rowvar=rowvar, ddof=0) * (n_samples - 1) else: raise ValueError(f"invalid sampling method '{method}'") if kind == "full": pass elif kind == "diag": # get a matrix with only the main diagonal elements idx_diag = 0 cov_diags = np.diag(np.diag(covmat, k=idx_diag), k=idx_diag) try: for sample in samples: # go to next diagonal that contains correlations between samples idx_diag += sample.shape[ax_observ] # add just the diagonal values to the existing matrix cov_diags += np.diag(np.diag(covmat, k=-idx_diag), k=-idx_diag) cov_diags += np.diag(np.diag(covmat, k=idx_diag), k=idx_diag) except IndexError: raise covmat = cov_diags elif kind == "var": covmat = np.diag(np.diag(covmat, k=0), k=0) else: raise ValueError(f"invalid covariance kind '{kind}'") return covmat
[docs] def global_covariance( data: Sequence[SampledData], method: str | None = None, kind: str = "full" ) -> NDArray: """Compute a joint covariance from a set of resampled data. Typically applied to a set of :obj:`CorrData`, :obj:`~yaw.redshifts.RedshiftData`, or :obj:`~yaw.redshifts.HistData` containers. The joint covariance is computed by concatenating the samples along the redshift binning axis. .. Warning:: The input containers must have the same number of samples, and use the same resampling method. They also should be of the same type. Args: data (sequence of :obj:`SampledData`): The input containers, should be of the same type. method (:obj:`str`, optional): Specify the sampling method to use. All other containers must follow this convention. kind (:obj:`str`, optional): The method to compute the covariance matrix, see :func:`yaw.core.math.cov_from_samples`. Returns: :obj:`NDArray`: Jointly estimated covariance. Dimension matches the sum of all redshift bins in the input containers. """ if len(data) == 0: raise ValueError("'data' must be a sequence with at least one item") if method is None: method = data[0].method for d in data[1:]: if d.method != method: raise ValueError("resampling method of data items is inconsistent") return cov_from_samples([d.samples for d in data], method=method, kind=kind)
[docs] def corr_from_cov(covariance: NDArray) -> NDArray: """Convert an input covariance matrix to a covariance matrix.""" v = np.sqrt(np.diag(covariance)) outer_v = np.outer(v, v) return covariance / outer_v
[docs] def rebin( bins_new: NDArray[np.float64], bins_old: NDArray[np.float64], counts_old: NDArray[np.float64], ) -> NDArray[np.float64]: """Recompute compute histogram counts for a new binning. The new counts are computed by summing the fractional contribution of the counts from the original binning to the binning. The new binning may exceed or just partially cover the range of the original binning. Args: bins_new (:obj:`NDArray`): The new bin edges on which the counts are reevaluated. bins_old (:obj:`NDArray`): The bin edges from which the original counts were computed. counts_old (:obj:`NDArray`): The original histogram counts. Returns: :obj:`NDArray`: The histogram counts for the new binning. .. Note:: Implemented as C extension. """ return _rebin( bins_new.astype(np.float64), bins_old.astype(np.float64), counts_old.astype(np.float64), )
[docs] def shift_histogram( bins: NDArray, counts: NDArray, *, A: float = 1.0, dx: float = 0.0 ) -> NDArray: """Shift a histogram by a fixed value. The histogram values are recomputed for new bin edges that are shifted by the desired amount using :func:`rebin`. The normalisation of the histogram may change if the shifted binning does not cover the full range of the data. Args: bins (:obj:`NDArray`): The bin edges from which the counts were computed. counts_old (:obj:`NDArray`): The histogram counts. Keyword args: A (:obj:`float`, optional): Scalar amplitude used to rescale the new histgram counts. dx (:obj:`float`, optional): Amount by which the histogram (i.e. the bin edges) are shifted. Returns: :obj:`NDArray`: The shifted histogram counts. """ bins_old = bins.astype(np.float64) bins_new = bins_old + dx return A * _rebin(bins_new, bins_old, counts.astype(np.float64))