# dapi/systems.py
from tapipy.tapis import Tapis
from tapipy.errors import BaseTapyException, UnauthorizedError, NotFoundError
from typing import Dict, List, Any, Optional
from .exceptions import SystemInfoError, CredentialError
[docs]
def _is_not_found_error(exc: BaseException) -> bool:
    """Return True when *exc* signals an HTTP 404 from the Tapis API.

    The attached response object is compared against ``None`` instead of being
    tested for truthiness. A ``requests.Response`` is falsy for every 4xx/5xx
    status, so ``exc.response and exc.response.status_code == 404`` can never
    be true for such an object (assumption: tapipy attaches a
    ``requests.Response`` to its exceptions -- TODO confirm).
    """
    if isinstance(exc, NotFoundError):
        return True
    response = getattr(exc, "response", None)
    return response is not None and getattr(response, "status_code", None) == 404


def list_system_queues(t: Tapis, system_id: str, verbose: bool = True) -> List[Any]:
    """Retrieve the batch logical queues defined on a Tapis execution system.

    Args:
        t: Authenticated Tapis client instance.
        system_id: The ID of the execution system (e.g., 'frontera', 'stampede2').
        verbose: If True, prints progress and a one-line summary per queue.

    Returns:
        The value of the system's ``batchLogicalQueues`` attribute (a list of
        queue objects), or an empty list if the system exists but has no
        queues defined.

    Raises:
        ValueError: If system_id is empty.
        SystemInfoError: If the system is not found or an API error occurs.
    """
    if not system_id:
        raise ValueError("system_id cannot be empty.")

    try:
        if verbose:
            print(f"\nFetching queue information for system '{system_id}'...")

        # Fetch the full system object (no 'select') so the queue list is included.
        system_details = t.systems.getSystem(systemId=system_id)
        queues = getattr(system_details, "batchLogicalQueues", [])

        if not queues:
            # Minimal follow-up request to confirm the system exists before
            # reporting "no queues". Redundant when the call above succeeded,
            # but kept as a safety net.
            try:
                t.systems.getSystem(systemId=system_id, select="id")
            except BaseTapyException as e_check:
                if _is_not_found_error(e_check):
                    raise SystemInfoError(
                        f"Execution system '{system_id}' not found."
                    ) from e_check
                raise SystemInfoError(
                    f"Error confirming existence of system '{system_id}': {e_check}"
                ) from e_check
            if verbose:
                print(
                    f"System '{system_id}' found, but it has no batch logical queues defined."
                )
            return []

        if verbose:
            print(f"Found {len(queues)} batch logical queues for system '{system_id}':")
            for q in queues:
                name = getattr(q, "name", "N/A")
                # hpcQueueName is the actual scheduler queue name.
                hpc_queue = getattr(q, "hpcQueueName", "N/A")
                max_jobs = getattr(q, "maxJobs", "N/A")
                max_user_jobs = getattr(q, "maxUserJobs", "N/A")
                max_mins = getattr(q, "maxMinutes", "N/A")
                max_nodes = getattr(q, "maxNodeCount", "N/A")
                print(
                    f" - Name: {name} (HPC Queue: {hpc_queue}, Max Jobs: {max_jobs}, Max User Jobs: {max_user_jobs}, Max Mins: {max_mins}, Max Nodes: {max_nodes})"
                )
            print()

        return queues
    except SystemInfoError:
        # Already carries a precise message; do not let the generic handlers
        # below wrap it a second time.
        raise
    except BaseTapyException as e:
        if _is_not_found_error(e):
            raise SystemInfoError(f"Execution system '{system_id}' not found.") from e
        raise SystemInfoError(
            f"Failed to retrieve queues for system '{system_id}': {e}"
        ) from e
    except Exception as e:
        raise SystemInfoError(
            f"An unexpected error occurred while fetching queues for system '{system_id}': {e}"
        ) from e
def _resolve_username(t: Tapis, username: Optional[str] = None) -> str:
    """Pick the username to act on: the explicit argument, else the client's.

    Args:
        t: Authenticated Tapis client instance.
        username: Explicit username. When empty or None, ``t.username`` is used.

    Returns:
        The first non-empty value among ``username`` and ``t.username``.

    Raises:
        ValueError: If neither source yields a non-empty username.
    """
    # An explicit, non-empty argument always wins.
    if username:
        return username

    # Otherwise fall back to whatever the client carries (may be absent).
    client_username = getattr(t, "username", None)
    if client_username:
        return client_username

    raise ValueError(
        "Username must be provided or available on the Tapis client (t.username)."
    )
[docs]
def check_credentials(t: Tapis, system_id: str, username: Optional[str] = None) -> bool:
    """Report whether a user already has TMS credentials on a Tapis system.

    Args:
        t: Authenticated Tapis client instance.
        system_id: The ID of the Tapis system (e.g., 'frontera', 'stampede3').
        username: The username to check. If None, resolved from ``t.username``.

    Returns:
        True when the credential check call succeeds; False when it fails with
        an unauthorized or not-found error.

    Raises:
        ValueError: If system_id is empty or username cannot be determined.
        CredentialError: If any other error occurs during the check.
    """
    if not system_id:
        raise ValueError("system_id cannot be empty.")

    effective_username = _resolve_username(t, username)

    try:
        t.systems.checkUserCredential(systemId=system_id, userName=effective_username)
    except (UnauthorizedError, NotFoundError):
        # Both error types are interpreted as "no credentials on record".
        return False
    except BaseTapyException as e:
        raise CredentialError(
            f"Failed to check credentials for user '{effective_username}' "
            f"on system '{system_id}': {e}"
        ) from e
    except Exception as e:
        raise CredentialError(
            f"Unexpected error checking credentials for user '{effective_username}' "
            f"on system '{system_id}': {e}"
        ) from e
    else:
        # The call returned without raising, so the credentials exist.
        return True
[docs]
def establish_credentials(
    t: Tapis,
    system_id: str,
    username: Optional[str] = None,
    force: bool = False,
    verbose: bool = True,
) -> None:
    """Establish TMS credentials for a user on a Tapis system.

    If credentials already exist and ``force`` is False, no action is taken.
    Only systems whose ``defaultAuthnMethod`` is 'TMS_KEYS' are supported.

    Args:
        t: Authenticated Tapis client instance.
        system_id: The ID of the Tapis system (e.g., 'frontera', 'stampede3').
        username: The username. If None, resolved from ``t.username``.
        force: If True, create credentials even if they already exist.
        verbose: If True, prints status messages.

    Raises:
        ValueError: If system_id is empty or username cannot be determined.
        CredentialError: If the system does not use TMS_KEYS, if the system is
            not found, or if credential creation fails.
    """
    if not system_id:
        raise ValueError("system_id cannot be empty.")

    effective_username = _resolve_username(t, username)

    # Verify the system exists and uses TMS_KEYS authentication.
    try:
        system_details = t.systems.getSystem(systemId=system_id)
        authn_method = getattr(system_details, "defaultAuthnMethod", None)
    except BaseTapyException as e:
        # Read the status code without testing the response for truthiness:
        # a requests.Response is falsy for 4xx/5xx, which would hide the 404
        # (assumption: tapipy attaches a requests.Response -- TODO confirm).
        response = getattr(e, "response", None)
        status_code = getattr(response, "status_code", None)
        if isinstance(e, NotFoundError) or status_code == 404:
            raise CredentialError(f"System '{system_id}' not found.") from e
        raise CredentialError(f"Failed to retrieve system '{system_id}': {e}") from e

    if authn_method != "TMS_KEYS":
        raise CredentialError(
            f"System '{system_id}' uses authentication method '{authn_method}', "
            f"not 'TMS_KEYS'. TMS credential management is only supported "
            f"for TMS_KEYS systems."
        )

    # Skip creation when credentials are already present, unless forced.
    if not force and check_credentials(t, system_id, effective_username):
        if verbose:
            print(
                f"Credentials already exist for user '{effective_username}' "
                f"on system '{system_id}'. No action taken."
            )
        return

    # Create credentials.
    try:
        t.systems.createUserCredential(
            systemId=system_id,
            userName=effective_username,
            createTmsKeys=True,
        )
        if verbose:
            print(
                f"TMS credentials established for user '{effective_username}' "
                f"on system '{system_id}'."
            )
    except BaseTapyException as e:
        raise CredentialError(
            f"Failed to create credentials for user '{effective_username}' "
            f"on system '{system_id}': {e}"
        ) from e
    except Exception as e:
        raise CredentialError(
            f"Unexpected error creating credentials for user '{effective_username}' "
            f"on system '{system_id}': {e}"
        ) from e
[docs]
def revoke_credentials(
    t: Tapis,
    system_id: str,
    username: Optional[str] = None,
    verbose: bool = True,
) -> None:
    """Delete a user's TMS credentials from a Tapis system.

    An unauthorized or not-found response from the removal call is treated as
    "nothing to revoke" and does not raise.

    Args:
        t: Authenticated Tapis client instance.
        system_id: The ID of the Tapis system (e.g., 'frontera', 'stampede3').
        username: The username. If None, resolved from ``t.username``.
        verbose: If True, prints status messages.

    Raises:
        ValueError: If system_id is empty or username cannot be determined.
        CredentialError: If credential removal fails for any other reason.
    """
    if not system_id:
        raise ValueError("system_id cannot be empty.")

    effective_username = _resolve_username(t, username)

    def report(message: str) -> None:
        # Single place that honours the ``verbose`` switch.
        if verbose:
            print(message)

    try:
        t.systems.removeUserCredential(systemId=system_id, userName=effective_username)
        report(
            f"Credentials revoked for user '{effective_username}' "
            f"on system '{system_id}'."
        )
    except (UnauthorizedError, NotFoundError):
        # Nothing stored for this user: report it and return normally.
        report(
            f"No credentials found for user '{effective_username}' "
            f"on system '{system_id}'. No action taken."
        )
    except BaseTapyException as e:
        raise CredentialError(
            f"Failed to revoke credentials for user '{effective_username}' "
            f"on system '{system_id}': {e}"
        ) from e
    except Exception as e:
        raise CredentialError(
            f"Unexpected error revoking credentials for user '{effective_username}' "
            f"on system '{system_id}': {e}"
        ) from e
# Default system IDs that setup_tms_credentials() processes when it is called
# without an explicit ``systems`` list. These are TACC execution systems that
# are expected to use TMS_KEYS authentication; setup_tms_credentials() still
# checks each system's defaultAuthnMethod and skips any that does not match.
TACC_SYSTEMS = ["frontera", "stampede3", "ls6"]
[docs]
def setup_tms_credentials(
    t: Tapis,
    systems: Optional[List[str]] = None,
) -> Dict[str, str]:
    """Check and establish TMS credentials on execution systems.

    For each system, checks whether credentials exist and creates them if they
    are missing. This is best-effort: any error raised while processing a
    system (unreachable system, missing allocation, ...) is reported as a
    printed warning and that system is marked "skipped"; it never propagates.
    Systems whose ``defaultAuthnMethod`` is not 'TMS_KEYS' are skipped without
    a warning. A summary line is printed at the end.

    Args:
        t: Authenticated Tapis client instance.
        systems: List of system IDs to set up. Defaults to TACC_SYSTEMS.

    Returns:
        Dict mapping system_id to status: "ready" (credentials already
        existed), "created" (credentials were just created), or "skipped".
    """
    if systems is None:
        systems = TACC_SYSTEMS

    # Deliberately not _resolve_username(): a missing username must not raise
    # here, it only downgrades every system to "skipped".
    username = getattr(t, "username", None)
    if not username:
        print("Warning: Could not determine username. Skipping TMS setup.")
        return {s: "skipped" for s in systems}

    results: Dict[str, str] = {}
    for system_id in systems:
        try:
            # Only systems that authenticate with TMS_KEYS are handled.
            system_details = t.systems.getSystem(systemId=system_id)
            authn_method = getattr(system_details, "defaultAuthnMethod", None)
            if authn_method != "TMS_KEYS":
                results[system_id] = "skipped"
                continue

            # Nothing to do when credentials are already in place.
            if check_credentials(t, system_id, username):
                results[system_id] = "ready"
                continue

            t.systems.createUserCredential(
                systemId=system_id,
                userName=username,
                createTmsKeys=True,
            )
            results[system_id] = "created"
        except Exception as e:
            # Best-effort boundary: keep going with the remaining systems, but
            # tell the user why this one was skipped instead of hiding it.
            print(f"Warning: Skipping TMS setup for system '{system_id}': {e}")
            results[system_id] = "skipped"

    # Print summary. "ready" covers both pre-existing and newly created.
    ready = [s for s, v in results.items() if v in ("ready", "created")]
    created = [s for s, v in results.items() if v == "created"]
    skipped = [s for s, v in results.items() if v == "skipped"]

    if ready:
        msg = f"TMS credentials ready: {', '.join(ready)}"
        if created:
            msg += f" (newly created: {', '.join(created)})"
        print(msg)
    if skipped:
        print(f"TMS credentials skipped: {', '.join(skipped)}")

    return results