Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions infrahub_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ def __init__(
self.group_context: InfrahubGroupContext | InfrahubGroupContextSync
self._initialize()
self._request_context: RequestContext | None = None
_ = self.config.tls_context # Early load of the TLS context to catch errors

def _initialize(self) -> None:
"""Sets the properties for each version of the client"""
Expand Down Expand Up @@ -1024,7 +1025,7 @@ async def _default_request_method(

async with httpx.AsyncClient(
**proxy_config, # type: ignore[arg-type]
verify=self.config.tls_ca_file if self.config.tls_ca_file else not self.config.tls_insecure,
verify=self.config.tls_context,
) as client:
try:
response = await client.request(
Expand Down Expand Up @@ -2748,7 +2749,7 @@ def _default_request_method(

with httpx.Client(
**proxy_config, # type: ignore[arg-type]
verify=self.config.tls_ca_file if self.config.tls_ca_file else not self.config.tls_insecure,
verify=self.config.tls_context,
) as client:
try:
response = client.request(
Expand Down
31 changes: 29 additions & 2 deletions infrahub_sdk/config.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

import ssl
from copy import deepcopy
from typing import Any

from pydantic import Field, field_validator, model_validator
from pydantic import Field, PrivateAttr, field_validator, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing_extensions import Self

Expand Down Expand Up @@ -78,6 +79,7 @@ class ConfigBase(BaseSettings):
Can be useful to test with self-signed certificates.""",
)
tls_ca_file: str | None = Field(default=None, description="File path to CA cert or bundle in PEM format")
_ssl_context: ssl.SSLContext | None = PrivateAttr(default=None)

@model_validator(mode="before")
@classmethod
Expand Down Expand Up @@ -133,6 +135,28 @@ def default_infrahub_branch(self) -> str:
def password_authentication(self) -> bool:
return bool(self.username)

@property
def tls_context(self) -> ssl.SSLContext:
if self._ssl_context:
return self._ssl_context

if self.tls_insecure:
self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
self._ssl_context.check_hostname = False
self._ssl_context.verify_mode = ssl.CERT_NONE
return self._ssl_context

if self.tls_ca_file:
self._ssl_context = ssl.create_default_context(cafile=self.tls_ca_file)
Comment on lines +143 to +150
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Validate mutual exclusivity of tls_insecure and tls_ca_file.

When tls_insecure=True, the tls_ca_file is silently ignored (lines 149-150 are never reached). This could confuse users who set both options, expecting the CA file to be used.

Consider adding a model validator to enforce mutual exclusivity:

+    @model_validator(mode="after")
+    def validate_tls_config(self) -> Self:
+        if self.tls_insecure and self.tls_ca_file:
+            raise ValueError("'tls_insecure' and 'tls_ca_file' are mutually exclusive")
+        return self
+
     @property
     def tls_context(self) -> ssl.SSLContext:
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if self.tls_insecure:
self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
self._ssl_context.check_hostname = False
self._ssl_context.verify_mode = ssl.CERT_NONE
return self._ssl_context
if self.tls_ca_file:
self._ssl_context = ssl.create_default_context(cafile=self.tls_ca_file)
@model_validator(mode="after")
def validate_tls_config(self) -> Self:
if self.tls_insecure and self.tls_ca_file:
raise ValueError("'tls_insecure' and 'tls_ca_file' are mutually exclusive")
return self
@property
def tls_context(self) -> ssl.SSLContext:
if self.tls_insecure:
self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
self._ssl_context.check_hostname = False
self._ssl_context.verify_mode = ssl.CERT_NONE
return self._ssl_context
if self.tls_ca_file:
self._ssl_context = ssl.create_default_context(cafile=self.tls_ca_file)
🤖 Prompt for AI Agents
In infrahub_sdk/config.py around lines 143 to 150, the code silently ignores
tls_ca_file when tls_insecure=True; add a model-level validation that enforces
mutual exclusivity by checking both attributes after initialization and raising
a clear ValueError (or ValidationError) if both tls_insecure and tls_ca_file are
provided; implement this as a pydantic @root_validator (or equivalent) in the
Config model so users receive a deterministic error instead of having
tls_ca_file silently ignored.

Comment on lines +149 to +150
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Validate tls_ca_file exists before passing to SSL context.

Line 150 passes tls_ca_file to ssl.create_default_context without verifying the file exists. This will raise a runtime error with a potentially unclear message if the file is missing or inaccessible.

Add a field validator to check file existence:

+    @field_validator("tls_ca_file")
+    @classmethod
+    def validate_tls_ca_file(cls, value: str | None) -> str | None:
+        if value is not None:
+            from pathlib import Path
+            if not Path(value).is_file():
+                raise ValueError(f"TLS CA file not found: {value}")
+        return value
+
     @property
     def tls_context(self) -> ssl.SSLContext:
🤖 Prompt for AI Agents
In infrahub_sdk/config.py around lines 149-150, the code passes self.tls_ca_file
into ssl.create_default_context without verifying the path exists; add a
validator that checks tls_ca_file is not None and points to an existing,
readable file (use pathlib.Path(self.tls_ca_file).is_file() or os.path.exists +
os.access), raise a clear ValueError with a helpful message if the file is
missing or inaccessible, and only call ssl.create_default_context(cafile=...)
when the validator passes. Ensure the validator runs during config
construction/validation so runtime ssl.create_default_context is never called
with an invalid path.


if self._ssl_context is None:
self._ssl_context = ssl.create_default_context()

return self._ssl_context

def set_ssl_context(self, context: ssl.SSLContext) -> None:
self._ssl_context = context


class Config(ConfigBase):
recorder: RecorderType = Field(default=RecorderType.NONE, description="Select builtin recorder for later replay.")
Expand Down Expand Up @@ -174,4 +198,7 @@ def clone(self, branch: str | None = None) -> Config:
if field not in covered_keys:
config[field] = deepcopy(getattr(self, field))

return Config(**config)
new_config = Config(**config)
if self._ssl_context:
new_config.set_ssl_context(self._ssl_context)
return new_config
86 changes: 85 additions & 1 deletion tests/unit/sdk/test_client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import inspect
import ssl
from pathlib import Path

import pytest
from pytest_httpx import HTTPXMock

from infrahub_sdk import InfrahubClient, InfrahubClientSync
from infrahub_sdk import Config, InfrahubClient, InfrahubClientSync
from infrahub_sdk.exceptions import NodeNotFoundError
from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync
from tests.unit.sdk.conftest import BothClients
Expand All @@ -28,6 +30,88 @@

client_types = ["standard", "sync"]

CURRENT_DIRECTORY = Path(__file__).parent


async def test_verify_config_caches_default_ssl_context(monkeypatch) -> None:
contexts: list[tuple[str | None, object]] = []

def fake_create_default_context(*args: object, **kwargs: object) -> object:
context = object()
contexts.append((kwargs.get("cafile"), context))
return context

monkeypatch.setattr("ssl.create_default_context", fake_create_default_context)

client = InfrahubClient(config=Config(address="http://mock"))

first = client.config.tls_context
second = client.config.tls_context

assert first is second
assert contexts == [(None, first)]


async def test_verify_config_caches_tls_ca_file_context(monkeypatch) -> None:
contexts: list[tuple[str | None, object]] = []

def fake_create_default_context(*args: object, **kwargs: object) -> object:
context = object()
contexts.append((kwargs.get("cafile"), context))
return context

monkeypatch.setattr("ssl.create_default_context", fake_create_default_context)

client = InfrahubClient(
config=Config(address="http://mock", tls_ca_file=str(CURRENT_DIRECTORY / "test_data/path-1.pem"))
)

first = client.config.tls_context
second = client.config.tls_context

assert first is second
assert contexts == [(str(CURRENT_DIRECTORY / "test_data/path-1.pem"), first)]

client.config.tls_ca_file = str(CURRENT_DIRECTORY / "test_data/path-2.pem")
third = client.config.tls_context

assert third is first
assert contexts == [
(str(CURRENT_DIRECTORY / "test_data/path-1.pem"), first),
]


async def test_verify_config_respects_tls_insecure(monkeypatch) -> None:
def fake_create_default_context(*args: object, **kwargs: object) -> object:
raise AssertionError("create_default_context should not be called when TLS is insecure")

monkeypatch.setattr("ssl.create_default_context", fake_create_default_context)

client = InfrahubClient(config=Config(address="http://mock", tls_insecure=True))

verify_value = client.config.tls_context

assert verify_value.check_hostname is False
assert verify_value.verify_mode == ssl.CERT_NONE


async def test_verify_config_uses_custom_tls_context(monkeypatch) -> None:
def fake_create_default_context(*args: object, **kwargs: object) -> object:
raise AssertionError("create_default_context should not be called when custom context is provided")

monkeypatch.setattr("ssl.create_default_context", fake_create_default_context)

config = Config(address="http://mock")
custom_context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
config.set_ssl_context(custom_context)

client = InfrahubClient(config=config)

clone_client = client.clone()

assert client.config.tls_context is custom_context
assert clone_client.config.tls_context is custom_context


async def test_method_sanity() -> None:
"""Validate that there is at least one public method and that both clients look the same."""
Expand Down
9 changes: 9 additions & 0 deletions tests/unit/sdk/test_data/path-1.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-----BEGIN CERTIFICATE-----
MIIBQDCB86ADAgECAhR6y429KiST51bZy+t330M7dN5SbzAFBgMrZXAwFjEUMBIG
A1UEAwwLZXhhbXBsZS5jb20wHhcNMjUxMDE1MTE0MjUwWhcNMzUxMDEzMTE0MjUw
WjAWMRQwEgYDVQQDDAtleGFtcGxlLmNvbTAqMAUGAytlcAMhAPIl8y8AXSWF33vX
JT2YwhMJzarOuSdPif01Gxr3Rr6Lo1MwUTAdBgNVHQ4EFgQU4heN1ZhyXpOujgcJ
WZ4LQk2m7RAwHwYDVR0jBBgwFoAU4heN1ZhyXpOujgcJWZ4LQk2m7RAwDwYDVR0T
AQH/BAUwAwEB/zAFBgMrZXADQQBoEf+8R+KWwGdaoeqinWOvrqbVZatMis0eUMvA
o+vABSPU7LIYGxLT6fpUwFSTvempzNqGZMVJ9UvVH+hYDU4D
-----END CERTIFICATE-----
9 changes: 9 additions & 0 deletions tests/unit/sdk/test_data/path-2.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-----BEGIN CERTIFICATE-----
MIIBQDCB86ADAgECAhQTRmRZxUSA5L7VfYJb3/t+dRK0ETAFBgMrZXAwFjEUMBIG
A1UEAwwLZXhhbXBsZS5jb20wHhcNMjUxMDE1MTE0MzM0WhcNMzUxMDEzMTE0MzM0
WjAWMRQwEgYDVQQDDAtleGFtcGxlLmNvbTAqMAUGAytlcAMhAK1O3ZhE5qzfT7Qx
+0My3ToDVDi5wwpllkKn0X50zXFao1MwUTAdBgNVHQ4EFgQUH+qBMU+h4t1vdLbO
jMSSgXdURewwHwYDVR0jBBgwFoAUH+qBMU+h4t1vdLbOjMSSgXdURewwDwYDVR0T
AQH/BAUwAwEB/zAFBgMrZXADQQB3Z03f3gQcktxk4h/v8pVi5soz8viPx17TSPXf
1WYG+Jlk4C5GQ+tyjZgZUE9LL2BFRYBv28V/NPT/0TjPGtcC
-----END CERTIFICATE-----