
Job Management

This guide covers everything you need to know about submitting, monitoring, and managing computational jobs on DesignSafe using dapi.

🎯 Overview

dapi provides a high-level interface for working with Tapis v3 jobs on DesignSafe. As sketched in the example after this list, you can:

  • Discover available applications
  • Generate job requests with automatic parameter mapping
  • Submit jobs to DesignSafe compute resources
  • Monitor job progress with real-time updates
  • Manage job outputs and results
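
A condensed end-to-end sketch of that workflow, using the calls covered in the rest of this guide (the app ID, script name, input path, and allocation are placeholders to replace with your own):

from dapi import DSClient

client = DSClient()

# Translate a My Data path to a Tapis URI and build a job request
input_uri = client.files.translate_path_to_uri("/MyData/analysis/input/", verify_exists=True)
job_request = client.jobs.generate_request(
    app_id="matlab-r2023a",            # placeholder application
    input_dir_uri=input_uri,
    script_filename="run_analysis.m",  # placeholder script
    max_minutes=60,
    allocation="your_tacc_allocation"  # placeholder allocation
)

# Submit, monitor, and read the main output log
job = client.jobs.submit_request(job_request)
final_status = job.monitor()
print(job.get_output_content("tapisjob.out", max_lines=20))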

🔍 Application Discovery

Finding Applications

from dapi import DSClient

client = DSClient()

# Find all applications
all_apps = client.apps.find("", verbose=False)
print(f"Found {len(all_apps)} applications")

# Search for specific applications
matlab_apps = client.apps.find("matlab", verbose=True)
opensees_apps = client.apps.find("opensees", verbose=True)
mpm_apps = client.apps.find("mpm", verbose=True)

Getting Application Details

# Get detailed information about an application
app_details = client.apps.get_details("mpm-s3", verbose=True)

print(f"App: {app_details.id}")
print(f"Version: {app_details.version}")
print(f"Description: {app_details.description}")
print(f"Execution System: {app_details.jobAttributes.execSystemId}")
print(f"Max Runtime: {app_details.jobAttributes.maxMinutes} minutes")
print(f"Default Cores: {app_details.jobAttributes.coresPerNode}")

Commonly used applications on DesignSafe:

Application   App ID             Description
MATLAB        matlab-r2023a      MATLAB computational environment
OpenSees      opensees-express   Structural analysis framework
MPM           mpm-s3             Material Point Method simulations
ADCIRC        adcirc-v55         Coastal circulation modeling
LS-DYNA       ls-dyna            Explicit finite element analysis
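
To check which of these are currently visible to your account, a quick sketch reusing find() from above (the IDs in the table are examples and may change as new versions are deployed):

for app_id in ["matlab-r2023a", "opensees-express", "mpm-s3", "adcirc-v55", "ls-dyna"]:
    matches = client.apps.find(app_id, verbose=False)
    print(f"{app_id}: {'found' if matches else 'not found'}")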

🚀 Job Submission

Basic Job Submission

# 1. Prepare input directory
input_path = "/MyData/analysis/input/"
input_uri = client.files.translate_path_to_uri(input_path, verify_exists=True)

# 2. Generate job request
job_request = client.jobs.generate_request(
    app_id="matlab-r2023a",
    input_dir_uri=input_uri,
    script_filename="run_analysis.m",
    max_minutes=60,
    allocation="your_tacc_allocation"
)

# 3. Submit job
job = client.jobs.submit_request(job_request)
print(f"Job submitted: {job.uuid}")

Advanced Job Configuration

job_request = client.jobs.generate_request(
    app_id="mpm-s3",
    input_dir_uri=input_uri,
    script_filename="mpm.json",

    # Resource requirements
    max_minutes=120,
    node_count=2,
    cores_per_node=48,
    memory_mb=96000,
    queue="normal",
    allocation="your_allocation",

    # Job metadata
    job_name="mpm_parametric_study_001",
    description="Parametric study of soil behavior under seismic loading",
    tags=["research", "mpm", "seismic"],

    # Additional file inputs
    extra_file_inputs=[
        {
            "name": "Material Library",
            "sourceUrl": "tapis://designsafe.storage.default/shared/materials/",
            "targetPath": "materials"
        }
    ],

    # Environment variables
    extra_env_vars=[
        {"key": "OMP_NUM_THREADS", "value": "48"},
        {"key": "ANALYSIS_TYPE", "value": "SEISMIC"}
    ],

    # Scheduler options
    extra_scheduler_options=[
        {"name": "Email Notification", "arg": "-m be"},
        {"name": "Job Array", "arg": "-t 1-10"}
    ]
)

Modifying Job Requests

# Generate base request
job_request = client.jobs.generate_request(...)

# Modify before submission
job_request["name"] = "custom_job_name"
job_request["description"] = "Updated description"
job_request["nodeCount"] = 4
job_request["maxMinutes"] = 180

# Add custom parameters
if "parameterSet" not in job_request:
    job_request["parameterSet"] = {}
if "envVariables" not in job_request["parameterSet"]:
    job_request["parameterSet"]["envVariables"] = []

job_request["parameterSet"]["envVariables"].append({
    "key": "CUSTOM_PARAM",
    "value": "custom_value"
})

# Submit modified request
job = client.jobs.submit_request(job_request)

📊 Job Monitoring

Real-time Monitoring

# Submit job
job = client.jobs.submit_request(job_request)

# Monitor with progress bars
final_status = job.monitor(
    interval=15,           # Check every 15 seconds
    timeout_minutes=240    # Timeout after 4 hours
)

# Interpret results
client.jobs.interpret_status(final_status, job.uuid)

Manual Status Checking

# Check current status
current_status = job.get_status()
print(f"Current status: {current_status}")

# Check if job is complete
if current_status in job.TERMINAL_STATES:
    print("Job has finished")
else:
    print("Job is still running")

# Get detailed job information
details = job.details
print(f"Submitted: {details.created}")
print(f"Started: {details.started}")
print(f"Last Updated: {details.lastUpdated}")

Job Status Overview

Status              Description
PENDING             Job submitted but not yet processed
PROCESSING_INPUTS   Input files being identified and validated for staging
STAGING_INPUTS      Input files being transferred to the execution system
STAGING_JOB         Job being prepared for execution
SUBMITTING_JOB      Job being submitted to the scheduler
QUEUED              Job waiting in the scheduler queue
RUNNING             Job actively executing
ARCHIVING           Output files being archived
FINISHED            Job completed successfully
FAILED              Job failed during execution
CANCELLED           Job was cancelled
STOPPED             Job was stopped
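
A small sketch (reusing the job object from above) of reacting to the terminal status returned by monitor():

final_status = job.monitor()

if final_status == "FINISHED":
    print("Job completed successfully")
    outputs = job.list_outputs()  # browse results (see Output Management below)
elif final_status in ("FAILED", "CANCELLED", "STOPPED"):
    print(f"Job did not complete: {final_status}")
    # The error log is usually the quickest way to see what went wrong
    stderr = job.get_output_content("tapisjob.err", missing_ok=True)
    if stderr:
        print(stderr)
else:
    print(f"Job ended in an unexpected state: {final_status}")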

📈 Job Analysis

Runtime Summary

# Get runtime breakdown
job.print_runtime_summary(verbose=False)

# Detailed history (verbose mode)
job.print_runtime_summary(verbose=True)

Example output:

Runtime Summary
---------------
QUEUED  time: 00:05:30
RUNNING time: 01:23:45
TOTAL   time: 01:29:15
---------------

Status Messages

# Get last status message
last_message = job.last_message
if last_message:
    print(f"Last message: {last_message}")
else:
    print("No status message available")

📁 Output Management

Listing Job Outputs

# List all files in job archive
outputs = job.list_outputs()
for output in outputs:
    print(f"- {output.name} ({output.type}, {output.size} bytes)")

# List files in subdirectory
results = job.list_outputs(path="results/")

Accessing Job Archive

# Get archive URI
archive_uri = job.archive_uri
print(f"Job archive: {archive_uri}")

# Use files interface to browse archive
files = client.files.list(archive_uri)
for file in files:
    print(f"- {file.name}")

Reading Output Files

# Read job output log
stdout = job.get_output_content("tapisjob.out")
if stdout:
    print("Job Output:")
    print(stdout)

# Read last 50 lines of output
recent_output = job.get_output_content("tapisjob.out", max_lines=50)

# Read error log (if job failed)
stderr = job.get_output_content("tapisjob.err", missing_ok=True)
if stderr:
    print("Error Output:")
    print(stderr)

# Read custom output files
results = job.get_output_content("results.txt", missing_ok=True)

Downloading Files

# Download specific files
job.download_output("results.mat", "/local/path/results.mat")
job.download_output("output_data.csv", "/local/analysis/data.csv")

# Download using files interface
client.files.download(
    f"{archive_uri}/results.mat",
    "/local/path/results.mat"
)

🔄 Job Management

Job Cancellation

# Cancel a running job
job.cancel()

# Check status after cancellation
status = job.get_status()
print(f"Status after cancel: {status}")

The cancel() method sends a cancellation request to Tapis. Note that:

  • Cancellation may not be immediate and depends on the job's current state
  • Jobs already in terminal states (FINISHED, FAILED, etc.) cannot be cancelled
  • The job status will eventually change to CANCELLED if the cancellation is successful
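
Because cancellation is asynchronous, a short polling loop can confirm when the job actually reaches a terminal state. A minimal sketch (the interval and retry count are arbitrary):

import time

# Poll until the job reaches a terminal state (e.g. CANCELLED) or we give up
for _ in range(20):                    # up to ~5 minutes at 15-second intervals
    status = job.get_status()
    if status in job.TERMINAL_STATES:
        print(f"Job reached terminal state: {status}")
        break
    time.sleep(15)
else:
    print("Job has not reached a terminal state yet; check again later")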

Resuming Monitoring

# If you lose connection, resume monitoring with job UUID
from dapi import SubmittedJob

job_uuid = "12345678-1234-1234-1234-123456789abc"
resumed_job = SubmittedJob(client._tapis, job_uuid)

# Continue monitoring
final_status = resumed_job.monitor()

Bulk Job Operations

# Monitor multiple jobs
job_uuids = ["uuid1", "uuid2", "uuid3"]
jobs = [SubmittedJob(client._tapis, uuid) for uuid in job_uuids]

# Check all statuses
for job in jobs:
    status = job.get_status()
    print(f"Job {job.uuid}: {status}")

# Wait for all to complete
for job in jobs:
    if job.get_status() not in job.TERMINAL_STATES:
        print(f"Monitoring {job.uuid}...")
        final_status = job.monitor()
        print(f"Final status: {final_status}")

🖥️ System Information

Queue Information

# List available queues for a system
frontera_queues = client.systems.list_queues("frontera")
for queue in frontera_queues:
    print(f"Queue: {queue.name}")
    print(f"  Max runtime: {queue.maxRequestedTime} minutes")
    print(f"  Max nodes: {queue.maxNodesPerJob}")

# Check if specific queue exists
dev_queue_exists = any(q.name == "development" for q in frontera_queues)
print(f"Development queue available: {dev_queue_exists}")

System Status

# Get system information
try:
    queues = client.systems.list_queues("stampede3")
    print(f"Stampede3 has {len(queues)} available queues")
except Exception as e:
    print(f"Cannot access Stampede3: {e}")

🔧 Advanced Patterns

Parametric Studies

import copy

# Submit multiple jobs with different parameters
base_request = client.jobs.generate_request(
    app_id="mpm-s3",
    input_dir_uri=input_uri,
    script_filename="template.json",
    max_minutes=60,
    allocation="your_allocation"
)

# Parameter variations
parameters = [
    {"friction": 0.1, "density": 2000},
    {"friction": 0.2, "density": 2200},
    {"friction": 0.3, "density": 2400},
]

submitted_jobs = []
for i, params in enumerate(parameters):
    # Deep-copy the base request so nested structures (e.g. parameterSet)
    # are not shared between parameter sets
    job_req = copy.deepcopy(base_request)
    job_req["name"] = f"parametric_study_{i:03d}"
    job_req["description"] = f"Friction: {params['friction']}, Density: {params['density']}"

    # Add parameters as environment variables
    if "parameterSet" not in job_req:
        job_req["parameterSet"] = {}
    if "envVariables" not in job_req["parameterSet"]:
        job_req["parameterSet"]["envVariables"] = []

    job_req["parameterSet"]["envVariables"].extend([
        {"key": "FRICTION", "value": str(params["friction"])},
        {"key": "DENSITY", "value": str(params["density"])}
    ])

    # Submit job
    job = client.jobs.submit_request(job_req)
    submitted_jobs.append(job)
    print(f"Submitted job {i+1}/{len(parameters)}: {job.uuid}")

# Monitor all jobs
print("Monitoring all jobs...")
for i, job in enumerate(submitted_jobs):
    print(f"\nMonitoring job {i+1}/{len(submitted_jobs)}: {job.uuid}")
    final_status = job.monitor()
    print(f"Job {i+1} final status: {final_status}")

Job Dependencies

# Submit jobs with dependencies (manual coordination)
# preprocessing_request, main_request, and post_request are job requests
# built with client.jobs.generate_request(), as shown earlier

# Job 1: Preprocessing
prep_job = client.jobs.submit_request(preprocessing_request)
prep_status = prep_job.monitor()

if prep_status == "FINISHED":
    print("Preprocessing complete, starting main analysis...")

    # Job 2: Main analysis (uses outputs from Job 1)
    main_request["fileInputs"].append({
        "name": "Preprocessed Data",
        "sourceUrl": prep_job.archive_uri,
        "targetPath": "preprocessed"
    })

    main_job = client.jobs.submit_request(main_request)
    main_status = main_job.monitor()

    if main_status == "FINISHED":
        print("Main analysis complete, starting postprocessing...")

        # Job 3: Postprocessing
        post_request["fileInputs"].append({
            "name": "Analysis Results",
            "sourceUrl": main_job.archive_uri,
            "targetPath": "results"
        })

        post_job = client.jobs.submit_request(post_request)
        final_status = post_job.monitor()

        print(f"Pipeline complete. Final status: {final_status}")

🚨 Error Handling

Common Issues and Solutions

from dapi import JobSubmissionError, JobMonitorError

try:
    # Job submission
    job = client.jobs.submit_request(job_request)
    final_status = job.monitor()

except JobSubmissionError as e:
    print(f"Job submission failed: {e}")

    # Check common issues
    if "allocation" in str(e).lower():
        print("💡 Check your TACC allocation is correct and active")
    elif "queue" in str(e).lower():
        print("💡 Check the queue name is valid for the system")
    elif "file" in str(e).lower():
        print("💡 Check input files exist and paths are correct")

except JobMonitorError as e:
    print(f"Job monitoring failed: {e}")

    # Try to get last known status
    try:
        status = job.get_status()
        print(f"Last known status: {status}")
    except Exception:
        print("Cannot determine job status")

except Exception as e:
    print(f"Unexpected error: {e}")

Debugging Failed Jobs

# For failed jobs, get detailed error information
if final_status == "FAILED":
    print("🔍 Debugging failed job...")

    # Get error logs
    stderr = job.get_output_content("tapisjob.err", missing_ok=True)
    if stderr:
        print("Standard Error:")
        print(stderr)

    # Get last part of stdout
    stdout = job.get_output_content("tapisjob.out", max_lines=100)
    if stdout:
        print("Last 100 lines of output:")
        print(stdout)

    # Check job details
    details = job.details
    print(f"Last message: {details.lastMessage}")
    print(f"Status history available via: job.print_runtime_summary(verbose=True)")

💡 Best Practices

1. Resource Planning

# ✅ Choose appropriate resources
job_request = client.jobs.generate_request(
    app_id="mpm-s3",
    input_dir_uri=input_uri,
    script_filename="analysis.json",
    max_minutes=60,        # Realistic time estimate
    node_count=1,          # Start small, scale up
    cores_per_node=24,     # Match application parallelism
    queue="development",   # Use dev queue for testing
    allocation="your_allocation"
)

2. Job Organization

# ✅ Use descriptive names and metadata
from datetime import datetime

# site_id and method are placeholders for values defined in your own workflow
job_request["name"] = f"seismic_analysis_{site_id}_{datetime.now().strftime('%Y%m%d_%H%M')}"
job_request["description"] = f"Seismic analysis for site {site_id} using {method} method"
job_request["tags"] = ["research", "seismic", site_id, method]

3. Error Recovery

# ✅ Implement retry logic for transient failures
import time

from dapi import JobSubmissionError

max_retries = 3
for attempt in range(max_retries):
    try:
        job = client.jobs.submit_request(job_request)
        final_status = job.monitor()
        break
    except JobSubmissionError as e:
        if attempt < max_retries - 1:
            print(f"Attempt {attempt + 1} failed, retrying... ({e})")
            time.sleep(60)  # Wait before retry
        else:
            raise

➡️ Next Steps