Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

OpenFOAM Job Submission Example

This example demonstrates how to submit and monitor an OpenFOAM CFD simulation using dapi. OpenFOAM is a free, open-source computational fluid dynamics (CFD) software package.

Try on DesignSafe

Overview

This example covers the essential workflow for running OpenFOAM simulations: configuring job parameters, submitting the job to TACC, monitoring its execution, and accessing and post-processing the results.

Complete Example

Step 1: Install and Import dapi

# Install dapi package
# NOTE: the leading "!" is IPython/Jupyter shell syntax; outside a notebook,
# run "pip install dapi --user --quiet" from a terminal instead.
!pip install dapi --user --quiet

# Import required modules
from dapi import DSClient  # DesignSafe client used for job submission and file access below
import json  # used throughout to pretty-print job request dictionaries

What this does:

Step 2: Initialize Client

# Initialize DesignSafe client
# DSClient() handles authentication (environment variables, .env files, or
# interactive prompts -- see the authentication guide referenced below).
ds = DSClient()

What this does:

Authentication: dapi supports multiple authentication methods including environment variables, .env files, and interactive prompts. For detailed authentication setup instructions, see the authentication guide.

Step 3: Configure Job Parameters

# Job configuration parameters
ds_path: str = "/MyData/template-notebooks/tapis3/OpenFOAM/DH1_run" # Path to OpenFOAM case directory
max_job_minutes: int = 10 # Maximum runtime in minutes
tacc_allocation: str = "ASC25049" # TACC allocation to charge
app_id_to_use: str = "openfoam-stampede3" # OpenFOAM application ID

# OpenFOAM-specific environment variables
# Each entry is passed to the job as a {"key": ..., "value": ...} environment
# variable; presumably the app's wrapper script reads these switches --
# confirm against the openfoam-stampede3 app definition.
openfoam_env_vars = [
 {"key": "mesh", "value": "On"}, # Enable mesh generation with blockMesh
 {"key": "solver", "value": "pisoFoam"}, # CFD solver to use
 {"key": "decomp", "value": "On"} # Enable domain decomposition for parallel runs
]

What each parameter does:

Alternative solver options:

# Reference table of OpenFOAM solvers usable with this app:
# solver name -> short description of the flow regime it targets.
solvers = dict(
    pisoFoam="Transient, incompressible (general purpose)",
    simpleFoam="Steady-state, incompressible (RANS)",
    pimpleFoam="Transient, incompressible (large time steps)",
    rhoSimpleFoam="Steady-state, compressible",
    sonicFoam="Transient, compressible (high speed flows)",
)

Step 4: Convert Path to URI

# Convert DesignSafe path to Tapis URI format
# Turns the human-readable "/MyData/..." path into the URI scheme the Tapis
# job system stages input files from.
input_uri = ds.files.to_uri(ds_path)
print(f"Input Directory Tapis URI: {input_uri}")

What this does:

Step 5: Generate Job Request

# Generate job request dictionary using app defaults
# generate() starts from the app's defaults; only the values passed here
# override them. The result is a plain dict that can be edited before submit.
job_dict = ds.jobs.generate(
 app_id=app_id_to_use,  # which Tapis app to run
 input_dir_uri=input_uri,  # Tapis URI of the OpenFOAM case directory
 max_minutes=max_job_minutes,  # wall-time limit
 allocation=tacc_allocation,  # TACC allocation charged for the run
 archive_system="designsafe",  # store results on DesignSafe storage
 extra_env_vars=openfoam_env_vars,  # mesh/solver/decomp switches defined above
 input_dir_param_name="Case Directory" # OpenFOAM apps use "Case Directory" instead of "Input Directory"
)
print(json.dumps(job_dict, indent=2, default=str))

What each parameter does:

Additional options you can add:

# Extended job configuration options
# Demonstrates additional optional keywords generate() accepts; the values
# shown are illustrative, not required.
job_dict = ds.jobs.generate(
 app_id=app_id_to_use,
 input_dir_uri=input_uri,
 max_minutes=max_job_minutes,
 allocation=tacc_allocation,
 
 # Resource configuration
 node_count=2, # Number of compute nodes
 cores_per_node=48, # Cores per node (max depends on system)
 memory_mb=96000, # Memory in MB per node
 queue="normal", # Queue: "development", "normal", "large", etc.
 
 # Job metadata
 job_name="my_cfd_simulation", # Custom job name
 description="Wind flow around building", # Job description
 tags=["research", "cfd", "wind-engineering"], # Searchable tags
 
 # Archive configuration
 archive_system="designsafe", # Where to store results
 archive_path="openfoam-results", # Custom archive subdirectory
 
 # Additional environment variables
 extra_env_vars=[
 {"key": "mesh", "value": "On"},
 {"key": "solver", "value": "pisoFoam"},
 {"key": "decomp", "value": "On"},
 {"key": "OMP_NUM_THREADS", "value": "4"} # OpenMP threads per MPI process
 ]
)

Step 6: Customize Resources

# Customize job settings (optional)
# job_dict is a plain dict, so Tapis job attributes (camelCase keys) can be
# overridden directly before submission.
job_dict["nodeCount"] = 1 # Use single node
job_dict["coresPerNode"] = 2 # Use 2 cores for parallel simulation
print(json.dumps(job_dict, indent=2, default=str))

What this does:

Resource guidelines:

# Rule-of-thumb resource presets keyed by case size (mesh cell count).
# Each preset records node count, total cores, and wall time in minutes.
_resource_presets = (
    ("small_case", 1, 2, 30),     # < 100K cells
    ("medium_case", 1, 16, 120),  # 100K - 1M cells
    ("large_case", 2, 48, 480),   # > 1M cells
)
resources = {
    label: {"nodes": nodes, "cores": cores, "time": minutes}
    for label, nodes, cores, minutes in _resource_presets
}

Step 7: Submit Job

# Submit the job to TACC
# submit() sends the request to the job system and returns a handle whose
# UUID is used for monitoring and status queries below.
submitted_job = ds.jobs.submit(job_dict)
print(f"Job UUID: {submitted_job.uuid}")

What this does:

Step 8: Monitor Job

# Monitor job execution until completion
# monitor() blocks, polling the job status every `interval` seconds until the
# job reaches a terminal state, then returns that final status string.
final_status = submitted_job.monitor(interval=15) # Check every 15 seconds
print(f"Job {submitted_job.uuid} finished with status: {final_status}")

What this does:

Job status meanings:

# Job lifecycle states in rough chronological order, each paired with a
# human-readable explanation.
_status_descriptions = (
    ("PENDING", "Job submitted but not yet processed"),
    ("PROCESSING_INPUTS", "Input files being staged"),
    ("QUEUED", "Job waiting in scheduler queue"),
    ("RUNNING", "Job actively executing"),
    ("ARCHIVING", "Output files being archived"),
    ("FINISHED", "Job completed successfully"),
    ("FAILED", "Job failed during execution"),
)
job_statuses = dict(_status_descriptions)

Step 9: Check Results

# Interpret and display job outcome
# Prints a human-readable summary of the final status for this job UUID.
ds.jobs.interpret_status(final_status, submitted_job.uuid)

# Display job runtime summary
# verbose=False keeps the summary short; presumably verbose=True adds
# per-stage timing detail -- confirm against the dapi docs.
submitted_job.print_runtime_summary(verbose=False)

# Get current job status
# Queries Tapis again rather than reusing final_status, so it reflects any
# state change since monitoring ended (e.g. archiving completing).
current_status = ds.jobs.status(submitted_job.uuid)
print(f"Current status: {current_status}")

# Display last status message from TACC
print(f"Last message: {submitted_job.last_message}")

What each command does:

Step 10: View Job Output

# Display job output from stdout
# tapisjob.out captures the console output of the run; max_lines truncates
# long logs to keep the notebook readable.
stdout_content = submitted_job.get_output_content("tapisjob.out", max_lines=50)
if stdout_content:  # falsy when the file is empty or not (yet) available
 print("Job output:")
 print(stdout_content)

What this does:

Step 11: Access Results

# List contents of job archive directory
# archive_uri is the storage location the job's outputs were archived to
# (on the "designsafe" archive system configured earlier).
archive_uri = submitted_job.archive_uri
print(f"Archive URI: {archive_uri}")
outputs = ds.files.list(archive_uri)
for item in outputs:
 print(f"- {item.name} ({item.type})")

What this does:

Typical OpenFOAM output files:

# Files and directories commonly found in an OpenFOAM job archive,
# mapped to a short description of their contents.
_archive_entries = (
    ("inputDirectory/", "Copy of your case directory with results"),
    ("tapisjob.out", "Console output from OpenFOAM"),
    ("tapisjob.err", "Error messages (if any)"),
    ("tapisjob.sh", "Job script that was executed"),
    ("postProcessing/", "Force coefficients, residuals, monitoring data"),
    ("processor*/", "Parallel decomposed solution (if using multiple cores)"),
)
typical_outputs = dict(_archive_entries)

Post-processing Results

Extract Force Coefficients

# Convert archive URI to local path for analysis
# to_path() maps the Tapis URI back to a filesystem path visible from the
# JupyterHub environment.
archive_path = ds.files.to_path(archive_uri)
print(f"Archive path: {archive_path}")

# Import plotting libraries
import numpy as np
import matplotlib.pyplot as plt
import os

# Load force coefficient data using pandas
import pandas as pd

# Build the path with os.path.join rather than string concatenation, so
# separator handling is explicit (this also makes the `os` import useful).
force_data_path = os.path.join(
    archive_path,
    "inputDirectory", "postProcessing", "forceCoeffs1", "0", "forceCoeffs.dat",
)

# Read the file, skipping the 9 comment/header lines and using tab separator.
# Raises FileNotFoundError if the job produced no forceCoeffs output.
data = pd.read_csv(force_data_path, sep='\t', skiprows=9, header=None)
print(f"Loaded force coefficients data with shape: {data.shape}")

What this does:

Force coefficient file format:

# Column layout of forceCoeffs.dat: zero-based column index -> meaning.
_column_names = (
    "Time",
    "Cm (moment coefficient)",
    "Cd (drag coefficient)",
    "Cl (lift coefficient)",
    "Cl(f) (front lift)",
    "Cl(r) (rear lift)",
)
columns = dict(enumerate(_column_names))

Plot Results

# Plot drag (column 2) and lift (column 3) coefficient histories against
# time (column 0), skipping the first 100 samples so the startup transient
# does not dominate the axes.
for _col, _ylabel, _title in (
    (2, '$C_d$', 'Drag Coefficient vs Time'),
    (3, '$C_l$', 'Lift Coefficient vs Time'),
):
    plt.plot(data.iloc[100:, 0], data.iloc[100:, _col])
    plt.xlabel('Time')
    plt.ylabel(_ylabel)
    plt.title(_title)
    plt.grid(True)
    plt.show()

What this does:

Advanced plotting options:

# Side-by-side drag/lift comparison on a single figure.
plt.figure(figsize=(12, 5))

_panels = (
    (1, 2, 'b-', '$C_d$ (Drag Coefficient)', 'Drag Coefficient vs Time'),
    (2, 3, 'r-', '$C_l$ (Lift Coefficient)', 'Lift Coefficient vs Time'),
)
for _pos, _col, _fmt, _ylabel, _title in _panels:
    plt.subplot(1, 2, _pos)
    plt.plot(data.iloc[100:, 0], data.iloc[100:, _col], _fmt, linewidth=2)
    plt.xlabel('Time (s)')
    plt.ylabel(_ylabel)
    plt.title(_title)
    plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Report the coefficients at the last recorded time step.
final_cd = float(data.iloc[-1, 2])
final_cl = float(data.iloc[-1, 3])
print(f"Final drag coefficient: {final_cd:.6f}")
print(f"Final lift coefficient: {final_cl:.6f}")

Configuration Options

Environment Variable Options

# Complete list of OpenFOAM environment variables, expressed as
# (key, value) pairs and expanded into the {"key": ..., "value": ...}
# format expected by extra_env_vars.
_openfoam_flags = (
    ("mesh", "On"),          # Generate mesh with blockMesh
    ("solver", "pisoFoam"),  # Solver selection
    ("decomp", "On"),        # Enable parallel decomposition
    ("reconstruct", "On"),   # Reconstruct parallel results
    ("postProcess", "On"),   # Run post-processing functions
)
openfoam_options = [{"key": name, "value": setting} for name, setting in _openfoam_flags]

Queue and System Options

# Queue limits per system: maximum node count and maximum wall time (minutes).
def _queue_limits(max_nodes, max_time):
    """Return a queue-limit record in the shape used throughout this page."""
    return {"max_nodes": max_nodes, "max_time": max_time}

queue_options = {
    "stampede3": {
        "development": _queue_limits(2, 120),  # 2 hours, testing
        "normal": _queue_limits(256, 2880),    # 48 hours, production
        "large": _queue_limits(512, 1440),     # 24 hours, large jobs
    }
}

# Per-node hardware for each HPC system (core count, memory in MB).
systems = {
    system: {"cores_per_node": cores, "memory_per_node": memory}
    for system, cores, memory in (
        ("stampede3", 48, 192000),
        ("frontera", 56, 192000),
    )
}

Complete Job Request Example

# Full-featured job request showing all options
# NOTE: "YOUR_ALLOCATION" is a placeholder -- replace it with a TACC
# allocation you are authorized to charge before submitting.
complete_job = ds.jobs.generate(
 # Required parameters
 app_id="openfoam-stampede3",
 input_dir_uri=input_uri,
 allocation="YOUR_ALLOCATION",
 
 # Resource configuration
 max_minutes=120, # 2 hours
 node_count=2, # Multiple nodes
 cores_per_node=48, # Full node utilization
 memory_mb=192000, # 192 GB RAM
 queue="normal", # Production queue
 
 # Job metadata
 job_name="wind_flow_cfd_simulation",
 description="RANS simulation of wind flow around building using OpenFOAM",
 tags=["research", "cfd", "wind-engineering", "rans", "openfoam"],
 
 # Archive configuration
 archive_system="designsafe",
 archive_path="cfd-results/wind-study", # Results go to MyData/cfd-results/wind-study/
 
 # OpenFOAM configuration
 extra_env_vars=[
 {"key": "mesh", "value": "On"},
 {"key": "solver", "value": "simpleFoam"}, # Steady-state RANS
 {"key": "decomp", "value": "On"},
 {"key": "reconstruct", "value": "On"},
 {"key": "postProcess", "value": "On"},
 ],
 
 # Advanced options
 input_dir_param_name="Case Directory",
)