MPM Job Submission Example
This example demonstrates how to submit and monitor a Material Point Method (MPM) simulation using dapi. MPM is a particle-based numerical method for simulating large deformation problems in geomechanics and fluid mechanics.
🎯 Overview
This example covers the essential workflow for running MPM simulations:
- Installing and importing dapi
- Setting up MPM job parameters and input files
- Configuring analysis types, materials, and boundary conditions
- Submitting and monitoring MPM jobs
- Post-processing results and output analysis
🚀 Complete Example
Step 1: Install and Import dapi
# Install dapi package
!pip install dapi --user --quiet
# Import required modules
from dapi import DSClient
from datetime import datetime  # needed for the timestamped job name in Step 5
import json
What this does:
- Installs the DesignSafe API package from PyPI
- Imports the main client class, datetime for timestamped job names, and JSON for pretty-printing job requests
Step 2: Initialize Client
# Create an authenticated DesignSafe client
ds = DSClient()
What this does:
- Creates an authenticated connection to DesignSafe services
- Handles OAuth2 authentication automatically
- Sets up connections to Tapis API, file systems, and job services
Authentication: dapi supports multiple authentication methods including environment variables, .env files, and interactive prompts. For detailed authentication setup instructions, see the authentication guide.
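For example, credentials can be supplied through environment variables so that DSClient() never prompts. A minimal sketch, assuming environment-variable authentication (the variable names below are illustrative; see the authentication guide for the exact names dapi expects):
# Supply credentials via environment variables (hypothetical variable names)
import os
os.environ["DESIGNSAFE_USERNAME"] = "your_username"  # illustrative name
os.environ["DESIGNSAFE_PASSWORD"] = "your_password"  # illustrative name
ds = DSClient()  # picks up credentials without an interactive prompt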
Step 3: Configure Job Parameters
# Job configuration parameters
ds_path: str = "/CommunityData/dapi/mpm/uniaxial_stress/" # Path to MPM input files
input_filename: str = "mpm.json" # Main MPM configuration file
max_job_minutes: int = 10 # Maximum runtime in minutes
tacc_allocation: str = "ASC25049" # TACC allocation to charge
app_id_to_use: str = "mpm-s3" # MPM application ID
What each parameter does:
- ds_path: DesignSafe path to your MPM case directory containing input files
- input_filename: Main MPM configuration file (typically .json format)
- max_job_minutes: Maximum wall-clock time for the job (prevents runaway simulations)
- tacc_allocation: Your TACC allocation account (required for compute time billing)
- app_id_to_use: Specific MPM application version on DesignSafe
MPM input JSON file structure:
# Typical MPM configuration file contains:
mpm_config = {
"mesh": "mesh.txt", # Computational mesh definition
"particles": "particles.txt", # Material point locations and properties
"materials": { # Material constitutive models
"LinearElastic2D": "For elastic analysis",
"MohrCoulomb": "For soil mechanics",
"NeoHookean": "For large deformation"
},
"analysis": {
"type": "MPMExplicit2D", # Analysis type: 2D or 3D explicit
"nsteps": 1000, # Number of time steps
"dt": 0.001 # Time step size
}
}
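A quick sanity check on the analysis block: the total simulated time is nsteps times dt. For the values sketched above:
# Total simulated time = nsteps * dt
total_time = mpm_config["analysis"]["nsteps"] * mpm_config["analysis"]["dt"]
print(f"Total simulated time: {total_time} s")  # 1000 * 0.001 = 1.0 s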
Step 4: Convert Path to URI
# Convert DesignSafe path to Tapis URI format
input_uri = ds.files.translate_path_to_uri(ds_path)
print(f"Input Directory Tapis URI: {input_uri}")
What this does:
- Converts human-readable DesignSafe paths (like /MyData/...) to Tapis URI format
- Tapis URIs are required for job submission and follow the pattern tapis://system/path, as illustrated below
- Automatically detects your username and the correct storage system
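As an illustration of that pattern (translate_path_to_uri resolves the correct system IDs for you; the IDs shown below follow DesignSafe's storage-system naming conventions and are examples, not guarantees):
# Illustrative path-to-URI mappings (system IDs are examples only;
# translate_path_to_uri resolves the correct ones automatically)
# /MyData/run1/            -> tapis://designsafe.storage.default/<username>/run1/
# /CommunityData/dapi/mpm/ -> tapis://designsafe.storage.community/dapi/mpm/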
Step 5: Generate Job Request
# Generate job request dictionary using app defaults
job_dict = ds.jobs.generate_request(
app_id=app_id_to_use,
input_dir_uri=input_uri,
script_filename=input_filename,
max_minutes=max_job_minutes,
allocation=tacc_allocation,
archive_system="designsafe",
# MPM-specific job metadata
job_name=f"mpm_uniaxial_stress_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
description="MPM simulation of uniaxial stress test",
tags=["research", "mpm", "geomechanics", "uniaxial-stress"]
)
print(json.dumps(job_dict, indent=2, default=str))
What each parameter does:
- app_id: Specifies which MPM application to run
- input_dir_uri: Location of your MPM input files
- script_filename: Main MPM configuration file (mpm.json)
- max_minutes: Job timeout (prevents infinite runs)
- allocation: TACC account to charge for compute time
- archive_system: Where to store results ("designsafe" = your MyData folder)
Additional options you can add:
# Extended job configuration options
job_dict = ds.jobs.generate_request(
app_id=app_id_to_use,
input_dir_uri=input_uri,
script_filename=input_filename,
max_minutes=max_job_minutes,
allocation=tacc_allocation,
# Resource configuration
node_count=1, # Number of compute nodes
cores_per_node=48, # Cores per node (max depends on system)
memory_mb=192000, # Memory in MB per node
queue="skx-dev", # Queue: "skx-dev", "skx", "normal", etc.
# Job metadata
job_name="my_mpm_simulation", # Custom job name
description="Large deformation analysis", # Job description
tags=["research", "mpm", "geomechanics"], # Searchable tags
# Archive configuration
archive_system="designsafe", # Where to store results
archive_path="mpm-results", # Custom archive subdirectory
)
Step 6: Customize Resources
# Customize job settings (optional)
job_dict["nodeCount"] = 1 # Use single node
job_dict["coresPerNode"] = 1 # Use single core for small problems
print(json.dumps(job_dict, indent=2, default=str))
What this does:
- Overrides default resource allocation from the app
- nodeCount: Number of compute nodes (1 for small jobs, multiple for large simulations)
- coresPerNode: CPU cores per node (MPM can utilize parallel processing)
- More cores = faster solution but higher cost
Resource guidelines:
# Resource selection guidelines for MPM
resources = {
"small_case": {"nodes": 1, "cores": 1, "time": 30}, # < 10K particles
"medium_case": {"nodes": 1, "cores": 16, "time": 120}, # 10K - 100K particles
"large_case": {"nodes": 2, "cores": 48, "time": 480}, # > 100K particles
}
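When scripting many runs, these guidelines can be wrapped in a small helper. A minimal sketch (the function name and particle-count thresholds are illustrative, not part of dapi):
# Hypothetical helper mapping problem size to the guidelines above
def select_resources(n_particles: int) -> dict:
    if n_particles < 10_000:
        return resources["small_case"]
    elif n_particles <= 100_000:
        return resources["medium_case"]
    return resources["large_case"]

choice = select_resources(50_000)
job_dict["nodeCount"] = choice["nodes"]
job_dict["coresPerNode"] = choice["cores"]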
Step 7: Submit Job
# Submit the job to TACC
submitted_job = ds.jobs.submit_request(job_dict)
print(f"Job UUID: {submitted_job.uuid}")
What this does:
- Sends the job request to TACC's job scheduler
- Returns a SubmittedJob object for monitoring
- Job UUID is a unique identifier for tracking
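Because the UUID is the handle for everything that follows, it can be worth persisting so a later session can check on the job. A minimal sketch in plain Python (ds.jobs.get_status is shown in Step 9):
# Save the UUID so the job can be checked from a later session
with open("last_job_uuid.txt", "w") as f:
    f.write(submitted_job.uuid)
# Later: uuid = open("last_job_uuid.txt").read().strip()
#        print(ds.jobs.get_status(uuid))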
Step 8: Monitor Job
# Monitor job execution until completion
final_status = submitted_job.monitor(interval=15) # Check every 15 seconds
print(f"Job {submitted_job.uuid} finished with status: {final_status}")
What this does:
- Polls job status at specified intervals (15 seconds)
- Shows progress bars for different job phases
- Returns final status when job completes
- interval=15 means check every 15 seconds (can be adjusted)
Job status meanings:
job_statuses = {
"PENDING": "Job submitted but not yet processed",
"PROCESSING_INPUTS": "Input files being staged",
"QUEUED": "Job waiting in scheduler queue",
"RUNNING": "Job actively executing",
"ARCHIVING": "Output files being archived",
"FINISHED": "Job completed successfully",
"FAILED": "Job failed during execution"
}
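If you prefer not to block on monitor(), these statuses can also be polled by hand. A minimal sketch using ds.jobs.get_status (shown in Step 9), treating FINISHED and FAILED as terminal for simplicity (Tapis has additional terminal states such as CANCELLED):
# Manual polling alternative to monitor()
import time

status = ds.jobs.get_status(submitted_job.uuid)
while status not in {"FINISHED", "FAILED"}:
    time.sleep(15)  # same cadence as monitor(interval=15)
    status = ds.jobs.get_status(submitted_job.uuid)
print(f"Terminal status: {status}")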
Step 9: Check Results
# Interpret and display job outcome
ds.jobs.interpret_status(final_status, submitted_job.uuid)
# Display job runtime summary
submitted_job.print_runtime_summary(verbose=False)
# Get current job status
current_status = ds.jobs.get_status(submitted_job.uuid)
print(f"Current status: {current_status}")
# Display last status message from TACC
print(f"Last message: {submitted_job.last_message}")
What each command does:
- interpret_status: Provides human-readable explanation of job outcome
- print_runtime_summary: Shows time spent in each job phase (queued, running, etc.)
- get_status: Gets current job status (useful for checking later)
- last_message: Shows last status message from the job scheduler
Step 10: View Job Output
# Display job output from stdout
stdout_content = submitted_job.get_output_content("tapisjob.out", max_lines=50)
if stdout_content:
print("Job output:")
print(stdout_content)
What this does:
- tapisjob.out contains all console output from your MPM simulation
- max_lines=50 limits output to the last 50 lines (prevents overwhelming output)
- Shows MPM solver progress, timestep information, and timing data
Typical MPM output includes:
# Example MPM console output:
mpm_output_info = {
"git_revision": "Version information",
"step_progress": "Step: 1 of 1000",
"warnings": "Material sets, boundary conditions",
"solver_duration": "Explicit USF solver duration: 285 ms",
"completion": "Job execution finished"
}
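Errors and solver warnings go to a separate stream, which the same accessor can read (tapisjob.err appears in the typical output files listed below):
# Check stderr as well, using the same accessor
stderr_content = submitted_job.get_output_content("tapisjob.err", max_lines=50)
if stderr_content:
    print("Job errors/warnings:")
    print(stderr_content)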
Step 11: Access Results
# List contents of job archive directory
archive_uri = submitted_job.archive_uri
print(f"Archive URI: {archive_uri}")
outputs = ds.files.list(archive_uri)
for item in outputs:
print(f"- {item.name} ({item.type})")
What this does:
- archive_uri: Location where job results are stored
- ds.files.list: Lists all files and directories in the archive
- Shows output files like simulation results, visualization data, and logs
Typical MPM output files:
typical_outputs = {
"inputDirectory/": "Copy of your input directory with results",
"tapisjob.out": "Console output from MPM simulation",
"tapisjob.err": "Error messages (if any)",
"tapisjob.sh": "Job script that was executed",
"results/": "VTK files for visualization (particles, stresses, velocities)",
"*.vtu": "Paraview-compatible visualization files"
}
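To pull just the visualization files out of that listing, the items returned by ds.files.list can be filtered by name (the .name attribute is already used in Step 11; the filtering itself is a sketch):
# Filter the archive listing for ParaView-compatible files
vtu_items = [item for item in outputs if item.name.endswith(".vtu")]
print(f"Found {len(vtu_items)} .vtu files in the archive root")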
📊 Post-processing Results
Extract and Analyze Output
# Convert archive URI to local path for analysis
archive_path = ds.files.translate_uri_to_path(archive_uri)
print(f"Archive path: {archive_path}")
# Import analysis libraries
import numpy as np
import matplotlib.pyplot as plt
import os
# Navigate to results directory
results_path = os.path.join(archive_path, "inputDirectory", "results")
if os.path.exists(results_path):
print(f"Results directory: {results_path}")
# List VTK output files
vtk_files = [f for f in os.listdir(results_path) if f.endswith('.vtu')]
print(f"Found {len(vtk_files)} VTK files for visualization")
# Example: Load and analyze particle data (requires appropriate library)
# Note: Actual VTK analysis would require packages like vtk or pyvista
print("Use ParaView or Python VTK libraries to visualize results")
else:
print("No results directory found - check job completion status")
What this does:
- translate_uri_to_path: Converts Tapis URI to local file system path
- os.listdir: Lists files in the results directory
- .vtu files: VTK unstructured grid files for visualization
- ParaView: Recommended tool for visualizing MPM particle data
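Following the note above about Python VTK libraries, here is a minimal sketch using pyvista (an assumption: pyvista is not part of dapi and must be installed separately) to open one of the .vtu files and inspect its particle arrays:
# Inspect a .vtu result with pyvista (assumed installed: pip install pyvista)
import os
import pyvista as pv

if vtk_files:  # list built in the post-processing block above
    mesh = pv.read(os.path.join(results_path, vtk_files[0]))
    print(f"Material points: {mesh.n_points}")
    print(f"Point-data arrays: {list(mesh.point_data.keys())}")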