This example demonstrates how to submit and monitor an OpenSees simulation using dapi. OpenSees is a software framework for building applications that simulate the response of structural and geotechnical systems to earthquakes, with finite element analysis capabilities.
Overview¶
This example covers the essential workflow for running OpenSees simulations:
Installing and importing dapi
Setting up OpenSees job parameters and script files
Configuring multi-processor runs and resource allocation
Submitting and monitoring structural analysis jobs
Post-processing results with response spectra analysis
Complete Example¶
Step 1: Install and Import dapi¶
# Install dapi package
!pip install dapi --user --quiet
# Import required modules
from dapi import DSClient
import os
import json
import numpy as np
import matplotlib.pyplot as plt
from datetime import date
What this does:
Installs the DesignSafe API package from PyPI
Imports the main client class and analysis libraries
Sets up matplotlib for response spectra plotting
Step 2: Initialize Client¶
# Initialize DesignSafe client
ds = DSClient()
What this does:
Creates an authenticated connection to DesignSafe services
Handles OAuth2 authentication automatically
Sets up connections to Tapis API, file systems, and job services
Authentication: dapi supports multiple authentication methods including environment variables, .env files, and interactive prompts. For detailed authentication setup instructions, see the authentication guide.
Step 3: Configure Job Parameters¶
# Job configuration parameters
ds_path = os.getcwd() + "/input" # Path to OpenSees input files
input_filename: str = "Main_multiMotion.tcl" # Main OpenSees script
tacc_allocation: str = "your-allocation" # TACC allocation to charge
app_id: str = "opensees-mp-s3" # OpenSees-MP application ID
max_job_minutes: int = 60 # Maximum runtime in minutes
# Resource configuration
control_nodeCount: int = 1 # Number of compute nodes
control_corespernode: int = 16 # Cores per node for parallel analysis
What each parameter does:
ds_path: DesignSafe path to your OpenSees case directory containing .tcl files
input_filename: Main OpenSees script file (typically .tcl format)
tacc_allocation: Your TACC allocation account (required for compute time billing)
app_id: Specific OpenSees application version on DesignSafe
max_job_minutes: Maximum wall-clock time for the job (prevents runaway simulations)
control_nodeCount: Number of compute nodes (typically 1 for most analyses)
control_corespernode: CPU cores for parallel processing
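Before submitting, it can help to confirm that the main script actually sits in the input directory. The sketch below is a minimal illustration, assuming the notebook can see the input directory as an ordinary filesystem path; check_input_script is a hypothetical helper, not part of dapi, and the temporary directory only stands in for ds_path.

```python
import tempfile
from pathlib import Path


def check_input_script(input_dir, script_name):
    """Return the full path to the main script, or raise if it is missing.

    Hypothetical helper for illustration; not a dapi function.
    """
    script_path = Path(input_dir) / script_name
    if not script_path.is_file():
        raise FileNotFoundError(f"{script_name} not found in {input_dir}")
    return script_path


# Demonstration with a throwaway directory standing in for ds_path
with tempfile.TemporaryDirectory() as demo_dir:
    (Path(demo_dir) / "Main_multiMotion.tcl").write_text("# placeholder script\n")
    found_name = check_input_script(demo_dir, "Main_multiMotion.tcl").name
    try:
        check_input_script(demo_dir, "missing.tcl")
        missing_detected = False
    except FileNotFoundError:
        missing_detected = True
```

In the workflow above, the same check would be called as check_input_script(ds_path, input_filename) before generating the job request.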
Step 4: Convert Path to URI¶
# Convert DesignSafe path to Tapis URI format
input_uri = ds.files.to_uri(ds_path)
print(f"Input Directory Tapis URI: {input_uri}")
What this does:
Converts human-readable DesignSafe paths (like /MyData/...) to Tapis URI format
Tapis URIs are required for job submission and follow the pattern: tapis://system/path
Automatically detects your username and the correct storage system
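To make the tapis://system/path pattern concrete, the following sketch splits a URI into its system and path parts using only the Python standard library. The URI shown is made up for illustration, and split_tapis_uri is a hypothetical helper; the real values come from ds.files.to_uri.

```python
from urllib.parse import urlsplit


def split_tapis_uri(uri):
    """Split a tapis://system/path URI into (system, path).

    Hypothetical helper for illustration; not a dapi function.
    """
    parts = urlsplit(uri)
    if parts.scheme != "tapis" or not parts.netloc:
        raise ValueError(f"Not a Tapis URI: {uri}")
    return parts.netloc, parts.path


# Made-up URI that only follows the documented pattern
example_uri = "tapis://example.storage.system/someuser/input"
system, path = split_tapis_uri(example_uri)
print(f"system={system}, path={path}")
```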
Step 5: Generate Job Request¶
# Generate job request dictionary using app defaults
job_dict = ds.jobs.generate(
    app_id=app_id,
    input_dir_uri=input_uri,
    script_filename=input_filename,
    max_minutes=max_job_minutes,
    allocation=tacc_allocation,
    # Archive configuration for organized result storage
    archive_system="designsafe",
    archive_path="opensees-results", # Results go to MyData/opensees-results/
    # OpenSees-specific job metadata
    job_name="opensees_multi_motion_analysis",
    description="Multi-free field analysis using OpenSees-MP",
    tags=["research", "opensees", "earthquake", "site-response"]
)
print(json.dumps(job_dict, indent=2, default=str))
What each parameter does:
app_id: Specifies which OpenSees application to run
input_dir_uri: Location of your OpenSees script files
script_filename: Main OpenSees .tcl script file
max_minutes: Job timeout (prevents infinite runs)
allocation: TACC account to charge for compute time
archive_system: Where to store results (“designsafe” = your MyData folder)
archive_path: Custom subdirectory for organized results
Additional options you can add:
# Extended job configuration options
job_dict = ds.jobs.generate(
    app_id=app_id,
    input_dir_uri=input_uri,
    script_filename=input_filename,
    max_minutes=max_job_minutes,
    allocation=tacc_allocation,
    # Resource configuration
    node_count=1, # Number of compute nodes
    cores_per_node=16, # Cores per node (enables parallel processing)
    memory_mb=192000, # Memory in MB per node
    queue="skx-dev", # Queue: "skx-dev", "skx", "normal", etc.
    # Job metadata
    job_name="my_opensees_analysis", # Custom job name
    description="Nonlinear site response analysis", # Job description
    tags=["research", "opensees", "seismic"], # Searchable tags
    # Archive configuration
    archive_system="designsafe", # Where to store results
    archive_path="opensees-results/site-response", # Custom archive path
)
Step 6: Customize Resources¶
# Customize job settings
job_dict["name"] = "opensees-MP-multiMotion-dapi"
job_dict["nodeCount"] = control_nodeCount
job_dict["coresPerNode"] = control_corespernode
print("Generated job request:")
print(json.dumps(job_dict, indent=2, default=str))
What this does:
Overrides default resource allocation from the app
nodeCount: Number of compute nodes (1 for most OpenSees analyses)
coresPerNode: CPU cores per node (enables parallel element processing)
More cores = faster solution for large models but higher cost
Resource guidelines: See the OpenSees user guide on DesignSafe
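The trade-off between cores and cost can be estimated with simple arithmetic before submitting. The sketch below assumes the allocation is charged per node-hour and treats the time limit as a worst case; actual charge rates depend on the queue and TACC policy, and resource_summary is a hypothetical helper, not part of dapi.

```python
def resource_summary(node_count, cores_per_node, max_minutes):
    """Summarize the requested resources (hypothetical helper).

    Assumes a per-node-hour charge, which is an illustrative assumption.
    """
    total_cores = node_count * cores_per_node
    # Upper bound: the job runs for its full time limit on every node
    max_node_hours = node_count * max_minutes / 60
    return {"total_cores": total_cores, "max_node_hours": max_node_hours}


# Values used in this example: 1 node, 16 cores per node, 60 minutes
summary = resource_summary(node_count=1, cores_per_node=16, max_minutes=60)
print(summary)
```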
Step 7: Submit Job¶
# Submit job using dapi
submitted_job = ds.jobs.submit(job_dict)
print(f"Job launched with UUID: {submitted_job.uuid}")
print("You can also check the job in the DesignSafe portal under Workspace > Tools & Applications > Job Status")
What this does:
Sends the job request to TACC’s job scheduler
Returns a SubmittedJob object for monitoring
Job UUID is a unique identifier for tracking
Prints where to find the job status in the DesignSafe portal
Step 8: Monitor Job¶
# Monitor job status using dapi
final_status = submitted_job.monitor(interval=15) # Check every 15 seconds
print(f"Job finished with status: {final_status}")
What this does:
Polls job status at specified intervals (15 seconds)
Shows progress bars for different job phases
Returns final status when job completes
interval=15 means check every 15 seconds (can be adjusted)
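Conceptually, monitoring is a polling loop: ask for the status, stop on a terminal state, otherwise wait and ask again. The sketch below illustrates that idea with a stubbed status source. It is not dapi's implementation; poll_until_done, the "TIMEOUT" sentinel, and the choice of FINISHED and FAILED as the only terminal states are assumptions for illustration, and a real service may report other states.

```python
import time

# Terminal states taken from the status names used in this guide (assumed complete)
TERMINAL_STATUSES = {"FINISHED", "FAILED"}


def poll_until_done(get_status, interval=15.0, max_polls=1000):
    """Call get_status() repeatedly until a terminal status is returned."""
    for _ in range(max_polls):
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(interval)  # wait before the next check
    return "TIMEOUT"  # hypothetical sentinel when max_polls is exhausted


# Stubbed status source standing in for a real job service
fake_statuses = iter(["PENDING", "QUEUED", "RUNNING", "ARCHIVING", "FINISHED"])
result = poll_until_done(lambda: next(fake_statuses), interval=0)
print(result)
```

A longer interval reduces the number of status requests at the cost of noticing completion slightly later.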
Job status meanings:
job_statuses = {
    "PENDING": "Job submitted but not yet processed",
    "PROCESSING_INPUTS": "Input files being staged",
    "QUEUED": "Job waiting in scheduler queue",
    "RUNNING": "Job actively executing",
    "ARCHIVING": "Output files being archived",
    "FINISHED": "Job completed successfully",
    "FAILED": "Job failed during execution"
}
Step 9: Check Results¶
# Interpret job status
ds.jobs.interpret_status(final_status, submitted_job.uuid)
# Display runtime summary
submitted_job.print_runtime_summary(verbose=False)
# Get current job status
current_status = ds.jobs.status(submitted_job.uuid)
print(f"Current status: {current_status}")
# Display last status message from TACC
print(f"Last message: {submitted_job.last_message}")
What each command does:
interpret_status: Provides human-readable explanation of job outcome
print_runtime_summary: Shows time spent in each job phase (queued, running, etc.)
status: Gets current job status (useful for checking later)
last_message: Shows last status message from the job scheduler
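One way to use the final status is to decide what to do next: post-process on success, read the logs on failure, and keep waiting otherwise. The sketch below is a hypothetical helper written for this guide, not a dapi function, and the action labels are arbitrary strings chosen for illustration.

```python
def next_action(status):
    """Map a job status to a suggested next step (hypothetical helper)."""
    if status == "FINISHED":
        return "post-process"   # results are archived and ready to read
    if status == "FAILED":
        return "inspect-logs"   # e.g. read tapisjob.out and tapisjob.err
    return "wait"               # job has not reached a final state yet


actions = {s: next_action(s) for s in ["FINISHED", "FAILED", "RUNNING"]}
print(actions)
```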
Step 10: Access Job Archive and Results¶
# Get archive information using dapi
archive_uri = submitted_job.archive_uri
print(f"Archive URI: {archive_uri}")
# Translate archive URI to local DesignSafe path
local_archive_path = ds.files.to_path(archive_uri)
print(f"Local archive path: {local_archive_path}")
# List archive contents
archive_files = ds.files.list(archive_uri)
print("\nArchive contents:")
for item in archive_files:
    print(f"- {item.name} ({item.type})")
What this does:
archive_uri: Location where job results are stored
to_path: Converts Tapis URI to local path for analysis
ds.files.list: Lists all files and directories in the archive
Shows output files like analysis results, output data, and logs
Typical OpenSees output files:
typical_outputs = {
    "inputDirectory/": "Copy of your input directory with results",
    "tapisjob.out": "Console output from OpenSees analysis",
    "tapisjob.err": "Error messages (if any)",
    "tapisjob.sh": "Job script that was executed",
    "Profile*_acc*.out": "Acceleration time histories",
    "Profile*_Gstress*.out": "Stress time histories",
    "opensees.zip": "Compressed results package"
}
Step 11: Access Results from Input Directory¶
# Locate the inputDirectory folder in the archive, which contains the results
input_dir_archive_uri = f"{archive_uri}/inputDirectory"
try:
    # List contents of inputDirectory in archive
    input_dir_files = ds.files.list(input_dir_archive_uri)
    print("\nFiles in inputDirectory:")
    for item in input_dir_files:
        print(f"- {item.name} ({item.type})")
    # Change to the archive directory for post-processing
    archive_path = ds.files.to_path(input_dir_archive_uri)
    os.chdir(archive_path)
    print(f"\nChanged to directory: {archive_path}")
except Exception as e:
    print(f"Error accessing archive: {e}")
What this does:
inputDirectory: Contains all your original files plus generated outputs
os.chdir: Changes to the results directory for analysis
Lists all available output files for post-processing
Provides access to acceleration and stress time histories
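Once inside the results directory, the time-history files can be picked out by name pattern. The sketch below uses pathlib globbing on a throwaway directory; the file names are made up to match the Profile*_acc*.out and Profile*_Gstress*.out patterns, and find_outputs is a hypothetical helper.

```python
import tempfile
from pathlib import Path


def find_outputs(results_dir, pattern):
    """Return sorted file names in results_dir that match a glob pattern."""
    return sorted(p.name for p in Path(results_dir).glob(pattern))


# Throwaway directory with made-up file names standing in for real outputs
with tempfile.TemporaryDirectory() as demo_dir:
    for name in [
        "ProfileA_accmotion1.out",
        "ProfileB_accmotion1.out",
        "ProfileA_Gstressmotion1.out",
        "tapisjob.out",
    ]:
        (Path(demo_dir) / name).touch()
    acc_files = find_outputs(demo_dir, "Profile*_acc*.out")
    stress_files = find_outputs(demo_dir, "Profile*_Gstress*.out")

print(acc_files)
print(stress_files)
```

After the os.chdir call above, the same helper could be called with "." as the directory.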
Post-processing Results¶
Response Spectra Analysis¶
# Define response spectra function
def resp_spectra(a, time, nstep):
    """
    Build response spectra from an acceleration time history.
    a is a 1-D numpy array of accelerations, time is the total duration
    of the record, and nstep is the number of time steps (an integer).
    """
    # Add initial zero value to acceleration
    a = np.insert(a, 0, 0)
    # Number of periods at which spectral values are computed
    nperiod = 100
    # Define range of considered periods by power of 10
    minpower = -3.0
    maxpower = 1.0
    # Create vector of considered periods
    p = np.logspace(minpower, maxpower, nperiod)
    # Incremental circular frequency
    dw = 2.0 * np.pi / time
    # Vector of circular frequency
    w = np.arange(0, (nstep + 1) * dw, dw)
    # Fast fourier transform of acceleration
    afft = np.fft.fft(a)
    # Arbitrary stiffness value
    k = 1000.0
    # Damping ratio
    damp = 0.05
    umax = np.zeros(nperiod)
    vmax = np.zeros(nperiod)
    amax = np.zeros(nperiod)
    # Loop to compute spectral values at each period
    for j in range(0, nperiod):
        # Compute mass and dashpot coefficient for desired periods
        m = ((p[j] / (2 * np.pi)) ** 2) * k
        c = 2 * damp * (k * m) ** 0.5
        h = np.zeros(nstep + 2, dtype=complex)
        # Compute transfer function
        for l in range(0, int(nstep / 2 + 1)):
            h[l] = 1.0 / (-m * w[l] * w[l] + 1j * c * w[l] + k)
            # Mirror image of transfer function
            h[nstep + 1 - l] = np.conj(h[l])
        # Compute displacement in frequency domain using transfer function
        qfft = -m * afft
        u = np.zeros(nstep + 1, dtype=complex)
        for l in range(0, nstep + 1):
            u[l] = h[l] * qfft[l]
        # Compute displacement in time domain (ignore imaginary part)
        utime = np.real(np.fft.ifft(u))
        # Spectral displacement, pseudo-velocity, and pseudo-acceleration
        umax[j] = np.max(np.abs(utime))
        vmax[j] = (2 * np.pi / p[j]) * umax[j]
        amax[j] = (2 * np.pi / p[j]) * vmax[j]
    return p, umax, vmax, amax
What this function does:
resp_spectra: Computes response spectra from acceleration time history
p: Period vector (logarithmically spaced)
umax, vmax, amax: Spectral displacement, pseudo-velocity, and pseudo-acceleration
damp=0.05: 5% damping ratio (typical for structures)
FFT: Uses Fast Fourier Transform for efficient computation
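The oscillator parameters inside resp_spectra can be checked in isolation. The sketch below recomputes the mass and dashpot coefficient for one period, using the same stiffness and damping values as the function, and confirms two properties of the transfer function: its static value is 1/k, and its magnitude at the natural frequency is 1/(2·damp·k). The helper names sdof_parameters and transfer_function are introduced here for illustration only.

```python
import numpy as np

K = 1000.0   # arbitrary stiffness, as in resp_spectra
DAMP = 0.05  # 5% damping ratio, as in resp_spectra


def sdof_parameters(period, k=K, damp=DAMP):
    """Mass and dashpot coefficient of an oscillator with the given period."""
    m = ((period / (2 * np.pi)) ** 2) * k
    c = 2 * damp * np.sqrt(k * m)
    return m, c


def transfer_function(w, m, c, k=K):
    """Displacement per unit force at circular frequency w."""
    return 1.0 / (-m * w**2 + 1j * c * w + k)


period = 0.5
m, c = sdof_parameters(period)
natural_period = 2 * np.pi * np.sqrt(m / K)          # recovers the input period
static_gain = abs(transfer_function(0.0, m, c))      # equals 1/k
resonant_gain = abs(transfer_function(2 * np.pi / period, m, c))  # 1/(2*damp*k)
print(natural_period, static_gain, resonant_gain)
```

Because the stiffness is arbitrary and the mass is derived from it, the spectral values do not depend on the particular value chosen for k.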
Plot Response Spectra¶
# Define plotting function
def plot_acc():
    """
    Plot acceleration response spectra on log-linear scale
    """
    plt.figure(figsize=(10, 6))
    # Plot response spectra for each profile
    for motion in ["motion1"]:
        for profile in ["A", "B", "C", "D"]:
            try:
                # Load acceleration data
                acc = np.loadtxt(f"Profile{profile}_acc{motion}.out")
                # Compute response spectra
                p, umax, vmax, amax = resp_spectra(acc[:, -1], acc[-1, 0], acc.shape[0])
                # Plot spectral acceleration
                plt.semilogx(p, amax, label=f"Profile {profile}", linewidth=2)
            except FileNotFoundError:
                print(f"File Profile{profile}_acc{motion}.out not found")
    # Format plot
    plt.ylabel("$S_a$ (g)")
    plt.xlabel("Period (s)")
    plt.title("Acceleration Response Spectra (5% Damping)")
    plt.grid(True, alpha=0.3)
    plt.legend()
    plt.xlim([0.01, 10])
    plt.show()
# Execute the plotting function
plot_acc()
What this does:
plt.semilogx: Creates log-scale plot for period axis
Profile*_acc*.out: Loads acceleration time histories from OpenSees output
linewidth=2: Makes lines more visible in plots
Multiple profiles: Compares response across different soil profiles
5% damping: Standard damping for structural response spectra
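The call resp_spectra(acc[:, -1], acc[-1, 0], acc.shape[0]) relies on a particular array layout. The sketch below builds a synthetic array to show what each index picks out, assuming (as the indexing suggests) that the first column holds time and the last column holds the acceleration of interest. The values are made up, and which location the last column corresponds to depends on how the recorders are defined in the OpenSees script.

```python
import numpy as np

dt = 0.01      # made-up time step
n_rows = 500   # made-up number of recorded steps

# Synthetic stand-in for np.loadtxt("Profile..._acc....out")
time_column = dt * np.arange(1, n_rows + 1)
other_acc = np.zeros(n_rows)
last_acc = np.sin(2 * np.pi * 1.0 * time_column)
acc = np.column_stack([time_column, other_acc, last_acc])

record = acc[:, -1]    # last column: acceleration time history passed as a
duration = acc[-1, 0]  # last entry of the time column: passed as time
nstep = acc.shape[0]   # number of rows: passed as nstep
print(record.shape, duration, nstep)
```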
Advanced Post-processing¶
# Analyze stress time histories
def analyze_stress_results():
    """
    Analyze stress time histories from OpenSees output
    """
    stress_files = [f for f in os.listdir('.') if 'Gstress' in f and f.endswith('.out')]
    print(f"Found {len(stress_files)} stress output files:")
    for file in stress_files:
        print(f"- {file}")
        # Load stress data
        try:
            stress_data = np.loadtxt(file)
            # Basic statistics
            max_stress = np.max(np.abs(stress_data[:, 1:])) # Skip time column
            print(f"Maximum stress magnitude: {max_stress:.2f}")
            # Plot time history
            plt.figure(figsize=(10, 4))
            plt.plot(stress_data[:, 0], stress_data[:, 1], label='Shear Stress')
            plt.xlabel('Time (s)')
            plt.ylabel('Stress (kPa)')
            plt.title(f'Stress Time History - {file}')
            plt.grid(True, alpha=0.3)
            plt.legend()
            plt.show()
        except Exception as e:
            print(f"Error loading {file}: {e}")
# Run stress analysis
analyze_stress_results()
What this does:
stress_files: Finds all stress output files automatically
np.loadtxt: Loads numerical data from OpenSees output
max_stress: Computes maximum stress magnitude
Time history plots: Visualizes stress evolution during earthquake
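The function above reports a single maximum over all stress columns. If a separate peak per column is more useful, the same idea extends with the axis argument of np.max. The sketch below uses a small made-up array with time in the first column; peak_abs_per_column is a hypothetical helper.

```python
import numpy as np


def peak_abs_per_column(data):
    """Peak absolute value of each data column, skipping the time column."""
    return np.max(np.abs(data[:, 1:]), axis=0)


# Made-up data: column 0 is time, columns 1 and 2 are stress components
demo = np.array([
    [0.0, 1.0, -2.0],
    [0.1, -3.0, 0.5],
    [0.2, 2.0, 1.5],
])
peaks = peak_abs_per_column(demo)
print(peaks)
```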