Source code for eogrow.tasks.batch_to_eopatch

"""Tasks used to transform Sentinel Hub Batch results into EOPatches."""

from __future__ import annotations

import concurrent.futures
import json

import fs
import numpy as np
from fs.base import FS

from eolearn.core import EOPatch, EOTask
from eolearn.core.types import Feature
from eolearn.core.utils.fs import pickle_fs, unpickle_fs
from sentinelhub import parse_time

from ..utils.meta import import_object


[docs]class LoadUserDataTask(EOTask): """Task that loads and adds timestamps and tile names to an EOPatch""" def __init__( self, path: str, filesystem: FS, userdata_feature_name: str | None = None, userdata_timestamp_reader: str | None = None, ): """ :param path: A path to folder containing the tiles, relative to the filesystem object. :param filesystem: A filesystem object. :param userdata_feature_name: A name of a META_INFO feature in which userdata.json content could be stored :param userdata_timestamp_reader: A reference to a Python function or a Python code that collects timestamps from loaded userdata.json """ self.path = path self.pickled_filesystem = pickle_fs(filesystem) self.userdata_feature_name = userdata_feature_name self.userdata_timestamp_reader = userdata_timestamp_reader def _load_userdata_file(self, folder: str, filename: str = "userdata.json") -> dict: """Loads a content of a JSON file""" filesystem = unpickle_fs(self.pickled_filesystem) full_path = fs.path.join(self.path, folder, filename) userdata_text = filesystem.readtext(full_path, encoding="utf-8") return json.loads(userdata_text) @staticmethod def _parse_timestamps(userdata: dict, userdata_timestamp_reader: str) -> list: """Parses timestamps from userdata dictionary""" try: reader = import_object(userdata_timestamp_reader) time_strings = reader(userdata) except (ImportError, ValueError): time_strings = eval(userdata_timestamp_reader) # pylint: disable=eval-used return [parse_time(time_string, force_datetime=True, ignoretz=True) for time_string in time_strings]
[docs] def execute(self, eopatch: EOPatch | None = None, *, folder: str = "") -> EOPatch: """Adds metadata to the given EOPatch :param eopatch: Name of the eopatch to process :param folder: Folder in which userdata.json is stored """ eopatch = eopatch or EOPatch() userdata = self._load_userdata_file(folder) if self.userdata_feature_name: eopatch.meta_info[self.userdata_feature_name] = userdata if self.userdata_timestamp_reader: eopatch.timestamps = self._parse_timestamps(userdata, self.userdata_timestamp_reader) return eopatch
[docs]class FixImportedTimeDependentFeatureTask(EOTask): """Fixes a time-dependent feature that has been imported as a timeless feature from batch results. It performs the following: - If the timestamps are empty (but not `None`) and all the data is `0` then it creates an empty eopatch. - rotates bands axis to time axis, - reverses order of timestamps and feature - potentially removes redundant timeframes according to timestamps. This is necessary because in case there were no available acquisitions batch job still had to save a single time frame with dummy values. """ def __init__(self, input_feature: Feature, output_feature: Feature): self.input_feature = input_feature self.output_feature = output_feature
[docs] def execute(self, eopatch: EOPatch) -> EOPatch: """Fixes a feature in the given EOPatch""" data = eopatch[self.input_feature] del eopatch[self.input_feature] data = data[np.newaxis, ...] data = np.swapaxes(data, 0, -1) if eopatch.timestamps == [] and list(np.unique(data)) == [0]: # there is no actual data shape = data.shape data = np.empty((0, *shape[1:]), dtype=data.dtype) if eopatch.timestamps: timeframe_num = len(eopatch.timestamps) if data.shape[0] != timeframe_num: # Handling a case where data would contain some empty timeframes data = data[:timeframe_num, ...] order_mask = np.argsort(eopatch.timestamps) # type: ignore[arg-type] is_strictly_increasing = (np.diff(order_mask) > 0).all() if not is_strictly_increasing: eopatch.timestamps = sorted(eopatch.timestamps) data = data[order_mask] eopatch[self.output_feature] = data return eopatch
[docs]class DeleteFilesTask(EOTask): """Delete files""" def __init__(self, path: str, filesystem: FS, filenames: list[str]): """ :param path: A path to folder containing the files to be deleted, relative to filesystem object. :param filesystem: A filesystem object :param filenames: A list of filenames to delete """ self.path = path self.pickled_filesystem = pickle_fs(filesystem) self.filenames = filenames
[docs] def execute(self, *_: EOPatch, folder: str) -> None: """Execute method to delete files relative to the specified tile :param folder: A folder containing files """ filesystem = unpickle_fs(self.pickled_filesystem) file_paths = [fs.path.join(self.path, folder, filename) for filename in self.filenames] with concurrent.futures.ThreadPoolExecutor() as executor: # The following is intentionally wrapped in a list in order to get back potential exceptions list(executor.map(filesystem.remove, file_paths))