"""Defines task needed in prediction pipelines."""
from __future__ import annotations
import abc
from typing import Any, Callable
import fs
import joblib
import numpy as np
from fs.base import FS
from eolearn.core import EOPatch, EOTask
from eolearn.core.types import Feature
from eolearn.core.utils.fs import pickle_fs, unpickle_fs
class BasePredictionTask(EOTask, metaclass=abc.ABCMeta):
    """Base prediction task streamlining data preprocessing before prediction.

    `execute` builds a boolean pixel mask, flattens the input features of the selected pixels with
    `process_data` and hands the result to `add_predictions`, which subclasses must implement.
    """

    def __init__(
        self,
        *,
        model_path: str,
        filesystem: FS,
        input_features: list[Feature],
        mask_feature: Feature | None,
        output_feature: Feature,
        output_dtype: np.dtype | None,
    ):
        """
        :param model_path: A file path to the model. The path is relative to the filesystem object.
        :param filesystem: A filesystem object.
        :param input_features: List of features containing input for the model, which are concatenated in given order
        :param mask_feature: Mask specifying which points are to be predicted. If falsy, all points are predicted.
        :param output_feature: Feature into which predictions are saved
        :param output_dtype: If given, predictor outputs are cast to this dtype. If `None` the dtype produced by the
            predictor is kept.
        """
        self.model_path = model_path
        self._model = None  # loaded lazily by the `model` property
        # Only the pickled form of the filesystem is stored; it is unpickled again when the model is loaded.
        self.pickled_filesystem = pickle_fs(filesystem)
        self.input_features = input_features
        self.mask_feature = mask_feature
        self.output_feature = output_feature
        self.output_dtype = output_dtype

    def process_data(self, eopatch: EOPatch, mask: np.ndarray) -> np.ndarray:
        """Masks and reshapes data into a form suitable for the model

        :param eopatch: EOPatch holding all of `input_features`.
        :param mask: Boolean array selecting the pixels to use; `execute` passes a mask over the two spatial axes.
        :return: A 2-D array with one row per selected pixel and the (flattened) input features concatenated along
            the last axis.
        """
        all_features = []
        for ftype, fname in self.input_features:
            array = eopatch[ftype, fname]
            if ftype.is_timeless():
                # Boolean indexing collapses the spatial axes into a single pixel axis.
                all_features.append(array[mask, :])
            else:
                array = array[:, mask, :]
                time, pixels, depth = array.shape
                # Pixel axis goes first so that each row holds the whole time series of one pixel.
                array = np.moveaxis(array, 0, 1)
                all_features.append(array.reshape(pixels, time * depth))

        return np.concatenate(all_features, axis=-1)

    @property
    def model(self) -> Any:
        """Implements lazy loading that gets around filesystem issues"""
        if self._model is None:
            filesystem = unpickle_fs(self.pickled_filesystem)
            # NOTE(security): joblib.load unpickles the file, which can execute arbitrary code. Only point
            # `model_path` at trusted model files.
            with filesystem.openbin(self.model_path, "r") as file_handle:
                self._model = joblib.load(file_handle)

        return self._model

    def apply_predictor(
        self, predictor: Callable, processed_features: np.ndarray, return_on_empty: np.ndarray | None = None
    ) -> np.ndarray:
        """Helper function that applies the predictor and casts the result to `output_dtype`

        :param predictor: Callable that maps the processed features to an array of predictions.
        :param processed_features: Model input with one row per pixel, as produced by `process_data`.
        :param return_on_empty: If given and there are no rows to predict, this array is used as the result and the
            predictor is not called.
        :return: Predictions, cast to `output_dtype` when one is set.
        """
        if processed_features.shape[0] == 0 and return_on_empty is not None:
            predictions = return_on_empty
        else:
            predictions = predictor(processed_features)

        # The placeholder for empty input is cast as well, so that the output dtype does not depend on whether
        # any pixel was selected.
        if self.output_dtype is None:
            return predictions
        return predictions.astype(self.output_dtype)

    def transform_to_feature_form(self, predictions: np.ndarray, mask: np.ndarray) -> np.ndarray:
        """Transforms an array of per-pixel predictions into a full array, reversing the masking

        :param predictions: Array with one row per selected pixel; the last axis holds the predicted values.
        :param mask: Boolean mask that was used to select the pixels.
        :return: Array of shape `(*mask.shape, predictions.shape[-1])` holding the predictions at positions where
            the mask is set and zeros elsewhere.
        """
        full_predictions = np.zeros((*mask.shape, predictions.shape[-1]), dtype=predictions.dtype)
        full_predictions[mask, :] = predictions
        return full_predictions

    @abc.abstractmethod
    def add_predictions(self, eopatch: EOPatch, processed_features: np.ndarray, mask: np.ndarray) -> EOPatch:
        """Runs the model prediction on given features and adds them to the eopatch. Must reverse mask beforehand."""

    def execute(self, eopatch: EOPatch) -> EOPatch:
        """Run model on input features and save predictions to eopatch"""
        # Without a mask feature every pixel is predicted; the spatial size is taken from the first input feature.
        some_feature = self.input_features[0]
        mask_size = eopatch.get_spatial_dimension(*some_feature)

        if self.mask_feature:
            # Drop the trailing axis of the mask feature so the mask only spans the remaining axes.
            mask = np.squeeze(eopatch[self.mask_feature], axis=-1)
        else:
            mask = np.ones(mask_size)
        mask = mask.astype(bool)

        preprocessed_features = self.process_data(eopatch, mask)
        return self.add_predictions(eopatch, preprocessed_features, mask)
class ClassificationPredictionTask(BasePredictionTask):
    """Uses a classification model to produce predictions for given input features"""

    def __init__(
        self,
        *,
        label_encoder_filename: str | None,
        output_probability_feature: Feature | None = None,
        **kwargs: Any,
    ):
        """
        :param label_encoder_filename: Name of file containing the label encoder with which to decode predictions. The
            file should be in the same folder as the model.
        :param output_probability_feature: If specified saves pseudo-probabilities into given feature.
        :param kwargs: Parameters of `BasePredictionTask`
        """
        self.label_encoder_filename = label_encoder_filename
        self._label_encoder = None  # loaded lazily by the `label_encoder` property
        self.output_probability_feature = output_probability_feature
        super().__init__(**kwargs)

    @property
    def label_encoder(self) -> Any:
        """Implements lazy loading that gets around filesystem issues

        Returns `None` when no label encoder filename was configured.
        """
        if self._label_encoder is None and self.label_encoder_filename is not None:
            filesystem = unpickle_fs(self.pickled_filesystem)
            # The encoder is looked up next to the model file.
            model_folder = fs.path.dirname(self.model_path)
            label_encoder_path = fs.path.join(model_folder, self.label_encoder_filename)
            # NOTE(security): joblib.load unpickles the file, which can execute arbitrary code. Only use trusted
            # label encoder files.
            with filesystem.openbin(label_encoder_path, "r") as file_handle:
                self._label_encoder = joblib.load(file_handle)

        return self._label_encoder

    def add_predictions(self, eopatch: EOPatch, processed_features: np.ndarray, mask: np.ndarray) -> EOPatch:
        """Runs the model prediction on given features and adds them to the eopatch

        If specified also adds probability scores and uses a label encoder.

        :param eopatch: EOPatch into which the results are written.
        :param processed_features: Model input with one row per selected pixel.
        :param mask: Boolean mask of the selected pixels, used to place results back into feature form.
        :return: The same eopatch with the output feature(s) set.
        """
        predictions = self.apply_predictor(self.model.predict, processed_features, np.zeros((0,), dtype=np.uint8))
        label_encoder = self.label_encoder
        if label_encoder is not None:
            predictions = label_encoder.inverse_transform(predictions)
        # Add a trailing axis so each pixel carries a length-1 vector of values.
        predictions = predictions[..., np.newaxis]
        eopatch[self.output_feature] = self.transform_to_feature_form(predictions, mask)

        if self.output_probability_feature is not None:
            # Mirror the empty-input guard used for the labels above. The number of probability columns is only
            # known if the model exposes `classes_` (scikit-learn convention); otherwise no placeholder is passed
            # and the model is called regardless of input size.
            classes = getattr(self.model, "classes_", None)
            empty_probabilities = None if classes is None else np.zeros((0, len(classes)), dtype=np.float32)
            # NOTE(review): `apply_predictor` also casts probabilities to `output_dtype`; an integer dtype would
            # truncate them - confirm this is intended.
            probabilities = self.apply_predictor(self.model.predict_proba, processed_features, empty_probabilities)
            eopatch[self.output_probability_feature] = self.transform_to_feature_form(probabilities, mask)

        return eopatch
class RegressionPredictionTask(BasePredictionTask):
    """Uses a regression model to produce predictions for given input features"""

    def __init__(
        self,
        *,
        clip_predictions: tuple[float, float] | None,
        **kwargs: Any,
    ):
        """
        :param clip_predictions: Interval to which predictions are clipped. No clipping is done if `None`.
        :param kwargs: Parameters of `BasePredictionTask`
        """
        self.clip_predictions = clip_predictions
        super().__init__(**kwargs)

    def add_predictions(self, eopatch: EOPatch, processed_features: np.ndarray, mask: np.ndarray) -> EOPatch:
        """Predicts values for the given features, optionally clips them, and stores them in the output feature."""
        # Placeholder that is used instead of calling the model when there are no rows to predict.
        nothing_predicted = np.zeros((0,), dtype=np.float32)
        raw_values = self.apply_predictor(self.model.predict, processed_features, nothing_predicted)

        # Add a trailing axis so each pixel carries a length-1 vector of values.
        values = np.expand_dims(raw_values, axis=-1)

        clip_interval = self.clip_predictions
        if clip_interval is not None:
            values = values.clip(*clip_interval)

        eopatch[self.output_feature] = self.transform_to_feature_form(values, mask)
        return eopatch