"""This module implements containers for storing pair counts
(:obj:`PatchedCount`) and the total number of objects (:obj:`PatchedTotal`) for
pair count normalisation. The data is stored per spatial patch and in bins of
redshift. The containers implement methods to compute total value (summing over
all patches) and samples needed for error estimations after evaluating the
correlation estimator (e.g. jackknife or bootstrap resampling).
Finally, :obj:`NormalisedCounts` implements normalised pair counts and holds
both a :obj:`PatchedCount` and :obj:`PatchedTotal` container. Its
:meth:`NormalisedCounts.sample` method computes the ratio of
counts-to-total-objects and samples thereof.
"""
from __future__ import annotations
from abc import abstractmethod
from collections.abc import Iterable, Sequence
from dataclasses import dataclass
from itertools import accumulate
from typing import TYPE_CHECKING, NoReturn, Type, Union
try: # pragma: no cover
from typing import TypeAlias
except ImportError: # pragma: no cover
from typing_extensions import TypeAlias
import h5py
import numpy as np
import pandas as pd
from deprecated import deprecated
from yaw.config import ResamplingConfig
from yaw.core.abc import (
BinnedQuantity,
HDFSerializable,
PatchedQuantity,
concatenate_bin_edges,
)
from yaw.core.containers import Indexer, PatchIDs, SampledData
from yaw.core.math import apply_slice_ndim, outer_triu_sum
if TYPE_CHECKING: # pragma: no cover
from numpy.typing import DTypeLike, NDArray
from pandas import IntervalIndex
__all__ = ["PatchedTotal", "PatchedCount", "NormalisedCounts"]
_compression = dict(fletcher32=True, compression="gzip", shuffle=True)
"""default compression settings for :obj:`h5py.Dataset`."""
TypeSlice: TypeAlias = Union[slice, int, None]
TypeIndex: TypeAlias = Union[int, slice, Sequence]
def sequence_require_type(items: Sequence, class_or_inst: Type | object) -> None:
for item in items:
if not isinstance(item, class_or_inst):
raise TypeError(f"invalid type '{type(item)}' for concatenation")
def check_mergable(patched_arrays: Sequence[PatchedArray], *, patches: bool) -> None:
"""Check if two instaces of PatchedArray can be merged along the patch
or binning axis.
Args:
patched_arrays (:obj:`Sequence[PatchedArray]`):
Instances to merge
Keyword args:
patches (:obj:`bool`):
Whether to check for merging patches or binning.
"""
reference = patched_arrays[0]
for patched in patched_arrays[1:]:
if reference.auto != patched.auto:
raise ValueError("cannot merge mixed cross- and autocorrelations")
if patches:
reference.is_compatible(patched, require=True)
elif reference.n_patches != patched.n_patches:
raise ValueError("cannot merge, patch numbers do not match")
def binning_from_hdf(source: h5py.Group) -> IntervalIndex:
"""Construct a :obj:`pandas.IntervalIndex` from a group in an HDF5 file."""
dset = source["binning"]
left, right = dset[:].T
closed = dset.attrs["closed"]
return pd.IntervalIndex.from_arrays(left, right, closed=closed)
def binning_to_hdf(binning: IntervalIndex, dest: h5py.Group) -> None:
"""Serialise a :obj:`pandas.IntervalIndex` into a group of an HDF5 file.
Stores the left and right edges for each interval, as well as on which side
the intervals are closed as group attribute.
"""
edges = np.column_stack([binning.left, binning.right])
dset = dest.create_dataset("binning", data=edges, **_compression)
dset.attrs["closed"] = binning.closed
def patch_idx_offset(patched: Iterable[PatchedArray]) -> NDArray[np.int64]:
"""Compute the offsets for patch indices of a set of :obj:`PatchedArray` if
they were merged into one large :obj:`PatchedArray`."""
idx_offset = np.fromiter(
accumulate((p.n_patches for p in patched), initial=0),
dtype=np.int64,
count=len(patched),
)
return idx_offset
class PatchedArray(BinnedQuantity, PatchedQuantity, HDFSerializable):
"""Base class that implements the interface for classes that store results
from pair count measurements.
"""
auto = False
"""Whether the stored data are from an autocorrelation measurement."""
def __repr__(self) -> str:
string = super().__repr__()[:-1]
shape = self.shape
return f"{string}, {shape=})"
@abstractmethod
def __eq__(self, other: object) -> bool:
pass
@property
def dtype(self) -> DTypeLike:
"""The numpy data type of the underlying data."""
return np.float64
@property
def shape(self) -> tuple[int]:
"""The shape of underlying data if viewed as array."""
return (self.n_patches, self.n_patches, self.n_bins)
@property
def ndim(self) -> int:
"""The number of dimensions of underlying data if viewed as array."""
return len(self.shape)
@property
def size(self) -> int:
"""The number of items in the underlying data if viewed as array."""
return np.prod(self.shape)
@abstractmethod
def as_array(self) -> NDArray:
"""Get the underlying data as contiguous array.
The array 3-dimensional with shape (N, N, K), where N is the number of
spatial patches, and K is the number of redshift bins."""
pass
@abstractmethod
def _sum(self, config: ResamplingConfig) -> NDArray:
"""Method that implements the sum over all patches."""
pass
@abstractmethod
def _jackknife(self, config: ResamplingConfig, signal: NDArray) -> NDArray:
"""Method that implements generating jackknife samples of the sum over
all patches.
For N patches, draw N realisations by leaving out one of the N patches.
"""
pass
@abstractmethod
def _bootstrap(self, config: ResamplingConfig) -> NDArray:
"""Method that implements generating bootstrap samples of the sum over
all patches.
For N patches, draw M realisations each containing N randomly chosen
patches.
"""
pass
@deprecated(reason="renamed to CorrFunc.sample_sum", version="2.3.1")
def get_sum(self, *args, **kwargs):
"""
.. deprecated:: 2.3.1
Renamed to :meth:`sample_sum`.
"""
return self.sample_sum(*args, **kwargs) # pragma: no cover
def sample_sum(self, config: ResamplingConfig | None = None) -> SampledData:
"""Compute the sum of the data over all patches and samples thereof.
Returns a data container with the sum in each redshift bin and samples
generated from the patches using the resampling method specified in the
configuration parameter.
Args:
config (:obj:`~yaw.config.ResamplingConfig`):
Specifies the resampling method and its customisation
parameters.
Returns:
:obj:`~yaw.core.SampledData`
"""
if config is None:
config = ResamplingConfig() # pragma: no cover
data = self._sum(config)
if self.n_patches > 1:
if config.method == "bootstrap":
samples = self._bootstrap(config)
else:
samples = self._jackknife(config, signal=data)
else:
samples = np.atleast_2d(data)
return SampledData(
binning=self.get_binning(), data=data, samples=samples, method=config.method
)
[docs]
class PatchedTotal(PatchedArray):
"""Container class for the product of the total number of objects of two
samples.
The data in this container, the product of the total number of objects from
two samples, is constructed by multiplting the total number of objects of
both samples (split into spatial patches and redshift bins). This data is
required to normalise pair counts when computing correlation functions, e.g.
to account for different sizes of a data and a random sample.
Internally, the nubmer of objects are stored per sample as :obj:`totals1`
and :obj:`totals2` respectively. The (outer) product for all combinations
of patches is only computed when calling the :meth:`as_array` or
:meth:`sample_sum` methods.
The container supports comparison of the data elements and the redshift
binning with ``==`` and ``!=``. The indexing rules are the same as for
:obj:`PatchedCount`.
.. rubric:: Examples
Select a subset of all redshift bins or all spatial patches:
>>> from yaw.examples import patched_total
>>> patched_total
PatchedTotal(n_bins=30, z='0.070...1.420', shape=(64, 64, 30))
Note how the indicated shape changes when a patch subset is selected:
>>> patched_total.patches[:10]
PatchedTotal(n_bins=30, z='0.070...1.420', shape=(10, 10, 30))
Note how the indicated redshift range and shape change when a bin subset is
selected:
>>> patched_total.bins[:3]
PatchedTotal(n_bins=3, z='0.070...0.205', shape=(64, 64, 3))
An example of iteration over bins, which yields instances with a single
redshift bin:
>>> for zbin in patched_total.bins:
... print(zbin)
... break # just show the first item
PatchedTotal(n_bins=1, z='0.070...0.115', shape=(64, 64, 1))
"""
totals1: NDArray
"""The total number of objects from the first data catalogue per patch and
redshift bin.
The array is of shape (N, K), where N is the number of spatial patches, and
K is the number of redshift bins.
"""
totals2: NDArray
"""The total number of objects from the second data catalogue per patch and
redshift bin.
The array is of shape (N, K), where N is the number of spatial patches, and
K is the number of redshift bins.
"""
def __init__(
self, binning: IntervalIndex, totals1: NDArray, totals2: NDArray, *, auto: bool
) -> None:
"""Construct a new instance from the total number of objects in the
first and second catalog.
Args:
binning (:obj:`pandas.IntervalIndex`):
The redshift binning applied to the data.
totals1 (:obj:`NDArray`):
The total number of objects from the first data catalogue per
patch and redshift bin. The array must be of shape (N, K), where
N is the number of spatial patches, and K is the number of
redshift bins.
totals2 (:obj:`NDArray`):
The total number of objects from the second data catalogue per
patch and redshift bin. The array must be of shape (N, K), where
N is the number of spatial patches, and K is the number of
redshift bins.
Keyword Args:
auto (:obj:`bool`):
Whether the data originates from an autocorrelation measurement.
"""
self._binning = binning
for i, totals in enumerate((totals1, totals2), 1):
if totals.ndim != 2:
raise ValueError(f"'totals{i}' must be two dimensional")
if totals.shape[1] != self.n_bins:
raise ValueError(
f"number of bins for 'totals{i}' does not match 'binning'"
)
if totals1.shape != totals2.shape:
raise ValueError(
f"number of patches and bins do not match: "
f"{totals1.shape} != {totals2.shape}"
)
self.totals1 = totals1
self.totals2 = totals2
self.auto = auto
def __eq__(self, other: object) -> bool:
if isinstance(other, self.__class__):
return (
self.n_bins == other.n_bins
and self.n_patches == other.n_patches
and (self.get_binning() == other.get_binning()).all()
and np.all(self.totals1 == other.totals1)
and np.all(self.totals2 == other.totals2)
and self.auto == other.auto
)
return NotImplemented
[docs]
def as_array(self) -> NDArray:
return np.einsum("i...,j...->ij...", self.totals1, self.totals2)
@property
def bins(self) -> Indexer[TypeIndex, PatchedTotal]:
def builder(inst: PatchedTotal, item: TypeIndex) -> PatchedTotal:
if isinstance(item, int):
item = [item]
return PatchedTotal(
binning=inst._binning[item],
totals1=inst.totals1[:, item],
totals2=inst.totals2[:, item],
auto=inst.auto,
)
return Indexer(self, builder)
@property
def patches(self) -> Indexer[TypeIndex, PatchedTotal]:
def builder(inst: PatchedTotal, item: TypeIndex) -> PatchedTotal:
if isinstance(item, int):
item = [item]
return PatchedTotal(
binning=inst._binning,
totals1=inst.totals1[item],
totals2=inst.totals2[item],
auto=inst.auto,
)
return Indexer(self, builder)
[docs]
def get_binning(self) -> IntervalIndex:
return self._binning
@property
def n_patches(self) -> int:
return self.totals1.shape[0]
[docs]
@classmethod
def from_hdf(cls, source: h5py.Group) -> PatchedTotal:
# reconstruct the binning
binning = binning_from_hdf(source)
# load the data
totals1 = source["totals1"][:]
totals2 = source["totals2"][:]
auto = source["auto"][()]
return cls(binning=binning, totals1=totals1, totals2=totals2, auto=auto)
[docs]
def to_hdf(self, dest: h5py.Group) -> None:
# store the binning
binning_to_hdf(self.get_binning(), dest)
# store the data
dest.create_dataset("totals1", data=self.totals1, **_compression)
dest.create_dataset("totals2", data=self.totals2, **_compression)
dest.create_dataset("auto", data=self.auto)
[docs]
def concatenate_patches(self, *data: PatchedTotal) -> PatchedTotal:
sequence_require_type(data, self.__class__)
all_totals: list[PatchedTotal] = [self, *data]
check_mergable(all_totals, patches=True)
return self.__class__(
binning=self.get_binning().copy(),
totals1=np.concatenate([t.totals1 for t in all_totals], axis=0),
totals2=np.concatenate([t.totals2 for t in all_totals], axis=0),
auto=self.auto,
)
[docs]
def concatenate_bins(self, *data: PatchedTotal) -> PatchedTotal:
sequence_require_type(data, self.__class__)
all_totals: list[PatchedTotal] = [self, *data]
check_mergable(all_totals, patches=False)
binning = concatenate_bin_edges(*all_totals)
return self.__class__(
binning=binning,
totals1=np.concatenate([t.totals1 for t in all_totals], axis=1),
totals2=np.concatenate([t.totals2 for t in all_totals], axis=1),
auto=self.auto,
)
# methods implementing the signal
def _sum_cross(self) -> NDArray:
"""Implements the sum over all patches for crosscorrelation data.
Effectively computes the sum of the outer product of :obj:`totals1` and
:obj:`totals2` along the redshift bin axis.
"""
return np.einsum("i...,j...->...", self.totals1, self.totals2)
def _sum_auto(self) -> NDArray:
"""Implements the sum over all patches for autocorrelation data.
The same as :meth:`_sum_cross`, but only computes the upper triangle of
the outer product and weights the diagonal with 1/2 to account for the
fact that the diagonal elements are true autocorrelations.
"""
sum_upper = outer_triu_sum(self.totals1, self.totals2, k=1)
sum_diag = np.einsum("i...,i...->...", self.totals1, self.totals2)
return sum_upper + 0.5 * sum_diag
def _sum(self, config: ResamplingConfig) -> NDArray:
if self.auto:
return self._sum_auto()
else:
return self._sum_cross()
# methods implementing jackknife samples
def _jackknife_cross(self, signal: NDArray) -> NDArray:
"""Implements jackknife samples of the sum over all patches for
crosscorrelation data.
The same as :meth:`_sum_cross`, but adds an additional dimension as
first axis which lists the samples. For the i-th sample, subtract all
data containing :obj:`totals1` or :obj:`totals2` from the i-th patch.
"""
diag = np.einsum("i...,i...->i...", self.totals1, self.totals2)
rows = np.einsum("i...,j...->i...", self.totals1, self.totals2)
cols = np.einsum("i...,j...->j...", self.totals1, self.totals2)
return signal - rows - cols + diag # subtracted diag twice
def _jackknife_auto(self, signal: NDArray) -> NDArray:
"""Implements jackknife samples of the sum over all patches for
autocorrelation data.
The same as :meth:`_jackknife_cross`, but only computes the upper
triangle of the outer product for each sample and weights the diagonal
with 1/2 to account for the fact that the diagonal elements are true
autocorrelations.
"""
diag = np.einsum("i...,i...->i...", self.totals1, self.totals2)
# sum along axes of upper triangle (without diagonal) of outer product
rows = outer_triu_sum(self.totals1, self.totals2, k=1, axis=1)
cols = outer_triu_sum(self.totals1, self.totals2, k=1, axis=0)
return signal - rows - cols - 0.5 * diag # diag not in rows or cols
def _jackknife(self, config: ResamplingConfig, signal: NDArray) -> NDArray:
if self.auto:
return self._jackknife_auto(signal)
else:
return self._jackknife_cross(signal)
# methods implementing bootstrap samples
def _bootstrap(self, config: ResamplingConfig, **kwargs) -> NoReturn:
"""Bootstrap resampling currently not implemented.
Raises:
:exc:`NotImplementedError`
"""
raise NotImplementedError
[docs]
class PatchedCount(PatchedArray):
"""Container class for pair counts between two samples.
The data in this container are the pair counts between two samples of
points. The counts are stored for each spatial patch and per redshift bin,
forming a data array of shape shape (N, N, K), where N is the number of
spatial patches, and K is the number of redshift bins.
The container supports comparison of the data elements and the redshift
binning with ``==`` and ``!=``. Additionally, :obj:`PatchedCount` can be
added together if they have the same redshift binning and number of patches,
e.g. to add pair counts measured on different scales. The count values can
be rescaled/multiplied by a floating point number, e.g. to apply a weighting
before summing different scales (see also
:func:`yaw.correlation.add_corrfuncs`). Any sequence of :obj:`PatchedCount`
can be summed together with the built-in python function ``sum()``.
Finally, the container supports indexing of and iteration over redshift bins
and spatial patches using the special accessor attributes :obj:`bins` (see
also :obj:`~yaw.core.containers.SampledData`) and :obj:`patches`. Some
examples are listed below.
.. rubric:: Examples
Create a redshift binning:
>>> import pandas as pd
>>> bins = pd.IntervalIndex.from_breaks([0.1, 0.2, 0.3])
>>> bins
IntervalIndex([(0.1, 0.2], (0.2, 0.3]], dtype='interval[float64, right]')
Create two data containers with some dummy values:
>>> count1 = PatchedCount.zeros(bins, n_patches=5, auto=False)
>>> count1.counts += 1 # set all counts with dummy value 1
>>> count2 = PatchedCount.zeros(bins, n_patches=5, auto=False)
>>> count2.counts += 2 # set all counts with dummy value 2
>>> count2
PatchedCount(n_bins=2, z='0.100...0.300', shape=(5, 5, 2))
Sum the pair counts and compare different methods:
>>> summed = count1 + count2
>>> summed
PatchedCount(n_bins=2, z='0.100...0.300', shape=(5, 5, 2))
>>> sum([count1, count2]) == summed
True
>>> (summed.counts == 3).all()
True
Rescale the pair counts:
>>> count1 * 2.0
PatchedCount(n_bins=2, z='0.100...0.300', shape=(5, 5, 2))
>>> count1 * 2.0 == count2
True
Select a subset of all redshift bins or all spatial patches:
>>> from yaw.examples import patched_count
>>> patched_count
PatchedCount(n_bins=30, z='0.070...1.420', shape=(64, 64, 30))
Note how the indicated shape changes when a patch subset is selected:
>>> patched_count.patches[:10]
PatchedCount(n_bins=30, z='0.070...1.420', shape=(10, 10, 30))
Note how the indicated redshift range and shape change when a bin subset is
selected:
>>> patched_count.bins[:3]
PatchedCount(n_bins=3, z='0.070...0.205', shape=(64, 64, 3))
An example of iteration over bins, which yields instances with a single
redshift bin:
>>> for zbin in patched_count.bins:
... print(zbin)
... break # just show the first item
PatchedCount(n_bins=1, z='0.070...0.115', shape=(64, 64, 1))
"""
counts: NDArray
"""Internal data array containing the pair counts between spatial patches in
bins of redshift.
The array is 3-dimensional with shape (N, N, K), where N is the number of
spatial patches, and K is the number of redshift bins. Same as
:meth:`as_array`.
"""
def __init__(
self,
binning: IntervalIndex,
counts: NDArray,
*,
auto: bool,
) -> None:
"""Construct a new instance from an existing pair count array.
Args:
binning (:obj:`pandas.IntervalIndex`):
The redshift binning applied to the data.
counts (:obj:`NDArray`):
Internal data array containing the pair counts between spatial
patches in bins of redshift. The array must be 3-dimensional
with shape (N, N, K), where N is the number of spatial patches,
and K is the number of redshift bins. Same as :meth:`as_array`.
Keyword Args:
auto (:obj:`bool`):
Whether the data originates from an autocorrelation measurement.
"""
if counts.ndim != 3 or counts.shape[0] != counts.shape[1]:
raise IndexError("counts must be of shape (n_patches, n_patches, n_bins)")
if counts.shape[2] != len(binning):
raise ValueError("length of 'binning' and 'counts' dimension do not match")
self._binning = binning
self.counts = counts
self.auto = auto
[docs]
@classmethod
def zeros(
cls,
binning: IntervalIndex,
n_patches: int,
*,
auto: bool,
dtype: DTypeLike = np.float64,
) -> PatchedCount:
"""Create a new instance where all elements of the counts array are
initialised to zero.
Args:
binning (:obj:`pandas.IntervalIndex`):
Redshift binning for the container, determines size of last data
array dimension.
n_patches (:obj:`int`):
Number of spatial patches, determines the size of the first two
data array dimensions.
Keyword Args:
auto (:obj:`bool`):
Whether the data originates from an autocorrelation measurement.
dtype (:obj:`DTypeLike`, optional):
Data type to use for the internal data array.
"""
counts = np.zeros((n_patches, n_patches, len(binning)), dtype=dtype)
return cls(binning, counts, auto=auto)
def __eq__(self, other: object) -> bool:
if isinstance(other, self.__class__):
return (
self.n_bins == other.n_bins
and self.n_patches == other.n_patches
and (self.get_binning() == other.get_binning()).all()
and np.all(self.counts == other.counts)
and (self.auto == other.auto)
)
return NotImplemented
def __add__(self, other: object) -> PatchedCount:
if isinstance(other, self.__class__):
self.is_compatible(other, require=True)
if self.n_patches != other.n_patches:
raise ValueError("number of patches does not agree")
return self.__class__(
self.get_binning(), self.counts + other.counts, auto=self.auto
)
return NotImplemented
def __radd__(self, other: object) -> PatchedCount:
if np.isscalar(other) and other == 0:
return self
return other.__add__(self)
def __mul__(self, other: object) -> PatchedCount:
if np.isscalar(other) and not isinstance(other, (bool, np.bool_)):
return self.__class__(
self.get_binning(), self.counts * other, auto=self.auto
)
return NotImplemented
[docs]
def set_measurement(self, key: PatchIDs | tuple[int, int], item: NDArray):
"""Set the counts value in all redshift bins for a pair of patch
indices.
Args:
key (:obj:`yaw.core.containers.PatchIDs`, tuple):
Pair of patch indices for which the new values are set.
item (:obj:`NDArray`):
Values to set, must be an array with length matching the number
of redshift bins.
"""
# check the key
if not isinstance(key, tuple):
raise TypeError(f"slice must be of type {tuple}")
elif len(key) != 2:
raise IndexError(
f"too many indices for array assignment: index must be "
f"2-dimensional, but {len(key)} where indexed"
)
# check the item
item = np.asarray(item)
if item.shape != (self.n_bins,):
raise ValueError(f"can only set items with length n_bins={self.n_bins}")
# insert values
self.counts[key] = item
[docs]
def as_array(self) -> NDArray:
return self.counts
[docs]
def sum(self, axis: int | tuple[int] | None = None, **kwargs) -> NDArray:
"""Shorthand for :meth:`PatchedCount.counts.sum`
Args:
axis (:obj:`tuple`, :obj:`int`, optional):
Axis over which the internal 3-dimensional data array is summed.
**kwargs:
Keyword arguments passed to :meth:`numpy.ndarry.sum`.
"""
return self.counts.sum(axis=axis, **kwargs)
@property
def bins(self) -> Indexer[TypeIndex, PatchedCount]:
def builder(inst: PatchedCount, item: TypeIndex) -> PatchedCount:
if isinstance(item, int):
item = [item]
return PatchedCount(
binning=inst._binning[item],
counts=apply_slice_ndim(inst.counts, item, axis=2),
auto=inst.auto,
)
return Indexer(self, builder)
@property
def patches(self) -> Indexer[TypeIndex, PatchedCount]:
def builder(inst: PatchedCount, item: TypeIndex) -> PatchedCount:
return PatchedCount(
binning=inst._binning,
counts=apply_slice_ndim(inst.counts, item, axis=(0, 1)),
auto=inst.auto,
)
return Indexer(self, builder)
[docs]
def get_binning(self) -> IntervalIndex:
return self._binning
@property
def n_patches(self) -> int:
return self.counts.shape[0]
@property
def n_bins(self) -> int:
return len(self.get_binning())
[docs]
def keys(self) -> NDArray:
"""Array of patch index pairs with non-zero pair counts.
The index pairs are ordered by first, then second index. The returned
array is of shape (N, 2), where N is the number patches that contain
non-zero entries in any of the redshift bins.
"""
has_data = np.any(self.counts, axis=2)
indices = np.nonzero(has_data)
return np.column_stack(indices)
[docs]
def values(self) -> NDArray:
"""Array of non-zero pair count values.
The values are ordered in the same way as the indices returned by
:meth:`keys`.
"""
keys = self.keys() # shape (n_nonzero, 2)
i1, i2 = keys.T
return self.counts[i1, i2, :] # shape (n_nonzero, n_bins)
[docs]
@classmethod
def from_hdf(cls, source: h5py.Group) -> PatchedCount:
# reconstruct the binning
binning = binning_from_hdf(source)
# load the sparse data representation
keys = [tuple(key) for key in source["keys"][:]]
data = source["data"][:]
n_patches = source["n_patches"][()]
auto = source["auto"][()]
# build dense counts matrix
counts = np.zeros((n_patches, n_patches, len(binning)), dtype=data.dtype)
for key, values in zip(keys, data):
counts[key] = values
return cls(binning=binning, counts=counts, auto=auto)
[docs]
def to_hdf(self, dest: h5py.Group) -> None:
# store the binning
binning_to_hdf(self.get_binning(), dest)
# store the data
dest.create_dataset("keys", data=self.keys(), **_compression)
dest.create_dataset("data", data=self.values(), **_compression)
dest.create_dataset("n_patches", data=self.n_patches)
dest.create_dataset("auto", data=self.auto)
[docs]
def concatenate_patches(self, *data: PatchedCount) -> PatchedCount:
sequence_require_type(data, self.__class__)
all_counts: list[PatchedCount] = [self, *data]
check_mergable(all_counts, patches=True)
offsets = patch_idx_offset(all_counts)
n_patches = [count.n_patches for count in all_counts]
merged = self.__class__.zeros(
binning=self.get_binning(),
n_patches=sum(n_patches),
auto=self.auto,
)
# insert the blocks of counts into the merged counts array
loc = 0
for count, offset, n in zip(all_counts, offsets, n_patches):
i_start = offset
i_end = i_start + n
merged.counts[i_start:i_end, i_start:i_end] = count.counts
loc += offset
return merged
[docs]
def concatenate_bins(self, *data: PatchedCount) -> PatchedCount:
sequence_require_type(data, self.__class__)
all_counts: list[PatchedCount] = [self, *data]
check_mergable(all_counts, patches=False)
binning = concatenate_bin_edges(*all_counts)
merged = self.__class__.zeros(
binning=binning, n_patches=self.n_patches, auto=self.auto, dtype=self.dtype
)
merged.counts = np.concatenate([count.counts for count in all_counts], axis=2)
return merged
# methods implementing the signal
def _bin_sum_diag(self, data: NDArray) -> np.number:
"""Implements the sum over the autocorrelation patches in a single
redshift bin.
The autocorrelation patches are stored in the diagonal of the first two
dimension of the pair count data array.
.. Note::
This method is valid for both cross- and autocorrelation data.
"""
return np.diagonal(data).sum()
def _bin_sum_cross(self, data: NDArray) -> np.number:
"""Implements the sum over all patches for crosscorrelation data in a
single redshift bin.
"""
return data.sum()
def _bin_sum_auto(self, data: NDArray) -> np.number:
"""Implements the sum over all patches for autocorrelation data in a
single redshift bin.
The same as :meth:`_bin_sum_cross`, but only sums the upper triangle of
the counts array.
"""
return np.triu(data).sum()
def _sum(self, config: ResamplingConfig) -> NDArray:
out = np.zeros(self.n_bins)
# compute the counts bin-wise
for i in range(self.n_bins):
data = self.counts[:, :, i]
if config.crosspatch:
if self.auto:
out[i] = self._bin_sum_auto(data)
else:
out[i] = self._bin_sum_cross(data)
else:
out[i] = self._bin_sum_diag(data)
return out
# methods implementing jackknife samples
def _bin_jackknife_diag(self, data: NDArray, signal: NDArray) -> NDArray:
"""Implements jackknife samples of the sum over the autocorrelation
patches in a single redshift bin.
The autocorrelation patches are stored in the diagonal of the first two
dimension of the pair count data array. Samples are computed by leaving
out the i-th patch in the i-th sample.
.. Note::
This method is valid for both cross- and autocorrelation data.
"""
return signal - np.diagonal(data) # broadcast to (n_patches,)
def _bin_jackknife_cross(self, data: NDArray, signal: NDArray) -> NDArray:
"""Implements jackknife samples of the sum over all patches for
crosscorrelation data in a single redshift bin.
The same as :meth:`_bin_sum_cross`, but adds an additional dimension as
first axis which lists the samples. For the i-th sample, subtract all
data containing counts with objects of the i-th patch.
"""
diag = np.diagonal(data)
rows = data.sum(axis=1)
cols = data.sum(axis=0)
return signal - rows - cols + diag # broadcast to (n_patches,)
def _bin_jackknife_auto(self, data: NDArray, signal: NDArray) -> NDArray:
"""Implements jackknife samples of the sum over all patches for
autocorrelation data in a single redshift bin.
The same as :meth:`_bin_jackknife_cross`, but only sums elements on the
main diagonal and upper triangle of the pair count array.
"""
diag = np.diagonal(data)
# sum along axes of upper triangle (without diagonal) of outer product
tri_upper = np.triu(data, k=1)
rows = tri_upper.sum(axis=1).flatten()
cols = tri_upper.sum(axis=0).flatten()
return signal - rows - cols - diag # broadcast to (n_patches,)
def _jackknife(self, config: ResamplingConfig, signal: NDArray) -> NDArray:
out = np.empty((self.n_patches, self.n_bins))
# compute the counts bin-wise
for i, bin_signal in enumerate(signal):
data = self.counts[:, :, i]
if config.crosspatch:
if self.auto:
out[:, i] = self._bin_jackknife_auto(data, bin_signal)
else:
out[:, i] = self._bin_jackknife_cross(data, bin_signal)
else:
out[:, i] = self._bin_jackknife_diag(data, bin_signal)
return out
# methods implementing bootstrap samples
def _bootstrap(self, config: ResamplingConfig, **kwargs) -> NoReturn:
"""Bootstrap resampling currently not implemented.
Raises:
:exc:`NotImplementedError`
"""
raise NotImplementedError
[docs]
@dataclass(frozen=True)
class NormalisedCounts(PatchedQuantity, BinnedQuantity, HDFSerializable):
"""Container to store counts and the total number of objects obtained from
measuring pair counts for a correlation function.
Both input containers must have the same binning and the same number of
spatial patches. The container supports the same arithmetic as
:obj:`PatchedCount` (see the listed examples), i.e. comparison, addition,
multiplication by a scalar, as well as indexing. The resulting pair counts
are always normalised by the number of objects stored in :obj:`total`.
Args:
count (:obj:`PatchedCount`):
The pair count container.
total (:obj:`PatchedTotal`):
The container for the total number of objects from the samples.
"""
count: PatchedCount
"""The pair count container."""
total: PatchedTotal
"""The container for the total number of objects from the samples."""
def __post_init__(self) -> None:
if self.count.n_patches != self.total.n_patches:
raise ValueError("number of patches of 'count' and total' do not match")
if self.count.n_bins != self.total.n_bins:
raise ValueError("number of bins of 'count' and total' do not match")
def __repr__(self) -> str:
string = super().__repr__()[:-1]
n_patches = self.n_patches
return f"{string}, {n_patches=})"
def __eq__(self, other: object) -> bool:
if isinstance(other, self.__class__):
return self.count == other.count and self.total == other.total
return NotImplemented
def __add__(self, other: object) -> NormalisedCounts:
if isinstance(other, self.__class__):
count = self.count + other.count
if np.any(self.total.totals1 != other.total.totals1) or np.any(
self.total.totals2 != other.total.totals2
):
raise ValueError("total number of objects do not agree for operands")
return self.__class__(count, self.total)
return NotImplemented
def __radd__(self, other: object) -> NormalisedCounts:
if np.isscalar(other) and other == 0:
return self
return other.__add__(self)
def __mul__(self, other: object) -> NormalisedCounts:
if np.isscalar(other) and not isinstance(other, (bool, np.bool_)):
return self.__class__(self.count * other, self.total)
return NotImplemented
@property
def auto(self) -> bool:
"""Whether the stored data are from an autocorrelation measurement."""
return self.count.auto
@property
def bins(self) -> Indexer[TypeIndex, NormalisedCounts]:
def builder(inst: NormalisedCounts, item: TypeIndex) -> NormalisedCounts:
if isinstance(item, int):
item = [item]
return NormalisedCounts(
count=inst.count.bins[item], total=inst.total.bins[item]
)
return Indexer(self, builder)
@property
def patches(self) -> Indexer[TypeIndex, NormalisedCounts]:
def builder(inst: NormalisedCounts, item: TypeIndex) -> NormalisedCounts:
return NormalisedCounts(
count=inst.count.patches[item], total=inst.total.patches[item]
)
return Indexer(self, builder)
[docs]
def get_binning(self) -> IntervalIndex:
return self.total.get_binning()
@property
def n_patches(self) -> int:
return self.total.n_patches
[docs]
def sample(self, config: ResamplingConfig) -> SampledData:
"""Sum the pair counts across all spatial patches and generate samples
from resampling the patches.
Calls the ``sample_sum()`` method of the :obj:`counts` and :obj:`total`
containers. Returns a :obj:`~yaw.SampledData` instance that contains the
normalised pair counts and spatially resampled values. Effectively
computes :math:`\\frac{XX}{n_1 n_2}`, where :math:XX` are the number of
pairs between the two samples with a total number of objects :math:`n_1`
and :math:`n_2` respectively.
Args:
config (:obj:`~yaw.config.ResamplingConfig`):
Specifies the resampling method and its customisation
parameters.
Returns:
:obj:`~yaw.core.SampledData`
"""
counts = self.count.sample_sum(config)
totals = self.total.sample_sum(config)
samples = SampledData(
binning=self.get_binning(),
data=(counts.data / totals.data),
samples=(counts.samples / totals.samples),
method=config.method,
)
return samples
[docs]
@classmethod
def from_hdf(cls, source: h5py.Group) -> NormalisedCounts:
count = PatchedCount.from_hdf(source["count"])
total = PatchedTotal.from_hdf(source["total"])
return cls(count=count, total=total)
[docs]
def to_hdf(self, dest: h5py.Group) -> None:
group = dest.create_group("count")
self.count.to_hdf(group)
group = dest.create_group("total")
self.total.to_hdf(group)
[docs]
def concatenate_patches(self, *pcounts: NormalisedCounts) -> NormalisedCounts:
sequence_require_type(pcounts, self.__class__)
counts = [pc.count for pc in pcounts]
totals = [pc.total for pc in pcounts]
return self.__class__(
count=self.count.concatenate_patches(*counts),
total=self.total.concatenate_patches(*totals),
)
[docs]
def concatenate_bins(self, *pcounts: NormalisedCounts) -> NormalisedCounts:
sequence_require_type(pcounts, self.__class__)
counts = [pc.count for pc in pcounts]
totals = [pc.total for pc in pcounts]
return self.__class__(
count=self.count.concatenate_bins(*counts),
total=self.total.concatenate_bins(*totals),
)
def pack_results(
count_dict: dict[str, PatchedCount], total: PatchedTotal
) -> NormalisedCounts | dict[str, NormalisedCounts]:
"""Pack pair counts and the total number of objects.
If measured for multiple scales, the counts should be a dictionary of with
the scale name as key. In this case, the function returns a dictionary of
:obj:`NormalisedCounts` with the same keys.
"""
# drop the dictionary if there is only one scale
if len(count_dict) == 1:
count = tuple(count_dict.values())[0]
result = NormalisedCounts(count=count, total=total)
else:
result = {}
for scale_key, count in count_dict.items():
result[scale_key] = NormalisedCounts(count=count, total=total)
return result