eogrow.core.pipeline

Implementation of the base Pipeline class.

exception eogrow.core.pipeline.PipelineExecutionError[source]

Bases: RuntimeError

Raised when the pipeline failed some executions.
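Because the exception derives from RuntimeError, code that already handles RuntimeError will also catch it. The following is a minimal sketch that uses a local stand-in class rather than importing eo-grow; the `run_pipeline` and `safe_run` helpers and the error message are hypothetical and only illustrate the inheritance relationship.

```python
class PipelineExecutionError(RuntimeError):
    """Local stand-in mirroring the documented base class (RuntimeError)."""


def run_pipeline(failed_patches):
    """Hypothetical helper: raise if any execution failed."""
    if failed_patches:
        raise PipelineExecutionError(f"{len(failed_patches)} execution(s) failed")
    return "ok"


def safe_run(failed_patches):
    """Catch the error through its RuntimeError base class."""
    try:
        return run_pipeline(failed_patches)
    except RuntimeError as error:
        return f"handled: {error}"


status_ok = safe_run([])
status_failed = safe_run(["patch_0"])
```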

class eogrow.core.pipeline.Pipeline(config, raw_config=None)[source]

Bases: EOGrowObject

A base class for execution of processing procedures which may or may not include running EOWorkflows, running EOExecutions, creating maps, etc.

The functionalities of this class are:
  • collecting input arguments (either from command line or as an initialization parameter) and parsing them

  • preparing a list of patches

  • preparing execution arguments

  • running the pipeline, monitoring, and reporting

Parameters:
  • config (Schema) – A dictionary with configuration parameters

  • raw_config (RawConfig | None) – The configuration parameters pre-validation, for logging purposes only

pydantic model Schema[source]

Bases: PipelineSchema

Configuration schema, describing input parameters and their types.

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Fields:

config: Schema

classmethod from_raw_config(config, *args, **kwargs)[source]

Creates an object from a dictionary by constructing a validated config and using it to create the object.

Parameters:
  • config (RawConfig) –

  • args (Any) –

  • kwargs (Any) –

Return type:

Self

get_pipeline_execution_name(pipeline_timestamp)[source]

Returns the full name of the pipeline execution.

Parameters:

pipeline_timestamp (str) –

Return type:

str

get_patch_list()[source]

Method that prepares the list of EOPatches for which to run the pipeline execution.

Return type:

List[Tuple[str, BBox]]

filter_patch_list(patch_list)[source]

Specifies which EOPatches should be skipped when skip_existing is enabled.

Parameters:

patch_list (List[Tuple[str, BBox]]) –

Return type:

List[Tuple[str, BBox]]
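The filtering step can be sketched as follows, assuming the returned list holds the patches that still need processing. This is not the eo-grow implementation: the bounding box is replaced by a plain tuple, and the `existing` set of patch names with finished output is a hypothetical input that a real pipeline would have to derive from its storage.

```python
from typing import List, Set, Tuple

# Stand-in for a bounding box: (min_x, min_y, max_x, max_y). The real type is BBox.
FakeBBox = Tuple[float, float, float, float]
PatchList = List[Tuple[str, FakeBBox]]


def filter_patch_list(patch_list: PatchList, existing: Set[str], skip_existing: bool) -> PatchList:
    """Keep only patches without existing output when skip_existing is on.

    `existing` is a hypothetical set of patch names that already have output.
    """
    if not skip_existing:
        return list(patch_list)
    return [(name, bbox) for name, bbox in patch_list if name not in existing]


patches = [("eopatch-0", (0.0, 0.0, 1.0, 1.0)), ("eopatch-1", (1.0, 0.0, 2.0, 1.0))]
remaining = filter_patch_list(patches, existing={"eopatch-0"}, skip_existing=True)
```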

get_execution_arguments(workflow, patch_list)[source]

Prepares execution arguments for each EOPatch from a list of patches.

The output should be a dictionary of the form {execution_name: {node: node_kwargs}}. Execution names are usually names of EOPatches, but can be anything.

Parameters:
  • workflow (EOWorkflow) – A workflow for which arguments will be prepared

  • patch_list (List[Tuple[str, BBox]]) –

Return type:

Dict[str, Dict[EONode, Dict[str, object]]]
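To make the nested {execution_name: {node: node_kwargs}} shape concrete, here is a small sketch built from stand-in types. `FakeNode` is a hypothetical replacement for EONode, the function takes a list of nodes instead of a workflow, and the keyword names `patch_name` and `bbox` are illustrative assumptions, not the arguments eo-grow actually passes.

```python
from typing import Any, Dict, List, Tuple

# Stand-in for a bounding box: (min_x, min_y, max_x, max_y). The real type is BBox.
FakeBBox = Tuple[float, float, float, float]


class FakeNode:
    """Hypothetical stand-in for an EONode; hashable by identity, so usable as a dict key."""

    def __init__(self, name: str) -> None:
        self.name = name


def get_execution_arguments(
    nodes: List[FakeNode], patch_list: List[Tuple[str, FakeBBox]]
) -> Dict[str, Dict[FakeNode, Dict[str, Any]]]:
    """Build one {node: node_kwargs} dictionary per patch, keyed by patch name."""
    exec_args: Dict[str, Dict[FakeNode, Dict[str, Any]]] = {}
    for patch_name, bbox in patch_list:
        # The keyword names below are illustrative only.
        exec_args[patch_name] = {node: {"patch_name": patch_name, "bbox": bbox} for node in nodes}
    return exec_args


load_node, save_node = FakeNode("load"), FakeNode("save")
patch_list = [("eopatch-0", (0.0, 0.0, 1.0, 1.0))]
arguments = get_execution_arguments([load_node, save_node], patch_list)
```

The outer keys are execution names (here the patch name), and each inner dictionary holds one entry per node.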

run_execution(workflow, execution_kwargs, **executor_run_params)[source]

A method which runs an EOExecutor on the given workflow with the given execution parameters.

Parameters:
  • workflow (EOWorkflow) – A workflow to be executed

  • execution_kwargs (Dict[str, Dict[EONode, Dict[str, object]]]) – A dictionary mapping execution names to dictionaries holding execution arguments

  • eopatch_list – A custom list of EOPatch names on which execution will run. If not specified, the default self.patch_list will be used

  • executor_run_params (Any) –

Returns:

Lists of successfully and unsuccessfully executed EOPatch names, and the results of the EOWorkflow execution.

Return type:

tuple[list[str], list[str], list[eolearn.core.eoworkflow.WorkflowResults]]

run()[source]

The main method for pipeline execution. It sets up logging and runs the pipeline procedure.

Return type:

None

run_procedure()[source]

The execution procedure of the pipeline. Can be overridden if needed.

By default, it builds the workflow with the build_workflow method, which subclasses must implement.

Returns:

A pair of lists representing successful and unsuccessful executions.

Return type:

tuple[list[str], list[str]]
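The default flow described for run_procedure can be sketched with a simplified stand-in class. Everything below is hypothetical and is not the eo-grow implementation: `SketchPipeline` replaces the real base class, the workflow is a plain callable, patches are plain names without bounding boxes, and this `run_execution` returns only the two name lists, whereas the documented method also returns workflow results.

```python
from typing import Callable, Dict, List, Tuple


class SketchPipeline:
    """Hypothetical stand-in for the base class; not the eo-grow implementation."""

    def build_workflow(self) -> Callable[..., None]:
        raise NotImplementedError("Subclasses must implement build_workflow")

    def get_patch_list(self) -> List[str]:
        return ["eopatch-0", "eopatch-1"]

    def get_execution_arguments(self, workflow, patch_list: List[str]) -> Dict[str, dict]:
        # One set of keyword arguments per execution name.
        return {name: {"workflow_input": name} for name in patch_list}

    def run_execution(self, workflow, execution_kwargs) -> Tuple[List[str], List[str]]:
        successful, failed = [], []
        for name, kwargs in execution_kwargs.items():
            try:
                workflow(**kwargs)
                successful.append(name)
            except Exception:
                failed.append(name)
        return successful, failed

    def run_procedure(self) -> Tuple[List[str], List[str]]:
        # Build the workflow, prepare arguments, then execute.
        workflow = self.build_workflow()
        patch_list = self.get_patch_list()
        execution_kwargs = self.get_execution_arguments(workflow, patch_list)
        return self.run_execution(workflow, execution_kwargs)


class FailOnSecondPatch(SketchPipeline):
    """Example subclass whose workflow fails for one patch."""

    def build_workflow(self) -> Callable[..., None]:
        def workflow(workflow_input: str) -> None:
            if workflow_input == "eopatch-1":
                raise ValueError("simulated failure")

        return workflow


successful, failed = FailOnSecondPatch().run_procedure()
```

In this sketch a subclass only supplies build_workflow; overriding run_procedure itself would replace the whole flow.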