MLIP Arena uses Prefect as its workflow engine. Prefect handles task caching, parallel execution, state tracking, and integration with HPC schedulers — without requiring changes to your simulation code.

Flows vs tasks in Prefect

Prefect distinguishes two primitives:
| Primitive | Decorator | Purpose |
| --- | --- | --- |
| Task | @task | One unit of work; supports caching and retries. |
| Flow | @flow | Orchestrates multiple tasks; the entry point for execution. |
In MLIP Arena, every simulation operation (OPT, EOS, MD, etc.) is a @task. Benchmark scripts wrap those tasks in a @flow to run them in parallel across models and structures.
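The division of labor between the two primitives can be illustrated with a stdlib-only sketch — this is not Prefect itself, just a toy @task-style decorator that adds retries and a plain function standing in for a @flow that orchestrates tasks:

```python
import functools

def task(retries=0):
    """Toy stand-in for Prefect's @task: re-run the function on failure."""
    def wrap(fn):
        @functools.wraps(fn)
        def inner(*args, **kwargs):
            for attempt in range(retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise  # out of retries: propagate the error
        return inner
    return wrap

attempts = {"n": 0}

@task(retries=2)
def flaky_relax(structure):
    """Fails twice, then succeeds -- exercises the retry logic."""
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient failure")
    return f"relaxed-{structure}"

def flow_run(structures):
    """Toy stand-in for a @flow: calls tasks and gathers their results."""
    return [flaky_relax(s) for s in structures]
```

In real Prefect, the @flow wrapper additionally gives you state tracking and a task runner for concurrency, but the shape is the same: tasks do the work, the flow sequences them.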

Running tasks directly

You can call any task directly without a flow for single calculations:
from mlip_arena.tasks import MD
from mlip_arena.tasks.utils import get_calculator
from mlip_arena.models import MLIPEnum
from ase.build import bulk

atoms = bulk("Cu", "fcc", a=3.6) * (5, 5, 5)

result = MD(
    atoms=atoms,
    calculator=get_calculator(MLIPEnum["MACE-MP(M)"]),
    ensemble="nvt",
    dynamics="langevin",
    total_time=1e3,  # 1 ps
    time_step=2,     # fs
)

Using .submit() for parallel execution

To run calculations concurrently, call .submit() on the task instead of calling it directly. .submit() returns a PrefectFuture immediately and hands the work off to the flow's task runner for concurrent execution. Wrap all .submit() calls inside a @flow so Prefect can track and schedule them:
# .github/README.md (lines 122–136)
from prefect import flow
from mlip_arena.models import MLIPEnum
from mlip_arena.tasks import MD
from mlip_arena.tasks.utils import get_calculator
from ase.build import bulk

atoms = bulk("Cu", "fcc", a=3.6) * (5, 5, 5)

@flow
def run_all_models():
    futures = []
    for model in MLIPEnum:
        future = MD.submit(
            atoms=atoms,
            calculator=get_calculator(model),
            ensemble="nvt",
            total_time=1e3,
            time_step=2,
        )
        futures.append(future)

    return [f.result(raise_on_failure=False) for f in futures]

results = run_all_models()
Setting raise_on_failure=False lets you collect all results even if some models fail: each future yields either the task's return value or, for a failed run, the exception itself rather than raising it, so you can inspect the list to identify which models succeeded.
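The same collect-then-partition pattern can be shown with the stdlib's concurrent.futures as a stand-in for Prefect futures (run_model and the model names here are invented for illustration):

```python
from concurrent.futures import ThreadPoolExecutor

def run_model(name):
    """Pretend benchmark: one model raises, the others succeed."""
    if name == "bad-model":
        raise RuntimeError(f"{name} failed")
    return f"{name}: ok"

with ThreadPoolExecutor() as pool:
    futures = {name: pool.submit(run_model, name)
               for name in ["model-a", "bad-model", "model-b"]}

# Collect without raising: keep the exception object for failed runs.
results = {name: fut.exception() or fut.result()
           for name, fut in futures.items()}

succeeded = [n for n, r in results.items() if not isinstance(r, Exception)]
failed = [n for n, r in results.items() if isinstance(r, Exception)]
```

Partitioning on isinstance(r, Exception) mirrors how you would triage a Prefect result list built with raise_on_failure=False.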

A complete parallel benchmark flow

The homonuclear_diatomics flow in mlip_arena/flows/diatomics.py is a production example that parallelizes energy curve calculations across all 118 elements:
# mlip_arena/flows/diatomics.py (lines 260–285)
from pathlib import Path

from prefect import flow, task
from prefect.futures import wait
from mlip_arena.models import REGISTRY, MLIPEnum
from mlip_arena.tasks.utils import get_calculator
from ase.data import chemical_symbols

@task
def homonuclear_diatomic(symbol: str, calculator, out_dir):
    """Calculate the potential energy curve for one diatomic."""
    # ... scans interatomic distances and writes trajectory

@task
def analyze(out_dir):
    """Compute physics metrics from all saved trajectories."""
    # ... returns a DataFrame with conservation, tortuosity, Spearman metrics

@flow
def homonuclear_diatomics(model: str, run_dir=None):
    out_dir = Path(run_dir or ".") / model  # one output directory per model
    futures = []
    for symbol in chemical_symbols[1:]:  # H through Og
        calculator = get_calculator(model)
        future = homonuclear_diatomic.submit(
            symbol,
            calculator,
            out_dir=out_dir,
        )
        futures.append(future)
    wait(futures)  # block until all 118 calculations complete

    df = analyze(out_dir)
    df["method"] = model
    df.to_json(out_dir / "homonuclear-diatomics.json", orient="records")
    return [f.result(raise_on_failure=False) for f in futures]
Key patterns in this flow:
  • @task on individual per-element calculations.
  • @flow wraps the loop and calls .submit() on each task.
  • wait(futures) blocks until all futures complete before the analyze task runs.
  • Results are collected with raise_on_failure=False to tolerate partial failures.

Caching behavior

All MLIP Arena tasks use the TASK_SOURCE + INPUTS cache policy:
# mlip_arena/tasks/optimize.py (lines 53–55)
from prefect.cache_policies import INPUTS, TASK_SOURCE

@task(
    name="OPT",
    cache_policy=TASK_SOURCE + INPUTS,
)
def run(atoms, calculator, ...):
    ...
This policy computes the cache key from a hash of:
  1. The task’s source code (TASK_SOURCE) — the cache is invalidated whenever the task implementation changes.
  2. All input parameters (INPUTS) — a separate result is cached for each unique (atoms, calculator, kwargs) combination.
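A rough sketch of how such a composite key behaves — hashing the function's compiled bytecode as a stand-in for TASK_SOURCE and a canonical JSON dump of the inputs for INPUTS (this is an analogy, not Prefect's actual implementation):

```python
import hashlib
import json

def cache_key(fn, **inputs):
    """Combine a hash of the task's code with a hash of its inputs."""
    # Bytecode stands in for the task's source text; sorted-key JSON
    # canonicalizes the inputs so equal kwargs always hash identically.
    source_part = fn.__code__.co_code
    inputs_part = json.dumps(inputs, sort_keys=True, default=str).encode()
    return hashlib.sha256(source_part + inputs_part).hexdigest()

def opt(structure, fmax=0.05):
    return structure

def opt_v2(structure, fmax=0.05):
    return structure.upper()  # changed implementation -> different bytecode

k_same_1 = cache_key(opt, structure="Cu", fmax=0.05)
k_same_2 = cache_key(opt, structure="Cu", fmax=0.05)
k_new_inputs = cache_key(opt, structure="Al", fmax=0.05)   # INPUTS changed
k_new_source = cache_key(opt_v2, structure="Cu", fmax=0.05)  # source changed
```

Identical code plus identical inputs reproduces the key (a cache hit); changing either the inputs or the implementation yields a new key (a miss), which is exactly the invalidation behavior described above.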

Fresh execution

Pass refresh_cache=True via .with_options() to bypass the cache and re-run a task:
OPT_fresh = OPT.with_options(refresh_cache=True)
result = OPT_fresh(atoms=atoms, calculator=calc)

Persistent results

Pass persist_result=True to write results to a Prefect result backend. EOS uses this for intermediate OPT results:
OPT_cached = OPT.with_options(
    refresh_cache=False,
    persist_result=True,
)

Running on HPC with dask_jobqueue

For large-scale benchmarks, configure Prefect to use a dask_jobqueue worker pool that submits jobs to SLURM, PBS, or SGE:
1. Install dask-jobqueue

pip install dask-jobqueue prefect-dask
2. Configure a DaskTaskRunner

from prefect_dask import DaskTaskRunner
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    cores=8,
    memory="32GB",
    walltime="04:00:00",
    job_extra_directives=["-p gpu", "--gres=gpu:1"],
)
cluster.scale(jobs=16)  # submit 16 SLURM jobs

@flow(task_runner=DaskTaskRunner(address=cluster.scheduler_address))
def benchmark_flow():
    futures = []
    for model in MLIPEnum:
        future = MD.submit(atoms=atoms, calculator=get_calculator(model), ...)
        futures.append(future)
    return [f.result(raise_on_failure=False) for f in futures]
3. Run the flow

python benchmark_flow.py
Each .submit() call becomes a Dask task, which the Dask scheduler distributes across workers that dask_jobqueue launched as SLURM jobs.
For a practical HPC example, refer to the MD stability benchmark notebook at benchmarks/stability/temperature.ipynb.

Waiting for futures

Use prefect.futures.wait() to block until a set of futures completes before proceeding:
# mlip_arena/flows/diatomics.py (lines 280–284)
from prefect.futures import wait

futures = [task_fn.submit(item) for item in items]
wait(futures)  # blocks here

result = post_process(out_dir)  # runs after all futures complete
This is necessary when a downstream task (like analyze) depends on the output files written by all upstream tasks.
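The same upstream/downstream dependency can be reproduced with the stdlib's concurrent.futures.wait — a toy analogy (compute and post_process are invented names), not mlip_arena code:

```python
from concurrent.futures import ThreadPoolExecutor, wait

out = []  # stand-in for trajectory files written by upstream tasks

def compute(symbol):
    out.append(f"{symbol}.traj")  # side effect a downstream step depends on
    return symbol

def post_process(files):
    """Downstream step: only valid once every upstream output exists."""
    return sorted(files)

with ThreadPoolExecutor() as pool:
    futures = [pool.submit(compute, s) for s in ["H", "He", "Li"]]
    wait(futures)                 # block until every upstream task finishes
    summary = post_process(out)   # safe: all three outputs exist now
```

Without the wait() barrier, post_process could run while some futures are still in flight and see a partial file set — the same hazard the analyze task would face in the diatomics flow.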