Skip to content

Grid classes

DenseHypergrid

hypergrid.base.dense_tensor_hypergrid.DenseTensorHypergrid

Bases: BaseTensorHypergrid

Hypergrid backed by a dense numpy array.

Best for low-dimensional grids where most bins receive data. Memory footprint is O(prod(n_bins_per_dim)), independent of sparsity.

Parameters:

Name Type Description Default
edges list of array-like

Bin edges per dimension (length n_bins_d + 1 each).

required
Source code in hypergrid/base/dense_tensor_hypergrid.py
class DenseTensorHypergrid(BaseTensorHypergrid):
    """
    Hypergrid whose bin masses live in a dense numpy array.

    Well suited to low-dimensional grids where most bins are populated:
    memory usage is O(prod(n_bins_per_dim)) regardless of how sparse the
    data actually is.

    Parameters
    ----------
    edges : list of array-like
        Bin edges for every dimension (each of length n_bins_d + 1).
    """

    def __init__(self, edges):
        super().__init__(edges)
        # Dense backing array sized to the full grid shape.
        self.storage = DenseTensorStorage(self.shape)

    def get_mass(self):
        """Return the bin masses as a {bin_index: mass} dict."""
        return self.storage.to_dict()

    def to_sparse(self):
        """Sparse {bin_index: mass} view of the grid contents."""
        return self.storage.to_dict()

    def to_dense(self):
        """Independent copy of the underlying dense numpy array."""
        return self.storage.data.copy()

SparseHypergrid

hypergrid.base.sparse_tensor_hypergrid.SparseTensorHypergrid

Bases: BaseTensorHypergrid

Hypergrid backed by a sparse dict with explicit bounds checking.

Best for high-dimensional grids or data that occupies only a small fraction of the bin space. Memory scales with non-zero bin count.

Parameters:

Name Type Description Default
edges list of array-like

Bin edges per dimension (length n_bins_d + 1 each).

required
Source code in hypergrid/base/sparse_tensor_hypergrid.py
class SparseTensorHypergrid(BaseTensorHypergrid):
    """
    Hypergrid whose bin masses live in a sparse dict, with explicit
    bounds checking.

    Well suited to high-dimensional grids, or to data that touches only
    a tiny fraction of the bin space: memory grows with the number of
    non-empty bins rather than the full grid volume.

    Parameters
    ----------
    edges : list of array-like
        Bin edges for every dimension (each of length n_bins_d + 1).
    """

    def __init__(self, edges):
        super().__init__(edges)
        # Sparse backing store; shape is kept for bounds checking.
        self.storage = SparseTensorStorage(self.shape)

    def get_mass(self):
        """Return the bin masses as a {bin_index: mass} dict."""
        return self.storage.to_dict()

    def to_sparse(self):
        """Sparse {bin_index: mass} view of the grid contents."""
        return self.storage.to_dict()

    def to_dense(self):
        """Materialize the sparse contents into a full numpy array."""
        dense = np.zeros(self.shape)
        for bin_idx, mass in self.storage.items():
            dense[bin_idx] = mass
        return dense

StaticHypergrid

hypergrid.base.static_hypergrid.StaticHypergrid

Bases: BaseTensorHypergrid

Hypergrid with user-specified edges and a pluggable storage backend.

The default storage is DictStorage (sparse, no bounds checking). Pass any object that implements add / items / clear / scale to swap backends.

This is the most flexible variant — useful when you want to combine custom storage with the full mixin stack (rebin, compare, visualize).

Parameters:

Name Type Description Default
edges list of array-like

Bin edges per dimension.

required
storage storage backend

Defaults to DictStorage.

None
Source code in hypergrid/base/static_hypergrid.py
class StaticHypergrid(BaseTensorHypergrid):
    """
    Hypergrid with user-specified edges and a pluggable storage backend.

    The default storage is DictStorage (sparse, no bounds checking).
    Pass any object that implements add / items / clear / scale to swap backends.

    This is the most flexible variant — useful when you want to combine
    custom storage with the full mixin stack (rebin, compare, visualize).

    Parameters
    ----------
    edges : list of array-like
        Bin edges per dimension.
    storage : storage backend, optional
        Defaults to DictStorage.
    """

    def __init__(self, edges, storage=None):
        super().__init__(edges)
        # BUGFIX: `storage or DictStorage()` silently discarded a caller's
        # backend whenever it was falsy (e.g. an empty dict-like storage
        # whose __len__ is 0). Only substitute the default when the caller
        # passed nothing at all.
        self.storage = DictStorage() if storage is None else storage

    def get_mass(self):
        """Return the current bin masses as a {bin_index: mass} dict."""
        return dict(self.storage.items())

AdaptiveHypergrid

hypergrid.base.adaptive_hypergrid.AdaptiveHypergrid

Bases: BaseTensorHypergrid

Hypergrid that automatically rebins when the overflow fraction exceeds a threshold, adapting the grid to concept drift.

A rolling buffer of recent points is kept so that rebinning uses the most recent data distribution rather than the full history.

Parameters:

Name Type Description Default
edges list of array-like

Initial bin edges. If None, computed from the first batch passed to fit.

None
drift_threshold float

Fraction of out-of-bounds points that triggers a rebin (default 0.05).

0.05
buffer_size int

Maximum number of recent points kept for rebinning.

5000
binning_method (fd, sturges, sqrt)

Edge computation method used during rebinning.

"fd"
max_bins int

Per-dimension bin cap used during rebinning.

50
Source code in hypergrid/base/adaptive_hypergrid.py
class AdaptiveHypergrid(BaseTensorHypergrid):
    """
    Hypergrid that automatically rebins when the overflow fraction exceeds
    a threshold, adapting the grid to concept drift.

    A rolling buffer of recent points is kept so that rebinning uses the
    most recent data distribution rather than the full history.

    Parameters
    ----------
    edges : list of array-like, optional
        Initial bin edges. If None, computed from the first batch passed to fit.
    drift_threshold : float
        Fraction of out-of-bounds points that triggers a rebin (default 0.05).
    buffer_size : int
        Maximum number of recent points kept for rebinning.
    binning_method : {"fd", "sturges", "sqrt"}
        Edge computation method used during rebinning.
    max_bins : int
        Per-dimension bin cap used during rebinning.

    Notes
    -----
    NOTE(review): fit() always recomputes edges from the batch it receives,
    even when `edges` was supplied to the constructor, whereas update()
    honours constructor edges — confirm this asymmetry is intended.
    """

    def __init__(
        self,
        edges=None,
        drift_threshold=0.05,
        buffer_size=5000,
        binning_method="fd",
        max_bins=50,
    ):
        super().__init__(edges)  # edges=None is handled by BaseTensorHypergrid
        self.storage = DictStorage()
        # Rolling window of (point, weight) pairs; deque evicts the oldest
        # entries automatically once buffer_size is exceeded.
        self.buffer = deque(maxlen=buffer_size)
        self.drift_threshold = drift_threshold
        self.binning_method = binning_method
        self.max_bins = max_bins

        # Running weight totals since the last fit/rebin: out-of-bounds
        # weight and total weight, used to compute the overflow fraction.
        self._overflow = 0.0
        self._total = 0.0
        self._drift_history = []  # overflow fraction recorded at each rebin

    # ------------------------------------------------------------------
    # Core
    # ------------------------------------------------------------------

    def fit(self, data, weights=None):
        """Recompute edges from `data` and restart accumulation from scratch."""
        data = np.asarray(data, dtype=float)
        if data.ndim == 1:
            # A 1-D input is treated as one multi-dimensional point.
            data = data[np.newaxis, :]
        self._init_edges(data)
        # Reset all accumulated state before binning the new batch.
        self.storage.clear()
        self._overflow = 0.0
        self._total = 0.0
        self.buffer.clear()
        self._accumulate(data, weights)

    def update(self, data, weights=None):
        """Accumulate a new batch, auto-initialising edges if needed."""
        data = np.asarray(data, dtype=float)
        if data.ndim == 1:
            data = data[np.newaxis, :]

        if self.edges is None:
            # Auto-initialise on first update if fit() was never called.
            self._init_edges(data)

        self._accumulate(data, weights)

    def get_mass(self):
        """Return the current bin masses as a {bin_index: mass} dict."""
        return dict(self.storage.items())

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------

    def _accumulate(self, data, weights):
        """Bin each weighted point, then rebin if drift exceeds the threshold."""
        if weights is None:
            # Unweighted data: every point counts as 1.
            weights = np.ones(len(data))
        else:
            weights = np.asarray(weights, dtype=float)

        for point, w in zip(data, weights):
            self.buffer.append((point, w))
            self._total += w
            idx = self._get_bin_index(point)
            if idx is None:
                # Point fell outside the current grid bounds.
                self._overflow += w
            else:
                self.storage.add(idx, w)

        # Trigger a rebin once the out-of-bounds fraction exceeds the
        # drift threshold (guarded against division by zero).
        if self._total > 0 and (self._overflow / self._total) > self.drift_threshold:
            self._rebin()

    def _rebin(self):
        """Recompute edges from the buffer and re-bin the buffered points."""
        # Record how much drift was observed at the moment of rebinning.
        self._drift_history.append(self._overflow / self._total)

        buf_points = np.array([p for p, _ in self.buffer])
        buf_weights = np.array([w for _, w in self.buffer])

        self._init_edges(buf_points)
        self.storage.clear()
        self._overflow = 0.0
        self._total = 0.0

        # Re-bin buffered points without triggering another rebin (direct add).
        for point, w in zip(buf_points, buf_weights):
            idx = self._get_bin_index(point)
            if idx is not None:
                self.storage.add(idx, w)
            else:
                self._overflow += w
            self._total += w

    def _init_edges(self, data):
        """Derive edges from `data` and refresh dim/shape to match."""
        edges = compute_edges(data, method=self.binning_method, max_bins=self.max_bins)
        self.edges = [np.asarray(e, dtype=float) for e in edges]
        self.dim = len(self.edges)
        self.shape = [len(e) - 1 for e in self.edges]

TemporalHypergrid

hypergrid.base.temporal_hypergrid.TemporalHypergrid

Wrapper that adds temporal tracking to any hypergrid.

Applies optional exponential decay to old counts before each update and saves periodic snapshots of the distribution so drift can be measured.

Parameters:

Name Type Description Default
grid BaseTensorHypergrid instance

The underlying hypergrid (DenseTensorHypergrid, SparseTensorHypergrid, etc.)

required
decay float

Multiplicative factor applied to all counts before each update batch (e.g. 0.99 for slow forgetting). None means no decay.

None
snapshot_interval int

Save a snapshot every N data points processed.

1000
Source code in hypergrid/base/temporal_hypergrid.py
class TemporalHypergrid:
    """
    Wrapper that adds temporal tracking to any hypergrid.

    Applies optional exponential decay to old counts before each update and
    saves periodic snapshots of the distribution so drift can be measured.

    Parameters
    ----------
    grid : BaseTensorHypergrid instance
        The underlying hypergrid (DenseTensorHypergrid, SparseTensorHypergrid, etc.)
    decay : float, optional
        Multiplicative factor applied to all counts before each update batch
        (e.g. 0.99 for slow forgetting). None means no decay.
    snapshot_interval : int
        Save a snapshot roughly every N data points processed. Use 0 or None
        to disable snapshotting.
    """

    def __init__(self, grid, decay=None, snapshot_interval=1000):
        self.grid = grid
        self.decay = decay
        self.snapshot_interval = snapshot_interval
        self._counter = 0
        # Counter value at which the most recent snapshot was taken. Tracking
        # this, rather than testing `counter % interval == 0`, guarantees
        # snapshots are not skipped when batch sizes do not divide the
        # interval evenly (the old modulo test only fired when the running
        # counter landed exactly on a multiple of the interval).
        self._last_snapshot = 0
        self.snapshots = []

    # ------------------------------------------------------------------
    # Forwarded interface — TemporalHypergrid can be used wherever a
    # plain hypergrid is expected.
    # ------------------------------------------------------------------

    @property
    def dim(self):
        return self.grid.dim

    @property
    def edges(self):
        return self.grid.edges

    @property
    def shape(self):
        return self.grid.shape

    def get_mass(self):
        return self.grid.get_mass()

    def get_edges(self):
        return self.grid.get_edges()

    def describe(self, percentiles=None):
        """Delegate to the underlying grid's describe(). See StatsMixin.describe."""
        return self.grid.describe(percentiles=percentiles)

    def fit(self, data, weights=None):
        """Refit the underlying grid on `data` and reset all temporal state."""
        self.grid.fit(data, weights)
        self._counter = 0
        self._last_snapshot = 0
        self.snapshots.clear()

    def update(self, data, weights=None):
        """Feed a new batch, applying decay first and snapshotting when due."""
        data = np.asarray(data)

        if self.decay is not None:
            # Forget old mass before the new batch arrives.
            self.grid.storage.scale(self.decay)

        self.grid.update(data, weights)
        self._counter += len(data)

        # Snapshot once at least one full interval has elapsed since the
        # previous snapshot (fixes silent skipping when len(data) does not
        # divide snapshot_interval).
        if self.snapshot_interval and (
            self._counter - self._last_snapshot >= self.snapshot_interval
        ):
            self.snapshots.append(dict(self.grid.get_mass()))
            self._last_snapshot = self._counter

    # ------------------------------------------------------------------
    # Temporal analysis
    # ------------------------------------------------------------------

    def evolution(self, method="js"):
        """
        Compute divergence between consecutive snapshots.

        Parameters
        ----------
        method : {"js", "kl", "l1", "wasserstein"}

        Returns
        -------
        list of float
            One value per consecutive snapshot pair; empty when fewer than
            two snapshots exist.

        Raises
        ------
        ValueError
            If `method` is not one of the supported names.
        """
        if len(self.snapshots) < 2:
            return []

        if method == "wasserstein":
            return self._evolution_wasserstein()

        try:
            fn = {"js": _js, "kl": _kl, "l1": _l1}[method]
        except KeyError:
            raise ValueError(
                f"Unknown method {method!r}; expected 'js', 'kl', 'l1' or 'wasserstein'."
            ) from None
        return [
            fn(_normalize(h1), _normalize(h2))
            for h1, h2 in zip(self.snapshots, self.snapshots[1:])
        ]

    def _evolution_wasserstein(self):
        """Wasserstein distance between consecutive snapshots via StaticHypergrid."""
        # Local imports avoid a circular dependency at module load time.
        from hypergrid.base.static_hypergrid import StaticHypergrid
        from hypergrid.storage.storage import DictStorage

        edges = self.grid.get_edges()
        results = []
        for h1, h2 in zip(self.snapshots, self.snapshots[1:]):
            # Wrap each snapshot dict in a throwaway grid so the compare
            # mixin can run on it.
            s1 = DictStorage()
            for k, v in h1.items():
                s1.data[k] = v
            s2 = DictStorage()
            for k, v in h2.items():
                s2.data[k] = v
            g1 = StaticHypergrid(edges, storage=s1)
            g2 = StaticHypergrid(edges, storage=s2)
            results.append(g1.compare(g2, method="wasserstein"))
        return results

    def plot_evolution(self, method="js"):
        """Plot divergence between consecutive snapshots over time."""
        distances = self.evolution(method)
        if not distances:
            raise ValueError("Need at least 2 snapshots. Process more data or reduce snapshot_interval.")
        plt.plot(distances, marker="o")
        plt.title(f"Distribution drift over time ({method.upper()})")
        plt.xlabel("Snapshot index")
        plt.ylabel("Divergence")
        plt.tight_layout()
        plt.show()

    def plot_temporal_umap(self, n_per_snapshot=500, **umap_kwargs):
        """
        UMAP projection of all snapshots coloured by snapshot index.
        Each snapshot contributes n_per_snapshot sampled points.
        """
        import umap as _umap
        from hypergrid.base.sparse_tensor_hypergrid import SparseTensorHypergrid

        if not self.snapshots:
            # np.vstack([]) would raise an opaque error below; fail clearly.
            raise ValueError("Need at least 1 snapshot. Process more data or reduce snapshot_interval.")

        samples, labels = [], []
        for i, snap in enumerate(self.snapshots):
            # Rehydrate each snapshot into a sparse grid so sample() works.
            tmp = SparseTensorHypergrid(self.grid.get_edges())
            tmp.storage.data.update(snap)
            pts = tmp.sample(n_per_snapshot)
            samples.append(pts)
            labels.extend([i] * len(pts))

        X = np.vstack(samples)
        emb = _umap.UMAP(**umap_kwargs).fit_transform(X)

        plt.scatter(emb[:, 0], emb[:, 1], c=labels, s=3, cmap="viridis")
        plt.colorbar(label="Snapshot index")
        plt.title("Temporal evolution (UMAP)")
        plt.tight_layout()
        plt.show()
describe(percentiles=None)

Delegate to the underlying grid's describe(). See StatsMixin.describe.

Source code in hypergrid\base\temporal_hypergrid.py
def describe(self, percentiles=None):
    """Delegate to the underlying grid's describe(). See StatsMixin.describe.

    `percentiles` is forwarded unchanged; its accepted format and the
    returned summary structure are defined by StatsMixin.describe.
    """
    return self.grid.describe(percentiles=percentiles)
evolution(method='js')

Compute divergence between consecutive snapshots.

Parameters:

Name Type Description Default
method (js, kl, l1, wasserstein)
"js"

Returns:

Type Description
list of float
Source code in hypergrid\base\temporal_hypergrid.py
def evolution(self, method="js"):
    """
    Compute divergence between consecutive snapshots.

    Parameters
    ----------
    method : {"js", "kl", "l1", "wasserstein"}

    Returns
    -------
    list of float
    """
    if len(self.snapshots) < 2:
        return []

    if method == "wasserstein":
        return self._evolution_wasserstein()

    fn = {"js": _js, "kl": _kl, "l1": _l1}[method]
    return [
        fn(_normalize(h1), _normalize(h2))
        for h1, h2 in zip(self.snapshots, self.snapshots[1:])
    ]
plot_evolution(method='js')

Plot divergence between consecutive snapshots over time.

Source code in hypergrid\base\temporal_hypergrid.py
def plot_evolution(self, method="js"):
    """Plot divergence between consecutive snapshots over time."""
    distances = self.evolution(method)
    if not distances:
        raise ValueError("Need at least 2 snapshots. Process more data or reduce snapshot_interval.")
    plt.plot(distances, marker="o")
    plt.title(f"Distribution drift over time ({method.upper()})")
    plt.xlabel("Snapshot index")
    plt.ylabel("Divergence")
    plt.tight_layout()
    plt.show()
plot_temporal_umap(n_per_snapshot=500, **umap_kwargs)

UMAP projection of all snapshots coloured by snapshot index. Each snapshot contributes n_per_snapshot sampled points.

Source code in hypergrid\base\temporal_hypergrid.py
def plot_temporal_umap(self, n_per_snapshot=500, **umap_kwargs):
    """
    UMAP projection of all snapshots coloured by snapshot index.
    Each snapshot contributes n_per_snapshot sampled points.
    """
    # Imported lazily: umap is an optional heavy dependency, and the
    # hypergrid import avoids a cycle at module load time.
    import umap as _umap
    from hypergrid.base.sparse_tensor_hypergrid import SparseTensorHypergrid

    samples, labels = [], []
    for i, snap in enumerate(self.snapshots):
        # Rehydrate each snapshot dict into a sparse grid so sample() works.
        tmp = SparseTensorHypergrid(self.grid.get_edges())
        tmp.storage.data.update(snap)
        pts = tmp.sample(n_per_snapshot)
        samples.append(pts)
        labels.extend([i] * len(pts))

    # NOTE(review): with zero snapshots, np.vstack([]) raises an opaque
    # error — consider a clearer guard upstream.
    X = np.vstack(samples)
    emb = _umap.UMAP(**umap_kwargs).fit_transform(X)

    plt.scatter(emb[:, 0], emb[:, 1], c=labels, s=3, cmap="viridis")
    plt.colorbar(label="Snapshot index")
    plt.title("Temporal evolution (UMAP)")
    plt.tight_layout()
    plt.show()