From 64c0ab71a82dea0ee04dd8f7d7d3605734936507 Mon Sep 17 00:00:00 2001 From: Krishna Kumar Date: Tue, 10 Jun 2025 10:51:08 -0500 Subject: [PATCH 1/6] Update docs --- README.md | 9 +- docs/examples/mpm.md | 821 ++++++++++++-------------------------- docs/examples/openfoam.md | 15 +- docs/examples/opensees.md | 442 ++++++++++++++------ 4 files changed, 579 insertions(+), 708 deletions(-) diff --git a/README.md b/README.md index 6a3c845..b60dff6 100644 --- a/README.md +++ b/README.md @@ -148,11 +148,4 @@ poetry install poetry run mkdocs serve ``` -This will start a local server at `http://127.0.0.1:8000/dapi/` where you can view the documentation. - -### API docs -To generate API docs: - -``` -pdoc --html --output-dir api-docs dapi --force -``` \ No newline at end of file +This will start a local server at `http://127.0.0.1:8000/dapi/` where you can view the documentation. \ No newline at end of file diff --git a/docs/examples/mpm.md b/docs/examples/mpm.md index 49ad449..151fc63 100644 --- a/docs/examples/mpm.md +++ b/docs/examples/mpm.md @@ -1,656 +1,339 @@ # MPM Job Submission Example -This comprehensive example demonstrates how to submit and monitor a Material Point Method (MPM) job using dapi. MPM is a particle-based method for simulating large deformation problems in geomechanics and fluid mechanics. +This example demonstrates how to submit and monitor a Material Point Method (MPM) simulation using dapi. MPM is a particle-based numerical method for simulating large deformation problems in geomechanics and fluid mechanics. [![Try on DesignSafe](https://raw.githubusercontent.com/DesignSafe-CI/dapi/main/DesignSafe-Badge.svg)](https://jupyter.designsafe-ci.org/hub/user-redirect/lab/tree/CommunityData/dapi/mpm/mpm-minimal.ipynb) -> More detailed example -[![Try on DesignSafe](https://raw.githubusercontent.com/DesignSafe-CI/dapi/main/DesignSafe-Badge.svg)](https://jupyter.designsafe-ci.org/hub/user-redirect/lab/tree/CommunityData/dapi/mpm/mpm.ipynb) - - ## 🎯 Overview -This example covers: +This example covers the essential workflow for running MPM simulations: -- Setting up authentication and environment -- Discovering available MPM applications -- Preparing input files and directories -- Generating and customizing job requests -- Submitting and monitoring jobs -- Accessing and analyzing results +- Installing and importing dapi +- Setting up MPM job parameters and input files +- Configuring analysis types, materials, and boundary conditions +- Submitting and monitoring MPM jobs +- Post-processing results and output analysis ## πŸš€ Complete Example -### Step 1: Setup and Authentication +### Step 1: Install and Import dapi ```python -import os -from dapi import ( - DSClient, - SubmittedJob, - interpret_job_status, - AppDiscoveryError, - FileOperationError, - JobSubmissionError, - JobMonitorError, - STATUS_TIMEOUT, - STATUS_UNKNOWN, - TAPIS_TERMINAL_STATES, -) +# Install dapi package +!pip install dapi --user --quiet + +# Import required modules +from dapi import DSClient import json -from datetime import datetime -import pandas as pd - -# Initialize DSClient with authentication -try: - print("Initializing DSClient...") - ds = DSClient() - print("DSClient initialized successfully.") -except Exception as e: - print(f"Initialization failed: {e}") - raise SystemExit("Stopping due to client initialization failure.") ``` -### Step 2: Configure Job Parameters +**What this does:** +- Installs the DesignSafe API package from PyPI +- Imports the main client class and JSON for pretty-printing job requests 
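+
+The job-request step later in this example builds a timestamped job name with `datetime.now()`, so it is convenient to import that here as well:
+
+```python
+# Used below to build a timestamped job name (see the job request step)
+from datetime import datetime
+```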
+ +### Step 2: Initialize Client ```python -# Job configuration -ds_path: str = "/MyData/mpm-benchmarks/2d/uniaxial_stress/" -input_filename: str = "mpm.json" -max_job_minutes: int = 10 -tacc_allocation: str = "ASC25049" # Replace with your allocation -app_id_to_use = "mpm-s3" - -# Optional queue override (use default if not specified) -# queue: str = "skx" # Uncomment to override default queue +# Initialize DesignSafe client +ds = DSClient() ``` -### Step 3: Verify Input Path and Files +**What this does:** + +- Creates an authenticated connection to DesignSafe services +- Handles OAuth2 authentication automatically +- Sets up connections to Tapis API, file systems, and job services + +**Authentication:** dapi supports multiple authentication methods including environment variables, .env files, and interactive prompts. For detailed authentication setup instructions, see the [authentication guide](../authentication.md). + +### Step 3: Configure Job Parameters ```python -# Translate and verify DesignSafe path -try: - input_uri = ds.files.translate_path_to_uri(ds_path, verify_exists=True) - print(f"βœ… Input Directory URI: {input_uri}") - - # List files in the input directory - print("\nπŸ“ Files in input directory:") - files = ds.files.list(input_uri) - for file in files: - print(f" - {file.name} ({file.type}, {file.size} bytes)") - - # Verify required input file exists - input_files = [f.name for f in files] - if input_filename not in input_files: - raise FileOperationError(f"Required file '{input_filename}' not found in {ds_path}") - print(f"βœ… Required input file '{input_filename}' found") - -except FileOperationError as e: - print(f"❌ Path verification failed: {e}") - raise SystemExit("Stopping due to path verification error.") +# Job configuration parameters +ds_path: str = "/CommunityData/dapi/mpm/uniaxial_stress/" # Path to MPM input files +input_filename: str = "mpm.json" # Main MPM configuration file +max_job_minutes: int = 10 # Maximum runtime in minutes +tacc_allocation: str = "ASC25049" # TACC allocation to charge +app_id_to_use = "mpm-s3" # MPM application ID ``` -### Step 4: Discover and Inspect MPM Applications +**What each parameter does:** + +- **`ds_path`**: DesignSafe path to your MPM case directory containing input files +- **`input_filename`**: Main MPM configuration file (typically .json format) +- **`max_job_minutes`**: Maximum wall-clock time for the job (prevents runaway simulations) +- **`tacc_allocation`**: Your TACC allocation account (required for compute time billing) +- **`app_id_to_use`**: Specific MPM application version on DesignSafe +**MPM input json file structure:** ```python -# Find available MPM applications -print("\nπŸ” Discovering available applications...") -try: - mpm_apps = ds.apps.find("mpm", verbose=True) - print(f"Found {len(mpm_apps)} MPM applications") - - # Get detailed information about the specific app - app_details = ds.apps.get_details(app_id_to_use, verbose=True) - if not app_details: - raise AppDiscoveryError(f"App '{app_id_to_use}' not found") - - print(f"\nπŸ“‹ Using Application: {app_details.id}") - print(f" Version: {app_details.version}") - print(f" Description: {app_details.description}") - print(f" Execution System: {app_details.jobAttributes.execSystemId}") - print(f" Default Queue: {app_details.jobAttributes.execSystemLogicalQueue}") - print(f" Max Runtime: {app_details.jobAttributes.maxMinutes} minutes") - print(f" Default Cores: {app_details.jobAttributes.coresPerNode}") - -except (AppDiscoveryError, Exception) as e: - 
print(f"❌ App discovery failed: {e}") - raise SystemExit("Stopping due to app discovery error.") +# Typical MPM configuration file contains: +mpm_config = { + "mesh": "mesh.txt", # Computational mesh definition + "particles": "particles.txt", # Material point locations and properties + "materials": { # Material constitutive models + "LinearElastic2D": "For elastic analysis", + "MohrCoulomb": "For soil mechanics", + "NeoHookean": "For large deformation" + }, + "analysis": { + "type": "MPMExplicit2D", # Analysis type: 2D or 3D explicit + "nsteps": 1000, # Number of time steps + "dt": 0.001 # Time step size + } +} ``` -### Step 5: Generate Job Request +### Step 4: Convert Path to URI ```python -# Generate job request with automatic parameter mapping -try: - print("\nβš™οΈ Generating job request...") - job_dict = ds.jobs.generate_request( - app_id=app_id_to_use, - input_dir_uri=input_uri, - script_filename=input_filename, - max_minutes=max_job_minutes, - allocation=tacc_allocation, - # Optional parameters - job_name=f"mpm_uniaxial_stress_{datetime.now().strftime('%Y%m%d_%H%M%S')}", - description="MPM simulation of uniaxial stress test using dapi", - tags=["research", "mpm", "geomechanics", "uniaxial-stress"], - node_count=1, - cores_per_node=1, # Start with single core for testing - ) - - print("βœ… Job request generated successfully") - - # Display the generated job request - print("\nπŸ“„ Generated Job Request:") - print(json.dumps(job_dict, indent=2, default=str)) - -except (AppDiscoveryError, ValueError, JobSubmissionError) as e: - print(f"❌ Job request generation failed: {e}") - raise SystemExit("Stopping due to job request generation error.") +# Convert DesignSafe path to Tapis URI format +input_uri = ds.files.translate_path_to_uri(ds_path) +print(f"Input Directory Tapis URI: {input_uri}") ``` -### Step 6: Customize Job Request (Optional) +**What this does:** + +- Converts human-readable DesignSafe paths (like `/MyData/...`) to Tapis URI format +- Tapis URIs are required for job submission and follow the pattern: `tapis://system/path` +- Automatically detects your username and the correct storage system + +### Step 5: Generate Job Request ```python -# Optional: Modify job request before submission -print("\nπŸ”§ Customizing job request...") - -# Ensure we're using minimal resources for this example -job_dict["nodeCount"] = 1 -job_dict["coresPerNode"] = 1 -job_dict["maxMinutes"] = max_job_minutes - -# Add environment variables if needed -if "parameterSet" not in job_dict: - job_dict["parameterSet"] = {} -if "envVariables" not in job_dict["parameterSet"]: - job_dict["parameterSet"]["envVariables"] = [] - -# Example: Add OpenMP thread control -job_dict["parameterSet"]["envVariables"].append({ - "key": "OMP_NUM_THREADS", - "value": "1" -}) - -print("βœ… Job request customized") -print(f" Nodes: {job_dict['nodeCount']}") -print(f" Cores per node: {job_dict['coresPerNode']}") -print(f" Max runtime: {job_dict['maxMinutes']} minutes") +# Generate job request dictionary using app defaults +job_dict = ds.jobs.generate_request( + app_id=app_id_to_use, + input_dir_uri=input_uri, + script_filename=input_filename, + max_minutes=max_job_minutes, + allocation=tacc_allocation, + archive_system="designsafe", + # MPM-specific job metadata + job_name=f"mpm_uniaxial_stress_{datetime.now().strftime('%Y%m%d_%H%M%S')}", + description="MPM simulation of uniaxial stress test", + tags=["research", "mpm", "geomechanics", "uniaxial-stress"] +) +print(json.dumps(job_dict, indent=2, default=str)) ``` -### Step 7: 
Submit Job +**What each parameter does:** + +- **`app_id`**: Specifies which MPM application to run +- **`input_dir_uri`**: Location of your MPM input files +- **`script_filename`**: Main MPM configuration file (mpm.json) +- **`max_minutes`**: Job timeout (prevents infinite runs) +- **`allocation`**: TACC account to charge for compute time +- **`archive_system`**: Where to store results ("designsafe" = your MyData folder) + +**Additional options you can add:** ```python -# Submit the job -try: - print("\nπŸš€ Submitting job...") - submitted_job = ds.jobs.submit_request(job_dict) - print(f"βœ… Job submitted successfully!") - print(f" Job UUID: {submitted_job.uuid}") - print(f" Job Name: {job_dict['name']}") +# Extended job configuration options +job_dict = ds.jobs.generate_request( + app_id=app_id_to_use, + input_dir_uri=input_uri, + script_filename=input_filename, + max_minutes=max_job_minutes, + allocation=tacc_allocation, + + # Resource configuration + node_count=1, # Number of compute nodes + cores_per_node=48, # Cores per node (max depends on system) + memory_mb=192000, # Memory in MB per node + queue="skx-dev", # Queue: "skx-dev", "skx", "normal", etc. - # Save job UUID for later reference - job_uuid = submitted_job.uuid - with open("current_mpm_job.txt", "w") as f: - f.write(f"{job_uuid}\n{datetime.now().isoformat()}\n") - print(f" Job UUID saved to: current_mpm_job.txt") + # Job metadata + job_name="my_mpm_simulation", # Custom job name + description="Large deformation analysis", # Job description + tags=["research", "mpm", "geomechanics"], # Searchable tags -except JobSubmissionError as e: - print(f"❌ Job submission failed: {e}") - print("\nπŸ” Failed Job Request Details:") - print(json.dumps(job_dict, indent=2, default=str)) - raise SystemExit("Stopping due to job submission error.") + # Archive configuration + archive_system="designsafe", # Where to store results + archive_path="mpm-results", # Custom archive subdirectory +) ``` -### Step 8: Monitor Job Progress +### Step 6: Customize Resources ```python -# Monitor job with real-time progress -try: - print(f"\nπŸ‘€ Monitoring job {submitted_job.uuid}...") - print(" Use Ctrl+C to interrupt monitoring (job will continue running)") - - # Monitor with custom interval - final_status = submitted_job.monitor(interval=15) # Check every 15 seconds - - print(f"\n🏁 Job monitoring completed!") - print(f" Final Status: {final_status}") - -except KeyboardInterrupt: - print(f"\n⏸️ Monitoring interrupted by user") - print(f" Job is still running. 
UUID: {submitted_job.uuid}") - print(f" Check status later with: ds.jobs.get_status('{submitted_job.uuid}')") - final_status = "INTERRUPTED" -except Exception as e: - print(f"\n❌ Monitoring error: {e}") - final_status = "MONITOR_ERROR" +# Customize job settings (optional) +job_dict["nodeCount"] = 1 # Use single node +job_dict["coresPerNode"] = 1 # Use single core for small problems +print(json.dumps(job_dict, indent=2, default=str)) ``` -### Step 9: Interpret Results +**What this does:** +- Overrides default resource allocation from the app +- `nodeCount`: Number of compute nodes (1 for small jobs, multiple for large simulations) +- `coresPerNode`: CPU cores per node (MPM can utilize parallel processing) +- More cores = faster solution but higher cost + +**Resource guidelines:** ```python -# Interpret the final job status -print("\nπŸ“Š Job Results Analysis") -print("=" * 50) +# Resource selection guidelines for MPM +resources = { + "small_case": {"nodes": 1, "cores": 1, "time": 30}, # < 10K particles + "medium_case": {"nodes": 1, "cores": 16, "time": 120}, # 10K - 100K particles + "large_case": {"nodes": 2, "cores": 48, "time": 480}, # > 100K particles +} +``` -# Use the built-in status interpretation -ds.jobs.interpret_status(final_status, submitted_job.uuid) +### Step 7: Submit Job -# Additional analysis based on status -if final_status == "FINISHED": - print("\nβœ… SUCCESS: Job completed successfully!") - - # Get runtime summary - try: - print("\n⏱️ Runtime Summary:") - submitted_job.print_runtime_summary(verbose=False) - except Exception as e: - print(f"Could not display runtime summary: {e}") - -elif final_status == "FAILED": - print("\n❌ FAILURE: Job failed during execution") - print("πŸ’‘ Check the troubleshooting section below for common issues") - -elif final_status in ["TIMEOUT", "INTERRUPTED"]: - print(f"\n⚠️ WARNING: Job monitoring {final_status.lower()}") - print(" Job may still be running on the compute system") - - # Check current status - try: - current_status = submitted_job.get_status() - print(f" Current job status: {current_status}") - except Exception as e: - print(f" Could not check current status: {e}") +```python +# Submit the job to TACC +submitted_job = ds.jobs.submit_request(job_dict) +print(f"Job UUID: {submitted_job.uuid}") ``` -### Step 10: Access Job Outputs +**What this does:** + +- Sends the job request to TACC's job scheduler +- Returns a `SubmittedJob` object for monitoring +- Job UUID is a unique identifier for tracking + +### Step 8: Monitor Job ```python -# Access job outputs and results -if final_status in TAPIS_TERMINAL_STATES: - print("\nπŸ“ Accessing Job Outputs") - print("-" * 30) - - try: - # Get archive URI - archive_uri = submitted_job.archive_uri - if archive_uri: - print(f"πŸ“ Job Archive URI: {archive_uri}") - - # List all files in the archive - print("\nπŸ“‹ Files in job archive:") - outputs = submitted_job.list_outputs() - for output in outputs: - size_mb = output.size / (1024 * 1024) if output.size else 0 - print(f" - {output.name} ({output.type}, {size_mb:.2f} MB)") - - # Read job output log - print("\nπŸ“œ Job Output (last 50 lines):") - stdout_content = submitted_job.get_output_content( - "tapisjob.out", - max_lines=50, - missing_ok=True - ) - if stdout_content: - print("```") - print(stdout_content) - print("```") - else: - print(" No output log available") - - # Check for errors if job failed - if final_status == "FAILED": - print("\n🚨 Error Analysis:") - stderr_content = submitted_job.get_output_content( - "tapisjob.err", - 
max_lines=50, - missing_ok=True - ) - if stderr_content: - print("Error log:") - print("```") - print(stderr_content) - print("```") - else: - print(" No error log found (errors may be in main output)") - - else: - print("❌ Archive URI not available") - - except FileOperationError as e: - print(f"❌ Could not access job outputs: {e}") - except Exception as e: - print(f"❌ Unexpected error accessing outputs: {e}") +# Monitor job execution until completion +final_status = submitted_job.monitor(interval=15) # Check every 15 seconds +print(f"Job {submitted_job.uuid} finished with status: {final_status}") ``` -### Step 11: Download Results (Optional) +**What this does:** +- Polls job status at specified intervals (15 seconds) +- Shows progress bars for different job phases +- Returns final status when job completes +- `interval=15` means check every 15 seconds (can be adjusted) + +**Job status meanings:** ```python -# Download specific result files -if final_status == "FINISHED": - print("\nπŸ’Ύ Downloading Results") - print("-" * 25) - - try: - # Create local results directory - local_results_dir = f"mpm_results_{job_uuid[:8]}" - os.makedirs(local_results_dir, exist_ok=True) - - # Download key files - files_to_download = ["tapisjob.out", "tapisjob.err"] - - for filename in files_to_download: - try: - local_path = os.path.join(local_results_dir, filename) - submitted_job.download_output(filename, local_path) - print(f"βœ… Downloaded: {filename} -> {local_path}") - except Exception as e: - print(f"⚠️ Could not download {filename}: {e}") - - # List any MPM-specific output files and download them - try: - outputs = submitted_job.list_outputs() - mpm_files = [f for f in outputs if f.name.endswith(('.vtu', '.vtk', '.csv', '.txt')) - and f.type == 'file'] - - for mpm_file in mpm_files: - try: - local_path = os.path.join(local_results_dir, mpm_file.name) - submitted_job.download_output(mpm_file.name, local_path) - print(f"βœ… Downloaded MPM result: {mpm_file.name} -> {local_path}") - except Exception as e: - print(f"⚠️ Could not download {mpm_file.name}: {e}") - - except Exception as e: - print(f"⚠️ Could not list/download MPM output files: {e}") - - print(f"\nπŸ“ Results saved in: {local_results_dir}/") - - except Exception as e: - print(f"❌ Error during download: {e}") +job_statuses = { + "PENDING": "Job submitted but not yet processed", + "PROCESSING_INPUTS": "Input files being staged", + "QUEUED": "Job waiting in scheduler queue", + "RUNNING": "Job actively executing", + "ARCHIVING": "Output files being archived", + "FINISHED": "Job completed successfully", + "FAILED": "Job failed during execution" +} ``` -### Step 12: Status Summary and Next Steps +### Step 9: Check Results ```python -# Final summary -print("\n" + "=" * 60) -print("🎯 MPM Job Submission Summary") -print("=" * 60) -print(f"Job UUID: {submitted_job.uuid}") -print(f"Job Name: {job_dict['name']}") -print(f"Final Status: {final_status}") -print(f"Application: {app_id_to_use}") -print(f"Input Directory: {ds_path}") -print(f"Input File: {input_filename}") - -if final_status == "FINISHED": - print("βœ… Status: SUCCESS - Job completed successfully") - print("πŸ’‘ Next steps:") - print(" - Analyze results in the downloaded files") - print(" - Visualize outputs using ParaView (for .vtu files)") - print(" - Compare with expected results") - -elif final_status == "FAILED": - print("❌ Status: FAILED - Job execution failed") - print("πŸ’‘ Troubleshooting steps:") - print(" - Review error logs above") - print(" - Check input file format and 
parameters") - print(" - Verify resource requirements are reasonable") - print(" - Contact support if issues persist") - -elif final_status in ["TIMEOUT", "INTERRUPTED"]: - print(f"⚠️ Status: {final_status} - Monitoring stopped") - print("πŸ’‘ Check job status later:") - print(f" current_status = ds.jobs.get_status('{submitted_job.uuid}')") - -else: - print(f"❓ Status: {final_status} - Unexpected final status") +# Interpret and display job outcome +ds.jobs.interpret_status(final_status, submitted_job.uuid) + +# Display job runtime summary +submitted_job.print_runtime_summary(verbose=False) + +# Get current job status +current_status = ds.jobs.get_status(submitted_job.uuid) +print(f"Current status: {current_status}") -print("\nπŸ“š For more examples and documentation:") -print(" - https://designsafe-ci.github.io/dapi") -print(" - https://github.com/DesignSafe-CI/dapi") +# Display last status message from TACC +print(f"Last message: {submitted_job.last_message}") ``` -## πŸ”§ Advanced Customization +**What each command does:** -### Custom Resource Requirements +- **`interpret_status`**: Provides human-readable explanation of job outcome +- **`print_runtime_summary`**: Shows time spent in each job phase (queued, running, etc.) +- **`get_status`**: Gets current job status (useful for checking later) +- **`last_message`**: Shows last status message from the job scheduler + +### Step 10: View Job Output ```python -# For larger simulations, customize resources -advanced_job_request = ds.jobs.generate_request( - app_id="mpm-s3", - input_dir_uri=input_uri, - script_filename="large_simulation.json", - - # High-performance configuration - max_minutes=240, # 4 hours - node_count=4, # Multiple nodes - cores_per_node=48, # Full node utilization - memory_mb=192000, # 192 GB RAM - queue="normal", # Production queue - allocation=tacc_allocation, - - # Job metadata - job_name="mpm_large_scale_analysis", - description="Large-scale MPM simulation with multiple materials", - tags=["research", "mpm", "large-scale", "multi-material"], - - # Additional environment variables - extra_env_vars=[ - {"key": "OMP_NUM_THREADS", "value": "48"}, - {"key": "MPM_VERBOSE", "value": "1"}, - {"key": "MPM_OUTPUT_FREQ", "value": "100"} - ] -) +# Display job output from stdout +stdout_content = submitted_job.get_output_content("tapisjob.out", max_lines=50) +if stdout_content: + print("Job output:") + print(stdout_content) ``` -### Parametric Study Example [Work In Progress] +**What this does:** +- `tapisjob.out` contains all console output from your MPM simulation +- `max_lines=50` limits output to last 50 lines (prevents overwhelming output) +- Shows MPM solver progress, timestep information, and timing data +**Typical MPM output includes:** ```python -# Submit multiple jobs with different parameters -parameters = [ - {"friction": 0.1, "cohesion": 1000, "density": 2000}, - {"friction": 0.2, "cohesion": 1500, "density": 2200}, - {"friction": 0.3, "cohesion": 2000, "density": 2400}, -] - -submitted_jobs = [] - -for i, params in enumerate(parameters): - # Create parameter-specific job request - param_job_request = ds.jobs.generate_request( - app_id="mpm-s3", - input_dir_uri=input_uri, - script_filename="parametric_template.json", - max_minutes=60, - allocation=tacc_allocation, - job_name=f"mpm_parametric_{i:03d}", - description=f"Parametric study: friction={params['friction']}, cohesion={params['cohesion']}" - ) - - # Add parameters as environment variables - if "parameterSet" not in param_job_request: - 
param_job_request["parameterSet"] = {} - if "envVariables" not in param_job_request["parameterSet"]: - param_job_request["parameterSet"]["envVariables"] = [] - - param_job_request["parameterSet"]["envVariables"].extend([ - {"key": "MPM_FRICTION", "value": str(params["friction"])}, - {"key": "MPM_COHESION", "value": str(params["cohesion"])}, - {"key": "MPM_DENSITY", "value": str(params["density"])} - ]) - - # Submit job - job = ds.jobs.submit_request(param_job_request) - submitted_jobs.append((job, params)) - print(f"Submitted parametric job {i+1}/{len(parameters)}: {job.uuid}") - -# Monitor all jobs -print("Monitoring all parametric jobs...") -results = [] -for job, params in submitted_jobs: - final_status = job.monitor() - results.append({ - "job_uuid": job.uuid, - "parameters": params, - "final_status": final_status - }) - -# Summarize results -print("\nParametric Study Results:") -for result in results: - print(f" {result['parameters']} -> {result['final_status']}") +# Example MPM console output: +mpm_output_info = { + "git_revision": "Version information", + "step_progress": "Step: 1 of 1000", + "warnings": "Material sets, boundary conditions", + "solver_duration": "Explicit USF solver duration: 285 ms", + "completion": "Job execution finished" +} ``` -## 🚨 Troubleshooting - -### Common Issues and Solutions +### Step 11: Access Results ```python -# Check system and queue availability -def check_system_status(): - try: - # Check if target system is available - stampede_queues = ds.systems.list_queues("stampede3") - print("βœ… Stampede3 system is accessible") - - # Check specific queue - available_queues = [q.name for q in stampede_queues] - if "skx-dev" in available_queues: - print("βœ… Development queue is available") - else: - print("⚠️ Development queue not found. 
Available queues:") - for queue in available_queues: - print(f" - {queue}") - - except Exception as e: - print(f"❌ System check failed: {e}") - -# Validate input files -def validate_input_files(): - try: - # Check if input file is valid JSON - with open("local_copy_of_mpm.json", "r") as f: - import json - config = json.load(f) - print("βœ… Input file is valid JSON") - - # Check required fields (example) - required_fields = ["mesh", "particles", "materials"] - missing_fields = [field for field in required_fields if field not in config] - if missing_fields: - print(f"⚠️ Missing required fields: {missing_fields}") - else: - print("βœ… All required fields present") - - except FileNotFoundError: - print("❌ Input file not found locally for validation") - except json.JSONDecodeError as e: - print(f"❌ Invalid JSON format: {e}") - except Exception as e: - print(f"❌ Validation error: {e}") - -# Run diagnostics -print("πŸ” Running diagnostics...") -check_system_status() -validate_input_files() +# List contents of job archive directory +archive_uri = submitted_job.archive_uri +print(f"Archive URI: {archive_uri}") +outputs = ds.files.list(archive_uri) +for item in outputs: + print(f"- {item.name} ({item.type})") ``` -### Resume Monitoring +**What this does:** +- **`archive_uri`**: Location where job results are stored +- **`ds.files.list`**: Lists all files and directories in the archive +- Shows output files like simulation results, visualization data, and logs +**Typical MPM output files:** ```python -# If monitoring was interrupted, resume with saved job UUID -def resume_monitoring(): - try: - with open("current_mpm_job.txt", "r") as f: - lines = f.readlines() - saved_uuid = lines[0].strip() - submission_time = lines[1].strip() if len(lines) > 1 else "Unknown" - - print(f"Resuming monitoring for job: {saved_uuid}") - print(f"Originally submitted: {submission_time}") - - # Create SubmittedJob object - resumed_job = SubmittedJob(ds._tapis, saved_uuid) - - # Check current status - current_status = resumed_job.get_status() - print(f"Current status: {current_status}") - - if current_status not in resumed_job.TERMINAL_STATES: - print("Job is still running, resuming monitoring...") - final_status = resumed_job.monitor() - print(f"Final status: {final_status}") - else: - print("Job has already completed") - final_status = current_status - - return resumed_job, final_status - - except FileNotFoundError: - print("❌ No saved job UUID found") - return None, None - except Exception as e: - print(f"❌ Error resuming monitoring: {e}") - return None, None - -# Example usage: -# resumed_job, final_status = resume_monitoring() +typical_outputs = { + "inputDirectory/": "Copy of your input directory with results", + "tapisjob.out": "Console output from MPM simulation", + "tapisjob.err": "Error messages (if any)", + "tapisjob.sh": "Job script that was executed", + "results/": "VTK files for visualization (particles, stresses, velocities)", + "*.vtu": "Paraview-compatible visualization files" +} ``` -## πŸ“Š Performance Analysis +## πŸ“Š Post-processing Results -### Analyzing Job Performance +### Extract and Analyze Output ```python -def analyze_job_performance(job): - """Analyze job performance and resource utilization""" - try: - # Get runtime summary - print("πŸ“ˆ Performance Analysis") - print("-" * 30) - - job.print_runtime_summary(verbose=True) - - # Get job details - details = job.details - - # Calculate efficiency metrics - total_cores = details.nodeCount * details.coresPerNode - max_runtime_hours = 
details.maxMinutes / 60 - - print(f"\nπŸ“Š Resource Allocation:") - print(f" Nodes: {details.nodeCount}") - print(f" Cores per node: {details.coresPerNode}") - print(f" Total cores: {total_cores}") - print(f" Memory per node: {details.memoryMB} MB") - print(f" Max runtime: {max_runtime_hours:.2f} hours") - - # Analyze output for performance metrics - output = job.get_output_content("tapisjob.out", missing_ok=True) - if output: - # Look for timing information in MPM output - lines = output.split('\n') - duration_lines = [line for line in lines if 'duration' in line.lower()] - - if duration_lines: - print(f"\n⏱️ Execution Timing:") - for line in duration_lines: - print(f" {line.strip()}") - - print(f"\nπŸ’‘ Optimization suggestions:") - if total_cores == 1: - print(" - Consider using multiple cores for larger simulations") - if max_runtime_hours > 2: - print(" - Consider breaking large simulations into smaller parts") - print(" - Monitor memory usage to optimize allocation") - - except Exception as e: - print(f"❌ Performance analysis failed: {e}") - -# Example usage after job completion: -# if final_status == "FINISHED": -# analyze_job_performance(submitted_job) +# Convert archive URI to local path for analysis +archive_path = ds.files.translate_uri_to_path(archive_uri) +print(f"Archive path: {archive_path}") + +# Import analysis libraries +import numpy as np +import matplotlib.pyplot as plt +import os + +# Navigate to results directory +results_path = os.path.join(archive_path, "inputDirectory", "results") +if os.path.exists(results_path): + print(f"Results directory: {results_path}") + + # List VTK output files + vtk_files = [f for f in os.listdir(results_path) if f.endswith('.vtu')] + print(f"Found {len(vtk_files)} VTK files for visualization") + + # Example: Load and analyze particle data (requires appropriate library) + # Note: Actual VTK analysis would require packages like vtk or pyvista + print("Use ParaView or Python VTK libraries to visualize results") +else: + print("No results directory found - check job completion status") ``` -This comprehensive example demonstrates the complete workflow for submitting and monitoring MPM jobs using dapi, including error handling, result analysis, and advanced features for production use. \ No newline at end of file +**What this does:** + +- **`translate_uri_to_path`**: Converts Tapis URI to local file system path +- **`os.listdir`**: Lists files in the results directory +- **`.vtu files`**: VTK unstructured grid files for visualization +- **ParaView**: Recommended tool for visualizing MPM particle data \ No newline at end of file diff --git a/docs/examples/openfoam.md b/docs/examples/openfoam.md index 2c9c110..9fe3a3b 100644 --- a/docs/examples/openfoam.md +++ b/docs/examples/openfoam.md @@ -39,10 +39,13 @@ ds = DSClient() ``` **What this does:** + - Creates an authenticated connection to DesignSafe services - Handles OAuth2 authentication automatically - Sets up connections to Tapis API, file systems, and job services +**Authentication:** dapi supports multiple authentication methods including environment variables, .env files, and interactive prompts. For detailed authentication setup instructions, see the [authentication guide](../authentication.md). 
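+
+As a quick, optional sanity check that authentication succeeded, you can list a folder you own; this reuses the `translate_path_to_uri` and `files.list` calls shown later in this example:
+
+```python
+# Optional: confirm the client can reach your storage by listing MyData
+my_data_uri = ds.files.translate_path_to_uri("/MyData/")
+for item in ds.files.list(my_data_uri):
+    print(f"- {item.name} ({item.type})")
+```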
+ ### Step 3: Configure Job Parameters ```python @@ -92,6 +95,7 @@ print(f"Input Directory Tapis URI: {input_uri}") ``` **What this does:** + - Converts human-readable DesignSafe paths (like `/MyData/...`) to Tapis URI format - Tapis URIs are required for job submission and follow the pattern: `tapis://system/path` - Automatically detects your username and the correct storage system @@ -123,6 +127,7 @@ print(json.dumps(job_dict, indent=2, default=str)) - **`input_dir_param_name`**: OpenFOAM apps expect "Case Directory" not "Input Directory" **Additional options you can add:** + ```python # Extended job configuration options job_dict = ds.jobs.generate_request( @@ -166,6 +171,7 @@ print(json.dumps(job_dict, indent=2, default=str)) ``` **What this does:** + - Overrides default resource allocation from the app - `nodeCount`: Number of compute nodes (1 for small jobs, multiple for large simulations) - `coresPerNode`: CPU cores per node (enables parallel processing) @@ -190,6 +196,7 @@ print(f"Job UUID: {submitted_job.uuid}") ``` **What this does:** + - Sends the job request to TACC's job scheduler - Returns a `SubmittedJob` object for monitoring - Job UUID is a unique identifier for tracking @@ -203,6 +210,7 @@ print(f"Job {submitted_job.uuid} finished with status: {final_status}") ``` **What this does:** + - Polls job status at specified intervals (15 seconds) - Shows progress bars for different job phases - Returns final status when job completes @@ -239,6 +247,7 @@ print(f"Last message: {submitted_job.last_message}") ``` **What each command does:** + - **`interpret_status`**: Provides human-readable explanation of job outcome - **`print_runtime_summary`**: Shows time spent in each job phase (queued, running, etc.) - **`get_status`**: Gets current job status (useful for checking later) @@ -312,6 +321,7 @@ print(f"Loaded force coefficients data with shape: {data.shape}") ``` **What this does:** + - **`translate_uri_to_path`**: Converts Tapis URI to local file system path - **`pandas.read_csv`**: Reads force coefficient data (much cleaner than manual parsing) - **`skiprows=9`**: Skips OpenFOAM header lines @@ -351,6 +361,7 @@ plt.show() ``` **What this does:** + - **`data.iloc[100:, 0]`**: Time values (column 0) starting from row 100 - **`data.iloc[100:, 2]`**: Drag coefficient values (column 2) - **`[100:]`**: Skips initial transient period for cleaner plots @@ -457,6 +468,4 @@ complete_job = ds.jobs.generate_request( # Advanced options input_dir_param_name="Case Directory", ) -``` - -This streamlined approach focuses on the essential workflow while explaining what each step accomplishes, making it easy to understand and modify for different OpenFOAM simulations. \ No newline at end of file +``` \ No newline at end of file diff --git a/docs/examples/opensees.md b/docs/examples/opensees.md index dc8f67c..bf460ec 100644 --- a/docs/examples/opensees.md +++ b/docs/examples/opensees.md @@ -1,66 +1,96 @@ # OpenSees Job Submission Example -This comprehensive example demonstrates how to submit and monitor an OpenSees job using dapi. OpenSees is a software framework for developing applications to simulate earthquake engineering applications, featuring finite element analysis capabilities for structural and geotechnical systems. +This example demonstrates how to submit and monitor an OpenSees simulation using dapi. 
OpenSees is a software framework for developing applications to simulate earthquake engineering applications, featuring finite element analysis capabilities for structural and geotechnical systems. [![Try on DesignSafe](https://raw.githubusercontent.com/DesignSafe-CI/dapi/main/DesignSafe-Badge.svg)](https://jupyter.designsafe-ci.org/hub/user-redirect/lab/tree/CommunityData/dapi/opensees/opensees-mp/OpenSeesMP-dapi.ipynb) ## 🎯 Overview -This example covers: +This example covers the essential workflow for running OpenSees simulations: -- Setting up authentication and environment -- Configuring OpenSees Multi-Free Field Analysis -- Managing input files and DesignSafe paths -- Generating job requests with custom archive settings -- Submitting and monitoring OpenSees jobs -- Downloading and postprocessing results with plotting +- Installing and importing dapi +- Setting up OpenSees job parameters and script files +- Configuring multi-processor runs and resource allocation +- Submitting and monitoring structural analysis jobs +- Post-processing results with response spectra analysis ## πŸš€ Complete Example -### Step 1: Setup and Authentication +### Step 1: Install and Import dapi ```python -# Install required packages -!pip install dapi +# Install dapi package +!pip install dapi --user --quiet -# Import DAPI and other required libraries +# Import required modules from dapi import DSClient import os import json -import tempfile -import shutil import numpy as np import matplotlib.pyplot as plt from datetime import date +``` + +**What this does:** +- Installs the DesignSafe API package from PyPI +- Imports the main client class and analysis libraries +- Sets up matplotlib for response spectra plotting +### Step 2: Initialize Client + +```python # Initialize DesignSafe client ds = DSClient() -print("Authentication successful.") ``` -### Step 2: Configure Job Parameters +**What this does:** -```python -# Define DesignSafe paths and job configuration -ds_path = "/home/jupyter/MyData/template-notebooks/tapis3/opensees/OpenSeesMP_multiMotion/DS_input" -print(f"DesignSafe path: {ds_path}") +- Creates an authenticated connection to DesignSafe services +- Handles OAuth2 authentication automatically +- Sets up connections to Tapis API, file systems, and job services -# Translate DesignSafe path to Tapis URI -input_uri = ds.files.translate_path_to_uri(ds_path) -print(f"Input URI: {input_uri}") +**Authentication:** dapi supports multiple authentication methods including environment variables, .env files, and interactive prompts. For detailed authentication setup instructions, see the [authentication guide](../authentication.md). + +### Step 3: Configure Job Parameters +```python # Job configuration parameters -jobname: str = "opensees-MP-multiMotion-dapi" -app_id: str = "opensees-mp-s3" -input_filename: str = "Main_multiMotion.tcl" -control_exec_Dir: str = "DS_input" # Folder with files including input_filename -tacc_allocation: str = "ASC25049" # MUST USE YOUR OWN ALLOCATION !! 
-control_nodeCount: int = 1 -control_corespernode: int = 16 -max_job_minutes: int = 60 +ds_path = os.getcwd() + "/input" # Path to OpenSees input files +input_filename: str = "Main_multiMotion.tcl" # Main OpenSees script +tacc_allocation: str = "your-allocation" # TACC allocation to charge +app_id: str = "opensees-mp-s3" # OpenSees-MP application ID +max_job_minutes: int = 60 # Maximum runtime in minutes + +# Resource configuration +control_nodeCount: int = 1 # Number of compute nodes +control_corespernode: int = 16 # Cores per node for parallel analysis +``` + +**What each parameter does:** + +- **`ds_path`**: DesignSafe path to your OpenSees case directory containing .tcl files +- **`input_filename`**: Main OpenSees script file (typically .tcl format) +- **`tacc_allocation`**: Your TACC allocation account (required for compute time billing) +- **`app_id`**: Specific OpenSees application version on DesignSafe +- **`max_job_minutes`**: Maximum wall-clock time for the job (prevents runaway simulations) +- **`control_nodeCount`**: Number of compute nodes (typically 1 for most analyses) +- **`control_corespernode`**: CPU cores for parallel processing + +### Step 4: Convert Path to URI + +```python +# Convert DesignSafe path to Tapis URI format +input_uri = ds.files.translate_path_to_uri(ds_path) +print(f"Input Directory Tapis URI: {input_uri}") ``` -### Step 3: Generate Job Request with Custom Archive Settings +**What this does:** + +- Converts human-readable DesignSafe paths (like `/MyData/...`) to Tapis URI format +- Tapis URIs are required for job submission and follow the pattern: `tapis://system/path` +- Automatically detects your username and the correct storage system + +### Step 5: Generate Job Request ```python # Generate job request dictionary using app defaults @@ -73,10 +103,57 @@ job_dict = ds.jobs.generate_request( # Archive configuration for organized result storage archive_system="designsafe", archive_path="opensees-results", # Results go to MyData/opensees-results/ + # OpenSees-specific job metadata + job_name="opensees_multi_motion_analysis", + description="Multi-free field analysis using OpenSees-MP", + tags=["research", "opensees", "earthquake", "site-response"] ) +print(json.dumps(job_dict, indent=2, default=str)) +``` + +**What each parameter does:** +- **`app_id`**: Specifies which OpenSees application to run +- **`input_dir_uri`**: Location of your OpenSees script files +- **`script_filename`**: Main OpenSees .tcl script file +- **`max_minutes`**: Job timeout (prevents infinite runs) +- **`allocation`**: TACC account to charge for compute time +- **`archive_system`**: Where to store results ("designsafe" = your MyData folder) +- **`archive_path`**: Custom subdirectory for organized results + +**Additional options you can add:** + +```python +# Extended job configuration options +job_dict = ds.jobs.generate_request( + app_id=app_id, + input_dir_uri=input_uri, + script_filename=input_filename, + max_minutes=max_job_minutes, + allocation=tacc_allocation, + + # Resource configuration + node_count=1, # Number of compute nodes + cores_per_node=16, # Cores per node (enables parallel processing) + memory_mb=192000, # Memory in MB per node + queue="skx-dev", # Queue: "skx-dev", "skx", "normal", etc. 
+ + # Job metadata + job_name="my_opensees_analysis", # Custom job name + description="Nonlinear site response analysis", # Job description + tags=["research", "opensees", "seismic"], # Searchable tags + + # Archive configuration + archive_system="designsafe", # Where to store results + archive_path="opensees-results/site-response", # Custom archive path +) +``` + +### Step 6: Customize Resources + +```python # Customize job settings -job_dict["name"] = jobname +job_dict["name"] = "opensees-MP-multiMotion-dapi" job_dict["nodeCount"] = control_nodeCount job_dict["coresPerNode"] = control_corespernode @@ -84,26 +161,85 @@ print("Generated job request:") print(json.dumps(job_dict, indent=2, default=str)) ``` -### Step 4: Submit and Monitor Job +**What this does:** + +- Overrides default resource allocation from the app +- `nodeCount`: Number of compute nodes (1 for most OpenSees analyses) +- `coresPerNode`: CPU cores per node (enables parallel element processing) +- More cores = faster solution for large models but higher cost + +**Resource guidelines:** +Visit [OpenSees userguide on DesignSafe](https://www.designsafe-ci.org/user-guide/tools/simulation/opensees/opensees/#decision-matrix-for-opensees-applications) + +### Step 7: Submit Job ```python # Submit job using dapi submitted_job = ds.jobs.submit_request(job_dict) print(f"Job launched with UUID: {submitted_job.uuid}") print("Can also check in DesignSafe portal under - Workspace > Tools & Application > Job Status") +``` + +**What this does:** +- Sends the job request to TACC's job scheduler +- Returns a `SubmittedJob` object for monitoring +- Job UUID is a unique identifier for tracking +- Provides DesignSafe portal link for status checking + +### Step 8: Monitor Job + +```python # Monitor job status using dapi -final_status = submitted_job.monitor(interval=15) +final_status = submitted_job.monitor(interval=15) # Check every 15 seconds print(f"Job finished with status: {final_status}") +``` + +**What this does:** + +- Polls job status at specified intervals (15 seconds) +- Shows progress bars for different job phases +- Returns final status when job completes +- `interval=15` means check every 15 seconds (can be adjusted) + +**Job status meanings:** +```python +job_statuses = { + "PENDING": "Job submitted but not yet processed", + "PROCESSING_INPUTS": "Input files being staged", + "QUEUED": "Job waiting in scheduler queue", + "RUNNING": "Job actively executing", + "ARCHIVING": "Output files being archived", + "FINISHED": "Job completed successfully", + "FAILED": "Job failed during execution" +} +``` +### Step 9: Check Results + +```python # Interpret job status ds.jobs.interpret_status(final_status, submitted_job.uuid) # Display runtime summary submitted_job.print_runtime_summary(verbose=False) + +# Get current job status +current_status = ds.jobs.get_status(submitted_job.uuid) +print(f"Current status: {current_status}") + +# Display last status message from TACC +print(f"Last message: {submitted_job.last_message}") ``` -### Step 5: Access Job Archive and Results +**What each command does:** + +- **`interpret_status`**: Provides human-readable explanation of job outcome +- **`print_runtime_summary`**: Shows time spent in each job phase (queued, running, etc.) 
+- **`get_status`**: Gets current job status (useful for checking later) +- **`last_message`**: Shows last status message from the job scheduler + +### Step 10: Access Job Archive and Results ```python # Get archive information using dapi @@ -116,170 +252,220 @@ print(f"Local archive path: {local_archive_path}") # List archive contents archive_files = ds.files.list(archive_uri) -print("\\nArchive contents:") +print("\nArchive contents:") for item in archive_files: print(f"- {item.name} ({item.type})") ``` -### Step 6: Download Results for Postprocessing +**What this does:** + +- **`archive_uri`**: Location where job results are stored +- **`translate_uri_to_path`**: Converts Tapis URI to local path for analysis +- **`ds.files.list`**: Lists all files and directories in the archive +- Shows output files like analysis results, output data, and logs +**Typical OpenSees output files:** ```python -# Download archive files to local directory for postprocessing -import tempfile -import shutil +typical_outputs = { + "inputDirectory/": "Copy of your input directory with results", + "tapisjob.out": "Console output from OpenSees analysis", + "tapisjob.err": "Error messages (if any)", + "tapisjob.sh": "Job script that was executed", + "Profile*_acc*.out": "Acceleration time histories", + "Profile*_Gstress*.out": "Stress time histories", + "opensees.zip": "Compressed results package" +} +``` -# Create temporary directory for downloaded files -temp_dir = tempfile.mkdtemp(prefix="opensees_results_") -print(f"Downloading results to: {temp_dir}") +### Step 11: Access Results from Input Directory +```python # Download the inputDirectory folder which contains results input_dir_archive_uri = f"{archive_uri}/inputDirectory" try: # List contents of inputDirectory in archive input_dir_files = ds.files.list(input_dir_archive_uri) - print("\\nFiles in inputDirectory:") + print("\nFiles in inputDirectory:") for item in input_dir_files: print(f"- {item.name} ({item.type})") + + # Change to the archive directory for post-processing + archive_path = ds.files.translate_uri_to_path(input_dir_archive_uri) + os.chdir(archive_path) + print(f"\nChanged to directory: {archive_path}") - # Download all files from inputDirectory (excluding subdirectories) - files_to_download = [item.name for item in input_dir_files if item.type == "file"] - - print(f"\\nDownloading {len(files_to_download)} files...") - successful_downloads = [] - - for filename in files_to_download: - try: - file_uri = f"{input_dir_archive_uri}/{filename}" - local_path = os.path.join(temp_dir, filename) - - # Try downloading with the ds.files.download method - try: - ds.files.download(file_uri, local_path) - print(f"Downloaded: {filename}") - successful_downloads.append(filename) - except Exception as download_error: - print(f"Standard download failed for {filename}: {download_error}") - - # Try alternative download approach using get_file_content - try: - content = ds.files.get_file_content(file_uri) - with open(local_path, 'wb') as f: - if hasattr(content, 'read'): - shutil.copyfileobj(content, f) - else: - f.write(content) - print(f"Downloaded (alternative method): {filename}") - successful_downloads.append(filename) - except Exception as alt_error: - print(f"Alternative download also failed for {filename}: {alt_error}") - - except Exception as e: - print(f"Could not download {filename}: {e}") - - print(f"\\nSuccessfully downloaded {len(successful_downloads)} out of {len(files_to_download)} files") - except Exception as e: print(f"Error accessing archive: 
{e}") - -# Change to the temporary directory for postprocessing -os.chdir(temp_dir) -print(f"\\nChanged to directory: {os.getcwd()}") -print("Local files:") -for f in sorted(os.listdir(".")): - print(f"- {f}") ``` -### Step 7: Install Plotting Dependencies and Setup +**What this does:** -```python -# Install matplotlib for plotting -!pip3 install matplotlib +- **`inputDirectory`**: Contains all your original files plus generated outputs +- **`os.chdir`**: Changes to the results directory for analysis +- Lists all available output files for post-processing +- Provides access to acceleration and stress time histories -# Setup matplotlib for inline plotting -%matplotlib inline -import numpy as np -import matplotlib.pyplot as plt -``` +## πŸ“Š Post-processing Results -### Step 8: Create Postprocessing Functions +### Response Spectra Analysis ```python -# Define response spectra function inline +# Define response spectra function def resp_spectra(a, time, nstep): """ This function builds response spectra from acceleration time history, a should be a numpy array, T and nStep should be integers. """ - # add initial zero value to acceleration and change units + # Add initial zero value to acceleration a = np.insert(a, 0, 0) - # number of periods at which spectral values are to be computed + # Number of periods at which spectral values are computed nperiod = 100 - # define range of considered periods by power of 10 + # Define range of considered periods by power of 10 minpower = -3.0 maxpower = 1.0 - # create vector of considered periods + # Create vector of considered periods p = np.logspace(minpower, maxpower, nperiod) - # incremental circular frequency + # Incremental circular frequency dw = 2.0 * np.pi / time - # vector of circular freq + # Vector of circular frequency w = np.arange(0, (nstep + 1) * dw, dw) - # fast fourier transform of acceleration + # Fast fourier transform of acceleration afft = np.fft.fft(a) - # arbitrary stiffness value + # Arbitrary stiffness value k = 1000.0 - # damping ratio + # Damping ratio damp = 0.05 umax = np.zeros(nperiod) vmax = np.zeros(nperiod) amax = np.zeros(nperiod) - # loop to compute spectral values at each period + + # Loop to compute spectral values at each period for j in range(0, nperiod): - # compute mass and dashpot coeff to produce desired periods + # Compute mass and dashpot coefficient for desired periods m = ((p[j] / (2 * np.pi)) ** 2) * k c = 2 * damp * (k * m) ** 0.5 h = np.zeros(nstep + 2, dtype=complex) - # compute transfer function + + # Compute transfer function for l in range(0, int(nstep / 2 + 1)): h[l] = 1.0 / (-m * w[l] * w[l] + 1j * c * w[l] + k) - # mirror image of transfer function + # Mirror image of transfer function h[nstep + 1 - l] = np.conj(h[l]) - # compute displacement in frequency domain using transfer function + # Compute displacement in frequency domain using transfer function qfft = -m * afft u = np.zeros(nstep + 1, dtype=complex) for l in range(0, nstep + 1): u[l] = h[l] * qfft[l] - # compute displacement in time domain (ignore imaginary part) + # Compute displacement in time domain (ignore imaginary part) utime = np.real(np.fft.ifft(u)) - # spectral displacement, velocity, and acceleration + # Spectral displacement, velocity, and acceleration umax[j] = np.max(np.abs(utime)) vmax[j] = (2 * np.pi / p[j]) * umax[j] amax[j] = (2 * np.pi / p[j]) * vmax[j] return p, umax, vmax, amax +``` + +**What this function does:** -# Define plot_acc function inline +- **`resp_spectra`**: Computes response spectra from acceleration time 
history +- **`p`**: Period vector (logarithmically spaced) +- **`umax, vmax, amax`**: Spectral displacement, velocity, and acceleration +- **`damp=0.05`**: 5% damping ratio (typical for structures) +- **FFT**: Uses Fast Fourier Transform for efficient computation + +### Plot Response Spectra + +```python +# Define plotting function def plot_acc(): """ - Plot acceleration time history and response spectra + Plot acceleration response spectra on log-linear scale """ - plt.figure() + plt.figure(figsize=(10, 6)) + # Plot response spectra for each profile for motion in ["motion1"]: for profile in ["A", "B", "C", "D"]: - acc = np.loadtxt("Profile" + profile + "_acc" + motion + ".out") - [p, umax, vmax, amax] = resp_spectra(acc[:, -1], acc[-1, 0], acc.shape[0]) - plt.semilogx(p, amax) - - # response spectra on log-linear plot - - plt.ylabel("$S_a (g)$") - plt.xlabel("$Period (s)$") + try: + # Load acceleration data + acc = np.loadtxt(f"Profile{profile}_acc{motion}.out") + + # Compute response spectra + [p, umax, vmax, amax] = resp_spectra(acc[:, -1], acc[-1, 0], acc.shape[0]) + + # Plot spectral acceleration + plt.semilogx(p, amax, label=f"Profile {profile}", linewidth=2) + + except FileNotFoundError: + print(f"File Profile{profile}_acc{motion}.out not found") + + # Format plot + plt.ylabel("$S_a$ (g)") + plt.xlabel("Period (s)") + plt.title("Acceleration Response Spectra (5% Damping)") + plt.grid(True, alpha=0.3) + plt.legend() + plt.xlim([0.01, 10]) + plt.show() # Execute the plotting function plot_acc() ``` -This comprehensive OpenSees example demonstrates the complete workflow for submitting, monitoring, and analyzing OpenSees simulations using dapi, including advanced features like custom archive management, parametric studies, and detailed result postprocessing with response spectra analysis. 
\ No newline at end of file +**What this does:** + +- **`plt.semilogx`**: Creates log-scale plot for period axis +- **`Profile*_acc*.out`**: Loads acceleration time histories from OpenSees output +- **`linewidth=2`**: Makes lines more visible in plots +- **Multiple profiles**: Compares response across different soil profiles +- **5% damping**: Standard damping for structural response spectra + +### Advanced Post-processing + +```python +# Analyze stress time histories +def analyze_stress_results(): + """ + Analyze stress time histories from OpenSees output + """ + stress_files = [f for f in os.listdir('.') if 'Gstress' in f and f.endswith('.out')] + + print(f"Found {len(stress_files)} stress output files:") + for file in stress_files: + print(f"- {file}") + + # Load stress data + try: + stress_data = np.loadtxt(file) + + # Basic statistics + max_stress = np.max(np.abs(stress_data[:, 1:])) # Skip time column + print(f" Maximum stress magnitude: {max_stress:.2f}") + + # Plot time history + plt.figure(figsize=(10, 4)) + plt.plot(stress_data[:, 0], stress_data[:, 1], label='Shear Stress') + plt.xlabel('Time (s)') + plt.ylabel('Stress (kPa)') + plt.title(f'Stress Time History - {file}') + plt.grid(True, alpha=0.3) + plt.legend() + plt.show() + + except Exception as e: + print(f" Error loading {file}: {e}") + +# Run stress analysis +analyze_stress_results() +``` + +**What this does:** + +- **`stress_files`**: Finds all stress output files automatically +- **`np.loadtxt`**: Loads numerical data from OpenSees output +- **`max_stress`**: Computes maximum stress magnitude +- **Time history plots**: Visualizes stress evolution during earthquake \ No newline at end of file From d28b5765a033569cdd859bfbe6fe8581d13b7212 Mon Sep 17 00:00:00 2001 From: Krishna Kumar Date: Thu, 12 Jun 2025 17:46:19 -0500 Subject: [PATCH 2/6] WIP: Testing file path with spaces --- dapi/files.py | 12 +++++------- docs/installation.md | 2 +- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/dapi/files.py b/dapi/files.py index 0fa0fa8..373372f 100644 --- a/dapi/files.py +++ b/dapi/files.py @@ -36,7 +36,7 @@ def _parse_tapis_uri(tapis_uri: str) -> (str, str): try: parsed = urllib.parse.urlparse(tapis_uri) system_id = parsed.netloc - path = parsed.path.lstrip("/") if parsed.path else "" + path = urllib.parse.unquote(parsed.path.lstrip("/")) if parsed.path else "" if not system_id: raise ValueError(f"Invalid Tapis URI: '{tapis_uri}'. Missing system ID.") return system_id, path @@ -316,26 +316,24 @@ def get_ds_path_uri(t: Tapis, path: str, verify_exists: bool = False) -> str: print(f"Verifying existence of translated path: {input_uri}") try: system_id, remote_path = _parse_tapis_uri(input_uri) - # Decode the path part for the listFiles call, as it expects unencoded paths - decoded_remote_path = urllib.parse.unquote(remote_path) - print(f"Checking system '{system_id}' for path '{decoded_remote_path}'...") + print(f"Checking system '{system_id}' for path '{remote_path}'...") # Use limit=1 for efficiency, we only care if it *exists* # Note: listFiles might return successfully for the *parent* directory # if the final component doesn't exist. A more robust check might # involve checking the result count or specific item name, but this # basic check catches non-existent parent directories. 
- t.files.listFiles(systemId=system_id, path=decoded_remote_path, limit=1) + t.files.listFiles(systemId=system_id, path=remote_path, limit=1) print(f"Verification successful: Path exists.") except BaseTapyException as e: # Specifically check for 404 on the listFiles call if hasattr(e, "response") and e.response and e.response.status_code == 404: raise FileOperationError( - f"Verification failed: Path '{decoded_remote_path}' does not exist on system '{system_id}'. Translated URI: {input_uri}" + f"Verification failed: Path '{remote_path}' does not exist on system '{system_id}'. Translated URI: {input_uri}" ) from e else: # Re-raise other Tapis errors encountered during verification raise FileOperationError( - f"Verification error for path '{decoded_remote_path}' on system '{system_id}': {e}" + f"Verification error for path '{remote_path}' on system '{system_id}': {e}" ) from e except ( ValueError diff --git a/docs/installation.md b/docs/installation.md index 04bf114..c416055 100644 --- a/docs/installation.md +++ b/docs/installation.md @@ -23,7 +23,7 @@ pip install dapi To get the latest features and bug fixes: ```bash -pip install git+https://github.com/DesignSafe-CI/dapi.git +pip install git+https://github.com/DesignSafe-CI/dapi.git@dev ``` ### πŸ› οΈ Install for Development From fad03b8930635e3d4d8285a5ba21f94c005242ad Mon Sep 17 00:00:00 2001 From: Krishna Kumar Date: Tue, 17 Jun 2025 10:01:30 -0500 Subject: [PATCH 3/6] WIP: Fix files verification with spaces --- dapi/files.py | 4 +++- .../mpm/uniaxial_stress/test_benchmark.py | 14 ------------- tests/auth/test_auth.py | 18 ++++++++-------- tests/files/test_uri_translation.py | 21 +++++++++++++++++++ tests/jobs/test_dir_uri.py | 2 +- tests/jobs/test_job_gen_jobinfo.py | 17 ++++++--------- 6 files changed, 40 insertions(+), 36 deletions(-) delete mode 100644 examples/mpm/uniaxial_stress/test_benchmark.py diff --git a/dapi/files.py b/dapi/files.py index 373372f..89eda0d 100644 --- a/dapi/files.py +++ b/dapi/files.py @@ -316,13 +316,15 @@ def get_ds_path_uri(t: Tapis, path: str, verify_exists: bool = False) -> str: print(f"Verifying existence of translated path: {input_uri}") try: system_id, remote_path = _parse_tapis_uri(input_uri) + # The Tapis API expects URL-encoded paths when they contain spaces or special characters + encoded_remote_path = urllib.parse.quote(remote_path) print(f"Checking system '{system_id}' for path '{remote_path}'...") # Use limit=1 for efficiency, we only care if it *exists* # Note: listFiles might return successfully for the *parent* directory # if the final component doesn't exist. A more robust check might # involve checking the result count or specific item name, but this # basic check catches non-existent parent directories. 
- t.files.listFiles(systemId=system_id, path=remote_path, limit=1) + t.files.listFiles(systemId=system_id, path=encoded_remote_path, limit=1) print(f"Verification successful: Path exists.") except BaseTapyException as e: # Specifically check for 404 on the listFiles call diff --git a/examples/mpm/uniaxial_stress/test_benchmark.py b/examples/mpm/uniaxial_stress/test_benchmark.py deleted file mode 100644 index a2fdab0..0000000 --- a/examples/mpm/uniaxial_stress/test_benchmark.py +++ /dev/null @@ -1,14 +0,0 @@ -import os -import pathlib -import pandas as pd - -# Get current path -os.chdir(pathlib.Path(__file__).parent.absolute()) -# Read results as DF -df = pd.read_hdf("results/uniaxial-stress-2d-usf/particles09.h5", "table") -# Maximum stress in yy and xx -syy = -0.9999999999999999 -sxx = 0.0 -# Check results are consistent -assert round(df["stress_yy"].max() - syy, 8) == 0.0 -assert round(df["stress_xx"].max() - sxx, 8) == 0.0 diff --git a/tests/auth/test_auth.py b/tests/auth/test_auth.py index 3f3a1c0..08ce93d 100644 --- a/tests/auth/test_auth.py +++ b/tests/auth/test_auth.py @@ -1,11 +1,11 @@ import unittest from unittest.mock import patch, MagicMock -from dapi.auth.auth import init +from dapi.auth import init class TestAuthInit(unittest.TestCase): - @patch("dapi.auth.auth.Tapis") - @patch("dapi.auth.auth.os.environ") + @patch("dapi.auth.Tapis") + @patch("dapi.auth.os.environ") def test_init_with_env_variables(self, mock_environ, mock_tapis): # Setup mock_environ.get.side_effect = { @@ -27,10 +27,10 @@ def test_init_with_env_variables(self, mock_environ, mock_tapis): mock_tapis_obj.get_tokens.assert_called_once() self.assertEqual(result, mock_tapis_obj) - @patch("dapi.auth.auth.Tapis") - @patch("dapi.auth.auth.os.environ") - @patch("dapi.auth.auth.input") - @patch("dapi.auth.auth.getpass") + @patch("dapi.auth.Tapis") + @patch("dapi.auth.os.environ") + @patch("dapi.auth.input") + @patch("dapi.auth.getpass") def test_init_with_user_input( self, mock_getpass, mock_input, mock_environ, mock_tapis ): @@ -53,8 +53,8 @@ def test_init_with_user_input( mock_tapis_obj.get_tokens.assert_called_once() self.assertEqual(result, mock_tapis_obj) - @patch("dapi.auth.auth.Tapis") - @patch("dapi.auth.auth.os.environ") + @patch("dapi.auth.Tapis") + @patch("dapi.auth.os.environ") def test_init_authentication_failure(self, mock_environ, mock_tapis): # Setup mock_environ.get.side_effect = { diff --git a/tests/files/test_uri_translation.py b/tests/files/test_uri_translation.py index ab7143d..db2dfc9 100644 --- a/tests/files/test_uri_translation.py +++ b/tests/files/test_uri_translation.py @@ -61,6 +61,27 @@ def test_empty_path(self): result = tapis_uri_to_local_path(input_uri) self.assertEqual(result, expected) + def test_path_with_spaces(self): + """Test handling of paths with spaces (URL encoded)""" + input_uri = "tapis://designsafe.storage.default/kks32/DS%20input/file.txt" + expected = "/home/jupyter/MyData/DS input/file.txt" + result = tapis_uri_to_local_path(input_uri) + self.assertEqual(result, expected) + + def test_community_path_with_spaces(self): + """Test handling of community paths with spaces""" + input_uri = "tapis://designsafe.storage.community/My%20Dataset/data.csv" + expected = "/home/jupyter/CommunityData/My Dataset/data.csv" + result = tapis_uri_to_local_path(input_uri) + self.assertEqual(result, expected) + + def test_project_path_with_spaces(self): + """Test handling of project paths with spaces""" + input_uri = "tapis://project-1234-abcd/simulation%20results/output.txt" + expected = 
"/home/jupyter/MyProjects/simulation results/output.txt" + result = tapis_uri_to_local_path(input_uri) + self.assertEqual(result, expected) + # This allows running the test from the command line if __name__ == "__main__": diff --git a/tests/jobs/test_dir_uri.py b/tests/jobs/test_dir_uri.py index 0d25100..1d83049 100644 --- a/tests/jobs/test_dir_uri.py +++ b/tests/jobs/test_dir_uri.py @@ -1,6 +1,6 @@ import unittest from unittest.mock import MagicMock, patch -from dapi.jobs import get_ds_path_uri +from dapi.files import get_ds_path_uri from tapipy.tapis import Tapis diff --git a/tests/jobs/test_job_gen_jobinfo.py b/tests/jobs/test_job_gen_jobinfo.py index a802c32..582be02 100644 --- a/tests/jobs/test_job_gen_jobinfo.py +++ b/tests/jobs/test_job_gen_jobinfo.py @@ -1,6 +1,6 @@ import unittest from unittest.mock import Mock, patch -from dapi.jobs import jobs +from dapi.jobs import generate_job_request from datetime import datetime @@ -20,10 +20,10 @@ def setUp(self): self.app_info_mock.jobAttributes.execSystemLogicalQueue = "normal" self.t_mock.apps.getAppLatestVersion.return_value = self.app_info_mock - @patch("dapi.jobs.jobs.datetime") + @patch("dapi.jobs.datetime") def test_generate_job_info_default(self, mock_datetime): mock_datetime.now.return_value = datetime(2023, 5, 1, 12, 0, 0) - result = jobs.generate_job_info( + result = generate_job_request( self.t_mock, self.app_name, self.input_uri, self.input_file ) self.assertEqual(result["name"], f"{self.app_name}_20230501_120000") @@ -52,11 +52,10 @@ def test_generate_job_info_custom(self): custom_cores_per_node = 4 custom_queue = "high-priority" custom_allocation = "project123" - result = jobs.generate_job_info( + result = generate_job_request( self.t_mock, self.app_name, self.input_uri, - self.input_file, job_name=custom_job_name, max_minutes=custom_max_minutes, node_count=custom_node_count, @@ -77,15 +76,11 @@ def test_generate_job_info_custom(self): def test_generate_job_info_invalid_app(self): self.t_mock.apps.getAppLatestVersion.side_effect = Exception("Invalid app") with self.assertRaises(Exception): - jobs.generate_job_info( - self.t_mock, "invalid-app", self.input_uri, self.input_file - ) + generate_job_request(self.t_mock, "invalid-app", self.input_uri) def test_generate_job_info_opensees(self): opensees_app_name = "opensees-express" - result = jobs.generate_job_info( - self.t_mock, opensees_app_name, self.input_uri, self.input_file - ) + result = generate_job_request(self.t_mock, opensees_app_name, self.input_uri) self.assertIn("parameterSet", result) self.assertIn("envVariables", result["parameterSet"]) self.assertEqual( From bd54ff06157060c7f26af3433f20fa3a40272e81 Mon Sep 17 00:00:00 2001 From: Krishna Kumar Date: Fri, 20 Jun 2025 14:14:06 -0600 Subject: [PATCH 4/6] Fix path with spaces --- dapi/files.py | 13 +++++-------- dapi/jobs.py | 6 +++--- tests/files/test_uri_translation.py | 8 ++++---- tests/jobs/test_dir_uri.py | 2 +- 4 files changed, 13 insertions(+), 16 deletions(-) diff --git a/dapi/files.py b/dapi/files.py index 89eda0d..941dbd3 100644 --- a/dapi/files.py +++ b/dapi/files.py @@ -19,7 +19,7 @@ def _parse_tapis_uri(tapis_uri: str) -> (str, str): tapis_uri (str): URI in the format 'tapis://system_id/path'. Returns: - tuple: A tuple containing (system_id, path) where path is URL-decoded. + tuple: A tuple containing (system_id, path). Raises: ValueError: If the URI format is invalid or missing required components. 
@@ -36,7 +36,7 @@ def _parse_tapis_uri(tapis_uri: str) -> (str, str): try: parsed = urllib.parse.urlparse(tapis_uri) system_id = parsed.netloc - path = urllib.parse.unquote(parsed.path.lstrip("/")) if parsed.path else "" + path = parsed.path.lstrip("/") if parsed.path else "" if not system_id: raise ValueError(f"Invalid Tapis URI: '{tapis_uri}'. Missing system ID.") return system_id, path @@ -190,8 +190,7 @@ def get_ds_path_uri(t: Tapis, path: str, verify_exists: bool = False) -> str: ) else: tapis_path = path_remainder - encoded_path = urllib.parse.quote(tapis_path) - input_uri = f"tapis://{storage_system_id}/{encoded_path}" + input_uri = f"tapis://{storage_system_id}/{tapis_path}" print(f"Translated '{path}' to '{input_uri}' using t.username") break # Found match, exit loop @@ -206,8 +205,7 @@ def get_ds_path_uri(t: Tapis, path: str, verify_exists: bool = False) -> str: if pattern in path: path_remainder = path.split(pattern, 1)[1].lstrip("/") tapis_path = path_remainder - encoded_path = urllib.parse.quote(tapis_path) - input_uri = f"tapis://{storage_system_id}/{encoded_path}" + input_uri = f"tapis://{storage_system_id}/{tapis_path}" print(f"Translated '{path}' to '{input_uri}'") break # Found match, exit loop @@ -295,8 +293,7 @@ def get_ds_path_uri(t: Tapis, path: str, verify_exists: bool = False) -> str: f"Could not resolve project ID '{project_id_part}' to a Tapis system ID." ) - encoded_path_within_project = urllib.parse.quote(path_within_project) - input_uri = f"tapis://{found_system_id}/{encoded_path_within_project}" + input_uri = f"tapis://{found_system_id}/{path_within_project}" print(f"Translated '{path}' to '{input_uri}' using Tapis v3 lookup") break # Found match, exit loop diff --git a/dapi/jobs.py b/dapi/jobs.py index ea1ada2..25a5488 100644 --- a/dapi/jobs.py +++ b/dapi/jobs.py @@ -1008,7 +1008,7 @@ def archive_uri(self) -> Optional[str]: if details.archiveSystemId and details.archiveSystemDir: archive_path = details.archiveSystemDir.lstrip("/") return ( - f"tapis://{details.archiveSystemId}/{urllib.parse.quote(archive_path)}" + f"tapis://{details.archiveSystemId}/{archive_path}" ) return None @@ -1048,7 +1048,7 @@ def list_outputs( full_archive_path = os.path.join(details.archiveSystemDir, path.lstrip("/")) full_archive_path = os.path.normpath(full_archive_path).lstrip("/") try: - archive_base_uri = f"tapis://{details.archiveSystemId}/{urllib.parse.quote(full_archive_path)}" + archive_base_uri = f"tapis://{details.archiveSystemId}/{full_archive_path}" from .files import list_files return list_files(self._tapis, archive_base_uri, limit=limit, offset=offset) @@ -1085,7 +1085,7 @@ def download_output(self, remote_path: str, local_target: str): ) full_archive_path = os.path.normpath(full_archive_path).lstrip("/") remote_uri = ( - f"tapis://{details.archiveSystemId}/{urllib.parse.quote(full_archive_path)}" + f"tapis://{details.archiveSystemId}/{full_archive_path}" ) try: from .files import download_file diff --git a/tests/files/test_uri_translation.py b/tests/files/test_uri_translation.py index db2dfc9..6ba3228 100644 --- a/tests/files/test_uri_translation.py +++ b/tests/files/test_uri_translation.py @@ -62,22 +62,22 @@ def test_empty_path(self): self.assertEqual(result, expected) def test_path_with_spaces(self): - """Test handling of paths with spaces (URL encoded)""" - input_uri = "tapis://designsafe.storage.default/kks32/DS%20input/file.txt" + """Test handling of paths with spaces""" + input_uri = "tapis://designsafe.storage.default/kks32/DS input/file.txt" expected = 
"/home/jupyter/MyData/DS input/file.txt" result = tapis_uri_to_local_path(input_uri) self.assertEqual(result, expected) def test_community_path_with_spaces(self): """Test handling of community paths with spaces""" - input_uri = "tapis://designsafe.storage.community/My%20Dataset/data.csv" + input_uri = "tapis://designsafe.storage.community/My Dataset/data.csv" expected = "/home/jupyter/CommunityData/My Dataset/data.csv" result = tapis_uri_to_local_path(input_uri) self.assertEqual(result, expected) def test_project_path_with_spaces(self): """Test handling of project paths with spaces""" - input_uri = "tapis://project-1234-abcd/simulation%20results/output.txt" + input_uri = "tapis://project-1234-abcd/simulation results/output.txt" expected = "/home/jupyter/MyProjects/simulation results/output.txt" result = tapis_uri_to_local_path(input_uri) self.assertEqual(result, expected) diff --git a/tests/jobs/test_dir_uri.py b/tests/jobs/test_dir_uri.py index 1d83049..611fc90 100644 --- a/tests/jobs/test_dir_uri.py +++ b/tests/jobs/test_dir_uri.py @@ -52,7 +52,7 @@ def test_no_matching_pattern(self): def test_space_in_path(self): path = "jupyter/MyData/path with spaces" - expected = "tapis://designsafe.storage.default/testuser/path%20with%20spaces" + expected = "tapis://designsafe.storage.default/testuser/path with spaces" self.assertEqual(get_ds_path_uri(self.t, path), expected) From 69d6e7dd44cc3ecd59885aeb262e7ea07d2a50ec Mon Sep 17 00:00:00 2001 From: Krishna Kumar Date: Fri, 20 Jun 2025 14:16:26 -0600 Subject: [PATCH 5/6] Fixes #4 TAPIS file path with spaces --- dapi/jobs.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/dapi/jobs.py b/dapi/jobs.py index 25a5488..8e5d037 100644 --- a/dapi/jobs.py +++ b/dapi/jobs.py @@ -1007,9 +1007,7 @@ def archive_uri(self) -> Optional[str]: details = self._get_details() if details.archiveSystemId and details.archiveSystemDir: archive_path = details.archiveSystemDir.lstrip("/") - return ( - f"tapis://{details.archiveSystemId}/{archive_path}" - ) + return f"tapis://{details.archiveSystemId}/{archive_path}" return None def list_outputs( @@ -1084,9 +1082,7 @@ def download_output(self, remote_path: str, local_target: str): details.archiveSystemDir, remote_path.lstrip("/") ) full_archive_path = os.path.normpath(full_archive_path).lstrip("/") - remote_uri = ( - f"tapis://{details.archiveSystemId}/{full_archive_path}" - ) + remote_uri = f"tapis://{details.archiveSystemId}/{full_archive_path}" try: from .files import download_file From e8b589ad08cefe493b61ce2aa485a8a07b7403e5 Mon Sep 17 00:00:00 2001 From: Krishna Kumar Date: Fri, 20 Jun 2025 14:31:46 -0600 Subject: [PATCH 6/6] Make encoding consitent --- dapi/files.py | 46 ++++++++++- tests/files/test_encoding_consistency.py | 97 ++++++++++++++++++++++++ 2 files changed, 139 insertions(+), 4 deletions(-) create mode 100644 tests/files/test_encoding_consistency.py diff --git a/dapi/files.py b/dapi/files.py index 941dbd3..35a0bd5 100644 --- a/dapi/files.py +++ b/dapi/files.py @@ -11,6 +11,36 @@ from typing import List +def _safe_quote(path: str) -> str: + """Safely URL-encode a path, avoiding double encoding. 
+ + Args: + path (str): The path to encode + + Returns: + str: URL-encoded path + + Example: + >>> _safe_quote("folder with spaces") + 'folder%20with%20spaces' + >>> _safe_quote("folder%20with%20spaces") # Already encoded + 'folder%20with%20spaces' + """ + # Check if the path appears to be already URL-encoded + # by trying to decode it and seeing if it changes + try: + decoded = urllib.parse.unquote(path) + if decoded != path: + # Path was URL-encoded, return as-is to avoid double encoding + return path + else: + # Path was not URL-encoded, encode it + return urllib.parse.quote(path) + except Exception: + # If there's any error in decoding, just encode the original path + return urllib.parse.quote(path) + + # _parse_tapis_uri helper remains the same def _parse_tapis_uri(tapis_uri: str) -> (str, str): """Parse a Tapis URI into system ID and path components. @@ -314,7 +344,7 @@ def get_ds_path_uri(t: Tapis, path: str, verify_exists: bool = False) -> str: try: system_id, remote_path = _parse_tapis_uri(input_uri) # The Tapis API expects URL-encoded paths when they contain spaces or special characters - encoded_remote_path = urllib.parse.quote(remote_path) + encoded_remote_path = _safe_quote(remote_path) print(f"Checking system '{system_id}' for path '{remote_path}'...") # Use limit=1 for efficiency, we only care if it *exists* # Note: listFiles might return successfully for the *parent* directory @@ -376,8 +406,12 @@ def upload_file(t: Tapis, local_path: str, remote_uri: str): print( f"Uploading '{local_path}' to system '{system_id}' at path '{dest_path}'..." ) + # URL-encode the destination path for API call + encoded_dest_path = _safe_quote(dest_path) t.upload( - system_id=system_id, source_file_path=local_path, dest_file_path=dest_path + system_id=system_id, + source_file_path=local_path, + dest_file_path=encoded_dest_path, ) print("Upload complete.") except BaseTapyException as e: @@ -421,8 +455,10 @@ def download_file(t: Tapis, remote_uri: str, local_path: str): os.makedirs(local_dir, exist_ok=True) # Use getContents which returns the raw bytes # Set stream=True for potentially large files + # URL-encode the source path for API call + encoded_source_path = _safe_quote(source_path) response = t.files.getContents( - systemId=system_id, path=source_path, stream=True + systemId=system_id, path=encoded_source_path, stream=True ) # Write the streamed content to the local file @@ -474,8 +510,10 @@ def list_files( try: system_id, path = _parse_tapis_uri(remote_uri) print(f"Listing files in system '{system_id}' at path '{path}'...") + # URL-encode the path for API call + encoded_path = _safe_quote(path) results = t.files.listFiles( - systemId=system_id, path=path, limit=limit, offset=offset + systemId=system_id, path=encoded_path, limit=limit, offset=offset ) print(f"Found {len(results)} items.") return results diff --git a/tests/files/test_encoding_consistency.py b/tests/files/test_encoding_consistency.py new file mode 100644 index 0000000..f7fcde5 --- /dev/null +++ b/tests/files/test_encoding_consistency.py @@ -0,0 +1,97 @@ +import unittest +from unittest.mock import MagicMock, Mock +from dapi.files import _safe_quote, _parse_tapis_uri, get_ds_path_uri +from tapipy.tapis import Tapis +import urllib.parse + + +class TestEncodingConsistency(unittest.TestCase): + """Test encoding consistency and double-encoding prevention.""" + + def test_safe_quote_prevents_double_encoding(self): + """Test that _safe_quote prevents double encoding.""" + # Test with unencoded path + unencoded = "folder with spaces" 
+ encoded_once = _safe_quote(unencoded) + encoded_twice = _safe_quote(encoded_once) + + self.assertEqual(encoded_once, "folder%20with%20spaces") + self.assertEqual(encoded_twice, encoded_once) # Should not double encode + + def test_safe_quote_with_multiple_spaces(self): + """Test _safe_quote with multiple spaces.""" + test_cases = [ + ("folder with spaces", "folder%20with%20spaces"), + ("folder%20with%20spaces", "folder%20with%20spaces"), # Already encoded + ("normal_folder", "normal_folder"), + ("path/with spaces/here", "path/with%20spaces/here"), + ( + "path%2Fwith%20spaces%2Fhere", + "path%2Fwith%20spaces%2Fhere", + ), # Already encoded + ] + + for input_path, expected in test_cases: + with self.subTest(input_path=input_path): + result = _safe_quote(input_path) + self.assertEqual(result, expected) + + def test_uri_generation_uses_spaces(self): + """Test that URI generation creates URIs with spaces, not %20.""" + mock_tapis = MagicMock(spec=Tapis) + mock_tapis.username = "testuser" + + path = "jupyter/MyData/folder with spaces/file.txt" + uri = get_ds_path_uri(mock_tapis, path) + + # URI should contain actual spaces + self.assertIn("folder with spaces", uri) + self.assertNotIn("folder%20with%20spaces", uri) + self.assertEqual( + uri, + "tapis://designsafe.storage.default/testuser/folder with spaces/file.txt", + ) + + def test_uri_parsing_handles_spaces(self): + """Test that URI parsing correctly handles URIs with spaces.""" + uri_with_spaces = ( + "tapis://designsafe.storage.default/testuser/folder with spaces/file.txt" + ) + system_id, path = _parse_tapis_uri(uri_with_spaces) + + self.assertEqual(system_id, "designsafe.storage.default") + self.assertEqual(path, "testuser/folder with spaces/file.txt") + + def test_uri_parsing_handles_encoded_uris(self): + """Test that URI parsing correctly handles pre-encoded URIs.""" + uri_with_encoding = "tapis://designsafe.storage.default/testuser/folder%20with%20spaces/file.txt" + system_id, path = _parse_tapis_uri(uri_with_encoding) + + self.assertEqual(system_id, "designsafe.storage.default") + # Should return the path as-is (with %20) since we no longer decode + self.assertEqual(path, "testuser/folder%20with%20spaces/file.txt") + + def test_round_trip_consistency(self): + """Test that URI generation and parsing are consistent.""" + mock_tapis = MagicMock(spec=Tapis) + mock_tapis.username = "testuser" + + original_path = "jupyter/MyData/folder with spaces/file.txt" + + # Generate URI + uri = get_ds_path_uri(mock_tapis, original_path) + + # Parse URI back + system_id, parsed_path = _parse_tapis_uri(uri) + + # The parsed path should match the expected Tapis path + expected_tapis_path = "testuser/folder with spaces/file.txt" + self.assertEqual(parsed_path, expected_tapis_path) + + # Safe quote should properly encode for API calls + api_path = _safe_quote(parsed_path) + self.assertEqual(api_path, "testuser/folder%20with%20spaces/file.txt") + + +if __name__ == "__main__": + unittest.main()
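
The net effect of this series is that Tapis URIs produced by `get_ds_path_uri` keep literal spaces, while `_safe_quote` applies URL encoding exactly once at the point where a Tapis API call is made. Below is a minimal sketch of that round trip, following the mocked-client pattern used in the tests above; it assumes the patched `dapi` and `tapipy` are installed, and the username and file names are illustrative rather than taken from the patches.

```python
from unittest.mock import MagicMock

from tapipy.tapis import Tapis

from dapi.files import _parse_tapis_uri, _safe_quote, get_ds_path_uri

# Mocked client, as in the unit tests; a real session would pass an authenticated Tapis object.
t = MagicMock(spec=Tapis)
t.username = "testuser"

# JupyterHub-style path containing spaces (illustrative file name).
uri = get_ds_path_uri(t, "jupyter/MyData/folder with spaces/input.json")
# uri == "tapis://designsafe.storage.default/testuser/folder with spaces/input.json"

# The URI keeps literal spaces; encoding happens only for the API-facing path.
system_id, path = _parse_tapis_uri(uri)
api_path = _safe_quote(path)

assert system_id == "designsafe.storage.default"
assert path == "testuser/folder with spaces/input.json"
assert api_path == "testuser/folder%20with%20spaces/input.json"
assert _safe_quote(api_path) == api_path  # already-encoded paths are not encoded again
```

Keeping the stored URIs unencoded matches what users see in the DesignSafe file browser, while the decode-then-compare check inside `_safe_quote` prevents double encoding when a caller passes a path that is already percent-encoded.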