(mpi)=
# MPI in Mantid
## What is MPI?
MPI (Message Passing Interface) is a standardized and portable message-passing system
designed to enable parallel computing across multiple processors or nodes.
It allows programs to coordinate and communicate data between processes running simultaneously,
making it possible to distribute computational workloads and process large datasets more efficiently.
In the context of scientific computing and data reduction, MPI enables:
- **Parallel data processing** - Split large datasets across multiple processes for faster analysis
- **Distributed memory computing** - Each process has its own memory space, allowing processing of datasets larger than a single machine's memory
- **Scalability** - Run the same code on anything from a laptop to a supercomputer cluster
- **Collective operations** - Built-in operations for common patterns like broadcasting, gathering, and reducing data
## MPI in Mantid
Mantid provides MPI support through a small set of specialized algorithms for parallel data reduction workflows.
These algorithms treat Mantid workspaces as the unit of communication, so whole workspaces can be passed between processes.
The MPI algorithms in Mantid are:
- **BroadcastWorkspace** - Distributes a workspace from one process to all other processes
- **GatherWorkspaces** - Collects workspaces from all processes to a single root process
These algorithms are built as part of the `MantidMPIAlgorithms` plugin. They are available only on Linux, either through the `mantidmpi` package
or when Mantid is compiled with the CMake flag `MPI_BUILD=ON`.
## Building Mantid with MPI Support
To use MPI algorithms, Mantid must be compiled with MPI support enabled:
```bash
cmake -DMPI_BUILD=ON --preset=linux
cd build
ninja
```
The required MPI packages are already included in Mantid's dependencies.
## Running Mantid Scripts with MPI
MPI-enabled Mantid scripts are executed using the `mpiexec` launcher:
```bash
mpiexec -np 4 python my_mpi_script.py
```
Where:
- `-np 4` specifies the number of MPI processes (ranks) to launch
- Each process executes the same script but can take different code paths based on its rank
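
When a job is started from another Python program rather than typed at a shell, the same command line can be assembled programmatically. The sketch below only builds the argument list; `build_mpiexec_command` is a hypothetical helper introduced for illustration, not part of Mantid or mpi4py.

```python
import shlex

def build_mpiexec_command(script, n_processes, python="python"):
    """Return the argument list for launching `script` on `n_processes` ranks."""
    if n_processes < 1:
        raise ValueError("n_processes must be at least 1")
    return ["mpiexec", "-np", str(n_processes), python, script]

command = build_mpiexec_command("my_mpi_script.py", 4)
print(shlex.join(command))  # mpiexec -np 4 python my_mpi_script.py
```

The resulting list can be passed to `subprocess.run(command, check=True)` on a machine where `mpiexec` is on the `PATH`.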
## Common MPI Patterns
### Master-Worker Pattern
The most common pattern involves a "master" process (typically rank 0) that coordinates work:
```python
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
if rank == 0:
    # Master process: coordinate work
    print(f"Running with {size} processes")
else:
    # Worker processes: perform assigned tasks
    pass
```
### Broadcast-Process-Gather Pattern
A typical parallel data reduction workflow:
1. **Broadcast** - Distribute common data (workspace, calibration, instrument geometry) to all processes
1. **Process** - Each process works on its portion of the data independently
1. **Gather** - Collect results back to the master process
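
The three steps can be modelled in plain Python without MPI, which may help when reasoning about what each rank holds at each stage. This is only a single-process simulation under simplifying assumptions: the functions `broadcast`, `process`, and `gather` are hypothetical stand-ins, the "calibration" is a single scale factor, and the data is a flat list.

```python
def broadcast(value, n_ranks):
    """Model a broadcast: every rank ends up with the same value."""
    return [value for _ in range(n_ranks)]

def process(rank, n_ranks, calibration, data):
    """Each rank scales only its own round-robin share of the data."""
    return [calibration * x for x in data[rank::n_ranks]]

def gather(per_rank_results):
    """Model a gather: the root receives every rank's result, in rank order."""
    return [item for result in per_rank_results for item in result]

n_ranks = 3
data = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
calibrations = broadcast(2.0, n_ranks)
partial = [process(r, n_ranks, calibrations[r], data) for r in range(n_ranks)]
combined = gather(partial)
print(partial)   # [[2.0, 8.0], [4.0, 10.0], [6.0, 12.0]]
print(combined)  # [2.0, 8.0, 4.0, 10.0, 6.0, 12.0]
```

Note that the gathered result is ordered by rank, not by the original position of each item, so a round-robin split does not reproduce the input order.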
## MPI Algorithm Reference
### BroadcastWorkspace
**Purpose**: Distribute a workspace from one MPI process (the broadcaster) to all other processes in the MPI job.
**Key Features**:
- Only the broadcaster rank needs to provide an input workspace
- Optimized for large workspaces through chunked broadcasting
- Automatically detects and optimizes shared X data across spectra
- Memory-efficient with configurable chunk sizes
**Usage**: This is a collective MPI operation - every process in the MPI job must call it, or the job can hang.
### GatherWorkspaces
**Purpose**: Collect workspaces from all MPI processes to a single root process.
**Key Features**:
- Supports two accumulation methods: Add and Append
- Chunked processing for large workspaces
- Preserves EventWorkspace data when requested
- Only root process receives output
**Usage**: This is a collective MPI operation - every participating process must call it, or the job can hang.
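
The difference between the two accumulation methods can be illustrated with nested lists standing in for workspaces. This is a plain-Python model of the behaviour described on this page, not the Mantid implementation; `accumulate` is a hypothetical function, and the requirement that every rank holds the same number of spectra in Add mode is an assumption of the model.

```python
def accumulate(per_rank_spectra, method):
    """Combine per-rank lists of spectra.

    per_rank_spectra[r][i] is spectrum i (a list of Y values) held by rank r.
    """
    if method == "Append":
        # Spectra from every rank are stacked one after another.
        return [list(s) for spectra in per_rank_spectra for s in spectra]
    if method == "Add":
        # Spectrum i of the result is the bin-by-bin sum of spectrum i on every rank.
        n_spectra = len(per_rank_spectra[0])
        if any(len(spectra) != n_spectra for spectra in per_rank_spectra):
            raise ValueError("Add requires the same number of spectra on every rank")
        return [
            [sum(bins) for bins in zip(*(spectra[i] for spectra in per_rank_spectra))]
            for i in range(n_spectra)
        ]
    raise ValueError(f"unknown method: {method}")

ranks = [
    [[1, 1, 1], [2, 2, 2]],        # rank 0: two spectra of three bins
    [[10, 10, 10], [20, 20, 20]],  # rank 1: two spectra of three bins
]
added = accumulate(ranks, "Add")
appended = accumulate(ranks, "Append")
print(added)     # [[11, 11, 11], [22, 22, 22]]
print(appended)  # [[1, 1, 1], [2, 2, 2], [10, 10, 10], [20, 20, 20]]
```

In this model, Add keeps the number of spectra unchanged while Append multiplies it by the number of ranks.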
## Examples
### Example 1: Broadcasting Calibration Data
A common use case is to load a workspace or calibration data on one process and distribute it to all others:
```python
from mantid.simpleapi import Load, BroadcastWorkspace
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
# Only rank 0 loads the calibration file
if rank == 0:
    cal_ws = Load(Filename="calibration.nxs")
    shared_cal = BroadcastWorkspace(InputWorkspace=cal_ws, BroadcasterRank=0, OutputWorkspace="calibration")
else:
    # Other ranks receive the broadcast
    shared_cal = BroadcastWorkspace(BroadcasterRank=0, OutputWorkspace="calibration")
# All ranks now have 'shared_cal' and can use it for processing
```
### Example 2: Parallel Data Reduction with Gather
Process different run numbers in parallel and combine results:
```python
from mantid.simpleapi import Load, Rebin, GatherWorkspaces
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
# Define run numbers to process
run_numbers = [12345, 12346, 12347, 12348, 12349, 12350, 12351, 12352]
# Each rank processes a subset of runs
my_runs = run_numbers[rank::size]  # Distribute runs round-robin
# Process assigned runs
for run_num in my_runs:
    ws = Load(Filename=f"data_{run_num}.nxs")
    ws = Rebin(InputWorkspace=ws, Params="0.1,-0.001,3.0")
    # Additional processing...
# For this example, assume last processed workspace is 'ws'
# Gather all processed workspaces to rank 0
if rank == 0:
    combined = GatherWorkspaces(InputWorkspace=ws, AccumulationMethod="Append", OutputWorkspace="all_runs_combined")
    print(f"Combined workspace has {combined.getNumberHistograms()} spectra")
else:
    GatherWorkspaces(InputWorkspace=ws, AccumulationMethod="Append")
```
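
To see which runs each rank receives from the `run_numbers[rank::size]` slice, the assignment can be computed ahead of time without MPI. The helper `assign_runs` below is hypothetical and exists only for this illustration.

```python
def assign_runs(run_numbers, size):
    """Return the runs each rank would get from run_numbers[rank::size]."""
    return {rank: run_numbers[rank::size] for rank in range(size)}

run_numbers = [12345, 12346, 12347, 12348, 12349, 12350, 12351, 12352]
three_ranks = assign_runs(run_numbers, 3)
ten_ranks = assign_runs(run_numbers, 10)
idle = [rank for rank, runs in ten_ranks.items() if not runs]
print(three_ranks)
# {0: [12345, 12348, 12351], 1: [12346, 12349, 12352], 2: [12347, 12350]}
print(idle)  # [8, 9]
```

With more ranks than runs, some ranks receive an empty list. In the example above those ranks would never define `ws` inside the loop, so it is worth either launching no more processes than there are runs or handling the empty case before the gather step.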
### Example 3: Summing Data Across Processes
Use "Add" mode to sum corresponding spectra from all processes:
```python
from mantid.simpleapi import CreateWorkspace, GatherWorkspaces
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
# Each rank creates workspace with rank-dependent values
x = np.linspace(0, 10, 101)
y = np.ones(100) * (rank + 1)  # Rank 0: 1s, Rank 1: 2s, etc.
ws = CreateWorkspace(DataX=x, DataY=y, NSpec=1)
# Sum all workspaces together
if rank == 0:
    summed = GatherWorkspaces(InputWorkspace=ws, AccumulationMethod="Add", OutputWorkspace="summed")
    # If 4 processes: result = 1 + 2 + 3 + 4 = 10
    print(f"Summed value: {summed.readY(0)[0]}")
else:
    GatherWorkspaces(InputWorkspace=ws, AccumulationMethod="Add")
```
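
Because rank `r` contributes the value `r + 1` to every bin, the expected total for any number of processes is the sum 1 + 2 + ... + N. A small helper (hypothetical, for checking results only) makes the expected value explicit:

```python
def expected_sum(n_ranks):
    """Sum of (rank + 1) over ranks 0..n_ranks-1, i.e. 1 + 2 + ... + n_ranks."""
    return n_ranks * (n_ranks + 1) // 2

for n in (1, 2, 4, 8):
    print(n, expected_sum(n))
```

On rank 0 this could be compared against `summed.readY(0)[0]` as a quick sanity check of the job.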
### Example 4: Complete Workflow - Broadcast, Process, Gather
A complete parallel data reduction workflow:
```python
from mantid.simpleapi import Load, BroadcastWorkspace, AlignAndFocusPowder, GatherWorkspaces, SaveNexus
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
# Step 1: Broadcast calibration data
if rank == 0:
    cal = Load(Filename="calibration.nxs")
    cal = BroadcastWorkspace(InputWorkspace=cal, BroadcasterRank=0, OutputWorkspace="cal")
else:
    cal = BroadcastWorkspace(BroadcasterRank=0, OutputWorkspace="cal")
# Step 2: Each rank processes its assigned data
my_file = f"data_rank{rank}.nxs"
ws = Load(Filename=my_file)
# Use the shared calibration
ws = AlignAndFocusPowder(InputWorkspace=ws, CalFileName=cal, Params="0.5,-0.001,10")
# Step 3: Gather results
if rank == 0:
    combined = GatherWorkspaces(InputWorkspace=ws, AccumulationMethod="Append", OutputWorkspace="final_result")
    # Save combined result
    SaveNexus(InputWorkspace=combined, Filename="combined_output.nxs")
else:
    GatherWorkspaces(InputWorkspace=ws, AccumulationMethod="Append")
```
## Performance Considerations
### Chunking Strategy
Both `BroadcastWorkspace` and `GatherWorkspaces` support chunked processing:
- **Automatic (ChunkSize=0)** - Recommended for most cases, targets ~100MB chunks
- **All-at-once (ChunkSize=-1)** - Fastest but uses most memory
- **Explicit (ChunkSize=N)** - Manual control, useful for memory-constrained systems
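
The idea behind chunking can be sketched as splitting the spectrum indices into consecutive ranges and transferring one range at a time. The example assumes, for illustration only, that a chunk size counts spectra; `chunk_ranges` is a hypothetical helper and does not reflect how Mantid computes its chunks internally.

```python
def chunk_ranges(n_spectra, chunk_size):
    """Yield (start, stop) index pairs covering n_spectra in pieces of chunk_size."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    for start in range(0, n_spectra, chunk_size):
        yield start, min(start + chunk_size, n_spectra)

ranges = list(chunk_ranges(10, 4))
print(ranges)  # [(0, 4), (4, 8), (8, 10)]
```

Smaller chunks lower the peak amount of data in flight at the cost of more transfers; a single chunk covering everything corresponds to the all-at-once setting.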
### Memory Usage
- `BroadcastWorkspace` temporarily duplicates data during transmission
- `GatherWorkspaces` with Append mode requires memory for all gathered data on root
- Consider using chunking and processing data in stages for very large datasets
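
A back-of-the-envelope estimate can show whether the root process is likely to run out of memory in Append mode. The sketch below assumes histogram data stored as 8-byte values, a separate X array for every spectrum, and no overhead for metadata or instrument information; both function names are hypothetical.

```python
def histogram_bytes(n_spectra, n_bins, bytes_per_value=8):
    """Rough size of a histogram workspace: X has n_bins + 1 edges, Y and E have n_bins."""
    values_per_spectrum = (n_bins + 1) + n_bins + n_bins
    return n_spectra * values_per_spectrum * bytes_per_value

def append_root_bytes(n_ranks, n_spectra, n_bins):
    """Rough size of the gathered result on root when every rank appends its spectra."""
    return n_ranks * histogram_bytes(n_spectra, n_bins)

per_rank = histogram_bytes(n_spectra=1000, n_bins=5000)
on_root = append_root_bytes(n_ranks=16, n_spectra=1000, n_bins=5000)
print(f"{per_rank / 1e6:.1f} MB per rank, {on_root / 1e6:.1f} MB on root")
```

Real workspaces will differ from this estimate, particularly when X data is shared between spectra, so the numbers are best read as an upper-bound guide rather than a measurement.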
### Network Bandwidth
- Shared X data is automatically detected and broadcast only once
- Use appropriate chunk sizes to balance memory and network efficiency
- EventWorkspaces can be large - consider converting to histograms if events aren't needed
## Common Pitfalls
### Collective Operations
**Problem**: One process doesn't call the MPI algorithm, causing the job to hang.
**Solution**: Ensure all processes in the communicator call MPI algorithms, even if they don't have input data.
```python
# WRONG - only rank 0 calls the algorithm
if rank == 0:
    output = GatherWorkspaces(InputWorkspace=ws)
# CORRECT - all ranks participate
if rank == 0:
    output = GatherWorkspaces(InputWorkspace=ws, AccumulationMethod="Append", OutputWorkspace="result")
else:
    GatherWorkspaces(InputWorkspace=ws, AccumulationMethod="Append")
```
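
The hang can be reproduced in miniature with threads and a barrier from the standard library. This is only an analogy: `threading.Barrier` stands in for a collective call, `run_collective` is a hypothetical helper, and the timeout exists solely so the demonstration terminates.

```python
import threading

def run_collective(n_expected, n_calling, timeout=0.5):
    """Start n_calling threads that wait on a barrier sized for n_expected parties."""
    barrier = threading.Barrier(n_expected)
    outcomes = []

    def participant():
        try:
            barrier.wait(timeout=timeout)
            outcomes.append("completed")
        except threading.BrokenBarrierError:
            # A real MPI job has no timeout here: it simply hangs.
            outcomes.append("stuck")

    threads = [threading.Thread(target=participant) for _ in range(n_calling)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return outcomes

all_call = run_collective(n_expected=3, n_calling=3)
one_missing = run_collective(n_expected=3, n_calling=2)
print(all_call)     # ['completed', 'completed', 'completed']
print(one_missing)  # ['stuck', 'stuck']
```

When every participant arrives, all of them proceed; when one is missing, the others wait for it indefinitely unless something external interrupts them.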
### Workspace Compatibility
**Problem**: Attempting to gather workspaces with different numbers of bins.
**Solution**: Ensure all workspaces have compatible structure before gathering:
```python
# All workspaces must have same number of bins
# Rebin to common grid if necessary
ws = Rebin(InputWorkspace=ws, Params="0,-0.001,10")
```
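
A cheap consistency check before gathering can turn a confusing failure into a clear error message. The function below is a hypothetical helper operating on a plain list of bin counts; in an MPI job such a list could be collected with mpi4py's `comm.allgather(ws.blocksize())`, which is a suggestion rather than something this page prescribes.

```python
def check_same_bin_count(bin_counts):
    """Return the common bin count, or raise ValueError if ranks disagree."""
    if not bin_counts:
        raise ValueError("no bin counts supplied")
    distinct = sorted(set(bin_counts))
    if len(distinct) > 1:
        raise ValueError(f"ranks disagree on bin count: {distinct}")
    return distinct[0]

print(check_same_bin_count([9500, 9500, 9500]))  # 9500
try:
    check_same_bin_count([9500, 9500, 9400])
except ValueError as err:
    print(err)  # ranks disagree on bin count: [9400, 9500]
```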
### Missing Input on Root
**Problem**: The root process (the broadcaster rank) doesn't provide input to `BroadcastWorkspace`.
**Solution**: The broadcaster rank must supply `InputWorkspace`; all other ranks can omit it:
```python
if rank == 0:
    ws = Load(Filename="data.nxs")
    output = BroadcastWorkspace(InputWorkspace=ws)  # Required
else:
    output = BroadcastWorkspace(BroadcasterRank=0)  # No input needed
```
## Additional Resources
- MPI Tutorial:
- mpi4py Documentation:
- Mantid MPI Algorithms: See {ref}`BroadcastWorkspace ` and {ref}`GatherWorkspaces `
```{eval-rst}
.. categories:: Concepts
```