
Job Management

Listing Jobs

Browse your job history as a pandas DataFrame with optional filtering.

from dapi import DSClient

ds = DSClient()

# List all recent jobs (default: last 100, returns DataFrame)
df = ds.jobs.list()
print(df[["name", "uuid", "status", "appId", "created_dt"]])

# Filter by application
df = ds.jobs.list(app_id="opensees-mp-s3")

# Filter by status
df = ds.jobs.list(status="FINISHED")

# Combine filters and increase limit
df = ds.jobs.list(app_id="matlab-r2023a", status="FAILED", limit=500)

# Use pandas for further analysis
finished = df[df["status"] == "FINISHED"]
print(f"Finished jobs: {len(finished)}")
print(finished.groupby("appId").size())

Output Formats

By default list() returns a pandas DataFrame. Use the output parameter for other formats:

# DataFrame (default) -- includes formatted datetime columns
df = ds.jobs.list()

# List of dicts -- lightweight, no pandas dependency
jobs = ds.jobs.list(output="list")
for job in jobs:
    print(f"{job['name']}: {job['status']}")

# Raw TapisResult objects -- for advanced Tapis API usage
raw = ds.jobs.list(output="raw")

Application Discovery

Finding Applications

all_apps = ds.apps.find("", verbose=False)
print(f"Found {len(all_apps)} applications")

matlab_apps = ds.apps.find("matlab", verbose=True)
opensees_apps = ds.apps.find("opensees", verbose=True)
mpm_apps = ds.apps.find("mpm", verbose=True)

Getting Application Details

app_details = ds.apps.get_details("mpm-s3", verbose=True)

print(f"App: {app_details.id}")
print(f"Version: {app_details.version}")
print(f"Execution System: {app_details.jobAttributes.execSystemId}")
print(f"Max Runtime: {app_details.jobAttributes.maxMinutes} minutes")
print(f"Default Cores: {app_details.jobAttributes.coresPerNode}")

Available Applications

| Application | App ID | Description |
| --- | --- | --- |
| Agnostic | designsafe-agnostic-app | General-purpose Python/OpenSees/PyLauncher execution |
| MATLAB | matlab-r2023a | MATLAB computational environment |
| OpenSees | opensees-express | Structural analysis framework |
| OpenSees MP | opensees-mp-s3 | OpenSees parallel (MPI) analysis |
| MPM | mpm-s3 | Material Point Method simulations |
| ADCIRC | adcirc-v55 | Coastal circulation modeling |
| LS-DYNA | ls-dyna | Explicit finite element analysis |

The Agnostic App (designsafe-agnostic-app) runs Python scripts, OpenSeesPy, and PyLauncher parameter sweeps on TACC systems. It includes Python 3.12 with OpenSeesPy pre-installed and supports configurable TACC module loading. It runs in serial mode (isMpi: false), which is what PyLauncher workflows need.
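
For example, a minimal agnostic-app submission might look like the sketch below, using the same generate/submit workflow described in the next section; the input path, script name, and allocation are placeholders, not values from this guide.

# Sketch: run a Python script with the agnostic app (placeholder path, script, allocation)
input_uri = ds.files.to_uri("/MyData/agnostic_demo/", verify_exists=True)

job_request = ds.jobs.generate(
    app_id="designsafe-agnostic-app",
    input_dir_uri=input_uri,
    script_filename="run.py",  # your Python entry point inside the input directory
    max_minutes=30,
    allocation="your_tacc_allocation",
)

job = ds.jobs.submit(job_request)
job.monitor()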

Job Submission

Basic Submission

# 1. Prepare input directory
input_path = "/MyData/analysis/input/"
input_uri = ds.files.to_uri(input_path, verify_exists=True)

# 2. Generate job request
job_request = ds.jobs.generate(
    app_id="matlab-r2023a",
    input_dir_uri=input_uri,
    script_filename="run_analysis.m",
    max_minutes=60,
    allocation="your_tacc_allocation"
)

# 3. Submit job
job = ds.jobs.submit(job_request)
print(f"Job submitted: {job.uuid}")

Advanced Configuration

job_request = ds.jobs.generate(
    app_id="mpm-s3",
    input_dir_uri=input_uri,
    script_filename="mpm.json",

    # Resource requirements
    max_minutes=120,
    node_count=2,
    cores_per_node=48,
    memory_mb=96000,
    queue="normal",
    allocation="your_allocation",

    # Job metadata
    job_name="mpm_parametric_study_001",
    description="Parametric study of soil behavior under seismic loading",
    tags=["research", "mpm", "seismic"],

    # Additional file inputs
    extra_file_inputs=[
        {
            "name": "Material Library",
            "sourceUrl": "tapis://designsafe.storage.default/shared/materials/",
            "targetPath": "materials"
        }
    ],

    # Environment variables
    extra_env_vars=[
        {"key": "OMP_NUM_THREADS", "value": "48"},
        {"key": "ANALYSIS_TYPE", "value": "SEISMIC"}
    ],

    # Scheduler options
    extra_scheduler_options=[
        {"name": "Email Notification", "arg": "-m be"},
        {"name": "Job Array", "arg": "-t 1-10"}
    ]
)

Modifying Job Requests

job_request = ds.jobs.generate(...)

# Modify before submission
job_request["name"] = "custom_job_name"
job_request["description"] = "Updated description"
job_request["nodeCount"] = 4
job_request["maxMinutes"] = 180

# Add custom parameters
if "parameterSet" not in job_request:
 job_request["parameterSet"] = {}
if "envVariables" not in job_request["parameterSet"]:
 job_request["parameterSet"]["envVariables"] = []

job_request["parameterSet"]["envVariables"].append({
 "key": "CUSTOM_PARAM",
 "value": "custom_value"
})

job = ds.jobs.submit(job_request)

Job Monitoring

Real-time Monitoring

job = ds.jobs.submit(job_request)

final_status = job.monitor(
    interval=15,           # Check every 15 seconds
    timeout_minutes=240    # Timeout after 4 hours
)

ds.jobs.interpret_status(final_status, job.uuid)

Manual Status Checking

current_status = job.get_status()
print(f"Current status: {current_status}")

if current_status in job.TERMINAL_STATES:
    print("Job has finished")
else:
    print("Job is still running")

details = job.details
print(f"Submitted: {details.created}")
print(f"Started: {details.started}")
print(f"Last Updated: {details.lastUpdated}")

Job Statuses

| Status | Description |
| --- | --- |
| PENDING | Submitted, not yet processed |
| PROCESSING_INPUTS | Input files being staged |
| STAGING_INPUTS | Files transferring to compute system |
| STAGING_JOB | Job being prepared for execution |
| SUBMITTING_JOB | Submitting to scheduler |
| QUEUED | Waiting in scheduler queue |
| RUNNING | Executing |
| ARCHIVING | Output files being archived |
| FINISHED | Completed successfully |
| FAILED | Failed |
| CANCELLED | Cancelled |
| STOPPED | Stopped |

Job Analysis

Runtime Summary

job.print_runtime_summary(verbose=False)

# Detailed history
job.print_runtime_summary(verbose=True)

Example output:

Runtime Summary
---------------
QUEUED time: 00:05:30
RUNNING time: 01:23:45
TOTAL time: 01:29:15
---------------

Status Messages

last_message = job.last_message
if last_message:
    print(f"Last message: {last_message}")

Output Management

Listing Outputs

outputs = job.list_outputs()
for output in outputs:
    print(f"- {output.name} ({output.type}, {output.size} bytes)")

# Subdirectory
results = job.list_outputs(path="results/")

Reading Output Files

stdout = job.get_output_content("tapisjob.out")
if stdout:
    print(stdout)

# Last 50 lines
recent_output = job.get_output_content("tapisjob.out", max_lines=50)

# Error log
stderr = job.get_output_content("tapisjob.err", missing_ok=True)
if stderr:
    print(stderr)

Downloading Files

# Download a single output file from the job
job.download_output("results.mat", "/local/path/results.mat")

# Or fetch the same file through the files API using the job's archive location
ds.files.download(
    f"{job.archive_uri}/results.mat",
    "/local/path/results.mat"
)

Job Cancellation

job.cancel()
status = job.get_status()
print(f"Status after cancel: {status}")

Cancellation may not be immediate. Jobs in terminal states (FINISHED, FAILED, etc.) cannot be cancelled.
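
To avoid issuing a cancel against a job that has already finished, a minimal guard using the same attributes shown above is:

# Only request cancellation if the job is not already in a terminal state
if job.get_status() not in job.TERMINAL_STATES:
    job.cancel()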

Resuming Monitoring

from dapi import SubmittedJob

job_uuid = "12345678-1234-1234-1234-123456789abc"
resumed_job = SubmittedJob(ds._tapis, job_uuid)
final_status = resumed_job.monitor()

Parameter Sweeps with PyLauncher

PyLauncher runs many independent tasks within a single SLURM allocation -- ideal for parameter studies. dapi generates sweep commands, task lists, and launcher scripts.

ds = DSClient()

sweep = {
    "ALPHA": [0.3, 0.5, 3.7],
    "BETA":  [1.1, 2.0, 3.0],
}

# Preview (dry run)
ds.jobs.parametric_sweep.generate(
    'python3 simulate.py --alpha ALPHA --beta BETA',
    sweep,
    preview=True,
)

# Generate sweep files
ds.jobs.parametric_sweep.generate(
    'python3 simulate.py --alpha ALPHA --beta BETA '
    '--output "$WORK/sweep_$SLURM_JOB_ID/run_ALPHA_BETA"',
    sweep,
    "/home/jupyter/MyData/sweep_demo/",
    debug="host+job",
)

# Submit
job = ds.jobs.parametric_sweep.submit(
    "/MyData/sweep_demo/",
    app_id="designsafe-agnostic-app",
    allocation="your_allocation",
    node_count=1,
    cores_per_node=48,
    max_minutes=30,
)
job.monitor()

For a full walkthrough with OpenSees, see the PyLauncher example.

Bulk Operations

job_uuids = ["uuid1", "uuid2", "uuid3"]
jobs = [SubmittedJob(ds._tapis, uuid) for uuid in job_uuids]

for job in jobs:
    status = job.get_status()
    print(f"Job {job.uuid}: {status}")

for job in jobs:
    if job.get_status() not in job.TERMINAL_STATES:
        final_status = job.monitor()
        print(f"Final status: {final_status}")

Multiple Separate Jobs

If each run needs its own full allocation (e.g., MPI jobs that can’t share nodes), submit them as separate Tapis jobs:

parameters = [
    {"friction": 0.1, "density": 2000},
    {"friction": 0.2, "density": 2200},
    {"friction": 0.3, "density": 2400},
]

submitted_jobs = []
for i, params in enumerate(parameters):
    job_req = ds.jobs.generate(
        app_id="mpm-s3",
        input_dir_uri=input_uri,
        script_filename="template.json",
        max_minutes=60,
        allocation="your_allocation",
        extra_env_vars=[
            {"key": "FRICTION", "value": str(params["friction"])},
            {"key": "DENSITY", "value": str(params["density"])},
        ],
    )
    job_req["name"] = f"parametric_study_{i:03d}"
    job = ds.jobs.submit(job_req)
    submitted_jobs.append(job)

for job in submitted_jobs:
    job.monitor()

For independent serial tasks, PyLauncher is more efficient -- it runs all tasks in a single allocation.

Job Dependencies

# Job 1: Preprocessing
prep_job = ds.jobs.submit(preprocessing_request)
prep_status = prep_job.monitor()

if prep_status == "FINISHED":
    # Job 2: Main analysis (uses outputs from Job 1)
    main_request["fileInputs"].append({
        "name": "Preprocessed Data",
        "sourceUrl": prep_job.archive_uri,
        "targetPath": "preprocessed"
    })

    main_job = ds.jobs.submit(main_request)
    main_status = main_job.monitor()

    if main_status == "FINISHED":
        # Job 3: Postprocessing
        post_request["fileInputs"].append({
            "name": "Analysis Results",
            "sourceUrl": main_job.archive_uri,
            "targetPath": "results"
        })

        post_job = ds.jobs.submit(post_request)
        final_status = post_job.monitor()

Error Handling

from dapi import JobSubmissionError, JobMonitorError

try:
    job = ds.jobs.submit(job_request)
    final_status = job.monitor()

except JobSubmissionError as e:
    print(f"Job submission failed: {e}")

    if "allocation" in str(e).lower():
        print("Check your TACC allocation is correct and active")
    elif "queue" in str(e).lower():
        print("Check the queue name is valid for the system")
    elif "file" in str(e).lower():
        print("Check input files exist and paths are correct")

except JobMonitorError as e:
    print(f"Job monitoring failed: {e}")
    try:
        status = job.get_status()
        print(f"Last known status: {status}")
    except Exception:
        print("Cannot determine job status")

Debugging Failed Jobs

if final_status == "FAILED":
    # Check the error log first
    stderr = job.get_output_content("tapisjob.err", missing_ok=True)
    if stderr:
        print("Standard Error:")
        print(stderr)

    # Then inspect the tail of standard output
    stdout = job.get_output_content("tapisjob.out", max_lines=100)
    if stdout:
        print("Last 100 lines of output:")
        print(stdout)

    # Job details and last status message
    details = job.details
    print(f"Last message: {details.lastMessage}")
    # For the full status history: job.print_runtime_summary(verbose=True)

System Queues

frontera_queues = ds.systems.queues("frontera")
for queue in frontera_queues:
    print(f"Queue: {queue.name}")
    print(f"Max runtime: {queue.maxMinutes} min")
    print(f"Max nodes: {queue.maxNodeCount}")