Source code for dapi.systems

# dapi/systems.py
import logging
from typing import Dict, List, Any, Optional

from tapipy.tapis import Tapis
from tapipy.errors import BaseTapyException, UnauthorizedError, NotFoundError

from .exceptions import SystemInfoError, CredentialError


def list_system_queues(t: Tapis, system_id: str, verbose: bool = True) -> List[Any]:
    """Retrieve the batch logical queues defined on a Tapis execution system.

    Args:
        t: Authenticated Tapis client instance.
        system_id: The ID of the execution system (e.g., 'frontera', 'stampede2').
        verbose: If True, prints progress and a one-line summary of each queue.

    Returns:
        The value of the system's ``batchLogicalQueues`` attribute (a list of
        queue objects). Returns an empty list if the system exists but the
        attribute is missing or empty.

    Raises:
        ValueError: If system_id is empty.
        SystemInfoError: If the system is not found or any other error occurs
            while fetching or printing the queue information.
    """
    if not system_id:
        raise ValueError("system_id cannot be empty.")

    try:
        if verbose:
            print(f"\nFetching queue information for system '{system_id}'...")

        # Fetch the full system object (no 'select') so the queue list is included.
        system_details = t.systems.getSystem(systemId=system_id)
        queues = getattr(system_details, "batchLogicalQueues", None)

        if not queues:
            # getSystem succeeded, so the system exists; a second lookup to
            # confirm existence would only add a redundant API round-trip.
            if verbose:
                print(
                    f"System '{system_id}' found, but it has no batch logical queues defined."
                )
            return []

        if verbose:
            print(f"Found {len(queues)} batch logical queues for system '{system_id}':")
            for q in queues:
                name = getattr(q, "name", "N/A")
                hpc_queue = getattr(q, "hpcQueueName", "N/A")  # Actual scheduler queue name
                max_jobs = getattr(q, "maxJobs", "N/A")
                max_user_jobs = getattr(q, "maxUserJobs", "N/A")
                max_mins = getattr(q, "maxMinutes", "N/A")
                max_nodes = getattr(q, "maxNodeCount", "N/A")
                print(
                    f" - Name: {name} (HPC Queue: {hpc_queue}, Max Jobs: {max_jobs}, Max User Jobs: {max_user_jobs}, Max Mins: {max_mins}, Max Nodes: {max_nodes})"
                )
            print()

        return queues
    except BaseTapyException as e:
        # Read the status code without truth-testing the response object:
        # an HTTP response object may evaluate as falsy for 4xx/5xx statuses
        # (requests.Response does), which would hide the 404 case.
        status_code = getattr(getattr(e, "response", None), "status_code", None)
        if status_code == 404:
            raise SystemInfoError(f"Execution system '{system_id}' not found.") from e
        raise SystemInfoError(
            f"Failed to retrieve queues for system '{system_id}': {e}"
        ) from e
    except Exception as e:
        raise SystemInfoError(
            f"An unexpected error occurred while fetching queues for system '{system_id}': {e}"
        ) from e
def _resolve_username(t: Tapis, username: Optional[str] = None) -> str:
    """Pick the username to act on: the explicit argument, else the client's.

    Args:
        t: Authenticated Tapis client instance.
        username: Explicit username. When falsy (None or empty), the
            ``username`` attribute of ``t`` is used instead.

    Returns:
        The resolved username string.

    Raises:
        ValueError: If neither the argument nor the client yields a username.
    """
    # An explicit, non-empty username always wins.
    if username:
        return username

    client_username = getattr(t, "username", None)
    if client_username:
        return client_username

    raise ValueError(
        "Username must be provided or available on the Tapis client (t.username)."
    )
def check_credentials(t: Tapis, system_id: str, username: Optional[str] = None) -> bool:
    """Report whether a user already has credentials registered on a system.

    Args:
        t: Authenticated Tapis client instance.
        system_id: The ID of the Tapis system (e.g., 'frontera', 'stampede3').
        username: The username to check. If None, auto-detected from t.username.

    Returns:
        True if the credential check call succeeds; False if it raises
        UnauthorizedError or NotFoundError.

    Raises:
        ValueError: If system_id is empty or username cannot be determined.
        CredentialError: If any other error occurs during the check.
    """
    if not system_id:
        raise ValueError("system_id cannot be empty.")

    user = _resolve_username(t, username)
    # Shared tail of both error messages.
    subject = f"user '{user}' on system '{system_id}'"

    try:
        t.systems.checkUserCredential(systemId=system_id, userName=user)
    except (UnauthorizedError, NotFoundError):
        # These two outcomes are treated as "no credentials", not as failures.
        return False
    except BaseTapyException as err:
        raise CredentialError(f"Failed to check credentials for {subject}: {err}") from err
    except Exception as err:
        raise CredentialError(
            f"Unexpected error checking credentials for {subject}: {err}"
        ) from err
    else:
        return True
def establish_credentials(
    t: Tapis,
    system_id: str,
    username: Optional[str] = None,
    force: bool = False,
    verbose: bool = True,
) -> None:
    """Establish TMS credentials for a user on a Tapis system.

    Idempotent: if credentials already exist and force is False, no action is
    taken. Only systems with defaultAuthnMethod 'TMS_KEYS' are supported.

    Args:
        t: Authenticated Tapis client instance.
        system_id: The ID of the Tapis system (e.g., 'frontera', 'stampede3').
        username: The username. If None, auto-detected from t.username.
        force: If True, create credentials even if they already exist.
        verbose: If True, prints status messages.

    Raises:
        ValueError: If system_id is empty or username cannot be determined.
        CredentialError: If the system does not use TMS_KEYS, if the system is
            not found, or if credential creation fails.
    """
    if not system_id:
        raise ValueError("system_id cannot be empty.")

    effective_username = _resolve_username(t, username)

    # Verify system exists and uses TMS_KEYS authentication
    try:
        system_details = t.systems.getSystem(systemId=system_id)
        authn_method = getattr(system_details, "defaultAuthnMethod", None)
    except BaseTapyException as e:
        # Read the status code without truth-testing the response object:
        # an HTTP response object may evaluate as falsy for 4xx/5xx statuses
        # (requests.Response does), which would hide the 404 case.
        status_code = getattr(getattr(e, "response", None), "status_code", None)
        if status_code == 404:
            raise CredentialError(f"System '{system_id}' not found.") from e
        raise CredentialError(f"Failed to retrieve system '{system_id}': {e}") from e

    if authn_method != "TMS_KEYS":
        raise CredentialError(
            f"System '{system_id}' uses authentication method '{authn_method}', "
            f"not 'TMS_KEYS'. TMS credential management is only supported "
            f"for TMS_KEYS systems."
        )

    # Check existing credentials unless force is True
    if not force and check_credentials(t, system_id, effective_username):
        if verbose:
            print(
                f"Credentials already exist for user '{effective_username}' "
                f"on system '{system_id}'. No action taken."
            )
        return

    # Create credentials
    try:
        t.systems.createUserCredential(
            systemId=system_id,
            userName=effective_username,
            createTmsKeys=True,
        )
        if verbose:
            print(
                f"TMS credentials established for user '{effective_username}' "
                f"on system '{system_id}'."
            )
    except BaseTapyException as e:
        raise CredentialError(
            f"Failed to create credentials for user '{effective_username}' "
            f"on system '{system_id}': {e}"
        ) from e
    except Exception as e:
        raise CredentialError(
            f"Unexpected error creating credentials for user '{effective_username}' "
            f"on system '{system_id}': {e}"
        ) from e
def revoke_credentials(
    t: Tapis,
    system_id: str,
    username: Optional[str] = None,
    verbose: bool = True,
) -> None:
    """Delete a user's credentials from a Tapis system.

    Idempotent: an UnauthorizedError or NotFoundError from the removal call is
    treated as "nothing to revoke" and does not raise.

    Args:
        t: Authenticated Tapis client instance.
        system_id: The ID of the Tapis system (e.g., 'frontera', 'stampede3').
        username: The username. If None, auto-detected from t.username.
        verbose: If True, prints status messages.

    Raises:
        ValueError: If system_id is empty or username cannot be determined.
        CredentialError: If credential removal fails for any other reason.
    """
    if not system_id:
        raise ValueError("system_id cannot be empty.")

    user = _resolve_username(t, username)
    # Shared tail of every status and error message below.
    subject = f"user '{user}' on system '{system_id}'"

    try:
        t.systems.removeUserCredential(systemId=system_id, userName=user)
        if verbose:
            print(f"Credentials revoked for {subject}.")
    except (UnauthorizedError, NotFoundError):
        if verbose:
            print(f"No credentials found for {subject}. No action taken.")
    except BaseTapyException as err:
        raise CredentialError(f"Failed to revoke credentials for {subject}: {err}") from err
    except Exception as err:
        raise CredentialError(
            f"Unexpected error revoking credentials for {subject}: {err}"
        ) from err
# Default TACC execution systems that use TMS_KEYS.
# Used as the fallback system list when a caller does not supply one.
TACC_SYSTEMS = [
    "frontera",
    "stampede3",
    "ls6",
]
def setup_tms_credentials(
    t: Tapis,
    systems: Optional[List[str]] = None,
) -> Dict[str, str]:
    """Check and establish TMS credentials on execution systems.

    For each system, checks if credentials exist and creates them if missing.
    Failures are handled gracefully: any error while processing a system marks
    that system as "skipped" and processing continues with the next one. The
    cause of each skip is recorded on this module's logger at DEBUG level.

    Args:
        t: Authenticated Tapis client instance.
        systems: List of system IDs to set up. Defaults to TACC_SYSTEMS
            (frontera, stampede3, ls6).

    Returns:
        Dict mapping system_id to status: "ready" (credentials already
        existed), "created" (credentials were just created), or "skipped"
        (system does not use TMS_KEYS, an error occurred, or no username
        could be determined).
    """
    if systems is None:
        systems = TACC_SYSTEMS

    username = getattr(t, "username", None)
    if not username:
        print("Warning: Could not determine username. Skipping TMS setup.")
        return {s: "skipped" for s in systems}

    logger = logging.getLogger(__name__)
    results: Dict[str, str] = {}

    for system_id in systems:
        try:
            # Check if system uses TMS_KEYS
            system_details = t.systems.getSystem(systemId=system_id)
            authn_method = getattr(system_details, "defaultAuthnMethod", None)
            if authn_method != "TMS_KEYS":
                results[system_id] = "skipped"
                continue

            # Check existing credentials
            if check_credentials(t, system_id, username):
                results[system_id] = "ready"
                continue

            # Try to create credentials
            t.systems.createUserCredential(
                systemId=system_id,
                userName=username,
                createTmsKeys=True,
            )
            results[system_id] = "created"
        except Exception as exc:
            # Deliberately best-effort: one failing system must not abort the
            # others. Keep the cause available for debugging instead of
            # discarding it.
            logger.debug(
                "Skipping TMS credential setup for system '%s': %s",
                system_id,
                exc,
                exc_info=True,
            )
            results[system_id] = "skipped"

    # Print summary
    ready = [s for s, v in results.items() if v in ("ready", "created")]
    created = [s for s, v in results.items() if v == "created"]
    skipped = [s for s, v in results.items() if v == "skipped"]

    if ready:
        msg = f"TMS credentials ready: {', '.join(ready)}"
        if created:
            msg += f" (newly created: {', '.join(created)})"
        print(msg)
    if skipped:
        print(f"TMS credentials skipped: {', '.join(skipped)}")

    return results