# dapi/systems.py
import pandas as pd
from tapipy.tapis import Tapis
from tapipy.errors import BaseTapyException, UnauthorizedError, NotFoundError
from typing import Dict, List, Any, Optional, Union
from .exceptions import SystemInfoError, CredentialError
# Known DesignSafe system categories
_KNOWN_HPC = {"stampede3", "frontera", "ls6", "vista"}
_KNOWN_STORAGE = {
"designsafe.storage.default",
"designsafe.storage.community",
"designsafe.storage.published",
"nees.public",
}
_INTERNAL_PREFIXES = ("project-", "apcd.", "wma-", "ds-stko", "cloud.data", "c4-")
_DUPLICATE_SUFFIXES = (".tms", ".designsafe", "-simcenter")
_STORAGE_PREFIXES = ("designsafe.storage.",)
def list_systems(
t: Tapis,
category: Optional[str] = None,
output: str = "df",
) -> Union[pd.DataFrame, List[Dict]]:
"""List Tapis systems the user has access to.
Filters out internal, duplicate, and project-specific systems by default,
showing only the systems useful for job submission and data access.
Args:
t (Tapis): Authenticated Tapis client instance.
category (str, optional): Filter by category:
"hpc" for execution systems (stampede3, frontera, ls6, vista),
"storage" for storage systems (MyData, CommunityData, etc.),
"all" for all systems without filtering.
If None, shows HPC + storage (excludes internal/project systems).
output (str, optional): "df" for DataFrame (default), "list" for dicts.
Returns:
Union[pd.DataFrame, List[Dict]]: Systems with id, host, type, category, credentials.
Raises:
SystemInfoError: If the API request fails.
ValueError: If output or category is invalid.
"""
if output not in ("df", "list"):
raise ValueError(f"output must be 'df' or 'list', got '{output}'")
if category is not None and category not in ("hpc", "storage", "all"):
raise ValueError(
f"category must be 'hpc', 'storage', 'all', or None, got '{category}'"
)
try:
all_systems = t.systems.getSystems(listType="ALL", limit=200)
except BaseTapyException as e:
raise SystemInfoError(f"Failed to list systems: {e}") from e
username = getattr(t, "username", None)
rows = []
for s in all_systems:
sid = s.id
host = getattr(s, "host", "")
can_exec = getattr(s, "canExec", False)
authn = getattr(s, "defaultAuthnMethod", "")
# Classify
if sid in _KNOWN_HPC:
cat = "hpc"
elif sid in _KNOWN_STORAGE:
cat = "storage"
elif (
any(sid.startswith(pfx) for pfx in _STORAGE_PREFIXES)
and sid not in _KNOWN_STORAGE
):
cat = "internal"
elif sid.startswith("project-"):
cat = "project"
elif any(sid.endswith(sfx) for sfx in _DUPLICATE_SUFFIXES):
cat = "internal"
elif any(sid.startswith(pfx) for pfx in _INTERNAL_PREFIXES):
cat = "internal"
elif sid == "maverick2":
cat = "internal"
elif can_exec:
cat = "hpc"
else:
cat = "other"
# Filter
if category == "hpc" and cat != "hpc":
continue
if category == "storage" and cat != "storage":
continue
if category is None and cat not in ("hpc", "storage"):
continue
# category == "all" shows everything
# Check TMS credentials for HPC systems
has_creds = None
if cat == "hpc" and authn == "TMS_KEYS" and username:
try:
has_creds = check_credentials(t, sid, username)
except Exception:
has_creds = None
rows.append(
{
"id": sid,
"host": host,
"category": cat,
"authn": authn,
"credentials": has_creds,
}
)
if output == "list":
return rows
return pd.DataFrame(rows)
[docs]
def list_system_queues(
t: Tapis,
system_id: str,
output: str = "df",
) -> Union[pd.DataFrame, List[Any]]:
"""List batch queues available on a Tapis execution system.
Args:
t (Tapis): Authenticated Tapis client instance.
system_id (str): The ID of the execution system (e.g., "stampede3").
output (str, optional): "df" for DataFrame (default), "raw" for Tapis objects.
Returns:
Union[pd.DataFrame, List]: Queues with name, maxNodes, maxMinutes, maxCoresPerNode, etc.
Raises:
SystemInfoError: If the system is not found or an API error occurs.
ValueError: If system_id is empty or output is invalid.
"""
if not system_id:
raise ValueError("system_id cannot be empty.")
if output not in ("df", "raw"):
raise ValueError(f"output must be 'df' or 'raw', got '{output}'")
try:
system_details = t.systems.getSystem(systemId=system_id)
queues = getattr(system_details, "batchLogicalQueues", [])
if not queues:
if output == "raw":
return []
return pd.DataFrame()
if output == "raw":
return queues
rows = []
for q in queues:
rows.append(
{
"name": getattr(q, "name", ""),
"hpcQueue": getattr(q, "hpcQueueName", ""),
"maxNodes": getattr(q, "maxNodeCount", None),
"maxCoresPerNode": getattr(q, "maxCoresPerNode", None),
"maxMinutes": getattr(q, "maxMinutes", None),
"maxMemoryMB": getattr(q, "maxMemoryMB", None),
"maxJobsPerUser": getattr(q, "maxJobsPerUser", None),
}
)
return pd.DataFrame(rows)
except BaseTapyException as e:
if hasattr(e, "response") and e.response and e.response.status_code == 404:
raise SystemInfoError(f"Execution system '{system_id}' not found.") from e
raise SystemInfoError(
f"Failed to retrieve queues for system '{system_id}': {e}"
) from e
def _resolve_username(t: Tapis, username: Optional[str] = None) -> str:
"""Resolve the effective username from an explicit parameter or the Tapis client.
Args:
t: Authenticated Tapis client instance.
username: Explicit username. If None, falls back to t.username.
Returns:
The resolved username string.
Raises:
ValueError: If username cannot be determined from either source.
"""
effective = username or getattr(t, "username", None)
if not effective:
raise ValueError(
"Username must be provided or available on the Tapis client (t.username)."
)
return effective
[docs]
def check_credentials(t: Tapis, system_id: str, username: Optional[str] = None) -> bool:
"""Check whether TMS credentials exist for a user on a Tapis system.
Args:
t: Authenticated Tapis client instance.
system_id: The ID of the Tapis system (e.g., 'frontera', 'stampede3').
username: The username to check. If None, auto-detected from t.username.
Returns:
True if credentials exist, False if they do not.
Raises:
ValueError: If system_id is empty or username cannot be determined.
CredentialError: If an unexpected API error occurs during the check.
"""
if not system_id:
raise ValueError("system_id cannot be empty.")
effective_username = _resolve_username(t, username)
try:
t.systems.checkUserCredential(systemId=system_id, userName=effective_username)
return True
except (UnauthorizedError, NotFoundError):
return False
except BaseTapyException as e:
raise CredentialError(
f"Failed to check credentials for user '{effective_username}' "
f"on system '{system_id}': {e}"
) from e
except Exception as e:
raise CredentialError(
f"Unexpected error checking credentials for user '{effective_username}' "
f"on system '{system_id}': {e}"
) from e
[docs]
def establish_credentials(
t: Tapis,
system_id: str,
username: Optional[str] = None,
force: bool = False,
verbose: bool = True,
) -> None:
"""Establish TMS credentials for a user on a Tapis system.
Idempotent: if credentials already exist and force is False, no action is taken.
Only systems with defaultAuthnMethod 'TMS_KEYS' are supported.
Args:
t: Authenticated Tapis client instance.
system_id: The ID of the Tapis system (e.g., 'frontera', 'stampede3').
username: The username. If None, auto-detected from t.username.
force: If True, create credentials even if they already exist.
verbose: If True, prints status messages.
Raises:
ValueError: If system_id is empty or username cannot be determined.
CredentialError: If the system does not use TMS_KEYS, if the system is
not found, or if credential creation fails.
"""
if not system_id:
raise ValueError("system_id cannot be empty.")
effective_username = _resolve_username(t, username)
# Verify system exists and uses TMS_KEYS authentication
try:
system_details = t.systems.getSystem(systemId=system_id)
authn_method = getattr(system_details, "defaultAuthnMethod", None)
except BaseTapyException as e:
if hasattr(e, "response") and e.response and e.response.status_code == 404:
raise CredentialError(f"System '{system_id}' not found.") from e
raise CredentialError(f"Failed to retrieve system '{system_id}': {e}") from e
if authn_method != "TMS_KEYS":
raise CredentialError(
f"System '{system_id}' uses authentication method '{authn_method}', "
f"not 'TMS_KEYS'. TMS credential management is only supported "
f"for TMS_KEYS systems."
)
# Check existing credentials unless force is True
if not force:
if check_credentials(t, system_id, effective_username):
if verbose:
print(
f"Credentials already exist for user '{effective_username}' "
f"on system '{system_id}'. No action taken."
)
return
# Create credentials
try:
t.systems.createUserCredential(
systemId=system_id,
userName=effective_username,
createTmsKeys=True,
)
if verbose:
print(
f"TMS credentials established for user '{effective_username}' "
f"on system '{system_id}'."
)
except BaseTapyException as e:
raise CredentialError(
f"Failed to create credentials for user '{effective_username}' "
f"on system '{system_id}': {e}"
) from e
except Exception as e:
raise CredentialError(
f"Unexpected error creating credentials for user '{effective_username}' "
f"on system '{system_id}': {e}"
) from e
[docs]
def revoke_credentials(
t: Tapis,
system_id: str,
username: Optional[str] = None,
verbose: bool = True,
) -> None:
"""Remove TMS credentials for a user on a Tapis system.
Idempotent: if credentials do not exist, no error is raised.
Args:
t: Authenticated Tapis client instance.
system_id: The ID of the Tapis system (e.g., 'frontera', 'stampede3').
username: The username. If None, auto-detected from t.username.
verbose: If True, prints status messages.
Raises:
ValueError: If system_id is empty or username cannot be determined.
CredentialError: If credential removal fails unexpectedly.
"""
if not system_id:
raise ValueError("system_id cannot be empty.")
effective_username = _resolve_username(t, username)
try:
t.systems.removeUserCredential(systemId=system_id, userName=effective_username)
if verbose:
print(
f"Credentials revoked for user '{effective_username}' "
f"on system '{system_id}'."
)
except (UnauthorizedError, NotFoundError):
if verbose:
print(
f"No credentials found for user '{effective_username}' "
f"on system '{system_id}'. No action taken."
)
except BaseTapyException as e:
raise CredentialError(
f"Failed to revoke credentials for user '{effective_username}' "
f"on system '{system_id}': {e}"
) from e
except Exception as e:
raise CredentialError(
f"Unexpected error revoking credentials for user '{effective_username}' "
f"on system '{system_id}': {e}"
) from e
# Default TACC execution systems that use TMS_KEYS
TACC_SYSTEMS = ["frontera", "stampede3", "ls6"]
[docs]
def setup_tms_credentials(
t: Tapis,
systems: Optional[List[str]] = None,
) -> Dict[str, str]:
"""Check and establish TMS credentials on execution systems.
For each system, checks if credentials exist and creates them if missing.
Failures are handled gracefully — a system that can't be reached or where
the user lacks an allocation is skipped with a warning.
Args:
t: Authenticated Tapis client instance.
systems: List of system IDs to set up. Defaults to TACC_SYSTEMS
(frontera, stampede3, ls6).
Returns:
Dict mapping system_id to status: "ready", "created", or "skipped".
"""
if systems is None:
systems = TACC_SYSTEMS
username = getattr(t, "username", None)
if not username:
print("Warning: Could not determine username. Skipping TMS setup.")
return {s: "skipped" for s in systems}
results = {}
for system_id in systems:
try:
# Check if system uses TMS_KEYS
system_details = t.systems.getSystem(systemId=system_id)
authn_method = getattr(system_details, "defaultAuthnMethod", None)
if authn_method != "TMS_KEYS":
results[system_id] = "skipped"
continue
# Check existing credentials
if check_credentials(t, system_id, username):
results[system_id] = "ready"
continue
# Try to create credentials
t.systems.createUserCredential(
systemId=system_id,
userName=username,
createTmsKeys=True,
)
results[system_id] = "created"
except Exception:
results[system_id] = "skipped"
# Print summary
ready = [s for s, v in results.items() if v in ("ready", "created")]
created = [s for s, v in results.items() if v == "created"]
skipped = [s for s, v in results.items() if v == "skipped"]
if ready:
msg = f"TMS credentials ready: {', '.join(ready)}"
if created:
msg += f" (newly created: {', '.join(created)})"
print(msg)
if skipped:
print(f"TMS credentials skipped: {', '.join(skipped)}")
return results