get_tapis_job_metadata()#
get_tapis_job_metadata(t, jobUuid, printAll=True)
Fetches metadata for a specified Tapis job and prints a structured summary including the UUID, name, status, app ID, creation time, and archive location. If the job has finished, it reconstructs the expected local archive directory path under ~/MyData/tapis-jobs-archive (or the corresponding Work mount), checks whether that directory exists, lists its contents, and returns the information in a structured dictionary.
Inputs:
t (Tapis client object): Authenticated Tapis v3 client instance (e.g., created with tapipy) used to query job details.
jobUuid (str): UUID of the job to query.
printAll (bool, optional, default True): If True, display the metadata summary in a collapsible output widget.
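The client t is typically created with the Tapis v3 Python SDK (tapipy). A minimal sketch, assuming a DesignSafe tenant; the base URL and credentials below are placeholders:

import getpass
from tapipy.tapis import Tapis

# Placeholder credentials -- replace with your own account details
t = Tapis(base_url='https://designsafe.tapis.io',
          username='your_username',
          password=getpass.getpass('password: '))
t.get_tokens()  # obtain an access token before calling t.jobs.*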
Outputs:
Returns the full job-metadata dictionary, which includes the following keys:
local_path (str or None): Local archive directory path if the job has finished, else None.
exists (bool): True if the local directory exists.
files (list of str): Files in the directory if it exists.
message (str, optional): Message describing the status if data is not available.
Behavior:
Prints a structured summary of job metadata.
Reconstructs the local archive path from the job's archiveSystemId and archiveSystemDir rather than relying on hardcoded locations (see the sketch below).
Lists the files if the local directory exists, or prints an explanatory message if it does not.
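The reconstruction follows the pattern sketched below. This is a simplified illustration of the logic in the full source at the bottom of this page, and the helper name local_archive_path is illustrative only:

import os

def local_archive_path(archiveSystemId, archiveSystemDir, execSystemId):
    # Map the Tapis archive system to the storage folder mounted in the DesignSafe JupyterHub
    if archiveSystemId == 'designsafe.storage.default':
        mount = 'MyData'
    elif archiveSystemId in ['cloud.data', 'stampede3', execSystemId]:
        mount = f'Work/{execSystemId}'
    elif archiveSystemId == 'designsafe.storage.community':
        mount = 'CommunityData'   # read-only
    else:
        mount = ''
    # Keep the part of the remote archive path that follows 'tapis-jobs-archive/'
    tail = archiveSystemDir.split('tapis-jobs-archive/')[1]
    return os.path.expanduser(os.path.join('~', mount, 'tapis-jobs-archive', tail))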
Example usage:
result = get_tapis_job_metadata(t, "a1b2c3d4-5678-90ef-ghij-klmnopqrstuv")
if result["exists"]:
    print("Local data directory:", result["local_path"])
    print("Files:", result["files"])
else:
    print(result.get("message", "No data yet."))
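The local archive only appears once the job has finished, so you may want to wait for completion before calling the function. A minimal polling sketch using the same t.jobs.getJob call the function itself relies on (jobUuid is assumed to hold the job's UUID; the 30-second interval is arbitrary):

import time

status = t.jobs.getJob(jobUuid=jobUuid).status
while status not in ('FINISHED', 'FAILED', 'CANCELLED'):
    time.sleep(30)   # arbitrary polling interval
    status = t.jobs.getJob(jobUuid=jobUuid).status

result = get_tapis_job_metadata(t, jobUuid)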
Files#
You can find these files in Community Data.
get_tapis_job_metadata.py
def get_tapis_job_metadata(t, jobUuid, printAll=True):
    """
    Retrieves and prints metadata for a specified Tapis job, including robust local archive reconstruction.

    This function queries the Tapis jobs API for metadata on a given job, printing
    details such as UUID, name, status, application ID, creation time, and archive location.
    If the job has completed successfully (status == 'FINISHED'), it reconstructs
    the expected local archive directory path under '~/MyData/tapis-jobs-archive',
    checks whether it exists, lists its contents, and returns this information
    in a structured dictionary.
    If the job is not yet finished or the local directory does not exist, it prints
    a notice and returns a dictionary describing the situation.

    Parameters
    ----------
    t : object
        An authenticated Tapis v3 client instance (e.g., created with `tapipy`).
    jobUuid : str
        UUID of the job whose metadata is to be retrieved.
    printAll : bool, optional
        If True (default), display the metadata summary in collapsible output widgets.

    Returns
    -------
    dict
        The full job-metadata dictionary, augmented with the following keys:
        - "local_path" (str or None): local archive directory path if the job is finished, else None.
        - "exists" (bool): True if the local directory exists.
        - "files" (list of str): list of files in the directory if it exists.
        - "message" (str, optional): explanatory message if no data is available.

    Prints
    ------
    - Job UUID, name, status, appId, creation time, and archive directory.
    - If finished, the reconstructed local archive path and the files contained within it,
      or a notice if the local directory does not exist.

    Example
    -------
    >>> result = get_tapis_job_metadata(t, "a1b2c3d4-5678-90ef-ghij-klmnopqrstuv")
    >>> if result["exists"]:
    ...     print("Archived job data at:", result["local_path"])
    ...     print("Files:", result["files"])
    ... else:
    ...     print(result.get("message", "Job not yet completed."))
    """
    # Silvia Mazzoni, 2025
    import os
    import json

    if printAll:
        import ipywidgets as widgets
        from IPython.display import display, clear_output
        metadata_out = widgets.Output()
        metadata_accordion = widgets.Accordion(children=[metadata_out])
        metadata_accordion.set_title(0, f'Job Metadata ({jobUuid})')
        metadata_accordion.selected_index = 0
        display(metadata_accordion)

    # Query the Tapis jobs API and convert the response to a plain dictionary
    job_response = t.jobs.getJob(jobUuid=jobUuid)
    job_dict_all = json.loads(json.dumps(job_response, default=lambda o: vars(o)))

    if printAll:
        with metadata_out:
            print('+++++++++++++++++++++++++')
            print('++++++ Job Metadata')
            print('+++++++++++++++++++++++++')
            print('+ uuid: ', job_response.uuid)
            print('+ name: ', job_response.name)
            print('+ status: ', job_response.status)
            print('+ appId: ', job_response.appId)
            print('+ created: ', job_response.created)
            print('+ output-file location')
            print('+ archiveSystemId:', job_response.archiveSystemId)
            print('+ archiveSystemDir:', job_response.archiveSystemDir)

    # Map the Tapis archive system to the storage folder mounted in JupyterHub
    execSystemId = job_response.execSystemId
    archiveSystemId = job_response.archiveSystemId
    sysPath = ''
    if archiveSystemId == 'designsafe.storage.default':
        sysPath = 'MyData'
    elif archiveSystemId in ['Work', 'work', 'cloud.data', 'stampede3', execSystemId]:
        sysPath = f'Work/{execSystemId}'
    elif archiveSystemId == 'designsafe.storage.community':
        sysPath = 'CommunityData'    # note: you can't write to Community Data
    elif archiveSystemId == 'designsafe.storage.published':
        sysPath = 'Published'        # note: you can't write to Published data!

    # Keep the part of the archive path that follows 'tapis-jobs-archive/'
    archiveSystemDir = job_dict_all['archiveSystemDir']
    archiveSystemDir_user = archiveSystemDir.split('tapis-jobs-archive' + os.path.sep)[1]
    archiveSystemDir_user = os.path.join(sysPath, 'tapis-jobs-archive', archiveSystemDir_user)
    archiveSystemDir_out = archiveSystemDir_user
    if job_response.appId in ["opensees-mp-s3", "opensees-sp-s3"]:
        archiveSystemDir_out = os.path.join(archiveSystemDir_out, 'inputDirectory')
    elif job_response.appId in ["opensees-express"]:
        fileInputsList = json.loads(job_dict_all['fileInputs'])
        for fileInputs in fileInputsList:
            if fileInputs.get('name') == 'Input Directory' or fileInputs.get('envKey') == 'inputDirectory':
                sourceUrl = fileInputs['sourceUrl']
                input_folder_end = os.path.basename(sourceUrl)
                archiveSystemDir_out = os.path.join(archiveSystemDir_user, input_folder_end)
                break
    archiveSystemDir_user = os.path.expanduser(os.path.join('~', archiveSystemDir_user))
    archiveSystemDir_out = os.path.expanduser(os.path.join('~', archiveSystemDir_out))
    job_dict_all['archiveSystemDir_user'] = archiveSystemDir_user
    job_dict_all['archiveSystemDir_out'] = archiveSystemDir_out
    if printAll:
        with metadata_out:
            print('+ archiveSystemDir_user:', job_dict_all['archiveSystemDir_user'])
            print('+ archiveSystemDir_out:', job_dict_all['archiveSystemDir_out'])
            JobInfoKeys = ['', 'uuid', 'name',
                           '', 'status', 'remoteOutcome', 'condition', 'lastMessage',
                           '', 'execSystemId', 'execSystemExecDir', 'execSystemOutputDir',
                           '', 'appId', 'appVersion',
                           '', 'tenant', 'trackingId', 'createdby', 'created', 'description',
                           '', 'execSystemLogicalQueue', 'nodeCount', 'coresPerNode', 'maxMinutes', 'memoryMB']
            print('+ ++++++++++++++++++++++++')
            print('+ +++++ Additional Relevant Job Metadata')
            print('+ ++++++++++++++++++++++++')
            for thisKey in JobInfoKeys:
                if thisKey in job_dict_all.keys():
                    thisValue = job_dict_all[thisKey]
                    print(f'+ - {thisKey}: {thisValue}')
                else:
                    # empty strings in JobInfoKeys act as group separators
                    print('+ ----------------------')
            print('+ ++++++++++++++++++++++++')
        if 'fileInputs' in job_dict_all.keys():
            this_out = widgets.Output()
            this_accordion = widgets.Accordion(children=[this_out])
            this_accordion.set_title(0, 'fileInputs')
            with metadata_out:
                display(this_accordion)
            with this_out:
                print('')
                print('#' * 32)
                print('### fileInputs')
                print('#' * 32)
                fileInputsList = json.loads(job_dict_all['fileInputs'])
                for thisLine in fileInputsList:
                    print('# ' + '+' * 30)
                    for thisKey, thisKeyVal in thisLine.items():
                        print(f'# + {thisKey} = {thisKeyVal}')
                    print('# ' + '+' * 30)
                print('#' * 32)
        if 'parameterSet' in job_dict_all.keys():
            this_out = widgets.Output()
            this_accordion = widgets.Accordion(children=[this_out])
            this_accordion.set_title(0, 'parameterSet')
            with metadata_out:
                display(this_accordion)
            with this_out:
                print('')
                print('#' * 32)
                print('### parameterSet')
                print('#' * 32)
                parameterSetDict = json.loads(job_dict_all['parameterSet'])
                for thisLineKey, thisLineValues in parameterSetDict.items():
                    print('#')
                    print('# ' + '+' * 30)
                    print(f'# ++ {thisLineKey} ')
                    print('# ' + '+' * 30)
                    if isinstance(thisLineValues, list):
                        for thisLine in thisLineValues:
                            print('# ' + '+ ' + '-' * 28)
                            if isinstance(thisLine, dict):
                                for thisKey, thisVal in thisLine.items():
                                    if thisVal is not None:
                                        print(f'# + - {thisKey} : {thisVal}')
                            else:
                                print('# not a dict:', thisLine)
                            print('# ' + '+ ' + '-' * 28)
                    elif isinstance(thisLineValues, dict):
                        for thisKey, thisVal in thisLineValues.items():
                            if thisVal is not None:
                                print(f'# + {thisKey} : {thisVal}')
                    else:
                        print('# not a list or dict:', thisLineValues)
                    print('# ' + '+' * 30)
                print('#' * 32)
    # Local-archive summary documented above: check the reconstructed path and list its files
    if job_response.status == 'FINISHED':
        local_path = archiveSystemDir_out
        exists = os.path.isdir(local_path)
        files = sorted(os.listdir(local_path)) if exists else []
        job_dict_all['local_path'] = local_path
        job_dict_all['exists'] = exists
        job_dict_all['files'] = files
        if not exists:
            job_dict_all['message'] = 'Job is finished, but the local archive directory was not found.'
    else:
        job_dict_all['local_path'] = None
        job_dict_all['exists'] = False
        job_dict_all['files'] = []
        job_dict_all['message'] = f'Job status is {job_response.status}; no local data yet.'
    if printAll:
        with metadata_out:
            if job_dict_all['exists']:
                print('+ local archive directory:', job_dict_all['local_path'])
                print('+ files:', job_dict_all['files'])
            else:
                print('+', job_dict_all.get('message', ''))
    return job_dict_all