diff --git a/src/codeflare_sdk/ray/__init__.py b/src/codeflare_sdk/ray/__init__.py
index 7bd0b2c8..396b6e1a 100644
--- a/src/codeflare_sdk/ray/__init__.py
+++ b/src/codeflare_sdk/ray/__init__.py
@@ -10,6 +10,7 @@
RayJobDeploymentStatus,
CodeflareRayJobStatus,
RayJobInfo,
+ KueueWorkloadInfo,
)
from .cluster import (
@@ -21,4 +22,4 @@
RayClusterStatus,
CodeFlareClusterStatus,
RayCluster,
-)
+)
\ No newline at end of file
diff --git a/src/codeflare_sdk/ray/cluster/cluster.py b/src/codeflare_sdk/ray/cluster/cluster.py
index 8538dba3..04cd7b76 100644
--- a/src/codeflare_sdk/ray/cluster/cluster.py
+++ b/src/codeflare_sdk/ray/cluster/cluster.py
@@ -19,7 +19,7 @@
"""
from time import sleep
-from typing import List, Optional, Tuple, Dict
+from typing import List as ListType, Optional, Tuple, Dict, Any
import copy
from ray.job_submission import JobSubmissionClient, JobStatus
@@ -44,6 +44,7 @@
RayCluster,
RayClusterStatus,
)
+from ..rayjobs.status import KueueWorkloadInfo
from ..appwrapper import (
AppWrapper,
AppWrapperStatus,
@@ -62,7 +63,13 @@
from kubernetes import client as k8s_client
from kubernetes.client.rest import ApiException
-from kubernetes.client.rest import ApiException
+# Flag to track if ipywidgets is available
+WIDGETS_AVAILABLE = True
+try:
+ import ipywidgets as widgets
+ from IPython.display import display
+except ImportError:
+ WIDGETS_AVAILABLE = False
CF_SDK_FIELD_MANAGER = "codeflare-sdk"
@@ -74,6 +81,500 @@ class Cluster:
Note that currently, the underlying implementation is a Ray cluster.
"""
+
+ # Class variable for global widget preference
+ _default_use_widgets = False
+
+ @classmethod
+ def set_widgets_default(cls, use_widgets: bool) -> None:
+ """Set the default widget display preference for all Cluster methods."""
+ cls._default_use_widgets = use_widgets
+
+ @classmethod
+ def get_widgets_default(cls) -> bool:
+ """Get the current default widget display preference."""
+ return cls._default_use_widgets
+
+ @staticmethod
+    def Status(
+        cluster_name: str,
+        namespace: str = "default",
+        use_widgets: Optional[bool] = None,
+        return_status: bool = False,
+    ) -> Optional[Tuple[RayClusterStatus, bool]]:
+ """
+ Get the status of a RayCluster.
+
+ Args:
+ cluster_name: Name of the RayCluster
+ namespace: Kubernetes namespace
+ use_widgets: Whether to use Jupyter widgets for display (overrides global setting)
+ return_status: Whether to return the status tuple instead of displaying
+
+ Returns:
+ Optional[Tuple[RayClusterStatus, bool]]: Status and ready state if return_status=True
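+
+        Example (illustrative; the cluster and namespace names are placeholders):
+            >>> status, ready = Cluster.Status(
+            ...     "raytest", namespace="demo", return_status=True
+            ... )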
+ """
+ # Check if Kubernetes config is available
+ if not config_check():
+ return None if not return_status else (RayClusterStatus.UNKNOWN, False)
+
+ # Determine widget usage
+ use_widgets = use_widgets if use_widgets is not None else Cluster._default_use_widgets
+ if use_widgets and not WIDGETS_AVAILABLE:
+            # Widgets not available; fall back to console output
+            use_widgets = False
+
+ try:
+ # Get RayCluster
+ api = client.CustomObjectsApi(get_api_client())
+ cluster = api.get_namespaced_custom_object(
+ group="ray.io",
+ version="v1",
+ namespace=namespace,
+ plural="rayclusters",
+ name=cluster_name
+ )
+
+ if not cluster:
+ if use_widgets:
+ Cluster._display_cluster_status_widget(None, cluster_name, namespace)
+ else:
+ print_no_resources_found()
+ return None if not return_status else (RayClusterStatus.UNKNOWN, False)
+
+ # Parse cluster info
+ cluster_info = _map_to_ray_cluster(cluster)
+ if not cluster_info:
+ if use_widgets:
+ Cluster._display_cluster_status_widget(None, cluster_name, namespace)
+ else:
+ print_no_resources_found()
+ return None if not return_status else (RayClusterStatus.UNKNOWN, False)
+
+ # Check if cluster is managed by AppWrapper
+ if cluster["metadata"].get("ownerReferences"):
+ for owner in cluster["metadata"]["ownerReferences"]:
+ if owner["kind"] == "AppWrapper":
+ cluster_info.is_appwrapper = True
+ break
+
+ # Get Kueue workload info if cluster is managed by AppWrapper
+ if cluster_info.is_appwrapper:
+ workload_info = Cluster._get_cluster_kueue_workload_info(cluster_name, namespace)
+ if workload_info:
+ cluster_info.local_queue = workload_info.name
+ cluster_info.kueue_workload = workload_info
+
+ # Get dashboard URL
+ cluster_info.dashboard = Cluster._get_cluster_dashboard_url(cluster_name, namespace)
+
+ # Display status
+ if use_widgets:
+ Cluster._display_cluster_status_widget(cluster_info)
+ else:
+ print_cluster_status(cluster_info)
+
+ # Return status tuple if requested
+ if return_status:
+ ready = cluster_info.status == RayClusterStatus.READY
+ return cluster_info.status, ready
+
+ return None
+
+ except Exception as e:
+ error_msg = _kube_api_error_handling(e)
+ if return_status:
+ return RayClusterStatus.UNKNOWN, False
+ return None
+
+ @staticmethod
+    def List(
+        namespace: str = "default",
+        page: int = 1,
+        page_size: int = 10,
+        use_widgets: Optional[bool] = None,
+        return_list: bool = False,
+    ) -> Optional[ListType[RayCluster]]:
+ """
+ List all RayClusters in the specified namespace with pagination.
+
+ Args:
+ namespace: Kubernetes namespace
+ page: Page number (1-based)
+ page_size: Number of items per page
+ use_widgets: Whether to use Jupyter widgets for display (overrides global setting)
+ return_list: Whether to return the list instead of displaying
+
+ Returns:
+ Optional[List[RayCluster]]: List of RayCluster objects if return_list=True
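+
+        Example (illustrative; the namespace is a placeholder):
+            >>> clusters = Cluster.List(namespace="demo", page=1, return_list=True)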
+ """
+ # Check if Kubernetes config is available
+ if not config_check():
+ return None if not return_list else []
+
+ # Determine widget usage
+ use_widgets = use_widgets if use_widgets is not None else Cluster._default_use_widgets
+ if use_widgets and not WIDGETS_AVAILABLE:
+            # Widgets not available; fall back to console output
+            use_widgets = False
+
+ try:
+ # Get all RayClusters
+ api = client.CustomObjectsApi(get_api_client())
+ clusters = api.list_namespaced_custom_object(
+ group="ray.io",
+ version="v1",
+ namespace=namespace,
+ plural="rayclusters"
+ )
+
+ if not clusters or not clusters.get("items"):
+ if return_list:
+ return []
+ if use_widgets:
+ Cluster._display_clusters_list_widget([], page, page_size)
+ else:
+ print_no_resources_found()
+ return None
+
+ # Parse cluster info for each cluster
+ cluster_infos = []
+ for cluster in clusters["items"]:
+ cluster_info = _map_to_ray_cluster(cluster)
+ if not cluster_info:
+ continue
+
+ # Check if cluster is managed by AppWrapper
+ if cluster["metadata"].get("ownerReferences"):
+ for owner in cluster["metadata"]["ownerReferences"]:
+ if owner["kind"] == "AppWrapper":
+ cluster_info.is_appwrapper = True
+ break
+
+ # Get Kueue workload info if cluster is managed by AppWrapper
+ if cluster_info.is_appwrapper:
+ workload_info = Cluster._get_cluster_kueue_workload_info(cluster["metadata"]["name"], namespace)
+ if workload_info:
+ cluster_info.local_queue = workload_info.name
+ cluster_info.kueue_workload = workload_info
+
+ # Get dashboard URL
+ cluster_info.dashboard = Cluster._get_cluster_dashboard_url(cluster["metadata"]["name"], namespace)
+
+ cluster_infos.append(cluster_info)
+
+ # Sort clusters by name
+ cluster_infos.sort(key=lambda x: x.name)
+
+ # Calculate pagination
+ start_idx = (page - 1) * page_size
+ end_idx = start_idx + page_size
+ total_pages = (len(cluster_infos) + page_size - 1) // page_size
+
+ # Get clusters for current page
+ current_clusters = cluster_infos[start_idx:end_idx]
+
+ # Display clusters
+ if use_widgets:
+ Cluster._display_clusters_list_widget(current_clusters, page, page_size, total_pages)
+ else:
+ print_clusters(current_clusters)
+
+ # Return full list if requested
+ return cluster_infos if return_list else None
+
+ except Exception as e:
+ error_msg = _kube_api_error_handling(e)
+ if return_list:
+ return []
+ return None
+
+ @staticmethod
+ def _get_cluster_kueue_workload_info(cluster_name: str, namespace: str) -> Optional[KueueWorkloadInfo]:
+ """Get Kueue workload information for a RayCluster."""
+ try:
+ api = client.CustomObjectsApi(get_api_client())
+ workloads = api.list_namespaced_custom_object(
+ group="kueue.x-k8s.io",
+ version="v1beta1",
+ namespace=namespace,
+ plural="workloads"
+ )
+
+ for workload in workloads.get("items", []):
+                owner_refs = workload.get("metadata", {}).get("ownerReferences", [])
+                if any(ref.get("name") == cluster_name for ref in owner_refs):
+ status = workload.get("status", {})
+ admission = status.get("admission", {})
+
+ # Get error information if cluster failed
+ error_msg, error_reason = Cluster._get_workload_error_info(workload)
+
+ return KueueWorkloadInfo(
+ name=workload["metadata"]["name"],
+ status=status.get("conditions", [{}])[0].get("type", "Unknown"),
+ priority=workload["spec"].get("priority", 0),
+ admission_time=Cluster._get_admission_time(admission),
+ error_message=error_msg,
+ error_reason=error_reason
+ )
+
+ return None
+
+ except Exception as e:
+ # Silently fail - Kueue may not be installed or workload info unavailable
+ pass
+ return None
+
+ @staticmethod
+ def _get_admission_time(admission: Dict[str, Any]) -> Optional[str]:
+ """Extract admission time from Kueue workload admission data."""
+ if not admission:
+ return None
+ return admission.get("podSetAssignments", [{}])[0].get("admissionTime")
+
+ @staticmethod
+ def _get_workload_error_info(workload: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
+ """Extract error information from a failed Kueue workload."""
+ status = workload.get("status", {})
+ conditions = status.get("conditions", [])
+
+ for condition in conditions:
+ if condition.get("type") == "Failed":
+ return condition.get("message"), condition.get("reason")
+
+ return None, None
+
+ @staticmethod
+ def _get_cluster_dashboard_url(cluster_name: str, namespace: str) -> str:
+ """Get the dashboard URL for a RayCluster."""
+ # Try HTTPRoute first (RHOAI v3.0+)
+ dashboard_url = _get_dashboard_url_from_httproute(cluster_name, namespace)
+ if dashboard_url:
+ return dashboard_url
+
+ # Fall back to OpenShift Routes or Ingresses
+ if _is_openshift_cluster():
+ try:
+ api = client.CustomObjectsApi(get_api_client())
+ routes = api.list_namespaced_custom_object(
+ group="route.openshift.io",
+ version="v1",
+ namespace=namespace,
+ plural="routes"
+ )
+
+ for route in routes["items"]:
+ if route["metadata"]["name"] == f"ray-dashboard-{cluster_name}" or route["metadata"]["name"].startswith(f"{cluster_name}-ingress"):
+ protocol = "https" if route["spec"].get("tls") else "http"
+ return f"{protocol}://{route['spec']['host']}"
+
+ except Exception as e:
+ # Silently fail - routes may not be available
+ pass
+
+ else:
+ try:
+ api = client.NetworkingV1Api(get_api_client())
+ ingresses = api.list_namespaced_ingress(namespace)
+
+ for ingress in ingresses.items:
+ if ingress.metadata.name == f"ray-dashboard-{cluster_name}" or ingress.metadata.name.startswith(f"{cluster_name}-ingress"):
+ protocol = "https" if ingress.metadata.annotations and "route.openshift.io/termination" in ingress.metadata.annotations else "http"
+ return f"{protocol}://{ingress.spec.rules[0].host}"
+
+ except Exception as e:
+ # Silently fail - ingresses may not be available
+ pass
+
+ return "Dashboard not available yet"
+
+ @staticmethod
+ def _display_cluster_status_widget(cluster_info: Optional[RayCluster], cluster_name: str = None, namespace: str = None) -> None:
+ """Display cluster status using ipywidgets."""
+ if not cluster_info:
+ # Cluster not found
+ output = widgets.HTML(
+                f'<div style="padding: 8px; color: #e74c3c;">'
+                f'⚠️ No RayCluster found with name "{cluster_name}" in namespace "{namespace}"'
+                '</div>'
+            )
+ display(output)
+ return
+
+ # Create status badge
+ status_colors = {
+ RayClusterStatus.READY: ("#2ecc71", "white"), # Green
+ RayClusterStatus.STARTING: ("#3498db", "white"), # Blue
+ RayClusterStatus.FAILED: ("#e74c3c", "white"), # Red
+ RayClusterStatus.STOPPED: ("#f1c40f", "black") # Yellow
+ }
+ status_color = status_colors.get(cluster_info.status, ("#95a5a6", "white")) # Gray for unknown
+
+ # Build HTML table
+        html = f'''
+        <table style="border-collapse: collapse;">
+            <tr>
+                <td><b>Name:</b></td><td>{cluster_info.name}</td>
+                <td><b>Status:</b></td>
+                <td><span style="background-color: {status_color[0]}; color: {status_color[1]}; padding: 2px 8px; border-radius: 4px;">{cluster_info.status.value}</span></td>
+            </tr>
+            <tr>
+                <td><b>Namespace:</b></td><td>{cluster_info.namespace}</td>
+                <td><b>Workers:</b></td><td>{cluster_info.num_workers}</td>
+            </tr>
+            <tr>
+                <td><b>Head CPU:</b></td><td>{cluster_info.head_cpu_requests}/{cluster_info.head_cpu_limits}</td>
+                <td><b>Head Memory:</b></td><td>{cluster_info.head_mem_requests}/{cluster_info.head_mem_limits}</td>
+            </tr>
+            <tr>
+                <td><b>Worker CPU:</b></td><td>{cluster_info.worker_cpu_requests}/{cluster_info.worker_cpu_limits}</td>
+                <td><b>Worker Memory:</b></td><td>{cluster_info.worker_mem_requests}/{cluster_info.worker_mem_limits}</td>
+            </tr>
+            <tr>
+                <td><b>Dashboard:</b></td>
+                <td colspan="3"><a href="{cluster_info.dashboard}" target="_blank">{cluster_info.dashboard}</a></td>
+            </tr>
+        '''
+
+        # Add extended resources if any
+        if cluster_info.head_extended_resources:
+            resources = ", ".join(
+                f"{resource}: {count}"
+                for resource, count in cluster_info.head_extended_resources.items()
+            )
+            html += f'<tr><td><b>Head Resources:</b></td><td colspan="3">{resources}</td></tr>'
+
+        if cluster_info.worker_extended_resources:
+            resources = ", ".join(
+                f"{resource}: {count}"
+                for resource, count in cluster_info.worker_extended_resources.items()
+            )
+            html += f'<tr><td><b>Worker Resources:</b></td><td colspan="3">{resources}</td></tr>'
+
+        # Add Kueue information if available
+        if cluster_info.kueue_workload:
+            html += f'''
+            <tr>
+                <td><b>Queue:</b></td><td>{cluster_info.local_queue or 'N/A'}</td>
+                <td><b>Priority:</b></td><td>{cluster_info.kueue_workload.priority}</td>
+            </tr>
+            <tr>
+                <td><b>Queue Status:</b></td><td>{cluster_info.kueue_workload.status}</td>
+                <td><b>Admission:</b></td><td>{cluster_info.kueue_workload.admission_time or 'N/A'}</td>
+            </tr>
+            '''
+
+        # Add error information for failed clusters
+        if (
+            cluster_info.status == RayClusterStatus.FAILED
+            and cluster_info.kueue_workload
+            and cluster_info.kueue_workload.error_message
+        ):
+            html += f'''
+            <tr>
+                <td><b>Error:</b></td>
+                <td colspan="3" style="color: #e74c3c;">{cluster_info.kueue_workload.error_reason}: {cluster_info.kueue_workload.error_message}</td>
+            </tr>
+            '''
+
+        html += '</table>'
+
+ # Display the widget
+ output = widgets.HTML(html)
+ display(output)
+
+ @staticmethod
+ def _display_clusters_list_widget(clusters: ListType[RayCluster], page: int, page_size: int, total_pages: int = 1) -> None:
+ """Display clusters list using ipywidgets."""
+ if not clusters:
+ output = widgets.HTML(
+                '<div style="padding: 8px; color: #e74c3c;">'
+                '⚠️ No RayClusters found'
+                '</div>'
+ )
+ display(output)
+ return
+
+ # Create HTML table
+        html = '''
+        <div>
+        <table style="border-collapse: collapse; width: 100%;">
+            <thead>
+                <tr>
+                    <th>Name</th>
+                    <th>Status</th>
+                    <th>Workers</th>
+                    <th>Queue</th>
+                    <th>Queue Status</th>
+                    <th>Dashboard</th>
+                </tr>
+            </thead>
+            <tbody>
+        '''
+
+ # Status colors
+ status_colors = {
+ RayClusterStatus.READY: ("#2ecc71", "white"), # Green
+ RayClusterStatus.STARTING: ("#3498db", "white"), # Blue
+ RayClusterStatus.FAILED: ("#e74c3c", "white"), # Red
+ RayClusterStatus.STOPPED: ("#f1c40f", "black") # Yellow
+ }
+
+ # Add rows
+ for cluster in clusters:
+ status_color = status_colors.get(cluster.status, ("#95a5a6", "white")) # Gray for unknown
+
+            html += f'''
+                <tr>
+                    <td>{cluster.name}</td>
+                    <td><span style="background-color: {status_color[0]}; color: {status_color[1]}; padding: 2px 8px; border-radius: 4px;">{cluster.status.value}</span></td>
+                    <td>{cluster.num_workers}</td>
+                    <td>{cluster.local_queue or 'N/A'}</td>
+                    <td>{cluster.kueue_workload.status if cluster.kueue_workload else 'N/A'}</td>
+                    <td><a href="{cluster.dashboard}" target="_blank">{cluster.dashboard}</a></td>
+                </tr>
+            '''
+
+        html += '''
+            </tbody>
+        </table>
+        '''
+
+ # Add pagination info
+ if total_pages > 1:
+            html += f'''
+            <div style="margin-top: 8px;">
+                Page {page} of {total_pages}
+            '''
+            if page > 1:
+                html += f' (use page={page - 1} for previous)'
+            if page < total_pages:
+                html += f' (page={page + 1} for next)'
+            html += '</div>'
+
+        html += '</div>'
+
+ # Display the widget
+ output = widgets.HTML(html)
+ display(output)
def __init__(self, config: ClusterConfiguration):
"""
@@ -951,8 +1452,8 @@ def _ray_cluster_status(name, namespace="default") -> Optional[RayCluster]:
def _get_ray_clusters(
- namespace="default", filter: Optional[List[RayClusterStatus]] = None
-) -> List[RayCluster]:
+ namespace="default", filter: Optional[ListType[RayClusterStatus]] = None
+) -> ListType[RayCluster]:
list_of_clusters = []
try:
config_check()
@@ -971,16 +1472,133 @@ def _get_ray_clusters(
for rc in rcs["items"]:
ray_cluster = _map_to_ray_cluster(rc)
if filter and ray_cluster.status in filter:
+ # Check for AppWrapper ownership
+ metadata = rc.get("metadata", {})
+ owner_refs = metadata.get("ownerReferences", [])
+ for owner in owner_refs:
+ if owner.get("kind") == "AppWrapper":
+ ray_cluster.is_appwrapper = True
+ break
+
+ # Fetch Kueue workload info for all clusters
+ workload_info = _get_cluster_kueue_workload_info_func(
+ metadata.get("name"), namespace
+ )
+ if workload_info:
+ ray_cluster.local_queue = workload_info.queue_name
+ ray_cluster.kueue_workload = workload_info
+
list_of_clusters.append(ray_cluster)
else:
for rc in rcs["items"]:
- list_of_clusters.append(_map_to_ray_cluster(rc))
+ ray_cluster = _map_to_ray_cluster(rc)
+ # Check for AppWrapper ownership
+ metadata = rc.get("metadata", {})
+ owner_refs = metadata.get("ownerReferences", [])
+ for owner in owner_refs:
+ if owner.get("kind") == "AppWrapper":
+ ray_cluster.is_appwrapper = True
+ break
+
+ # Fetch Kueue workload info for all clusters
+ workload_info = _get_cluster_kueue_workload_info_func(
+ metadata.get("name"), namespace
+ )
+ if workload_info:
+ ray_cluster.local_queue = workload_info.queue_name
+ ray_cluster.kueue_workload = workload_info
+
+ list_of_clusters.append(ray_cluster)
return list_of_clusters
+def _get_cluster_kueue_workload_info_func(
+ cluster_name: str, namespace: str
+) -> Optional[KueueWorkloadInfo]:
+ """
+ Get Kueue workload information for a RayCluster.
+
+ This function looks for Kueue workloads that have a RayCluster owner reference
+ matching the given cluster name. Works for all RayClusters, whether they're
+ AppWrapper-managed or directly managed by Kueue.
+
+ Args:
+ cluster_name: Name of the RayCluster
+ namespace: Kubernetes namespace
+
+ Returns:
+ KueueWorkloadInfo if found, None otherwise
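+
+    Example (illustrative; the cluster and namespace names are placeholders):
+        >>> info = _get_cluster_kueue_workload_info_func("raytest", "demo")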
+ """
+ try:
+ api = client.CustomObjectsApi(get_api_client())
+ workloads = api.list_namespaced_custom_object(
+ group="kueue.x-k8s.io",
+ version="v1beta1",
+ namespace=namespace,
+ plural="workloads",
+ )
+
+ for workload in workloads.get("items", []):
+ owner_refs = workload.get("metadata", {}).get("ownerReferences", [])
+ for owner_ref in owner_refs:
+ if (
+ owner_ref.get("kind") == "RayCluster"
+ and owner_ref.get("name") == cluster_name
+ ):
+ # Found the workload for this RayCluster
+ metadata = workload.get("metadata", {})
+ status = workload.get("status", {})
+ spec = workload.get("spec", {})
+ admission = status.get("admission", {})
+
+ # Get queue name from workload spec
+ queue_name = spec.get("queueName", "")
+
+ # Get status from conditions
+ conditions = status.get("conditions", [])
+ workload_status = "Unknown"
+ for condition in conditions:
+ if condition.get("status") == "True":
+ workload_status = condition.get("type", "Unknown")
+ break
+
+ # Get admission time
+ admission_time = None
+ if admission:
+ admission_time = admission.get("clusterQueue")
+
+ # Get error information if available
+ error_msg = None
+ error_reason = None
+ for condition in conditions:
+ if condition.get("type") == "Finished" and condition.get(
+ "status"
+ ) == "True":
+ if condition.get("reason") == "Failed":
+ error_reason = condition.get("reason")
+ error_msg = condition.get("message")
+
+ return KueueWorkloadInfo(
+ name=metadata.get("name", "unknown"),
+ queue_name=queue_name,
+ status=workload_status,
+ priority=spec.get("priority"),
+ creation_time=metadata.get("creationTimestamp"),
+ admission_time=admission_time,
+ error_message=error_msg,
+ error_reason=error_reason,
+ )
+
+ return None
+
+ except Exception as e:
+ # Silently fail if Kueue is not installed or workload info unavailable
+ return None
+
+
def _get_app_wrappers(
- namespace="default", filter=List[AppWrapperStatus]
-) -> List[AppWrapper]:
+ namespace="default", filter=ListType[AppWrapperStatus]
+) -> ListType[AppWrapper]:
list_of_app_wrappers = []
try:
diff --git a/src/codeflare_sdk/ray/cluster/pretty_print.py b/src/codeflare_sdk/ray/cluster/pretty_print.py
index faa03258..7315d98b 100644
--- a/src/codeflare_sdk/ray/cluster/pretty_print.py
+++ b/src/codeflare_sdk/ray/cluster/pretty_print.py
@@ -157,6 +157,23 @@ def print_clusters(clusters: List[RayCluster]):
) # format that is used to generate the name of the service
table0.add_row()
table0.add_row(f"[link={dashboard} blue underline]Dashboard:link:[/link]")
+
+ # Add Kueue information if available (for any cluster, not just AppWrapper-managed)
+ if cluster.kueue_workload:
+ table0.add_row()
+        table0.add_row("[bold blue]🎯 Kueue Integration[/bold blue]")
+ table0.add_row(f"[bold]Local Queue:[/bold] {cluster.local_queue or 'N/A'}")
+ table0.add_row(f"[bold]Workload Status:[/bold] [bold green]{cluster.kueue_workload.status}[/bold green]")
+ table0.add_row(f"[bold]Workload Name:[/bold] {cluster.kueue_workload.name}")
+ if cluster.kueue_workload.priority is not None:
+ table0.add_row(f"[bold]Priority:[/bold] {cluster.kueue_workload.priority}")
+ if cluster.kueue_workload.admission_time:
+ table0.add_row(f"[bold]Admitted:[/bold] {cluster.kueue_workload.admission_time}")
+ if cluster.kueue_workload.error_message:
+ table0.add_row(f"[bold red]Error:[/bold red] {cluster.kueue_workload.error_reason}: {cluster.kueue_workload.error_message}")
+ if cluster.is_appwrapper:
+ table0.add_row(f"[bold]AppWrapper Managed:[/bold] [green]Yes[/green]")
+
table0.add_row("") # empty row for spacing
#'table1' to display the worker counts
diff --git a/src/codeflare_sdk/ray/cluster/status.py b/src/codeflare_sdk/ray/cluster/status.py
index 136ae302..57fa9685 100644
--- a/src/codeflare_sdk/ray/cluster/status.py
+++ b/src/codeflare_sdk/ray/cluster/status.py
@@ -21,7 +21,10 @@
from dataclasses import dataclass, field
from enum import Enum
import typing
-from typing import Union
+from typing import Union, Optional, TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from ..rayjobs.status import KueueWorkloadInfo
class RayClusterStatus(Enum):
@@ -55,6 +58,26 @@ class CodeFlareClusterStatus(Enum):
class RayCluster:
"""
For storing information about a Ray cluster.
+
+ Attributes:
+ name: Name of the RayCluster
+ status: Current status of the cluster
+ head_cpu_requests: CPU requests for head node
+ head_cpu_limits: CPU limits for head node
+ head_mem_requests: Memory requests for head node
+ head_mem_limits: Memory limits for head node
+ num_workers: Number of worker nodes
+ worker_mem_requests: Memory requests per worker
+ worker_mem_limits: Memory limits per worker
+ worker_cpu_requests: CPU requests per worker
+ worker_cpu_limits: CPU limits per worker
+ namespace: Kubernetes namespace
+ dashboard: Dashboard URL
+ worker_extended_resources: Extended resources for workers (e.g., GPUs)
+ head_extended_resources: Extended resources for head node
+ local_queue: Kueue LocalQueue name (if managed by Kueue)
+ kueue_workload: Kueue workload information (if managed by Kueue)
+ is_appwrapper: Whether cluster is managed by AppWrapper
"""
name: str
@@ -72,3 +95,6 @@ class RayCluster:
dashboard: str
worker_extended_resources: typing.Dict[str, int] = field(default_factory=dict)
head_extended_resources: typing.Dict[str, int] = field(default_factory=dict)
+ local_queue: Optional[str] = None
+ kueue_workload: Optional["KueueWorkloadInfo"] = None
+ is_appwrapper: bool = False
diff --git a/src/codeflare_sdk/ray/rayjobs/__init__.py b/src/codeflare_sdk/ray/rayjobs/__init__.py
index cd6b4123..628995e3 100644
--- a/src/codeflare_sdk/ray/rayjobs/__init__.py
+++ b/src/codeflare_sdk/ray/rayjobs/__init__.py
@@ -1,3 +1,3 @@
from .rayjob import RayJob, ManagedClusterConfig
-from .status import RayJobDeploymentStatus, CodeflareRayJobStatus, RayJobInfo
-from .config import ManagedClusterConfig
+from .status import RayJobDeploymentStatus, CodeflareRayJobStatus, RayJobInfo, KueueWorkloadInfo
+from .config import ManagedClusterConfig
\ No newline at end of file
diff --git a/src/codeflare_sdk/ray/rayjobs/pretty_print.py b/src/codeflare_sdk/ray/rayjobs/pretty_print.py
index 34e8dfa1..671d09c6 100644
--- a/src/codeflare_sdk/ray/rayjobs/pretty_print.py
+++ b/src/codeflare_sdk/ray/rayjobs/pretty_print.py
@@ -20,9 +20,9 @@
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
-from typing import Tuple, Optional
+from typing import Tuple, Optional, List
-from .status import RayJobDeploymentStatus, RayJobInfo
+from .status import RayJobDeploymentStatus, RayJobInfo, KueueWorkloadInfo
def print_job_status(job_info: RayJobInfo):
@@ -37,6 +37,10 @@ def print_job_status(job_info: RayJobInfo):
table.add_row(f"[bold]Status:[/bold] {job_info.status.value}")
table.add_row(f"[bold]RayCluster:[/bold] {job_info.cluster_name}")
table.add_row(f"[bold]Namespace:[/bold] {job_info.namespace}")
+
+ # Add cluster management info
+ managed_text = "[bold green]Yes (Job-managed)[/bold green]" if job_info.is_managed_cluster else "[dim]No (Existing cluster)[/dim]"
+ table.add_row(f"[bold]Managed Cluster:[/bold] {managed_text}")
# Add timing information if available
if job_info.start_time:
@@ -47,6 +51,23 @@ def print_job_status(job_info: RayJobInfo):
if job_info.failed_attempts > 0:
table.add_row(f"[bold]Failed Attempts:[/bold] {job_info.failed_attempts}")
+ # Add Kueue information if available
+ if job_info.kueue_workload:
+ table.add_row()
+        table.add_row("[bold blue]🎯 Kueue Integration[/bold blue]")
+ table.add_row(f"[bold]Local Queue:[/bold] {job_info.local_queue}")
+ table.add_row(f"[bold]Workload Status:[/bold] [bold green]{job_info.kueue_workload.status}[/bold green]")
+ table.add_row(f"[bold]Workload Name:[/bold] {job_info.kueue_workload.name}")
+ if job_info.kueue_workload.priority is not None:
+ table.add_row(f"[bold]Priority:[/bold] {job_info.kueue_workload.priority}")
+ if job_info.kueue_workload.admission_time:
+ table.add_row(f"[bold]Admitted:[/bold] {job_info.kueue_workload.admission_time}")
+ elif job_info.local_queue:
+ table.add_row()
+        table.add_row("[bold blue]🎯 Kueue Integration[/bold blue]")
+ table.add_row(f"[bold]Local Queue:[/bold] {job_info.local_queue}")
+ table.add_row("[dim]Workload information not available[/dim]")
+
_print_table_in_panel(table)
@@ -66,6 +87,162 @@ def print_no_job_found(job_name: str, namespace: str):
_print_table_in_panel(table)
+def print_jobs_list(job_list: List[RayJobInfo], namespace: str, pagination_info: Optional[dict] = None):
+ """
+ Pretty print a list of RayJobs using Rich formatting with pagination support.
+
+ Args:
+ job_list: List of RayJobInfo objects (for current page)
+ namespace: Kubernetes namespace
+ pagination_info: Optional pagination information dict
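+            (expected keys, as produced by RayJob.List: current_page, total_pages,
+            page_size, total_jobs, showing_start, showing_end)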
+ """
+ if not job_list:
+ # Create table for no jobs found
+ table = _create_info_table(
+ "[white on yellow][bold]Namespace", namespace, "[bold yellow]No RayJobs found"
+ )
+ table.add_row()
+ table.add_row("No RayJobs found in this namespace.")
+ table.add_row("Jobs may have been deleted or completed with TTL cleanup.")
+ _print_table_in_panel(table)
+ return
+
+ # Create main table for job list
+ console = Console()
+
+ # Create title with pagination info
+    title = f"[bold blue]📋 RayJobs in namespace: {namespace}[/bold blue]"
+ if pagination_info and pagination_info["total_pages"] > 1:
+ title += f" [dim](Page {pagination_info['current_page']} of {pagination_info['total_pages']})[/dim]"
+
+ # Create jobs table with responsive width
+ jobs_table = Table(
+ title=title,
+ show_header=True,
+ header_style="bold magenta",
+ border_style="blue",
+ expand=True, # Allow table to expand to terminal width
+ min_width=120, # Minimum width for readability
+ )
+
+ # Add columns with flexible width allocation and wrapping
+ jobs_table.add_column("Status", style="bold", min_width=12, max_width=16)
+ jobs_table.add_column("Job Name", style="bold cyan", min_width=15, max_width=30)
+ jobs_table.add_column("Job ID", style="dim", min_width=12, max_width=25)
+ jobs_table.add_column("Cluster", min_width=12, max_width=25)
+ jobs_table.add_column("Managed", min_width=8, max_width=12)
+ jobs_table.add_column("Queue", min_width=10, max_width=18)
+ jobs_table.add_column("Kueue Status", min_width=8, max_width=15)
+ jobs_table.add_column("Start Time", style="dim", min_width=8, max_width=12)
+
+ # Add rows for each job
+ for job_info in job_list:
+ status_display, _ = _get_status_display(job_info.status)
+
+ # Format start time more compactly
+ if job_info.start_time:
+ try:
+ # Extract just time portion and make it compact
+ start_time = job_info.start_time.split('T')[1][:8] # HH:MM:SS
+ except (IndexError, AttributeError):
+ start_time = "N/A"
+ else:
+ start_time = "N/A"
+
+ # Truncate long values intelligently (Rich will handle further truncation if needed)
+ job_name = _truncate_text(job_info.name, 28)
+ job_id = _truncate_text(job_info.job_id, 23)
+ cluster_name = _truncate_text(job_info.cluster_name, 23)
+
+ # Cluster management info
+        managed_display = "✅ Yes" if job_info.is_managed_cluster else "❌ No"
+ managed_style = "bold green" if job_info.is_managed_cluster else "dim"
+
+ # Kueue information
+ queue_display = _truncate_text(job_info.local_queue or "N/A", 16)
+ kueue_status_display = "N/A"
+ kueue_status_style = "dim"
+
+ if job_info.kueue_workload:
+ kueue_status_display = job_info.kueue_workload.status
+ if job_info.kueue_workload.status == "Admitted":
+ kueue_status_style = "bold green"
+ elif job_info.kueue_workload.status == "Pending":
+ kueue_status_style = "bold yellow"
+ elif job_info.kueue_workload.status == "Finished":
+ kueue_status_style = "bold blue"
+
+ jobs_table.add_row(
+ status_display,
+ job_name,
+ job_id,
+ cluster_name,
+ f"[{managed_style}]{managed_display}[/{managed_style}]",
+ queue_display,
+ f"[{kueue_status_style}]{kueue_status_display}[/{kueue_status_style}]",
+ start_time,
+ )
+
+ # Print the table
+ console.print(jobs_table)
+
+ # Add pagination information
+ if pagination_info:
+ console.print() # Add spacing
+ if pagination_info["total_pages"] > 1:
+ console.print(
+ f"[dim]Showing {pagination_info['showing_start']}-{pagination_info['showing_end']} "
+ f"of {pagination_info['total_jobs']} jobs "
+ f"(Page {pagination_info['current_page']} of {pagination_info['total_pages']})[/dim]"
+ )
+
+ # Navigation hints
+ nav_hints = []
+ if pagination_info['current_page'] > 1:
+ nav_hints.append(f"Previous: RayJob.List(page={pagination_info['current_page'] - 1})")
+ if pagination_info['current_page'] < pagination_info['total_pages']:
+ nav_hints.append(f"Next: RayJob.List(page={pagination_info['current_page'] + 1})")
+
+ if nav_hints:
+ console.print(f"[dim]Navigation: {' | '.join(nav_hints)}[/dim]")
+
+ # Add summary information
+ kueue_jobs = [job for job in job_list if job.local_queue]
+ managed_jobs = [job for job in job_list if job.is_managed_cluster]
+
+ if kueue_jobs or managed_jobs:
+ console.print() # Add spacing
+ if managed_jobs:
+ total_managed = len(managed_jobs)
+ total_jobs = pagination_info["total_jobs"] if pagination_info else len(job_list)
+            console.print(f"[bold green]🏗️ {total_managed} of {total_jobs} jobs use job-managed clusters[/bold green]")
+ if kueue_jobs:
+ total_kueue = len(kueue_jobs)
+ total_jobs = pagination_info["total_jobs"] if pagination_info else len(job_list)
+            console.print(f"[bold blue]🎯 {total_kueue} of {total_jobs} jobs are managed by Kueue[/bold blue]")
+
+
+def _truncate_text(text: str, max_length: int) -> str:
+ """
+ Truncate text intelligently with ellipsis if needed.
+
+ Args:
+ text: Text to truncate
+ max_length: Maximum length including ellipsis
+
+ Returns:
+ Truncated text with ellipsis if needed
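+
+    Example:
+        >>> _truncate_text("very-long-cluster-name", 10)
+        'very-lo...'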
+ """
+ if len(text) <= max_length:
+ return text
+
+ # Leave room for ellipsis
+ if max_length <= 3:
+ return text[:max_length]
+
+ return text[:max_length-3] + "..."
+
+
def _get_status_display(status: RayJobDeploymentStatus) -> Tuple[str, str]:
"""
Get the display string and header color for a given status.
diff --git a/src/codeflare_sdk/ray/rayjobs/rayjob.py b/src/codeflare_sdk/ray/rayjobs/rayjob.py
index c06c596e..da57f0f8 100644
--- a/src/codeflare_sdk/ray/rayjobs/rayjob.py
+++ b/src/codeflare_sdk/ray/rayjobs/rayjob.py
@@ -23,10 +23,12 @@
from typing import Dict, Any, Optional, Tuple, Union
from ray.runtime_env import RuntimeEnv
+from kubernetes import client
+
from codeflare_sdk.common.kueue.kueue import get_default_kueue_name
from codeflare_sdk.common.utils.constants import MOUNT_PATH
-
from codeflare_sdk.common.utils.utils import get_ray_image_for_python_version
+from codeflare_sdk.common.kubernetes_cluster.auth import get_api_client, config_check
from codeflare_sdk.vendored.python_client.kuberay_job_api import RayjobApi
from codeflare_sdk.vendored.python_client.kuberay_cluster_api import RayClusterApi
from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig
@@ -43,13 +45,135 @@
RayJobDeploymentStatus,
CodeflareRayJobStatus,
RayJobInfo,
+ KueueWorkloadInfo,
)
from . import pretty_print
+# Widget imports (optional, for Jupyter notebook support)
+try:
+ import ipywidgets as widgets
+ from IPython.display import display
+ WIDGETS_AVAILABLE = True
+except ImportError:
+ WIDGETS_AVAILABLE = False
+
logger = logging.getLogger(__name__)
+def _get_kueue_workload_info(job_name: str, namespace: str, labels: dict) -> Optional[KueueWorkloadInfo]:
+ """
+ Get Kueue workload information for a RayJob if it's managed by Kueue.
+
+ Args:
+ job_name: Name of the RayJob
+ namespace: Kubernetes namespace
+ labels: Labels from the RayJob metadata
+
+ Returns:
+ KueueWorkloadInfo if the job is managed by Kueue, None otherwise
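+
+    Example (illustrative; the job, namespace, and queue names are placeholders):
+        >>> info = _get_kueue_workload_info(
+        ...     "my-job", "demo", {"kueue.x-k8s.io/queue-name": "local-queue"}
+        ... )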
+ """
+ # Check if job has Kueue queue label
+ queue_name = labels.get("kueue.x-k8s.io/queue-name")
+ if not queue_name:
+ return None
+
+ try:
+ # Check and load Kubernetes config (handles oc login, kubeconfig, etc.)
+ config_check()
+ # List all workloads in the namespace to find the one for this RayJob
+ api_instance = client.CustomObjectsApi(get_api_client())
+ workloads = api_instance.list_namespaced_custom_object(
+ group="kueue.x-k8s.io",
+ version="v1beta1",
+ plural="workloads",
+ namespace=namespace,
+ )
+
+ # Find workload with matching RayJob owner reference
+ for workload in workloads.get("items", []):
+ owner_refs = workload.get("metadata", {}).get("ownerReferences", [])
+
+ for owner_ref in owner_refs:
+ if (
+ owner_ref.get("kind") == "RayJob"
+ and owner_ref.get("name") == job_name
+ ):
+ # Found the workload for this RayJob
+ workload_metadata = workload.get("metadata", {})
+ workload_status = workload.get("status", {})
+
+ # Extract workload information
+ workload_info = KueueWorkloadInfo(
+ name=workload_metadata.get("name", "unknown"),
+ queue_name=queue_name,
+ status=_get_workload_status_summary(workload_status),
+ priority=workload.get("spec", {}).get("priority"),
+ creation_time=workload_metadata.get("creationTimestamp"),
+ admission_time=_get_admission_time(workload_status),
+ )
+
+ logger.debug(f"Found Kueue workload for RayJob {job_name}: {workload_info.name}")
+ return workload_info
+
+ # No workload found for this RayJob
+ logger.debug(f"No Kueue workload found for RayJob {job_name}")
+ return None
+
+ except Exception as e:
+ logger.warning(f"Failed to get Kueue workload info for RayJob {job_name}: {e}")
+ return None
+
+
+def _get_workload_status_summary(workload_status: dict) -> str:
+ """
+ Get a summary status from Kueue workload status.
+
+ Args:
+ workload_status: The status section from a Kueue workload CR
+
+ Returns:
+ String summary of the workload status
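+
+    Example:
+        >>> _get_workload_status_summary(
+        ...     {"conditions": [{"type": "Admitted", "status": "True"}]}
+        ... )
+        'Admitted'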
+ """
+ conditions = workload_status.get("conditions", [])
+
+ # Check conditions in priority order
+ for condition_type in ["Finished", "Admitted", "QuotaReserved", "Pending"]:
+ for condition in conditions:
+ if (
+ condition.get("type") == condition_type
+ and condition.get("status") == "True"
+ ):
+ return condition_type
+
+ # If no clear status, return "Unknown"
+ return "Unknown"
+
+
+def _get_admission_time(workload_status: dict) -> Optional[str]:
+ """
+ Get the admission time from Kueue workload status.
+
+ Args:
+ workload_status: The status section from a Kueue workload CR
+
+ Returns:
+ Admission timestamp if available, None otherwise
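+
+    Example (illustrative timestamp):
+        >>> _get_admission_time(
+        ...     {"conditions": [{"type": "Admitted", "status": "True",
+        ...                      "lastTransitionTime": "2024-01-01T12:00:00Z"}]}
+        ... )
+        '2024-01-01T12:00:00Z'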
+ """
+ conditions = workload_status.get("conditions", [])
+
+ for condition in conditions:
+ if (
+ condition.get("type") == "Admitted"
+ and condition.get("status") == "True"
+ ):
+ return condition.get("lastTransitionTime")
+
+ return None
+
+
class RayJob:
"""
A client for managing Ray jobs using the KubeRay operator.
@@ -57,6 +181,40 @@ class RayJob:
This class provides a simplified interface for submitting and managing
RayJob CRs (using the KubeRay RayJob python client).
"""
+
+ # Global configuration for widget display preference
+ _default_use_widgets = False
+
+ @classmethod
+ def set_widgets_default(cls, use_widgets: bool):
+ """
+ Set the global default for widget display in Status() and List() methods.
+
+ Args:
+ use_widgets (bool): Whether to use Jupyter widgets by default
+
+ Example:
+ >>> from codeflare_sdk import RayJob
+ >>> RayJob.set_widgets_default(True) # Enable widgets globally
+ >>> RayJob.Status("my-job") # Now uses widgets by default
+ >>> RayJob.List() # Now uses widgets by default
+ """
+ cls._default_use_widgets = use_widgets
+ logger.info(f"Set global widget default to: {use_widgets}")
+
+ @classmethod
+ def get_widgets_default(cls) -> bool:
+ """
+ Get the current global default for widget display.
+
+ Returns:
+ bool: Current global widget setting
+
+ Example:
+ >>> from codeflare_sdk import RayJob
+ >>> print(f"Widgets enabled: {RayJob.get_widgets_default()}")
+ """
+ return cls._default_use_widgets
def __init__(
self,
@@ -573,6 +731,27 @@ def status(
except ValueError:
deployment_status = RayJobDeploymentStatus.UNKNOWN
+ # Get Kueue workload information if available
+ # We need to fetch the RayJob CR to get labels
+ kueue_workload = None
+ local_queue = None
+ try:
+ config_check()
+ api_instance = client.CustomObjectsApi(get_api_client())
+ rayjob_cr = api_instance.get_namespaced_custom_object(
+ group="ray.io",
+ version="v1",
+ namespace=self.namespace,
+ plural="rayjobs",
+ name=self.name,
+ )
+ labels = rayjob_cr.get("metadata", {}).get("labels", {})
+ local_queue = labels.get("kueue.x-k8s.io/queue-name")
+ if local_queue:
+ kueue_workload = _get_kueue_workload_info(self.name, self.namespace, labels)
+ except Exception as e:
+ logger.debug(f"Could not fetch Kueue workload info for {self.name}: {e}")
+
# Create RayJobInfo dataclass
job_info = RayJobInfo(
name=self.name,
@@ -584,6 +763,9 @@ def status(
end_time=status_data.get("endTime"),
failed_attempts=status_data.get("failed", 0),
succeeded_attempts=status_data.get("succeeded", 0),
+ kueue_workload=kueue_workload,
+ local_queue=local_queue,
+ is_managed_cluster=self._cluster_config is not None,
)
# Map to CodeFlare status and determine readiness
@@ -594,6 +776,611 @@ def status(
return codeflare_status, ready
+ @staticmethod
+ def Status(
+ job_name: str,
+ namespace: Optional[str] = None,
+ use_widgets: Optional[bool] = None
+ ):
+ """
+ Get the status of a RayJob by name and display it with Rich console formatting.
+
+ Args:
+ job_name (str): The name of the RayJob
+ namespace (Optional[str]): The Kubernetes namespace (auto-detected if not specified)
+ use_widgets (Optional[bool]): Whether to display in Jupyter widgets (default: None, uses global setting)
+
+ Returns:
+ Tuple of (CodeflareRayJobStatus, ready: bool) when use_widgets=False (default), None when use_widgets=True
+
+ Example:
+ >>> from codeflare_sdk.ray import RayJob
+ >>> status, ready = RayJob.Status("my-job") # Rich console (default)
+ >>> RayJob.set_widgets_default(True) # Enable widgets globally
+ >>> RayJob.Status("my-job") # Now uses widgets by default
+ """
+ if namespace is None:
+ namespace = get_current_namespace()
+
+ # Use global default if not specified
+ if use_widgets is None:
+ use_widgets = RayJob._default_use_widgets
+
+ # Initialize the API (no parameters needed)
+ api = RayjobApi()
+
+ try:
+ # Check and load Kubernetes config (handles oc login, kubeconfig, etc.)
+ config_check()
+ status_data = api.get_job_status(name=job_name, k8s_namespace=namespace)
+
+ if not status_data:
+ if use_widgets:
+ RayJob._display_job_status_widget(None, job_name, namespace)
+ return None # Don't return tuple to avoid Jupyter auto-display
+ else:
+ pretty_print.print_no_job_found(job_name, namespace)
+ return CodeflareRayJobStatus.UNKNOWN, False
+
+ # Map deployment status to our enums
+ deployment_status_str = status_data.get("jobDeploymentStatus", "Unknown")
+
+ try:
+ deployment_status = RayJobDeploymentStatus(deployment_status_str)
+ except ValueError:
+ deployment_status = RayJobDeploymentStatus.UNKNOWN
+
+ # Create RayJobInfo dataclass - we need to determine cluster_name
+ cluster_name = status_data.get("rayClusterName", "unknown")
+
+ # Get Kueue workload information and cluster management info
+ # We need to fetch the RayJob CR to get labels and spec
+ kueue_workload = None
+ local_queue = None
+ is_managed_cluster = False
+ try:
+ api_instance = client.CustomObjectsApi(get_api_client())
+ rayjob_cr = api_instance.get_namespaced_custom_object(
+ group="ray.io",
+ version="v1",
+ namespace=namespace,
+ plural="rayjobs",
+ name=job_name,
+ )
+ labels = rayjob_cr.get("metadata", {}).get("labels", {})
+ spec = rayjob_cr.get("spec", {})
+
+ # Determine if this is a managed cluster
+ is_managed_cluster = "rayClusterSpec" in spec
+
+ local_queue = labels.get("kueue.x-k8s.io/queue-name")
+ if local_queue:
+ kueue_workload = _get_kueue_workload_info(job_name, namespace, labels)
+ except Exception as e:
+ logger.debug(f"Could not fetch Kueue workload info for {job_name}: {e}")
+
+ job_info = RayJobInfo(
+ name=job_name,
+ job_id=status_data.get("jobId", ""),
+ status=deployment_status,
+ namespace=namespace,
+ cluster_name=cluster_name,
+ start_time=status_data.get("startTime"),
+ end_time=status_data.get("endTime"),
+ failed_attempts=status_data.get("failed", 0),
+ succeeded_attempts=status_data.get("succeeded", 0),
+ kueue_workload=kueue_workload,
+ local_queue=local_queue,
+ is_managed_cluster=is_managed_cluster,
+ )
+
+ # Map to CodeFlare status and determine readiness using static method
+ codeflare_status, ready = RayJob._map_to_codeflare_status_static(deployment_status)
+
+ if use_widgets:
+ RayJob._display_job_status_widget(job_info, job_name, namespace)
+ return None # Don't return tuple to avoid Jupyter auto-display
+ else:
+ pretty_print.print_job_status(job_info)
+ return codeflare_status, ready
+
+ except Exception as e:
+ logger.error(f"Failed to get status for RayJob {job_name}: {e}")
+ if use_widgets:
+ RayJob._display_job_status_widget(None, job_name, namespace)
+ return None # Don't return tuple to avoid Jupyter auto-display
+ else:
+ pretty_print.print_no_job_found(job_name, namespace)
+ return CodeflareRayJobStatus.UNKNOWN, False
+
+ @staticmethod
+ def List(
+ namespace: Optional[str] = None,
+ use_widgets: Optional[bool] = None,
+ page_size: int = 10,
+ page: int = 1
+ ):
+ """
+ List all RayJobs in a namespace and display them with Rich console formatting.
+
+ Args:
+ namespace (Optional[str]): The Kubernetes namespace (auto-detected if not specified)
+ use_widgets (Optional[bool]): Whether to display in Jupyter widgets (default: None, uses global setting)
+ page_size (int): Number of jobs to display per page (default: 10)
+ page (int): Page number to display (default: 1)
+
+ Returns:
+ list: List of RayJobInfo objects when use_widgets=False, None when use_widgets=True
+
+ Example:
+ >>> from codeflare_sdk.ray import RayJob
+ >>> jobs = RayJob.List() # First 10 jobs (default)
+ >>> RayJob.List(page=2) # Next 10 jobs
+ >>> RayJob.List(page_size=5, page=1) # First 5 jobs
+ >>> RayJob.set_widgets_default(True) # Enable widgets globally
+ """
+ if namespace is None:
+ namespace = get_current_namespace()
+
+ # Use global default if not specified
+ if use_widgets is None:
+ use_widgets = RayJob._default_use_widgets
+
+ try:
+ # Check and load Kubernetes config (handles oc login, kubeconfig, etc.)
+ config_check()
+ # Use Kubernetes API to list RayJob custom resources
+ api_instance = client.CustomObjectsApi(get_api_client())
+ rayjobs = api_instance.list_namespaced_custom_object(
+ group="ray.io",
+ version="v1",
+ namespace=namespace,
+ plural="rayjobs",
+ )
+
+ job_list = []
+ for rayjob_cr in rayjobs["items"]:
+ # Extract job information from the custom resource
+ metadata = rayjob_cr.get("metadata", {})
+ spec = rayjob_cr.get("spec", {})
+ status = rayjob_cr.get("status", {})
+
+ job_name = metadata.get("name", "unknown")
+ job_id = status.get("jobId", "")
+ deployment_status_str = status.get("jobDeploymentStatus", "Unknown")
+
+ try:
+ deployment_status = RayJobDeploymentStatus(deployment_status_str)
+ except ValueError:
+ deployment_status = RayJobDeploymentStatus.UNKNOWN
+
+ # Get cluster name and determine if it's managed
+ cluster_name = "unknown"
+ is_managed_cluster = False
+ if "rayClusterSpec" in spec:
+ cluster_name = f"{job_name}-cluster" # Managed cluster
+ is_managed_cluster = True
+ elif "clusterSelector" in spec:
+ cluster_selector = spec["clusterSelector"]
+ cluster_name = cluster_selector.get("ray.io/cluster", "unknown")
+ is_managed_cluster = False
+
+ # Get Kueue workload information if available
+ labels = metadata.get("labels", {})
+ local_queue = labels.get("kueue.x-k8s.io/queue-name")
+ kueue_workload = None
+ if local_queue:
+ kueue_workload = _get_kueue_workload_info(job_name, namespace, labels)
+
+ job_info = RayJobInfo(
+ name=job_name,
+ job_id=job_id,
+ status=deployment_status,
+ namespace=namespace,
+ cluster_name=cluster_name,
+ start_time=status.get("startTime"),
+ end_time=status.get("endTime"),
+ failed_attempts=status.get("failed", 0),
+ succeeded_attempts=status.get("succeeded", 0),
+ kueue_workload=kueue_workload,
+ local_queue=local_queue,
+ is_managed_cluster=is_managed_cluster,
+ )
+ job_list.append(job_info)
+
+ # Apply pagination
+ total_jobs = len(job_list)
+ total_pages = (total_jobs + page_size - 1) // page_size # Ceiling division
+
+ # Validate page number
+ if page < 1:
+ page = 1
+ elif page > total_pages and total_pages > 0:
+ page = total_pages
+
+ # Calculate pagination slice
+ start_idx = (page - 1) * page_size
+ end_idx = start_idx + page_size
+ paginated_jobs = job_list[start_idx:end_idx]
+
+ # Create pagination info
+ pagination_info = {
+ "current_page": page,
+ "total_pages": total_pages,
+ "page_size": page_size,
+ "total_jobs": total_jobs,
+ "showing_start": start_idx + 1 if paginated_jobs else 0,
+ "showing_end": min(end_idx, total_jobs),
+ }
+
+ if use_widgets:
+ RayJob._display_jobs_list_widget(paginated_jobs, namespace, pagination_info)
+ return None # Don't return job_list to avoid Jupyter auto-display
+ else:
+ pretty_print.print_jobs_list(paginated_jobs, namespace, pagination_info)
+ return job_list # Return full list for programmatic use
+
+ except Exception as e:
+ logger.error(f"Failed to list RayJobs in namespace {namespace}: {e}")
+ empty_pagination_info = {
+ "current_page": 1,
+ "total_pages": 0,
+ "page_size": page_size,
+ "total_jobs": 0,
+ "showing_start": 0,
+ "showing_end": 0,
+ }
+ if use_widgets:
+ RayJob._display_jobs_list_widget([], namespace, empty_pagination_info)
+ return None # Don't return empty list to avoid Jupyter auto-display
+ else:
+ pretty_print.print_jobs_list([], namespace, empty_pagination_info)
+ return []
+
+ @staticmethod
+ def _map_to_codeflare_status_static(
+ deployment_status: RayJobDeploymentStatus,
+ ) -> Tuple[CodeflareRayJobStatus, bool]:
+ """
+ Static version of status mapping for use by the static status method.
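+
+        Example:
+            >>> status, ready = RayJob._map_to_codeflare_status_static(
+            ...     RayJobDeploymentStatus.RUNNING
+            ... )
+            >>> ready
+            False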
+ """
+ status_mapping = {
+ RayJobDeploymentStatus.COMPLETE: (CodeflareRayJobStatus.COMPLETE, True),
+ RayJobDeploymentStatus.FAILED: (CodeflareRayJobStatus.FAILED, True),
+ RayJobDeploymentStatus.RUNNING: (CodeflareRayJobStatus.RUNNING, False),
+ RayJobDeploymentStatus.SUSPENDED: (CodeflareRayJobStatus.SUSPENDED, False),
+ RayJobDeploymentStatus.UNKNOWN: (CodeflareRayJobStatus.UNKNOWN, False),
+ }
+
+ return status_mapping.get(
+ deployment_status, (CodeflareRayJobStatus.UNKNOWN, False)
+ )
+
+ @staticmethod
+ def _display_job_status_widget(job_info: Optional[RayJobInfo], job_name: str, namespace: str):
+ """
+ Display RayJob status in a Jupyter widget.
+
+ Args:
+ job_info: RayJobInfo object or None if job not found
+ job_name: Name of the job
+ namespace: Kubernetes namespace
+ """
+ if not WIDGETS_AVAILABLE:
+ # Fall back to console output if widgets not available
+ if job_info:
+ pretty_print.print_job_status(job_info)
+ else:
+ pretty_print.print_no_job_found(job_name, namespace)
+ return
+
+ # Create the widget display
+ if job_info is None:
+ # Job not found - match the list widget styling
+ status_widget = widgets.HTML(
+                value=f"""
+                <div style="border: 1px solid #dc3545; border-radius: 6px; padding: 12px;">
+                    <h3 style="color: #dc3545;">❌ RayJob Not Found</h3>
+                    <table>
+                        <tr><td><b>Job Name:</b></td><td>{job_name}</td></tr>
+                        <tr><td><b>Namespace:</b></td><td>{namespace}</td></tr>
+                        <tr><td><b>Status:</b></td><td>The RayJob may have been deleted or never existed.</td></tr>
+                    </table>
+                </div>
+                """
+ )
+ else:
+ # Job found - create status display matching list widget style
+ status_color, status_icon = RayJob._get_status_color_and_icon(job_info.status)
+
+ # Build main job information table
+            job_table_rows = f"""
+            <tr><td><b>Name:</b></td><td>{job_info.name}</td></tr>
+            <tr><td><b>Job ID:</b></td><td>{job_info.job_id}</td></tr>
+            <tr><td><b>Status:</b></td>
+                <td><span style="color: {status_color}; font-weight: bold;">{status_icon} {job_info.status.value}</span></td></tr>
+            <tr><td><b>RayCluster:</b></td><td>{job_info.cluster_name}</td></tr>
+            <tr><td><b>Namespace:</b></td><td>{job_info.namespace}</td></tr>
+            <tr><td><b>Managed Cluster:</b></td>
+                <td>{'✅ Yes (Job-managed)' if job_info.is_managed_cluster else '❌ No (Existing cluster)'}</td></tr>
+            """
+
+ # Add timing information if available
+ if job_info.start_time:
+                job_table_rows += f"""
+            <tr><td><b>Started:</b></td><td>{job_info.start_time}</td></tr>
+                """
+
+            if job_info.end_time:
+                job_table_rows += f"""
+            <tr><td><b>Ended:</b></td><td>{job_info.end_time}</td></tr>
+                """
+
+            # Add failure info if available
+            if job_info.failed_attempts > 0:
+                job_table_rows += f"""
+            <tr><td><b>Failed Attempts:</b></td><td>{job_info.failed_attempts}</td></tr>
+                """
+
+ # Build Kueue section
+ kueue_section = ""
+ if job_info.kueue_workload:
+ kueue_color = "#007bff" # Blue for Kueue
+ if job_info.kueue_workload.status == "Admitted":
+ kueue_color = "#28a745" # Green for admitted
+ elif job_info.kueue_workload.status == "Pending":
+ kueue_color = "#ffc107" # Yellow for pending
+
+                kueue_rows = f"""
+            <tr><td><b>🎯 Local Queue:</b></td><td>{job_info.local_queue}</td></tr>
+            <tr><td><b>🎯 Workload Status:</b></td>
+                <td><span style="color: {kueue_color}; font-weight: bold;">{job_info.kueue_workload.status}</span></td></tr>
+            <tr><td><b>🎯 Workload Name:</b></td><td>{job_info.kueue_workload.name}</td></tr>
+                """
+
+                if job_info.kueue_workload.priority is not None:
+                    kueue_rows += f"""
+            <tr><td><b>🎯 Priority:</b></td><td>{job_info.kueue_workload.priority}</td></tr>
+                    """
+
+                if job_info.kueue_workload.admission_time:
+                    kueue_rows += f"""
+            <tr><td><b>🎯 Admitted:</b></td><td>{job_info.kueue_workload.admission_time}</td></tr>
+                    """
+
+                kueue_section = kueue_rows
+
+            elif job_info.local_queue:
+                kueue_section = f"""
+            <tr><td><b>🎯 Local Queue:</b></td><td>{job_info.local_queue}</td></tr>
+            <tr><td><b>🎯 Workload Info:</b></td><td>Not available</td></tr>
+                """
+
+ status_widget = widgets.HTML(
+                value=f"""
+                <div style="border: 1px solid {status_color}; border-radius: 6px; padding: 12px;">
+                    <h3 style="color: {status_color};">{status_icon} RayJob Status</h3>
+                    <table>
+                    {job_table_rows}
+                    {kueue_section}
+                    </table>
+                </div>
+                """
+ )
+
+ # Display the widget
+ display(status_widget)
+
+ @staticmethod
+ def _get_status_color_and_icon(status: RayJobDeploymentStatus) -> Tuple[str, str]:
+ """
+ Get color and icon for job status display.
+
+ Returns:
+ Tuple of (color, icon)
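+
+        Example:
+            >>> RayJob._get_status_color_and_icon(RayJobDeploymentStatus.FAILED)
+            ('#dc3545', '❌')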
+ """
+ status_mapping = {
+            RayJobDeploymentStatus.COMPLETE: ("#28a745", "✅"),
+            RayJobDeploymentStatus.RUNNING: ("#007bff", "🏃"),
+            RayJobDeploymentStatus.FAILED: ("#dc3545", "❌"),
+            RayJobDeploymentStatus.SUSPENDED: ("#ffc107", "⏸️"),
+            RayJobDeploymentStatus.UNKNOWN: ("#6c757d", "❓"),
+        }
+        return status_mapping.get(status, ("#6c757d", "❓"))
+
+ @staticmethod
+ def _display_jobs_list_widget(job_list, namespace: str, pagination_info: Optional[dict] = None):
+ """
+ Display a list of RayJobs in a Jupyter widget table with pagination.
+
+ Args:
+ job_list: List of RayJobInfo objects (for current page)
+ namespace: Kubernetes namespace
+ pagination_info: Optional pagination information dict
+ """
+ if not WIDGETS_AVAILABLE:
+ # Fall back to console output if widgets not available
+ pretty_print.print_jobs_list(job_list, namespace, pagination_info)
+ return
+
+ if not job_list:
+ # No jobs found
+ no_jobs_widget = widgets.HTML(
+                value=f"""
+                <div style="border: 1px solid #ffc107; border-radius: 6px; padding: 12px;">
+                    <h3>📋 No RayJobs Found</h3>
+                    <p><b>Namespace:</b> {namespace}</p>
+                    <p>No RayJobs found in this namespace. Jobs may have been deleted or completed with TTL cleanup.</p>
+                </div>
+                """
+ display(no_jobs_widget)
+ return
+
+ # Create table header with pagination info
+        title = f"📋 RayJobs in namespace: {namespace}"
+ if pagination_info and pagination_info["total_pages"] > 1:
+ title += f" (Page {pagination_info['current_page']} of {pagination_info['total_pages']})"
+
+        table_html = f"""
+        <div>
+        <h3>{title}</h3>
+        <table style="border-collapse: collapse; width: 100%;">
+            <thead>
+                <tr>
+                    <th>Status</th>
+                    <th>Job Name</th>
+                    <th>Job ID</th>
+                    <th>Cluster</th>
+                    <th>Managed</th>
+                    <th>Queue</th>
+                    <th>Kueue Status</th>
+                    <th>Start Time</th>
+                </tr>
+            </thead>
+            <tbody>
+        """
+
+ # Add rows for each job
+ for job_info in job_list:
+ status_color, status_icon = RayJob._get_status_color_and_icon(job_info.status)
+ start_time = job_info.start_time or "N/A"
+
+ # Cluster management info
+            managed_display = "✅ Yes" if job_info.is_managed_cluster else "❌ No"
+ managed_color = "#28a745" if job_info.is_managed_cluster else "#6c757d"
+
+ # Kueue information
+ queue_display = job_info.local_queue or "N/A"
+ kueue_status_display = "N/A"
+ kueue_status_color = "#6c757d"
+
+ if job_info.kueue_workload:
+ kueue_status_display = job_info.kueue_workload.status
+ if job_info.kueue_workload.status == "Admitted":
+ kueue_status_color = "#28a745" # Green
+ elif job_info.kueue_workload.status == "Pending":
+ kueue_status_color = "#ffc107" # Yellow
+ elif job_info.kueue_workload.status == "Finished":
+ kueue_status_color = "#007bff" # Blue
+
+            table_html += f"""
+                <tr>
+                    <td><span style="color: {status_color}; font-weight: bold;">{status_icon} {job_info.status.value}</span></td>
+                    <td>{job_info.name}</td>
+                    <td>{job_info.job_id}</td>
+                    <td>{job_info.cluster_name}</td>
+                    <td><span style="color: {managed_color};">{managed_display}</span></td>
+                    <td>{queue_display}</td>
+                    <td><span style="color: {kueue_status_color};">{kueue_status_display}</span></td>
+                    <td>{start_time}</td>
+                </tr>
+            """
+
+        table_html += """
+            </tbody>
+        </table>
+        """
+
+ # Add pagination navigation info
+ if pagination_info and pagination_info["total_pages"] > 1:
+            table_html += f"""
+            <div style="margin-top: 8px;">
+                <span>
+                    Showing {pagination_info['showing_start']}-{pagination_info['showing_end']}
+                    of {pagination_info['total_jobs']} jobs
+                </span>
+                <span style="margin-left: 16px;">
+                    Navigation:
+            """
+
+            if pagination_info['current_page'] > 1:
+                table_html += f" RayJob.List(page={pagination_info['current_page'] - 1}) [Previous]"
+            if pagination_info['current_page'] < pagination_info['total_pages']:
+                if pagination_info['current_page'] > 1:
+                    table_html += " |"
+                table_html += f" RayJob.List(page={pagination_info['current_page'] + 1}) [Next]"
+
+            table_html += """
+                </span>
+            </div>
+            """
+
+        table_html += "</div>"
+
+ # Display the widget
+ jobs_widget = widgets.HTML(value=table_html)
+ display(jobs_widget)
+
+
def _map_to_codeflare_status(
self, deployment_status: RayJobDeploymentStatus
) -> Tuple[CodeflareRayJobStatus, bool]:
diff --git a/src/codeflare_sdk/ray/rayjobs/status.py b/src/codeflare_sdk/ray/rayjobs/status.py
index 027ed09c..11f99922 100644
--- a/src/codeflare_sdk/ray/rayjobs/status.py
+++ b/src/codeflare_sdk/ray/rayjobs/status.py
@@ -47,6 +47,22 @@ class CodeflareRayJobStatus(Enum):
UNKNOWN = 5
+@dataclass
+class KueueWorkloadInfo:
+ """
+ For storing information about a Kueue workload associated with a RayJob or RayCluster.
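+
+    Example (illustrative values):
+        >>> info = KueueWorkloadInfo(
+        ...     name="rayjob-my-job-abc12",
+        ...     queue_name="local-queue",
+        ...     status="Admitted",
+        ...     priority=0,
+        ... )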
+ """
+
+ name: str
+ queue_name: str = "" # LocalQueue name (optional for RayClusters)
+ status: str = "Unknown" # e.g., "Pending", "Admitted", "Finished"
+ priority: Optional[int] = None
+ creation_time: Optional[str] = None
+ admission_time: Optional[str] = None
+ error_message: Optional[str] = None
+ error_reason: Optional[str] = None
+
+
@dataclass
class RayJobInfo:
"""
@@ -62,3 +78,8 @@ class RayJobInfo:
end_time: Optional[str] = None
failed_attempts: int = 0
succeeded_attempts: int = 0
+ # Kueue integration
+ kueue_workload: Optional[KueueWorkloadInfo] = None
+ local_queue: Optional[str] = None
+ # Cluster management
+ is_managed_cluster: bool = False