Source code for eogrow.core.area.utm

"""Area manager implementation for automated UTM CRS grids."""

from __future__ import annotations

import logging
from collections import defaultdict

import fs
import geopandas as gpd
from geopandas import GeoDataFrame
from pydantic import Field

from sentinelhub import CRS, Geometry, UtmZoneSplitter

from ..schemas import BaseSchema
from .base import BaseAreaManager, get_geometry_from_file

LOGGER = logging.getLogger(__name__)


class PatchSchema(BaseSchema):
    # Size and buffer settings of a single EOPatch tile. The field descriptions
    # state that all four values are expressed in meters.

    # Tile extent before any buffer is applied.
    size_x: int = Field(description="A width of each EOPatch in meters")
    size_y: int = Field(description="A height of each EOPatch in meters")

    # Optional enlargement of each tile; both default to 0.
    buffer_x: float = Field(
        default=0,
        description="Number of meters by which to increase the tile size to left and right.",
    )
    buffer_y: float = Field(
        default=0,
        description="Number of meters by which to increase the tile size to up and down.",
    )
class UtmZoneAreaManager(BaseAreaManager):
    """Area manager that splits the area per UTM zone.

    The area of interest is read from a geometry file in the input data folder and
    tiled by ``create_utm_zone_grid`` using the patch size, patch buffer and grid
    offset taken from the configuration.
    """

    class Schema(BaseAreaManager.Schema):
        # Configuration of the manager: where the AoI geometry lives, how large
        # each patch is, and by how much the tiling grid is shifted.
        geometry_filename: str = Field(
            description="Name of the file that defines the AoI geometry, located in the input data folder."
        )
        patch: PatchSchema
        offset_x: float = Field(default=0, description="An offset of tiling grid in horizontal dimension")
        offset_y: float = Field(default=0, description="An offset of tiling grid in vertical dimension")

    config: Schema

    def get_area_geometry(self, *, crs: CRS = CRS.WGS84) -> Geometry:
        """Load the AoI geometry from the input data folder and transform it to ``crs``.

        :param crs: CRS of the returned geometry, WGS84 by default.
        :return: The geometry read from ``config.geometry_filename``, transformed to ``crs``.
        """
        input_folder = self.storage.get_input_data_folder()
        geometry_path = fs.path.join(input_folder, self.config.geometry_filename)

        geometry = get_geometry_from_file(
            filesystem=self.storage.filesystem,
            file_path=geometry_path,
            geopandas_engine=self.storage.config.geopandas_backend,
        )
        return geometry.transform(crs)

    def _create_grid(self) -> dict[CRS, GeoDataFrame]:
        """Build the grid by delegating to ``create_utm_zone_grid``.

        :return: A mapping from CRS to a dataframe of the tiles in that CRS.
        """
        # The geometry is requested with the default CRS of `get_area_geometry`.
        area_geometry = self.get_area_geometry()
        LOGGER.info("Splitting area geometry into UTM zone grid")

        name_column = self.NAME_COLUMN
        patch_size = (self.config.patch.size_x, self.config.patch.size_y)
        grid_offset = (self.config.offset_x, self.config.offset_y)
        patch_buffer = (self.config.patch.buffer_x, self.config.patch.buffer_y)

        return create_utm_zone_grid(area_geometry, name_column, patch_size, grid_offset, patch_buffer)

    def get_grid_cache_filename(self) -> str:
        """Compose the name of the file in which the grid is cached.

        The name consists of the class name followed by the geometry file name
        (without directory and without its last extension) and every parameter that
        is passed to the grid creation, so a change of any of them gives a new name.

        :return: A file name ending with ``.gpkg``.
        """
        basename = fs.path.basename(self.config.geometry_filename)
        # Only the part after the last dot is dropped; a name without a dot is kept whole.
        stem = basename.rsplit(".", 1)[0]

        patch = self.config.patch
        raw_params = (
            stem,
            patch.size_x,
            patch.size_y,
            patch.buffer_x,
            patch.buffer_y,
            self.config.offset_x,
            self.config.offset_y,
        )
        params = list(map(str, raw_params))
        return f"{self.__class__.__name__}_{'_'.join(params)}.gpkg"
def create_utm_zone_grid(
    geometry: Geometry,
    name_column: str,
    bbox_size: tuple[int, int],
    bbox_offset: tuple[float, float],
    bbox_buffer: tuple[float, float],
) -> dict[CRS, GeoDataFrame]:
    """Creates a grid of named bounding boxes covering the given area geometry.

    :param geometry: The area to split; its shape and CRS are handed to `UtmZoneSplitter`.
    :param name_column: Name of the dataframe column that holds the generated tile names.
    :param bbox_size: Passed to the splitter as `bbox_size`.
    :param bbox_offset: Passed to the splitter as `offset`.
    :param bbox_buffer: Passed to `BBox.buffer` with `relative=False` for every bounding box,
        unless it equals `(0, 0)`.
    :return: A mapping from the CRS of the bounding boxes to a dataframe with columns `id`,
        `name_column` and `geometry`, holding all tiles of that CRS.
    """
    splitter = UtmZoneSplitter([geometry.geometry], crs=geometry.crs, bbox_size=bbox_size, offset=bbox_offset)
    bboxes = splitter.get_bbox_list()
    infos = splitter.get_info_list()

    # Buffering is skipped entirely when no buffer was requested.
    if bbox_buffer != (0, 0):
        bboxes = [bbox.buffer(bbox_buffer, relative=False) for bbox in bboxes]

    # Ids are zero-padded to the number of digits of the largest id.
    zfill_length = len(str(len(bboxes) - 1))

    tiles_per_crs: dict[CRS, list[dict]] = {}
    for i, (bbox, info) in enumerate(zip(bboxes, infos)):
        i_x = info["index_x"]
        i_y = info["index_y"]
        name = f"eopatch-id-{i:0{zfill_length}}-col-{i_x}-row-{i_y}"

        tile = {"id": i, name_column: name, "geometry": bbox.geometry}
        tiles_per_crs.setdefault(bbox.crs, []).append(tile)

    grid: dict[CRS, GeoDataFrame] = {}
    for crs, tiles in tiles_per_crs.items():
        grid[crs] = gpd.GeoDataFrame(tiles, geometry="geometry", crs=crs.pyproj_crs())
    return grid