Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/codeflare_sdk/ray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
RayJobDeploymentStatus,
CodeflareRayJobStatus,
RayJobInfo,
KueueWorkloadInfo,
)

from .cluster import (
Expand All @@ -21,4 +22,4 @@
RayClusterStatus,
CodeFlareClusterStatus,
RayCluster,
)
)
632 changes: 625 additions & 7 deletions src/codeflare_sdk/ray/cluster/cluster.py

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions src/codeflare_sdk/ray/cluster/pretty_print.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,23 @@ def print_clusters(clusters: List[RayCluster]):
) # format that is used to generate the name of the service
table0.add_row()
table0.add_row(f"[link={dashboard} blue underline]Dashboard:link:[/link]")

# Add Kueue information if available (for any cluster, not just AppWrapper-managed)
if cluster.kueue_workload:
table0.add_row()
table0.add_row("[bold blue]🎯 Kueue Integration[/bold blue]")
table0.add_row(f"[bold]Local Queue:[/bold] {cluster.local_queue or 'N/A'}")
table0.add_row(f"[bold]Workload Status:[/bold] [bold green]{cluster.kueue_workload.status}[/bold green]")
table0.add_row(f"[bold]Workload Name:[/bold] {cluster.kueue_workload.name}")
if cluster.kueue_workload.priority is not None:
table0.add_row(f"[bold]Priority:[/bold] {cluster.kueue_workload.priority}")
if cluster.kueue_workload.admission_time:
table0.add_row(f"[bold]Admitted:[/bold] {cluster.kueue_workload.admission_time}")
if cluster.kueue_workload.error_message:
table0.add_row(f"[bold red]Error:[/bold red] {cluster.kueue_workload.error_reason}: {cluster.kueue_workload.error_message}")
if cluster.is_appwrapper:
table0.add_row(f"[bold]AppWrapper Managed:[/bold] [green]Yes[/green]")

table0.add_row("") # empty row for spacing

#'table1' to display the worker counts
Expand Down
28 changes: 27 additions & 1 deletion src/codeflare_sdk/ray/cluster/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
from dataclasses import dataclass, field
from enum import Enum
import typing
from typing import Union
from typing import Union, Optional, TYPE_CHECKING

if TYPE_CHECKING:
from ..rayjobs.status import KueueWorkloadInfo


class RayClusterStatus(Enum):
Expand Down Expand Up @@ -55,6 +58,26 @@ class CodeFlareClusterStatus(Enum):
class RayCluster:
"""
For storing information about a Ray cluster.

Attributes:
name: Name of the RayCluster
status: Current status of the cluster
head_cpu_requests: CPU requests for head node
head_cpu_limits: CPU limits for head node
head_mem_requests: Memory requests for head node
head_mem_limits: Memory limits for head node
num_workers: Number of worker nodes
worker_mem_requests: Memory requests per worker
worker_mem_limits: Memory limits per worker
worker_cpu_requests: CPU requests per worker
worker_cpu_limits: CPU limits per worker
namespace: Kubernetes namespace
dashboard: Dashboard URL
worker_extended_resources: Extended resources for workers (e.g., GPUs)
head_extended_resources: Extended resources for head node
local_queue: Kueue LocalQueue name (if managed by Kueue)
kueue_workload: Kueue workload information (if managed by Kueue)
is_appwrapper: Whether cluster is managed by AppWrapper
"""

name: str
Expand All @@ -72,3 +95,6 @@ class RayCluster:
dashboard: str
worker_extended_resources: typing.Dict[str, int] = field(default_factory=dict)
head_extended_resources: typing.Dict[str, int] = field(default_factory=dict)
local_queue: Optional[str] = None
kueue_workload: Optional["KueueWorkloadInfo"] = None
is_appwrapper: bool = False
4 changes: 2 additions & 2 deletions src/codeflare_sdk/ray/rayjobs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .rayjob import RayJob, ManagedClusterConfig
from .status import RayJobDeploymentStatus, CodeflareRayJobStatus, RayJobInfo
from .config import ManagedClusterConfig
from .status import RayJobDeploymentStatus, CodeflareRayJobStatus, RayJobInfo, KueueWorkloadInfo
from .config import ManagedClusterConfig
181 changes: 179 additions & 2 deletions src/codeflare_sdk/ray/rayjobs/pretty_print.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from typing import Tuple, Optional
from typing import Tuple, Optional, List

from .status import RayJobDeploymentStatus, RayJobInfo
from .status import RayJobDeploymentStatus, RayJobInfo, KueueWorkloadInfo


def print_job_status(job_info: RayJobInfo):
Expand All @@ -37,6 +37,10 @@ def print_job_status(job_info: RayJobInfo):
table.add_row(f"[bold]Status:[/bold] {job_info.status.value}")
table.add_row(f"[bold]RayCluster:[/bold] {job_info.cluster_name}")
table.add_row(f"[bold]Namespace:[/bold] {job_info.namespace}")

# Add cluster management info
managed_text = "[bold green]Yes (Job-managed)[/bold green]" if job_info.is_managed_cluster else "[dim]No (Existing cluster)[/dim]"
table.add_row(f"[bold]Managed Cluster:[/bold] {managed_text}")

# Add timing information if available
if job_info.start_time:
Expand All @@ -47,6 +51,23 @@ def print_job_status(job_info: RayJobInfo):
if job_info.failed_attempts > 0:
table.add_row(f"[bold]Failed Attempts:[/bold] {job_info.failed_attempts}")

# Add Kueue information if available
if job_info.kueue_workload:
table.add_row()
table.add_row("[bold blue]🎯 Kueue Integration[/bold blue]")
table.add_row(f"[bold]Local Queue:[/bold] {job_info.local_queue}")
table.add_row(f"[bold]Workload Status:[/bold] [bold green]{job_info.kueue_workload.status}[/bold green]")
table.add_row(f"[bold]Workload Name:[/bold] {job_info.kueue_workload.name}")
if job_info.kueue_workload.priority is not None:
table.add_row(f"[bold]Priority:[/bold] {job_info.kueue_workload.priority}")
if job_info.kueue_workload.admission_time:
table.add_row(f"[bold]Admitted:[/bold] {job_info.kueue_workload.admission_time}")
elif job_info.local_queue:
table.add_row()
table.add_row("[bold blue]🎯 Kueue Integration[/bold blue]")
table.add_row(f"[bold]Local Queue:[/bold] {job_info.local_queue}")
table.add_row("[dim]Workload information not available[/dim]")

_print_table_in_panel(table)


Expand All @@ -66,6 +87,162 @@ def print_no_job_found(job_name: str, namespace: str):
_print_table_in_panel(table)


def print_jobs_list(job_list: List[RayJobInfo], namespace: str, pagination_info: Optional[dict] = None):
    """
    Pretty print a list of RayJobs using Rich formatting with pagination support.

    Renders one table row per job, followed by an optional pagination footer
    and a short summary of how many of the listed jobs use job-managed
    clusters and how many have a Kueue local queue.

    Args:
        job_list: List of RayJobInfo objects (for current page)
        namespace: Kubernetes namespace
        pagination_info: Optional pagination information dict. When given, the
            keys read are ``total_pages`` and ``current_page``; when
            ``total_pages`` is greater than 1, ``showing_start``,
            ``showing_end`` and ``total_jobs`` are read as well.
    """
    if not job_list:
        # Nothing to list: show an informational panel instead of an empty table
        table = _create_info_table(
            "[white on yellow][bold]Namespace", namespace, "[bold yellow]No RayJobs found"
        )
        table.add_row()
        table.add_row("No RayJobs found in this namespace.")
        table.add_row("Jobs may have been deleted or completed with TTL cleanup.")
        _print_table_in_panel(table)
        return

    console = Console()
    multi_page = bool(pagination_info) and pagination_info["total_pages"] > 1

    # Create title with pagination info
    title = f"[bold blue]🚀 RayJobs in namespace: {namespace}[/bold blue]"
    if multi_page:
        title += f" [dim](Page {pagination_info['current_page']} of {pagination_info['total_pages']})[/dim]"

    # Create jobs table with responsive width
    jobs_table = Table(
        title=title,
        show_header=True,
        header_style="bold magenta",
        border_style="blue",
        expand=True,  # Allow table to expand to terminal width
        min_width=120,  # Minimum width for readability
    )

    # Add columns with flexible width allocation and wrapping
    jobs_table.add_column("Status", style="bold", min_width=12, max_width=16)
    jobs_table.add_column("Job Name", style="bold cyan", min_width=15, max_width=30)
    jobs_table.add_column("Job ID", style="dim", min_width=12, max_width=25)
    jobs_table.add_column("Cluster", min_width=12, max_width=25)
    jobs_table.add_column("Managed", min_width=8, max_width=12)
    jobs_table.add_column("Queue", min_width=10, max_width=18)
    jobs_table.add_column("Kueue Status", min_width=8, max_width=15)
    jobs_table.add_column("Start Time", style="dim", min_width=8, max_width=12)

    # Workload statuses with a dedicated colour; anything else is rendered dim
    kueue_status_styles = {
        "Admitted": "bold green",
        "Pending": "bold yellow",
        "Finished": "bold blue",
    }

    for job_info in job_list:
        status_display, _ = _get_status_display(job_info.status)

        # Truncate long values (Rich will handle further truncation if needed).
        # Missing job id / cluster name fall back to "N/A", like the queue
        # column, instead of failing inside _truncate_text.
        job_name = _truncate_text(job_info.name, 28)
        job_id = _truncate_text(job_info.job_id or "N/A", 23)
        cluster_name = _truncate_text(job_info.cluster_name or "N/A", 23)
        queue_display = _truncate_text(job_info.local_queue or "N/A", 16)

        # Cluster management info
        if job_info.is_managed_cluster:
            managed_markup = "[bold green]✅ Yes[/bold green]"
        else:
            managed_markup = "[dim]❌ No[/dim]"

        # Kueue information
        kueue_status_display = "N/A"
        kueue_status_style = "dim"
        if job_info.kueue_workload:
            kueue_status_display = job_info.kueue_workload.status
            kueue_status_style = kueue_status_styles.get(kueue_status_display, "dim")

        jobs_table.add_row(
            status_display,
            job_name,
            job_id,
            cluster_name,
            managed_markup,
            queue_display,
            f"[{kueue_status_style}]{kueue_status_display}[/{kueue_status_style}]",
            _compact_start_time(job_info.start_time),
        )

    console.print(jobs_table)

    # Add pagination information
    if pagination_info:
        console.print()  # Add spacing
        if multi_page:
            current_page = pagination_info["current_page"]
            total_pages = pagination_info["total_pages"]
            console.print(
                f"[dim]Showing {pagination_info['showing_start']}-{pagination_info['showing_end']} "
                f"of {pagination_info['total_jobs']} jobs "
                f"(Page {current_page} of {total_pages})[/dim]"
            )

            # Navigation hints
            nav_hints = []
            if current_page > 1:
                nav_hints.append(f"Previous: RayJob.List(page={current_page - 1})")
            if current_page < total_pages:
                nav_hints.append(f"Next: RayJob.List(page={current_page + 1})")

            if nav_hints:
                console.print(f"[dim]Navigation: {' | '.join(nav_hints)}[/dim]")

    # Add summary information. Both counts are taken from job_list, i.e. the
    # current page only, so the denominator must be the page size as well;
    # comparing them against the all-pages total would under-report.
    kueue_count = sum(1 for job in job_list if job.local_queue)
    managed_count = sum(1 for job in job_list if job.is_managed_cluster)

    if kueue_count or managed_count:
        listed_jobs = len(job_list)
        scope = " on this page" if multi_page else ""
        console.print()  # Add spacing
        if managed_count:
            console.print(
                f"[bold green]🏗️ {managed_count} of {listed_jobs} jobs{scope} use job-managed clusters[/bold green]"
            )
        if kueue_count:
            console.print(
                f"[bold blue]🎯 {kueue_count} of {listed_jobs} jobs{scope} are managed by Kueue[/bold blue]"
            )


def _compact_start_time(start_time) -> str:
    """Return the HH:MM:SS part of a 'T'-separated timestamp string, or "N/A"."""
    if not start_time:
        return "N/A"
    try:
        # Keep only the first 8 characters after the 'T' separator
        return start_time.split("T")[1][:8]
    except (IndexError, AttributeError):
        # No 'T' separator present, or the value is not a string
        return "N/A"


def _truncate_text(text: str, max_length: int) -> str:
"""
Truncate text intelligently with ellipsis if needed.

Args:
text: Text to truncate
max_length: Maximum length including ellipsis

Returns:
Truncated text with ellipsis if needed
"""
if len(text) <= max_length:
return text

# Leave room for ellipsis
if max_length <= 3:
return text[:max_length]

return text[:max_length-3] + "..."


def _get_status_display(status: RayJobDeploymentStatus) -> Tuple[str, str]:
"""
Get the display string and header color for a given status.
Expand Down
Loading
Loading