get_tapis_job_description()#
get_tapis_job_description(t, tapisInput)
Generate a complete Tapis v3 job description from a concise input dict. The helper:
Resolves input baseURL
If storage_system_baseURL is missing, it is derived from storage_system:
“mydata” → tapis://designsafe.storage.default/<username>
“community” → tapis://designsafe.storage.community
“published” → tapis://designsafe.storage.published
The final sourceUrl used in fileInputs is “<storage_system_baseURL>/<input_folder>”.
Ensures a concrete appVersion
If appVersion is missing or equals “latest”, the function resolves a pinned version via OpsUtils.get_latest_app_version(t, appId). If no version can be resolved, the function prints a message asking you to specify appVersion and returns -1.
Validates inputs
OpenSees-Express (appId == "opensees-express"; defaults execSystemId to wma-exec-01 if not given): Required: ['name', 'appId', 'appVersion', 'maxMinutes', 'archive_system', 'storage_system', 'input_folder', 'Main Script']
HPC OpenSees apps (e.g., opensees-mp-s3, SP variants): Required: ['name', 'appId', 'appVersion', 'execSystemId', 'execSystemLogicalQueue', 'nodeCount', 'coresPerNode', 'maxMinutes', 'allocation', 'archive_system', 'storage_system', 'input_folder', 'Main Script']
Assembles the job description
Express (Tcl)
parameterSet.envVariables = [{"key": "mainProgram", "value": "OpenSees"}, {"key": "tclScript", "value": <Main Script>}]
fileInputs = [{"name": "Input Directory", "sourceUrl": "<storage_system_baseURL>/<input_folder>"}]
HPC (MP/SP)
Base job attributes: system, queue, resources, app id/version.
parameterSet.appArgs = [{"name": "Main Script", "arg": <Main Script>}]
parameterSet.schedulerOptions = [{"name": "TACC Allocation", "arg": f"-A {allocation}"}]
fileInputs as above.
Archive location
Express
archive_system == ‘MyData’ → designsafe.storage.default:${EffectiveUserId}/tapis-jobs-archive/${JobCreateDate}/${JobUUID}
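archive_system == ‘Temp’ → cloud.data:/tmp/${JobOwner}/tapis-jobs-archive/${JobCreateDate}/${JobName}-${JobUUID} (note: the source listing on this page has no ‘Temp’ branch; any value other than ‘MyData’ or ‘Work’ falls back to the MyData path)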
HPC (MP/SP)
archive_system == ‘MyData’ → same as above (MyData path)
archive_system == ‘Work’ → HOST_EVAL($WORK)/tapis-jobs-archive/${JobCreateDate}/${JobName}-${JobUUID} on the execution system
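The archive rules can be condensed into a small lookup. The sketch below is an illustration rather than the helper's own code: `archive_target` is a hypothetical name, and the branch order follows the source listing on this page, where `Work` is honored only when the job does not run on the Express VM (`wma-exec-01`) and every other value falls back to the MyData location.

```python
# Archive directory templates as they appear in the source listing.
MYDATA_DIR = "${EffectiveUserId}/tapis-jobs-archive/${JobCreateDate}/${JobUUID}"
WORK_DIR = "HOST_EVAL($WORK)/tapis-jobs-archive/${JobCreateDate}/${JobName}-${JobUUID}"


def archive_target(archive_system, exec_system_id):
    """Hypothetical helper: return (archiveSystemId, archiveSystemDir)."""
    # 'Work' archives on the execution system, but never on the Express VM.
    if archive_system == "Work" and exec_system_id != "wma-exec-01":
        return exec_system_id, WORK_DIR
    # 'MyData' and any unrecognized value use the default storage system.
    return "designsafe.storage.default", MYDATA_DIR
```

Returning a tuple keeps the two archive fields together, so a caller can assign both with a single unpacking statement.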
Parameters#
t (tapipy.tapis.Tapis) — Authenticated client (discovers username for MyData).
tapisInput (dict) — See required keys above; optionally include storage_system_baseURL to bypass auto-detection.
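How the input URL is assembled, including the storage_system_baseURL override, can be sketched as follows. This is a simplified stand-in, not the helper itself: `resolve_source_url` is a hypothetical name, the username is passed in explicitly instead of being looked up through the Tapis client, and the sketch returns None where the real function returns -1.

```python
def resolve_source_url(tapis_input, username):
    """Hypothetical sketch: build the fileInputs sourceUrl from a tapisInput-style dict."""
    # An explicit base URL bypasses auto-detection entirely.
    base = tapis_input.get("storage_system_baseURL")
    if base is None:
        system = tapis_input.get("storage_system", "").lower()
        if "mydata" in system:
            base = f"tapis://designsafe.storage.default/{username}"
        elif "community" in system:
            base = "tapis://designsafe.storage.community"
        elif "published" in system:
            base = "tapis://designsafe.storage.published"
        else:
            return None  # unknown storage system
    folder = tapis_input.get("input_folder")
    return f"{base}/{folder}" if folder else base
```

Matching on a lowercased substring means that "MyData", "mydata", and "MYDATA" all resolve to the same system.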
Returns#
dict — Ready-to-submit Tapis job description.
-1 — Validation failed, or baseURL/appVersion could not be determined.
Also prints missing keys (if any) and a confirmation message when inputs are complete.
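Because failure is signaled with the integer -1 rather than an exception, callers should check the return type before using the result. A minimal sketch under that assumption; `is_valid_description` and `missing_keys` are hypothetical names, and `missing_keys` only mirrors the kind of check the helper performs internally.

```python
def is_valid_description(desc):
    """Hypothetical guard: True when the helper returned a dict, not the -1 sentinel."""
    return isinstance(desc, dict)


def missing_keys(tapis_input, required):
    """Hypothetical pre-check: list the required keys absent from tapis_input."""
    return [key for key in required if key not in tapis_input]
```

Running `missing_keys` before calling the helper gives you the list of absent keys as data, which is easier to act on in a script than console output.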
Examples#
OpenSees-Express (MyData inputs; auto-resolve appVersion)#
tapisInput = {
    "name": "opensees-express-smoke",
    "appId": "opensees-express",
    # "appVersion": "latest",  # optional; omit or set "latest" to auto-resolve
    "maxMinutes": 10,
    "archive_system": "MyData",
    "storage_system": "MyData",
    "input_folder": "projects/demo/input",
    "Main Script": "model.tcl",  # key checked by the source listing
}
desc = get_tapis_job_description(t, tapisInput)
OpenSeesMP on Stampede3 (community inputs; pinned appVersion)#
tapisInput = {
    "name": "opensees-mp-smoke",
    "appId": "opensees-mp-s3",
    "appVersion": "1.0.0",
    "execSystemId": "designsafe.community.stampede3",
    "execSystemLogicalQueue": "normal",
    "nodeCount": 1,
    "coresPerNode": 56,
    "maxMinutes": 10,
    "allocation": "TACC-XYZ123",
    "archive_system": "Work",
    "storage_system": "community",
    "input_folder": "training/opensees/examples",
    "Main Script": "model.tcl",  # matched by name against the app's appArgs
}
desc = get_tapis_job_description(t, tapisInput)
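Once a description has been built, it still has to be submitted. The sketch below reflects an assumption about typical tapipy usage and is not part of this helper: `submit_if_valid` is a hypothetical wrapper, and it assumes the client exposes `jobs.submitJob` and accepts the description's keys as keyword arguments.

```python
def submit_if_valid(client, desc):
    """Hypothetical wrapper: submit only when desc is a job description dict."""
    # The helper returns -1 on failure, so refuse anything that is not a dict.
    if not isinstance(desc, dict):
        raise ValueError("job description was not built; check the printed messages")
    # Assumed tapipy call: description keys are passed as keyword arguments.
    return client.jobs.submitJob(**desc)
```

Raising at this point turns the -1 sentinel into an ordinary exception, so a failed build cannot be passed on to the Jobs service by accident.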
Notes#
fileInputs uses the tapis:// scheme; the platform will stage inputs from that system/path.
For MyData auto-detection, the function queries your Tapis userinfo to insert your username.
The function resolves and pins appVersion when you omit it or set “latest”.
Returns -1 if anything essential is missing; the console lists missing keys.
Author: Silvia Mazzoni, DesignSafe (silviamazzoni@yahoo.com) Date: 2025-08-16 Version: 1.2
Files#
You can find these files in Community Data.
get_tapis_job_description.py
# /home/jupyter/CommunityData/OpenSees/TrainingMaterial/training-OpenSees-on-DesignSafe/OpsUtils/OpsUtils/Tapis/get_tapis_job_description.py
def get_tapis_job_description(t, tapisInput):
    """
    Build a complete Tapis v3 job description dict from user-friendly inputs.

    This function:
      1) Resolves the Files base URL for inputs from `tapisInput["storage_system"]`
         (supports MyData/community/published; can be overridden via
         `tapisInput["storage_system_baseURL"]`).
      2) Ensures a concrete application version:
         - If `appVersion` is missing or equals "latest", it resolves a pinned
           version via `OpsUtils.get_latest_app_version(t, appId)`.
      3) Validates required fields (set differs for OpenSees-Express vs. HPC apps).
      4) Constructs job attributes, fileInputs, parameterSet, and archive settings.

    Auto baseURL selection
    ----------------------
    If `storage_system_baseURL` is not provided:
      - If `storage_system` contains "mydata": uses
        `tapis://designsafe.storage.default/<username>` where `<username>` is taken
        from `t.authenticator.get_userinfo().username`.
      - If `storage_system` contains "community": uses
        `tapis://designsafe.storage.community`
      - If `storage_system` contains "published": uses
        `tapis://designsafe.storage.published`
      - Else: prints a message and returns -1.

    App families and required keys
    ------------------------------
    *OpenSees-Express* (`appId == "opensees-express"`)
      Required: ['name','appId','appVersion','maxMinutes','archive_system',
                 'storage_system','input_folder','Main Script']
      - Sets `parameterSet.envVariables` with:
          mainProgram = OpenSees
          tclScript = <Main Script>
      - `fileInputs = [{"name": "Input Directory",
                        "sourceUrl": "<storage_system_baseURL>/<input_folder>"}]`
      - Archive options:
          archive_system == 'MyData' -> designsafe.storage.default under
              ${EffectiveUserId}/tapis-jobs-archive/${JobCreateDate}/${JobUUID}
          archive_system == 'Temp' -> cloud.data:/tmp/${JobOwner}/tapis-jobs-archive/...

    *HPC OpenSees apps* (e.g., opensees-mp-s3, opensees-sp-*, etc.)
      Required: ['name','appId','appVersion','execSystemId','execSystemLogicalQueue',
                 'nodeCount','coresPerNode','maxMinutes','allocation','archive_system',
                 'storage_system','input_folder','Main Script']
      - Sets base job attributes (system, queue, resources).
      - `parameterSet.appArgs = [{"name": "Main Script", "arg": <Main Script>}]`
      - `parameterSet.schedulerOptions = [{"name": "TACC Allocation",
                                           "arg": f"-A {allocation}"}]`
      - `fileInputs` as above.
      - Archive options:
          archive_system == 'MyData' -> designsafe.storage.default under
              ${EffectiveUserId}/tapis-jobs-archive/${JobCreateDate}/${JobUUID}
          archive_system == 'Work' -> exec system $WORK/tapis-jobs-archive/...

    Returns
    -------
    dict
        A fully formed Tapis job description ready for submission.
        Returns -1 if validation fails or baseURL/appVersion cannot be determined.

    Side effects
    ------------
    - Prints a list of missing required keys (if any).
    - Prints a short confirmation ("All Input is Complete") on success.

    Author
    ------
    Silvia Mazzoni, DesignSafe (silviamazzoni@yahoo.com)

    Date
    ----
    2025-08-16

    Version
    -------
    1.2
    """
    # Silvia Mazzoni, 2025
    from OpsUtils import OpsUtils  # for get_latest_app_version

    def checkRequirements(tapisInputIN, RequiredInputList):
        # Count (and report) every required key that is absent from the input dict.
        nmiss = 0
        for thisReq in RequiredInputList:
            if thisReq not in tapisInputIN:
                nmiss += 1
                print(f"YOU need to define the following input: {thisReq}")
        return nmiss

    appId = tapisInput["appId"]
    # print('appId',appId)

    # --- Ensure a concrete appVersion (resolve 'latest' or missing) ---
    if ("appVersion" not in tapisInput) or (str(tapisInput["appVersion"]).lower() == "latest"):
        resolved = OpsUtils.get_latest_app_version(t, appId)
        if not resolved or resolved in ("none", ""):
            print(f"Unable to resolve latest version for appId='{appId}'. Please specify appVersion.")
            return -1
        tapisInput["appVersion"] = resolved

    # --- Get App Schema ---
    appMetaData = t.apps.getAppLatestVersion(appId=appId)
    app_MetaData = appMetaData.__dict__
    app_jobAttributes = app_MetaData['jobAttributes'].__dict__
    app_parameterSet = app_jobAttributes['parameterSet'].__dict__

    # --- Resolve storage baseURL if needed ---
    if len(app_jobAttributes['fileInputs']) > 0:
        if "sourceUrl" not in tapisInput:
            if "storage_system_baseURL" not in tapisInput:
                if 'storage_system' not in tapisInput:
                    print('NEED more info for your input directory')
                    return -1
                storage_system_lower = tapisInput.get("storage_system", "").lower()
                tapisInput['storage_system_baseURL'] = OpsUtils.get_user_path_tapis_uri(t, storage_system_lower)
                # print("tapisInput['storage_system_baseURL']",tapisInput['storage_system_baseURL'])
            tapisInput["sourceUrl"] = f"{tapisInput['storage_system_baseURL']}"
            if 'input_folder' in tapisInput:
                tapisInput["sourceUrl"] = f"{tapisInput['sourceUrl']}/{tapisInput['input_folder']}"
        print('input directory URI:', tapisInput["sourceUrl"])

    inputKeys_App = ['id', 'version']
    inputKeys_jobAttributes = ['execSystemId', 'execSystemLogicalQueue', 'archiveSystemId', 'archiveSystemDir', 'nodeCount', 'coresPerNode', 'memoryMB', 'maxMinutes']

    # --- Defaults & branching ---
    # If Express, default the exec system to the Express VM unless provided
    if appId == "opensees-express" and "execSystemId" not in tapisInput:
        tapisInput["execSystemId"] = "wma-exec-01"
    # All other apps default to Stampede3
    if "execSystemId" not in tapisInput:
        tapisInput["execSystemId"] = "stampede3"
    job_description = {}
    nmiss = 999

    # Express (runs on wma-exec-01)
    if appId == "opensees-express":
        RequiredInputList = [
            "name", "appId", "maxMinutes", "archive_system",
            "storage_system", "input_folder", "Main Script"
        ]
        nmiss = checkRequirements(tapisInput, RequiredInputList)
        if nmiss == 0:
            parameterSet = {}
            job_description["name"] = tapisInput["name"]
            job_description["maxMinutes"] = tapisInput["maxMinutes"]
            job_description["appId"] = tapisInput["appId"]
            if 'appVersion' in tapisInput:
                job_description["appVersion"] = tapisInput["appVersion"]
            fileInputs = [{"name": "Input Directory", "sourceUrl": tapisInput["sourceUrl"]}]  # keep as is!
            # Express passes the program and script as environment variables
            if 'Main Program' not in tapisInput:
                tapisInput["Main Program"] = 'OpenSees'
            parameterSet["envVariables"] = [
                {"key": "mainProgram", "value": tapisInput["Main Program"]},
                {"key": "tclScript", "value": tapisInput["Main Script"]},
            ]
            job_description["fileInputs"] = fileInputs
            job_description["parameterSet"] = parameterSet
    else:
        # HPC (e.g., OpenSeesMP/SP on Stampede3)
        RequiredInputList = [
            "name", "appId", "execSystemId", "execSystemLogicalQueue",
            "nodeCount", "coresPerNode", "maxMinutes", "allocation", "archive_system",
        ]
        nmiss = checkRequirements(tapisInput, RequiredInputList)
        if nmiss == 0:
            parameterSet = {}
            job_description["name"] = tapisInput["name"]
            job_description["execSystemId"] = tapisInput["execSystemId"]
            job_description["execSystemLogicalQueue"] = tapisInput["execSystemLogicalQueue"]
            job_description["maxMinutes"] = tapisInput["maxMinutes"]
            job_description["nodeCount"] = tapisInput["nodeCount"]
            job_description["coresPerNode"] = tapisInput["coresPerNode"]
            job_description["appId"] = tapisInput["appId"]
            if 'appVersion' in tapisInput:
                job_description["appVersion"] = tapisInput["appVersion"]
            if len(app_jobAttributes['fileInputs']) > 0:
                fileInputs = [{"name": "Input Directory", "sourceUrl": tapisInput["sourceUrl"]}]
            # if not 'Main Program' in tapisInput.keys():
            #     tapisInput["Main Program"] = 'OpenSeesMP'
            # if not 'CommandLine Arguments' in tapisInput:
            #     tapisInput["CommandLine Arguments"] = ''

            # appArgs: start from the app schema defaults, skip hidden entries,
            # and override any entry whose name appears as a key in tapisInput
            HEREkey = 'appArgs'
            # print('HEREkey',HEREkey)
            parameterSet[HEREkey] = []
            for app_Dict in app_parameterSet[HEREkey]:
                app_Dict = app_Dict.__dict__
                # print('app_Dict',app_Dict)
                here_name = app_Dict['name']
                here_dict = {"name": here_name, "arg": app_Dict['arg']}
                if 'notes' in app_Dict:
                    app_Dict_notes = app_Dict['notes'].__dict__
                    if 'isHidden' in app_Dict_notes and app_Dict_notes['isHidden'] == True:
                        continue
                if here_name in tapisInput:
                    here_dict["arg"] = tapisInput[here_name]
                parameterSet[HEREkey].append(here_dict)

            # envVariables: same merge, keyed on 'key'/'value'
            HEREkey = 'envVariables'
            # print('HEREkey',HEREkey)
            parameterSet[HEREkey] = []
            for app_Dict in app_parameterSet[HEREkey]:
                app_Dict = app_Dict.__dict__
                # print('app_Dict',app_Dict)
                here_name = app_Dict['key']
                here_dict = {"key": here_name, "value": app_Dict['value']}
                if 'notes' in app_Dict:
                    app_Dict_notes = app_Dict['notes'].__dict__
                    if 'isHidden' in app_Dict_notes and app_Dict_notes['isHidden'] == True:
                        continue
                if here_name in tapisInput:
                    here_dict["value"] = tapisInput[here_name]
                parameterSet[HEREkey].append(here_dict)

            parameterSet["schedulerOptions"] = [{"name": "TACC Allocation", "arg": f"-A {tapisInput['allocation']}"}]
            if len(app_jobAttributes['fileInputs']) > 0:
                job_description["fileInputs"] = fileInputs
            job_description["parameterSet"] = parameterSet
    if nmiss == 0:
        # Copy through any input keys that are also top-level app attributes
        # print('app_MetaData_keys',app_MetaData.keys())
        for hereKey in tapisInput.keys():
            # print('hereKey',hereKey)
            if hereKey in app_MetaData.keys() or hereKey in ['moduleLoads']:
                # print('yes')
                job_description[hereKey] = tapisInput[hereKey]

        # Archive location
        if "archive_system" in tapisInput:
            if tapisInput["archive_system"] == "MyData":
                job_description["archiveSystemId"] = "designsafe.storage.default"
                job_description["archiveSystemDir"] = "${EffectiveUserId}/tapis-jobs-archive/${JobCreateDate}/${JobUUID}"
            elif tapisInput["archive_system"] == "Work" and tapisInput["execSystemId"] != "wma-exec-01":
                job_description["archiveSystemId"] = tapisInput["execSystemId"]
                job_description["archiveSystemDir"] = "HOST_EVAL($WORK)/tapis-jobs-archive/${JobCreateDate}/${JobName}-${JobUUID}"
            else:
                job_description["archiveSystemId"] = "designsafe.storage.default"
                job_description["archiveSystemDir"] = "${EffectiveUserId}/tapis-jobs-archive/${JobCreateDate}/${JobUUID}"
        else:
            job_description["archiveSystemId"] = "designsafe.storage.default"
            job_description["archiveSystemDir"] = "${EffectiveUserId}/tapis-jobs-archive/${JobCreateDate}/${JobUUID}"

    # --- Finalize ---
    if nmiss > 0:
        print("Please resubmit with all required input")
        return -1
    else:
        # print("All Input is Complete")
        # print('job_description',job_description)
        return job_description
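The HPC branch of the listing builds appArgs by starting from the app's own defaults and overriding entries by name. The sketch below isolates that merge so it can be tried without a Tapis client. It rests on stated assumptions: `merge_app_args` is a hypothetical name, `SimpleNamespace` objects stand in for the attribute-style objects the client returns, and every non-hidden entry is kept whether or not the user overrides it.

```python
from types import SimpleNamespace


def merge_app_args(schema_args, user_input):
    """Hypothetical sketch: app default appArgs, minus hidden ones, plus user overrides."""
    merged = []
    for entry in schema_args:
        fields = vars(entry)
        notes = fields.get("notes")
        # Hidden arguments are managed by the app and are not re-sent.
        if notes is not None and getattr(notes, "isHidden", False):
            continue
        name = fields["name"]
        # A key in user_input matching the argument name replaces the default.
        merged.append({"name": name, "arg": user_input.get(name, fields["arg"])})
    return merged


# Stand-in schema: one visible argument and one hidden argument.
schema = [
    SimpleNamespace(name="Main Script", arg="default.tcl",
                    notes=SimpleNamespace(isHidden=False)),
    SimpleNamespace(name="Internal Flag", arg="--x",
                    notes=SimpleNamespace(isHidden=True)),
]
result = merge_app_args(schema, {"Main Script": "model.tcl"})
```

Under this reading, the input dict must use a key that matches the argument's name in the app schema (here "Main Script") for an override to take effect.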