Source code for dapi.jobs

# dapi/jobs.py
import time
import json
import os
from datetime import datetime
from typing import Dict, Any, Optional, List
from tapipy.tapis import Tapis
from tapipy.errors import BaseTapyException
from tqdm.auto import tqdm
import pandas as pd
from .apps import get_app_details
from .exceptions import (
    JobSubmissionError,
    JobMonitorError,
    FileOperationError,
    AppDiscoveryError,
)

# --- Module-Level Status Constants ---
STATUS_TIMEOUT = "TIMEOUT"
STATUS_INTERRUPTED = "INTERRUPTED"
STATUS_MONITOR_ERROR = "MONITOR_ERROR"
STATUS_UNKNOWN = "UNKNOWN"
TAPIS_TERMINAL_STATES = [
    "FINISHED",
    "FAILED",
    "CANCELLED",
    "STOPPED",
    "ARCHIVING_FAILED",
]


def generate_job_request(
    tapis_client: Tapis,
    app_id: str,
    input_dir_uri: str,
    script_filename: Optional[str] = None,  # Default is None, important for apps like OpenFOAM
    app_version: Optional[str] = None,
    job_name: Optional[str] = None,
    description: Optional[str] = None,
    tags: Optional[List[str]] = None,
    max_minutes: Optional[int] = None,
    node_count: Optional[int] = None,
    cores_per_node: Optional[int] = None,
    memory_mb: Optional[int] = None,
    queue: Optional[str] = None,
    allocation: Optional[str] = None,
    archive_system: Optional[str] = None,
    archive_path: Optional[str] = None,
    extra_file_inputs: Optional[List[Dict[str, Any]]] = None,
    extra_app_args: Optional[List[Dict[str, Any]]] = None,
    extra_env_vars: Optional[List[Dict[str, Any]]] = None,
    extra_scheduler_options: Optional[List[Dict[str, Any]]] = None,
    script_param_names: List[str] = ["Input Script", "Main Script", "tclScript"],
    input_dir_param_name: str = "Input Directory",  # Caller MUST override if app uses a different name (e.g., "Case Directory")
    allocation_param_name: str = "TACC Allocation",
) -> Dict[str, Any]:
    """Generate a Tapis job request dictionary based on app definition and inputs.

    Creates a properly formatted job request dictionary by retrieving the
    specified application details and applying user-provided overrides and
    additional parameters. The function automatically maps the script filename
    (if provided) and input directory to the appropriate app parameters. It
    dynamically reads the app definition to detect parameter names, determines
    whether to use appArgs or envVariables, and automatically populates all
    required parameters with default values when available.

    Args:
        tapis_client (Tapis): Authenticated Tapis client instance.
        app_id (str): The ID of the Tapis application to use for the job.
        input_dir_uri (str): Tapis URI to the input directory containing job files.
        script_filename (str, optional): Name of the main script file to execute.
            If None (default), no script parameter is added. This is suitable
            for apps like OpenFOAM that don't take a script argument.
        app_version (str, optional): Specific app version to use. If None, uses latest.
        job_name (str, optional): Custom job name. If None, auto-generates based
            on app ID and timestamp.
        description (str, optional): Job description. If None, uses app description.
        tags (List[str], optional): List of tags to associate with the job.
        max_minutes (int, optional): Maximum runtime in minutes. Overrides app default.
        node_count (int, optional): Number of compute nodes. Overrides app default.
        cores_per_node (int, optional): Cores per node. Overrides app default.
        memory_mb (int, optional): Memory in MB. Overrides app default.
        queue (str, optional): Execution queue name. Overrides app default.
        allocation (str, optional): TACC allocation to charge for compute time.
        archive_system (str, optional): Archive system for job outputs. If
            "designsafe" is specified, uses "designsafe.storage.default".
            If None, uses app default.
        archive_path (str, optional): Archive directory path. Can be a full path
            or just a directory name in MyData (e.g., "tapis-jobs-archive").
            If None and archive_system is "designsafe", defaults to
            "${EffectiveUserId}/tapis-jobs-archive/${JobCreateDate}/${JobUUID}".
        extra_file_inputs (List[Dict[str, Any]], optional): Additional file inputs
            beyond the main input directory.
        extra_app_args (List[Dict[str, Any]], optional): Additional application
            arguments. Use for parameters expected in 'appArgs' by the Tapis app.
        extra_env_vars (List[Dict[str, Any]], optional): Additional environment
            variables. Use for parameters expected in 'envVariables' by the
            Tapis app (e.g., OpenFOAM solver, mesh). Each item should be a dict
            like {"key": "VAR_NAME", "value": "var_value"}.
        extra_scheduler_options (List[Dict[str, Any]], optional): Additional
            scheduler options.
        script_param_names (List[str], optional): Parameter names/keys to check
            for script placement if script_filename is provided. Defaults to
            ["Input Script", "Main Script", "tclScript"].
        input_dir_param_name (str, optional): The 'name' of the fileInput in the
            Tapis app definition that corresponds to input_dir_uri. Defaults to
            "Input Directory". The function will auto-detect the correct name
            from the app definition.
        allocation_param_name (str, optional): Parameter name for the TACC
            allocation. Defaults to "TACC Allocation".

    Returns:
        Dict[str, Any]: Complete job request dictionary ready for submission to Tapis.

    Raises:
        AppDiscoveryError: If the specified app cannot be found or details
            cannot be retrieved.
        ValueError: If required parameters are missing, invalid, or if
            script_filename is provided but a suitable placement (matching
            script_param_names) cannot be found in the app's parameterSet.
        JobSubmissionError: If unexpected errors occur during job request generation.
    """
    print(f"Generating job request for app '{app_id}'...")
    try:
        app_details = get_app_details(tapis_client, app_id, app_version, verbose=False)
        if not app_details:
            raise AppDiscoveryError(
                f"App '{app_id}' (Version: {app_version or 'latest'}) not found."
            )
        final_app_version = app_details.version
        print(f"Using App Details: {app_details.id} v{final_app_version}")
        job_attrs = app_details.jobAttributes
        param_set_def = getattr(job_attrs, "parameterSet", None)

        final_job_name = (
            job_name or f"{app_details.id}-{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        )
        final_description = (
            description or app_details.description or f"dapi job for {app_details.id}"
        )

        archive_system_id = None
        archive_system_dir = None
        if archive_system:
            if archive_system.lower() == "designsafe":
                archive_system_id = "designsafe.storage.default"
                if archive_path:
                    if archive_path.startswith("/") or archive_path.startswith("${"):
                        archive_system_dir = archive_path
                    else:
                        archive_system_dir = f"${{EffectiveUserId}}/{archive_path}/${{JobCreateDate}}/${{JobUUID}}"
                else:
                    archive_system_dir = "${EffectiveUserId}/tapis-jobs-archive/${JobCreateDate}/${JobUUID}"
            else:
                archive_system_id = archive_system
                if archive_path:
                    archive_system_dir = archive_path
        else:
            archive_system_id = getattr(job_attrs, "archiveSystemId", None)
            archive_system_dir = getattr(job_attrs, "archiveSystemDir", None)

        job_req = {
            "name": final_job_name,
            "appId": app_details.id,
            "appVersion": final_app_version,
            "description": final_description,
            "execSystemId": getattr(job_attrs, "execSystemId", None),
            "archiveSystemId": archive_system_id,
            **({"archiveSystemDir": archive_system_dir} if archive_system_dir else {}),
            "archiveOnAppError": getattr(job_attrs, "archiveOnAppError", True),
            "execSystemLogicalQueue": (
                queue
                if queue is not None
                else getattr(job_attrs, "execSystemLogicalQueue", None)
            ),
            "nodeCount": (
                node_count
                if node_count is not None
                else getattr(job_attrs, "nodeCount", None)
            ),
            "coresPerNode": (
                cores_per_node
                if cores_per_node is not None
                else getattr(job_attrs, "coresPerNode", None)
            ),
            "maxMinutes": (
                max_minutes
                if max_minutes is not None
                else getattr(job_attrs, "maxMinutes", None)
            ),
            "memoryMB": (
                memory_mb
                if memory_mb is not None
                else getattr(job_attrs, "memoryMB", None)
            ),
            **(
                {"isMpi": getattr(job_attrs, "isMpi", None)}
                if getattr(job_attrs, "isMpi", None) is not None
                else {}
            ),
            **(
                {"cmdPrefix": getattr(job_attrs, "cmdPrefix", None)}
                if getattr(job_attrs, "cmdPrefix", None) is not None
                else {}
            ),
            "tags": tags or [],
            "fileInputs": [],
            "parameterSet": {"appArgs": [], "envVariables": [], "schedulerOptions": []},
        }

        # --- Handle main input directory ---
        # Automatically detect the correct input directory parameter name from
        # the app definition.
        main_input_target_path = None
        main_input_automount = True
        found_input_def = False
        actual_input_param_name = input_dir_param_name  # Default fallback
        if hasattr(job_attrs, "fileInputs") and job_attrs.fileInputs:
            # First try to find an exact match with the provided name
            for fi_def in job_attrs.fileInputs:
                if getattr(fi_def, "name", "").lower() == input_dir_param_name.lower():
                    main_input_target_path = getattr(fi_def, "targetPath", None)
                    main_input_automount = getattr(fi_def, "autoMountLocal", True)
                    actual_input_param_name = getattr(fi_def, "name", "")
                    found_input_def = True
                    print(
                        f"Found exact match for input parameter: '{actual_input_param_name}'"
                    )
                    break
            # If no exact match found, try to auto-detect common input directory names
            if not found_input_def:
                common_input_names = [
                    "Input Directory",
                    "Case Directory",
                    "inputDirectory",
                    "inputDir",
                ]
                for fi_def in job_attrs.fileInputs:
                    fi_name = getattr(fi_def, "name", "")
                    if fi_name in common_input_names:
                        main_input_target_path = getattr(fi_def, "targetPath", None)
                        main_input_automount = getattr(fi_def, "autoMountLocal", True)
                        actual_input_param_name = fi_name
                        found_input_def = True
                        print(
                            f"Auto-detected input parameter: '{actual_input_param_name}' "
                            f"(provided: '{input_dir_param_name}')"
                        )
                        break
            # If still not found, use the first fileInput as a fallback
            if not found_input_def and job_attrs.fileInputs:
                fi_def = job_attrs.fileInputs[0]
                main_input_target_path = getattr(fi_def, "targetPath", None)
                main_input_automount = getattr(fi_def, "autoMountLocal", True)
                actual_input_param_name = getattr(fi_def, "name", "")
                found_input_def = True
                print(
                    f"Using first available fileInput: '{actual_input_param_name}' "
                    f"(no match found for '{input_dir_param_name}')"
                )
        if not found_input_def:
            print(
                f"Warning: No fileInputs found in app definition. "
                f"Using provided name '{input_dir_param_name}'"
            )

        main_input_dict = {
            "name": actual_input_param_name,  # Use the detected/matched parameter name
            "sourceUrl": input_dir_uri,
            "autoMountLocal": main_input_automount,
        }
        # Add targetPath only if the app definition provided one for this input
        if main_input_target_path:
            main_input_dict["targetPath"] = main_input_target_path
        job_req["fileInputs"].append(main_input_dict)
        if extra_file_inputs:
            job_req["fileInputs"].extend(extra_file_inputs)

        # --- Handle script parameter placement ---
        script_param_added = False
        if script_filename is not None:  # Only process if a script filename is provided
            # Try to place in appArgs
            if hasattr(param_set_def, "appArgs") and param_set_def.appArgs:
                for arg_def in param_set_def.appArgs:
                    arg_name = getattr(arg_def, "name", "")
                    if arg_name in script_param_names:
                        print(
                            f"Placing script '{script_filename}' in appArgs: '{arg_name}'"
                        )
                        job_req["parameterSet"]["appArgs"].append(
                            {"name": arg_name, "arg": script_filename}
                        )
                        script_param_added = True
                        break
            # If not placed in appArgs, try envVariables
            if (
                not script_param_added
                and hasattr(param_set_def, "envVariables")
                and param_set_def.envVariables
            ):
                for var_def in param_set_def.envVariables:
                    var_key = getattr(var_def, "key", "")
                    if var_key in script_param_names:
                        print(
                            f"Placing script '{script_filename}' in envVariables: '{var_key}'"
                        )
                        job_req["parameterSet"]["envVariables"].append(
                            {"key": var_key, "value": script_filename}
                        )
                        script_param_added = True
                        break
            if not script_param_added:
                # script_filename was provided but could not be placed.
                app_args_details = getattr(param_set_def, "appArgs", [])
                env_vars_details = getattr(param_set_def, "envVariables", [])
                defined_app_arg_names = [
                    getattr(a, "name", None) for a in app_args_details
                ]
                defined_env_var_keys = [
                    getattr(e, "key", None) for e in env_vars_details
                ]
                raise ValueError(
                    f"script_filename '{script_filename}' was provided, but no matching parameter "
                    f"(expected names/keys from script_param_names: {script_param_names}) was found "
                    f"in the app's defined parameterSet. "
                    f"App's defined appArgs names: {defined_app_arg_names}. "
                    f"App's defined envVariables keys: {defined_env_var_keys}."
                )
        else:
            print("script_filename is None, skipping script parameter placement.")

        # --- Auto-detect and add required parameters from app definition ---
        # Process appArgs first: add all required appArgs not provided by the user
        if hasattr(param_set_def, "appArgs") and param_set_def.appArgs:
            for app_arg_def in param_set_def.appArgs:
                arg_name = getattr(app_arg_def, "name", "")
                input_mode = getattr(app_arg_def, "inputMode", "")
                default_value = getattr(app_arg_def, "arg", "")
                # Skip if this is the script parameter (already handled above)
                if script_filename and arg_name in script_param_names:
                    continue
                # Check if this arg is required and not already provided
                if input_mode == "REQUIRED" and arg_name:
                    # Check if the user already provided this arg
                    user_provided = False
                    if extra_app_args:
                        for user_arg in extra_app_args:
                            if user_arg.get("name") == arg_name:
                                user_provided = True
                                break
                    # Also check if already added to job_req
                    already_added = False
                    for existing_arg in job_req["parameterSet"]["appArgs"]:
                        if existing_arg.get("name") == arg_name:
                            already_added = True
                            break
                    if not user_provided and not already_added:
                        if default_value:
                            print(
                                f"Auto-adding required appArg '{arg_name}' with default: '{default_value}'"
                            )
                            job_req["parameterSet"]["appArgs"].append(
                                {"name": arg_name, "arg": default_value}
                            )
                        else:
                            print(
                                f"Warning: Required appArg '{arg_name}' has no default value."
                            )

        # Process envVariables: add all required envVariables not provided by the user
        if hasattr(param_set_def, "envVariables") and param_set_def.envVariables:
            for env_var_def in param_set_def.envVariables:
                var_key = getattr(env_var_def, "key", "")
                input_mode = getattr(env_var_def, "inputMode", "")
                default_value = getattr(env_var_def, "value", "")
                enum_values = getattr(env_var_def, "enum_values", None)
                # Skip if this is the script parameter (already handled above)
                if script_filename and var_key in script_param_names:
                    continue
                # Check if this variable is required and not already provided by the user
                if input_mode == "REQUIRED" and var_key:
                    user_provided = False
                    if extra_env_vars:
                        for user_var in extra_env_vars:
                            if user_var.get("key") == var_key:
                                user_provided = True
                                break
                    # Also check if already added to job_req
                    already_added = False
                    for existing_var in job_req["parameterSet"]["envVariables"]:
                        if existing_var.get("key") == var_key:
                            already_added = True
                            break
                    if not user_provided and not already_added:
                        # Use the default value if available
                        value_to_use = default_value
                        # If no default but the variable has enum values, use the first one
                        if (
                            not value_to_use
                            and enum_values
                            and isinstance(enum_values, dict)
                        ):
                            value_to_use = list(enum_values.keys())[0]
                            print(
                                f"Auto-setting required env var '{var_key}' to first available option: '{value_to_use}'"
                            )
                        elif value_to_use:
                            print(
                                f"Auto-setting required env var '{var_key}' to default: '{value_to_use}'"
                            )
                        else:
                            print(
                                f"Warning: Required env var '{var_key}' has no default value."
                            )
                            continue
                        # Add to job request
                        job_req["parameterSet"]["envVariables"].append(
                            {"key": var_key, "value": value_to_use}
                        )

        # --- Handle extra parameters ---
        if extra_app_args:
            job_req["parameterSet"]["appArgs"].extend(extra_app_args)
        if extra_env_vars:
            # For OpenFOAM, parameters like solver, mesh, decomp go here
            job_req["parameterSet"]["envVariables"].extend(extra_env_vars)

        # --- Handle scheduler options and allocation ---
        fixed_sched_opt_names = []
        if (
            hasattr(param_set_def, "schedulerOptions")
            and param_set_def.schedulerOptions
        ):
            for sched_opt_def in param_set_def.schedulerOptions:
                # Check if inputMode is FIXED for this specific option definition
                if getattr(sched_opt_def, "inputMode", None) == "FIXED" and hasattr(
                    sched_opt_def, "name"
                ):
                    fixed_sched_opt_names.append(getattr(sched_opt_def, "name"))

        if allocation:
            # Check if the app itself defines an allocation parameter that is FIXED
            allocation_is_fixed_by_app = False
            if (
                hasattr(param_set_def, "schedulerOptions")
                and param_set_def.schedulerOptions
            ):
                for sched_opt_def in param_set_def.schedulerOptions:
                    # The allocation option is assumed to be identified by allocation_param_name
                    if (
                        getattr(sched_opt_def, "name", "") == allocation_param_name
                        and getattr(sched_opt_def, "inputMode", None) == "FIXED"
                    ):
                        allocation_is_fixed_by_app = True
                        print(
                            f"Warning: App definition marks '{allocation_param_name}' as FIXED "
                            f"with value '{getattr(sched_opt_def, 'arg', '')}'. "
                            f"User-provided allocation '{allocation}' will be ignored."
                        )
                        break
            if not allocation_is_fixed_by_app:
                # If the user provides an allocation and it's not fixed by the app,
                # add/override it. Remove any existing scheduler option with the
                # same name before adding the new one.
                job_req["parameterSet"]["schedulerOptions"] = [
                    opt
                    for opt in job_req["parameterSet"]["schedulerOptions"]
                    # Handle both Tapis objects and dicts
                    if getattr(opt, "name", opt.get("name")) != allocation_param_name
                ]
                print(f"Adding/Updating TACC allocation: {allocation}")
                job_req["parameterSet"]["schedulerOptions"].append(
                    {"name": allocation_param_name, "arg": f"-A {allocation}"}
                )

        if extra_scheduler_options:
            for extra_opt in extra_scheduler_options:
                opt_name = extra_opt.get("name")
                if opt_name and opt_name in fixed_sched_opt_names:
                    print(
                        f"Warning: Skipping user-provided scheduler option '{opt_name}' "
                        f"because it is marked as FIXED in the app definition."
                    )
                else:
                    # Avoid duplicates if the user tries to override the allocation
                    # via extra_scheduler_options
                    if opt_name == allocation_param_name and allocation:
                        print(
                            f"Note: Allocation '{allocation}' is already being handled. "
                            f"Skipping duplicate allocation from extra_scheduler_options."
                        )
                        continue
                    job_req["parameterSet"]["schedulerOptions"].append(extra_opt)

        # --- Clean up empty parameterSet sections ---
        if not job_req["parameterSet"]["appArgs"]:
            del job_req["parameterSet"]["appArgs"]
        if not job_req["parameterSet"]["envVariables"]:
            del job_req["parameterSet"]["envVariables"]
        if not job_req["parameterSet"]["schedulerOptions"]:
            del job_req["parameterSet"]["schedulerOptions"]
        if not job_req["parameterSet"]:
            del job_req["parameterSet"]

        final_job_req = {k: v for k, v in job_req.items() if v is not None}
        print("Job request dictionary generated successfully.")
        return final_job_req

    except (AppDiscoveryError, ValueError) as e:
        print(f"ERROR: Failed to generate job request: {e}")
        raise
    except Exception as e:
        print(f"ERROR: Unexpected error generating job request: {e}")
        raise JobSubmissionError(f"Unexpected error generating job request: {e}") from e
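
# --- Illustrative usage sketch (not part of the dapi API) ---
# A minimal, hedged example of building a request with the function above.
# The app id, input URI, script name, and allocation are placeholders for
# illustration only; substitute values that exist in your tenant.
def _example_generate_request(t: Tapis) -> Dict[str, Any]:
    """Sketch: build a request for a hypothetical OpenSees MP app."""
    return generate_job_request(
        tapis_client=t,
        app_id="opensees-mp-s3",  # placeholder app id
        input_dir_uri="tapis://designsafe.storage.default/user/my-analysis",  # placeholder URI
        script_filename="model.tcl",  # matched against script_param_names (e.g., "tclScript")
        max_minutes=60,
        allocation="MYALLOC-123",  # placeholder TACC allocation
        archive_system="designsafe",  # archive to designsafe.storage.default
    )
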
# --- submit_job_request function ---
def submit_job_request(
    tapis_client: Tapis, job_request: Dict[str, Any]
) -> "SubmittedJob":
    """Submit a pre-generated job request dictionary to Tapis.

    Takes a complete job request dictionary (typically generated by
    generate_job_request) and submits it to the Tapis jobs service for
    execution. Prints the job request details before submission for
    debugging purposes.

    Args:
        tapis_client (Tapis): Authenticated Tapis client instance.
        job_request (Dict[str, Any]): Complete job request dictionary containing
            all necessary job parameters, file inputs, and configuration.

    Returns:
        SubmittedJob: A SubmittedJob object for monitoring and managing the
            submitted job.

    Raises:
        ValueError: If job_request is not a dictionary.
        JobSubmissionError: If the Tapis job submission fails, with additional
            context from the HTTP request and response when available.

    Example:
        >>> job_request = generate_job_request(...)
        >>> submitted_job = submit_job_request(client, job_request)
        --- Submitting Tapis Job Request ---
        {
          "name": "matlab-r2023a-20231201_143022",
          "appId": "matlab-r2023a",
          ...
        }
        ------------------------------------
        Job submitted successfully. UUID: 12345678-1234-1234-1234-123456789abc
    """
    if not isinstance(job_request, dict):
        raise ValueError("Input 'job_request' must be a dictionary.")
    print("\n--- Submitting Tapis Job Request ---")
    print(json.dumps(job_request, indent=2, default=str))
    print("------------------------------------")
    try:
        submitted = tapis_client.jobs.submitJob(**job_request)
        print(f"Job submitted successfully. UUID: {submitted.uuid}")
        return SubmittedJob(tapis_client, submitted.uuid)
    except BaseTapyException as e:
        print(f"ERROR: Tapis job submission API call failed: {e}")
        raise JobSubmissionError(
            f"Tapis job submission failed: {e}",
            request=getattr(e, "request", None),
            response=getattr(e, "response", None),
        ) from e
    except Exception as e:
        print(f"ERROR: Unexpected error during job submission: {e}")
        raise JobSubmissionError(f"Unexpected error during job submission: {e}") from e
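
# --- Illustrative usage sketch (not part of the dapi API) ---
# The typical flow chains generate_job_request and submit_job_request. This
# hedged sketch reuses the hypothetical request builder above; the request is
# a plain dict, so per-run overrides can still be applied before submission.
def _example_submit(t: Tapis) -> "SubmittedJob":
    """Sketch: generate a request, tweak it, and submit it."""
    job_request = _example_generate_request(t)
    job_request["maxMinutes"] = 30  # plain-dict override before submission
    return submit_job_request(t, job_request)
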
# --- SubmittedJob Class ---
class SubmittedJob:
    """Represents a submitted Tapis job with methods for monitoring and management.

    This class provides a high-level interface for interacting with Tapis jobs,
    including status monitoring, output retrieval, job cancellation, and runtime
    analysis. It caches job details and status to minimize API calls.

    Attributes:
        uuid (str): The unique identifier of the Tapis job.
        TERMINAL_STATES (List[str]): List of job states that indicate completion.

    Example:
        >>> job = SubmittedJob(client, "12345678-1234-1234-1234-123456789abc")
        >>> status = job.status
        >>> if status in job.TERMINAL_STATES:
        ...     print("Job completed")
        >>> final_status = job.monitor(timeout_minutes=60)
    """

    TERMINAL_STATES = TAPIS_TERMINAL_STATES  # Use module-level constant
    def __init__(self, tapis_client: Tapis, job_uuid: str):
        """Initialize a SubmittedJob instance for an existing Tapis job.

        Args:
            tapis_client (Tapis): Authenticated Tapis client instance.
            job_uuid (str): The UUID of an existing Tapis job.

        Raises:
            TypeError: If tapis_client is not a Tapis instance.
            ValueError: If job_uuid is empty or not a string.
        """
        if not isinstance(tapis_client, Tapis):
            raise TypeError("tapis_client must be an instance of tapipy.Tapis")
        if not job_uuid or not isinstance(job_uuid, str):
            raise ValueError("job_uuid must be a non-empty string.")
        self._tapis = tapis_client
        self.uuid = job_uuid
        self._last_status: Optional[str] = None
        self._job_details: Optional[Tapis] = None
    def _get_details(self, force_refresh: bool = False) -> Tapis:
        """Fetch and cache job details from Tapis.

        Args:
            force_refresh (bool, optional): If True, forces a fresh API call even
                if details are already cached. Defaults to False.

        Returns:
            Tapis: Complete job details object from the Tapis API.

        Raises:
            JobMonitorError: If job details cannot be retrieved from Tapis.
        """
        if not self._job_details or force_refresh:
            try:
                self._job_details = self._tapis.jobs.getJob(jobUuid=self.uuid)
                self._last_status = self._job_details.status
            except BaseTapyException as e:
                raise JobMonitorError(
                    f"Failed to get details for job {self.uuid}: {e}"
                ) from e
        return self._job_details

    @property
    def details(self) -> Tapis:
        """Get cached job details, fetching from Tapis if not already cached.

        Returns:
            Tapis: Complete job details object containing all job metadata,
                configuration, and current state information.
        """
        return self._get_details()

    @property
    def status(self) -> str:
        """Get the current job status, using the cached value when appropriate.

        For terminal states, returns the cached status without making an API call.
        For non-terminal states, may fetch a fresh status depending on cache state.

        Returns:
            str: Current job status (e.g., "QUEUED", "RUNNING", "FINISHED",
                "FAILED"). Returns STATUS_UNKNOWN if the status cannot be determined.
        """
        try:
            if self._last_status and self._last_status not in self.TERMINAL_STATES:
                return self.get_status(force_refresh=False)
            elif self._last_status:
                return self._last_status
            else:
                return self.get_status(force_refresh=True)
        except JobMonitorError:
            return STATUS_UNKNOWN
    def get_status(self, force_refresh: bool = True) -> str:
        """Get the current job status from the Tapis API.

        Args:
            force_refresh (bool, optional): If True, always makes a fresh API call.
                If False, may return the cached status. Defaults to True.

        Returns:
            str: Current job status from the Tapis API.

        Raises:
            JobMonitorError: If the status cannot be retrieved from Tapis.
        """
        if not force_refresh and self._last_status:
            return self._last_status
        try:
            status_obj = self._tapis.jobs.getJobStatus(jobUuid=self.uuid)
            new_status = status_obj.status
            if new_status != self._last_status:
                self._last_status = new_status
                if self._job_details and self._job_details.status != self._last_status:
                    self._job_details = None
            return self._last_status
        except BaseTapyException as e:
            raise JobMonitorError(
                f"Failed to get status for job {self.uuid}: {e}"
            ) from e
    @property
    def last_message(self) -> Optional[str]:
        """Get the last status message recorded for the job.

        Retrieves the most recent status message from the job details, which
        typically contains information about the current job state or any errors
        that have occurred.

        Returns:
            str or None: The last status message if available and non-empty,
                otherwise None. Empty strings are treated as None.

        Note:
            Returns None if job details cannot be retrieved or if no message is
            available. Does not raise exceptions for retrieval failures.
        """
        try:
            details = self.details  # Ensures job details are loaded
            message = getattr(details, "lastMessage", None)
            if message:
                # Sometimes messages might be empty strings, treat as None for consistency
                return str(message).strip() if str(message).strip() else None
            return None
        except JobMonitorError as e:
            print(
                f"Could not retrieve job details to get last_message for job {self.uuid}: {e}"
            )
            return None
        except Exception as e:
            print(
                f"An unexpected error occurred while fetching last_message for job {self.uuid}: {e}"
            )
            return None
    def monitor(self, interval: int = 15, timeout_minutes: Optional[int] = None) -> str:
        """Monitor job status with progress bars until completion or timeout.

        Continuously monitors the job status using tqdm progress bars to show
        progress through different job phases (waiting, running). Handles
        interruptions and errors gracefully.

        Args:
            interval (int, optional): Status check interval in seconds. Defaults to 15.
            timeout_minutes (int, optional): Maximum monitoring time in minutes.
                If None, uses the job's maxMinutes from its configuration.
                Use -1 or 0 for unlimited monitoring. Defaults to None.

        Returns:
            str: Final job status. Can be a standard Tapis status ("FINISHED",
                "FAILED", etc.) or a special monitoring status:

                - STATUS_TIMEOUT: Monitoring timed out
                - STATUS_INTERRUPTED: User interrupted monitoring (Ctrl+C)
                - STATUS_MONITOR_ERROR: Error occurred during monitoring

        Example:
            >>> job = SubmittedJob(client, job_uuid)
            >>> final_status = job.monitor(interval=30, timeout_minutes=120)
            Monitoring Job: 12345678-1234-1234-1234-123456789abc
            Waiting for job to start: 100%|████████| 12 checks
            Monitoring job: 100%|████████████| 45/45 checks
            Status: FINISHED
            >>> if final_status == "FINISHED":
            ...     print("Job completed successfully!")
        """
        previous_status = None
        current_status = STATUS_UNKNOWN
        start_time = time.time()
        effective_timeout_minutes = -1
        timeout_seconds = float("inf")
        max_iterations = float("inf")
        pbar_waiting = None
        pbar_monitoring = None
        print(f"\nMonitoring Job: {self.uuid}")  # Print Job ID once at the start
        try:
            # Fetch initial details
            details = self._get_details(force_refresh=True)
            current_status = details.status
            previous_status = current_status
            effective_timeout_minutes = (
                timeout_minutes if timeout_minutes is not None else details.maxMinutes
            )
            if effective_timeout_minutes <= 0:
                print(
                    f"Job has maxMinutes <= 0 ({details.maxMinutes}). "
                    f"Monitoring indefinitely or until terminal state."
                )
                timeout_seconds = float("inf")
                max_iterations = float("inf")
            else:
                timeout_seconds = effective_timeout_minutes * 60
                max_iterations = (
                    int(timeout_seconds // interval) if interval > 0 else float("inf")
                )

            waiting_states = [
                "PENDING",
                "PROCESSING_INPUTS",
                "STAGING_INPUTS",
                "STAGING_JOB",
                "SUBMITTING_JOB",
                "QUEUED",
            ]
            # Treat ARCHIVING as part of the active monitoring phase
            running_states = [
                "RUNNING",
                "ARCHIVING",
            ]

            # --- Waiting Phase ---
            if current_status in waiting_states:
                # leave=False hides the bar on completion
                pbar_waiting = tqdm(
                    desc="Waiting for job to start",
                    dynamic_ncols=True,
                    unit=" checks",
                    leave=False,
                )
                while current_status in waiting_states:
                    pbar_waiting.set_postfix_str(
                        f"Status: {current_status}", refresh=True
                    )
                    time.sleep(interval)
                    current_status = self.get_status(force_refresh=True)
                    pbar_waiting.update(1)
                    if time.time() - start_time > timeout_seconds:
                        tqdm.write(
                            f"\nWarning: Monitoring timeout ({effective_timeout_minutes} mins) reached while waiting."
                        )
                        return STATUS_TIMEOUT
                    if current_status in self.TERMINAL_STATES:
                        pbar_waiting.set_postfix_str(
                            f"Status: {current_status}", refresh=True
                        )
                        tqdm.write(
                            f"\nJob reached terminal state while waiting: {current_status}"
                        )
                        return current_status  # Return actual terminal status
                pbar_waiting.close()
                pbar_waiting = None

            # --- Monitoring Phase ---
            if current_status in running_states:
                total_iterations = (
                    max_iterations if max_iterations != float("inf") else None
                )
                # leave=True keeps the bar after completion
                pbar_monitoring = tqdm(
                    total=total_iterations,
                    desc="Monitoring job",
                    dynamic_ncols=True,
                    unit=" checks",
                    leave=True,
                )
                iteration_count = 0
                # Initial status print for this phase
                tqdm.write(f"\tStatus: {current_status}")
                previous_status = current_status
                while current_status in running_states:
                    # Update description only if status changes within this phase (less noisy)
                    if current_status != previous_status:
                        pbar_monitoring.set_description(
                            f"Monitoring job (Status: {current_status})"
                        )
                        tqdm.write(f"\tStatus: {current_status}")
                        previous_status = current_status
                    pbar_monitoring.update(1)
                    iteration_count += 1
                    if (
                        max_iterations != float("inf")
                        and iteration_count >= max_iterations
                    ):
                        tqdm.write(
                            f"\nWarning: Monitoring timeout ({effective_timeout_minutes} mins) reached."
                        )
                        return STATUS_TIMEOUT
                    time.sleep(interval)
                    current_status = self.get_status(force_refresh=True)
                    if current_status in self.TERMINAL_STATES:
                        tqdm.write(f"\tStatus: {current_status}")  # Write final status
                        if total_iterations:
                            pbar_monitoring.n = total_iterations
                            pbar_monitoring.refresh()
                        return current_status  # Return actual terminal status
                pbar_monitoring.close()
                pbar_monitoring = None

            # --- Handle Other Cases ---
            elif current_status in self.TERMINAL_STATES:
                print(f"Job already in terminal state: {current_status}")
                return current_status
            else:
                print(
                    f"Job in unexpected initial state '{current_status}'. Monitoring stopped."
                )
                return current_status

            return current_status  # Should be a terminal state if the loops finished
        except KeyboardInterrupt:
            print("\nMonitoring interrupted by user.")
            return STATUS_INTERRUPTED
        except JobMonitorError as e:
            print(f"\nError during monitoring: {e}")
            return STATUS_MONITOR_ERROR
        except Exception as e:
            print(f"\nUnexpected error during monitoring: {e}")
            return STATUS_MONITOR_ERROR
        finally:
            # Safely close progress bars
            if pbar_waiting is not None:
                try:
                    pbar_waiting.close()
                except Exception:
                    pass
            if pbar_monitoring is not None:
                try:
                    pbar_monitoring.close()
                except Exception:
                    pass
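    # Illustrative sketch (hedged, commented out so the class is unchanged):
    # a typical monitor call followed by the module-level status interpreter,
    # assuming `job` is a SubmittedJob for an already-submitted UUID.
    #
    #   final_status = job.monitor(interval=30, timeout_minutes=120)
    #   interpret_job_status(final_status, job.uuid)
    #   if final_status == STATUS_TIMEOUT:
    #       job.cancel()  # optionally stop a job that outran the window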
    def print_runtime_summary(self, verbose: bool = False):
        """Print a summary of job runtime phases and total execution time.

        Analyzes the job's execution history to show the time spent in different
        phases (queued, running) and calculates the total runtime from submission
        to completion.

        Args:
            verbose (bool, optional): If True, prints detailed job history events
                in addition to the runtime summary. Defaults to False.

        Example:
            >>> job.print_runtime_summary()
            Runtime Summary
            ---------------
            QUEUED time: 00:05:30
            RUNNING time: 01:23:45
            TOTAL time: 01:29:15
            ---------------
            >>> job.print_runtime_summary(verbose=True)
            Detailed Job History:
            Event: JOB_NEW_STATUS, Detail: PENDING, Time: 2023-12-01T14:30:22.123456Z
            Event: JOB_NEW_STATUS, Detail: QUEUED, Time: 2023-12-01T14:30:25.234567Z
            ...
            Summary:
            QUEUED time: 00:05:30
            RUNNING time: 01:23:45
            TOTAL time: 01:29:15
            ---------------
        """
        # datetime is already imported at module level
        t = self._tapis
        print("\nRuntime Summary")
        print("---------------")
        hist = t.jobs.getJobHistory(jobUuid=self.uuid)

        def format_timedelta(td):
            hours, remainder = divmod(td.total_seconds(), 3600)
            minutes, seconds = divmod(remainder, 60)
            return f"{int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}"

        time1 = datetime.strptime(hist[-1].created, "%Y-%m-%dT%H:%M:%S.%fZ")
        time0 = datetime.strptime(hist[0].created, "%Y-%m-%dT%H:%M:%S.%fZ")
        total_time = time1 - time0
        if verbose:
            print("\nDetailed Job History:")
            for event in hist:
                print(
                    f"Event: {event.event}, Detail: {event.eventDetail}, Time: {event.created}"
                )
            print("\nSummary:")
        for i in range(len(hist) - 1):
            if hist[i].eventDetail == "RUNNING":
                time1 = datetime.strptime(hist[i + 1].created, "%Y-%m-%dT%H:%M:%S.%fZ")
                time0 = datetime.strptime(hist[i].created, "%Y-%m-%dT%H:%M:%S.%fZ")
                print("RUNNING time:", format_timedelta(time1 - time0))
            elif hist[i].eventDetail == "QUEUED":
                time1 = datetime.strptime(hist[i + 1].created, "%Y-%m-%dT%H:%M:%S.%fZ")
                time0 = datetime.strptime(hist[i].created, "%Y-%m-%dT%H:%M:%S.%fZ")
                print("QUEUED time:", format_timedelta(time1 - time0))
        print("TOTAL time:", format_timedelta(total_time))
        print("---------------")
    # --- Other SubmittedJob methods (archive_uri, list_outputs, etc.) ---

    @property
    def archive_uri(self) -> Optional[str]:
        """Get the Tapis URI of the job's archive directory.

        Returns the URI where job outputs are stored after completion. The
        archive directory contains all job outputs, logs, and metadata.

        Returns:
            str or None: Tapis URI of the archive directory if available,
                otherwise None if archive information is not set.

        Example:
            >>> uri = job.archive_uri
            >>> if uri:
            ...     print(f"Job outputs at: {uri}")
            ...     files = client.files.list(uri)
        """
        details = self._get_details()
        if details.archiveSystemId and details.archiveSystemDir:
            archive_path = details.archiveSystemDir.lstrip("/")
            return f"tapis://{details.archiveSystemId}/{archive_path}"
        return None
    def list_outputs(
        self, path: str = "/", limit: int = 100, offset: int = 0
    ) -> List[Tapis]:
        """List files and directories in the job's archive directory.

        Args:
            path (str, optional): Relative path within the job archive to list.
                Defaults to "/" (archive root).
            limit (int, optional): Maximum number of items to return. Defaults to 100.
            offset (int, optional): Number of items to skip for pagination.
                Defaults to 0.

        Returns:
            List[Tapis]: List of file and directory objects in the specified path.

        Raises:
            FileOperationError: If archive information is not available, the path
                cannot be accessed, or listing fails.

        Example:
            >>> outputs = job.list_outputs()
            >>> for item in outputs:
            ...     print(f"{item.name} ({item.type})")
            tapisjob.out (file)
            tapisjob.err (file)
            results/ (dir)
            >>> results = job.list_outputs(path="results/")
        """
        details = self._get_details()
        if not details.archiveSystemId or not details.archiveSystemDir:
            raise FileOperationError(
                f"Job {self.uuid} archive system ID or directory not available."
            )
        full_archive_path = os.path.join(details.archiveSystemDir, path.lstrip("/"))
        full_archive_path = os.path.normpath(full_archive_path).lstrip("/")
        try:
            archive_base_uri = f"tapis://{details.archiveSystemId}/{full_archive_path}"
            from .files import list_files

            return list_files(self._tapis, archive_base_uri, limit=limit, offset=offset)
        except BaseTapyException as e:
            raise FileOperationError(
                f"Failed to list job outputs for {self.uuid} at path '{path}': {e}"
            ) from e
        except Exception as e:
            raise FileOperationError(
                f"Unexpected error listing job outputs for {self.uuid}: {e}"
            ) from e
    def download_output(self, remote_path: str, local_target: str):
        """Download a specific file from the job's archive directory.

        Args:
            remote_path (str): Relative path to the file within the job archive.
            local_target (str): Local filesystem path where the file should be saved.

        Raises:
            FileOperationError: If archive information is not available or the
                download fails.

        Example:
            >>> job.download_output("tapisjob.out", "/local/job_output.txt")
            >>> job.download_output("results/data.txt", "/local/results/data.txt")
        """
        details = self._get_details()
        if not details.archiveSystemId or not details.archiveSystemDir:
            raise FileOperationError(
                f"Job {self.uuid} archive system ID or directory not available."
            )
        full_archive_path = os.path.join(
            details.archiveSystemDir, remote_path.lstrip("/")
        )
        full_archive_path = os.path.normpath(full_archive_path).lstrip("/")
        remote_uri = f"tapis://{details.archiveSystemId}/{full_archive_path}"
        try:
            from .files import download_file

            download_file(self._tapis, remote_uri, local_target)
        except Exception as e:
            raise FileOperationError(
                f"Failed to download output '{remote_path}' for job {self.uuid}: {e}"
            ) from e
    def get_output_content(
        self,
        output_filename: str,
        max_lines: Optional[int] = None,
        missing_ok: bool = True,
    ) -> Optional[str]:
        """Retrieve the content of a specific output file from the job's archive.

        Fetches and returns the content of a file from the job's archive directory
        as a string. Useful for examining log files, output files, and error files.

        Args:
            output_filename (str): Name of the file in the job's archive root
                (e.g., "tapisjob.out", "tapisjob.err", "results.txt").
            max_lines (int, optional): If specified, returns only the last N lines
                of the file. Useful for large log files. Defaults to None (full file).
            missing_ok (bool, optional): If True and the file is not found, returns
                None. If False and the file is not found, raises FileOperationError.
                Defaults to True.

        Returns:
            str or None: Content of the file as a string, or None if the file is
                not found and missing_ok=True.

        Raises:
            FileOperationError: If the job archive is not available, the file is
                not found (and missing_ok=False), or if there's an error fetching
                the file.

        Example:
            >>> # Get job output log
            >>> output = job.get_output_content("tapisjob.out")
            >>> if output:
            ...     print(output)
            >>> # Get last 50 lines of error log
            >>> errors = job.get_output_content("tapisjob.err", max_lines=50)
            >>> # Require file to exist (raise error if missing)
            >>> results = job.get_output_content("results.txt", missing_ok=False)
        """
        print(f"Attempting to fetch content of '{output_filename}' from job archive...")
        details = self._get_details()  # Ensure details are loaded
        if not details.archiveSystemId or not details.archiveSystemDir:
            raise FileOperationError(
                f"Job {self.uuid} archive system ID or directory not available. Cannot fetch output."
            )
        full_archive_path = os.path.join(
            details.archiveSystemDir, output_filename.lstrip("/")
        )
        full_archive_path = os.path.normpath(full_archive_path).lstrip("/")
        try:
            # self._tapis.files.getContents() is expected to return the full file
            # content as bytes when the response is not JSON. The stream=True
            # parameter is for the API endpoint.
            content_bytes = self._tapis.files.getContents(
                systemId=details.archiveSystemId,
                path=full_archive_path,
                stream=True,  # Good to keep, as it's a hint for the server
            )
            # Verify that we indeed received bytes
            if not isinstance(content_bytes, bytes):
                raise FileOperationError(
                    f"Tapis API returned unexpected type for file content of "
                    f"'{output_filename}': {type(content_bytes)}. Expected bytes."
                )
            # Decode to string
            content_str = content_bytes.decode("utf-8", errors="replace")
            if max_lines is not None and max_lines > 0:
                lines = content_str.splitlines()
                if len(lines) > max_lines:
                    # Slice to get the last max_lines
                    content_str = "\n".join(lines[-max_lines:])
                    print(f"Returning last {max_lines} lines of '{output_filename}'.")
                else:
                    print(
                        f"File '{output_filename}' has {len(lines)} lines "
                        f"(less than/equal to max_lines={max_lines}). Returning full content."
                    )
            else:
                print(f"Returning full content of '{output_filename}'.")
            return content_str
        except BaseTapyException as e:
            if hasattr(e, "response") and e.response and e.response.status_code == 404:
                if missing_ok:
                    print(
                        f"Output file '{output_filename}' not found in archive (missing_ok=True). "
                        f"Path: {details.archiveSystemId}/{full_archive_path}"
                    )
                    return None
                else:
                    raise FileOperationError(
                        f"Output file '{output_filename}' not found in job archive "
                        f"at system '{details.archiveSystemId}', path '{full_archive_path}'."
                    ) from e
            else:
                raise FileOperationError(
                    f"Tapis error fetching output file '{output_filename}' for job "
                    f"{self.uuid} (Path: {details.archiveSystemId}/{full_archive_path}): {e}"
                ) from e
        except FileOperationError:
            # Re-raise FileOperationErrors from above
            raise
        except Exception as e:
            # Catch other unexpected errors
            raise FileOperationError(
                f"Unexpected error fetching content of '{output_filename}' for job "
                f"{self.uuid} (Path: {details.archiveSystemId}/{full_archive_path}): {e}"
            ) from e
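    # Illustrative sketch (hedged, commented out so the class is unchanged):
    # tail the conventional Tapis stderr file after a failure, relying on the
    # missing_ok default so an absent file yields None instead of raising.
    #
    #   if job.status == "FAILED":
    #       err_tail = job.get_output_content("tapisjob.err", max_lines=50)
    #       print(err_tail or "No stderr captured.")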
    def cancel(self):
        """Attempt to cancel the job execution.

        Sends a cancellation request to Tapis. Note that cancellation may not be
        immediate and depends on the job's current state and the execution system.

        Raises:
            JobMonitorError: If the cancellation request fails or encounters an error.

        Note:
            Jobs that are already in terminal states cannot be cancelled. The
            method will print the current status if cancellation is not possible.

        Example:
            >>> job.cancel()
            Attempting to cancel job 12345678-1234-1234-1234-123456789abc...
            Cancel request sent for job 12345678-1234-1234-1234-123456789abc. Status may take time to update.
        """
        print(f"Attempting to cancel job {self.uuid}...")
        try:
            self._tapis.jobs.cancelJob(jobUuid=self.uuid)
            print(
                f"Cancel request sent for job {self.uuid}. Status may take time to update."
            )
            self._last_status = "CANCELLED"
            self._job_details = None
        except BaseTapyException as e:
            if hasattr(e, "response") and e.response and e.response.status_code == 400:
                print(
                    f"Could not cancel job {self.uuid}. It might already be in a "
                    f"terminal state. Fetching status..."
                )
                self.get_status(force_refresh=True)
                print(f"Current status: {self.status}")
            else:
                raise JobMonitorError(
                    f"Failed to send cancel request for job {self.uuid}: {e}"
                ) from e
        except Exception as e:
            raise JobMonitorError(
                f"Unexpected error cancelling job {self.uuid}: {e}"
            ) from e
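
# --- Illustrative end-to-end sketch (not part of the dapi API) ---
# Ties the pieces above together, assuming an authenticated Tapis client and
# the hypothetical helpers sketched earlier in this file.
def _example_run_and_fetch(t: Tapis) -> None:
    """Sketch: submit, monitor, interpret, and pull stdout for one job."""
    job = submit_job_request(t, _example_generate_request(t))
    final_status = job.monitor(interval=15)  # falls back to the job's maxMinutes
    interpret_job_status(final_status, job.uuid)
    if final_status == "FINISHED":
        stdout_tail = job.get_output_content("tapisjob.out", max_lines=20)
        if stdout_tail:
            print(stdout_tail)
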
# --- Standalone Helper Functions ---
def get_job_status(t: Tapis, job_uuid: str) -> str:
    """Get the current status of a job by UUID.

    Standalone convenience function that creates a temporary SubmittedJob
    instance to retrieve the current status of an existing job.

    Args:
        t (Tapis): Authenticated Tapis client instance.
        job_uuid (str): The UUID of the job to check.

    Returns:
        str: Current job status (e.g., "QUEUED", "RUNNING", "FINISHED", "FAILED").

    Raises:
        JobMonitorError: If status retrieval fails.
        TypeError: If t is not a Tapis instance.
        ValueError: If job_uuid is empty or invalid.

    Example:
        >>> status = get_job_status(client, "12345678-1234-1234-1234-123456789abc")
        >>> print(f"Job status: {status}")
    """
    job = SubmittedJob(t, job_uuid)
    return job.get_status(force_refresh=True)
def get_runtime_summary(t: Tapis, job_uuid: str, verbose: bool = False):
    """Print a runtime summary for a job by UUID.

    Standalone convenience function that creates a temporary SubmittedJob
    instance to analyze and print the runtime summary of an existing job.

    Args:
        t (Tapis): Authenticated Tapis client instance.
        job_uuid (str): The UUID of the job to analyze.
        verbose (bool, optional): If True, prints detailed job history events
            in addition to the runtime summary. Defaults to False.

    Raises:
        JobMonitorError: If job details cannot be retrieved.
        TypeError: If t is not a Tapis instance.
        ValueError: If job_uuid is empty or invalid.

    Example:
        >>> get_runtime_summary(client, "12345678-1234-1234-1234-123456789abc")
        Runtime Summary
        ---------------
        QUEUED time: 00:05:30
        RUNNING time: 01:23:45
        TOTAL time: 01:29:15
        ---------------
    """
    job = SubmittedJob(t, job_uuid)
    job.print_runtime_summary(verbose=verbose)
def interpret_job_status(final_status: str, job_uuid: Optional[str] = None):
    """Print a user-friendly interpretation of a job status.

    Provides human-readable explanations for various job status values,
    including both standard Tapis states and special monitoring states.

    Args:
        final_status (str): The job status to interpret. Can be a standard Tapis
            status ("FINISHED", "FAILED", etc.) or a special monitoring status
            (STATUS_TIMEOUT, STATUS_INTERRUPTED, etc.).
        job_uuid (str, optional): The job UUID to include in the message for
            context. If None, uses generic "Job" in the message. Defaults to None.

    Example:
        >>> interpret_job_status("FINISHED", "12345678-1234-1234-1234-123456789abc")
        Job 12345678-1234-1234-1234-123456789abc completed successfully.
        >>> interpret_job_status("FAILED")
        Job failed. Check logs or job details.
        >>> interpret_job_status(STATUS_TIMEOUT, "12345678-1234-1234-1234-123456789abc")
        Job 12345678-1234-1234-1234-123456789abc monitoring timed out.
    """
    job_id_str = f"Job {job_uuid}" if job_uuid else "Job"
    if final_status == "FINISHED":
        print(f"{job_id_str} completed successfully.")
    elif final_status == "FAILED":
        print(f"{job_id_str} failed. Check logs or job details.")
    elif final_status == STATUS_TIMEOUT:
        print(f"{job_id_str} monitoring timed out.")
    elif final_status == STATUS_INTERRUPTED:
        print(f"{job_id_str} monitoring was interrupted.")
    elif final_status == STATUS_MONITOR_ERROR:
        print(f"An error occurred while monitoring {job_id_str}.")
    elif final_status == STATUS_UNKNOWN:
        print(f"Could not determine final status of {job_id_str}.")
    elif final_status in TAPIS_TERMINAL_STATES:
        print(f"{job_id_str} ended with status: {final_status}")
    else:
        print(f"{job_id_str} ended with an unexpected status: {final_status}")
def list_jobs(
    tapis_client: Tapis,
    app_id: Optional[str] = None,
    status: Optional[str] = None,
    limit: int = 100,
    output: str = "df",
    verbose: bool = False,
):
    """Fetch Tapis jobs with optional filtering.

    Retrieves jobs from Tapis ordered by creation date (newest first) and
    optionally filters by app ID and/or status. Filters are applied client-side
    after fetching.

    Args:
        tapis_client: Authenticated Tapis client instance.
        app_id: Filter by application ID (e.g., "opensees-mp-s3").
        status: Filter by job status (e.g., "FINISHED", "FAILED"). Case-insensitive.
        limit: Maximum number of jobs to fetch from Tapis. Defaults to 100.
        output: Output format. "df" returns a pandas DataFrame (default),
            "list" returns a list of dicts, "raw" returns the raw TapisResult
            objects.
        verbose: If True, prints the number of jobs found.

    Returns:
        Depends on ``output``:

        - "df": pandas DataFrame with formatted datetime columns.
        - "list": list of dicts with job metadata.
        - "raw": list of TapisResult objects as returned by the API.

    Raises:
        JobMonitorError: If the Tapis API call fails.
        ValueError: If the output format is not recognized.

    Example:
        >>> df = list_jobs(t, app_id="matlab-r2023a", status="FINISHED")
        >>> jobs = list_jobs(t, output="list")
        >>> raw = list_jobs(t, limit=10, output="raw")
    """
    if output not in ("df", "list", "raw"):
        raise ValueError(f"output must be 'df', 'list', or 'raw', got '{output}'")
    try:
        jobs_list = tapis_client.jobs.getJobList(
            limit=limit,
            orderBy="created(desc)",
        )
    except BaseTapyException as e:
        raise JobMonitorError(f"Failed to list jobs: {e}") from e
    except Exception as e:
        raise JobMonitorError(f"Unexpected error listing jobs: {e}") from e

    if not jobs_list:
        if verbose:
            print("Found 0 jobs.")
        if output in ("raw", "list"):
            return []
        return pd.DataFrame()

    # For raw output, apply filters manually on TapisResult objects
    if output == "raw":
        results = jobs_list
        if app_id:
            results = [j for j in results if getattr(j, "appId", None) == app_id]
        if status:
            results = [
                j for j in results if getattr(j, "status", "").upper() == status.upper()
            ]
        if verbose:
            print(f"Found {len(results)} jobs.")
        return results

    # Convert TapisResult objects to dicts
    jobs_dicts = [job.__dict__ for job in jobs_list]

    # Apply client-side filters
    if app_id:
        jobs_dicts = [j for j in jobs_dicts if j.get("appId") == app_id]
    if status:
        jobs_dicts = [
            j for j in jobs_dicts if j.get("status", "").upper() == status.upper()
        ]
    if verbose:
        print(f"Found {len(jobs_dicts)} jobs.")

    if output == "list":
        return jobs_dicts

    # Build DataFrame
    df = pd.DataFrame(jobs_dicts)
    if df.empty:
        return df

    # Add formatted datetime columns
    time_cols = ["created", "ended", "remoteStarted", "lastUpdated"]
    for col in time_cols:
        if col in df.columns:
            df[f"{col}_dt"] = pd.to_datetime(df[col], utc=True, errors="coerce")
            df[f"{col}_date"] = df[f"{col}_dt"].dt.date

    # Reorder: priority columns first
    priority = [
        "name",
        "uuid",
        "status",
        "appId",
        "appVersion",
        "created_dt",
        "ended_dt",
    ]
    priority_present = [c for c in priority if c in df.columns]
    remaining = [c for c in df.columns if c not in priority_present]
    df = df[priority_present + remaining]
    df = df.reset_index(drop=True)
    return df
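
# --- Illustrative usage sketch (not part of the dapi API) ---
# list_jobs returns a DataFrame by default. This hedged sketch pulls recent
# failures for a placeholder app id; the *_dt columns exist only when the
# corresponding timestamp fields are present in the Tapis listing payload.
def _example_recent_failures(t: Tapis) -> pd.DataFrame:
    """Sketch: fetch recent FAILED jobs for a placeholder app id."""
    df = list_jobs(t, app_id="opensees-mp-s3", status="FAILED", limit=50)
    if not df.empty:
        cols = [c for c in ("name", "uuid", "created_dt") if c in df.columns]
        print(df[cols].head())
    return df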