diff --git a/simplestac/extents.py b/simplestac/extents.py index 1979f67ef37ff3e05e7b54eb4eff74298ee0028c..069e409ba7a5e7874ba10331350ace108108f528 100644 --- a/simplestac/extents.py +++ b/simplestac/extents.py @@ -1,12 +1,38 @@ """ -Module to deal with STAC Extents. +Deal with STAC Extents. + +# AutoSpatialExtent + +The vanilla `pystac.SpatialExtent` enables to describe a spatial extent +from several bounding boxes. While this is useful, sometimes we want to +merge together bounding boxes that are partially overlapping. For instance, +we can take the example of the France mainland, covered by multiple remote +sensing products that are generally partially overlapping, and the Corse +island that is also covered by a number of RS products, but spatially +disjoint from the France mainland RS products bounding boxes. In this +particular exemple, we would like to regroup all RS products bounding +boxes so that there is one bbox for the France mainland, and another +bbox for the Corse island. This is particularly useful when a STAC +collection covers sparsely a broad area (e.g. worldwide), with several +isolated regions. + +The `AutoSpatialExtent` is an extension of the `pystac.SpatialExtent`. + +Instances are initialized with the same arguments as `pystac.SpatialExtent`. +Internally, bounding boxes lists are processed at initialisation, so all +partially overlapping bounding boxes are merged and updated as a single one. + +# AutoTemporalExtent + +The `AutoTemporalExtent` is an extension of the `pystac.TemporalExtent`. +It computes the date min and date max of a set of dates or dates ranges. + """ import pystac from dataclasses import dataclass from datetime import datetime from typing import Union - -Coords = list[Union[int, float]] +from stacflow.common_types import Bbox @dataclass @@ -14,7 +40,7 @@ class SmartBbox: """ Small class to work with a single 2D bounding box. """ - coords: Coords = None # [xmin, ymin, xmax, ymax] + coords: Bbox = None # [xmin, ymin, xmax, ymax] def touches(self, other: "SmartBbox") -> bool: """ @@ -53,7 +79,7 @@ class SmartBbox: ] -def clusterize_bboxes(bboxes: list[Coords]) -> list[Coords]: +def clusterize_bboxes(bboxes: list[Bbox]) -> list[Bbox]: """ Computes a list of bounding boxes regrouping all overlapping ones. @@ -64,74 +90,28 @@ def clusterize_bboxes(bboxes: list[Coords]) -> list[Coords]: list of 2D bounding boxes (list of int of float) """ - # Regroup bboxes into clusters of bboxes - smart_bboxes = [SmartBbox(bbox) for bbox in bboxes] - clusters = bboxes_to_bboxes_clusters(smart_bboxes) - - # Compute clusters extents - clusters_unions = [SmartBbox() for _ in clusters] - for i, cluster in enumerate(clusters): - for smart_bbox in cluster: - clusters_unions[i].update(smart_bbox) - - return [smart_bbox.coords.copy() for smart_bbox in clusters_unions] - - -def bboxes_to_bboxes_clusters(smart_bboxes: list[SmartBbox]) -> list[ - list[SmartBbox]]: - """ - Transform a list of bounding boxes into a nested list of clustered ones. - - Args: - smart_bboxes: a list of `SmartBbox` instances - - Returns: - a list of `SmartBbox` instances list - - """ - clusters_labels = compute_smart_bboxes_clusters(smart_bboxes) - clusters_bboxes = [[] for _ in range(max(clusters_labels) + 1)] - for smart_bbox, labels in zip(smart_bboxes, clusters_labels): - clusters_bboxes[labels].append(smart_bbox) - return clusters_bboxes - - -def compute_smart_bboxes_clusters(smart_bboxes: list[SmartBbox]) -> list[int]: - """ - Compute the extent of a cluster of `SmartBbox` instances. - - Args: - smart_bboxes: a list of `SmartBbox` instances - - Returns: - a vector of same size as `smart_bboxes` with the group numbers (int) - - """ - labels = len(smart_bboxes) * [None] - group = 0 - - def dfs(index: int): - """ - Deep first search with o(n) complexity. - - Args: - index: vertex index. + bboxes = [SmartBbox(bbox) for bbox in bboxes] + clusters = [bboxes.pop()] + + while bboxes: + bbox = bboxes.pop() + inter_clusters = [ + i for i, cluster in enumerate(clusters) if bbox.touches(cluster) + ] + if inter_clusters: + # We merge all intersecting clusters into one + clusters[inter_clusters[0]].update(bbox) + for i in inter_clusters[1:]: + clusters[inter_clusters[0]].update(clusters[i]) + clusters = [ + cluster + for i, cluster in enumerate(clusters) + if i not in inter_clusters[1:] + ] + else: + clusters.append(bbox) - """ - labels[index] = group - cur_item = smart_bboxes[index] - for i, (item, label) in enumerate(zip(smart_bboxes, labels)): - if i != index and cur_item.touches(item) and labels[i] is None: - dfs(i) - - while any(label is None for label in labels): - next_unmarked = next( - i for i, label in enumerate(labels) - if label is None - ) - dfs(next_unmarked) - group += 1 - return labels + return [cluster.coords for cluster in clusters] class AutoSpatialExtent(pystac.SpatialExtent): @@ -151,7 +131,7 @@ class AutoSpatialExtent(pystac.SpatialExtent): super().__init__(*args, **kwargs) self.clusterize_bboxes() - def update(self, other: pystac.SpatialExtent | Coords): + def update(self, other: pystac.SpatialExtent | Bbox): """ Updates itself with a new spatial extent or bounding box. Modifies inplace `self.bboxes`. @@ -208,9 +188,10 @@ class AutoTemporalExtent(pystac.TemporalExtent): all_dates = [] for interval in self.intervals: if isinstance(interval, (list, tuple)): - all_dates += [i for i in interval] + all_dates += [i for i in interval if i is not None] elif isinstance(interval, datetime): all_dates.append(interval) else: TypeError(f"Unsupported date/range of: {interval}") - self.intervals = [[min(all_dates), max(all_dates)]] + self.intervals = \ + [[min(all_dates), max(all_dates)]] if all_dates else [None, None] diff --git a/tests/extents.py b/tests/extents.py new file mode 100644 index 0000000000000000000000000000000000000000..5b52296a3d2f91ba53667516b77ec8278d2990ca --- /dev/null +++ b/tests/extents.py @@ -0,0 +1,20 @@ +import pytest +from simplestac.extents import AutoSpatialExtent + +def test_spatial_extent(): + """ + Test the `AutoSpatialExtent` class. + + Two clusters of bboxes (i.e. lists of bboxes) composed respectively with 2 and 1 bbox are + created (by definition, the clusters are disjoint: their bboxes don't overlap) + We instanciate an `AutoSpatialExtent` and we check that the two expected clusters are found. + """ + # first cluster (e.g. "france mainland") + bbox1 = [4, 42, 6, 44] + bbox2 = [3, 41, 5, 43] + + # second cluster (e.g. "corse") + bbox3 = [7, 42, 8, 50] + + ase = AutoSpatialExtent(bboxes=[bbox1, bbox2, bbox3]) + assert ase.bboxes == [[7, 42, 8, 50], [3, 41, 6, 44]]