Try on DesignSafe

Step-by-Step OpenSees App#

Detailed Step-by-Step Submission of an OpenSees Tapis App

by Silvia Mazzoni, DesignSafe, 2025

This notebook serves as a template for submitting the following DesignSafe OpenSees Apps:

  • opensees-express

  • opensees-mp-s3

  • opensees-sp-s3

We use previously defined Python functions to streamline the process.

Connect to Tapis#

# Authenticate with Tapis; `t` is the client handle passed to the OpsUtils calls in this notebook
t=OpsUtils.connect_tapis()
 -- Checking Tapis token --
 Token loaded from file. Token is still valid!
 Token expires at: 2025-09-10T07:14:42+00:00
 Token expires in: 2:05:58.887582
-- LOG IN SUCCESSFUL! --
# List the batch queues of the Stampede3 system and their limits
print('QUEUE INFO')
# The trailing ';' keeps Jupyter from echoing the call's return value below the displayed table
OpsUtils.get_system_queues(t,system_id="stampede3",display=True);
QUEUE INFO
name icx skx skx-dev pvc spr nvdimm h100
description icx (CPU, standard compute) skx (CPU, recommended standard compute) skx-dev (CPU, 2 hour max, 1 job max, for testing) pvc (GPU, Intel no CUDA) spr (CPU, high memory bandwidth) nvdimm (CPU, large memory) None
hpcQueueName icx skx skx-dev pvc spr nvdimm h100
maxJobs -1 -1 -1 -1 -1 -1 -1
maxJobsPerUser 20 60 3 4 36 3 4
minNodeCount 1 1 1 1 1 1 1
maxNodeCount 32 256 16 4 32 1 4
minCoresPerNode 1 1 1 1 1 1 1
maxCoresPerNode 80 48 48 96 112 80 96
minMemoryMB 1 1 1 1 1 1 1
maxMemoryMB 256000 192000 192000 128000 128000 4000000 1000000
minMinutes 1 1 1 1 1 1 1
maxMinutes 2880 2880 120 2880 2880 2880 2880

App User Input#

Initialize#

# Initialize the dictionary that collects all user input for the job
tapisInput = {}
tapisInput["name"] = 'OpenSeesMPsubmit'

App Parameters#

tapisInput["appId"] = "opensees-mp-s3" # options: "opensees-express", "opensees-mp-s3", "opensees-sp-s3"
tapisInput["appVersion"] = "latest" # always use latest in this Notebook Template

Get app schema – input#

# Fetch the schema of the selected app (latest version) and print it for reference
thisAppData_MP = OpsUtils.get_tapis_app_schema(t,tapisInput["appId"],version='latest')
OpsUtils.display_tapis_app_schema(thisAppData_MP)
########################################
########### TAPIS-APP SCHEMA ###########
########################################
######## appID: opensees-mp-s3
######## version: latest
########################################
{
  sharedAppCtx: "wma_prtl"
  isPublic: True
  tenant: "designsafe"
  id: "opensees-mp-s3"
  version: "latest"
  description: "Runs all the processors in parallel. Requires understanding of parallel processing and the capabilities to write parallel scripts."
  owner: "wma_prtl"
  enabled: True
  versionEnabled: True
  locked: False
  runtime: "ZIP"
  runtimeVersion: None
  runtimeOptions: None
  containerImage: "tapis://cloud.data/corral/tacc/aci/CEP/applications/v3/opensees/latest/OpenSees/opensees.zip"
  jobType: "BATCH"
  maxJobs: 2147483647
  maxJobsPerUser: 2147483647
  strictFileInputs: True
  uuid: "1410a584-0c5e-4e47-b3b0-3a7bea0e1187"
  deleted: False
  created: "2025-02-20T18:01:49.005183Z"
  updated: "2025-08-28T20:17:39.426067Z"
  sharedWithUsers: []
  tags: ["portalName: DesignSafe", "portalName: CEP"]
  jobAttributes: {
    description: None
    dynamicExecSystem: False
    execSystemConstraints: None
    execSystemId: "stampede3"
    execSystemExecDir: "${JobWorkingDir}"
    execSystemInputDir: "${JobWorkingDir}"
    execSystemOutputDir: "${JobWorkingDir}"
    dtnSystemInputDir: "!tapis_not_set"
    dtnSystemOutputDir: "!tapis_not_set"
    execSystemLogicalQueue: "skx"
    archiveSystemId: "stampede3"
    archiveSystemDir: "HOST_EVAL($WORK)/tapis-jobs-archive/${JobCreateDate}/${JobName}-${JobUUID}"
    archiveOnAppError: True
    isMpi: False
    mpiCmd: None
    cmdPrefix: None
    nodeCount: 2
    coresPerNode: 48
    memoryMB: 192000
    maxMinutes: 120
    fileInputs: [
      {
        name: "Input Directory"
        description: "Input directory that includes the tcl script as well as any other required files. Example input is in tapis://designsafe.storage.community/app_examples/opensees/OpenSeesMP"
        inputMode: "REQUIRED"
        autoMountLocal: True
        envKey: "inputDirectory"
        sourceUrl: None
        targetPath: "inputDirectory"
        notes: {
          selectionMode: "directory"
        }
      }
    ]
    fileInputArrays: []
    subscriptions: []
    tags: []
    parameterSet: {
      appArgs: [
        {
          arg: "OpenSeesMP"
          name: "mainProgram"
          description: None
          inputMode: "FIXED"
          notes: {
            isHidden: True
          }
        }
        {
          arg: None
          name: "Main Script"
          description: "The filename only of the OpenSees TCL script to execute. This file should reside in the Input Directory specified. To use with test input, use 'freeFieldEffective.tcl'"
          inputMode: "REQUIRED"
          notes: {
            inputType: "fileInput"
          }
        }
      ]
      containerArgs: []
      schedulerOptions: [
        {
          arg: "--tapis-profile OpenSees_default"
          name: "OpenSees TACC Scheduler Profile"
          description: "Scheduler profile for the default version of OpenSees"
          inputMode: "FIXED"
          notes: {
            isHidden: True
          }
        }
        {
          arg: None
          name: "TACC Reservation"
          description: "Reservation input string"
          inputMode: "INCLUDE_ON_DEMAND"
          notes: {
            isHidden: True
          }
        }
      ]
      envVariables: []
      archiveFilter: {
        includeLaunchFiles: True
        includes: []
        excludes: []
      }
      logConfig: {
        stdoutFilename: ""
        stderrFilename: ""
      }
    }
  }
  notes: {
    icon: "OpenSees"
    label: "OpenSeesMP"
    helpUrl: "https://www.designsafe-ci.org/user-guide/tools/simulation/#opensees-user-guide"
    category: "Simulation"
    isInteractive: False
    showReservation: True
    hideNodeCountAndCoresPerNode: False
  }
}
########################################

TACC-Job Parameters#

https://docs.tacc.utexas.edu/hpc/stampede3/

# Requested wall-clock limit, in minutes (must respect the queue's minMinutes/maxMinutes)
tapisInput["maxMinutes"] = 6

# opensees-mp-s3 and opensees-sp-s3 only:
if tapisInput["appId"] in ["opensees-mp-s3","opensees-sp-s3"]:
    tapisInput["execSystemId"] = "stampede3" # the app runs on stampede only
    tapisInput["execSystemLogicalQueue"] = "skx-dev" # "skx", "skx-dev"... for info: use the command 'OpsUtils.get_system_queues(t,system_id="stampede3",display=True);'
    tapisInput["nodeCount"] = 1 # limits set by which compute nodes you use
    tapisInput["coresPerNode"] = 48 # limits set by which compute nodes you use
    tapisInput["allocation"] = "DS-HPC1"

INPUT Files Parameters#

Storage System & Tapis Base Path in URI format#

This is the very first part of your path, just above your home folder.

Options:

  • CommunityData

  • Published

The following options are user or project-dependent, and require unique path input.

The following option requires additional user-dependent input:

  • MyData

The following option requires additional user- and system- dependent input:

  • Work

The following option requires additional project-dependent input:

  • MyProjects

You can obtain a dependent tapis-URI path by performing the first step of submitting an OpenSeesMP job at the app portal: https://www.designsafe-ci.org/workspace/opensees-mp-s3

# The value is matched case-insensitively by get_user_path_tapis_uri (it lower-cases its input)
tapisInput['storage_system'] = 'MyData' # options: Community,MyData,Published,MyProjects,Work/stampede3,Work/frontera,Work/ls6

We will use a utility function to get the Tapis-compatible URI:#

get_user_path_tapis_uri(): Discover and cache user-specific Tapis base URIs for DesignSafe storage systems, then return either the entire dictionary or a single base URI.

get_user_path_tapis_uri.py
# /home/jupyter/CommunityData/OpenSees/TrainingMaterial/training-OpenSees-on-DesignSafe/OpsUtils/OpsUtils/Tapis/get_user_path_tapis_uri.py
from __future__ import annotations

def get_user_path_tapis_uri(
    t,
    file_system: str = "none",                  # "none" | "mydata" | "community" | "published" | "work/stampede3","work/ls6","work/frontera"
    paths_file_path: str = "~/MyData/.tapis_user_paths.json",
    force_refresh: bool = False,
) -> Union[str, Dict]:
    """
    Discover and cache user-specific Tapis base URIs for DesignSafe storage systems,
    then return either the entire dictionary or a single base URI.

    Author
    -------
    Silvia Mazzoni, DesignSafe (silviamazzoni@yahoo.com)

    Parameters
    ----------
    t : Tapipy client
        An authenticated Tapipy v3 client. Only used when the bases have to be
        (re)discovered; a request answered from the cache never touches it.
    file_system : str, optional
        Which base to return (case-insensitive): "mydata", "community",
        "published", "work/stampede3", "work/ls6" or "work/frontera".
        Any value containing "community" (e.g. "CommunityData") is treated as
        "community". Use "none" to return the full dictionary.
    paths_file_path : str, optional
        Location of the JSON cache file ("~" is expanded).
        Default: "~/MyData/.tapis_user_paths.json".
    force_refresh : bool, optional
        If True, ignore any existing cache, rediscover all bases and overwrite
        the cache file.

    Returns
    -------
    Union[str, dict]
        - If file_system == "none": the full dict of bases. When it comes from
          the cache file it is returned exactly as stored.
        - Else: a single base URI string for the requested system.

    Raises
    ------
    ValueError
        If `file_system` is not one of the accepted values.
    RuntimeError
        If the Tapis username cannot be determined during discovery.
    KeyError
        If the requested base could not be discovered (for example a Work
        system that could not be resolved for this user).

    Notes
    -----
    - Single URIs returned by this function start with "tapis://" and have no
      trailing "/".
    - The dictionary is flat, with lowercase keys: "mydata", "community",
      "published" and one "work/<system>" key per resolved HPC system.
    - The whole set of bases is rediscovered (and the cache rewritten) whenever
      the cache cannot answer the request.
    """
    import json
    import os
    from pathlib import Path

    # ----------------------------
    # normalize & validate inputs
    # ----------------------------
    fs = (file_system or "none").strip().lower()

    # Handle loose input like "CommunityData"
    if "community" in fs:
        fs = "community"

    work_systems = ("stampede3", "ls6", "frontera")
    valid_file_systems = ["mydata", "community", "published", "none"]
    # "work/none" has always been accepted by the validation, so it stays accepted here.
    valid_file_systems += [f"work/{name}" for name in (*work_systems, "none")]

    if fs not in valid_file_systems:
        raise ValueError(f"file_system='{file_system}' not in {sorted(valid_file_systems)}")

    cache_path = Path(os.path.expanduser(paths_file_path))

    # ----------------------------
    # helper: normalize URIs
    # ----------------------------
    def _with_scheme(u: str) -> str:
        """Return `u` prefixed with "tapis://" and without a trailing "/"."""
        u = u.strip()
        if not u:
            return u
        if not u.startswith("tapis://"):
            u = "tapis://" + u.lstrip("/")
        return u.rstrip("/")

    # ----------------------------
    # try reading existing cache
    # ----------------------------
    paths = {}
    if cache_path.exists() and not force_refresh:
        try:
            with cache_path.open("r", encoding="utf-8") as f:
                paths = json.load(f)
            print(f'found paths file: {cache_path}')
        except (OSError, ValueError):
            # Unreadable or corrupt cache (bad JSON / bad encoding): fall back to discovery.
            paths = {}
        if not isinstance(paths, dict):
            # Valid JSON but not the expected mapping: treat it as no cache at all.
            paths = {}

    # quick return if the cache satisfies the request
    if paths:
        if fs == "none":
            return paths
        cached_value = paths.get(fs)
        if isinstance(cached_value, str) and cached_value:
            return _with_scheme(cached_value)

    # ----------------------------
    # (re)discover all bases
    # ----------------------------
    # Imported only here so that cache hits do not depend on the Tapis helpers.
    from OpsUtils import OpsUtils

    try:
        username = OpsUtils.get_tapis_username(t)
    except Exception as e:
        raise RuntimeError(f"Could not determine Tapis username: {e}") from e

    discovered = {
        "mydata": _with_scheme(f"designsafe.storage.default/{username}"),
        "community": _with_scheme("designsafe.storage.community"),
        "published": _with_scheme("designsafe.storage.published"),
    }

    # Work bases are best-effort: a system we cannot resolve is left out of the
    # dictionary, but the reason is kept so a later request for it can report why.
    work_errors = {}
    for system in work_systems:
        try:
            base_uri = OpsUtils.get_user_work_tapis_uri(t, system_id=system)
            discovered[f'work/{system}'] = _with_scheme(base_uri)  # idempotent
        except Exception as e:
            work_errors[system] = e

    # Persist to cache
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    with cache_path.open("w", encoding="utf-8") as f:
        json.dump(discovered, f, indent=2)
        print(f'saved data to {cache_path}')

    # Return per request
    if fs == "none":
        return discovered
    if fs not in discovered:
        reason = work_errors.get(fs.partition("/")[2])
        detail = f": {reason}" if reason is not None else ""
        raise KeyError(f"No Tapis base URI could be discovered for file_system='{fs}'{detail}")
    return discovered[fs]
# Resolve the Tapis base URI of the chosen storage system (answered from the cache file when available)
tapisInput['storage_system_baseURL'] = OpsUtils.get_user_path_tapis_uri(t,tapisInput['storage_system'])

print('storage_system_baseURL:',tapisInput['storage_system_baseURL'])
found paths file: /home/jupyter/MyData/.tapis_user_paths.json
storage_system_baseURL: tapis://designsafe.storage.default/silvia

File Paths#

The input_folder is the directory of your input file.

  • DO NOT INCLUDE the storage system, such as MyData, etc.

  • Start from the first folder within your storage-system folder.

The Main Script is the full name of the file that will be submitted to OpenSees. Yes, you need to include the extension.

# Folder path relative to the storage-system base URI (it is appended to storage_system_baseURL)
tapisInput['input_folder'] = '_ToCommunityData/OpenSees/TrainingMaterial/training-OpenSees-on-DesignSafe/Examples_OpenSees/BasicExamples'
# File name only, including the extension; the file must be inside input_folder
tapisInput['Main Script'] = 'Ex1a_verymany.Canti2D.Push.mp.tcl'

OUTPUT-Files Parameters#

Where would you like your files to be archived once the job is finished?

Options: MyData and Work

In both cases you will find them in the tapis-jobs-archive folder in either MyData or Work/stampede3.

Remember, you cannot write to Projects nor CommunityData.

# get_tapis_job_description archives to MyData for any value other than 'Work'
tapisInput['archive_system']='Work' # Options: MyData or Work

Review User Input#

# Show the collected input for a final check before submission
display(tapisInput)
{'name': 'OpenSeesMPsubmit',
 'appId': 'opensees-mp-s3',
 'appVersion': 'latest',
 'maxMinutes': 6,
 'execSystemId': 'stampede3',
 'execSystemLogicalQueue': 'skx-dev',
 'nodeCount': 1,
 'coresPerNode': 48,
 'allocation': 'DS-HPC1',
 'storage_system': 'MyData',
 'storage_system_baseURL': 'tapis://designsafe.storage.default/silvia',
 'input_folder': '_ToCommunityData/OpenSees/TrainingMaterial/training-OpenSees-on-DesignSafe/Examples_OpenSees/BasicExamples',
 'Main Script': 'Ex1a_verymany.Canti2D.Push.mp.tcl',
 'archive_system': 'Work'}

Get interpreted Tapis-App Input (Optional)#

This is what will be sent as the job input. It is shown here for information only, and it is a good way to check your input before submitting.

get_tapis_job_description.py
# /home/jupyter/CommunityData/OpenSees/TrainingMaterial/training-OpenSees-on-DesignSafe/OpsUtils/OpsUtils/Tapis/get_tapis_job_description.py
def get_tapis_job_description(t, tapisInput):
    """
    Build a Tapis v3 job description dict from user-friendly inputs.

    This function:
      1) Resolves the input-directory URI (`sourceUrl`) unless the caller already
         supplied one: the base comes from `tapisInput["storage_system_baseURL"]`
         or, if that is missing, from
         `OpsUtils.get_user_path_tapis_uri(t, <storage_system>)`;
         `input_folder` (if given) is appended to it.
      2) Ensures an application version: if `appVersion` is missing or equals
         "latest", it is replaced by `OpsUtils.get_latest_app_version(t, appId)`.
      3) Validates required fields (set differs for OpenSees-Express vs. HPC apps).
      4) Constructs job attributes, fileInputs, parameterSet, and archive settings.

    App families and required keys
    ------------------------------
    *OpenSees-Express* (`appId == "opensees-express"`)
      Required: ['name','appId','maxMinutes','archive_system',
                 'storage_system','input_folder','Main Script']
      - Sets `parameterSet.envVariables` with:
          mainProgram = tapisInput['Main Program'] (defaults to 'OpenSees')
          tclScript   = tapisInput['Main Script']
      - `fileInputs = [{"name": "Input Directory", "sourceUrl": <sourceUrl>}]`

    *HPC OpenSees apps* (every other appId, e.g. opensees-mp-s3, opensees-sp-s3)
      Required: ['name','appId','execSystemId','execSystemLogicalQueue',
                 'nodeCount','coresPerNode','maxMinutes','allocation','archive_system']
      - Sets base job attributes (system, queue, resources).
      - `parameterSet.appArgs` and `parameterSet.envVariables` are built from the
        app schema: entries flagged `notes.isHidden` are skipped, and a value in
        `tapisInput` under the entry's name/key overrides the schema default.
      - `parameterSet.schedulerOptions = [{"name": "TACC Allocation", "arg": f"-A {allocation}"}]`
      - `fileInputs` as above.

    Both families
      - Any `tapisInput` key that is also a top-level key of the app metadata
        (or is 'moduleLoads') is copied into the job description unchanged.
      - Archive options:
          archive_system == 'Work' (and the exec system is not wma-exec-01)
              -> exec system, HOST_EVAL($WORK)/tapis-jobs-archive/${JobCreateDate}/${JobName}-${JobUUID}
          anything else (including 'MyData')
              -> designsafe.storage.default,
                 ${EffectiveUserId}/tapis-jobs-archive/${JobCreateDate}/${JobUUID}

    Parameters
    ----------
    t : Tapipy client
        Authenticated client; used to read the app schema and resolve versions/paths.
    tapisInput : dict
        User input. NOTE: it is modified in place (see Side effects).

    Returns
    -------
    dict
        The job description, or -1 if validation fails or the storage base /
        app version cannot be determined.

    Side effects
    ------------
    - Prints each missing required key (if any), the resolved input URI, and
      debug lines for the schema entries it walks through.
    - Writes derived values back into `tapisInput`: 'sourceUrl',
      'storage_system_baseURL', 'appVersion', 'execSystemId', and (Express only)
      'Main Program'. A 'sourceUrl' stored by an earlier call is reused as-is.

    Author
    ------
    Silvia Mazzoni, DesignSafe (silviamazzoni@yahoo.com)

    Date
    ----
    2025-08-16

    Version
    -------
    1.2
    """
    # Silvia Mazzoni, 2025
    from OpsUtils import OpsUtils  # for get_latest_app_version

    def checkRequirements(tapisInputIN, RequiredInputList):
        """Print every required key missing from the input and return how many are missing."""
        nmiss = 0
        for thisReq in RequiredInputList:
            if thisReq not in tapisInputIN:
                nmiss += 1
                print(f"YOU need to define the following input: {thisReq}")
        return nmiss

    def _visible_schema_params(section, name_key, value_key):
        """Collect the non-hidden entries of one parameterSet section, applying user overrides."""
        print('HEREkey', section)
        collected = []
        for schema_entry in app_parameterSet[section]:
            app_Dict = schema_entry.__dict__
            print('app_Dict', app_Dict)
            here_name = app_Dict[name_key]
            here_dict = {name_key: here_name, value_key: app_Dict[value_key]}
            # An entry may carry no notes at all; treat that as "not hidden".
            notes = getattr(app_Dict.get('notes'), '__dict__', None) or {}
            if notes.get('isHidden') == True:
                continue
            if here_name in tapisInput:
                here_dict[value_key] = tapisInput[here_name]
            # NOTE(review): the entry is appended even when the user gave no
            # override; the original nesting was ambiguous -- confirm.
            collected.append(here_dict)
        return collected

    # --- Resolve storage baseURL if needed ---
    if "sourceUrl" not in tapisInput:
        if "storage_system_baseURL" not in tapisInput:
            if 'storage_system' not in tapisInput:
                print('NEED more info for your input directory')
                return -1
            storage_system_lower = tapisInput.get("storage_system", "").lower()
            tapisInput['storage_system_baseURL'] = OpsUtils.get_user_path_tapis_uri(t, storage_system_lower)
        tapisInput["sourceUrl"] = f"{tapisInput['storage_system_baseURL']}"
        if 'input_folder' in tapisInput:
            tapisInput["sourceUrl"] = f"{tapisInput['sourceUrl']}/{tapisInput['input_folder']}"
        # NOTE(review): printed only when the URI is derived here; the original
        # nesting of this print was ambiguous -- confirm.
        print('input directory URI:', tapisInput["sourceUrl"])

    appId = tapisInput["appId"]

    # --- Ensure a concrete appVersion (resolve 'latest' or missing) ---
    if ("appVersion" not in tapisInput) or (str(tapisInput["appVersion"]).lower() == "latest"):
        resolved = OpsUtils.get_latest_app_version(t, appId)
        if not resolved or resolved in ("none", ""):
            print(f"Unable to resolve latest version for appId='{appId}'. Please specify appVersion.")
            return -1
        tapisInput["appVersion"] = resolved

    # --- Get App Schema ---
    appMetaData = t.apps.getAppLatestVersion(appId=appId)
    app_MetaData = appMetaData.__dict__
    app_jobAttributes = app_MetaData['jobAttributes'].__dict__
    app_parameterSet = app_jobAttributes['parameterSet'].__dict__

    # --- Defaults & branching ---
    # If Express, default the exec system to the Express VM unless provided
    if appId == "opensees-express" and "execSystemId" not in tapisInput:
        tapisInput["execSystemId"] = "wma-exec-01"
    if "execSystemId" not in tapisInput:
        tapisInput["execSystemId"] = "stampede3"

    job_description = {}
    fileInputs = [{"name": "Input Directory", "sourceUrl": tapisInput["sourceUrl"]}]

    if appId == "opensees-express":
        # Express (runs on wma-exec-01)
        RequiredInputList = [
            "name", "appId", "maxMinutes", "archive_system",
            "storage_system", "input_folder", "Main Script",
        ]
        nmiss = checkRequirements(tapisInput, RequiredInputList)
        if nmiss == 0:
            parameterSet = {}
            job_description["name"] = tapisInput["name"]
            job_description["maxMinutes"] = tapisInput["maxMinutes"]
            job_description["appId"] = tapisInput["appId"]
            if 'appVersion' in tapisInput:
                job_description["appVersion"] = tapisInput["appVersion"]
            if 'Main Program' not in tapisInput:
                tapisInput["Main Program"] = 'OpenSees'
            parameterSet["envVariables"] = [
                {"key": "mainProgram", "value": tapisInput["Main Program"]},
                {"key": "tclScript", "value": tapisInput["Main Script"]},
            ]
            job_description["fileInputs"] = fileInputs
            job_description["parameterSet"] = parameterSet
    else:
        # HPC (e.g., OpenSeesMP/SP on Stampede3)
        RequiredInputList = [
            "name", "appId", "execSystemId", "execSystemLogicalQueue",
            "nodeCount", "coresPerNode", "maxMinutes", "allocation", "archive_system",
        ]
        nmiss = checkRequirements(tapisInput, RequiredInputList)
        if nmiss == 0:
            parameterSet = {}
            job_description["name"] = tapisInput["name"]
            job_description["execSystemId"] = tapisInput["execSystemId"]
            job_description["execSystemLogicalQueue"] = tapisInput["execSystemLogicalQueue"]
            job_description["maxMinutes"] = tapisInput["maxMinutes"]
            job_description["nodeCount"] = tapisInput["nodeCount"]
            job_description["coresPerNode"] = tapisInput["coresPerNode"]
            job_description["appId"] = tapisInput["appId"]
            if 'appVersion' in tapisInput:
                job_description["appVersion"] = tapisInput["appVersion"]

            parameterSet['appArgs'] = _visible_schema_params('appArgs', 'name', 'arg')
            parameterSet['envVariables'] = _visible_schema_params('envVariables', 'key', 'value')
            parameterSet["schedulerOptions"] = [
                {"name": "TACC Allocation", "arg": f"-A {tapisInput['allocation']}"}
            ]
            job_description["fileInputs"] = fileInputs
            job_description["parameterSet"] = parameterSet

    # --- Finalize ---
    if nmiss > 0:
        print("Please resubmit with all required input")
        return -1

    # Pass through any user key that is also a top-level app-metadata key.
    for hereKey in tapisInput:
        if hereKey in app_MetaData or hereKey == 'moduleLoads':
            job_description[hereKey] = tapisInput[hereKey]

    # Archive location: Work is only available on a real HPC exec system;
    # every other choice (or none) falls back to MyData.
    archive_to_work = (
        tapisInput.get("archive_system") == "Work"
        and tapisInput["execSystemId"] != "wma-exec-01"
    )
    if archive_to_work:
        job_description["archiveSystemId"] = tapisInput["execSystemId"]
        job_description["archiveSystemDir"] = "HOST_EVAL($WORK)/tapis-jobs-archive/${JobCreateDate}/${JobName}-${JobUUID}"
    else:
        job_description["archiveSystemId"] = "designsafe.storage.default"
        job_description["archiveSystemDir"] = "${EffectiveUserId}/tapis-jobs-archive/${JobCreateDate}/${JobUUID}"

    return job_description
# Build (but do not submit) the job description; the notebook echoes the returned dict
OpsUtils.get_tapis_job_description(t,tapisInput)
input directory URI: tapis://designsafe.storage.default/silvia/_ToCommunityData/OpenSees/TrainingMaterial/training-OpenSees-on-DesignSafe/Examples_OpenSees/BasicExamples
HEREkey appArgs
app_Dict {'arg': 'OpenSeesMP', 'name': 'mainProgram', 'description': None, 'inputMode': 'FIXED', 'notes': 
isHidden: True}
app_Dict {'arg': None, 'name': 'Main Script', 'description': "The filename only of the OpenSees TCL script to execute. This file should reside in the Input Directory specified. To use with test input, use 'freeFieldEffective.tcl'", 'inputMode': 'REQUIRED', 'notes': 
inputType: fileInput}
HEREkey envVariables
{'name': 'OpenSeesMPsubmit',
 'execSystemId': 'stampede3',
 'execSystemLogicalQueue': 'skx-dev',
 'maxMinutes': 6,
 'nodeCount': 1,
 'coresPerNode': 48,
 'appId': 'opensees-mp-s3',
 'appVersion': 'latest',
 'fileInputs': [{'name': 'Input Directory',
   'sourceUrl': 'tapis://designsafe.storage.default/silvia/_ToCommunityData/OpenSees/TrainingMaterial/training-OpenSees-on-DesignSafe/Examples_OpenSees/BasicExamples'}],
 'parameterSet': {'appArgs': [{'name': 'Main Script',
    'arg': 'Ex1a_verymany.Canti2D.Push.mp.tcl'}],
  'envVariables': [],
  'schedulerOptions': [{'name': 'TACC Allocation', 'arg': '-A DS-HPC1'}]},
 'archiveSystemId': 'stampede3',
 'archiveSystemDir': 'HOST_EVAL($WORK)/tapis-jobs-archive/${JobCreateDate}/${JobName}-${JobUUID}'}

GO!#

run_tapis_job.py
# /home/jupyter/CommunityData/OpenSees/TrainingMaterial/training-OpenSees-on-DesignSafe/OpsUtils/OpsUtils/Tapis/run_tapis_job.py
def run_tapis_job(
    t,
    tapisInput,
    askConfirmJob: bool = True,
    monitor_job: bool = True,
    askConfirmMonitorRT: bool = True,
    get_job_history: bool = False,
    get_job_metadata: bool = False,
    get_job_filedata: bool = False,
    job_description = None
):
    """
    Run a complete Tapis job workflow in one call: build (or accept) a job description,
    submit, and optionally monitor + collect results (status/metadata/history/file data).

    Progress is shown in nested ipywidgets accordions (job input, submit data, monitor).

    Workflow
    --------
    1) If `job_description` is provided (non-empty dict), use it as-is; otherwise,
       build one from `tapisInput` via `OpsUtils.get_tapis_job_description`.
    2) Submit the job via `OpsUtils.submit_tapis_job` (with optional confirmation).
    3) If the submit status is 'Submitted' and `monitor_job=True` (and a job UUID
       was returned), call `OpsUtils.monitor_tapis_job` and then:
       - Always fetch basic status via `OpsUtils.get_tapis_job_status`.
       - If `get_job_metadata=True`, fetch detailed metadata via
         `OpsUtils.get_tapis_job_metadata`.
       - If `get_job_history=True`, fetch history via `OpsUtils.get_tapis_job_history_data`.
       - If `get_job_filedata=True`, fetch output file listings/data via
         `OpsUtils.get_tapis_job_all_files`.

    Parameters
    ----------
    t : tapipy.tapis.Tapis
        Authenticated Tapis client.
    tapisInput : dict
        Fields used to construct the job description (e.g., name, appId, inputs) when
        `job_description` is not provided.
    askConfirmJob : bool, default True
        Ask for confirmation before submitting.
    monitor_job : bool, default True
        Monitor the job in real time after submission. Required for the code paths that
        fetch status/metadata/history/file data in this function.
    askConfirmMonitorRT : bool, default True
        Ask for confirmation before starting real-time monitoring.
    get_job_history : bool, default False
        If True and `monitor_job=True`, include job history in the return object.
    get_job_metadata : bool, default False
        If True and `monitor_job=True`, include detailed job metadata in the return object.
    get_job_filedata : bool, default False
        If True and `monitor_job=True`, include job output file listings/data.
    job_description : dict, optional
        Pre-built Tapis job description. If provided (non-empty), it takes
        precedence over building from `tapisInput`. None or {} means "build it".

    Returns
    -------
    dict
        If the job description could not be built: {'runJobStatus': 'Incomplete'}.
        Otherwise the dictionary returned by `OpsUtils.submit_tapis_job(...)`. When its
        'runJobStatus' is 'Submitted' it is augmented with:
          - 'job_description' : dict
          - 'JobHistory'      : history data or {}
          - 'JobMetadata'     : metadata or {}
          - 'JobFiledata'     : file data or {}
          - 'JobStatusData'   : status data or {}
          - 'runJobStatus'    : 'Finished'
        For any other submit status the dictionary is returned unchanged.

    Notes
    -----
    - Status/metadata/history/file data are gathered only when `monitor_job=True`.
    - 'runJobStatus' is set to 'Finished' for every submitted job, whether or not
      it was monitored.

    Example
    -------
    result = run_tapis_job(
        t, tapisInput,
        monitor_job=True,
        get_job_history=True,
        get_job_metadata=True
    )
    print("Job UUID:", result.get("jobUuid"))
    print("Status:",   result.get("JobMetadata", {}).get("status"))

    Author
    ------
    Silvia Mazzoni, DesignSafe (silviamazzoni@yahoo.com)

    Date
    ----
    2025-08-16

    Version
    -------
    1.1
    """

    from OpsUtils import OpsUtils
    import ipywidgets as widgets
    from IPython.display import display

    # Outer accordion: everything this function prints or displays lives inside it.
    job_out = widgets.Output()
    job_accordion = widgets.Accordion(children=[job_out])
    job_accordion.selected_index = 0
    display(job_accordion)
    job_accordion.set_title(0, 'Job Run ....')

    # Defined up front so the final title update works for every submit outcome.
    jobUuid = None
    was_submitted = False

    with job_out:
        # --- Job input (collapsed by default) ---
        desc_out = widgets.Output()
        desc_accordion = widgets.Accordion(children=[desc_out])
        desc_accordion.set_title(0, 'Job Input')

        display(desc_accordion)
        with desc_out:
            if not job_description:
                print('Creating job_description')
                job_description = OpsUtils.get_tapis_job_description(t, tapisInput)
            print('job_description:',OpsUtils.display_tapis_results(job_description))

        # get_tapis_job_description signals invalid/incomplete input with -1.
        if job_description == -1:
            return {"runJobStatus": "Incomplete"}

        # --- Submit (collapsed by default) ---
        submit_out = widgets.Output()
        submit_accordion = widgets.Accordion(children=[submit_out])
        submit_accordion.set_title(0, 'Submit Returned Data')

        with submit_out:
            returnDict = OpsUtils.submit_tapis_job(t, job_description, askConfirmJob)
            OpsUtils.display_tapis_results(returnDict)

        display(submit_accordion)

        was_submitted = returnDict.get("runJobStatus") == "Submitted"
        if was_submitted:

            # --- Monitor (expanded) ---
            monitor_out = widgets.Output()
            monitor_accordion = widgets.Accordion(children=[monitor_out])
            monitor_accordion.selected_index = 0
            monitor_accordion.set_title(0, 'Monitor Job')
            display(monitor_accordion)
            with monitor_out:
                print("job_start_time:", returnDict.get("job_start_time"))
            jobUuid = returnDict.get("jobUuid")
            job_accordion.set_title(0, f'Job Run:  {jobUuid} ...')

            JobHistory = {}
            JobMetadata = {}
            JobFiledata = {}
            JobStatusData = {}

            if monitor_job and jobUuid:
                with monitor_out:
                    OpsUtils.monitor_tapis_job(t, jobUuid, returnDict.get("job_start_time"), askConfirmMonitorRT)

                # Always fetch basic status after monitoring
                JobStatusData = OpsUtils.get_tapis_job_status(t, jobUuid, tapisInput,return_values=True)
                job_accordion.set_title(0, f'Job :  {jobUuid} {JobStatusData.status} {JobStatusData.condition}')

                # Optional enrichments
                if get_job_metadata:
                    JobMetadata = OpsUtils.get_tapis_job_metadata(t, jobUuid, tapisInput)
                if get_job_history:
                    JobHistory = OpsUtils.get_tapis_job_history_data(t, jobUuid)
                if get_job_filedata:
                    JobFiledata = OpsUtils.get_tapis_job_all_files(t, jobUuid)

            # Augment return object
            returnDict["job_description"] = job_description
            returnDict["JobHistory"] = JobHistory
            returnDict["JobMetadata"] = JobMetadata
            returnDict["JobFiledata"] = JobFiledata
            returnDict["JobStatusData"] = JobStatusData
            returnDict["runJobStatus"] = "Finished"

    if was_submitted:
        job_accordion.set_title(0, f'Job Run:  {jobUuid} done!')
    else:
        # Nothing was submitted (e.g. the user declined): report the status we got back.
        job_accordion.set_title(0, f'Job Run: {returnDict.get("runJobStatus")}')

    return returnDict
# run_tapis_job combines all the steps involved in submitting a job to Tapis:
# build the job description, submit it, and (by default) monitor it.
jobReturns = OpsUtils.run_tapis_job(t,tapisInput)
# NOTE: if the job description could not be built, run_tapis_job returns only
# {'runJobStatus': 'Incomplete'} and the lookup below raises a KeyError.
jobUuid = jobReturns['jobUuid']
print('jobUuid:',jobUuid)
print('jobReturns',jobReturns.keys())
jobUuid: b280beaf-854e-4264-9b84-49f1d6085a12-007
jobReturns dict_keys(['jobUuid', 'submitted_job', 'job_start_time', 'runJobStatus', 'job_description', 'JobHistory', 'JobMetadata', 'JobFiledata', 'JobStatusData'])

ONCE THE JOB HAS COMPLETED….#


Get detailed Job Status, Metadata, History, Stage Durations, and Files List#

# Basic job status
JobStatus = OpsUtils.get_tapis_job_status(t, jobUuid)
# Detailed job metadata (used below for the archive location)
JobMetadata = OpsUtils.get_tapis_job_metadata(t, jobUuid)
# Job history, printed as it is collected
JobHistory = OpsUtils.get_tapis_job_history_data(t, jobUuid,print_out=True)
# Listing of the job's output files
AllFilesDict = OpsUtils.get_tapis_job_all_files(t, jobUuid, displayIt=10, target_dir=False)

Visualize Data#

This is the same process we followed when we presented the web-portal submission.


get base path for output data from posted path:#

Different systems in DesignSafe have different root paths

# Archive directory of the job as reported in the job metadata; in the run shown
# below this is a path on the Jupyter file system (under /home/jupyter/Work/...)
basePath = JobMetadata['archiveSystemDir_out']
print('basePath',basePath)
basePath /home/jupyter/Work/stampede3/tapis-jobs-archive/2025-09-10Z/OpenSeesMPsubmit-b280beaf-854e-4264-9b84-49f1d6085a12-007/inputDirectory

directory contents#

# Show what the job left in the archive folder, or say so if the folder is not there (yet)
print(os.listdir(basePath) if os.path.exists(basePath) else 'path does not exist')
['Ex1a_verymany.Canti2D.Push.mp.tcl', '.ipynb_checkpoints', 'Ex1a.Canti2D.Push.tcl', 'Ex1a.Canti2D.Push.mpi4py.py', 'LcolList.out', 'Ex1a.Canti2D.Push.mp.tcl', 'Ex1a.Canti2D.Push.py', 'simpleSP.tcl', 'DataTCLmp', 'Ex1a.Canti2D.Push.mpi.py', 'Ex1a_many.Canti2D.Push.mp.tcl']

Plot some analysis results#

for any of the above analyses

import matplotlib.pyplot as plt
import numpy

# Pick any case: the recorder output folder name comes from the input
# script (it also shows up in the directory listing of basePath).
print('basePath:', basePath)
dataDir = f'{basePath}/DataTCLmp'
print('dataDir:', dataDir)

# Report whether the folder is visible yet from this session.
if not os.path.exists(dataDir):
    print('dataDir DOES NOT EXIST! -- it may just need time....you may just need to re-run this and the subsequent cells')
else:
    print('dataDir exists!!!')
basePath: /home/jupyter/Work/stampede3/tapis-jobs-archive/2025-09-10Z/OpenSeesMPsubmit-b280beaf-854e-4264-9b84-49f1d6085a12-007/inputDirectory
dataDir: /home/jupyter/Work/stampede3/tapis-jobs-archive/2025-09-10Z/OpenSeesMPsubmit-b280beaf-854e-4264-9b84-49f1d6085a12-007/inputDirectory/DataTCLmp
dataDir exists!!!

List files in folder for a specific case, also using a wildcard#

You could use the following command, but it is neither fully reliable nor safe, and it does not return the list of files: `os.system(f'ls {dataDir}/*Lcol{Lcol}.out')`

import glob

# Case to inspect; its value is embedded in the recorder file names.
Lcol = 100.0

# Wildcard pattern matching every output file of this case inside dataDir.
pattern = os.path.join(dataDir, f"*Lcol{Lcol}.out")

# glob returns the matches as a regular Python list, which we keep in `files`.
files = glob.glob(pattern)
for found in files:
    print(found)
/home/jupyter/Work/stampede3/tapis-jobs-archive/2025-09-10Z/OpenSeesMPsubmit-b280beaf-854e-4264-9b84-49f1d6085a12-007/inputDirectory/DataTCLmp/DBase_Lcol100.0.out
/home/jupyter/Work/stampede3/tapis-jobs-archive/2025-09-10Z/OpenSeesMPsubmit-b280beaf-854e-4264-9b84-49f1d6085a12-007/inputDirectory/DataTCLmp/RBase_Lcol100.0.out
/home/jupyter/Work/stampede3/tapis-jobs-archive/2025-09-10Z/OpenSeesMPsubmit-b280beaf-854e-4264-9b84-49f1d6085a12-007/inputDirectory/DataTCLmp/DFree_Lcol100.0.out
/home/jupyter/Work/stampede3/tapis-jobs-archive/2025-09-10Z/OpenSeesMPsubmit-b280beaf-854e-4264-9b84-49f1d6085a12-007/inputDirectory/DataTCLmp/DCol_Lcol100.0.out
/home/jupyter/Work/stampede3/tapis-jobs-archive/2025-09-10Z/OpenSeesMPsubmit-b280beaf-854e-4264-9b84-49f1d6085a12-007/inputDirectory/DataTCLmp/FCol_Lcol100.0.out
# Plot the free-node displacement record of the selected Lcol case and
# save the figure next to the data.
plt.close('all')  # close figures left open by earlier runs of this cell

# Recorder file for the free node of this case.
fname3o = f'DFree_Lcol{Lcol}.out'
fname3 = f'{dataDir}/{fname3o}'
print('fname3:',fname3)
dataDFree = numpy.loadtxt(fname3)

# Build the image path once so the file that is written and the path that
# is reported are always the same. The name includes Lcol so that plots
# of different cases do not overwrite each other.
plotFile = f'{dataDir}/Response_Lcol{Lcol}.jpg'

# Top panel: second data column against the row index (analysis step).
plt.subplot(211)
plt.title(f'Ex1a.Canti2D Lcol={Lcol}')
plt.grid(True)
plt.plot(dataDFree[:,1])
plt.xlabel('Step Number')
plt.ylabel('Free-Node Displacement')

# Bottom panel: first data column (pseudo-time) against the displacement.
plt.subplot(212)
plt.grid(True)
plt.plot(dataDFree[:,1],dataDFree[:,0])
plt.xlabel('Free-Node Disp.')
plt.ylabel('Pseudo-Time (~Force)')

# Save before show(): some backends clear the figure once it is displayed.
plt.savefig(plotFile)
plt.show()
print(f'plot saved to {plotFile}')
print('End of Run: Ex1a.Canti2D.Push.py.ipynb')
fname3: /home/jupyter/Work/stampede3/tapis-jobs-archive/2025-09-10Z/OpenSeesMPsubmit-b280beaf-854e-4264-9b84-49f1d6085a12-007/inputDirectory/DataTCLmp/DFree_Lcol100.0.out
../_images/99168c87cfcd630306d693199ecf7b41858c5b12cf0068d76bb702a432ee6649.png
plot saved to /home/jupyter/Work/stampede3/tapis-jobs-archive/2025-09-10Z/OpenSeesMPsubmit-b280beaf-854e-4264-9b84-49f1d6085a12-007/inputDirectory/DataTCLmp/Response_Lcol100.0.jpg
End of Run: Ex1a.Canti2D.Push.py.ipynb
# Final cell: its output confirms the notebook ran through to the end.
print('Done!')
Done!