Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

OpenSees Job Submission Example

This example demonstrates how to submit and monitor an OpenSees simulation using dapi. OpenSees is a software framework for developing applications that simulate the response of structural and geotechnical systems subjected to earthquakes, featuring finite element analysis capabilities.

Try on DesignSafe

Overview

This example covers the essential workflow for running OpenSees simulations:

Complete Example

Step 1: Install and Import dapi

# Install dapi package
!pip install dapi --user --quiet

# Import required modules
from dapi import DSClient
import os
import json
import numpy as np
import matplotlib.pyplot as plt
from datetime import date

What this does:

Step 2: Initialize Client

# Initialize DesignSafe client
# Authenticates via environment variables, a .env file, or an
# interactive prompt (see the authentication guide referenced below).
ds = DSClient()

What this does:

Authentication: dapi supports multiple authentication methods including environment variables, .env files, and interactive prompts. For detailed authentication setup instructions, see the authentication guide.

Step 3: Configure Job Parameters

# Job configuration parameters
# NOTE(review): replace "your-allocation" with a TACC allocation you can
# actually charge before submitting.
ds_path = os.getcwd() + "/input" # Path to OpenSees input files
input_filename: str = "Main_multiMotion.tcl" # Main OpenSees script
tacc_allocation: str = "your-allocation" # TACC allocation to charge
app_id: str = "opensees-mp-s3" # OpenSees-MP application ID
max_job_minutes: int = 60 # Maximum runtime in minutes

# Resource configuration (applied to the job request in Step 6)
control_nodeCount: int = 1 # Number of compute nodes
control_corespernode: int = 16 # Cores per node for parallel analysis

What each parameter does:

Step 4: Convert Path to URI

# Convert DesignSafe path to Tapis URI format
# (presumably maps a local DesignSafe path to a tapis:// URI that compute
# jobs can stage inputs from — confirm exact mapping in the dapi docs)
input_uri = ds.files.to_uri(ds_path)
print(f"Input Directory Tapis URI: {input_uri}")

What this does:

Step 5: Generate Job Request

# Generate job request dictionary using app defaults
# Fields not set here fall back to the application's defaults.
job_dict = ds.jobs.generate(
 app_id=app_id,
 input_dir_uri=input_uri,
 script_filename=input_filename,
 max_minutes=max_job_minutes,
 allocation=tacc_allocation,
 # Archive configuration for organized result storage
 archive_system="designsafe",
 archive_path="opensees-results", # Results go to MyData/opensees-results/
 # OpenSees-specific job metadata
 job_name="opensees_multi_motion_analysis",
 description="Multi-free field analysis using OpenSees-MP",
 tags=["research", "opensees", "earthquake", "site-response"]
)
# default=str keeps non-JSON-serializable values (e.g. dates) printable
print(json.dumps(job_dict, indent=2, default=str))

What each parameter does:

Additional options you can add:

# Extended job configuration options (superset of the call above)
# NOTE(review): queue names are system-specific — verify available
# queues for the target TACC system before submitting.
job_dict = ds.jobs.generate(
 app_id=app_id,
 input_dir_uri=input_uri,
 script_filename=input_filename,
 max_minutes=max_job_minutes,
 allocation=tacc_allocation,
 
 # Resource configuration
 node_count=1, # Number of compute nodes
 cores_per_node=16, # Cores per node (enables parallel processing)
 memory_mb=192000, # Memory in MB per node
 queue="skx-dev", # Queue: "skx-dev", "skx", "normal", etc.
 
 # Job metadata
 job_name="my_opensees_analysis", # Custom job name
 description="Nonlinear site response analysis", # Job description
 tags=["research", "opensees", "seismic"], # Searchable tags
 
 # Archive configuration
 archive_system="designsafe", # Where to store results
 archive_path="opensees-results/site-response", # Custom archive path
)

Step 6: Customize Resources

# Customize job settings by editing the generated request directly.
# NOTE(review): keys ("name", "nodeCount", "coresPerNode") appear to
# follow the Tapis v3 job-request schema — confirm against the dapi docs.
job_dict["name"] = "opensees-MP-multiMotion-dapi"
job_dict["nodeCount"] = control_nodeCount
job_dict["coresPerNode"] = control_corespernode

print("Generated job request:")
print(json.dumps(job_dict, indent=2, default=str))

What this does:

Resource guidelines: see the OpenSees user guide on DesignSafe.

Step 7: Submit Job

# Submit job using dapi
# The returned object carries the job UUID used for monitoring below.
submitted_job = ds.jobs.submit(job_dict)
print(f"Job launched with UUID: {submitted_job.uuid}")
print("Can also check in DesignSafe portal under - Workspace > Tools & Application > Job Status")

What this does:

Step 8: Monitor Job

# Monitor job status using dapi
# Polls until the job reaches a terminal state (see status table below)
final_status = submitted_job.monitor(interval=15) # Check every 15 seconds
print(f"Job finished with status: {final_status}")

What this does:

Job status meanings:

# Reference table: Tapis job status codes and what each one means
job_statuses = dict(
    PENDING="Job submitted but not yet processed",
    PROCESSING_INPUTS="Input files being staged",
    QUEUED="Job waiting in scheduler queue",
    RUNNING="Job actively executing",
    ARCHIVING="Output files being archived",
    FINISHED="Job completed successfully",
    FAILED="Job failed during execution",
)

Step 9: Check Results

# Interpret job status (prints a human-readable explanation)
ds.jobs.interpret_status(final_status, submitted_job.uuid)

# Display runtime summary
submitted_job.print_runtime_summary(verbose=False)

# Get current job status (fresh query, independent of monitor()'s result)
current_status = ds.jobs.status(submitted_job.uuid)
print(f"Current status: {current_status}")

# Display last status message from TACC
print(f"Last message: {submitted_job.last_message}")

What each command does:

Step 10: Access Job Archive and Results

# Get archive information using dapi
archive_uri = submitted_job.archive_uri
print(f"Archive URI: {archive_uri}")

# Translate archive URI to local DesignSafe path
local_archive_path = ds.files.to_path(archive_uri)
print(f"Local archive path: {local_archive_path}")

# List archive contents
archive_files = ds.files.list(archive_uri)
print("\nArchive contents:")
# NOTE(review): indentation below was flattened during page extraction;
# the print statement belongs inside the for loop.
for item in archive_files:
 print(f"- {item.name} ({item.type})")

What this does:

Typical OpenSees output files:

# Files commonly found in an OpenSees job archive, with descriptions
_output_descriptions = [
    ("inputDirectory/", "Copy of your input directory with results"),
    ("tapisjob.out", "Console output from OpenSees analysis"),
    ("tapisjob.err", "Error messages (if any)"),
    ("tapisjob.sh", "Job script that was executed"),
    ("Profile*_acc*.out", "Acceleration time histories"),
    ("Profile*_Gstress*.out", "Stress time histories"),
    ("opensees.zip", "Compressed results package"),
]
typical_outputs = dict(_output_descriptions)

Step 11: Access Results from Input Directory

# Download the inputDirectory folder which contains results
# NOTE(review): indentation below was flattened during page extraction —
# the inner print belongs inside the for loop; everything from the first
# ds.files.list call through the final print is inside the try body.
input_dir_archive_uri = f"{archive_uri}/inputDirectory"
try:
 # List contents of inputDirectory in archive
 input_dir_files = ds.files.list(input_dir_archive_uri)
 print("\nFiles in inputDirectory:")
 for item in input_dir_files:
 print(f"- {item.name} ({item.type})")
 
 # Change to the archive directory for post-processing
 # (subsequent plotting reads output files relative to this directory)
 archive_path = ds.files.to_path(input_dir_archive_uri)
 os.chdir(archive_path)
 print(f"\nChanged to directory: {archive_path}")
 
except Exception as e:
 print(f"Error accessing archive: {e}")

What this does:

Post-processing Results

Response Spectra Analysis

# Define response spectra function
def resp_spectra(a, time, nstep):
 """
 This function builds response spectra from acceleration time history,
 a should be a numpy array, T and nStep should be integers.
 """
 # Add initial zero value to acceleration
 a = np.insert(a, 0, 0)
 # Number of periods at which spectral values are computed
 nperiod = 100
 # Define range of considered periods by power of 10
 minpower = -3.0
 maxpower = 1.0
 # Create vector of considered periods
 p = np.logspace(minpower, maxpower, nperiod)
 # Incremental circular frequency
 dw = 2.0 * np.pi / time
 # Vector of circular frequency
 w = np.arange(0, (nstep + 1) * dw, dw)
 # Fast fourier transform of acceleration
 afft = np.fft.fft(a)
 # Arbitrary stiffness value
 k = 1000.0
 # Damping ratio
 damp = 0.05
 umax = np.zeros(nperiod)
 vmax = np.zeros(nperiod)
 amax = np.zeros(nperiod)
 
 # Loop to compute spectral values at each period
 for j in range(0, nperiod):
 # Compute mass and dashpot coefficient for desired periods
 m = ((p[j] / (2 * np.pi)) ** 2) * k
 c = 2 * damp * (k * m) ** 0.5
 h = np.zeros(nstep + 2, dtype=complex)
 
 # Compute transfer function
 for l in range(0, int(nstep / 2 + 1)):
 h[l] = 1.0 / (-m * w[l] * w[l] + 1j * c * w[l] + k)
 # Mirror image of transfer function
 h[nstep + 1 - l] = np.conj(h[l])

 # Compute displacement in frequency domain using transfer function
 qfft = -m * afft
 u = np.zeros(nstep + 1, dtype=complex)
 for l in range(0, nstep + 1):
 u[l] = h[l] * qfft[l]

 # Compute displacement in time domain (ignore imaginary part)
 utime = np.real(np.fft.ifft(u))

 # Spectral displacement, velocity, and acceleration
 umax[j] = np.max(np.abs(utime))
 vmax[j] = (2 * np.pi / p[j]) * umax[j]
 amax[j] = (2 * np.pi / p[j]) * vmax[j]

 return p, umax, vmax, amax

What this function does:

Plot Response Spectra

# Define plotting function
def plot_acc():
    """
    Plot 5%-damped acceleration response spectra on a log-linear scale.

    Reads ``Profile{A..D}_acc{motion}.out`` files from the current
    working directory; missing files are reported and skipped.
    """
    plt.figure(figsize=(10, 6))

    # One curve per soil profile (single input motion in this example)
    for motion in ["motion1"]:
        for profile in ["A", "B", "C", "D"]:
            try:
                # Load acceleration data: column 0 is time, last column
                # is the recorded acceleration
                acc = np.loadtxt(f"Profile{profile}_acc{motion}.out")

                # Compute response spectra; total duration is the last
                # entry of the time column
                [p, umax, vmax, amax] = resp_spectra(acc[:, -1], acc[-1, 0], acc.shape[0])

                # Plot spectral acceleration versus period
                plt.semilogx(p, amax, label=f"Profile {profile}", linewidth=2)

            except FileNotFoundError:
                print(f"File Profile{profile}_acc{motion}.out not found")

    # Format plot
    plt.ylabel("$S_a$ (g)")
    plt.xlabel("Period (s)")
    plt.title("Acceleration Response Spectra (5% Damping)")
    plt.grid(True, alpha=0.3)
    plt.legend()
    plt.xlim([0.01, 10])
    plt.show()

# Execute the plotting function
# (expects Profile*_acc*.out files in the current working directory)
plot_acc()

What this does:

Advanced Post-processing

# Analyze stress time histories
def analyze_stress_results():
    """
    Summarize and plot shear-stress time histories from OpenSees output.

    Scans the current working directory for ``*Gstress*.out`` files.
    For each file, reports the peak stress magnitude and plots the first
    stress component versus time. Files that fail to load are reported
    and skipped.
    """
    stress_files = [f for f in os.listdir('.') if 'Gstress' in f and f.endswith('.out')]

    print(f"Found {len(stress_files)} stress output files:")
    for fname in stress_files:
        print(f"- {fname}")

    # Analyze every file. (The original code placed this block after the
    # listing loop, so it reused the leftover loop variable: only the
    # last file was analyzed, and zero files raised a NameError.)
    for fname in stress_files:
        try:
            stress_data = np.loadtxt(fname)

            # Peak magnitude over all stress columns (column 0 is time)
            max_stress = np.max(np.abs(stress_data[:, 1:]))
            print(f"Maximum stress magnitude: {max_stress:.2f}")

            # Plot time history of the first stress component
            plt.figure(figsize=(10, 4))
            plt.plot(stress_data[:, 0], stress_data[:, 1], label='Shear Stress')
            plt.xlabel('Time (s)')
            plt.ylabel('Stress (kPa)')
            plt.title(f'Stress Time History - {fname}')
            plt.grid(True, alpha=0.3)
            plt.legend()
            plt.show()

        except Exception as e:
            print(f"Error loading {fname}: {e}")

# Run stress analysis
# (reads *Gstress*.out files from the current working directory)
analyze_stress_results()

What this does: