OpenFOAM Job Submission Example
This example demonstrates how to submit and monitor an OpenFOAM CFD simulation using dapi. OpenFOAM is a free, open-source computational fluid dynamics (CFD) software package.
🎯 Overview
This example covers the essential workflow for running OpenFOAM simulations:
- Installing and importing dapi
- Setting up OpenFOAM job parameters
- Configuring solvers, mesh generation, and decomposition
- Submitting and monitoring CFD jobs
- Post-processing results with force coefficient analysis
🚀 Complete Example
Step 1: Install and Import dapi
# Install dapi package
!pip install dapi --user --quiet
# Import required modules
from dapi import DSClient
import json
What this does:
- Installs the DesignSafe API package from PyPI
- Imports the main client class and JSON for pretty-printing job requests
Step 2: Initialize Client
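The initialization itself is a single call (sketch based on the import in Step 1; dapi resolves credentials as described below):

```python
from dapi import DSClient

# Create an authenticated DesignSafe client; credentials are picked up
# from environment variables, a .env file, or an interactive prompt.
ds = DSClient()
```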
What this does:
- Creates an authenticated connection to DesignSafe services
- Handles OAuth2 authentication automatically
- Sets up connections to Tapis API, file systems, and job services
Authentication: dapi supports multiple authentication methods including environment variables, .env files, and interactive prompts. For detailed authentication setup instructions, see the authentication guide.
Step 3: Configure Job Parameters
# Job configuration parameters
ds_path: str = "/MyData/template-notebooks/tapis3/OpenFOAM/DH1_run" # Path to OpenFOAM case directory
max_job_minutes: int = 10 # Maximum runtime in minutes
tacc_allocation: str = "ASC25049" # TACC allocation to charge
app_id_to_use = "openfoam-stampede3" # OpenFOAM application ID
# OpenFOAM-specific environment variables
openfoam_env_vars = [
    {"key": "mesh", "value": "On"},          # Enable mesh generation with blockMesh
    {"key": "solver", "value": "pisoFoam"},  # CFD solver to use
    {"key": "decomp", "value": "On"}         # Enable domain decomposition for parallel runs
]
What each parameter does:
- ds_path: DesignSafe path to your OpenFOAM case directory containing 0/, constant/, and system/ folders
- max_job_minutes: Maximum wall-clock time for the job (prevents runaway simulations)
- tacc_allocation: Your TACC allocation account (required for compute time billing)
- app_id_to_use: Specific OpenFOAM application version on DesignSafe
- openfoam_env_vars: OpenFOAM-specific configuration:
  - mesh: "On" runs blockMesh to generate the computational mesh
  - solver: "pisoFoam" selects the transient, incompressible Navier-Stokes solver
  - decomp: "On" enables parallel domain decomposition for multi-core runs
Alternative solver options:
# Different OpenFOAM solvers you can use
solvers = {
    "pisoFoam": "Transient, incompressible (general purpose)",
    "simpleFoam": "Steady-state, incompressible (RANS)",
    "pimpleFoam": "Transient, incompressible (large time steps)",
    "rhoSimpleFoam": "Steady-state, compressible",
    "sonicFoam": "Transient, compressible (high speed flows)"
}
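As an illustration, a small helper can build the env-var list while rejecting solvers outside this table. Note that make_env_vars is a hypothetical helper (not part of dapi), and using "Off" to disable a step is an assumption about the app's conventions:

```python
KNOWN_SOLVERS = {"pisoFoam", "simpleFoam", "pimpleFoam",
                 "rhoSimpleFoam", "sonicFoam"}

def make_env_vars(solver: str, mesh: bool = True, decomp: bool = True) -> list:
    """Build the env-var list for the job request, failing fast on
    unknown solvers. Hypothetical helper; "Off" is an assumed value."""
    if solver not in KNOWN_SOLVERS:
        raise ValueError(f"Unknown solver: {solver}")
    on_off = lambda flag: "On" if flag else "Off"
    return [
        {"key": "mesh", "value": on_off(mesh)},
        {"key": "solver", "value": solver},
        {"key": "decomp", "value": on_off(decomp)},
    ]
```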
Step 4: Convert Path to URI
# Convert DesignSafe path to Tapis URI format
input_uri = ds.files.translate_path_to_uri(ds_path)
print(f"Input Directory Tapis URI: {input_uri}")
What this does:
- Converts human-readable DesignSafe paths (like /MyData/...) to Tapis URI format
- Tapis URIs are required for job submission and follow the pattern tapis://system/path
- Automatically detects your username and the correct storage system
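Conceptually, the translation for a MyData path looks like the sketch below. This is illustrative only: the storage-system id and the MyData-only handling are assumptions, not dapi's actual implementation (use ds.files.translate_path_to_uri in practice):

```python
def to_tapis_uri(ds_path: str, username: str) -> str:
    """Illustrative DesignSafe-path -> tapis://system/path translation."""
    prefix = "/MyData/"
    if not ds_path.startswith(prefix):
        raise ValueError(f"Path not handled by this sketch: {ds_path}")
    system = "designsafe.storage.default"  # assumed storage-system id
    return f"tapis://{system}/{username}/{ds_path[len(prefix):]}"
```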
Step 5: Generate Job Request
# Generate job request dictionary using app defaults
job_dict = ds.jobs.generate_request(
    app_id=app_id_to_use,
    input_dir_uri=input_uri,
    max_minutes=max_job_minutes,
    allocation=tacc_allocation,
    archive_system="designsafe",
    extra_env_vars=openfoam_env_vars,
    input_dir_param_name="Case Directory"  # OpenFOAM apps use "Case Directory" instead of "Input Directory"
)
print(json.dumps(job_dict, indent=2, default=str))
What each parameter does:
- app_id: Specifies which application to run
- input_dir_uri: Location of your OpenFOAM case files
- max_minutes: Job timeout (prevents infinite runs)
- allocation: TACC account to charge for compute time
- archive_system: Where to store results ("designsafe" = your MyData folder)
- extra_env_vars: OpenFOAM-specific settings passed to the application
- input_dir_param_name: OpenFOAM apps expect "Case Directory" rather than the default "Input Directory"
Additional options you can add:
# Extended job configuration options
job_dict = ds.jobs.generate_request(
    app_id=app_id_to_use,
    input_dir_uri=input_uri,
    max_minutes=max_job_minutes,
    allocation=tacc_allocation,
    # Resource configuration
    node_count=2,      # Number of compute nodes
    cores_per_node=48, # Cores per node (max depends on system)
    memory_mb=96000,   # Memory in MB per node
    queue="normal",    # Queue: "development", "normal", "large", etc.
    # Job metadata
    job_name="my_cfd_simulation",                 # Custom job name
    description="Wind flow around building",      # Job description
    tags=["research", "cfd", "wind-engineering"], # Searchable tags
    # Archive configuration
    archive_system="designsafe",     # Where to store results
    archive_path="openfoam-results", # Custom archive subdirectory
    # Additional environment variables
    extra_env_vars=[
        {"key": "mesh", "value": "On"},
        {"key": "solver", "value": "pisoFoam"},
        {"key": "decomp", "value": "On"},
        {"key": "OMP_NUM_THREADS", "value": "4"}  # OpenMP threads per MPI process
    ]
)
Step 6: Customize Resources
# Customize job settings (optional)
job_dict["nodeCount"] = 1 # Use single node
job_dict["coresPerNode"] = 2 # Use 2 cores for parallel simulation
print(json.dumps(job_dict, indent=2, default=str))
What this does:
- Overrides the default resource allocation from the app
- nodeCount: Number of compute nodes (1 for small jobs, multiple for large simulations)
- coresPerNode: CPU cores per node (enables parallel processing)
- More cores = faster solution but higher cost
Resource guidelines:
# Resource selection guidelines
resources = {
    "small_case": {"nodes": 1, "cores": 2, "time": 30},     # < 100K cells
    "medium_case": {"nodes": 1, "cores": 16, "time": 120},  # 100K - 1M cells
    "large_case": {"nodes": 2, "cores": 48, "time": 480},   # > 1M cells
}
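These guidelines can be turned into a small selection helper. This is a hypothetical convenience function; the cell-count cutoffs come straight from the table above:

```python
def pick_resources(cell_count: int) -> dict:
    """Choose node/core/time settings from the guideline table."""
    if cell_count < 100_000:        # small case
        return {"nodes": 1, "cores": 2, "time": 30}
    if cell_count <= 1_000_000:     # medium case
        return {"nodes": 1, "cores": 16, "time": 120}
    return {"nodes": 2, "cores": 48, "time": 480}  # large case
```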
Step 7: Submit Job
# Submit the job to TACC
submitted_job = ds.jobs.submit_request(job_dict)
print(f"Job UUID: {submitted_job.uuid}")
What this does:
- Sends the job request to TACC's job scheduler
- Returns a SubmittedJob object for monitoring
- The job UUID is a unique identifier for tracking
Step 8: Monitor Job
# Monitor job execution until completion
final_status = submitted_job.monitor(interval=15) # Check every 15 seconds
print(f"Job {submitted_job.uuid} finished with status: {final_status}")
What this does:
- Polls job status at the specified interval; interval=15 means check every 15 seconds (can be adjusted)
- Shows progress bars for different job phases
- Returns the final status when the job completes
Job status meanings:
job_statuses = {
    "PENDING": "Job submitted but not yet processed",
    "PROCESSING_INPUTS": "Input files being staged",
    "QUEUED": "Job waiting in scheduler queue",
    "RUNNING": "Job actively executing",
    "ARCHIVING": "Output files being archived",
    "FINISHED": "Job completed successfully",
    "FAILED": "Job failed during execution"
}
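Roughly, monitor() amounts to a polling loop like the sketch below. This is not dapi's actual code, and treating states beyond FINISHED/FAILED as terminal is an assumption:

```python
import time

# FINISHED and FAILED come from the table above; the others are assumed
TERMINAL_STATES = {"FINISHED", "FAILED", "CANCELLED", "STOPPED"}

def poll_until_done(get_status, interval=15, sleep=time.sleep):
    """Poll a status callable until it returns a terminal state."""
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        sleep(interval)
```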
Step 9: Check Results
# Interpret and display job outcome
ds.jobs.interpret_status(final_status, submitted_job.uuid)
# Display job runtime summary
submitted_job.print_runtime_summary(verbose=False)
# Get current job status
current_status = ds.jobs.get_status(submitted_job.uuid)
print(f"Current status: {current_status}")
# Display last status message from TACC
print(f"Last message: {submitted_job.last_message}")
What each command does:
- interpret_status: Provides a human-readable explanation of the job outcome
- print_runtime_summary: Shows time spent in each job phase (queued, running, etc.)
- get_status: Gets the current job status (useful for checking later)
- last_message: Shows the last status message from the job scheduler
Step 10: View Job Output
# Display job output from stdout
stdout_content = submitted_job.get_output_content("tapisjob.out", max_lines=50)
if stdout_content:
print("Job output:")
print(stdout_content)
What this does:
- tapisjob.out contains all console output from your OpenFOAM simulation
- max_lines=50 limits output to the last 50 lines (prevents overwhelming output)
- Shows OpenFOAM solver progress, residuals, and timing information
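For example, the simulated-time stamps that OpenFOAM prints can be pulled out of the returned text. The log lines below are typical solver output written by hand for illustration, not captured from a real run:

```python
import re

# Stand-in for submitted_job.get_output_content("tapisjob.out", ...)
stdout_content = """\
Time = 0.095
smoothSolver:  Solving for Ux, Initial residual = 0.0012, Final residual = 9.8e-07, No Iterations 2
Time = 0.1
smoothSolver:  Solving for Ux, Initial residual = 0.0011, Final residual = 9.1e-07, No Iterations 2
"""

# Extract each "Time = <value>" line to see how far the run progressed
times = [float(t) for t in re.findall(r"^Time = (\S+)$", stdout_content, re.M)]
print(times)  # [0.095, 0.1]
```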
Step 11: Access Results
# List contents of job archive directory
archive_uri = submitted_job.archive_uri
print(f"Archive URI: {archive_uri}")
outputs = ds.files.list(archive_uri)
for item in outputs:
print(f"- {item.name} ({item.type})")
What this does:
- archive_uri: Location where job results are stored
- ds.files.list: Lists all files and directories in the archive
- Shows output files like the mesh, solution fields, and post-processing data
Typical OpenFOAM output files:
typical_outputs = {
    "inputDirectory/": "Copy of your case directory with results",
    "tapisjob.out": "Console output from OpenFOAM",
    "tapisjob.err": "Error messages (if any)",
    "tapisjob.sh": "Job script that was executed",
    "postProcessing/": "Force coefficients, residuals, monitoring data",
    "processor*/": "Parallel decomposed solution (if using multiple cores)"
}
📊 Post-processing Results
Extract Force Coefficients
# Convert archive URI to local path for analysis
archive_path = ds.files.translate_uri_to_path(archive_uri)
print(f"Archive path: {archive_path}")
# Import analysis and plotting libraries
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
# Load force coefficient data using pandas
force_data_path = archive_path + "/inputDirectory/postProcessing/forceCoeffs1/0/forceCoeffs.dat"
# Read the file, skipping header lines and using tab separator
data = pd.read_csv(force_data_path, sep='\t', skiprows=9, header=None)
print(f"Loaded force coefficients data with shape: {data.shape}")
What this does:
- translate_uri_to_path: Converts the Tapis URI to a local file system path
- pandas.read_csv: Reads force coefficient data (much cleaner than manual parsing)
- skiprows=9: Skips OpenFOAM header lines
- sep='\t': Uses the tab separator (OpenFOAM default)
Force coefficient file format:
# Column meanings in forceCoeffs.dat
columns = {
    0: "Time",
    1: "Cm (moment coefficient)",
    2: "Cd (drag coefficient)",
    3: "Cl (lift coefficient)",
    4: "Cl(f) (front lift)",
    5: "Cl(r) (rear lift)"
}
Plot Results
# Plot drag coefficient (Cd) vs time
plt.plot(data.iloc[100:, 0], data.iloc[100:, 2])
plt.xlabel('Time')
plt.ylabel('$C_d$')
plt.title('Drag Coefficient vs Time')
plt.grid(True)
plt.show()
# Plot lift coefficient (Cl) vs time
plt.plot(data.iloc[100:, 0], data.iloc[100:, 3])
plt.xlabel('Time')
plt.ylabel('$C_l$')
plt.title('Lift Coefficient vs Time')
plt.grid(True)
plt.show()
What this does:
- data.iloc[100:, 0]: Time values (column 0) starting from row 100
- data.iloc[100:, 2]: Drag coefficient values (column 2)
- [100:]: Skips the initial transient period for cleaner plots
- Creates separate plots for drag and lift coefficients
Advanced plotting options:
# Create subplots for better comparison
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(data.iloc[100:, 0], data.iloc[100:, 2], 'b-', linewidth=2)
plt.xlabel('Time (s)')
plt.ylabel('$C_d$ (Drag Coefficient)')
plt.title('Drag Coefficient vs Time')
plt.grid(True, alpha=0.3)
plt.subplot(1, 2, 2)
plt.plot(data.iloc[100:, 0], data.iloc[100:, 3], 'r-', linewidth=2)
plt.xlabel('Time (s)')
plt.ylabel('$C_l$ (Lift Coefficient)')
plt.title('Lift Coefficient vs Time')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Calculate final values
final_cd = float(data.iloc[-1, 2])
final_cl = float(data.iloc[-1, 3])
print(f"Final drag coefficient: {final_cd:.6f}")
print(f"Final lift coefficient: {final_cl:.6f}")
🔧 Configuration Options
Environment Variable Options
# Complete list of OpenFOAM environment variables
openfoam_options = [
    {"key": "mesh", "value": "On"},         # Generate mesh with blockMesh
    {"key": "solver", "value": "pisoFoam"}, # Solver selection
    {"key": "decomp", "value": "On"},       # Enable parallel decomposition
    {"key": "reconstruct", "value": "On"},  # Reconstruct parallel results
    {"key": "postProcess", "value": "On"},  # Run post-processing functions
]
Queue and System Options
# Available queues on different systems
queue_options = {
    "stampede3": {
        "development": {"max_nodes": 2, "max_time": 120},  # 2 hours, testing
        "normal": {"max_nodes": 256, "max_time": 2880},    # 48 hours, production
        "large": {"max_nodes": 512, "max_time": 1440},     # 24 hours, large jobs
    }
}
# System-specific configurations
systems = {
    "stampede3": {"cores_per_node": 48, "memory_per_node": 192000},
    "frontera": {"cores_per_node": 56, "memory_per_node": 192000},
}
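A quick sanity check against these limits can catch oversubscribed requests before submission. This is a hypothetical helper built on the systems table above, not part of dapi:

```python
SYSTEMS = {
    "stampede3": {"cores_per_node": 48},
    "frontera": {"cores_per_node": 56},
}

def check_cores(system: str, cores_per_node: int) -> None:
    """Raise early if a request exceeds the system's per-node core count."""
    limit = SYSTEMS[system]["cores_per_node"]
    if cores_per_node > limit:
        raise ValueError(f"{system} nodes have only {limit} cores")
```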
Complete Job Request Example
# Full-featured job request showing all options
complete_job = ds.jobs.generate_request(
    # Required parameters
    app_id="openfoam-stampede3",
    input_dir_uri=input_uri,
    allocation="YOUR_ALLOCATION",
    # Resource configuration
    max_minutes=120,    # 2 hours
    node_count=2,       # Multiple nodes
    cores_per_node=48,  # Full node utilization
    memory_mb=192000,   # 192 GB RAM
    queue="normal",     # Production queue
    # Job metadata
    job_name="wind_flow_cfd_simulation",
    description="RANS simulation of wind flow around building using OpenFOAM",
    tags=["research", "cfd", "wind-engineering", "rans", "openfoam"],
    # Archive configuration
    archive_system="designsafe",
    archive_path="cfd-results/wind-study",  # Results go to MyData/cfd-results/wind-study/
    # OpenFOAM configuration
    extra_env_vars=[
        {"key": "mesh", "value": "On"},
        {"key": "solver", "value": "simpleFoam"},  # Steady-state RANS
        {"key": "decomp", "value": "On"},
        {"key": "reconstruct", "value": "On"},
        {"key": "postProcess", "value": "On"},
    ],
    # Advanced options
    input_dir_param_name="Case Directory",
)