Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions geonode/security/request_configuration_handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#########################################################################
#
# Copyright (C) 2025 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
from django.conf import settings
from geonode.base.auth import get_or_create_token
from geonode.security.request_configuration_registry import BaseRequestConfigurationRuleHandler


class BaseConfigurationRuleHandler(BaseRequestConfigurationRuleHandler):
"""
Base handler for configuration rules.
"""

def get_rules(self, user):
rules = []
token_obj = get_or_create_token(user)
access_token = token_obj.token

rules.extend(
[
{
"urlPattern": f"{settings.GEOSERVER_WEB_UI_LOCATION.rstrip('/')}/.*",
"params": {"access_token": access_token},
},
{"urlPattern": f"{settings.SITEURL.rstrip('/')}/gs.*", "params": {"access_token": access_token}},
{
"urlPattern": f"{settings.SITEURL.rstrip('/')}/api/v2.*",
"headers": {"Authorization": f"Bearer {access_token}"},
},
]
)
return rules
79 changes: 79 additions & 0 deletions geonode/security/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
get_users_with_perms,
get_visible_resources,
)
from .request_configuration_handlers import BaseConfigurationRuleHandler

from .permissions import PermSpec, PermSpecCompact
from django.core.cache import cache
Expand Down Expand Up @@ -3086,6 +3087,84 @@ def test_group_managers_permissons_handler(self):
self.assertListEqual(updated_perms_empty["users"][self.group_manager], [])


class TestBaseConfigurationRuleHandler(GeoNodeBaseTestSupport):
"""Test case for BaseConfigurationRuleHandler"""

def _verify_rules(self, rules, expected_token, user_label=""):
"""Helper method to verify the rules structure and content."""
# Verify the structure and content of the rules
self.assertEqual(len(rules), 3, f"{user_label}: Should have 3 rules")

# Test first rule (GEOSERVER_WEB_UI_LOCATION)
self.assertEqual(
rules[0]["urlPattern"],
"https://example.com/geoserver/.*",
f"{user_label}: Incorrect GEOSERVER_WEB_UI_LOCATION pattern",
)
self.assertEqual(
rules[0]["params"]["access_token"],
expected_token,
f"{user_label}: Token mismatch in GEOSERVER_WEB_UI_LOCATION rule",
)

# Test second rule (HOSTNAME/gs*)
self.assertEqual(
rules[1]["urlPattern"], "https://example.com/gs.*", f"{user_label}: Incorrect HOSTNAME/gs pattern"
)
self.assertEqual(
rules[1]["params"]["access_token"], expected_token, f"{user_label}: Token mismatch in HOSTNAME/gs rule"
)

# Test third rule (HOSTNAME/api/v2*)
self.assertEqual(
rules[2]["urlPattern"], "https://example.com/api/v2.*", f"{user_label}: Incorrect HOSTNAME/api/v2 pattern"
)
self.assertEqual(
rules[2]["headers"]["Authorization"],
f"Bearer {expected_token}",
f"{user_label}: Token mismatch in HOSTNAME/api/v2 rule",
)

@override_settings(GEOSERVER_WEB_UI_LOCATION="https://example.com/geoserver", SITEURL="https://example.com")
def test_get_rules_with_multiple_users(self):
"""Test that get_rules returns the expected rules with valid tokens for multiple users."""
# Create test users
user1 = get_user_model().objects.create_user(
username="testuser1", password="testpass1", email="user1@example.com"
)

user2 = get_user_model().objects.create_user(
username="testuser2", password="testpass2", email="user2@example.com"
)

# Get or create tokens for the users
token1 = get_or_create_token(user1).token
token2 = get_or_create_token(user2).token

self.assertNotEqual(token1, token2, "Tokens should be unique per user")

# Initialize the handler
handler = BaseConfigurationRuleHandler()

# Test for user1
rules1 = handler.get_rules(user1)
self._verify_rules(rules1, token1, "user1")

# Test for user2
rules2 = handler.get_rules(user2)
self._verify_rules(rules2, token2, "user2")

# Verify tokens are used correctly for each user
self.assertEqual(rules1[0]["params"]["access_token"], token1, "User1's token should be used in the rules")
self.assertEqual(rules2[0]["params"]["access_token"], token2, "User2's token should be used in the rules")

# Verify tokens exist in the database
from oauth2_provider.models import AccessToken

self.assertTrue(AccessToken.objects.filter(token=token1).exists(), "User1's token should exist in the database")
self.assertTrue(AccessToken.objects.filter(token=token2).exists(), "User2's token should exist in the database")


@override_settings(
CACHES={
"default": {
Expand Down
2 changes: 2 additions & 0 deletions geonode/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -2290,6 +2290,8 @@ def get_geonode_catalogue_service():
"geonode.security.handlers.AdvancedWorkflowPermissionsHandler",
]

REQUEST_CONFIGURATION_RULES_HANDLERS = ["geonode.security.request_configuration_handlers.BaseConfigurationRuleHandler"]

FEATURE_VALIDATORS = [
"geonode.upload.feature_validators.GeoserverFeatureValidator",
]
Expand Down
Loading