diff --git a/.gitignore b/.gitignore index 0aec123f0..822896a6c 100644 --- a/.gitignore +++ b/.gitignore @@ -133,7 +133,7 @@ dmypy.json build/ cplus_scenario_output.* [Skip output] -reports +# reports ext-libs/ .vscode diff --git a/admin.py b/admin.py index 17400b876..f3fbfde87 100644 --- a/admin.py +++ b/admin.py @@ -213,6 +213,7 @@ def build( if clean and output_directory.exists(): shutil.rmtree(str(output_directory), ignore_errors=True) output_directory.mkdir(parents=True, exist_ok=True) + plugin_setup(clean) copy_source_files(output_directory, tests=tests) icon_path = copy_icon(output_directory) if icon_path is None: @@ -552,5 +553,87 @@ def _get_latest_releases( return latest_stable, latest_experimental +############################################################################### +# Setup dependencies and install package +############################################################################### + + +def not_comments(lines, s, e): + """Return non comment line (does not start with #). + + :param lines: list of line + :type lines: list of str + + :param s: start index + :type s: int + + :param e: end index + :type e: int + + :return: list of line without commented lines + :rtype: list of str + """ + return [line for line in lines[s:e] if line[0] != "#"] + + +def read_requirements(): + """Return a list of runtime and list of test requirements.""" + with open("requirements.txt") as f: + lines = f.readlines() + lines = [line for line in [line.strip() for line in lines] if line] + + return not_comments(lines, 0, len(lines)), [] + + +def _safe_remove_folder(rootdir): + """ + Supports removing a folder that may have symlinks in it. + + Needed on windows to avoid removing the original files linked to within + each folder. + + :param rootdir: Root directory to clean. + :type rootdir: str + """ + rootdir = Path(rootdir) + if rootdir.is_symlink(): + rootdir.rmdir() + else: + folders = [path for path in Path(rootdir).iterdir() if path.is_dir()] + for folder in folders: + if folder.is_symlink(): + folder.rmdir() + else: + shutil.rmtree(folder) + files = [path for path in Path(rootdir).iterdir()] + for file in files: + file.unlink() + shutil.rmtree(rootdir) + + +@app.command() +def plugin_setup(clean=True, pip="pip"): + """install plugin dependencies. + + :param clean: Clean out dependencies first. + :type clean: bool + + :param pip: Path to pip, usually 'pip' or 'pip3'. + :type pip: str + """ + ext_libs = os.path.join(LOCAL_ROOT_DIR, "src", "cplus_plugin", "ext-libs") + + if clean and os.path.exists(ext_libs): + _safe_remove_folder(ext_libs) + + os.makedirs(ext_libs, exist_ok=True) + runtime, test = read_requirements() + + os.environ["PYTHONPATH"] = ext_libs + + for req in runtime + test: + subprocess.check_call([pip, "install", "--upgrade", "-t", ext_libs, req]) + + if __name__ == "__main__": app() diff --git a/requirements-dev.txt b/requirements-dev.txt index 7ca03c3d2..118f667eb 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -26,15 +26,17 @@ jinja2 == 3.0.3 markdown == 3.3.6 markupsafe == 2.0.1 mergedeep == 1.3.4 -mkdocs-git-revision-date-localized-plugin +mkdocs-autorefs == 1.2.0 +mkdocs-git-revision-date-localized-plugin == 0.11.1 mkdocs-material-extensions == 1.0.3 -mkdocs-material -mkdocstrings-python -mkdocs-video -mkdocs -packaging == 21.3 +mkdocs-material == 8.1.7 +mkdocstrings-python == 1.6.0 +mkdocs-video == 1.1.0 +mkdocs == 1.2.3 +# packaging == 21.3 +packaging == 24.0 pygments == 2.11.2 -pymdown-extensions +pymdown-extensions == 9.1 pyparsing == 3.0.6 pyqt5-qt5 == 5.15.2 pyqt5-sip == 12.9.0 @@ -52,3 +54,5 @@ typer == 0.4.0 watchdog == 2.1.6 werkzeug == 2.0.2 zipp == 3.7.0 +# use setuptools 70.3.0, to fix issue strip_trailing_zero +setuptools == 70.3.0 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 000000000..1b55bfe6c --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +# cplus-core +git+https://github.com/kartoza/cplus-core.git@v0.0.15 \ No newline at end of file diff --git a/src/cplus_plugin/__init__.py b/src/cplus_plugin/__init__.py index 225cbc87a..beaf67aa4 100644 --- a/src/cplus_plugin/__init__.py +++ b/src/cplus_plugin/__init__.py @@ -30,6 +30,19 @@ sys.path.append(LIB_DIR) +def _add_at_front_of_path(d): + """add a folder at front of path""" + sys.path, remainder = sys.path[:1], sys.path[1:] + site.addsitedir(d) + sys.path.extend(remainder) + + +# init ext-libs directory +plugin_dir = os.path.dirname(os.path.realpath(__file__)) +# Put ext-libs folder near the front of the path (important on Linux) +_add_at_front_of_path(str(Path(plugin_dir) / "ext-libs")) + + # noinspection PyPep8Naming def classFactory(iface): # pylint: disable=invalid-name """Load QgisCplus class diff --git a/src/cplus_plugin/api/base.py b/src/cplus_plugin/api/base.py index 8a734fda4..035913070 100644 --- a/src/cplus_plugin/api/base.py +++ b/src/cplus_plugin/api/base.py @@ -14,7 +14,7 @@ from qgis.core import QgsTask from .request import CplusApiRequest -from ..models.base import Scenario, ScenarioResult, NcsPathway, Activity +from cplus_core.models.base import Scenario, ScenarioResult, NcsPathway, Activity class BaseScenarioTask(QgsTask): diff --git a/src/cplus_plugin/api/request.py b/src/cplus_plugin/api/request.py index ca0a32502..0080adc67 100644 --- a/src/cplus_plugin/api/request.py +++ b/src/cplus_plugin/api/request.py @@ -16,7 +16,8 @@ QgsProcessingFeedback, ) -from ..models.base import Scenario, SpatialExtent, Activity, LayerSource +from cplus_core.models.base import Scenario, SpatialExtent, Activity +from ..models.source import LayerSource from ..conf import settings_manager, Settings from ..definitions.defaults import BASE_API_URL from ..trends_earth import auth diff --git a/src/cplus_plugin/api/scenario_history_tasks.py b/src/cplus_plugin/api/scenario_history_tasks.py index ddd12dfd1..fd4fe352e 100644 --- a/src/cplus_plugin/api/scenario_history_tasks.py +++ b/src/cplus_plugin/api/scenario_history_tasks.py @@ -14,8 +14,8 @@ from .request import CplusApiRequest, CplusApiRequestError from .scenario_task_api_client import ScenarioAnalysisTaskApiClient from ..conf import settings_manager -from ..models.base import Scenario -from ..models.base import SpatialExtent +from cplus_core.models.base import Scenario +from cplus_core.analysis import TaskConfig from ..utils import log @@ -133,38 +133,36 @@ class FetchScenarioOutputTask(ScenarioAnalysisTaskApiClient): def __init__( self, - analysis_scenario_name, - analysis_scenario_description, - analysis_activities, - analysis_priority_layers_groups, - analysis_extent, - scenario, - scenario_directory, + task_config: TaskConfig, + extent_box, ): - super(FetchScenarioOutputTask, self).__init__( - analysis_scenario_name, - analysis_scenario_description, - analysis_activities, - analysis_priority_layers_groups, - analysis_extent, - scenario, - SpatialExtent(bbox=scenario.extent.bbox, crs=scenario.extent.crs), - ) + super(FetchScenarioOutputTask, self).__init__(task_config, extent_box) self.request = CplusApiRequest() self.status_pooling = None self.logs = [] self.total_file_output = 0 self.downloaded_output = 0 self.scenario_status = None - self.scenario_directory = scenario_directory - self.scenario_api_uuid = scenario.uuid - self.scenario = scenario + self.scenario_api_uuid = task_config.scenario.uuid + self.scenario = task_config.scenario self.scenario_directory = None self.processing_cancelled = False self.scenario_result = None self.output_list = None - self.created_datetime + self.created_datetime = None + + def _get_scenario_directory(self, created_datetime: datetime.datetime) -> str: + """Generate scenario directory for current task. + + :return: Path to scenario directory + :rtype: str + """ + base_dir = self.task_config.base_dir + return os.path.join( + f"{base_dir}", + "scenario_" f'{created_datetime.strftime("%Y_%m_%d_%H_%M_%S")}', + ) def run(self): """Execute the task logic. @@ -183,7 +181,9 @@ def run(self): self.created_datetime = datetime.datetime.strptime( self.new_scenario_detail["submitted_on"], "%Y-%m-%dT%H:%M:%SZ" ) - self.scenario_directory = self.get_scenario_directory() + self.scenario_directory = self._get_scenario_directory( + self.created_datetime + ) if os.path.exists(self.scenario_directory): for file in os.listdir(self.scenario_directory): if file != "processing.log": diff --git a/src/cplus_plugin/api/scenario_task_api_client.py b/src/cplus_plugin/api/scenario_task_api_client.py index b9ebbf3f1..36a4d6f83 100644 --- a/src/cplus_plugin/api/scenario_task_api_client.py +++ b/src/cplus_plugin/api/scenario_task_api_client.py @@ -15,8 +15,8 @@ ) from ..api.base import BaseFetchScenarioOutput from ..conf import settings_manager, Settings -from ..models.base import Activity, NcsPathway, Scenario -from ..tasks import ScenarioAnalysisTask +from cplus_core.models.base import Activity, NcsPathway +from cplus_core.analysis import ScenarioAnalysisTask, TaskConfig from ..utils import FileUtils, CustomJsonEncoder, todict from ..definitions.constants import NO_DATA_VALUE @@ -43,45 +43,15 @@ def clean_filename(filename): class ScenarioAnalysisTaskApiClient(ScenarioAnalysisTask, BaseFetchScenarioOutput): """Prepares and runs the scenario analysis in Cplus API - :param analysis_scenario_name: Scenario name - :type analysis_scenario_name: str - - :param analysis_scenario_description: Scenario description - :type analysis_scenario_description: str - - :param analysis_activities: List of activity to be processed - :type analysis_activities: typing.List[Activity] - - :param analysis_priority_layers_groups: List of priority layer groups - :type analysis_priority_layers_groups: typing.List[dict] - - :param analysis_extent: Extents of the Scenario - :type analysis_extent: typing.List[float] - :param scenario: Scenario object :type scenario: Scenario + + :param extent_box: Project extent + :type extent_box: list of float """ - def __init__( - self, - analysis_scenario_name: str, - analysis_scenario_description: str, - analysis_activities: typing.List[Activity], - analysis_priority_layers_groups: typing.List[dict], - analysis_extent: typing.List[float], - scenario: Scenario, - extent_box, - clip_to_studyarea: bool = False, - ): - super(ScenarioAnalysisTaskApiClient, self).__init__( - analysis_scenario_name, - analysis_scenario_description, - analysis_activities, - analysis_priority_layers_groups, - analysis_extent, - scenario, - clip_to_studyarea, - ) + def __init__(self, task_config: TaskConfig, extent_box): + super().__init__(task_config) self.total_file_upload_size = 0 self.total_file_upload_chunks = 0 self.uploaded_chunks = 0 @@ -145,7 +115,7 @@ def run(self) -> bool: :rtype: bool """ self.request = CplusApiRequest() - self.scenario_directory = self.get_scenario_directory() + self.scenario_directory = self.task_config.base_dir FileUtils.create_new_dir(self.scenario_directory) try: @@ -538,8 +508,10 @@ def build_scenario_detail_json(self) -> None: snap_rescale = self.get_settings_value( Settings.RESCALE_VALUES, default=False, setting_type=bool ) - resampling_method = self.get_settings_value( - Settings.RESAMPLING_METHOD, default=0 + resampling_method = int( + self.get_settings_value( + Settings.RESAMPLING_METHOD, default=0, setting_type=int + ) ) ncs_with_carbon = self.get_settings_value( Settings.NCS_WITH_CARBON, default=False, setting_type=bool diff --git a/src/cplus_plugin/conf.py b/src/cplus_plugin/conf.py index 722131c9f..ddb51ba84 100644 --- a/src/cplus_plugin/conf.py +++ b/src/cplus_plugin/conf.py @@ -31,7 +31,7 @@ UUID_ATTRIBUTE, ) from .definitions.defaults import PRIORITY_LAYERS -from .models.base import ( +from cplus_core.models.base import ( Activity, NcsPathway, Scenario, diff --git a/src/cplus_plugin/gui/activity_editor_dialog.py b/src/cplus_plugin/gui/activity_editor_dialog.py index ecfcb64c0..55a5dcb8f 100644 --- a/src/cplus_plugin/gui/activity_editor_dialog.py +++ b/src/cplus_plugin/gui/activity_editor_dialog.py @@ -29,7 +29,7 @@ ACTIVITY_SCENARIO_STYLE_ATTRIBUTE, ) from ..definitions.defaults import ICON_PATH, USER_DOCUMENTATION_SITE -from ..models.base import Activity +from cplus_core.models.base import Activity from ..utils import FileUtils, generate_random_color, open_documentation, tr WidgetUi, _ = loadUiType( diff --git a/src/cplus_plugin/gui/activity_widget.py b/src/cplus_plugin/gui/activity_widget.py index 8db37cc7e..91017adfa 100644 --- a/src/cplus_plugin/gui/activity_widget.py +++ b/src/cplus_plugin/gui/activity_widget.py @@ -19,7 +19,7 @@ ActivityComponentWidget, NcsComponentWidget, ) -from ..models.base import Activity, NcsPathway +from cplus_core.models.base import Activity, NcsPathway from ..utils import FileUtils diff --git a/src/cplus_plugin/gui/component_item_model.py b/src/cplus_plugin/gui/component_item_model.py index 89be92fbf..17e443643 100644 --- a/src/cplus_plugin/gui/component_item_model.py +++ b/src/cplus_plugin/gui/component_item_model.py @@ -12,7 +12,7 @@ from qgis.PyQt import QtCore, QtGui -from ..models.base import ( +from cplus_core.models.base import ( BaseModelComponent, BaseModelComponentType, Activity, diff --git a/src/cplus_plugin/gui/financials/npv_manager_dialog.py b/src/cplus_plugin/gui/financials/npv_manager_dialog.py index bd0010102..43a77e061 100644 --- a/src/cplus_plugin/gui/financials/npv_manager_dialog.py +++ b/src/cplus_plugin/gui/financials/npv_manager_dialog.py @@ -21,7 +21,7 @@ from ..component_item_model import NcsPathwayItemModel from ...conf import settings_manager from ...definitions.defaults import ICON_PATH, USER_DOCUMENTATION_SITE -from ...models.base import NcsPathway +from cplus_core.models.base import Activity, NcsPathway from ...models.financial import NcsPathwayNpv, NcsPathwayNpvCollection, NpvParameters from .npv_financial_model import NpvFinancialModel from ...lib.financials import compute_discount_value diff --git a/src/cplus_plugin/gui/items_selection_dialog.py b/src/cplus_plugin/gui/items_selection_dialog.py index 6d1c4b402..9117b44a9 100644 --- a/src/cplus_plugin/gui/items_selection_dialog.py +++ b/src/cplus_plugin/gui/items_selection_dialog.py @@ -10,7 +10,8 @@ from qgis.PyQt import QtCore, QtWidgets from qgis.PyQt.uic import loadUiType -from ..models.base import NcsPathway, PriorityLayer +from cplus_core.models.base import NcsPathway, PriorityLayer + from ..conf import settings_manager from ..utils import log, tr diff --git a/src/cplus_plugin/gui/map_repeat_item_widget.py b/src/cplus_plugin/gui/map_repeat_item_widget.py index 54aff0087..ad36f41b1 100644 --- a/src/cplus_plugin/gui/map_repeat_item_widget.py +++ b/src/cplus_plugin/gui/map_repeat_item_widget.py @@ -18,7 +18,7 @@ from ..conf import settings_manager from ..lib.reports.layout_items import CplusMapRepeatItem, CPLUS_MAP_REPEAT_ITEM_TYPE -from ..models.base import ModelComponentType +from cplus_core.models.base import ModelComponentType from ..utils import FileUtils, tr diff --git a/src/cplus_plugin/gui/metrics_builder_dialog.py b/src/cplus_plugin/gui/metrics_builder_dialog.py index 617efd5f0..c565afa47 100644 --- a/src/cplus_plugin/gui/metrics_builder_dialog.py +++ b/src/cplus_plugin/gui/metrics_builder_dialog.py @@ -36,7 +36,7 @@ MetricColumnListItem, MetricColumnListModel, ) -from ..models.base import Activity +from cplus_core.models.base import Activity from ..models.helpers import clone_activity, clone_metric_configuration_profile from ..models.report import ( ActivityColumnMetric, diff --git a/src/cplus_plugin/gui/metrics_builder_model.py b/src/cplus_plugin/gui/metrics_builder_model.py index aac2d54da..595607689 100644 --- a/src/cplus_plugin/gui/metrics_builder_model.py +++ b/src/cplus_plugin/gui/metrics_builder_model.py @@ -10,7 +10,7 @@ from ..definitions.constants import ACTIVITY_NAME -from ..models.base import Activity +from cplus_core.models.base import Activity from ..models.report import ActivityColumnMetric, MetricColumn, MetricType from ..utils import FileUtils, tr diff --git a/src/cplus_plugin/gui/model_component_widget.py b/src/cplus_plugin/gui/model_component_widget.py index 94f74a756..affe38361 100644 --- a/src/cplus_plugin/gui/model_component_widget.py +++ b/src/cplus_plugin/gui/model_component_widget.py @@ -30,7 +30,7 @@ from .model_description_editor import ModelDescriptionEditorDialog from .ncs_pathway_editor_dialog import NcsPathwayEditorDialog from .pixel_value_editor_dialog import PixelValueEditorDialog -from ..models.base import Activity, NcsPathway +from cplus_core.models.base import Activity, NcsPathway from .validation.inspector_dialog import ValidationInspectorDialog from .validation.progress_dialog import ValidationProgressDialog from ..utils import FileUtils, log diff --git a/src/cplus_plugin/gui/ncs_pathway_editor_dialog.py b/src/cplus_plugin/gui/ncs_pathway_editor_dialog.py index b37daa6cd..cab967418 100644 --- a/src/cplus_plugin/gui/ncs_pathway_editor_dialog.py +++ b/src/cplus_plugin/gui/ncs_pathway_editor_dialog.py @@ -19,13 +19,8 @@ ICON_PATH, USER_DOCUMENTATION_SITE, ) -from ..models.base import ( - DataSourceType, - LayerType, - NcsPathway, - NcsPathwayType, - LayerSource, -) +from cplus_core.models.base import LayerType, NcsPathway, NcsPathwayType +from ..models.source import DataSourceType, LayerSource from ..utils import FileUtils, open_documentation, tr, log WidgetUi, _ = loadUiType( diff --git a/src/cplus_plugin/gui/priority_group_dialog.py b/src/cplus_plugin/gui/priority_group_dialog.py index a5e28a88d..1538aa378 100644 --- a/src/cplus_plugin/gui/priority_group_dialog.py +++ b/src/cplus_plugin/gui/priority_group_dialog.py @@ -14,7 +14,7 @@ ) from qgis.PyQt.uic import loadUiType -from ..models.base import PriorityLayer +from cplus_core.models.base import PriorityLayer from ..conf import settings_manager from ..utils import FileUtils, open_documentation diff --git a/src/cplus_plugin/gui/priority_layer_dialog.py b/src/cplus_plugin/gui/priority_layer_dialog.py index 8663b78e8..cc5237cd1 100644 --- a/src/cplus_plugin/gui/priority_layer_dialog.py +++ b/src/cplus_plugin/gui/priority_layer_dialog.py @@ -17,8 +17,8 @@ from qgis.gui import QgsFileWidget from ..conf import settings_manager, Settings -from ..utils import FileUtils, open_documentation, log -from ..models.base import PriorityLayerType +from ..utils import FileUtils, open_documentation +from cplus_core.models.base import PriorityLayerType from ..definitions.defaults import ICON_PATH, PRIORITY_LAYERS, USER_DOCUMENTATION_SITE from ..definitions.constants import PRIORITY_LAYERS_SEGMENT, USER_DEFINED_ATTRIBUTE diff --git a/src/cplus_plugin/gui/qgis_cplus_main.py b/src/cplus_plugin/gui/qgis_cplus_main.py index fe3f3cffe..6747de6ff 100644 --- a/src/cplus_plugin/gui/qgis_cplus_main.py +++ b/src/cplus_plugin/gui/qgis_cplus_main.py @@ -11,6 +11,8 @@ import uuid from dateutil import tz from functools import partial +from pathlib import Path +import traceback from qgis.PyQt import ( QtCore, @@ -68,6 +70,7 @@ ACTIVITY_IDENTIFIER_PROPERTY, NCS_PATHWAYS_WEIGHTED_GROUP_LAYER_NAME, USER_DEFINED_ATTRIBUTE, + NO_DATA_VALUE, ) from .financials.npv_manager_dialog import NpvPwlManagerDialog @@ -77,11 +80,8 @@ from .scenario_dialog import ScenarioDialog -from ..models.base import ( - Activity, - PriorityLayerType, - AreaOfInterestSource, -) +from cplus_core.models.base import Activity, PriorityLayerType +from ..models.source import AreaOfInterestSource from ..models.financial import NcsPathwayNpv from ..conf import settings_manager, Settings @@ -109,16 +109,15 @@ SCENARIO_LOG_FILE_NAME, USER_DOCUMENTATION_SITE, ) -from ..lib.reports.manager import report_manager -from ..models.base import Scenario, ScenarioResult, ScenarioState, SpatialExtent -from ..tasks import ScenarioAnalysisTask -from ..utils import ( - open_documentation, - tr, - log, - FileUtils, - write_to_file, +from ..lib.reports.manager import report_manager, ReportManager +from cplus_core.models.base import ( + Scenario, + ScenarioResult, + ScenarioState, + SpatialExtent, ) +from cplus_core.analysis import ScenarioAnalysisTask, TaskConfig +from ..utils import open_documentation, tr, log, FileUtils, write_to_file WidgetUi, _ = loadUiType( @@ -1488,15 +1487,11 @@ def load_scenario(self, scenario_identifier=None): ) progress_dialog.run_dialog() - analysis_task = FetchScenarioOutputTask( - self.analysis_scenario_name, - self.analysis_scenario_description, - self.analysis_activities, - self.analysis_priority_layers_groups, - self.analysis_extent, - scenario, - None, - ) + task_config = self.create_task_config(scenario) + task_config.all_activities = self.analysis_activities + task_config.base_dir = settings_manager.get_value(Settings.BASE_DIR) + + analysis_task = FetchScenarioOutputTask(task_config, scenario.extent) analysis_task.scenario_api_uuid = scenario.server_uuid analysis_task.task_finished.connect(self.update_scenario_list) @@ -1663,6 +1658,8 @@ def run_cplus_main_task(self, progress_dialog, scenario, analysis_task): analysis_task.info_message_changed.connect(self.show_message) + analysis_task.log_received.connect(self.on_log_message) + self.current_analysis_task = analysis_task progress_dialog.analysis_task = analysis_task @@ -1705,6 +1702,107 @@ def prepare_message_bar(self): ) self.dock_widget_contents.layout().insertLayout(0, self.grid_layout) + def get_scenario_directory(self) -> str: + """Generate scenario directory for current task. + + :return: Path to scenario directory + :rtype: str + """ + base_dir = settings_manager.get_value(Settings.BASE_DIR) + return os.path.join( + f"{base_dir}", + "scenario_" f'{datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")}', + ) + + def create_task_config( + self, scenario: Scenario, clip_to_studyarea: bool + ) -> TaskConfig: + """Create task config from scenario and settings_manager. + + :param scenario: Scenario object + :type scenario: Scenario + :param clip_to_studyarea: True if clip to study area + :type clip_to_studyarea: bool + + :return: config for scenario analysis task + :rtype: TaskConfig + """ + return TaskConfig( + scenario, + settings_manager.get_priority_layers(), + scenario.priority_layer_groups, + scenario.activities, + settings_manager.get_all_activities(), + settings_manager.get_value( + Settings.SNAPPING_ENABLED, default=False, setting_type=bool + ), + settings_manager.get_value( + Settings.SNAP_LAYER, default="", setting_type=str + ), + settings_manager.get_value( + Settings.MASK_LAYERS_PATHS, default="", setting_type=str + ), + settings_manager.get_value( + Settings.RESCALE_VALUES, default=False, setting_type=bool + ), + settings_manager.get_value(Settings.RESAMPLING_METHOD, default=0), + settings_manager.get_value(Settings.PATHWAY_SUITABILITY_INDEX, default=0), + 0.0, + settings_manager.get_value( + Settings.SIEVE_ENABLED, default=False, setting_type=bool + ), + settings_manager.get_value(Settings.SIEVE_THRESHOLD, default=10.0), + settings_manager.get_value( + Settings.NCS_WITH_CARBON, default=True, setting_type=bool + ), + settings_manager.get_value( + Settings.LANDUSE_PROJECT, default=True, setting_type=bool + ), + settings_manager.get_value( + Settings.LANDUSE_NORMALIZED, default=True, setting_type=bool + ), + settings_manager.get_value( + Settings.LANDUSE_WEIGHTED, default=True, setting_type=bool + ), + settings_manager.get_value( + Settings.HIGHEST_POSITION, default=True, setting_type=bool + ), + self.get_scenario_directory(), + settings_manager.get_value( + Settings.NCS_NO_DATA_VALUE, default=NO_DATA_VALUE, setting_type=float + ), + settings_manager.get_value( + Settings.STUDYAREA_PATH, default="", setting_type=str + ), + clip_to_studyarea, + ) + + def on_log_message( + self, + message: str, + name: str = "qgis_cplus", + info: bool = True, + notify: bool = True, + ): + """Logs the message into QGIS logs using qgis_cplus as the default + log instance. + If notify_user is True, user will be notified about the log. + + :param message: The log message + :type message: str + + :param name: Name of te log instance, qgis_cplus is the default + :type message: str + + :param info: Whether the message is about info or a + warning + :type info: bool + + :param notify: Whether to notify user about the log + :type notify: bool + """ + log(message, name=name, info=info, notify=notify) + def is_metric_configuration_valid(self) -> bool: """Checks if the setup of the metric configuration for the scenario analysis report is correct. @@ -1902,6 +2000,7 @@ def run_analysis(self): activities=self.analysis_activities, priority_layer_groups=self.analysis_priority_layers_groups, ) + task_config = self.create_task_config(scenario, clip_to_studyarea) self.processing_cancelled = False @@ -1996,12 +2095,7 @@ def run_analysis(self): if self.processing_type.isChecked(): analysis_task = ScenarioAnalysisTaskApiClient( - self.analysis_scenario_name, - self.analysis_scenario_description, - self.analysis_activities, - self.analysis_priority_layers_groups, - self.analysis_extent, - scenario, + task_config, SpatialExtent( bbox=[ passed_extent.xMinimum(), @@ -2014,15 +2108,7 @@ def run_analysis(self): clip_to_studyarea, ) else: - analysis_task = ScenarioAnalysisTask( - self.analysis_scenario_name, - self.analysis_scenario_description, - self.analysis_activities, - self.analysis_priority_layers_groups, - self.analysis_extent, - scenario, - clip_to_studyarea, - ) + analysis_task = ScenarioAnalysisTask(task_config) self.run_cplus_main_task(progress_dialog, scenario, analysis_task) @@ -2037,6 +2123,7 @@ def run_analysis(self): ', error message "{}"'.format(err) ) ) + log(traceback.format_exc()) def selected_activities(self) -> typing.List[Activity]: """Gets the collection of selected activities. diff --git a/src/cplus_plugin/gui/settings/cplus_options.py b/src/cplus_plugin/gui/settings/cplus_options.py index 2426a7298..2d150ea79 100644 --- a/src/cplus_plugin/gui/settings/cplus_options.py +++ b/src/cplus_plugin/gui/settings/cplus_options.py @@ -47,7 +47,7 @@ settings_manager, Settings, ) -from ...models.base import DataSourceType +from ...models.source import DataSourceType from ...definitions.constants import CPLUS_OPTIONS_KEY, NO_DATA_VALUE from ...definitions.defaults import ( GENERAL_OPTIONS_TITLE, @@ -62,7 +62,7 @@ from ...lib.validation.feedback import ValidationFeedback from ...lib.validation.validators import DataValidator from ...models.validation import RuleInfo, RuleType -from ...models.base import DataSourceType, LayerModelComponent, LayerType +from cplus_core.models.base import LayerModelComponent, LayerType from ...trends_earth.constants import API_URL, TIMEOUT from ...utils import FileUtils, log, tr, convert_size from ...trends_earth import auth, api, download diff --git a/src/cplus_plugin/gui/settings/priority_layer_add.py b/src/cplus_plugin/gui/settings/priority_layer_add.py index aabce2942..4685d7f1d 100644 --- a/src/cplus_plugin/gui/settings/priority_layer_add.py +++ b/src/cplus_plugin/gui/settings/priority_layer_add.py @@ -10,7 +10,7 @@ ) from ...api.layer_tasks import CreateUpdateDefaultLayerTask from ...definitions.defaults import ICON_PATH, USER_DOCUMENTATION_SITE -from ...models.base import LayerType +from cplus_core.models.base import LayerType from ...trends_earth import api from ...trends_earth.constants import API_URL, TIMEOUT from ...utils import ( diff --git a/src/cplus_plugin/lib/carbon.py b/src/cplus_plugin/lib/carbon.py index 4f34c1485..c64b795b1 100644 --- a/src/cplus_plugin/lib/carbon.py +++ b/src/cplus_plugin/lib/carbon.py @@ -22,7 +22,8 @@ from qgis import processing from ..conf import settings_manager, Settings -from ..models.base import Activity, DataSourceType, NcsPathwayType +from cplus_core.models.base import Activity, NcsPathwayType +from ..models.source import DataSourceType from ..utils import log diff --git a/src/cplus_plugin/lib/reports/comparison_table.py b/src/cplus_plugin/lib/reports/comparison_table.py index d704d72c7..4c9462a11 100644 --- a/src/cplus_plugin/lib/reports/comparison_table.py +++ b/src/cplus_plugin/lib/reports/comparison_table.py @@ -14,7 +14,7 @@ ) from qgis.PyQt import QtCore, QtGui -from ...models.base import ScenarioResult +from cplus_core.models.base import ScenarioResult from ...models.helpers import layer_from_scenario_result from ...models.report import ScenarioAreaInfo from ...utils import calculate_raster_area_by_pixel_value, log, tr diff --git a/src/cplus_plugin/lib/reports/generator.py b/src/cplus_plugin/lib/reports/generator.py index ead7e84d3..ba7a9a3f7 100644 --- a/src/cplus_plugin/lib/reports/generator.py +++ b/src/cplus_plugin/lib/reports/generator.py @@ -68,8 +68,8 @@ REPORT_COLOR_TREEFOG, ) from .layout_items import BasicScenarioDetailsItem, CplusMapRepeatItem +from cplus_core.models.base import Activity, ScenarioResult from .metrics import create_metrics_expression_context, evaluate_activity_metric -from ...models.base import Activity, ScenarioResult from ...models.helpers import extent_to_project_crs_extent from ...models.report import ( ActivityContextInfo, diff --git a/src/cplus_plugin/lib/reports/layout_items.py b/src/cplus_plugin/lib/reports/layout_items.py index 0845b1afe..7cb3c5c47 100644 --- a/src/cplus_plugin/lib/reports/layout_items.py +++ b/src/cplus_plugin/lib/reports/layout_items.py @@ -25,7 +25,7 @@ ) from qgis.PyQt import QtCore, QtGui -from ...models.base import ModelComponentType, ScenarioResult +from cplus_core.models.base import ModelComponentType, ScenarioResult from ...utils import FileUtils, get_report_font, log, tr diff --git a/src/cplus_plugin/lib/reports/manager.py b/src/cplus_plugin/lib/reports/manager.py index bb2f2e6ab..64da36601 100644 --- a/src/cplus_plugin/lib/reports/manager.py +++ b/src/cplus_plugin/lib/reports/manager.py @@ -32,7 +32,7 @@ SCENARIO_ANALYSIS_METRICS_TEMPLATE_NAME, SCENARIO_COMPARISON_TEMPLATE_NAME, ) -from ...models.base import Scenario, ScenarioResult +from cplus_core.models.base import Scenario, ScenarioResult from ...models.report import ( ReportContext, ReportResult, diff --git a/src/cplus_plugin/lib/validation/manager.py b/src/cplus_plugin/lib/validation/manager.py index edef41d69..6b1ed7023 100644 --- a/src/cplus_plugin/lib/validation/manager.py +++ b/src/cplus_plugin/lib/validation/manager.py @@ -12,7 +12,7 @@ from qgis.PyQt import QtCore, sip -from ...models.base import ModelComponentType, NcsPathway +from cplus_core.models.base import ModelComponentType, NcsPathway from ...models.helpers import clone_ncs_pathway from ...models.validation import SubmitResult, ValidationResult from .validators import NcsDataValidator diff --git a/src/cplus_plugin/lib/validation/validators.py b/src/cplus_plugin/lib/validation/validators.py index efff6e5d5..0b3de31d4 100644 --- a/src/cplus_plugin/lib/validation/validators.py +++ b/src/cplus_plugin/lib/validation/validators.py @@ -27,7 +27,7 @@ resolution_validation_config, ) from .feedback import ValidationFeedback -from ...models.base import LayerModelComponent, ModelComponentType, NcsPathway +from cplus_core.models.base import LayerModelComponent, ModelComponentType, NcsPathway from ...models.validation import ( RuleConfiguration, RuleInfo, diff --git a/src/cplus_plugin/main.py b/src/cplus_plugin/main.py index de6b09dde..9835e9889 100644 --- a/src/cplus_plugin/main.py +++ b/src/cplus_plugin/main.py @@ -52,7 +52,7 @@ from .lib.reports.layout_items import CplusMapRepeatItemLayoutItemMetadata from .lib.reports.manager import report_manager from .lib.reports.metrics import register_metric_functions, unregister_metric_functions -from .models.base import PriorityLayerType +from cplus_core.models.base import PriorityLayerType from .models.report import MetricConfigurationProfile, MetricProfileCollection from .gui.settings.cplus_options import CplusOptionsFactory from .gui.settings.log_options import LogOptionsFactory diff --git a/src/cplus_plugin/models/base.py b/src/cplus_plugin/models/base.py deleted file mode 100644 index a1727985c..000000000 --- a/src/cplus_plugin/models/base.py +++ /dev/null @@ -1,667 +0,0 @@ -# -*- coding: utf-8 -*- - -""" QGIS CPLUS plugin models. -""" - -import dataclasses -import datetime -import enum -from enum import Enum, IntEnum -import os.path -import typing -from uuid import UUID - -from qgis.core import ( - QgsColorBrewerColorRamp, - QgsColorRamp, - QgsCptCityColorRamp, - QgsFillSymbol, - QgsGradientColorRamp, - QgsLimitedRandomColorRamp, - QgsMapLayer, - QgsPresetSchemeColorRamp, - QgsRandomColorRamp, - QgsRasterLayer, - QgsVectorLayer, -) - -from ..definitions.constants import ( - COLOR_RAMP_PROPERTIES_ATTRIBUTE, - COLOR_RAMP_TYPE_ATTRIBUTE, - ACTIVITY_LAYER_STYLE_ATTRIBUTE, - ACTIVITY_SCENARIO_STYLE_ATTRIBUTE, -) - - -@dataclasses.dataclass -class SpatialExtent: - """Extent object that stores - the coordinates of the area of interest and the analysis - coordinate reference system (CRS). - """ - - bbox: typing.List[float] - crs: typing.Optional[str] = None - - -class PRIORITY_GROUP(Enum): - """Represents priority groups types""" - - CARBON_IMPORTANCE = "Carbon importance" - BIODIVERSITY = "Biodiversity" - LIVELIHOOD = "Livelihood" - CLIMATE_RESILIENCE = "Climate Resilience" - ECOLOGICAL_INFRASTRUCTURE = "Ecological infrastructure" - POLICY = "Policy" - FINANCE_YEARS_EXPERIENCE = "Finance - Years Experience" - FINANCE_MARKET_TRENDS = "Finance - Market Trends" - FINANCE_NET_PRESENT_VALUE = "Finance - Net Present value" - FINANCE_CARBON = "Finance - Carbon" - - -@dataclasses.dataclass -class BaseModelComponent: - """Base class for common model item properties.""" - - uuid: UUID - name: str - description: str - - def __eq__(self, other: "BaseModelComponent") -> bool: - """Test equality of object with another BaseModelComponent - object using the attributes. - - :param other: BaseModelComponent object to compare with this object. - :type other: BaseModelComponent - - :returns: True if the all the attribute values match, else False. - :rtype: bool - """ - if self.uuid != other.uuid: - return False - - if self.name != other.name: - return False - - if self.description != other.description: - return False - - return True - - -BaseModelComponentType = typing.TypeVar( - "BaseModelComponentType", bound=BaseModelComponent -) - - -class LayerType(IntEnum): - """QGIS spatial layer type.""" - - RASTER = 0 - VECTOR = 1 - UNDEFINED = -1 - - -class ModelComponentType(Enum): - """Type of model component i.e. NCS pathway or - activity. - """ - - NCS_PATHWAY = "ncs_pathway" - ACTIVITY = "activity" - UNKNOWN = "unknown" - - @staticmethod - def from_string(str_enum: str) -> "ModelComponentType": - """Creates an enum from the corresponding string equivalent. - - :param str_enum: String representing the model component type. - :type str_enum: str - - :returns: Component type enum corresponding to the given - string else unknown if not found. - :rtype: ModelComponentType - """ - if str_enum.lower() == "ncs_pathway": - return ModelComponentType.NCS_PATHWAY - elif str_enum.lower() == "activity": - return ModelComponentType.ACTIVITY - - return ModelComponentType.UNKNOWN - - -@dataclasses.dataclass -class LayerModelComponent(BaseModelComponent): - """Base class for model components that support - a map layer. - """ - - path: str = "" - layer_type: LayerType = LayerType.UNDEFINED - user_defined: bool = False - - def __post_init__(self): - """Try to set the layer and layer type properties.""" - if self.layer_uuid: - return - self.update_layer_type() - - @property - def layer_uuid(self): - """Return Layer UUID for default layer. - - Default layer's path will start with 'cplus://'. - :return: Server Layer UUID - :rtype: str - """ - if self.path.startswith("cplus://"): - return self.path.replace("cplus://", "") - return None - - def update_layer_type(self): - """Update the layer type if either the layer or - path properties have been set. - """ - layer = self.to_map_layer() - if layer is None: - return - - if not layer.isValid(): - return - - if isinstance(layer, QgsRasterLayer): - self.layer_type = LayerType.RASTER - - elif isinstance(layer, QgsVectorLayer): - self.layer_type = LayerType.VECTOR - - def to_map_layer(self) -> typing.Union[QgsMapLayer, None]: - """Constructs a map layer from the specified path. - - It will first check if the layer property has been set - else try to construct the layer from the path else return - None. - - :returns: Map layer corresponding to the set layer - property or specified path. - :rtype: QgsMapLayer - """ - if not os.path.exists(self.path): - return None - - layer = None - if self.layer_type == LayerType.RASTER: - layer = QgsRasterLayer(self.path, self.name) - - elif self.layer_type == LayerType.VECTOR: - layer = QgsVectorLayer(self.path, self.name) - - return layer - - def is_valid(self) -> bool: - """Checks if the corresponding map layer is valid. - - :returns: True if the map layer is valid, else False if map layer is - invalid or of None type. - :rtype: bool - """ - if self.layer_uuid: - return True - layer = self.to_map_layer() - if layer is None: - return False - - return layer.isValid() - - def __eq__(self, other) -> bool: - """Uses BaseModelComponent equality test rather than - what the dataclass default implementation will provide. - """ - if self.layer_uuid: - return self.layer_uuid == other.layer_uuid - return super().__eq__(other) - - def is_default_layer(self) -> bool: - """Check if layer is a default layer - - :return: True if layer comes from server API - :rtype: bool - """ - return self.layer_uuid is not None - - -LayerModelComponentType = typing.TypeVar( - "LayerModelComponentType", bound=LayerModelComponent -) - - -class PriorityLayerType(IntEnum): - """Type of priority weighting layer.""" - - DEFAULT = 0 - NPV = 1 - - -@dataclasses.dataclass -class PriorityLayer(BaseModelComponent): - """Base class for model components storing priority weighting layers.""" - - groups: list - selected: bool = False - path: str = "" - type: PriorityLayerType = PriorityLayerType.DEFAULT - - @property - def layer_uuid(self): - """Return Layer UUID for default layer. - - Default layer's path will start with 'cplus://'. - :return: Server Layer UUID - :rtype: str - """ - if self.path.startswith("cplus://"): - return self.path.replace("cplus://", "") - return None - - def __eq__(self, other) -> bool: - """Uses BaseModelComponent equality test rather than - what the dataclass default implementation will provide. - """ - if self.layer_uuid: - return self.layer_uuid == other.layer_uuid - return super().__eq__(other) - - def is_default_layer(self) -> bool: - """Check if layer is a default layer - - :return: True if layer comes from server API - :rtype: bool - """ - return self.layer_uuid is not None - - -class NcsPathwayType(IntEnum): - """Type of NCS pathway.""" - - PROTECT = 0 - RESTORE = 1 - MANAGE = 2 - UNDEFINED = -1 - - @staticmethod - def from_int(int_enum: int) -> "NcsPathwayType": - """Creates an enum from the corresponding int equivalent. - - :param int_enum: Integer representing the NCS pathway type. - :type int_enum: int - - :returns: NCS pathway type enum corresponding to the given - integer else unknown if not found. - :rtype: NcsPathwayType - """ - return { - 0: NcsPathwayType.PROTECT, - 1: NcsPathwayType.RESTORE, - 2: NcsPathwayType.MANAGE, - -1: NcsPathwayType.UNDEFINED, - }[int_enum] - - -@dataclasses.dataclass -class NcsPathway(LayerModelComponent): - """Contains information about an NCS pathway layer.""" - - pathway_type: NcsPathwayType = NcsPathwayType.UNDEFINED - priority_layers: typing.List[typing.Dict] = dataclasses.field(default_factory=list) - - def __eq__(self, other: "NcsPathway") -> bool: - """Test equality of NcsPathway object with another - NcsPathway object using the attributes. - - Excludes testing the map layer for equality. - - :param other: NcsPathway object to compare with this object. - :type other: NcsPathway - - :returns: True if all the attribute values match, else False. - :rtype: bool - """ - base_equality = super().__eq__(other) - if not base_equality: - return False - - if self.path != other.path: - return False - - if self.layer_type != other.layer_type: - return False - - if self.user_defined != other.user_defined: - return False - - return True - - def pw_layers(self) -> typing.List[QgsRasterLayer]: - """Returns the list of priority weighting layers defined under - the :py:attr:`~priority_layers` attribute. - - :returns: Priority layers for the implementation or an empty list - if the path is not defined. - :rtype: list - """ - return [ - QgsRasterLayer(layer.get("path")) - for layer in self.priority_layers - if layer.get("path") - ] - - def is_pwls_valid(self) -> bool: - """Checks if the priority layers are valid. - - :returns: True if all priority layers are valid, else False if - even one is invalid. If there are no priority layers defined, it will - always return True. - :rtype: bool - """ - is_valid = True - for cl in self.pw_layers(): - if not cl.isValid(): - is_valid = False - break - - return is_valid - - -@dataclasses.dataclass -class Activity(LayerModelComponent): - """Contains information about an activity used in a scenario. - If the layer has been set then it will - not be possible to add NCS pathways unless the layer - is cleared. - Priority will be given to the layer property. - """ - - pathways: typing.List[NcsPathway] = dataclasses.field(default_factory=list) - layer_styles: dict = dataclasses.field(default_factory=dict) - mask_paths: typing.List[str] = dataclasses.field(default_factory=list) - style_pixel_value: int = -1 - - @classmethod - def from_dict(cls, activity_dict: typing.Dict): - """Create an Activity object from Activity dict.""" - pathways = [] - for pathway in activity_dict["pathways"]: - del pathway["layer_uuid"] - if "carbon_paths" in pathway: - del pathway["carbon_paths"] - if "carbon_uuids" in pathway: - del pathway["carbon_uuids"] - pathways.append(NcsPathway(**pathway)) - activity_dict["pathways"] = pathways - # delete mask_uuids using pop - activity_dict.pop("mask_uuids", None) - activity_dict.pop("priority_layers", None) - return Activity(**activity_dict) - - def __post_init__(self): - """Pre-checks on initialization.""" - super().__post_init__() - - # Ensure there are no duplicate pathways. - uuids = [str(p.uuid) for p in self.pathways] - - if len(set(uuids)) != len(uuids): - msg = "Duplicate pathways found in activity" - raise ValueError(f"{msg} {self.name}.") - - # Reset pathways if layer has also been set. - if self.to_map_layer() is not None and len(self.pathways) > 0: - self.pathways = [] - - def contains_pathway(self, pathway_uuid: str) -> bool: - """Checks if there is an NCS pathway matching the given UUID. - - :param pathway_uuid: UUID to search for in the collection. - :type pathway_uuid: str - - :returns: True if there is a matching NCS pathway, else False. - :rtype: bool - """ - ncs_pathway = self.pathway_by_uuid(pathway_uuid) - if ncs_pathway is None: - return False - - return True - - def add_ncs_pathway(self, ncs: NcsPathway) -> bool: - """Adds an NCS pathway object to the collection. - - :param ncs: NCS pathway to be added to the activity. - :type ncs: NcsPathway - - :returns: True if the NCS pathway was successfully added, else False - if there was an existing NCS pathway object with a similar UUID or - the layer property had already been set. - """ - - if not ncs.is_valid(): - return False - - if self.contains_pathway(str(ncs.uuid)): - return False - - self.pathways.append(ncs) - - return True - - def clear_layer(self): - """Removes a reference to the layer URI defined in the path attribute.""" - self.path = "" - - def remove_ncs_pathway(self, pathway_uuid: str) -> bool: - """Removes the NCS pathway with a matching UUID from the collection. - - :param pathway_uuid: UUID for the NCS pathway to be removed. - :type pathway_uuid: str - - :returns: True if the NCS pathway object was successfully removed, - else False if there is no object matching the given UUID. - :rtype: bool - """ - idxs = [i for i, p in enumerate(self.pathways) if str(p.uuid) == pathway_uuid] - - if len(idxs) == 0: - return False - - rem_idx = idxs[0] - _ = self.pathways.pop(rem_idx) - - return True - - def pathway_by_uuid(self, pathway_uuid: str) -> typing.Union[NcsPathway, None]: - """Returns an NCS pathway matching the given UUID. - - :param pathway_uuid: UUID for the NCS pathway to retrieve. - :type pathway_uuid: str - - :returns: NCS pathway object matching the given UUID else None if - not found. - :rtype: NcsPathway - """ - pathways = [p for p in self.pathways if str(p.uuid) == pathway_uuid] - - if len(pathways) == 0: - return None - - return pathways[0] - - def is_valid(self) -> bool: - """Includes an additional check to assert if NCS pathways have - been specified if the layer has not been set or is not valid. - - Does not check for validity of individual NCS pathways in the - collection. - """ - if self.to_map_layer() is not None: - return super().is_valid() - else: - if len(self.pathways) == 0: - return False - - return True - - def scenario_layer_style_info(self) -> dict: - """Returns the fill symbol properties for styling the activity - layer in the final scenario result. - - :returns: Fill symbol properties for the activity layer - styling in the scenario layer or an empty dictionary if there was - no definition found in the root style. - :rtype: dict - """ - if ( - len(self.layer_styles) == 0 - or ACTIVITY_SCENARIO_STYLE_ATTRIBUTE not in self.layer_styles - ): - return dict() - - return self.layer_styles[ACTIVITY_SCENARIO_STYLE_ATTRIBUTE] - - def activity_layer_style_info(self) -> dict: - """Returns the color ramp properties for styling the activity - layer resulting from a scenario run. - - :returns: Color ramp properties for the activity styling or an - empty dictionary if there was no definition found in the root - style. - :rtype: dict - """ - if ( - len(self.layer_styles) == 0 - or ACTIVITY_LAYER_STYLE_ATTRIBUTE not in self.layer_styles - ): - return dict() - - return self.layer_styles[ACTIVITY_LAYER_STYLE_ATTRIBUTE] - - def scenario_fill_symbol(self) -> typing.Union[QgsFillSymbol, None]: - """Creates a fill symbol for the activity in the scenario. - - :returns: Fill symbol for the activity in the scenario - or None if there was no definition found. - :rtype: QgsFillSymbol - """ - scenario_style_info = self.scenario_layer_style_info() - if len(scenario_style_info) == 0: - return None - - return QgsFillSymbol.createSimple(scenario_style_info) - - def color_ramp(self) -> typing.Union[QgsColorRamp, None]: - """Create a color ramp for styling the activity layer resulting - from a scenario run. - - :returns: A color ramp for styling the activity layer or None - if there was no definition found. - :rtype: QgsColorRamp - """ - model_layer_info = self.activity_layer_style_info() - if len(model_layer_info) == 0: - return None - - ramp_info = model_layer_info.get(COLOR_RAMP_PROPERTIES_ATTRIBUTE, None) - if ramp_info is None or len(ramp_info) == 0: - return None - - ramp_type = model_layer_info.get(COLOR_RAMP_TYPE_ATTRIBUTE, None) - if ramp_type is None: - return None - - # New ramp types will need to be added here manually - if ramp_type == QgsColorBrewerColorRamp.typeString(): - return QgsColorBrewerColorRamp.create(ramp_info) - elif ramp_type == QgsCptCityColorRamp.typeString(): - return QgsCptCityColorRamp.create(ramp_info) - elif ramp_type == QgsGradientColorRamp.typeString(): - return QgsGradientColorRamp.create(ramp_info) - elif ramp_type == QgsLimitedRandomColorRamp.typeString(): - return QgsLimitedRandomColorRamp.create(ramp_info) - elif ramp_type == QgsPresetSchemeColorRamp.typeString(): - return QgsPresetSchemeColorRamp.create(ramp_info) - elif ramp_type == QgsRandomColorRamp.typeString(): - return QgsRandomColorRamp() - - return None - - -class ScenarioState(Enum): - """Defines scenario analysis process states""" - - IDLE = 0 - RUNNING = 1 - STOPPED = 3 - FINISHED = 4 - TERMINATED = 5 - - -@dataclasses.dataclass -class Scenario(BaseModelComponent): - """Object for the handling - workflow scenario information. - """ - - extent: SpatialExtent - activities: typing.List[Activity] - priority_layer_groups: typing.List - state: ScenarioState = ScenarioState.IDLE - server_uuid: UUID = None - - -@dataclasses.dataclass -class ScenarioResult: - """Scenario result details.""" - - scenario: typing.Optional[Scenario] - created_date: datetime.datetime = datetime.datetime.now() - analysis_output: typing.Dict = None - output_layer_name: str = "" - scenario_directory: str = "" - - -class DataSourceType(IntEnum): - """Specifies whether a data source is from a local or online source.""" - - LOCAL = 0 - ONLINE = 1 - UNDEFINED = -1 - - @staticmethod - def from_int(int_enum: int) -> "DataSourceType": - """Creates an enum from the corresponding int equivalent. - - :param int_enum: Integer representing the data source type. - :type int_enum: int - - :returns: Data source type enum corresponding to the given - integer else unknown if not found. - :rtype: DataSourceType - """ - return { - 0: DataSourceType.LOCAL, - 1: DataSourceType.ONLINE, - -1: DataSourceType.UNDEFINED, - }[int_enum] - - -class LayerSource(Enum): - """Specify if a layer source is cplus or naturebase.""" - - CPLUS = "CPLUS" - NATUREBASE = "Naturebase" - - -class AreaOfInterestSource(Enum): - """Defines the area of inteterest sources""" - - LAYER = 0 - EXTENT = 1 diff --git a/src/cplus_plugin/models/financial.py b/src/cplus_plugin/models/financial.py index 7094d1e5b..8ba9b8d34 100644 --- a/src/cplus_plugin/models/financial.py +++ b/src/cplus_plugin/models/financial.py @@ -5,7 +5,7 @@ import dataclasses import typing -from .base import NcsPathway +from cplus_core.models.base import Activity, NcsPathway @dataclasses.dataclass diff --git a/src/cplus_plugin/models/helpers.py b/src/cplus_plugin/models/helpers.py index 76e144d06..b102c817c 100644 --- a/src/cplus_plugin/models/helpers.py +++ b/src/cplus_plugin/models/helpers.py @@ -17,8 +17,7 @@ ) from qgis.PyQt import QtCore - -from .base import ( +from cplus_core.models.base import ( BaseModelComponent, BaseModelComponentType, Activity, diff --git a/src/cplus_plugin/models/report.py b/src/cplus_plugin/models/report.py index 8b0ffb953..4987fbe13 100644 --- a/src/cplus_plugin/models/report.py +++ b/src/cplus_plugin/models/report.py @@ -17,7 +17,7 @@ ) from qgis.PyQt import QtCore -from .base import Activity, Scenario, ScenarioResult +from cplus_core.models.base import Activity, Scenario, ScenarioResult @dataclasses.dataclass diff --git a/src/cplus_plugin/models/source.py b/src/cplus_plugin/models/source.py new file mode 100644 index 000000000..6ad86a658 --- /dev/null +++ b/src/cplus_plugin/models/source.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- + +""" QGIS CPLUS plugin models. +""" +from enum import Enum, IntEnum + + +class DataSourceType(IntEnum): + """Specifies whether a data source is from a local or online source.""" + + LOCAL = 0 + ONLINE = 1 + UNDEFINED = -1 + + @staticmethod + def from_int(int_enum: int) -> "DataSourceType": + """Creates an enum from the corresponding int equivalent. + + :param int_enum: Integer representing the data source type. + :type int_enum: int + + :returns: Data source type enum corresponding to the given + integer else unknown if not found. + :rtype: DataSourceType + """ + return { + 0: DataSourceType.LOCAL, + 1: DataSourceType.ONLINE, + -1: DataSourceType.UNDEFINED, + }[int_enum] + + +class LayerSource(Enum): + """Specify if a layer source is cplus or naturebase.""" + + CPLUS = "CPLUS" + NATUREBASE = "Naturebase" + + +class AreaOfInterestSource(Enum): + """Defines the area of inteterest sources""" + + LAYER = 0 + EXTENT = 1 diff --git a/src/cplus_plugin/models/validation.py b/src/cplus_plugin/models/validation.py index fc22b90a0..5fe118073 100644 --- a/src/cplus_plugin/models/validation.py +++ b/src/cplus_plugin/models/validation.py @@ -6,7 +6,7 @@ from enum import IntEnum import typing -from .base import ModelComponentType +from cplus_core.models.base import ModelComponentType class RuleType(IntEnum): diff --git a/src/cplus_plugin/tasks.py b/src/cplus_plugin/tasks.py deleted file mode 100644 index 7b7b2b561..000000000 --- a/src/cplus_plugin/tasks.py +++ /dev/null @@ -1,2617 +0,0 @@ -# coding=utf-8 -""" - Plugin tasks related to the scenario analysis - -""" -import datetime -import json -import math -import os -import uuid -import typing -from pathlib import Path - -from qgis import processing -from qgis.PyQt import QtCore -from qgis.core import ( - Qgis, - QgsCoordinateReferenceSystem, - QgsProcessing, - QgsProcessingContext, - QgsProcessingFeedback, - QgsRasterLayer, - QgsRectangle, - QgsVectorLayer, - QgsWkbTypes, -) -from qgis.core import QgsTask - -from .conf import settings_manager, Settings -from .definitions.constants import NO_DATA_VALUE -from .definitions.defaults import ( - SCENARIO_OUTPUT_FILE_NAME, -) -from .models.base import ScenarioResult, SpatialExtent, Activity, NcsPathway -from .resources import * -from .utils import ( - align_rasters, - clean_filename, - tr, - log, - FileUtils, - CustomJsonEncoder, - todict, -) - - -class ScenarioAnalysisTask(QgsTask): - """Prepares and runs the scenario analysis""" - - status_message_changed = QtCore.pyqtSignal(str) - info_message_changed = QtCore.pyqtSignal(str, int) - - custom_progress_changed = QtCore.pyqtSignal(float) - - def __init__( - self, - analysis_scenario_name, - analysis_scenario_description, - analysis_activities, - analysis_priority_layers_groups, - analysis_extent, - scenario, - clip_to_studyarea: bool = False, - ): - super().__init__() - self.analysis_scenario_name = analysis_scenario_name - self.analysis_scenario_description = analysis_scenario_description - - self.analysis_activities = analysis_activities - self.analysis_priority_layers_groups = analysis_priority_layers_groups - self.analysis_extent = analysis_extent - self.analysis_extent_string = None - - self.clip_to_studyarea = clip_to_studyarea - - self.scenario_result = None - self.scenario_directory = None - - self.success = True - self.output = None - self.error = None - self.status_message = None - - self.info_message = None - - self.processing_cancelled = False - self.feedback = QgsProcessingFeedback() - self.processing_context = QgsProcessingContext() - - self.scenario = scenario - - def get_settings_value(self, name: str, default=None, setting_type=None): - """Gets value of the setting with the passed name. - - :param name: Name of setting key - :type name: str - - :param default: Default value returned when the setting key does not exist - :type default: Any - - :param setting_type: Type of the store setting - :type setting_type: Any - - :returns: Value of the setting - :rtype: Any - """ - return settings_manager.get_value(name, default, setting_type) - - def get_scenario_directory(self) -> str: - """Generate scenario directory for current task. - - :return: Path to scenario directory - :rtype: str - """ - base_dir = self.get_settings_value(Settings.BASE_DIR) - return os.path.join( - f"{base_dir}", - "scenario_" f'{datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")}', - ) - - def get_priority_layer(self, identifier) -> typing.Dict: - """Retrieves the priority layer that matches the passed identifier. - - :param identifier: Priority layers identifier - :type identifier: uuid.UUID - - :returns: Priority layer dict - :rtype: dict - """ - return settings_manager.get_priority_layer(identifier) - - def get_activity(self, activity_uuid) -> typing.Union[Activity, None]: - """Gets an activity object matching the given unique - identifier. - - :param activity_uuid: Unique identifier of the - activity object. - :type activity_uuid: str - - :returns: Returns the activity object matching the given - identifier else None if not found. - :rtype: Activity - """ - return settings_manager.get_activity(activity_uuid) - - def get_priority_layers(self) -> typing.List: - """Gets all the available priority layers in the plugin. - - :returns: Priority layers list - :rtype: list - """ - return settings_manager.get_priority_layers() - - def get_masking_layers(self) -> typing.List: - """Gets all the masking layers. - - :return: List of masking layer paths - :rtype: list - """ - masking_layers_paths = self.get_settings_value( - Settings.MASK_LAYERS_PATHS, default=None - ) - masking_layers = masking_layers_paths.split(",") if masking_layers_paths else [] - masking_layers.remove("") if "" in masking_layers else None - return masking_layers - - def get_reference_layer(self): - """Get the path of the reference layer - - Returns: - str|None: Return the path of the reference layer or None is it doesn't exist - """ - snapping_enabled = self.get_settings_value( - Settings.SNAPPING_ENABLED, default=False, setting_type=bool - ) - reference_layer = self.get_settings_value(Settings.SNAP_LAYER, default="") - reference_layer_path = Path(reference_layer) - if ( - snapping_enabled - and os.path.exists(reference_layer) - and reference_layer_path.is_file() - ): - return reference_layer - - def cancel_task(self, exception=None): - """Cancel current task. - - :param exception: Exception if stopped with error, defaults to None - :type exception: Any, optional - """ - self.error = exception - self.cancel() - - def log_message( - self, - message: str, - name: str = "qgis_cplus", - info: bool = True, - notify: bool = True, - ): - """Logs the message into QGIS logs using qgis_cplus as the default - log instance. - If notify_user is True, user will be notified about the log. - - :param message: The log message - :type message: str - - :param name: Name of te log instance, qgis_cplus is the default - :type message: str - - :param info: Whether the message is about info or a - warning - :type info: bool - - :param notify: Whether to notify user about the log - :type notify: bool - """ - if not isinstance(message, str): - if isinstance(message, dict): - message = json.dumps(message, cls=CustomJsonEncoder) - else: - message = json.dumps(todict(message), cls=CustomJsonEncoder) - log(message, name=name, info=info, notify=notify) - - def on_terminated(self, hide=False): - """Called when the task is terminated.""" - if hide: - message = "Processing has been minimized by the user." - else: - message = "Processing has been cancelled by the user." - if self.error: - message = f"Problem in running scenario analysis: {self.error}" - self.set_status_message(tr(message)) - self.log_message(message) - - def run(self): - """Runs the main scenario analysis task operations""" - - self.scenario_directory = self.get_scenario_directory() - - FileUtils.create_new_dir(self.scenario_directory) - - selected_pathway = None - pathway_found = False - - for activity in self.analysis_activities: - if pathway_found: - break - for pathway in activity.pathways: - if pathway is not None: - pathway_found = True - selected_pathway = pathway - break - - target_layer = QgsRasterLayer(selected_pathway.path, selected_pathway.name) - - self.analysis_crs = self.analysis_extent.crs - - if self.analysis_crs is not None: - # Use the CRS of the analysis if it is provided - dest_crs = QgsCoordinateReferenceSystem(self.analysis_crs) - else: - # Use the CRS of the target layer if it exists - # or use EPSG:4326 as a default CRS - dest_crs = ( - target_layer.crs() - if selected_pathway and selected_pathway.path - else QgsCoordinateReferenceSystem("EPSG:4326") - ) - - processing_extent = QgsRectangle( - float(self.analysis_extent.bbox[0]), - float(self.analysis_extent.bbox[2]), - float(self.analysis_extent.bbox[1]), - float(self.analysis_extent.bbox[3]), - ) - - snapped_extent = self.align_extent(target_layer, processing_extent) - - extent_string = ( - f"{snapped_extent.xMinimum()},{snapped_extent.xMaximum()}," - f"{snapped_extent.yMinimum()},{snapped_extent.yMaximum()}" - f" [{dest_crs.authid()}]" - ) - - self.log_message( - "Original area of interest extent: " - f"{processing_extent.asWktPolygon()} \n" - ) - self.log_message( - "Snapped area of interest extent " f"{snapped_extent.asWktPolygon()} \n" - ) - # Run pathways layers snapping using a specified reference layer - snapping_enabled = self.get_settings_value( - Settings.SNAPPING_ENABLED, default=False, setting_type=bool - ) - reference_layer = self.get_reference_layer() - if snapping_enabled and reference_layer: - self.snap_analysis_data( - self.analysis_activities, - extent_string, - ) - - # Clip to StudyArea - studyarea_path = self.get_settings_value(Settings.STUDYAREA_PATH, default="") - if self.clip_to_studyarea and os.path.exists(studyarea_path): - self.clip_analysis_data(studyarea_path) - - # Reproject the pathways and priority layers to the - # scenario CRS if it is not the same as the pathways CRS - - if self.analysis_crs is not None: - self.reproject_pathways( - target_extent=extent_string, - target_crs=QgsCoordinateReferenceSystem(self.analysis_crs), - ) - - # Replace no data value for the pathways and priority layers - nodata_value = float( - self.get_settings_value( - Settings.NCS_NO_DATA_VALUE, default=NO_DATA_VALUE, setting_type=float - ) - ) - self.log_message( - f"Replacing nodata value for the pathways and priority layers to {nodata_value}" - ) - self.run_pathways_replace_nodata(nodata_value=nodata_value) - - # Weight the pathways using the pathway suitability index - # and priority group coefficients for the PWLs - save_output = self.get_settings_value( - Settings.NCS_WEIGHTED, default=True, setting_type=bool - ) - self.run_pathways_weighting( - self.analysis_activities, - self.analysis_priority_layers_groups, - extent_string, - temporary_output=not save_output, - ) - - # Creating activities from the weighted pathways - save_output = self.get_settings_value( - Settings.LANDUSE_PROJECT, default=True, setting_type=bool - ) - self.run_activities_analysis( - self.analysis_activities, - extent_string, - temporary_output=not save_output, - ) - - # Run masking of the activities layers - masking_layers = self.get_masking_layers() - if masking_layers: - self.run_activities_masking( - self.analysis_activities, - masking_layers, - extent_string, - ) - - # Run internal masking of the activities layers - self.run_internal_activities_masking( - self.analysis_activities, - extent_string, - ) - # TODO enable the sieve functionality - sieve_enabled = self.get_settings_value( - Settings.SIEVE_ENABLED, default=False, setting_type=bool - ) - - if sieve_enabled: - self.run_activities_sieve( - self.analysis_activities, - ) - - # After creating activities, we normalize them using the - # suitability index - save_output = self.get_settings_value( - Settings.LANDUSE_NORMALIZED, default=True, setting_type=bool - ) - - # Clean up activities - self.run_activities_cleaning( - self.analysis_activities, extent_string, temporary_output=not save_output - ) - - # The highest position tool analysis - save_output = self.get_settings_value( - Settings.HIGHEST_POSITION, default=True, setting_type=bool - ) - self.run_highest_position_analysis(temporary_output=not save_output) - - return True - - def finished(self, result: bool): - """Calls the handler responsible for doing post analysis workflow. - - :param result: Whether the run() operation finished successfully - :type result: bool - """ - if result: - self.log_message("Finished from the main task \n") - else: - self.log_message(f"Error from task scenario task {self.error}") - - def set_status_message(self, message: str): - """Set status message in progress dialog - - :param message: Message to be displayed - :type message: str - """ - self.status_message = message - self.status_message_changed.emit(self.status_message) - - def set_info_message(self, message: str, level=Qgis.Info): - """Set info message. - - :param message: Message - :type message: str - :param level: log level, defaults to Qgis.Info - :type level: int, optional - """ - self.info_message = message - self.info_message_changed.emit(self.info_message, level) - - def set_custom_progress(self, value: float): - """Set task progress value. - - :param value: Value to be set on the progress bar - :type value: float - """ - self.custom_progress = value - self.custom_progress_changed.emit(self.custom_progress) - - def update_progress(self, value): - """Sets the value of the task progress - - :param value: Value to be set on the progress bar - :type value: float - """ - if not self.processing_cancelled: - self.set_custom_progress(value) - else: - self.feedback = QgsProcessingFeedback() - self.processing_context = QgsProcessingContext() - - def align_extent(self, raster_layer, target_extent): - """Snaps the passed extent to the activities pathway layer pixel bounds - - :param raster_layer: The target layer that the passed extent will be - aligned with - :type raster_layer: QgsRasterLayer - - :param target_extent: Spatial extent that will be used a target extent when - doing alignment. - :type target_extent: QgsRectangle - """ - - try: - raster_extent = raster_layer.extent() - - x_res = raster_layer.rasterUnitsPerPixelX() - y_res = raster_layer.rasterUnitsPerPixelY() - - left = raster_extent.xMinimum() + x_res * math.floor( - (target_extent.xMinimum() - raster_extent.xMinimum()) / x_res - ) - right = raster_extent.xMinimum() + x_res * math.ceil( - (target_extent.xMaximum() - raster_extent.xMinimum()) / x_res - ) - bottom = raster_extent.yMinimum() + y_res * math.floor( - (target_extent.yMinimum() - raster_extent.yMinimum()) / y_res - ) - top = raster_extent.yMaximum() - y_res * math.floor( - (raster_extent.yMaximum() - target_extent.yMaximum()) / y_res - ) - - return QgsRectangle(left, bottom, right, top) - - except Exception as e: - self.log_message( - tr( - f"Problem snapping area of " - f"interest extent, using the original extent," - f"{str(e)}" - ) - ) - - return target_extent - - def replace_nodata( - self, layer_path: str, output_path: str, nodata_value: float = -9999.0 - ): - """Adds nodata value info into the layer available - in the passed layer_path and saves the layer in the passed output_path. - - The addition will replace any current nodata value available in - the input layer. - - :param layer_path: Input layer path. Must be a valid file path to a raster layer. - :type layer_path: str - - :param output_path: Output layer path. Must be a valid file path where the modified raster will be saved. - :type output_path: str - - :param nodata_value: No data value to be set in the output layer. Defaults to -9999.0 - :type nodata_value: float - - :returns: Whether the task operations were successful - :rtype: bool - - """ - self.feedback = QgsProcessingFeedback() - self.feedback.progressChanged.connect(self.update_progress) - - try: - alg_params = { - "COPY_SUBDATASETS": False, - "DATA_TYPE": 6, # Float32 - "EXTRA": "", - "INPUT": layer_path, - "NODATA": None, - "OPTIONS": "", - "TARGET_CRS": None, - "OUTPUT": QgsProcessing.TEMPORARY_OUTPUT, - } - translate_output = processing.run( - "gdal:translate", - alg_params, - context=self.processing_context, - feedback=self.feedback, - is_child_algorithm=True, - ) - - alg_params = { - "DATA_TYPE": 0, # Use Input Layer Data Type - "EXTRA": "", - "INPUT": translate_output["OUTPUT"], - "MULTITHREADING": False, - "NODATA": nodata_value, - "OPTIONS": "", - "RESAMPLING": 0, # Nearest Neighbour - "SOURCE_CRS": None, - "TARGET_CRS": None, - "TARGET_EXTENT": None, - "TARGET_EXTENT_CRS": None, - "TARGET_RESOLUTION": None, - "OUTPUT": output_path, - } - outputs = processing.run( - "gdal:warpreproject", - alg_params, - context=self.processing_context, - feedback=self.feedback, - is_child_algorithm=True, - ) - - return outputs is not None - except Exception as e: - log(f"Problem replacing no data value from a snapping output, {e}") - - return False - - def run_pathways_replace_nodata(self, nodata_value: float = -9999.0) -> bool: - """Replace the nodata value for activity pathways and priority layers. - :param nodata_value: The nodata value to replace in the pathways and priority layers - :type nodata_value: float - :returns: True if the task operation was successfully completed else False. - :rtype: bool - """ - if self.processing_cancelled: - return False - - self.set_status_message( - tr( - "Replacing the nodata value for the activity pathways and priority layers" - ) - ) - - pathways: typing.List[NcsPathway] = [] - - try: - for activity in self.analysis_activities: - if not activity.pathways and ( - activity.path is None or activity.path == "" - ): - self.set_info_message( - tr( - f"No defined activity pathways or " - f" activity layers for the activity {activity.name}" - ), - level=Qgis.Critical, - ) - self.log_message( - f"No defined activity pathways or " - f"activity layers for the activity {activity.name}" - ) - return False - - for pathway in activity.pathways: - if not (pathway in pathways): - pathways.append(pathway) - - if pathways is not None and len(pathways) > 0: - replaced_nodata_pathways_directory = os.path.join( - self.scenario_directory, "replaced_nodata_pathways" - ) - - FileUtils.create_new_dir(replaced_nodata_pathways_directory) - - for pathway in pathways: - pathway_layer = QgsRasterLayer(pathway.path, pathway.name) - - if self.processing_cancelled: - return False - if not pathway_layer.isValid(): - self.log_message( - f"Pathway layer {pathway.name} is not valid, " - f"skipping replacing nodata value for layer." - ) - continue - raster_provider = pathway_layer.dataProvider() - raster_no_data_value = raster_provider.sourceNoDataValue(1) - if raster_no_data_value == nodata_value: - self.log_message( - f"Pathway layer {pathway.name} already has the nodata value " - f"{nodata_value}, skipping replacing nodata value for layer." - ) - else: - self.log_message( - f"Replacing nodata value for {pathway.name} pathway layer " - f"to {nodata_value}\n" - ) - - output_file = os.path.join( - replaced_nodata_pathways_directory, - f"{Path(pathway.path).stem}_{str(self.scenario.uuid)[:4]}.tif", - ) - - result = self.replace_nodata( - pathway.path, output_file, nodata_value - ) - if result: - pathway.path = output_file - - self.log_message( - f"Replacing nodata value for {len(pathway.priority_layers)} " - f"priority weighting layers from pathway {pathway.name}\n" - ) - - if ( - pathway.priority_layers is not None - and len(pathway.priority_layers) > 0 - ): - replaced_nodata_priority_directory = os.path.join( - self.scenario_directory, "replaced_nodata_priority_layers" - ) - - FileUtils.create_new_dir(replaced_nodata_priority_directory) - - priority_layers = [] - for priority_layer in pathway.priority_layers: - if priority_layer is None: - continue - - if self.processing_cancelled: - return False - - priority_layer_settings = self.get_priority_layer( - priority_layer.get("uuid") - ) - if priority_layer_settings is None: - continue - - priority_layer_path = priority_layer_settings.get("path") - - if not Path(priority_layer_path).exists(): - priority_layers.append(priority_layer) - continue - - layer = QgsRasterLayer( - priority_layer_path, f"{str(uuid.uuid4())[:4]}" - ) - if not layer.isValid(): - self.log_message( - f"Priority layer {priority_layer.get('name')} " - f"from pathway {pathway.name} is not valid, " - f"skipping replacing nodata value for layer." - ) - continue - - raster_provider = layer.dataProvider() - raster_no_data_value = raster_provider.sourceNoDataValue(1) - if raster_no_data_value == nodata_value: - self.log_message( - f"Priority layer {priority_layer.get('name')} already has the nodata value " - f"{nodata_value}, skipping replacing nodata value for layer." - ) - else: - self.log_message( - f"Replacing nodata value for {priority_layer.get('name')} priority layer " - f"to {nodata_value}\n" - ) - - output_file = os.path.join( - replaced_nodata_priority_directory, - f"{Path(pathway.path).stem}_{str(self.scenario.uuid)[:4]}.tif", - ) - - result = self.replace_nodata( - priority_layer_path, output_file, nodata_value - ) - if result: - priority_layer["path"] = output_file - - priority_layers.append(priority_layer) - - pathway.priority_layers = priority_layers - - except Exception as e: - self.log_message(f"Problem replacing nodata value for layers, {e} \n") - self.cancel_task(e) - return False - - return True - - def clip_raster_by_mask( - self, input_raster_path: str, mask_layer_path: str, output_path: str - ) -> bool: - """Clip a given raster by the specified mask layer. - :param input_raster_path: Input raster path - :type input_raster_path: str - - :param mask_layer_path: Path to the masking layer - :type mask_layer_path: str - - :param output_path: Output layer path - :type output_path: str - - :returns: True if the task operation was successfully completed else False. - :rtype: bool - """ - raster_layer = QgsRasterLayer(input_raster_path, "raster_layer") - mask_layer = QgsVectorLayer(mask_layer_path, "mask_layer") - - if not raster_layer.isValid(): - self.log_message(f"Invalid raster layer: {input_raster_path}\n") - return False - - if not mask_layer.isValid(): - self.log_message(f"Invalid mask layer: {mask_layer_path}\n") - return False - - try: - alg_params = { - "INPUT": input_raster_path, - "MASK": mask_layer, - "SOURCE_CRS": raster_layer.crs(), - "DESTINATION_CRS": raster_layer.crs(), - "OUTPUT": output_path, - "NO_DATA": settings_manager.get_value( - Settings.NCS_NO_DATA_VALUE, NO_DATA_VALUE - ), - "CROP_TO_CUTLINE": True, - } - - self.log_message( - f"Used parameters for clipping the raster: {input_raster_path} " - f" using mask layer: {alg_params} \n" - ) - - self.feedback = QgsProcessingFeedback() - self.feedback.progressChanged.connect(self.update_progress) - - if self.processing_cancelled: - return False - - result = processing.run( - "gdal:cliprasterbymasklayer", - alg_params, - context=self.processing_context, - feedback=self.feedback, - ) - if result.get("OUTPUT"): - return True - - except Exception as e: - self.log_message(f"Problem clipping the layer {e} \n") - return False - - def clip_analysis_data(self, studyarea_path: str) -> bool: - """Clips the activity pathways and priority layers by the given study area. - :param studyarea_path: The path to the study area layer - :type studyarea_path: str - :returns: True if the task operation was successfully completed else False. - :rtype: bool - """ - if self.processing_cancelled: - return False - - mask_layer = QgsVectorLayer(studyarea_path, "mask_layer") - - if not mask_layer.isValid(): - self.log_message( - f"Invalid mask layer: {studyarea_path} " - f"Skipping clipping of activity pathways and priority layers\n" - ) - return False - - self.set_status_message( - tr( - "Clipping the activity pathways and priority layers by the study area layer" - ) - ) - - clipped_pathways_directory = os.path.join( - self.scenario_directory, "clipped_pathways" - ) - FileUtils.create_new_dir(clipped_pathways_directory) - - clipped_priority_directory = os.path.join( - self.scenario_directory, "clipped_priority_layers" - ) - FileUtils.create_new_dir(clipped_priority_directory) - - pathways: typing.List[NcsPathway] = [] - - try: - for activity in self.analysis_activities: - if not activity.pathways and ( - activity.path is None or activity.path == "" - ): - self.set_info_message( - tr( - f"No defined activity pathways or " - f" activity layers for the activity {activity.name}" - ), - level=Qgis.Critical, - ) - self.log_message( - f"No defined activity pathways or " - f"activity layers for the activity {activity.name}" - ) - return False - - for pathway in activity.pathways: - if not (pathway in pathways): - pathways.append(pathway) - - if pathways is not None and len(pathways) > 0: - for pathway in pathways: - pathway_layer = QgsRasterLayer(pathway.path, pathway.name) - - if self.processing_cancelled: - return False - if not pathway_layer.isValid(): - self.log_message( - f"Pathway layer {pathway.name} is not valid, " - f"skipping clipping the layer." - ) - continue - self.log_message( - f"Clipping the {pathway.name} pathway layer by " - f"the study area layer\n" - ) - - output_file = os.path.join( - clipped_pathways_directory, - f"{Path(pathway.path).stem}_{str(self.scenario.uuid)[:4]}.tif", - ) - - result = self.clip_raster_by_mask( - pathway.path, studyarea_path, output_file - ) - if result: - pathway.path = output_file - - self.log_message( - f"Clipping the {pathway.name} pathway's {len(pathway.priority_layers)} " - f"priority weighting layers by study area\n" - ) - - if ( - pathway.priority_layers is not None - and len(pathway.priority_layers) > 0 - ): - priority_layers = [] - for priority_layer in pathway.priority_layers: - if priority_layer is None: - continue - - if self.processing_cancelled: - return False - - priority_layer_settings = self.get_priority_layer( - priority_layer.get("uuid") - ) - if priority_layer_settings is None: - continue - - priority_layer_path = priority_layer_settings.get("path") - - if not Path(priority_layer_path).exists(): - priority_layers.append(priority_layer) - continue - - layer = QgsRasterLayer( - priority_layer_path, f"{str(uuid.uuid4())[:4]}" - ) - if not layer.isValid(): - self.log_message( - f"Priority layer {priority_layer.get('name')} " - f"from pathway {pathway.name} is not valid, " - f"skipping clipping the layer." - ) - continue - - self.log_message( - f"Clipping the {priority_layer.get('name')} priority layer " - f"by study area layer\n" - ) - - output_file = os.path.join( - clipped_priority_directory, - f"{Path(pathway.path).stem}_{str(self.scenario.uuid)[:4]}.tif", - ) - - result = self.clip_raster_by_mask( - priority_layer_path, studyarea_path, output_file - ) - if result: - priority_layer["path"] = output_file - - priority_layers.append(priority_layer) - - pathway.priority_layers = priority_layers - - except Exception as e: - self.log_message(f"Problem clipping the layers, {e} \n") - self.cancel_task(e) - return False - - return True - - def snap_analysis_data(self, activities, extent): - """Snaps the passed activities pathways, carbon layers and priority layers - to align with the reference layer set on the settings - manager. - - :param activities: List of the selected activities - :type activities: typing.List[Activity] - - :param extent: The selected extent from user - :type extent: list - """ - if self.processing_cancelled: - # Will not proceed if processing has been cancelled by the user - return False - - self.set_status_message( - tr( - "Snapping the selected activity pathways, " - "carbon layers and priority layers" - ) - ) - - pathways = [] - - try: - for activity in activities: - if not activity.pathways and ( - activity.path is None or activity.path == "" - ): - self.set_info_message( - tr( - f"No defined activity pathways or a" - f" activity layer for the activity {activity.name}" - ), - level=Qgis.Critical, - ) - self.log_message( - f"No defined activity pathways or a " - f"activity layer for the activity {activity.name}" - ) - return False - - for pathway in activity.pathways: - if not (pathway in pathways): - pathways.append(pathway) - - reference_layer_path = self.get_settings_value(Settings.SNAP_LAYER) - rescale_values = self.get_settings_value( - Settings.RESCALE_VALUES, default=False, setting_type=bool - ) - - resampling_method = self.get_settings_value( - Settings.RESAMPLING_METHOD, default=0 - ) - - if pathways is not None and len(pathways) > 0: - snapped_pathways_directory = os.path.join( - self.scenario_directory, "pathways" - ) - - FileUtils.create_new_dir(snapped_pathways_directory) - - for pathway in pathways: - pathway_layer = QgsRasterLayer(pathway.path, pathway.name) - nodata_value = pathway_layer.dataProvider().sourceNoDataValue(1) - - if self.processing_cancelled: - return False - - # carbon layer snapping - self.log_message( - f"Snapping carbon layers from {pathway.name} pathway" - ) - - if ( - pathway.carbon_paths is not None - and len(pathway.carbon_paths) > 0 - ): - snapped_carbon_directory = os.path.join( - self.scenario_directory, "carbon_layers" - ) - - FileUtils.create_new_dir(snapped_carbon_directory) - - snapped_carbon_paths = [] - - for carbon_path in pathway.carbon_paths: - carbon_layer = QgsRasterLayer( - carbon_path, f"{str(uuid.uuid4())[:4]}" - ) - nodata_value_carbon = ( - carbon_layer.dataProvider().sourceNoDataValue(1) - ) - - carbon_output_path = self.snap_layer( - carbon_path, - reference_layer_path, - extent, - snapped_carbon_directory, - rescale_values, - resampling_method, - nodata_value_carbon, - ) - - if carbon_output_path: - snapped_carbon_paths.append(carbon_output_path) - else: - snapped_carbon_paths.append(carbon_path) - - pathway.carbon_paths = snapped_carbon_paths - - self.log_message(f"Snapping {pathway.name} pathway layer \n") - - # Pathway snapping - output_path = self.snap_layer( - pathway.path, - reference_layer_path, - extent, - snapped_pathways_directory, - rescale_values, - resampling_method, - nodata_value, - ) - if output_path: - pathway.path = output_path - - for pwl_pathway in pathways: - self.log_message( - f"Snapping {len(pwl_pathway.priority_layers)} " - f"priority weighting layers from pathway {pwl_pathway.name} with layers\n" - ) - - if ( - pwl_pathway.priority_layers is not None - and len(pwl_pathway.priority_layers) > 0 - ): - snapped_priority_directory = os.path.join( - self.scenario_directory, "priority_layers" - ) - - FileUtils.create_new_dir(snapped_priority_directory) - - priority_layers = [] - for priority_layer in pwl_pathway.priority_layers: - if priority_layer is None: - continue - - priority_layer_settings = self.get_priority_layer( - priority_layer.get("uuid") - ) - if priority_layer_settings is None: - continue - - priority_layer_path = priority_layer_settings.get("path") - - if not Path(priority_layer_path).exists(): - priority_layers.append(priority_layer) - continue - - layer = QgsRasterLayer( - priority_layer_path, f"{str(uuid.uuid4())[:4]}" - ) - nodata_value_priority = layer.dataProvider().sourceNoDataValue( - 1 - ) - - priority_output_path = self.snap_layer( - priority_layer_path, - reference_layer_path, - extent, - snapped_priority_directory, - rescale_values, - resampling_method, - nodata_value_priority, - ) - - if priority_output_path: - priority_layer["path"] = priority_output_path - - priority_layers.append(priority_layer) - - pwl_pathway.priority_layers = priority_layers - - except Exception as e: - self.log_message(f"Problem snapping layers, {e} \n") - self.cancel_task(e) - return False - - return True - - def snap_layer( - self, - input_path, - reference_path, - extent, - directory, - rescale_values, - resampling_method, - nodata_value, - ): - """Snaps the passed input layer using the reference layer and updates - the snap output no data value to be the same as the original input layer - no data value. - - :param input_path: Input layer source - :type input_path: str - - :param reference_path: Reference layer source - :type reference_path: str - - :param extent: Clip extent - :type extent: list - - :param directory: Absolute path of the output directory for the snapped - layers - :type directory: str - - :param rescale_values: Whether to rescale pixel values - :type rescale_values: bool - - :param resample_method: Method to use when resampling - :type resample_method: QgsAlignRaster.ResampleAlg - - :param nodata_value: Original no data value of the input layer - :type nodata_value: float - - """ - - input_result_path, reference_result_path = align_rasters( - input_path, - reference_path, - extent, - directory, - rescale_values, - resampling_method, - ) - - if input_result_path is not None: - result_path = Path(input_result_path) - - directory = result_path.parent - name = result_path.stem - - output_path = os.path.join(directory, f"{name}_final.tif") - - self.replace_nodata(input_result_path, output_path, nodata_value) - - return output_path - - def reproject_layer( - self, - input_path: str, - target_crs: QgsCoordinateReferenceSystem, - output_directory: str = None, - target_extent: str = None, - ) -> str: - """Reprojects the input layer to the target CRS and saves it in the - specified output directory. - :param input_path: Input layer path - :type input_path: str - :param target_crs: Target CRS to reproject the layer to - :type target_crs: QgsCoordinateReferenceSystem - :param output_directory: Directory to save the reprojected layer, defaults to None - :type output_directory: str, optional - :param target_extent: Target extent, defaults to None - :type target_extent: str, optional - :returns: Path to the reprojected layer - :rtype: str - """ - if not os.path.exists(input_path): - self.log_message( - f"Input layer {input_path} does not exist, " - f"skipping the layer reprojection." - ) - return None - - if output_directory is None: - output_directory = Path(input_path).parent - - output_file = os.path.join( - output_directory, - f"{Path(input_path).stem}_{str(self.scenario.uuid)[:4]}.tif", - ) - - alg_params = { - "INPUT": input_path, - "TARGET_CRS": target_crs, - "OUTPUT": output_file, - } - - if target_extent is not None and target_extent != "": - alg_params["TARGET_EXTENT"] = target_extent - - self.log_message(f"Used parameters for layer reprojection: " f"{alg_params} \n") - - self.feedback = QgsProcessingFeedback() - self.feedback.progressChanged.connect(self.update_progress) - - if self.processing_cancelled: - return None - - results = processing.run( - "gdal:warpreproject", - alg_params, - context=self.processing_context, - feedback=self.feedback, - ) - return results["OUTPUT"] - - def reproject_pathways( - self, - target_crs: QgsCoordinateReferenceSystem, - target_extent: str = None, - ): - """ - Reprojects the activity pathways and priority layers to the target CRS. - :param target_crs: Target CRS to reproject the layers to - :type target_crs: QgsCoordinateReferenceSystem - :param target_extent: Target extent, defaults to None - :type target_extent: str, optional - :returns: True if the task operation was successfully completed else False. - :rtype: bool - """ - if self.processing_cancelled: - return False - - if target_crs is None or not target_crs.isValid(): - self.set_info_message( - tr("Invalid target CRS for reprojecting pathways."), - level=Qgis.Critical, - ) - self.log_message("Invalid target CRS for reprojecting pathways.") - return False - - self.set_status_message( - tr("Reprojecting the activity pathways and priority layers") - ) - - pathways: typing.List[NcsPathway] = [] - - try: - for activity in self.analysis_activities: - if not activity.pathways and ( - activity.path is None or activity.path == "" - ): - self.set_info_message( - tr( - f"No defined activity pathways or a" - f" activity layer for the activity {activity.name}" - ), - level=Qgis.Critical, - ) - self.log_message( - f"No defined activity pathways or a " - f"activity layer for the activity {activity.name}" - ) - return False - - for pathway in activity.pathways: - if not (pathway in pathways): - pathways.append(pathway) - - if pathways is not None and len(pathways) > 0: - reprojected_pathways_directory = os.path.join( - self.scenario_directory, "reprojected_pathways" - ) - - FileUtils.create_new_dir(reprojected_pathways_directory) - - for pathway in pathways: - pathway_layer = QgsRasterLayer(pathway.path, pathway.name) - - if self.processing_cancelled: - return False - if not pathway_layer.isValid(): - self.log_message( - f"Pathway layer {pathway.name} is not valid, " - f"skipping layer reprojection." - ) - continue - - if pathway_layer.crs() == target_crs: - self.log_message( - f"Pathway layer {pathway.name} is already in the target CRS " - f"{target_crs.authid()}, skipping layer reprojection." - ) - else: - self.log_message( - f"Reprojecting {pathway.name} pathway layer to {target_crs.authid()}\n" - ) - - output_path = self.reproject_layer( - pathway.path, - target_crs, - reprojected_pathways_directory, - target_extent, - ) - if output_path: - pathway.path = output_path - - self.log_message( - f"Reprojecting {len(pathway.priority_layers)} " - f"priority weighting layers from pathway {pathway.name}\n" - ) - - if ( - pathway.priority_layers is not None - and len(pathway.priority_layers) > 0 - ): - reprojected_priority_directory = os.path.join( - self.scenario_directory, "reprojected_priority_layers" - ) - - FileUtils.create_new_dir(reprojected_priority_directory) - - priority_layers = [] - for priority_layer in pathway.priority_layers: - if priority_layer is None: - continue - - priority_layer_settings = self.get_priority_layer( - priority_layer.get("uuid") - ) - if priority_layer_settings is None: - continue - - priority_layer_path = priority_layer_settings.get("path") - - if not Path(priority_layer_path).exists(): - priority_layers.append(priority_layer) - continue - - layer = QgsRasterLayer( - priority_layer_path, f"{str(uuid.uuid4())[:4]}" - ) - if not layer.isValid(): - self.log_message( - f"Priority layer {priority_layer.get('name')} " - f"from pathway {pathway.name} is not valid, " - f"skipping layer reprojection." - ) - continue - - if layer.crs() == target_crs: - self.log_message( - f"Priority layer {priority_layer.get('name')} " - f"from pathway {pathway.name} is already in the target CRS " - f"{target_crs.authid()}, skipping layer reprojection." - ) - else: - output_path = self.reproject_layer( - priority_layer_path, - target_crs, - reprojected_priority_directory, - target_extent, - ) - if output_path: - priority_layer["path"] = output_path - - priority_layers.append(priority_layer) - - pathway.priority_layers = priority_layers - - except Exception as e: - self.log_message(f"Problem reprojecting layers, {e} \n") - self.cancel_task(e) - return False - - return True - - def run_activities_analysis(self, activities, extent, temporary_output=False): - """Runs the required activity analysis on the passed - activities pathways. The analysis is responsible for creating activities - layers from their respective pathways layers. - - :param activities: List of the selected activities - :type activities: typing.List[Activity] - - :param extent: selected extent from user - :type extent: SpatialExtent - - :param temporary_output: Whether to save the processing outputs as temporary - files - :type temporary_output: bool - - :returns: Whether the task operations was successful - :rtype: bool - """ - if self.processing_cancelled: - # Will not proceed if processing has been cancelled by the user - return False - - self.set_status_message(tr("Creating activity layers from pathways")) - - try: - for activity in activities: - activities_directory = os.path.join( - self.scenario_directory, "activities" - ) - FileUtils.create_new_dir(activities_directory) - file_name = clean_filename(activity.name.replace(" ", "_")) - - layers = [] - if not activity.pathways and ( - activity.path is None or activity.path == "" - ): - self.set_info_message( - tr( - f"No defined activity pathways or a" - f" activity layer for the activity {activity.name}" - ), - level=Qgis.Critical, - ) - self.log_message( - f"No defined activity pathways or an " - f"activity layer for the activity {activity.name}" - ) - - return False - - output_file = os.path.join( - activities_directory, f"{file_name}_{str(uuid.uuid4())[:4]}.tif" - ) - - # Due to the activities base class - # activity only one of the following blocks will be executed, - # the activity either contain a path or - # pathways - if activity.path is not None and activity.path != "": - layers = [activity.path] - - for pathway in activity.pathways: - layers.append(pathway.path) - - output = ( - QgsProcessing.TEMPORARY_OUTPUT if temporary_output else output_file - ) - - reference_layer = self.get_reference_layer() - if (reference_layer is None or reference_layer == "") and len( - layers - ) > 0: - reference_layer = layers[0] - - # Actual processing calculation - alg_params = { - "IGNORE_NODATA": True, - "INPUT": layers, - "EXTENT": extent, - "OUTPUT_NODATA_VALUE": settings_manager.get_value( - Settings.NCS_NO_DATA_VALUE, NO_DATA_VALUE - ), - "REFERENCE_LAYER": reference_layer, - "STATISTIC": 0, # Sum - "OUTPUT": output, - } - - self.log_message( - f"Used parameters for " f"activities generation: {alg_params} \n" - ) - - self.feedback = QgsProcessingFeedback() - self.feedback.progressChanged.connect(self.update_progress) - - if self.processing_cancelled: - return False - - results = processing.run( - "native:cellstatistics", - alg_params, - context=self.processing_context, - feedback=self.feedback, - ) - activity.path = results["OUTPUT"] - - except Exception as e: - self.log_message(f"Problem creating activity layers, {e}") - self.cancel_task(e) - return False - - return True - - def run_activities_masking( - self, activities, masking_layers, extent, temporary_output=False - ): - """Applies the mask layers into the passed activities - - :param activities: List of the selected activities - :type activities: typing.List[Activity] - - :param masking_layers: Paths to the mask layers to be used - :type masking_layers: dict - - :param extent: selected extent from user - :type extent: str - - :param temporary_output: Whether to save the processing outputs as temporary - files - :type temporary_output: bool - - :returns: Whether the task operations was successful - :rtype: bool - """ - if self.processing_cancelled: - # Will not proceed if processing has been cancelled by the user - return False - - self.set_status_message(tr("Masking activities using the saved masked layers")) - - try: - if len(masking_layers) < 1: - return False - if len(masking_layers) > 1: - initial_mask_layer = self.merge_vector_layers(masking_layers) - else: - mask_layer_path = masking_layers[0] - initial_mask_layer = QgsVectorLayer(mask_layer_path, "mask", "ogr") - - if not initial_mask_layer.isValid(): - self.log_message( - f"Skipping activities masking " - f"using layer {mask_layer_path}, not a valid layer." - ) - return False - - # see https://qgis.org/pyqgis/master/core/Qgis.html#qgis.core.Qgis.GeometryType - if Qgis.versionInt() < 33000: - layer_check = ( - initial_mask_layer.geometryType() == QgsWkbTypes.PolygonGeometry - ) - else: - layer_check = ( - initial_mask_layer.geometryType() == Qgis.GeometryType.Polygon - ) - - if not layer_check: - self.log_message( - f"Skipping activities masking " - f"using layer {mask_layer_path}, not a polygon layer." - ) - return False - - extent_layer = self.layer_extent(extent) - mask_layer = self.mask_layer_difference(initial_mask_layer, extent_layer) - - if isinstance(mask_layer, str): - mask_layer = QgsVectorLayer(mask_layer, "ogr") - - if not mask_layer.isValid(): - self.log_message( - f"Skipping activities masking " - f"the created difference mask layer {mask_layer.source()}," - f" not a valid layer." - ) - return False - - for activity in activities: - if activity.path is None or activity.path == "": - if not self.processing_cancelled: - self.set_info_message( - tr( - f"Problem when masking activities, " - f"there is no map layer for the activity {activity.name}" - ), - level=Qgis.Critical, - ) - self.log_message( - f"Problem when masking activities, " - f"there is no map layer for the activity {activity.name}" - ) - else: - # If the user cancelled the processing - self.set_info_message( - tr(f"Processing has been cancelled by the user."), - level=Qgis.Critical, - ) - self.log_message(f"Processing has been cancelled by the user.") - - return False - - masked_activities_directory = os.path.join( - self.scenario_directory, "masked_activities" - ) - FileUtils.create_new_dir(masked_activities_directory) - file_name = clean_filename(activity.name.replace(" ", "_")) - - output_file = os.path.join( - masked_activities_directory, - f"{file_name}_{str(uuid.uuid4())[:4]}.tif", - ) - - output = ( - QgsProcessing.TEMPORARY_OUTPUT if temporary_output else output_file - ) - - activity_layer = QgsRasterLayer(activity.path, "activity_layer") - - if activity_layer.crs() != mask_layer.crs(): - self.log_message( - f"Skipping masking, activity layer and" - f" mask layer(s) have different CRS" - ) - continue - - if not activity_layer.extent().intersects(mask_layer.extent()): - self.log_message( - "Skipping masking, the extents of the activity layer " - "and mask layer(s) do not overlap." - ) - continue - - # Actual processing calculation - alg_params = { - "INPUT": activity.path, - "MASK": mask_layer, - "SOURCE_CRS": activity_layer.crs(), - "DESTINATION_CRS": activity_layer.crs(), - "TARGET_EXTENT": extent, - "OUTPUT": output, - "NO_DATA": settings_manager.get_value( - Settings.NCS_NO_DATA_VALUE, NO_DATA_VALUE - ), - } - - self.log_message( - f"Used parameters for masking activity {activity.name} " - f"using project mask layers: {alg_params} \n" - ) - - self.feedback = QgsProcessingFeedback() - self.feedback.progressChanged.connect(self.update_progress) - - if self.processing_cancelled: - return False - - results = processing.run( - "gdal:cliprasterbymasklayer", - alg_params, - context=self.processing_context, - feedback=self.feedback, - ) - activity.path = results["OUTPUT"] - - except Exception as e: - self.log_message(f"Problem masking activities layers, {e} \n") - self.cancel_task(e) - return False - - return True - - def run_internal_activities_masking( - self, activities, extent, temporary_output=False - ): - """Applies the mask layers into the passed activities - - :param activities: List of the selected activities - :type activities: typing.List[Activity] - - :param extent: selected extent from user - :type extent: str - - :param temporary_output: Whether to save the processing outputs as temporary - files - :type temporary_output: bool - - :returns: Whether the task operations was successful - :rtype: bool - """ - if self.processing_cancelled: - # Will not proceed if processing has been cancelled by the user - return False - - self.set_status_message( - tr("Masking activities using their respective mask layers.") - ) - - try: - for activity in activities: - masking_layers = activity.mask_paths - - if len(masking_layers) < 1: - self.log_message( - f"Skipping activity masking " - f"No mask layer(s) for activity {activity.name}" - ) - continue - if len(masking_layers) > 1: - initial_mask_layer = self.merge_vector_layers(masking_layers) - else: - mask_layer_path = masking_layers[0] - initial_mask_layer = QgsVectorLayer(mask_layer_path, "mask", "ogr") - - if not initial_mask_layer.isValid(): - self.log_message( - f"Skipping activity masking " - f"using layer {mask_layer_path}, not a valid layer." - ) - continue - - # see https://qgis.org/pyqgis/master/core/Qgis.html#qgis.core.Qgis.GeometryType - if Qgis.versionInt() < 33000: - layer_check = ( - initial_mask_layer.geometryType() == QgsWkbTypes.PolygonGeometry - ) - else: - layer_check = ( - initial_mask_layer.geometryType() == Qgis.GeometryType.Polygon - ) - - if not layer_check: - self.log_message( - f"Skipping activity masking " - f"using layer {mask_layer_path}, not a polygon layer." - ) - continue - - extent_layer = self.layer_extent(extent) - - if extent_layer.crs() != initial_mask_layer.crs(): - self.log_message( - "Skipping masking, the mask layers crs" - " do not match the scenario crs." - ) - continue - - if not extent_layer.extent().intersects(initial_mask_layer.extent()): - self.log_message( - "Skipping masking, the mask layers extent" - " and the scenario extent do not overlap." - ) - continue - - mask_layer = self.mask_layer_difference( - initial_mask_layer, extent_layer - ) - - if isinstance(mask_layer, str): - mask_layer = QgsVectorLayer(mask_layer, "ogr") - - if not mask_layer.isValid(): - self.log_message( - f"Skipping activity masking " - f"the created difference mask layer {mask_layer.source()}," - f"is not a valid layer." - ) - continue - if activity.path is None or activity.path == "": - if not self.processing_cancelled: - self.set_info_message( - tr( - f"Problem when masking activity, " - f"there is no map layer for the activity {activity.name}" - ), - level=Qgis.Critical, - ) - self.log_message( - f"Problem when masking activity, " - f"there is no map layer for the activity {activity.name}" - ) - else: - # If the user cancelled the processing - self.set_info_message( - tr(f"Processing has been cancelled by the user."), - level=Qgis.Critical, - ) - self.log_message(f"Processing has been cancelled by the user.") - - continue - - masked_activities_directory = os.path.join( - self.scenario_directory, "final_masked_activities" - ) - FileUtils.create_new_dir(masked_activities_directory) - file_name = clean_filename(activity.name.replace(" ", "_")) - - output_file = os.path.join( - masked_activities_directory, - f"{file_name}_{str(uuid.uuid4())[:4]}.tif", - ) - - output = ( - QgsProcessing.TEMPORARY_OUTPUT if temporary_output else output_file - ) - - activity_layer = QgsRasterLayer(activity.path, "activity_layer") - - if activity_layer.crs() != mask_layer.crs(): - self.log_message( - f"Skipping masking, activity layer and" - f" mask layer(s) have different CRS" - ) - continue - - if not activity_layer.extent().intersects(mask_layer.extent()): - self.log_message( - "Skipping masking, the extents of the activity layer " - "and mask layers do not overlap." - ) - continue - - # Actual processing calculation - alg_params = { - "INPUT": activity.path, - "MASK": mask_layer, - "SOURCE_CRS": activity_layer.crs(), - "DESTINATION_CRS": activity_layer.crs(), - "TARGET_EXTENT": extent, - "OUTPUT": output, - "NO_DATA": settings_manager.get_value( - Settings.NCS_NO_DATA_VALUE, NO_DATA_VALUE - ), - } - - self.log_message( - f"Used parameters for masking the activity {activity.name}" - f" using activity respective mask layer(s): {alg_params} \n" - ) - - self.feedback = QgsProcessingFeedback() - self.feedback.progressChanged.connect(self.update_progress) - - if self.processing_cancelled: - return False - - results = processing.run( - "gdal:cliprasterbymasklayer", - alg_params, - context=self.processing_context, - feedback=self.feedback, - ) - activity.path = results["OUTPUT"] - - except Exception as e: - self.log_message(f"Problem masking activities layers, {e} \n") - self.cancel_task(e) - - return False - - return True - - def merge_vector_layers(self, layers): - """Merges the passed vector layers into a single layer - - :param layers: List of the vector layers paths - :type layers: typing.List[str] - - :return: Merged vector layer - :rtype: QgsMapLayer - """ - - input_map_layers = [] - - for layer_path in layers: - layer = QgsVectorLayer(layer_path, "mask", "ogr") - if layer.isValid(): - input_map_layers.append(layer) - else: - self.log_message( - f"Skipping invalid mask layer {layer_path} from masking." - ) - if len(input_map_layers) == 0: - return None - if len(input_map_layers) == 1: - return input_map_layers[0].source() - - self.set_status_message(tr("Merging mask layers")) - - # Actual processing calculation - alg_params = { - "LAYERS": input_map_layers, - "CRS": None, - "OUTPUT": QgsProcessing.TEMPORARY_OUTPUT, - } - - self.log_message(f"Used parameters for merging mask layers: {alg_params} \n") - - results = processing.run( - "native:mergevectorlayers", - alg_params, - context=self.processing_context, - feedback=self.feedback, - ) - - return results["OUTPUT"] - - def layer_extent(self, extent): - """Creates a new vector layer contains has a - feature with geometry matching an extent parameter. - - :param extent: Extent parameter - :type extent: str - - :returns: Vector layer - :rtype: QgsVectorLayer - """ - - alg_params = { - "INPUT": extent, - "CRS": None, - "OUTPUT": QgsProcessing.TEMPORARY_OUTPUT, - } - - results = processing.run( - "native:extenttolayer", - alg_params, - context=self.processing_context, - feedback=self.feedback, - ) - - return results["OUTPUT"] - - def mask_layer_difference(self, input_layer, overlay_layer): - """Creates a new vector layer that contains - difference of features between the two passed layers. - - :param input_layer: Input layer - :type input_layer: QgsVectorLayer - - :param overlay_layer: Target overlay layer - :type overlay_layer: QgsVectorLayer - - :returns: Vector layer - :rtype: QgsVectorLayer - """ - - alg_params = { - "INPUT": input_layer, - "OVERLAY": overlay_layer, - "OVERLAY_FIELDS_PREFIX": "", - "GRID_SIZE": None, - "OUTPUT": QgsProcessing.TEMPORARY_OUTPUT, - } - - results = processing.run( - "native:symmetricaldifference", - alg_params, - context=self.processing_context, - feedback=self.feedback, - ) - - return results["OUTPUT"] - - def run_activities_sieve(self, models, temporary_output=False): - """Runs the sieve functionality analysis on the passed models layers, - removing the models layer polygons that are smaller than the provided - threshold size (in pixels) and replaces them with the pixel value of - the largest neighbour polygon. - - :param models: List of the analyzed activities - :type models: typing.List[ImplementationModel] - - :param extent: Selected area of interest extent - :type extent: str - - :param temporary_output: Whether to save the processing outputs as temporary - files - :type temporary_output: bool - - :returns: Whether the task operations was successful - :rtype: bool - """ - if self.processing_cancelled: - # Will not proceed if processing has been cancelled by the user - return False - - self.set_status_message(tr("Applying sieve function to the activities")) - - try: - for model in models: - if model.path is None or model.path == "": - if not self.processing_cancelled: - self.set_info_message( - tr( - f"Problem when running sieve function on models, " - f"there is no map layer for the model {model.name}" - ), - level=Qgis.Critical, - ) - self.log_message( - f"Problem when running sieve function on models, " - f"there is no map layer for the model {model.name}" - ) - else: - # If the user cancelled the processing - self.set_info_message( - tr(f"Processing has been cancelled by the user."), - level=Qgis.Critical, - ) - self.log_message(f"Processing has been cancelled by the user.") - - return False - - sieved_ims_directory = os.path.join( - self.scenario_directory, "sieved_ims" - ) - FileUtils.create_new_dir(sieved_ims_directory) - file_name = clean_filename(model.name.replace(" ", "_")) - - output_file = os.path.join( - sieved_ims_directory, f"{file_name}_{str(uuid.uuid4())[:4]}.tif" - ) - - threshold_value = float( - self.get_settings_value(Settings.SIEVE_THRESHOLD, default=10.0) - ) - - mask_layer = self.get_settings_value( - Settings.SIEVE_MASK_PATH, default="" - ) - - output = ( - QgsProcessing.TEMPORARY_OUTPUT if temporary_output else output_file - ) - - # Actual processing calculation - alg_params = { - "INPUT": model.path, - "THRESHOLD": threshold_value, - "MASK_LAYER": mask_layer, - "OUTPUT": output, - } - - self.log_message(f"Used parameters for sieving: {alg_params} \n") - - input_name = os.path.splitext(os.path.basename(model.path))[0] - - # Step 1: Create a binary mask from the original raster - binary_mask = processing.run( - "qgis:rastercalculator", - { - "CELLSIZE": 0, - "LAYERS": [model.path], - "CRS": None, - "EXPRESSION": f"{input_name}@1 > 0", - "OUTPUT": "TEMPORARY_OUTPUT", - }, - )["OUTPUT"] - - # Step 2: Run sieve analysis from on the binary mask - sieved_mask = processing.run( - "gdal:sieve", - { - "INPUT": binary_mask, - "THRESHOLD": threshold_value, - "EIGHT_CONNECTEDNESS": True, - "NO_MASK": True, - "MASK_LAYER": None, - "OUTPUT": "TEMPORARY_OUTPUT", - }, - context=self.processing_context, - feedback=self.feedback, - )["OUTPUT"] - - expr = f"({os.path.splitext(os.path.basename(sieved_mask))[0]}@1 > 0) * {os.path.splitext(os.path.basename(sieved_mask))[0]}@1" - - # Step 3: Remove and convert any no data value to 0 - sieved_mask_clean = processing.run( - "qgis:rastercalculator", - { - "CELLSIZE": 0, - "LAYERS": [sieved_mask], - "CRS": None, - "EXPRESSION": expr, - "OUTPUT": "TEMPORARY_OUTPUT", - }, - context=self.processing_context, - feedback=self.feedback, - )["OUTPUT"] - - expr_2 = f"{input_name}@1 * {os.path.splitext(os.path.basename(sieved_mask_clean))[0]}@1" - - # Step 4: Join the sieved mask with the original input layer to filter out the small areas - sieve_output = processing.run( - "qgis:rastercalculator", - { - "CELLSIZE": 0, - "LAYERS": [model.path, sieved_mask_clean], - "CRS": None, - "EXPRESSION": expr_2, - "OUTPUT": "TEMPORARY_OUTPUT", - }, - context=self.processing_context, - feedback=self.feedback, - )["OUTPUT"] - - # Step 5. Replace all 0 with -9999 using if ("combined@1" <= 0, -9999, "combined@1") - sieve_output_updated = processing.run( - "gdal:rastercalculator", - { - "INPUT_A": f"{sieve_output}", - "BAND_A": 1, - "FORMULA": "9999*(A<=0)*(-1)+A*(A>0)", - "NO_DATA": None, - "EXTENT_OPT": 0, - "PROJWIN": None, - "RTYPE": 5, - "OPTIONS": "", - "EXTRA": "", - "OUTPUT": "TEMPORARY_OUTPUT", - }, - context=self.processing_context, - feedback=self.feedback, - )["OUTPUT"] - - if not os.path.exists(sieve_output_updated): - self.log_message( - f"Problem running sieve function " - f"on models layers, sieve intermediate layer not found" - f" \n" - ) - self.cancel_task() - return False - - self.feedback = QgsProcessingFeedback() - self.feedback.progressChanged.connect(self.update_progress) - - if self.processing_cancelled: - return False - - # Step 6. Run sum statistics with ignore no data values set to false and no data value - results = processing.run( - "native:cellstatistics", - { - "INPUT": [sieve_output_updated], - "STATISTIC": 0, - "IGNORE_NODATA": False, - "REFERENCE_LAYER": sieve_output_updated, - "OUTPUT_NODATA_VALUE": settings_manager.get_value( - Settings.NCS_NO_DATA_VALUE, NO_DATA_VALUE - ), - "OUTPUT": output, - }, - context=self.processing_context, - feedback=self.feedback, - ) - - model.path = results["OUTPUT"] - - except Exception as e: - self.log_message(f"Problem running sieve function on models layers, {e} \n") - self.cancel_task(e) - return False - - return True - - def run_pathways_weighting( - self, activities, priority_layers_groups, extent, temporary_output=False - ) -> bool: - """Runs weighting analysis on the pathways in the activities using - the corresponding NCS PWLs. - - The formula is: (suitability_index * pathway) + - (priority group coefficient 1 * PWL 1) + - (priority group coefficient 2 * PWL 2) ... - - :param activities: List of the selected activities - :type activities: typing.List[Activity] - - :param priority_layers_groups: Used priority layers groups and their values - :type priority_layers_groups: dict - - :param extent: selected extent from user - :type extent: str - - :param temporary_output: Whether to save the processing outputs as temporary - files - :type temporary_output: bool - - :returns: True if the task operation was successfully completed else False. - :rtype: bool - """ - if self.processing_cancelled: - return False - - self.set_status_message(tr(f"Weighting of pathways")) - - if len(activities) == 0: - msg = tr(f"No defined activities for running pathways weighting.") - self.set_info_message( - msg, - level=Qgis.Critical, - ) - self.log_message(msg) - return False - - # Get valid pathways - pathways = [] - activities_paths = [] - - try: - # Validate activities and corresponding pathways - for activity in activities: - if not activity.pathways and ( - activity.path is None or activity.path == "" - ): - self.set_info_message( - tr( - f"No defined activity pathways or an" - f" activity layer for the activity {activity.name}" - ), - level=Qgis.Critical, - ) - self.log_message( - f"No defined activity pathways or an " - f"activity layer for the activity {activity.name}" - ) - return False - - for pathway in activity.pathways: - if pathway not in pathways: - pathways.append(pathway) - - if activity.path is not None and activity.path != "": - activities_paths.append(activity.path) - - if not pathways and len(activities_paths) > 0: - self.run_activities_analysis(activities, extent) - return False - - suitability_index = float( - self.get_settings_value(Settings.PATHWAY_SUITABILITY_INDEX, default=0) - ) - - settings_priority_layers = self.get_priority_layers() - - weighted_pathways_directory = os.path.join( - self.scenario_directory, "weighted_pathways" - ) - FileUtils.create_new_dir(weighted_pathways_directory) - - for pathway in pathways: - # Skip processing if cancelled - if self.processing_cancelled: - return False - - base_names = [] - layers = [pathway.path] - run_calculation = False - - # Include suitability index if not zero - pathway_basename = Path(pathway.path).stem - if suitability_index > 0: - base_names.append(f'({suitability_index}*"{pathway_basename}@1")') - run_calculation = True - else: - base_names.append(f'("{pathway_basename}@1")') - - for layer in pathway.priority_layers: - if not any(priority_layers_groups): - self.log_message( - f"There are no defined priority layers in groups," - f" skipping the inclusion of PWLs in pathways " - f"weighting." - ) - break - - if layer is None: - continue - - settings_layer = self.get_priority_layer(layer.get("uuid")) - if settings_layer is None: - continue - - pwl = settings_layer.get("path") - - missing_pwl_message = ( - f"Path {pwl} for priority " - f"weighting layer {layer.get('name')} " - f"doesn't exist, skipping the layer " - f"from the pathway {pathway.name} weighting." - ) - if pwl is None: - self.log_message(missing_pwl_message) - continue - - pwl_path = Path(pwl) - - if not pwl_path.exists(): - self.log_message(missing_pwl_message) - continue - - pwl_path_basename = pwl_path.stem - - for priority_layer in settings_priority_layers: - if priority_layer.get("name") == layer.get("name"): - for group in priority_layer.get("groups", []): - value = group.get("value") - priority_group_coefficient = float(value) - if priority_group_coefficient > 0: - if pwl not in layers: - layers.append(pwl) - - pwl_expression = f'({priority_group_coefficient}*"{pwl_path_basename}@1")' - base_names.append(pwl_expression) - if not run_calculation: - run_calculation = True - - # No need to run the calculation if suitability index is - # zero or there are no PWLs in the activity. - if not run_calculation: - continue - - file_name = clean_filename(pathway.name.replace(" ", "_")) - output_file = os.path.join( - weighted_pathways_directory, - f"{file_name}_{str(uuid.uuid4())[:4]}.tif", - ) - expression = " + ".join(base_names) - - output = ( - QgsProcessing.TEMPORARY_OUTPUT if temporary_output else output_file - ) - - # Actual processing calculation - alg_params = { - "CELLSIZE": 0, - "CRS": None, - "EXPRESSION": expression, - "EXTENT": extent, - "LAYERS": layers, - "OUTPUT": output, - } - - self.log_message( - f" Used parameters for calculating weighting pathways {alg_params} \n" - ) - - self.feedback = QgsProcessingFeedback() - self.feedback.progressChanged.connect(self.update_progress) - - if self.processing_cancelled: - return False - - results = processing.run( - "qgis:rastercalculator", - alg_params, - context=self.processing_context, - feedback=self.feedback, - ) - pathway.path = results["OUTPUT"] - - except Exception as e: - self.log_message(f"Problem weighting pathways, {e}\n") - self.cancel_task(e) - return False - - return True - - def run_activities_cleaning(self, activities, extent=None, temporary_output=False): - """Cleans the weighted activities replacing - zero values with no-data as they are not statistical meaningful for the - scenario analysis. - - :param activities: Activities to be cleaned up. - :type activities: typing.List[Activity] - - :param extent: Selected extent from user - :type extent: str - - :param temporary_output: Whether to save the processing outputs as temporary - files - :type temporary_output: bool - - :returns: Whether the task operations was successful - :rtype: bool - """ - - if self.processing_cancelled: - return False - - self.set_status_message(tr("Updating activity values")) - - try: - for activity in activities: - if activity.path is None or activity.path == "": - self.set_info_message( - tr( - f"Problem when running activity updates, " - f"there is no map layer for the activity {activity.name}" - ), - level=Qgis.Critical, - ) - self.log_message( - f"Problem when running activity updates, " - f"there is no map layer for the activity {activity.name}" - ) - - return False - - layers = [activity.path] - - file_name = clean_filename(activity.name.replace(" ", "_")) - - weighted_pathways_dir = os.path.join( - self.scenario_directory, "weighted_pathways" - ) - FileUtils.create_new_dir(weighted_pathways_dir) - output_file = os.path.join( - weighted_pathways_dir, - f"{file_name}_{str(uuid.uuid4())[:4]}_cleaned.tif", - ) - - # Actual processing calculation - # The aim is to convert pixels values to no data, that is why we are - # using the sum operation with only one layer. - - output = ( - QgsProcessing.TEMPORARY_OUTPUT if temporary_output else output_file - ) - - alg_params = { - "IGNORE_NODATA": True, - "INPUT": layers, - "EXTENT": extent, - "OUTPUT_NODATA_VALUE": 0, - "REFERENCE_LAYER": layers[0] if len(layers) > 0 else None, - "STATISTIC": 0, # Sum - "OUTPUT": output, - } - - self.log_message( - f"Used parameters for " - f"updates on the cleaned activities: {alg_params} \n" - ) - - self.feedback = QgsProcessingFeedback() - self.feedback.progressChanged.connect(self.update_progress) - - if self.processing_cancelled: - return False - - results = processing.run( - "native:cellstatistics", - alg_params, - context=self.processing_context, - feedback=self.feedback, - ) - activity.path = results["OUTPUT"] - - except Exception as e: - self.log_message(f"Problem cleaning activities, {e}") - self.cancel_task(e) - return False - - return True - - def run_highest_position_analysis(self, temporary_output=False): - """Runs the highest position analysis which is last step - in scenario analysis. Uses the activities set by the current ongoing - analysis. - - :param temporary_output: Whether to save the processing outputs as temporary - files - :type temporary_output: bool - - :returns: Whether the task operations was successful - :rtype: bool - - """ - if self.processing_cancelled: - # Will not proceed if processing has been cancelled by the user - return False - - passed_extent_box = self.analysis_extent.bbox - passed_extent = QgsRectangle( - passed_extent_box[0], - passed_extent_box[2], - passed_extent_box[1], - passed_extent_box[3], - ) - - # We explicitly set the created_date since the current implementation - # of the data model means that the attribute value is set only once when - # the class is loaded hence subsequent instances will have the same value. - self.scenario_result = ScenarioResult( - scenario=self.scenario, - scenario_directory=self.scenario_directory, - created_date=datetime.datetime.now(), - ) - - try: - layers = {} - - self.set_status_message(tr("Calculating the highest position")) - - for activity in self.analysis_activities: - if activity.path is not None and activity.path != "": - raster_layer = QgsRasterLayer(activity.path, activity.name) - layers[activity.name] = ( - raster_layer if raster_layer is not None else None - ) - else: - for pathway in activity.pathways: - layers[activity.name] = QgsRasterLayer(pathway.path) - - source_crs = QgsCoordinateReferenceSystem("EPSG:4326") - dest_crs = list(layers.values())[0].crs() if len(layers) > 0 else source_crs - - extent_string = ( - f"{passed_extent.xMinimum()},{passed_extent.xMaximum()}," - f"{passed_extent.yMinimum()},{passed_extent.yMaximum()}" - f" [{dest_crs.authid()}]" - ) - - output_file = os.path.join( - self.scenario_directory, - f"{SCENARIO_OUTPUT_FILE_NAME}_{str(self.scenario.uuid)[:4]}.tif", - ) - - # Preparing the input rasters for the highest position - # analysis in a correct order - activity_names = [activity.name for activity in self.analysis_activities] - all_activities = sorted( - self.analysis_activities, - key=lambda activity_instance: activity_instance.style_pixel_value, - ) - for index, activity in enumerate(all_activities): - activity.style_pixel_value = index + 1 - - all_activity_names = [activity.name for activity in all_activities] - sources = [] - for activity_name in all_activity_names: - if activity_name in activity_names: - sources.append(layers[activity_name].source()) - - self.log_message( - f"Layers sources {[Path(source).stem for source in sources]}" - ) - - output_file = ( - QgsProcessing.TEMPORARY_OUTPUT if temporary_output else output_file - ) - - alg_params = { - "IGNORE_NODATA": True, - "INPUT_RASTERS": sources, - "EXTENT": extent_string, - "OUTPUT_NODATA_VALUE": settings_manager.get_value( - Settings.NCS_NO_DATA_VALUE, NO_DATA_VALUE - ), - "REFERENCE_LAYER": list(layers.values())[0] - if len(layers) >= 1 - else None, - "OUTPUT": output_file, - } - - self.log_message( - f"Used parameters for highest position analysis {alg_params} \n" - ) - - self.feedback = QgsProcessingFeedback() - self.feedback.progressChanged.connect(self.update_progress) - - if self.processing_cancelled: - return False - - self.output = processing.run( - "native:highestpositioninrasterstack", - alg_params, - context=self.processing_context, - feedback=self.feedback, - ) - - except Exception as err: - self.log_message( - tr( - "An error occurred when running task for " - 'scenario analysis, error message "{}"'.format(str(err)) - ) - ) - self.cancel_task(err) - return False - - return True diff --git a/test/model_data_for_testing.py b/test/model_data_for_testing.py index 514f24eb6..5990e41ae 100644 --- a/test/model_data_for_testing.py +++ b/test/model_data_for_testing.py @@ -19,7 +19,7 @@ UUID_ATTRIBUTE, ) from cplus_plugin.definitions.defaults import PILOT_AREA_EXTENT -from cplus_plugin.models.base import ( +from cplus_core.models.base import ( Activity, LayerType, NcsPathway, diff --git a/test/test_activity_widget.py b/test/test_activity_widget.py index 9581f5956..0c460ff13 100644 --- a/test/test_activity_widget.py +++ b/test/test_activity_widget.py @@ -2,7 +2,6 @@ from unittest.mock import MagicMock from qgis.gui import QgsMessageBar from cplus_plugin.gui.activity_widget import ActivityContainerWidget -from cplus_plugin.models.base import Activity, NcsPathway from cplus_plugin.gui.component_item_model import ActivityItem, NcsPathwayItem from model_data_for_testing import get_activity, get_valid_ncs_pathway from utilities_for_testing import get_qgis_app diff --git a/test/test_api_request.py b/test/test_api_request.py index dd5d73b88..a96879264 100644 --- a/test/test_api_request.py +++ b/test/test_api_request.py @@ -20,7 +20,7 @@ from cplus_plugin.api.carbon import IrrecoverableCarbonDownloadTask from cplus_plugin.conf import settings_manager, Settings from cplus_plugin.definitions.defaults import BASE_API_URL, IRRECOVERABLE_CARBON_API_URL -from cplus_plugin.models.base import DataSourceType +from cplus_plugin.models.source import DataSourceType from utilities_for_testing import get_qgis_app diff --git a/test/test_data_model_helpers.py b/test/test_data_model_helpers.py index e43ed5dfc..0788c8905 100644 --- a/test/test_data_model_helpers.py +++ b/test/test_data_model_helpers.py @@ -5,7 +5,7 @@ from unittest import TestCase -from cplus_plugin.models.base import NcsPathway +from cplus_core.models.base import NcsPathway from cplus_plugin.models.helpers import ( clone_layer_component, create_metric_configuration, diff --git a/test/test_metrics.py b/test/test_metrics.py index 955f0aa58..385ecc8f2 100644 --- a/test/test_metrics.py +++ b/test/test_metrics.py @@ -23,7 +23,7 @@ register_metric_functions, unregister_metric_functions, ) -from cplus_plugin.models.base import DataSourceType +from cplus_plugin.models.source import DataSourceType from cplus_plugin.models.helpers import create_metric_configuration from cplus_plugin.models.report import ActivityContextInfo diff --git a/test/test_scenario_history_tasks.py b/test/test_scenario_history_tasks.py index 56e343652..4bf2640cb 100644 --- a/test/test_scenario_history_tasks.py +++ b/test/test_scenario_history_tasks.py @@ -1,3 +1,4 @@ +import os import unittest import uuid from unittest.mock import patch, MagicMock @@ -7,7 +8,8 @@ DeleteScenarioTask, ) from cplus_plugin.api.request import CplusApiRequest -from cplus_plugin.models.base import Scenario, SpatialExtent +from cplus_core.models.base import Scenario, SpatialExtent +from cplus_core.analysis import TaskConfig class TestFetchScenarioHistoryTask(unittest.TestCase): @@ -139,15 +141,17 @@ def test_run_success( ) analysis_activities = scenario.activities analysis_priority_layers_groups = scenario.priority_layer_groups - task = FetchScenarioOutputTask( - analysis_scenario_name, - analysis_scenario_description, - analysis_activities, - analysis_priority_layers_groups, - analysis_extent, + task_config = TaskConfig( scenario, - None, + [], + analysis_priority_layers_groups, + analysis_activities, + analysis_activities, + pathway_suitability_index=1.0, + carbon_coefficient=1.0, + base_dir=os.path.join("/tmp", str(scenario.uuid)), ) + task = FetchScenarioOutputTask(task_config, None) with patch.object( FetchScenarioOutputTask, "fetch_scenario_output" @@ -182,15 +186,17 @@ def test_run_failure(self, mock_log): ) analysis_activities = scenario.activities analysis_priority_layers_groups = scenario.priority_layer_groups - task = FetchScenarioOutputTask( - analysis_scenario_name, - analysis_scenario_description, - analysis_activities, - analysis_priority_layers_groups, - analysis_extent, + task_config = TaskConfig( scenario, - None, + [], + analysis_priority_layers_groups, + analysis_activities, + analysis_activities, + pathway_suitability_index=1.0, + carbon_coefficient=1.0, + base_dir=os.path.join("/tmp", str(scenario.uuid)), ) + task = FetchScenarioOutputTask(task_config, None) with patch.object( CplusApiRequest, diff --git a/test/test_scenario_tasks.py b/test/test_scenario_tasks.py index 3554c55b8..3a9961d8a 100644 --- a/test/test_scenario_tasks.py +++ b/test/test_scenario_tasks.py @@ -7,19 +7,17 @@ import os import uuid -import processing import datetime from processing.core.Processing import Processing from qgis.core import QgsRasterLayer +from cplus_core.analysis import ScenarioAnalysisTask, TaskConfig +from cplus_core.models.base import Scenario, NcsPathway, Activity, SpatialExtent +from cplus_core.utils.helper import BaseFileUtils from cplus_plugin.conf import settings_manager, Settings -from cplus_plugin.tasks import ScenarioAnalysisTask -from cplus_plugin.utils import FileUtils -from cplus_plugin.models.base import Scenario, NcsPathway, Activity, SpatialExtent - class ScenarioAnalysisTaskTest(unittest.TestCase): def setUp(self): @@ -98,14 +96,24 @@ def test_scenario_pathways_weighting(self): priority_layer_groups=[], ) - analysis_task = ScenarioAnalysisTask( - "test_scenario_pathways_weighting", - "test_scenario_pathways_weighting_description", - [test_activity], - [], - test_layer.extent(), + scenario_directory = os.path.join( + f"{base_dir}", + f'scenario_{datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")}' + f"_{str(uuid.uuid4())[:4]}", + ) + BaseFileUtils.create_new_dir(scenario_directory) + + task_config = TaskConfig( scenario, + [], + [], + [test_activity], + [test_activity], + pathway_suitability_index=1.0, + carbon_coefficient=1.0, + base_dir=scenario_directory, ) + analysis_task = ScenarioAnalysisTask(task_config) extent_string = ( f"{test_extent.xMinimum()},{test_extent.xMaximum()}," @@ -213,14 +221,29 @@ def test_scenario_activities_creation(self): priority_layer_groups=[], ) - analysis_task = ScenarioAnalysisTask( - "test_scenario_activities_creation", - "test_scenario_activities_creation_description", - [test_activity], - [], - test_extent, + base_dir = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "data", + "pathways", + ) + scenario_directory = os.path.join( + f"{base_dir}", + f'scenario_{datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")}' + f"_{str(uuid.uuid4())[:4]}", + ) + BaseFileUtils.create_new_dir(scenario_directory) + + task_config = TaskConfig( scenario, + [], + [], + [test_activity], + [test_activity], + pathway_suitability_index=1.0, + carbon_coefficient=1.0, + base_dir=scenario_directory, ) + analysis_task = ScenarioAnalysisTask(task_config) extent_string = ( f"{test_extent.xMinimum()},{test_extent.xMaximum()}," @@ -316,16 +339,31 @@ def test_scenario_activities_masking(self): extent=spatial_extent, priority_layer_groups=[], ) + base_dir = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "data", + "pathways", + ) + scenario_directory = os.path.join( + f"{base_dir}", + f'scenario_{datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")}' + f"{str(uuid.uuid4())[:4]}", + ) + BaseFileUtils.create_new_dir(scenario_directory) - analysis_task = ScenarioAnalysisTask( - "test_scenario_activities_masking", - "test_scenario_activities_masking_description", - [test_activity], - [], - test_extent, + task_config = TaskConfig( scenario, + [], + [], + [test_activity], + [test_activity], + pathway_suitability_index=1.0, + carbon_coefficient=1.0, + base_dir=scenario_directory, ) + analysis_task = ScenarioAnalysisTask(task_config) + extent_string = ( f"{test_extent.xMinimum()},{test_extent.xMaximum()}," f"{test_extent.yMinimum()},{test_extent.yMaximum()}" @@ -338,12 +376,6 @@ def test_scenario_activities_masking(self): "activities", ) - scenario_directory = os.path.join( - f"{base_dir}", - f'scenario_{datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")}' - f"_{str(uuid.uuid4())[:4]}", - ) - analysis_task.scenario_directory = scenario_directory settings_manager.set_value(Settings.BASE_DIR, base_dir) @@ -359,6 +391,7 @@ def test_scenario_activities_masking(self): [test_activity], extent_string, temporary_output=True ) + print(results) self.assertTrue(results) self.assertIsInstance(results, bool) diff --git a/test/test_settings.py b/test/test_settings.py index 766a72194..a926bcf3e 100644 --- a/test/test_settings.py +++ b/test/test_settings.py @@ -3,7 +3,7 @@ from utilities_for_testing import get_qgis_app from cplus_plugin.definitions.defaults import IRRECOVERABLE_CARBON_API_URL -from cplus_plugin.models.base import DataSourceType +from cplus_plugin.models.source import DataSourceType from cplus_plugin.gui.settings.cplus_options import CplusSettings from cplus_plugin.gui.settings.report_options import ReportSettingsWidget from cplus_plugin.conf import (