eogrow.core.pipeline
Implementation of the base Pipeline class.
- exception eogrow.core.pipeline.PipelineExecutionError[source]
Bases:
RuntimeError
Raised when the pipeline failed some executions.
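Because the exception derives from RuntimeError, callers can catch it either specifically or through the generic base class. The following is a minimal sketch that uses a local stand-in class with the same base instead of importing eogrow; the check_executions helper is hypothetical and only illustrates when such an error might be raised.

```python
class PipelineExecutionError(RuntimeError):
    """Local stand-in that mirrors the documented base class."""


def check_executions(failed):
    """Hypothetical helper: raise if any execution name is listed as failed."""
    if failed:
        raise PipelineExecutionError(f"{len(failed)} execution(s) failed: {failed}")


caught = None
try:
    check_executions(["eopatch-0003"])
except RuntimeError as error:  # a RuntimeError handler also catches the subclass
    caught = error
```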
- class eogrow.core.pipeline.Pipeline(config, raw_config=None)[source]
Bases:
EOGrowObject
A base class for execution of processing procedures which may or may not include running EOWorkflows, running EOExecutions, creating maps, etc.
- The functionalities of this class are:
collecting input arguments (either from command line or as an initialization parameter) and parsing them
preparing a list of patches
preparing execution arguments
running the pipeline, monitoring, and reporting
- Parameters:
config (Schema) – A dictionary with configuration parameters
raw_config (RawConfig | None) – The configuration parameters pre-validation, for logging purposes only
- pydantic model Schema[source]
Bases:
PipelineSchema
Configuration schema, describing input parameters and their types.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- classmethod from_raw_config(config, *args, **kwargs)[source]
Creates an object from a dictionary by constructing a validated config and using it to create the object.
- Parameters:
config (RawConfig) –
args (Any) –
kwargs (Any) –
- Return type:
Self
- get_pipeline_execution_name(pipeline_timestamp)[source]
Returns the full name of the pipeline execution.
- Parameters:
pipeline_timestamp (str) –
- Return type:
str
- get_patch_list()[source]
Method that prepares the list of EOPatches for which to run the pipeline execution.
- Return type:
List[Tuple[str, BBox]]
- filter_patch_list(patch_list)[source]
Specifies which EOPatches should be skipped when skip_existing is enabled.
- Parameters:
patch_list (List[Tuple[str, BBox]]) –
- Return type:
List[Tuple[str, BBox]]
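The filtering step can be pictured as removing already processed entries from a list of (name, bounding box) pairs. This is a sketch under assumptions: the source does not say how the method decides which EOPatches already exist, so the existing_names set and the filter_existing function are hypothetical, and a plain tuple stands in for BBox.

```python
from typing import List, Set, Tuple

# Stand-in for a bounding box: (min_x, min_y, max_x, max_y). The real type is BBox.
Box = Tuple[float, float, float, float]
PatchList = List[Tuple[str, Box]]


def filter_existing(patch_list: PatchList, existing_names: Set[str]) -> PatchList:
    """Keep only the patches whose names are not in the set of existing outputs."""
    return [(name, bbox) for name, bbox in patch_list if name not in existing_names]


patches = [("eopatch-0", (0.0, 0.0, 1.0, 1.0)), ("eopatch-1", (1.0, 0.0, 2.0, 1.0))]
remaining = filter_existing(patches, existing_names={"eopatch-0"})
```

The input and output share the same type, which is why a filter of this kind can be applied or skipped without changing the rest of the pipeline.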
- get_execution_arguments(workflow, patch_list)[source]
Prepares execution arguments for each EOPatch from a list of patches.
The output should be a dictionary of form {execution_name: {node: node_kwargs}}. Execution names are usually names of EOPatches, but can be anything.
- Parameters:
workflow (EOWorkflow) – A workflow for which arguments will be prepared
patch_list (List[Tuple[str, BBox]]) –
- Return type:
Dict[str, Dict[EONode, Dict[str, object]]]
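The nested shape {execution_name: {node: node_kwargs}} is easier to see with concrete values. The sketch below uses a small frozen dataclass as a hashable stand-in for EONode; the build_arguments function and the patch_name and bbox keyword names are illustrative assumptions, not arguments that any real task is known to accept.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class Node:
    """Hashable stand-in for an EONode so that it can serve as a dictionary key."""

    name: str


def build_arguments(
    nodes: List[Node], patch_list: List[Tuple[str, tuple]]
) -> Dict[str, Dict[Node, Dict[str, object]]]:
    """Give every node of every execution its own keyword-argument dictionary."""
    return {
        patch_name: {node: {"patch_name": patch_name, "bbox": bbox} for node in nodes}
        for patch_name, bbox in patch_list
    }


load_node, save_node = Node("load"), Node("save")
patches = [("eopatch-0", (0, 0, 1, 1)), ("eopatch-1", (1, 0, 2, 1))]
arguments = build_arguments([load_node, save_node], patches)
```

Here the outer keys are execution names taken from the patch list, and each inner dictionary maps a node to the keyword arguments that node would receive for that execution.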
- run_execution(workflow, execution_kwargs, **executor_run_params)[source]
Runs an EOExecutor on the given workflow with the given execution parameters.
- Parameters:
workflow (EOWorkflow) – A workflow to be executed
execution_kwargs (Dict[str, Dict[EONode, Dict[str, object]]]) – A dictionary mapping execution names to dictionaries holding execution arguments
eopatch_list – A custom list of EOPatch names on which execution will run. If not specified, the default self.patch_list will be used
executor_run_params (Any) –
- Returns:
Lists of successfully and unsuccessfully executed EOPatch names, and the results of the EOWorkflow executions
- Return type:
tuple[list[str], list[str], list[eolearn.core.eoworkflow.WorkflowResults]]
- run()[source]
The main method for pipeline execution. It sets up logging and runs the pipeline procedure.
- Return type:
None
- run_procedure()[source]
Execution procedure of the pipeline. Can be overridden if needed.
By default, builds the workflow using a build_workflow method, which must be implemented separately.
- Returns:
A pair of lists representing successful and unsuccessful executions.
- Return type:
tuple[list[str], list[str]]
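One way to picture how these methods could fit together is the toy class below. It is a plain-Python sketch that does not import or inherit from eogrow: the order of calls inside run_procedure is an assumption, the workflow is an ordinary callable, the execution arguments omit the per-node level, and run_execution returns only the two name lists rather than the documented three-element tuple.

```python
from typing import Callable, Dict, List, Tuple


class ToyPipeline:
    """Plain-Python stand-in; it does not inherit from the real Pipeline."""

    def build_workflow(self) -> Callable[[dict], object]:
        raise NotImplementedError("Subclasses provide the workflow.")

    def get_patch_list(self) -> List[Tuple[str, tuple]]:
        return [("eopatch-0", (0, 0, 1, 1)), ("eopatch-1", (1, 0, 2, 1))]

    def get_execution_arguments(self, workflow, patch_list) -> Dict[str, dict]:
        # Simplified: one kwargs dictionary per execution, without the node level.
        return {name: {"bbox": bbox} for name, bbox in patch_list}

    def run_execution(self, workflow, execution_kwargs):
        successful, failed = [], []
        for name, kwargs in execution_kwargs.items():
            try:
                workflow(kwargs)
                successful.append(name)
            except Exception:
                failed.append(name)
        return successful, failed

    def run_procedure(self) -> Tuple[List[str], List[str]]:
        # Assumed order: build the workflow, list patches, prepare arguments, execute.
        workflow = self.build_workflow()
        patch_list = self.get_patch_list()
        execution_kwargs = self.get_execution_arguments(workflow, patch_list)
        return self.run_execution(workflow, execution_kwargs)


class AreaPipeline(ToyPipeline):
    def build_workflow(self):
        def workflow(kwargs):
            min_x, min_y, max_x, max_y = kwargs["bbox"]
            if min_x > 0:
                raise ValueError("simulated failure")
            return (max_x - min_x) * (max_y - min_y)

        return workflow


successful, failed = AreaPipeline().run_procedure()
```

The subclass only supplies build_workflow, which matches the documented idea that the default procedure relies on a workflow-building method implemented elsewhere.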