How Prefect flows enable parallel benchmark execution, caching, and HPC-scale orchestration in MLIP Arena.
MLIP Arena uses Prefect as its workflow engine. Prefect handles task caching, parallel execution, state tracking, and integration with HPC schedulers — without requiring changes to your simulation code.
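Prefect's task caching keys on task inputs, so repeated runs with the same arguments skip re-execution. As a rough stdlib analogy (not Prefect's actual API), memoization captures the same idea: the function body runs only on a cache miss.

```python
from functools import lru_cache

calls = []  # records every time the body actually executes

@lru_cache(maxsize=None)
def squared(x):
    # Body runs only on a cache miss; repeated inputs hit the cache.
    calls.append(x)
    return x * x

first = squared(3)
second = squared(3)  # cache hit: the body does not run again
```

In Prefect the cache additionally persists across process restarts and can carry an expiration policy, which plain memoization does not.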
A @flow orchestrates multiple tasks and serves as the entry point for execution.
In MLIP Arena, every simulation operation (OPT, EOS, MD, etc.) is a @task. Benchmark scripts wrap those tasks in a @flow to run them in parallel across models and structures.
To run calculations concurrently, call .submit() on the task instead of calling it directly. .submit() returns a PrefectFuture immediately and dispatches the work to a Prefect worker. Wrap all .submit() calls inside a @flow so Prefect can track and schedule them:
```python
# .github/README.md (lines 122–136)
from prefect import flow

from mlip_arena.models import MLIPEnum
from mlip_arena.tasks import MD
from mlip_arena.tasks.utils import get_calculator
from ase.build import bulk

atoms = bulk("Cu", "fcc", a=3.6) * (5, 5, 5)

@flow
def run_all_models():
    futures = []
    for model in MLIPEnum:
        future = MD.submit(
            atoms=atoms,
            calculator=get_calculator(model),
            ensemble="nvt",
            total_time=1e3,
            time_step=2,
        )
        futures.append(future)
    return [f.result(raise_on_failure=False) for f in futures]

results = run_all_models()
```
raise_on_failure=False lets you collect all results even if some models fail. Inspect the returned State objects to identify which models succeeded.
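The failure-tolerant fan-out pattern is not Prefect-specific. A minimal stdlib sketch with concurrent.futures (run_model and "flaky-model" are hypothetical stand-ins, not MLIP Arena code) shows the same idea: submit everything, then collect either a result or the exception per job instead of letting the first failure abort the run.

```python
from concurrent.futures import ThreadPoolExecutor

def run_model(name):
    # Stand-in for an MD task; "flaky-model" simulates one failing model.
    if name == "flaky-model":
        raise RuntimeError(f"{name} crashed")
    return f"{name}: ok"

models = ["model-a", "flaky-model", "model-b"]
with ThreadPoolExecutor() as pool:
    futures = {name: pool.submit(run_model, name) for name in models}
    # exception() returns None on success, so this keeps either the
    # exception object or the result, mirroring raise_on_failure=False.
    outcomes = {name: f.exception() or f.result() for name, f in futures.items()}

succeeded = [n for n, v in outcomes.items() if not isinstance(v, Exception)]
```

Prefect's State objects additionally record timing, retries, and task metadata beyond the bare result-or-exception shown here.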
The homonuclear_diatomics flow in mlip_arena/flows/diatomics.py is a production example that parallelizes energy curve calculations across all 118 elements:
```python
# mlip_arena/flows/diatomics.py (lines 260–285)
from pathlib import Path

from prefect import flow, task
from prefect.futures import wait

from mlip_arena.models import REGISTRY, MLIPEnum
from mlip_arena.tasks.utils import get_calculator
from ase.data import chemical_symbols

@task
def homonuclear_diatomic(symbol: str, calculator, out_dir):
    """Calculate the potential energy curve for one diatomic."""
    # ... scans interatomic distances and writes trajectory

@task
def analyze(out_dir):
    """Compute physics metrics from all saved trajectories."""
    # ... returns a DataFrame with conservation, tortuosity, Spearman metrics

@flow
def homonuclear_diatomics(model: str, run_dir=None):
    out_dir = Path(run_dir or ".") / model  # one output directory per model
    futures = []
    for symbol in chemical_symbols[1:]:  # H through Og
        calculator = get_calculator(model)
        future = homonuclear_diatomic.submit(
            symbol,
            calculator,
            out_dir=out_dir,
        )
        futures.append(future)
    wait(futures)  # block until all 118 calculations complete
    df = analyze(out_dir)
    df["method"] = model
    df.to_json(out_dir / "homonuclear-diatomics.json", orient="records")
    return [f.result(raise_on_failure=False) for f in futures]
```
Key patterns in this flow:
@task marks each per-element calculation.
@flow wraps the loop and calls .submit() on each task.
wait(futures) blocks until all futures complete before the analyze task runs.
Results are collected with raise_on_failure=False to tolerate partial failures.
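The submit-then-wait-then-aggregate shape of this flow can be sketched with the standard library alone (per_element and analyze here are hypothetical stand-ins for the real tasks): fan out one job per element, block until all finish, then run a single aggregation step over the collected rows.

```python
from concurrent.futures import ThreadPoolExecutor, wait

def per_element(symbol):
    # Stand-in for homonuclear_diatomic: produces one row per element.
    return {"symbol": symbol, "n_points": len(symbol)}

def analyze(rows):
    # Stand-in for the analyze task: aggregates all rows at once.
    return sorted(rows, key=lambda r: r["symbol"])

symbols = ["H", "He", "Li"]
with ThreadPoolExecutor() as pool:
    futures = [pool.submit(per_element, s) for s in symbols]
    wait(futures)  # analogous to prefect.futures.wait(futures)
    rows = [f.result() for f in futures]

report = analyze(rows)
```

The barrier matters because the aggregation reads every trajectory; without it, analyze could run while some per-element jobs are still writing output.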