MPM Job Submission Example

This example demonstrates how to submit and monitor a Material Point Method (MPM) simulation using dapi. MPM is a particle-based numerical method for simulating large deformation problems in geomechanics and fluid mechanics.

Overview

This example covers the essential workflow for running MPM simulations:

- installing dapi and authenticating with DesignSafe
- configuring job parameters and generating a Tapis job request
- submitting the job to TACC and monitoring it to completion
- checking status, viewing console output, and accessing archived results

Complete Example

Step 1: Install and Import dapi

# Install dapi package
!pip install dapi --user --quiet

# Import required modules
from dapi import DSClient
from datetime import datetime  # used in Step 5 for timestamped job names
import json

What this does:

- Installs the dapi package into your user environment (--user) with minimal output (--quiet).
- Imports DSClient, the main entry point to DesignSafe services, plus datetime for timestamped job names and json for pretty-printing job requests.

Step 2: Initialize Client

# Initialize DesignSafe client
ds = DSClient()

What this does:

- Creates an authenticated client for DesignSafe's Tapis services.
- Exposes the helpers used throughout this example: ds.files for file operations and ds.jobs for job management.

Authentication: dapi supports multiple authentication methods including environment variables, .env files, and interactive prompts. For detailed authentication setup instructions, see the authentication guide.
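For example, a non-interactive setup can export credentials before constructing the client. A minimal sketch, assuming dapi reads DESIGNSAFE_USERNAME and DESIGNSAFE_PASSWORD from the environment (verify the exact variable names in the authentication guide):

# Hypothetical non-interactive setup; confirm the variable names
# against the authentication guide before relying on them.
import os

os.environ["DESIGNSAFE_USERNAME"] = "your-username"
os.environ["DESIGNSAFE_PASSWORD"] = "your-password"

ds = DSClient()  # picks up credentials from the environment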

Step 3: Configure Job Parameters

# Job configuration parameters
ds_path: str = "/CommunityData/dapi/mpm/uniaxial_stress/"  # Path to MPM input files
input_filename: str = "mpm.json"  # Main MPM configuration file
max_job_minutes: int = 10  # Maximum runtime in minutes
tacc_allocation: str = "ASC25049"  # TACC allocation to charge
app_id_to_use: str = "mpm-s3"  # MPM application ID

What each parameter does:

- ds_path: DesignSafe location of the MPM input files (here, a ready-made CommunityData example).
- input_filename: the main MPM configuration file the solver reads.
- max_job_minutes: wall-clock limit; the scheduler terminates the job once it is exceeded.
- tacc_allocation: the TACC allocation charged for compute time (replace with your own).
- app_id_to_use: the Tapis application ID of the MPM solver.

MPM input JSON file structure:

# Keys commonly found in an MPM configuration file
# (values here are descriptions, not literal file contents):
mpm_config = {
    "mesh": "mesh.txt",            # Computational mesh definition
    "particles": "particles.txt",  # Material point locations and properties
    "materials": {                 # Material constitutive models
        "LinearElastic2D": "for elastic analysis",
        "MohrCoulomb": "for soil mechanics",
        "NeoHookean": "for large deformation",
    },
    "analysis": {
        "type": "MPMExplicit2D",   # Analysis type: 2D or 3D explicit
        "nsteps": 1000,            # Number of time steps
        "dt": 0.001,               # Time step size
    },
}
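
Before submitting, it can help to sanity-check the configuration locally. A minimal sketch, assuming you have a local copy of mpm.json and that it uses the keys shown above:

import json

with open("mpm.json") as f:
    cfg = json.load(f)

# Fail early if a section this example relies on is missing
for key in ("mesh", "particles", "materials", "analysis"):
    assert key in cfg, f"missing section: {key}"
print(f"Analysis type: {cfg['analysis']['type']}")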

Step 4: Convert Path to URI

# Convert DesignSafe path to Tapis URI format
input_uri = ds.files.to_uri(ds_path)
print(f"Input Directory Tapis URI: {input_uri}")

What this does:

- Translates the DesignSafe-style path into the tapis:// URI format that job submissions require, and prints it for verification.
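
Both directions of the mapping are available; ds.files.to_path (used later in post-processing) converts a Tapis URI back to a DesignSafe path:

# Round-trip between a DesignSafe path and its Tapis URI
uri = ds.files.to_uri(ds_path)
path_back = ds.files.to_path(uri)
print(uri)
print(path_back)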

Step 5: Generate Job Request

# Generate job request dictionary using app defaults
job_dict = ds.jobs.generate(
 app_id=app_id_to_use,
 input_dir_uri=input_uri,
 script_filename=input_filename,
 max_minutes=max_job_minutes,
 allocation=tacc_allocation,
 archive_system="designsafe",
 # MPM-specific job metadata
 job_name=f"mpm_uniaxial_stress_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
 description="MPM simulation of uniaxial stress test",
 tags=["research", "mpm", "geomechanics", "uniaxial-stress"]
)
print(json.dumps(job_dict, indent=2, default=str))

What each parameter does:

- app_id: which Tapis application runs the job.
- input_dir_uri: where the input files are staged from.
- script_filename: the configuration file handed to the MPM executable.
- max_minutes: the wall-clock limit requested from the scheduler.
- allocation: the TACC allocation to charge.
- archive_system: where outputs are archived after the run.
- job_name, description, tags: metadata that makes the job easy to identify and search for later.

Additional options you can add:

# Extended job configuration options
job_dict = ds.jobs.generate(
    app_id=app_id_to_use,
    input_dir_uri=input_uri,
    script_filename=input_filename,
    max_minutes=max_job_minutes,
    allocation=tacc_allocation,

    # Resource configuration
    node_count=1,        # Number of compute nodes
    cores_per_node=48,   # Cores per node (max depends on system)
    memory_mb=192000,    # Memory in MB per node
    queue="skx-dev",     # Queue: "skx-dev", "skx", "normal", etc.

    # Job metadata
    job_name="my_mpm_simulation",              # Custom job name
    description="Large deformation analysis",  # Job description
    tags=["research", "mpm", "geomechanics"],  # Searchable tags

    # Archive configuration
    archive_system="designsafe",  # Where to store results
    archive_path="mpm-results",   # Custom archive subdirectory
)

Step 6: Customize Resources

# Customize job settings (optional)
job_dict["nodeCount"] = 1 # Use single node
job_dict["coresPerNode"] = 1 # Use single core for small problems
print(json.dumps(job_dict, indent=2, default=str))

What this does:

- Overrides the generated defaults in place; job_dict is a plain dictionary, so any Tapis job attribute can be edited before submission.
- A single node and core is enough for this small uniaxial-stress case.

Resource guidelines:

# Resource selection guidelines for MPM
resources = {
    "small_case": {"nodes": 1, "cores": 1, "time": 30},     # < 10K particles
    "medium_case": {"nodes": 1, "cores": 16, "time": 120},  # 10K - 100K particles
    "large_case": {"nodes": 2, "cores": 48, "time": 480},   # > 100K particles
}
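
These guidelines can be applied straight to the generated request, since job_dict holds Tapis job attributes (nodeCount and coresPerNode appear in Step 6; maxMinutes is assumed from the Tapis job schema):

# Apply a guideline to the job request (illustrative)
choice = resources["medium_case"]
job_dict["nodeCount"] = choice["nodes"]
job_dict["coresPerNode"] = choice["cores"]
job_dict["maxMinutes"] = choice["time"]  # assumed Tapis attribute name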

Step 7: Submit Job

# Submit the job to TACC
submitted_job = ds.jobs.submit(job_dict)
print(f"Job UUID: {submitted_job.uuid}")

What this does:

- Sends the job request to Tapis, which stages the inputs and queues the job on TACC.
- Returns a job object whose uuid identifies the job in all later calls.

Step 8: Monitor Job

# Monitor job execution until completion
final_status = submitted_job.monitor(interval=15) # Check every 15 seconds
print(f"Job {submitted_job.uuid} finished with status: {final_status}")

What this does:

- Polls the job status every 15 seconds and blocks until the job reaches a terminal state.
- Returns the final status string (see the table below).

Job status meanings:

job_statuses = {
    "PENDING": "Job submitted but not yet processed",
    "PROCESSING_INPUTS": "Input files being staged",
    "QUEUED": "Job waiting in scheduler queue",
    "RUNNING": "Job actively executing",
    "ARCHIVING": "Output files being archived",
    "FINISHED": "Job completed successfully",
    "FAILED": "Job failed during execution",
}
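
If you prefer not to block the notebook, the same effect can be had with a manual polling loop built on ds.jobs.status from Step 9. A sketch, with the terminal states taken from the table above plus CANCELLED (assumed from the Tapis status set):

import time

TERMINAL = {"FINISHED", "FAILED", "CANCELLED"}
status = ds.jobs.status(submitted_job.uuid)
while status not in TERMINAL:
    time.sleep(15)  # same cadence as monitor(interval=15)
    status = ds.jobs.status(submitted_job.uuid)
print(f"Job reached terminal status: {status}")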

Step 9: Check Results

# Interpret and display job outcome
ds.jobs.interpret_status(final_status, submitted_job.uuid)

# Display job runtime summary
submitted_job.print_runtime_summary(verbose=False)

# Get current job status
current_status = ds.jobs.status(submitted_job.uuid)
print(f"Current status: {current_status}")

# Display last status message from TACC
print(f"Last message: {submitted_job.last_message}")

What each command does:

- interpret_status: prints a human-readable summary of the job outcome.
- print_runtime_summary: reports how long each stage (queue wait, run, archiving) took.
- status: fetches the job's current status on demand.
- last_message: the most recent status message from Tapis, often useful for diagnosing failures.

Step 10: View Job Output

# Display job output from stdout
stdout_content = submitted_job.get_output_content("tapisjob.out", max_lines=50)
if stdout_content:
 print("Job output:")
 print(stdout_content)

What this does:

- Fetches tapisjob.out (the job's console output) from the archive and prints up to the first 50 lines.

Typical MPM output includes:

# Example MPM console output:
mpm_output_info = {
    "git_revision": "Version information",
    "step_progress": "Step: 1 of 1000",
    "warnings": "Material sets, boundary conditions",
    "solver_duration": "Explicit USF solver duration: 285 ms",
    "completion": "Job execution finished",
}
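
When a run fails, the companion stderr log is usually the first place to look; the same accessor works for tapisjob.err (listed among the archive files in Step 11):

# Display job errors from stderr, if any were written
stderr_content = submitted_job.get_output_content("tapisjob.err", max_lines=50)
if stderr_content:
    print("Job errors:")
    print(stderr_content)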

Step 11: Access Results

# List contents of job archive directory
archive_uri = submitted_job.archive_uri
print(f"Archive URI: {archive_uri}")
outputs = ds.files.list(archive_uri)
for item in outputs:
    print(f"- {item.name} ({item.type})")

What this does:

- archive_uri points at the job's archive directory on the chosen storage system.
- ds.files.list returns the files and folders there, confirming which outputs were produced.

Typical MPM output files:

typical_outputs = {
    "inputDirectory/": "Copy of your input directory with results",
    "tapisjob.out": "Console output from the MPM simulation",
    "tapisjob.err": "Error messages (if any)",
    "tapisjob.sh": "Job script that was executed",
    "results/": "VTK files for visualization (particles, stresses, velocities)",
    "*.vtu": "ParaView-compatible visualization files",
}

Post-processing Results

Extract and Analyze Output

# Import analysis libraries
import os

import matplotlib.pyplot as plt  # for downstream plotting
import numpy as np               # for downstream numerical analysis

# Convert archive URI to local path for analysis
archive_path = ds.files.to_path(archive_uri)
print(f"Archive path: {archive_path}")

# Navigate to results directory
results_path = os.path.join(archive_path, "inputDirectory", "results")
if os.path.exists(results_path):
    print(f"Results directory: {results_path}")

    # List VTK output files
    vtk_files = [f for f in os.listdir(results_path) if f.endswith(".vtu")]
    print(f"Found {len(vtk_files)} VTK files for visualization")

    # Actual VTK analysis requires packages such as vtk or pyvista
    print("Use ParaView or Python VTK libraries to visualize results")
else:
    print("No results directory found - check job completion status")

What this does:

- Converts the archive URI into a locally accessible path (available inside DesignSafe's JupyterHub).
- Checks for the results directory and counts the .vtu files ready for visualization.
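
As a concrete starting point, here is a minimal sketch using pyvista (not installed by default; pip install pyvista) to open the most recent .vtu file found above and list its data arrays:

# Minimal VTK inspection sketch; assumes pyvista is installed and
# that the results directory from the previous cell exists.
import glob
import os

import pyvista as pv

vtu_files = sorted(glob.glob(os.path.join(results_path, "*.vtu")))
if vtu_files:
    mesh = pv.read(vtu_files[-1])  # last time step written
    print(mesh.array_names)        # point/cell data fields (stresses, velocities, ...)
    # mesh.plot(scalars=mesh.array_names[0])  # uncomment to render interactively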