Step-by-Step OpenSees App#
A detailed, step-by-step guide to submitting an OpenSees Tapis app
by Silvia Mazzoni, DesignSafe, 2025
This notebook serves as a template for submitting the following DesignSafe OpenSees Apps:
opensees-express
opensees-mp-s3
opensees-sp-s3
We use previously defined Python functions from the OpsUtils package to streamline the process.
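Before anything else, import the helper package. This is a minimal setup sketch; it assumes the OpsUtils package distributed with this training material is importable from your Jupyter environment (the same import used inside the utility functions shown later).

# assumes the OpsUtils training package is on your Python path
from OpsUtils import OpsUtils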
Connect to Tapis#
t = OpsUtils.connect_tapis()
-- Checking Tapis token --
Token loaded from file. Token is still valid!
Token expires at: 2025-09-10T07:14:42+00:00
Token expires in: 2:05:58.887582
-- LOG IN SUCCESSFUL! --
print('QUEUE INFO')
OpsUtils.get_system_queues(t,system_id="stampede3",display=True);
QUEUE INFO
| name | icx | skx | skx-dev | pvc | spr | nvdimm | h100 |
|---|---|---|---|---|---|---|---|
| description | icx (CPU, standard compute) | skx (CPU, recommended standard compute) | skx-dev (CPU, 2 hour max, 1 job max, for testing) | pvc (GPU, Intel no CUDA) | spr (CPU, high memory bandwidth) | nvdimm (CPU, large memory) | None |
| hpcQueueName | icx | skx | skx-dev | pvc | spr | nvdimm | h100 |
| maxJobs | -1 | -1 | -1 | -1 | -1 | -1 | -1 |
| maxJobsPerUser | 20 | 60 | 3 | 4 | 36 | 3 | 4 |
| minNodeCount | 1 | 1 | 1 | 1 | 1 | 1 | 1 |
| maxNodeCount | 32 | 256 | 16 | 4 | 32 | 1 | 4 |
| minCoresPerNode | 1 | 1 | 1 | 1 | 1 | 1 | 1 |
| maxCoresPerNode | 80 | 48 | 48 | 96 | 112 | 80 | 96 |
| minMemoryMB | 1 | 1 | 1 | 1 | 1 | 1 | 1 |
| maxMemoryMB | 256000 | 192000 | 192000 | 128000 | 128000 | 4000000 | 1000000 |
| minMinutes | 1 | 1 | 1 | 1 | 1 | 1 | 1 |
| maxMinutes | 2880 | 2880 | 120 | 2880 | 2880 | 2880 | 2880 |
App User Input#
Initialize#
# initialize
tapisInput = {}
tapisInput["name"] = 'OpenSeesMPsubmit'
App Parameters#
tapisInput["appId"] = "opensees-mp-s3" # options: "opensees-express", "opensees-mp-s3", "opensees-2p-s3"
tapisInput["appVersion"] = "latest" # always use latest in this Notebook Template
Get app schema – input#
thisAppData_MP = OpsUtils.get_tapis_app_schema(t,tapisInput["appId"],version='latest')
OpsUtils.display_tapis_app_schema(thisAppData_MP)
########################################
########### TAPIS-APP SCHEMA ###########
########################################
######## appID: opensees-mp-s3
######## version: latest
########################################
{
sharedAppCtx: "wma_prtl"
isPublic: True
tenant: "designsafe"
id: "opensees-mp-s3"
version: "latest"
description: "Runs all the processors in parallel. Requires understanding of parallel processing and the capabilities to write parallel scripts."
owner: "wma_prtl"
enabled: True
versionEnabled: True
locked: False
runtime: "ZIP"
runtimeVersion: None
runtimeOptions: None
containerImage: "tapis://cloud.data/corral/tacc/aci/CEP/applications/v3/opensees/latest/OpenSees/opensees.zip"
jobType: "BATCH"
maxJobs: 2147483647
maxJobsPerUser: 2147483647
strictFileInputs: True
uuid: "1410a584-0c5e-4e47-b3b0-3a7bea0e1187"
deleted: False
created: "2025-02-20T18:01:49.005183Z"
updated: "2025-08-28T20:17:39.426067Z"
sharedWithUsers: []
tags: ["portalName: DesignSafe", "portalName: CEP"]
jobAttributes: {
description: None
dynamicExecSystem: False
execSystemConstraints: None
execSystemId: "stampede3"
execSystemExecDir: "${JobWorkingDir}"
execSystemInputDir: "${JobWorkingDir}"
execSystemOutputDir: "${JobWorkingDir}"
dtnSystemInputDir: "!tapis_not_set"
dtnSystemOutputDir: "!tapis_not_set"
execSystemLogicalQueue: "skx"
archiveSystemId: "stampede3"
archiveSystemDir: "HOST_EVAL($WORK)/tapis-jobs-archive/${JobCreateDate}/${JobName}-${JobUUID}"
archiveOnAppError: True
isMpi: False
mpiCmd: None
cmdPrefix: None
nodeCount: 2
coresPerNode: 48
memoryMB: 192000
maxMinutes: 120
fileInputs: [
{
name: "Input Directory"
description: "Input directory that includes the tcl script as well as any other required files. Example input is in tapis://designsafe.storage.community/app_examples/opensees/OpenSeesMP"
inputMode: "REQUIRED"
autoMountLocal: True
envKey: "inputDirectory"
sourceUrl: None
targetPath: "inputDirectory"
notes: {
selectionMode: "directory"
}
}
]
fileInputArrays: []
subscriptions: []
tags: []
parameterSet: {
appArgs: [
{
arg: "OpenSeesMP"
name: "mainProgram"
description: None
inputMode: "FIXED"
notes: {
isHidden: True
}
}
{
arg: None
name: "Main Script"
description: "The filename only of the OpenSees TCL script to execute. This file should reside in the Input Directory specified. To use with test input, use 'freeFieldEffective.tcl'"
inputMode: "REQUIRED"
notes: {
inputType: "fileInput"
}
}
]
containerArgs: []
schedulerOptions: [
{
arg: "--tapis-profile OpenSees_default"
name: "OpenSees TACC Scheduler Profile"
description: "Scheduler profile for the default version of OpenSees"
inputMode: "FIXED"
notes: {
isHidden: True
}
}
{
arg: None
name: "TACC Reservation"
description: "Reservation input string"
inputMode: "INCLUDE_ON_DEMAND"
notes: {
isHidden: True
}
}
]
envVariables: []
archiveFilter: {
includeLaunchFiles: True
includes: []
excludes: []
}
logConfig: {
stdoutFilename: ""
stderrFilename: ""
}
}
}
notes: {
icon: "OpenSees"
label: "OpenSeesMP"
helpUrl: "https://www.designsafe-ci.org/user-guide/tools/simulation/#opensees-user-guide"
category: "Simulation"
isInteractive: False
showReservation: True
hideNodeCountAndCoresPerNode: False
}
}
########################################
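The schema is also useful programmatically. The sketch below lists the user-facing inputs you must supply; it assumes get_tapis_app_schema returns a Tapipy TapisResult (attribute access, the same convention get_tapis_job_description uses later). If your OpsUtils version returns a plain dict, use key access instead.

# Sketch: list the user-facing inputs defined by the app schema.
for fi in thisAppData_MP.jobAttributes.fileInputs:
    print(f"fileInput: {fi.name} ({fi.inputMode})")
for arg in thisAppData_MP.jobAttributes.parameterSet.appArgs:
    if not getattr(arg.notes, 'isHidden', False):  # skip hidden, FIXED args
        print(f"appArg: {arg.name} ({arg.inputMode})")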
TACC-Job Parameters#
https://docs.tacc.utexas.edu/hpc/stampede3/
tapisInput["maxMinutes"] = 6
# opensees-mp-s3 and opensees-sp-s3 only:
if tapisInput["appId"] in ["opensees-mp-s3", "opensees-sp-s3"]:
    tapisInput["execSystemId"] = "stampede3"          # these apps run on Stampede3 only
    tapisInput["execSystemLogicalQueue"] = "skx-dev"  # "skx", "skx-dev", ...; for info run: OpsUtils.get_system_queues(t, system_id="stampede3", display=True)
    tapisInput["nodeCount"] = 1                       # limits are set by the queue/compute nodes you use
    tapisInput["coresPerNode"] = 48                   # limits are set by the queue/compute nodes you use
tapisInput["allocation"] = "DS-HPC1"
INPUT Files Parameters#
Storage System & Tapis Base Path in URI format#
This is the very first part of your path, just above your home folder.
Options:
CommunityData
Published
The following options are user- or project-dependent and require additional path input:
MyData (requires user-dependent input)
Work (requires user- and system-dependent input, e.g. Work/stampede3)
MyProjects (requires project-dependent input)
You can obtain a dependent tapis-URI path by performing the first step of submitting an OpenSeesMP job at the app portal: https://www.designsafe-ci.org/workspace/opensees-mp-s3
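For reference, you can retrieve the full dictionary of base URIs with the helper whose source is shown below; the Work bases are discovered per HPC system and cached in ~/MyData/.tapis_user_paths.json.

# Sketch: list all the base URIs known for your account.
all_bases = OpsUtils.get_user_path_tapis_uri(t, 'none')
for key, uri in all_bases.items():
    print(f'{key:16s} {uri}')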
tapisInput['storage_system'] = 'MyData' # options: MyData, CommunityData, Published, Work/stampede3, Work/frontera, Work/ls6 (for MyProjects, set tapisInput['storage_system_baseURL'] directly)
We will use a utility function to get the Tapis-compatible URI:#
get_user_path_tapis_uri(): Discover and cache user-specific Tapis base URIs for DesignSafe storage systems, then return either the entire dictionary or a single base URI.
get_user_path_tapis_uri.py
# /home/jupyter/CommunityData/OpenSees/TrainingMaterial/training-OpenSees-on-DesignSafe/OpsUtils/OpsUtils/Tapis/get_user_path_tapis_uri.py
from __future__ import annotations
def get_user_path_tapis_uri(
t,
    file_system: str = "none",  # "none" | "mydata" | "community" | "published" | "work/stampede3" | "work/ls6" | "work/frontera"
paths_file_path: str = "~/MyData/.tapis_user_paths.json",
force_refresh: bool = False,
) -> Union[str, Dict]:
"""
Discover and cache user-specific Tapis base URIs for DesignSafe storage systems,
then return either the entire dictionary or a single base URI.
Author
-------
Silvia Mazzoni, DesignSafe (silviamazzoni@yahoo.com)
Parameters
----------
t : Tapipy client
An authenticated Tapipy v3 client.
    file_system : {"none","mydata","community","published","work/<system>"}, optional
        Which base to return. Use "none" to return the full dictionary.
        For Work, include the HPC system, e.g. "work/stampede3".
paths_file_path : str, optional
Location (on MyData or local home) where the JSON cache is stored.
Default: "~/MyData/.tapis_user_paths.json".
force_refresh : bool, optional
If True, (re)discover all bases and overwrite the cache file.
Returns
-------
Union[str, dict]
        - If file_system == "none": the full dict of bases (including the "work/<system>" entries).
- Else: a single base URI string for the requested system.
Notes
-----
    - Stored values are full Tapis URIs (they start with "tapis://" and have no trailing "/").
    - Keys are lowercase: "mydata", "community", "published", and "work/<system>"
      (e.g. "work/stampede3", "work/ls6", "work/frontera").
"""
import json
import os
from pathlib import Path
from typing import Dict, Optional, Union, Iterable, Sequence
from OpsUtils import OpsUtils
# ----------------------------
# normalize & validate inputs
# ----------------------------
fs = (file_system or "none").strip().lower()
# print('fs',fs)
# Handle loose input like "CommunityData"
if "community" in fs:
fs = "community"
valid_file_systems = ["mydata", "community", "published","none"]
valid_work_systems = ["stampede3", "ls6", "frontera", "none"]
for thisW in valid_work_systems:
valid_file_systems.append(f'work/{thisW}')
# print('valid_file_systems',valid_file_systems)
if fs not in valid_file_systems:
raise ValueError(f"file_system='{file_system}' not in {sorted(valid_file_systems)}")
cache_path = Path(os.path.expanduser(paths_file_path))
# print('cache_path',cache_path)
# ----------------------------
# helper: normalize URIs
# ----------------------------
def _with_scheme(u: str) -> str:
u = u.strip()
if not u:
return u
if not u.startswith("tapis://"):
u = "tapis://" + u.lstrip("/")
        u = u.rstrip("/")
return u
# ----------------------------
# try reading existing cache
# ----------------------------
paths: Dict = {}
if cache_path.exists() and not force_refresh:
try:
with cache_path.open("r", encoding="utf-8") as f:
paths = json.load(f)
print(f'found paths file: {cache_path}')
except Exception:
paths = {}
# quick return if cache satisfies the request
def _maybe_return_from_cache() -> Optional[Union[str, Dict]]:
if not paths:
return None
if fs == "none":
return paths
if fs in {"mydata", "community", "published"}:
val = paths.get(fs)
if isinstance(val, str) and val:
return _with_scheme(val)
if "work" in fs:
val = paths.get(fs)
if isinstance(val, str) and val:
return _with_scheme(val)
return None
cached = _maybe_return_from_cache()
if cached is not None:
return cached
# ----------------------------
# (re)discover all bases
# ----------------------------
try:
username = OpsUtils.get_tapis_username(t)
except Exception as e:
raise RuntimeError(f"Could not determine Tapis username: {e}")
discovered: Dict = {
"mydata": _with_scheme(f"designsafe.storage.default/{username}"),
"community": _with_scheme("designsafe.storage.community"),
"published": _with_scheme("designsafe.storage.published"),
}
# Discover Work bases using the new inner helper
for system in ("stampede3", "ls6", "frontera"):
try:
base_uri = OpsUtils.get_user_work_tapis_uri(t, system_id=system)
discovered[f'work/{system}'] = _with_scheme(base_uri) # idempotent
except Exception:
# Skip systems we can't resolve; they can be refreshed later
continue
# Persist to cache
cache_path.parent.mkdir(parents=True, exist_ok=True)
with cache_path.open("w", encoding="utf-8") as f:
json.dump(discovered, f, indent=2)
print(f'saved data to {cache_path}')
# Return per request
if fs == "none":
return discovered
else:
return discovered[fs]
tapisInput['storage_system_baseURL'] = OpsUtils.get_user_path_tapis_uri(t,tapisInput['storage_system'])
print('storage_system_baseURL:',tapisInput['storage_system_baseURL'])
found paths file: /home/jupyter/MyData/.tapis_user_paths.json
storage_system_baseURL: tapis://designsafe.storage.default/silvia
File Paths#
The input_folder is the directory containing your input files.
DO NOT INCLUDE the storage system, such as MyData, etc.
Start from the first folder within your storage-system folder.
The Main Script is the full filename of the script that will be submitted to OpenSees; yes, you need to include the extension.
tapisInput['input_folder'] = '_ToCommunityData/OpenSees/TrainingMaterial/training-OpenSees-on-DesignSafe/Examples_OpenSees/BasicExamples'
tapisInput['Main Script'] = 'Ex1a_verymany.Canti2D.Push.mp.tcl'
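Typos in these paths are a common cause of failed submissions. When your storage system is MyData, a quick local check is possible; this sketch assumes the standard JupyterHub mount of MyData at ~/MyData.

import os

# Sketch: verify the main script exists locally before submitting (MyData only).
local_script = os.path.expanduser(
    os.path.join('~/MyData', tapisInput['input_folder'], tapisInput['Main Script'])
)
print(local_script, '->', 'found' if os.path.isfile(local_script) else 'NOT FOUND')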
OUTPUT-Files Parameters#
Where would you like your files to be archived once the job is finished?
Options: MyData and Work
In both cases you will find them in the tapis-jobs-archive folder in either MyData or Work/stampede3.
Remember: you cannot write to Projects or CommunityData.
tapisInput['archive_system']='Work' # Options: MyData or Work
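For orientation, here is where the archive appears from JupyterHub; a sketch assuming a Stampede3 job (MyData mounts at ~/MyData, and Work/stampede3 at ~/Work/stampede3, as in the basePath printed further below).

import os

# Sketch: the JupyterHub-side root where archived outputs will appear.
archive_root = {
    'MyData': '~/MyData/tapis-jobs-archive',
    'Work': '~/Work/stampede3/tapis-jobs-archive',
}[tapisInput['archive_system']]
print('archive root:', os.path.expanduser(archive_root))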
Review User Input#
display(tapisInput)
{'name': 'OpenSeesMPsubmit',
'appId': 'opensees-mp-s3',
'appVersion': 'latest',
'maxMinutes': 6,
'execSystemId': 'stampede3',
'execSystemLogicalQueue': 'skx-dev',
'nodeCount': 1,
'coresPerNode': 48,
'allocation': 'DS-HPC1',
'storage_system': 'MyData',
'storage_system_baseURL': 'tapis://designsafe.storage.default/silvia',
'input_folder': '_ToCommunityData/OpenSees/TrainingMaterial/training-OpenSees-on-DesignSafe/Examples_OpenSees/BasicExamples',
'Main Script': 'Ex1a_verymany.Canti2D.Push.mp.tcl',
'archive_system': 'Work'}
Get interpreted Tapis-App Input (Optional)#
This is what will be sent as input; it is informational here, but a good way to check your input before submitting.
get_tapis_job_description.py
# /home/jupyter/CommunityData/OpenSees/TrainingMaterial/training-OpenSees-on-DesignSafe/OpsUtils/OpsUtils/Tapis/get_tapis_job_description.py
def get_tapis_job_description(t, tapisInput):
"""
Build a complete Tapis v3 job description dict from user-friendly inputs.
This function:
1) Resolves the Files base URL for inputs from `tapisInput["storage_system"]`
(supports MyData/community/published; can be overridden via
`tapisInput["storage_system_baseURL"]`).
2) Ensures a concrete application version:
- If `appVersion` is missing or equals "latest", it resolves a pinned
       version via `OpsUtils.get_latest_app_version(t, appId)`.
3) Validates required fields (set differs for OpenSees-Express vs. HPC apps).
4) Constructs job attributes, fileInputs, parameterSet, and archive settings.
Auto baseURL selection
----------------------
If `storage_system_baseURL` is not provided:
- If `storage_system` contains "mydata": uses
      `tapis://designsafe.storage.default/<username>`, where `<username>` is taken
      from `t.authenticator.get_userinfo().username`.
- If `storage_system` contains "community": uses
`tapis://designsafe.storage.community`
- If `storage_system` contains "published": uses
`tapis://designsafe.storage.published`
- Else: prints a message and returns -1.
App families and required keys
------------------------------
*OpenSees-Express* (`appId == "opensees-express"`)
        Required: ['name','appId','maxMinutes','archive_system',
                   'storage_system','input_folder','Main Script']
- Sets `parameterSet.envVariables` with:
mainProgram = OpenSees
          tclScript = <Main Script>
    - `fileInputs = [{"name": "Input Directory", "sourceUrl": "<storage baseURL>/<input_folder>"}]`
- Archive options:
archive_system == 'MyData' -> designsafe.storage.default under
${EffectiveUserId}/tapis-jobs-archive/${JobCreateDate}/${JobUUID}
archive_system == 'Temp' -> cloud.data:/tmp/${JobOwner}/tapis-jobs-archive/...
*HPC OpenSees apps* (e.g., opensees-mp-s3, opensees-sp-*, etc.)
        Required: ['name','appId','execSystemId','execSystemLogicalQueue',
                   'nodeCount','coresPerNode','maxMinutes','allocation','archive_system',
                   'storage_system','input_folder','Main Script']
- Sets base job attributes (system, queue, resources).
    - `parameterSet.appArgs = [{"name": "Main Script", "arg": <Main Script>}]`
- `parameterSet.schedulerOptions = [{"name": "TACC Allocation",
"arg": f"-A {allocation}"}]`
- `fileInputs` as above.
- Archive options:
archive_system == 'MyData' -> designsafe.storage.default under
${EffectiveUserId}/tapis-jobs-archive/${JobCreateDate}/${JobUUID}
archive_system == 'Work' -> exec system $WORK/tapis-jobs-archive/...
Returns
-------
dict
A fully formed Tapis job description ready for submission.
Returns -1 if validation fails or baseURL/appVersion cannot be determined.
Side effects
------------
- Prints a list of missing required keys (if any).
- Prints a short confirmation ("All Input is Complete") on success.
Author
------
Silvia Mazzoni, DesignSafe (silviamazzoni@yahoo.com)
Date
----
2025-08-16
Version
-------
1.2
"""
# Silvia Mazzoni, 2025
from OpsUtils import OpsUtils # for get_latest_app_version
def checkRequirements(tapisInputIN, RequiredInputList):
nmiss = 0
for thisReq in RequiredInputList:
if thisReq not in tapisInputIN:
nmiss += 1
print(f"YOU need to define the following input: {thisReq}")
return nmiss
# --- Resolve storage baseURL if needed ---
if "sourceUrl" not in tapisInput:
if "storage_system_baseURL" not in tapisInput:
if 'storage_system' not in tapisInput:
print('NEED more info for your input directory')
return -1
storage_system_lower = tapisInput.get("storage_system", "").lower()
tapisInput['storage_system_baseURL'] = OpsUtils.get_user_path_tapis_uri(t,storage_system_lower)
# print("tapisInput['storage_system_baseURL']",tapisInput['storage_system_baseURL'])
tapisInput["sourceUrl"] = f"{tapisInput['storage_system_baseURL']}"
if 'input_folder' in tapisInput:
tapisInput["sourceUrl"] = f"{tapisInput['sourceUrl']}/{tapisInput['input_folder']}"
print('input directory URI:',tapisInput["sourceUrl"])
appId = tapisInput["appId"]
# print('appId',appId)
# --- Ensure a concrete appVersion (resolve 'latest' or missing) ---
if ("appVersion" not in tapisInput) or (str(tapisInput["appVersion"]).lower() == "latest"):
resolved = OpsUtils.get_latest_app_version(t, appId)
if not resolved or resolved in ("none", ""):
print(f"Unable to resolve latest version for appId='{appId}'. Please specify appVersion.")
return -1
tapisInput["appVersion"] = resolved
# --- Get App Schema ---
appMetaData = t.apps.getAppLatestVersion(appId=appId)
app_MetaData = appMetaData.__dict__
app_jobAttributes = app_MetaData['jobAttributes'].__dict__
app_parameterSet = app_jobAttributes['parameterSet'].__dict__
inputKeys_App = ['id','version']
inputKeys_jobAttributes = ['execSystemId', 'execSystemLogicalQueue', 'archiveSystemId', 'archiveSystemDir', 'nodeCount', 'coresPerNode', 'memoryMB', 'maxMinutes']
# --- Defaults & branching ---
# If Express, default the exec system to the Express VM unless provided
if appId == "opensees-express" and "execSystemId" not in tapisInput:
tapisInput["execSystemId"] = "wma-exec-01"
if "execSystemId" not in tapisInput:
tapisInput["execSystemId"] = "stampede3"
job_description = {}
nmiss = 999
# Express (runs on wma-exec-01)
if appId == "opensees-express":
RequiredInputList = [
"name", "appId", "maxMinutes", "archive_system",
"storage_system", "input_folder", "Main Script"
]
nmiss = checkRequirements(tapisInput, RequiredInputList)
if nmiss == 0:
parameterSet = {}
job_description["name"] = tapisInput["name"]
job_description["maxMinutes"] = tapisInput["maxMinutes"]
job_description["appId"] = tapisInput["appId"]
if 'appVersion' in tapisInput.keys():
job_description["appVersion"] = tapisInput["appVersion"]
fileInputs = [{"name": "Input Directory", "sourceUrl": tapisInput["sourceUrl"]}]; # keep as is!
if not 'Main Program' in tapisInput.keys():
tapisInput["Main Program"] = 'OpenSees'
parameterSet["envVariables"] = [
{"key": "mainProgram", "value": tapisInput["Main Program"]},
{"key": "tclScript", "value": tapisInput["Main Script"]},
]
job_description["fileInputs"] = fileInputs
job_description["parameterSet"] = parameterSet
else:
# HPC (e.g., OpenSeesMP/SP on Stampede3)
RequiredInputList = [
"name","appId","execSystemId","execSystemLogicalQueue",
"nodeCount","coresPerNode","maxMinutes","allocation","archive_system",
]
nmiss = checkRequirements(tapisInput, RequiredInputList)
if nmiss == 0:
parameterSet = {}
job_description["name"] = tapisInput["name"]
job_description["execSystemId"] = tapisInput["execSystemId"]
job_description["execSystemLogicalQueue"] = tapisInput["execSystemLogicalQueue"]
job_description["maxMinutes"] = tapisInput["maxMinutes"]
job_description["nodeCount"] = tapisInput["nodeCount"]
job_description["coresPerNode"] = tapisInput["coresPerNode"]
job_description["appId"] = tapisInput["appId"]
if 'appVersion' in tapisInput.keys():
job_description["appVersion"] = tapisInput["appVersion"]
fileInputs = [{"name": "Input Directory", "sourceUrl": tapisInput["sourceUrl"]}]
# if not 'Main Program' in tapisInput.keys():
# tapisInput["Main Program"] = 'OpenSeesMP'
# if not 'CommandLine Arguments' in tapisInput:
# tapisInput["CommandLine Arguments"] = ''
HEREkey = 'appArgs'
print('HEREkey',HEREkey)
parameterSet[HEREkey] = []
for app_Dict in app_parameterSet[HEREkey]:
app_Dict = app_Dict.__dict__
print('app_Dict',app_Dict)
here_name = app_Dict['name']
here_dict = {"name":here_name,"arg":app_Dict['arg']}
if 'notes' in app_Dict:
app_Dict_notes = app_Dict['notes'].__dict__
if 'isHidden' in app_Dict_notes and app_Dict_notes['isHidden']==True:
continue
if here_name in tapisInput:
here_dict["arg"] = tapisInput[here_name]
parameterSet[HEREkey].append(here_dict)
HEREkey = 'envVariables'
print('HEREkey',HEREkey)
parameterSet[HEREkey] = []
for app_Dict in app_parameterSet[HEREkey]:
app_Dict = app_Dict.__dict__
print('app_Dict',app_Dict)
here_name = app_Dict['key']
here_dict = {"key":here_name,"value":app_Dict['value']}
if 'notes' in app_Dict:
app_Dict_notes = app_Dict['notes'].__dict__
if 'isHidden' in app_Dict_notes and app_Dict_notes['isHidden']==True:
continue
if here_name in tapisInput:
here_dict["value"] = tapisInput[here_name]
parameterSet[HEREkey].append(here_dict)
parameterSet["schedulerOptions"] = [{"name": "TACC Allocation", "arg": f"-A {tapisInput['allocation']}"}]
job_description["fileInputs"] = fileInputs
job_description["parameterSet"] = parameterSet
if nmiss == 0:
# print('app_MetaData_keys',app_MetaData.keys())
for hereKey in tapisInput.keys():
# print('hereKey',hereKey)
if hereKey in app_MetaData.keys() or hereKey in ['moduleLoads']:
# print('yes')
job_description[hereKey] = tapisInput[hereKey]
# Archive location
if "archive_system" in tapisInput:
if tapisInput["archive_system"] == "MyData":
job_description["archiveSystemId"] = "designsafe.storage.default"
job_description["archiveSystemDir"] = "${EffectiveUserId}/tapis-jobs-archive/${JobCreateDate}/${JobUUID}"
elif tapisInput["archive_system"] == "Work" and tapisInput["execSystemId"] != "wma-exec-01":
job_description["archiveSystemId"] = tapisInput["execSystemId"]
job_description["archiveSystemDir"] = "HOST_EVAL($WORK)/tapis-jobs-archive/${JobCreateDate}/${JobName}-${JobUUID}"
else:
job_description["archiveSystemId"] = "designsafe.storage.default"
job_description["archiveSystemDir"] = "${EffectiveUserId}/tapis-jobs-archive/${JobCreateDate}/${JobUUID}"
else:
job_description["archiveSystemId"] = "designsafe.storage.default"
job_description["archiveSystemDir"] = "${EffectiveUserId}/tapis-jobs-archive/${JobCreateDate}/${JobUUID}"
# --- Finalize ---
if nmiss > 0:
print("Please resubmit with all required input")
return -1
else:
# print("All Input is Complete")
# print('job_description',job_description)
return job_description
OpsUtils.get_tapis_job_description(t,tapisInput)
input directory URI: tapis://designsafe.storage.default/silvia/_ToCommunityData/OpenSees/TrainingMaterial/training-OpenSees-on-DesignSafe/Examples_OpenSees/BasicExamples
HEREkey appArgs
app_Dict {'arg': 'OpenSeesMP', 'name': 'mainProgram', 'description': None, 'inputMode': 'FIXED', 'notes':
isHidden: True}
app_Dict {'arg': None, 'name': 'Main Script', 'description': "The filename only of the OpenSees TCL script to execute. This file should reside in the Input Directory specified. To use with test input, use 'freeFieldEffective.tcl'", 'inputMode': 'REQUIRED', 'notes':
inputType: fileInput}
HEREkey envVariables
{'name': 'OpenSeesMPsubmit',
'execSystemId': 'stampede3',
'execSystemLogicalQueue': 'skx-dev',
'maxMinutes': 6,
'nodeCount': 1,
'coresPerNode': 48,
'appId': 'opensees-mp-s3',
'appVersion': 'latest',
'fileInputs': [{'name': 'Input Directory',
'sourceUrl': 'tapis://designsafe.storage.default/silvia/_ToCommunityData/OpenSees/TrainingMaterial/training-OpenSees-on-DesignSafe/Examples_OpenSees/BasicExamples'}],
'parameterSet': {'appArgs': [{'name': 'Main Script',
'arg': 'Ex1a_verymany.Canti2D.Push.mp.tcl'}],
'envVariables': [],
'schedulerOptions': [{'name': 'TACC Allocation', 'arg': '-A DS-HPC1'}]},
'archiveSystemId': 'stampede3',
'archiveSystemDir': 'HOST_EVAL($WORK)/tapis-jobs-archive/${JobCreateDate}/${JobName}-${JobUUID}'}
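If you prefer not to use the run_tapis_job wrapper below, the description can be submitted directly with Tapipy; a minimal sketch (no confirmation, monitoring, or result collection):

# Sketch: direct submission with Tapipy; run_tapis_job (below) does this
# plus confirmation, monitoring, and result collection.
job_description = OpsUtils.get_tapis_job_description(t, tapisInput)
submitted = t.jobs.submitJob(**job_description)
print('jobUuid:', submitted.uuid)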
GO!#
run_tapis_job.py
# /home/jupyter/CommunityData/OpenSees/TrainingMaterial/training-OpenSees-on-DesignSafe/OpsUtils/OpsUtils/Tapis/run_tapis_job.py
def run_tapis_job(
t,
tapisInput,
askConfirmJob: bool = True,
monitor_job: bool = True,
askConfirmMonitorRT: bool = True,
get_job_history: bool = False,
get_job_metadata: bool = False,
get_job_filedata: bool = False,
job_description = {}
):
"""
Run a complete Tapis job workflow in one call: build (or accept) a job description,
submit, and optionally monitor + collect results (status/metadata/history/file data).
Workflow
--------
1) If `job_description` is provided (non-empty dict), use it as-is; otherwise,
build one from `tapisInput` via `OpsUtils.get_tapis_job_description`.
2) Submit the job via `OpsUtils.submit_tapis_job` (with optional confirmation).
3) If `monitor_job=True`, call `OpsUtils.monitor_tapis_job` and then:
- Always fetch basic status via `OpsUtils.get_tapis_job_status`.
- If `get_job_metadata=True`, fetch detailed metadata via
`OpsUtils.get_tapis_job_metadata` (this overwrites the basic status dict).
- If `get_job_history=True`, fetch history via `OpsUtils.get_tapis_job_history_data`.
- If `get_job_filedata=True`, fetch output file listings/data via
`OpsUtils.get_tapis_job_all_files`.
Parameters
----------
t : tapipy.tapis.Tapis
Authenticated Tapis client.
tapisInput : dict
Fields used to construct the job description (e.g., name, appId, inputs) when
`job_description` is not provided.
askConfirmJob : bool, default True
Ask for confirmation before submitting.
monitor_job : bool, default True
Monitor the job in real time after submission. Required for the code paths that
fetch status/metadata/history/file data in this function.
askConfirmMonitorRT : bool, default True
Ask for confirmation before starting real-time monitoring.
get_job_history : bool, default False
If True and `monitor_job=True`, include job history in the return object.
get_job_metadata : bool, default False
If True and `monitor_job=True`, include detailed job metadata in the return object
(overwrites the basic status dict).
get_job_filedata : bool, default False
If True and `monitor_job=True`, include job output file listings/data.
job_description : dict, default {}
Optional pre-built Tapis job description. If provided (non-empty), it takes
precedence over building from `tapisInput`.
Returns
-------
dict
A dictionary seeded from `OpsUtils.submit_tapis_job(...)`, augmented with:
- 'job_description' : dict
- 'JobHistory' : dict or {}
- 'JobMetadata' : dict or {}
- 'JobFiledata' : dict or {}
- 'runJobStatus' : 'Finished' | 'Incomplete' | other
If `OpsUtils.get_tapis_job_description(...)` fails, returns:
{'runJobStatus': 'Incomplete'}
Notes
-----
- Status/metadata/history/file data are gathered only when `monitor_job=True`.
- If `OpsUtils.submit_tapis_job` yields a status other than 'Finished', the function
returns that object unchanged (except the early 'Incomplete' short-circuit).
Example
-------
result = run_tapis_job(
t, tapisInput,
monitor_job=True,
get_job_history=True,
get_job_metadata=True
)
print("Job UUID:", result.get("jobUuid"))
print("Status:", result.get("JobMetadata", {}).get("status"))
Author
------
Silvia Mazzoni, DesignSafe (silviamazzoni@yahoo.com)
Date
----
2025-08-16
Version
-------
1.1
"""
from OpsUtils import OpsUtils
import ipywidgets as widgets
from IPython.display import display, clear_output
job_out = widgets.Output()
job_accordion = widgets.Accordion(children=[job_out])
job_accordion.selected_index = 0
display(job_accordion)
job_accordion.set_title(0, f'Job Run ....')
with job_out:
desc_out = widgets.Output()
desc_accordion = widgets.Accordion(children=[desc_out])
# desc_accordion.selected_index = 0
desc_accordion.set_title(0, f'Job Input')
display(desc_accordion)
with desc_out:
if not job_description:
print('Creating job_description')
job_description = OpsUtils.get_tapis_job_description(t, tapisInput)
print('job_description:',OpsUtils.display_tapis_results(job_description))
if job_description == -1:
return {"runJobStatus": "Incomplete"}
submit_out = widgets.Output()
submit_accordion = widgets.Accordion(children=[submit_out])
# submit_accordion.selected_index = 0
submit_accordion.set_title(0, f'Submit Returned Data')
with submit_out:
returnDict = OpsUtils.submit_tapis_job(t, job_description, askConfirmJob)
OpsUtils.display_tapis_results(returnDict)
display(submit_accordion)
if returnDict.get("runJobStatus") == "Submitted":
monitor_out = widgets.Output()
monitor_accordion = widgets.Accordion(children=[monitor_out])
monitor_accordion.selected_index = 0
monitor_accordion.set_title(0, f'Monitor Job')
display(monitor_accordion)
with monitor_out:
print("job_start_time:", returnDict.get("job_start_time"))
jobUuid = returnDict.get("jobUuid")
job_accordion.set_title(0, f'Job Run: {jobUuid} ...')
JobHistory = {}
JobMetadata = {}
JobFiledata = {}
JobStatusData = {}
if monitor_job and jobUuid:
with monitor_out:
OpsUtils.monitor_tapis_job(t, jobUuid, returnDict.get("job_start_time"), askConfirmMonitorRT)
# Always fetch basic status after monitoring
JobStatusData = OpsUtils.get_tapis_job_status(t, jobUuid, tapisInput,return_values=True)
job_accordion.set_title(0, f'Job : {jobUuid} {JobStatusData.status} {JobStatusData.condition}')
# Optional enrichments
if get_job_metadata:
JobMetadata = OpsUtils.get_tapis_job_metadata(t, jobUuid, tapisInput)
if get_job_history:
JobHistory = OpsUtils.get_tapis_job_history_data(t, jobUuid)
if get_job_filedata:
JobFiledata = OpsUtils.get_tapis_job_all_files(t, jobUuid)
# Augment return object
returnDict["job_description"] = job_description
returnDict["JobHistory"] = JobHistory
returnDict["JobMetadata"] = JobMetadata
returnDict["JobFiledata"] = JobFiledata
returnDict["JobStatusData"] = JobStatusData
returnDict["runJobStatus"] = "Finished"
job_accordion.set_title(0, f'Job Run: {jobUuid} done!')
return returnDict
# Here is a function that combines all the steps involved in submitting a job to Tapis.
jobReturns = OpsUtils.run_tapis_job(t,tapisInput)
jobUuid = jobReturns['jobUuid']
print('jobUuid:',jobUuid)
print('jobReturns',jobReturns.keys())
jobUuid: b280beaf-854e-4264-9b84-49f1d6085a12-007
jobReturns dict_keys(['jobUuid', 'submitted_job', 'job_start_time', 'runJobStatus', 'job_description', 'JobHistory', 'JobMetadata', 'JobFiledata', 'JobStatusData'])
ONCE THE JOB HAS COMPLETED…#
Get detailed Job Status, Metadata, History, Stage Durations, and Files List#
JobStatus = OpsUtils.get_tapis_job_status(t, jobUuid)
JobMetadata = OpsUtils.get_tapis_job_metadata(t, jobUuid)
JobHistory = OpsUtils.get_tapis_job_history_data(t, jobUuid,print_out=True)
AllFilesDict = OpsUtils.get_tapis_job_all_files(t, jobUuid, displayIt=10, target_dir=False)
Visualize Data#
This is the same process we used when we presented the web-portal submission.
get base path for output data from the posted path#
Different systems in DesignSafe have different root paths.
basePath = JobMetadata['archiveSystemDir_out']
print('basePath',basePath)
basePath /home/jupyter/Work/stampede3/tapis-jobs-archive/2025-09-10Z/OpenSeesMPsubmit-b280beaf-854e-4264-9b84-49f1d6085a12-007/inputDirectory
directory contents#
import os

if os.path.exists(basePath):
    print(os.listdir(basePath))
else:
    print('path does not exist')
['Ex1a_verymany.Canti2D.Push.mp.tcl', '.ipynb_checkpoints', 'Ex1a.Canti2D.Push.tcl', 'Ex1a.Canti2D.Push.mpi4py.py', 'LcolList.out', 'Ex1a.Canti2D.Push.mp.tcl', 'Ex1a.Canti2D.Push.py', 'simpleSP.tcl', 'DataTCLmp', 'Ex1a.Canti2D.Push.mpi.py', 'Ex1a_many.Canti2D.Push.mp.tcl']
Plot some analysis results#
For any of the above analysis cases.
import matplotlib.pyplot as plt
import numpy
# pick any case
print('basePath:', basePath)
dataDir = f'{basePath}/DataTCLmp'  # known from the input script, or see the directory contents above
print('dataDir:', dataDir)
if os.path.exists(dataDir):
    print('dataDir exists!!!')
else:
    print('dataDir DOES NOT EXIST! -- archiving may still be in progress; re-run this and the subsequent cells in a moment')
basePath: /home/jupyter/Work/stampede3/tapis-jobs-archive/2025-09-10Z/OpenSeesMPsubmit-b280beaf-854e-4264-9b84-49f1d6085a12-007/inputDirectory
dataDir: /home/jupyter/Work/stampede3/tapis-jobs-archive/2025-09-10Z/OpenSeesMPsubmit-b280beaf-854e-4264-9b84-49f1d6085a12-007/inputDirectory/DataTCLmp
dataDir exists!!!
List files in folder for a specific case, also using a wildcard#
You could use os.system(f"ls {dataDir}/*Lcol{Lcol}.out") to list the matching files, but it is neither fully reliable nor safe, and it does not return the list to Python. glob does both:
Lcol = 100.0
import glob
# Build the wildcard pattern
pattern = os.path.join(dataDir, f"*Lcol{Lcol}.out")
# Get matching files as a Python list
files = glob.glob(pattern)
for f in files:
print(f)
/home/jupyter/Work/stampede3/tapis-jobs-archive/2025-09-10Z/OpenSeesMPsubmit-b280beaf-854e-4264-9b84-49f1d6085a12-007/inputDirectory/DataTCLmp/DBase_Lcol100.0.out
/home/jupyter/Work/stampede3/tapis-jobs-archive/2025-09-10Z/OpenSeesMPsubmit-b280beaf-854e-4264-9b84-49f1d6085a12-007/inputDirectory/DataTCLmp/RBase_Lcol100.0.out
/home/jupyter/Work/stampede3/tapis-jobs-archive/2025-09-10Z/OpenSeesMPsubmit-b280beaf-854e-4264-9b84-49f1d6085a12-007/inputDirectory/DataTCLmp/DFree_Lcol100.0.out
/home/jupyter/Work/stampede3/tapis-jobs-archive/2025-09-10Z/OpenSeesMPsubmit-b280beaf-854e-4264-9b84-49f1d6085a12-007/inputDirectory/DataTCLmp/DCol_Lcol100.0.out
/home/jupyter/Work/stampede3/tapis-jobs-archive/2025-09-10Z/OpenSeesMPsubmit-b280beaf-854e-4264-9b84-49f1d6085a12-007/inputDirectory/DataTCLmp/FCol_Lcol100.0.out
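To process every case rather than a single Lcol value, you can recover the parameter from the filenames themselves. A sketch; the regular expression assumes the DFree_Lcol<value>.out naming shown above.

import glob
import os
import re

# Sketch: discover all Lcol cases from the DFree_Lcol<value>.out filenames.
cases = {}
for path in glob.glob(os.path.join(dataDir, 'DFree_Lcol*.out')):
    m = re.search(r'Lcol([\d.]+)\.out$', os.path.basename(path))
    if m:
        cases[float(m.group(1))] = path
for lcol, path in sorted(cases.items()):
    print(lcol, '->', os.path.basename(path))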
plt.close('all')
fname3o = f'DFree_Lcol{Lcol}.out'
fname3 = f'{dataDir}/{fname3o}'
print('fname3:',fname3)
dataDFree = numpy.loadtxt(fname3)
plt.subplot(211)
plt.title(f'Ex1a.Canti2D Lcol={Lcol}')
plt.grid(True)
plt.plot(dataDFree[:,1])
plt.xlabel('Step Number')
plt.ylabel('Free-Node Displacement')
plt.subplot(212)
plt.grid(True)
plt.plot(dataDFree[:,1],dataDFree[:,0])
plt.xlabel('Free-Node Disp.')
plt.ylabel('Pseudo-Time (~Force)')
plt.savefig(f'{dataDir}/Response_Lcol{Lcol}.jpg')
plt.show()
print(f'plot saved to {dataDir}/Response_Lcol{Lcol}.jpg')
print('End of Run: Ex1a.Canti2D.Push.py.ipynb')
fname3: /home/jupyter/Work/stampede3/tapis-jobs-archive/2025-09-10Z/OpenSeesMPsubmit-b280beaf-854e-4264-9b84-49f1d6085a12-007/inputDirectory/DataTCLmp/DFree_Lcol100.0.out
plot saved to /home/jupyter/Work/stampede3/tapis-jobs-archive/2025-09-10Z/OpenSeesMPsubmit-b280beaf-854e-4264-9b84-49f1d6085a12-007/inputDirectory/DataTCLmp/Response_Lcol100.0.jpg
End of Run: Ex1a.Canti2D.Push.py.ipynb
print('Done!')
Done!