
Pipeline

The Pipeline chains multiple Blocks into an automated workflow. Use Step for sequential operations and Loop for iterative processes. A pipeline can run locally or on an HPC cluster managed by SLURM (configured through SlurmCluster).

f3dasm.Pipeline

A composable, executable pipeline of f3dasm blocks.

A pipeline is an ordered sequence of Step and Loop objects. Steps run once; loops repeat their inner steps for a given number of iterations.

In SLURM mode, a single self-resubmitting orchestrator script manages the entire pipeline, submitting one step (or one loop iteration) at a time.
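The batching rule just described can be modelled in a few lines. The sketch below is not f3dasm code: `StepSpec`, `LoopSpec` and `submission_batches` are hypothetical stand-ins that only group step names into the units the orchestrator queues together (each top-level step on its own, each loop iteration as one group). It submits nothing.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class StepSpec:
    """Hypothetical stand-in for f3dasm.Step (only the name matters here)."""
    name: str


@dataclass
class LoopSpec:
    """Hypothetical stand-in for f3dasm.Loop."""
    n_iterations: int = 1
    steps: list[StepSpec] = field(default_factory=list)


def submission_batches(elements: list[StepSpec | LoopSpec]) -> list[list[str]]:
    """Group step names into the batches an orchestrator would queue together.

    A top-level step forms its own batch; each iteration of a loop forms one
    batch containing all of that loop's inner steps.
    """
    batches: list[list[str]] = []
    for element in elements:
        if isinstance(element, StepSpec):
            batches.append([element.name])
        else:
            for _ in range(element.n_iterations):
                batches.append([s.name for s in element.steps])
    return batches


pipeline = [StepSpec("create"), LoopSpec(3, [StepSpec("run"), StepSpec("post")])]
batches = submission_batches(pipeline)
print(batches)
# [['create'], ['run', 'post'], ['run', 'post'], ['run', 'post']]
```

However large n_iterations is, no batch contains more steps than the loop body, which is what keeps the number of queued jobs bounded.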

Parameters:

name (str, default ''): Name of the pipeline (used for directory naming and SLURM job prefixes).

steps (list[Step | Loop], default: empty list): The ordered sequence of steps and loops.

orchestrator_resources (SlurmResources | None, default None): SLURM resource requirements for the orchestrator job. If None, a lightweight default is used (10 min, 1 GB, 1 CPU). The orchestrator only runs sbatch commands, so minimal resources suffice.

Examples:

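Simple three-phase pipeline:

from f3dasm import Pipeline, Step

# my_create_block, my_generator and my_post_block are user-defined blocks.
pipeline = Pipeline(
    name="my_experiment",
    steps=[
        Step(name="create", block=my_create_block),
        Step(name="run", block=my_generator, parallel=True),
        Step(name="post", block=my_post_block),
    ],
)
pipeline.run(mode="local")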

Pipeline with a loop:

from f3dasm import Loop, Pipeline, SlurmCluster, Step

# create_block, generator and update_block are user-defined blocks.
pipeline = Pipeline(
    name="online_rl",
    steps=[
        Step(name="create", block=create_block),
        Loop(n_iterations=10, steps=[
            Step(name="run", block=generator, parallel=True),
            Step(name="post", block=update_block),
        ]),
    ],
)
pipeline.run(
    mode="slurm",
    cluster=SlurmCluster(
        partition="batch",
        account="my_account",
    ),
    rootdir="/scratch/user",
)
Source code in src/f3dasm/_src/pipeline/pipeline.py
@dataclass
class Pipeline:
    """A composable, executable pipeline of f3dasm blocks.

    A pipeline is an ordered sequence of :class:`Step` and
    :class:`Loop` objects. Steps run once; loops repeat their
    inner steps for a given number of iterations.

    In SLURM mode, a single self-resubmitting orchestrator script
    manages the entire pipeline, submitting one step (or one loop
    iteration) at a time.

    Parameters
    ----------
    name : str
        Name of the pipeline (used for directory naming and SLURM
        job prefixes).
    steps : list[Step | Loop]
        The ordered sequence of steps and loops.
    orchestrator_resources : SlurmResources, optional
        SLURM resource requirements for the orchestrator job. If
        ``None``, a lightweight default is used (10 min, 1 GB,
        1 CPU). The orchestrator only runs ``sbatch`` commands, so
        minimal resources suffice.

    Examples
    --------
    Simple three-phase pipeline::

        pipeline = Pipeline(
            name="my_experiment",
            steps=[
                Step(name="create", block=my_create_block),
                Step(name="run", block=my_generator, parallel=True),
                Step(name="post", block=my_post_block),
            ],
        )
        pipeline.run(mode="local")

    Pipeline with a loop::

        pipeline = Pipeline(
            name="online_rl",
            steps=[
                Step(name="create", block=create_block),
                Loop(n_iterations=10, steps=[
                    Step(name="run", block=generator, parallel=True),
                    Step(name="post", block=update_block),
                ]),
            ],
        )
        pipeline.run(
            mode="slurm",
            cluster=SlurmCluster(
                partition="batch",
                account="my_account",
            ),
            rootdir="/scratch/user",
        )
    """

    name: str = ""
    steps: list[PipelineElement] = field(default_factory=list)
    orchestrator_resources: SlurmResources | None = None

    def _flatten(self) -> list[tuple[Step, int, int]]:
        """Flatten the pipeline into an ordered list of steps.

        Walks the ``steps`` list and expands any :class:`Loop`
        elements into repeated step entries.

        Returns
        -------
        list[tuple[Step, int, int]]
            Each entry is ``(step, iteration, n_iterations)``
            where ``iteration`` is the current loop iteration
            (0 for non-looped steps) and ``n_iterations`` is the
            total (1 for non-looped steps).
        """
        flat: list[tuple[Step, int, int]] = []
        for element in self.steps:
            if isinstance(element, Step):
                flat.append((element, 0, 1))
            elif isinstance(element, Loop):
                for i in range(element.n_iterations):
                    for step in element.steps:
                        flat.append((step, i, element.n_iterations))
        return flat

    def from_step(self, step: int | str) -> Pipeline:
        """Return a new Pipeline starting from the specified step.

        Parameters
        ----------
        step : int | str
            Step index (int) or step name (str) to start from.
            When a name is given and the step lives inside a
            :class:`Loop`, the whole Loop element is included.

        Returns
        -------
        Pipeline
            A new Pipeline containing only the steps from
            ``step`` onwards, with the same name and resources.

        Raises
        ------
        ValueError
            If a step name is given and no matching step is found.

        Examples
        --------
        Resume a pipeline from the ``"run"`` step::

            # Original run
            job_id = pipeline.run(mode="local")

            # Resume from "run", reusing the same output directory
            pipeline.from_step("run").run(mode="local", project_job=job_id)
        """
        if isinstance(step, int):
            return Pipeline(
                name=self.name,
                steps=self.steps[step:],
                orchestrator_resources=self.orchestrator_resources,
            )

        # str: find the PipelineElement that contains the named step
        for i, element in enumerate(self.steps):
            if isinstance(element, Step) and element.name == step:
                return Pipeline(
                    name=self.name,
                    steps=self.steps[i:],
                    orchestrator_resources=self.orchestrator_resources,
                )
            elif isinstance(element, Loop):
                for loop_step in element.steps:
                    if loop_step.name == step:
                        return Pipeline(
                            name=self.name,
                            steps=self.steps[i:],
                            orchestrator_resources=self.orchestrator_resources,
                        )

        raise ValueError(f"Step {step!r} not found in pipeline {self.name!r}.")

    def run(
        self,
        mode: Literal["local", "slurm"] = "local",
        cluster: SlurmCluster | None = None,
        project_job: str | None = None,
        rootdir: Path | str | None = None,
    ) -> str:
        """Execute the pipeline.

        Parameters
        ----------
        mode : Literal["local", "slurm"]
            Execution mode: ``"local"`` or ``"slurm"``.
        cluster : SlurmCluster, optional
            Cluster configuration (required when
            ``mode="slurm"``).
        project_job : str, optional
            Job identifier used as the top-level run folder
            (``rootdir / project_job``). Defaults to
            ``str(int(time.time()))``. Pass an existing ID to
            resume a previous run.
        rootdir : Path | str, optional
            Root directory under which the job folder is created.
            Defaults to the current working directory.

        Returns
        -------
        str
            The project job ID.
        """
        # Lazy imports to avoid circular dependency:
        # pipeline -> executors -> pipeline
        from .executors.local import LocalExecutor
        from .executors.slurm import SlurmExecutor

        _rootdir: Path | None = Path(rootdir) if rootdir is not None else None

        if mode == "local":
            executor = LocalExecutor()
        elif mode == "slurm":
            if cluster is None:
                raise ValueError(
                    "A SlurmCluster must be provided for mode='slurm'."
                )
            executor = SlurmExecutor(cluster=cluster)
        else:
            raise ValueError(f"Unknown mode {mode!r}. Use 'local' or 'slurm'.")

        return executor.run(
            pipeline=self,
            project_job=project_job,
            rootdir=_rootdir,
        )

    def generate_scripts(
        self,
        cluster: SlurmCluster,
        project_job: str = "PLACEHOLDER",
        rootdir: Path | str | None = None,
    ) -> dict[str, str]:
        """Generate SLURM scripts without submitting them.

        Useful for inspecting or manually editing scripts before
        submission. For pipelines containing loops, the result
        includes orchestrator scripts and loop body step scripts
        with ``$F3DASM_ITERATION`` as the iteration placeholder.

        Parameters
        ----------
        cluster : SlurmCluster
            Cluster configuration.
        project_job : str
            Project job ID to embed in scripts.
        rootdir : Path | str, optional
            Root directory under which the job folder is created.
            Defaults to the current working directory.

        Returns
        -------
        dict[str, str]
            Mapping of label to rendered sbatch script content.
        """
        from .executors.slurm import SlurmExecutor

        _rootdir: Path | None = Path(rootdir) if rootdir is not None else None
        executor = SlurmExecutor(cluster=cluster)
        return executor.generate_scripts(
            pipeline=self,
            project_job=project_job,
            rootdir=_rootdir,
        )
name = '' class-attribute

Name of the pipeline (used for directory naming and SLURM job prefixes).

_flatten() -> list[tuple[Step, int, int]]

Flatten the pipeline into an ordered list of steps.

Walks the steps list and expands any Loop elements into repeated step entries.

Returns:

list[tuple[Step, int, int]]: Each entry is (step, iteration, n_iterations), where iteration is the current loop iteration (0 for non-looped steps) and n_iterations is the total number of iterations (1 for non-looped steps).
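To make the return format concrete, here is a self-contained mirror of the same expansion. `StepStub` and `LoopStub` are hypothetical stand-ins for Step and Loop so that it runs without f3dasm installed; the loop body follows the method's source line for line.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class StepStub:
    """Hypothetical stand-in for f3dasm.Step."""
    name: str


@dataclass
class LoopStub:
    """Hypothetical stand-in for f3dasm.Loop."""
    n_iterations: int = 1
    steps: list[StepStub] = field(default_factory=list)


def flatten(elements: list[StepStub | LoopStub]) -> list[tuple[StepStub, int, int]]:
    """Mirror of Pipeline._flatten: (step, iteration, n_iterations) triples."""
    flat: list[tuple[StepStub, int, int]] = []
    for element in elements:
        if isinstance(element, StepStub):
            # Plain steps run once: iteration 0 of 1.
            flat.append((element, 0, 1))
        elif isinstance(element, LoopStub):
            # Loop bodies are unrolled iteration by iteration.
            for i in range(element.n_iterations):
                for step in element.steps:
                    flat.append((step, i, element.n_iterations))
    return flat


flat = flatten([StepStub("create"), LoopStub(2, [StepStub("run"), StepStub("post")]), StepStub("report")])
print([(s.name, i, n) for s, i, n in flat])
# [('create', 0, 1), ('run', 0, 2), ('post', 0, 2), ('run', 1, 2), ('post', 1, 2), ('report', 0, 1)]
```

Note that the unrolled entries reuse the same step object for every iteration; only the iteration index differs.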

from_step(step: int | str) -> Pipeline

Return a new Pipeline starting from the specified step.

Parameters:

step (int | str, required): Index into the top-level steps list (int), or the name of a step (str), to start from. Because the index refers to top-level elements, a Loop counts as a single element, and an out-of-range index yields an empty pipeline rather than an error. When a name is given and the step lives inside a Loop, the whole Loop element is included, starting from its first inner step.

Returns:

Pipeline: A new Pipeline containing only the elements from step onwards, with the same name and orchestrator_resources.

Raises:

ValueError: If a step name is given and no matching step is found.

Examples:

Resume a pipeline from the "run" step:

# Original run
job_id = pipeline.run(mode="local")

# Resume from "run", reusing the same output directory
pipeline.from_step("run").run(mode="local", project_job=job_id)
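The two lookup modes can be compared side by side with a plain-Python mirror of the method. `StepStub`, `LoopStub` and this `from_step` function are hypothetical stand-ins that return the kept top-level elements instead of a new Pipeline; the selection logic follows the source.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class StepStub:
    name: str


@dataclass
class LoopStub:
    n_iterations: int = 1
    steps: list[StepStub] = field(default_factory=list)


def from_step(
    elements: list[StepStub | LoopStub], step: int | str
) -> list[StepStub | LoopStub]:
    """Return the top-level elements kept by Pipeline.from_step(step)."""
    if isinstance(step, int):
        # Integer: plain slice of the top-level list (a Loop is one element).
        return elements[step:]
    for i, element in enumerate(elements):
        if isinstance(element, StepStub) and element.name == step:
            return elements[i:]
        if isinstance(element, LoopStub) and any(s.name == step for s in element.steps):
            # Named step inside a loop: the whole loop is kept.
            return elements[i:]
    raise ValueError(f"Step {step!r} not found.")


loop = LoopStub(5, [StepStub("run"), StepStub("post")])
elements = [StepStub("create"), loop, StepStub("report")]

print([type(e).__name__ for e in from_step(elements, 1)])  # ['LoopStub', 'StepStub']
print(from_step(elements, "post")[0] is loop)               # True
```

So from_step("post") does not skip the "run" step of the resumed loop: the returned pipeline contains the loop in full, with its iterations counted from 0 again.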
generate_scripts(cluster: SlurmCluster, project_job: str = 'PLACEHOLDER', rootdir: Path | str | None = None) -> dict[str, str]

Generate SLURM scripts without submitting them.

Useful for inspecting or manually editing scripts before submission. For pipelines containing loops, the result includes orchestrator scripts and loop-body step scripts that use $F3DASM_ITERATION as the iteration placeholder.

Parameters:

cluster (SlurmCluster, required): Cluster configuration.

project_job (str, default 'PLACEHOLDER'): Project job ID to embed in the scripts.

rootdir (Path | str | None, default None): Root directory under which the job folder is created. Defaults to the current working directory.

Returns:

dict[str, str]: Mapping from label to rendered sbatch script content.
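Because loop-body scripts keep `$F3DASM_ITERATION` as a shell variable, one way to preview a specific iteration is Python's `string.Template`, whose `safe_substitute` fills in only the names it is given and leaves every other `$VAR` as written. In this sketch the `scripts` dict is made-up example text, not real f3dasm output (in practice it would come from `pipeline.generate_scripts(cluster)`), and `preview_iteration` / `dump_scripts` are hypothetical helpers.

```python
from __future__ import annotations

import tempfile
from pathlib import Path
from string import Template

# Hypothetical stand-in for the dict returned by pipeline.generate_scripts(...).
scripts = {
    "run": (
        "#!/bin/bash\n"
        "#SBATCH --job-name=demo_run\n"
        "echo iteration $F3DASM_ITERATION task ${SLURM_ARRAY_TASK_ID}\n"
    ),
}


def preview_iteration(script: str, iteration: int) -> str:
    """Fill in $F3DASM_ITERATION only; other $VARs are left untouched."""
    return Template(script).safe_substitute(F3DASM_ITERATION=str(iteration))


def dump_scripts(scripts: dict[str, str], outdir: Path) -> list[Path]:
    """Write each script to <outdir>/<label>.sh for manual inspection."""
    outdir.mkdir(parents=True, exist_ok=True)
    paths = []
    for label, content in scripts.items():
        path = outdir / f"{label.replace('/', '_')}.sh"
        path.write_text(content)
        paths.append(path)
    return paths


print(preview_iteration(scripts["run"], 3).splitlines()[-1])
# echo iteration 3 task ${SLURM_ARRAY_TASK_ID}

with tempfile.TemporaryDirectory() as tmp:
    print([p.name for p in dump_scripts(scripts, Path(tmp))])  # ['run.sh']
```

Template also collapses a literal `$$` (the bash PID) to `$`, so treat the preview as read-only and submit the unmodified script text.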

run(mode: Literal['local', 'slurm'] = 'local', cluster: SlurmCluster | None = None, project_job: str | None = None, rootdir: Path | str | None = None) -> str

Execute the pipeline.

Parameters:

mode (Literal['local', 'slurm'], default 'local'): Execution mode, "local" or "slurm". Any other value raises ValueError.

cluster (SlurmCluster | None, default None): Cluster configuration. Required when mode="slurm"; omitting it in that mode raises ValueError.

project_job (str | None, default None): Job identifier used as the top-level run folder (rootdir / project_job). Defaults to str(int(time.time())). Pass an existing ID to resume a previous run.

rootdir (Path | str | None, default None): Root directory under which the job folder is created. Defaults to the current working directory.

Returns:

str: The project job ID.
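Together, project_job and rootdir decide where a run's outputs land. The helper below is a hypothetical sketch of that resolution following the documented defaults (a Unix-timestamp ID and the current working directory); it is not part of f3dasm, but it shows how to predict the folder of a run before resuming it.

```python
from __future__ import annotations

import time
from pathlib import Path


def resolve_job_dir(
    project_job: str | None = None, rootdir: Path | str | None = None
) -> tuple[str, Path]:
    """Return (project_job, job_dir) using the defaults described for run()."""
    # Default job ID: current Unix time in whole seconds, as a string.
    job = project_job if project_job is not None else str(int(time.time()))
    # Default root: the current working directory.
    root = Path(rootdir) if rootdir is not None else Path.cwd()
    return job, root / job


job, job_dir = resolve_job_dir("1700000000", "/scratch/user")
print(job_dir)  # /scratch/user/1700000000
```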


f3dasm.Step

A single step in a Pipeline.

Parameters:

block (Block | DataGenerator | Callable, required): The operation to execute. Can be a Block, a DataGenerator, or a plain callable with signature (project_dir, project_job, **kwargs) -> None.

name (str, default ''): Human-readable name for this step (used in logs and SLURM job names).

parallel (bool, default False): If True, this step is executed as a SLURM array job (or with multiprocessing locally). Only meaningful when the block is a DataGenerator. The array size is determined at submission time from the number of open experiments in the step's ExperimentData on disk, capped by resources.max_array_size.

resources (SlurmResources, default: SlurmResources()): SLURM resource requirements for this step.

dependency (Literal['afterok', 'afterany'], default 'afterok'): SLURM dependency type that ties this step to the previous step's job. Must be "afterok" or "afterany"; any other value raises ValueError when the Step is constructed.

project_dir (str, default '.'): Sub-path, relative to the job directory, where ExperimentData is loaded and stored for this step. Defaults to '.' (the job directory itself).

kwargs (dict[str, Any], default: empty dict): Extra keyword arguments forwarded to the block's call method at execution time.

Because block is the first dataclass field, pass name by keyword, e.g. Step(name="run", block=my_generator).
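A plain callable is the lightest kind of block. `write_summary` below is a hypothetical example matching the documented signature (project_dir, project_job, **kwargs) -> None; the call at the end mimics an executor invoking it positionally with the step's kwargs unpacked, which is an assumption rather than f3dasm's documented calling convention. The type f3dasm passes for project_dir is also assumed, so the sketch accepts either str or Path.

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path
from typing import Any


def write_summary(project_dir: str | Path, project_job: str, **kwargs: Any) -> None:
    """Hypothetical step callable: record the job ID and any extra settings."""
    out = Path(project_dir) / "summary.json"
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps({"project_job": project_job, **kwargs}))


# Mimic an executor calling the step (assumed calling convention).
step_kwargs = {"threshold": 0.5}
with tempfile.TemporaryDirectory() as tmp:
    write_summary(tmp, "1700000000", **step_kwargs)
    print((Path(tmp) / "summary.json").read_text())
    # {"project_job": "1700000000", "threshold": 0.5}
```

In a pipeline it would be registered as Step(name="summary", block=write_summary, kwargs={"threshold": 0.5}).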
Source code in src/f3dasm/_src/pipeline/pipeline.py
@dataclass
class Step:
    """A single step in a :class:`Pipeline`.

    Parameters
    ----------
    block : Block | DataGenerator | Callable
        The operation to execute. Can be a :class:`Block`,
        :class:`DataGenerator`, or a plain callable with signature
        ``(project_dir, project_job, **kwargs) -> None``.
    name : str
        Human-readable name for this step (used in logs and SLURM
        job names).
    parallel : bool
        If ``True``, this step is executed as a SLURM array job
        (or with multiprocessing locally). Only meaningful when
        the block is a :class:`DataGenerator`. The array size is
        determined at submission time from the number of open
        experiments in the step's ExperimentData on disk, capped
        by ``resources.max_array_size``.
    resources : SlurmResources
        SLURM resource requirements for this step.
    dependency : Literal["afterok", "afterany"]
        SLURM dependency type for the previous step.
        Must be ``"afterok"`` or ``"afterany"``.
    project_dir : str
        Sub-path relative to the job directory where
        ExperimentData is loaded and stored for this step.
        Defaults to ``'.'`` (the job directory itself).
    kwargs : dict[str, Any]
        Extra keyword arguments forwarded to the block's
        ``call`` method at execution time.
    """

    block: Block | DataGenerator | Callable
    name: str = ""
    parallel: bool = False
    resources: SlurmResources = field(default_factory=SlurmResources)
    dependency: Literal["afterok", "afterany"] = "afterok"
    project_dir: str = "."
    kwargs: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Validate the dependency type.

        Raises
        ------
        ValueError
            If ``dependency`` is not one of the valid SLURM
            dependency types.
        """
        if self.dependency not in VALID_DEPENDENCIES:
            raise ValueError(
                f"Invalid dependency {self.dependency!r}. "
                f"Must be one of {VALID_DEPENDENCIES}."
            )
dependency = 'afterok' class-attribute

SLURM dependency type that ties this step to the previous step's job ("afterok" or "afterany").
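The dependency value maps onto sbatch's --dependency option: with afterok the step starts only if the previous job completed successfully, with afterany it starts once the previous job has terminated, whatever its exit state. When the previous step is an array job, afterok on the array's job ID requires every task to succeed, so afterany lets post-processing proceed even if some experiments failed. The sketch validates the value the way Step.__post_init__ does and builds the flag; `dependency_flag` is hypothetical and this `VALID_DEPENDENCIES` tuple is assumed to mirror f3dasm's constant.

```python
VALID_DEPENDENCIES = ("afterok", "afterany")  # assumed to mirror f3dasm's constant


def dependency_flag(dependency: str, previous_job_id: str) -> str:
    """Build an sbatch --dependency option pointing at the previous step's job."""
    if dependency not in VALID_DEPENDENCIES:
        raise ValueError(
            f"Invalid dependency {dependency!r}. Must be one of {VALID_DEPENDENCIES}."
        )
    return f"--dependency={dependency}:{previous_job_id}"


print(dependency_flag("afterany", "123456"))  # --dependency=afterany:123456
```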

name = '' class-attribute

Human-readable name for this step (used in logs and SLURM job names).

parallel = False class-attribute

If True, run this step as a SLURM array job (or with multiprocessing locally).
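For a parallel step, the array size comes from the number of open experiments on disk, capped by resources.max_array_size. The sketch below shows one plausible way to turn those two numbers into an sbatch --array range. Treating the cap as a simple min, and returning None when there is nothing to run, are assumptions about f3dasm's behaviour; how experiments beyond the cap are distributed over the tasks is not described here.

```python
from __future__ import annotations


def array_range(n_open: int, max_array_size: int) -> str | None:
    """Return an sbatch --array value such as '0-99', or None if there is no work."""
    size = min(n_open, max_array_size)
    if size <= 0:
        return None
    # SLURM array index ranges are inclusive on both ends.
    return f"0-{size - 1}"


print(array_range(250, 100))  # 0-99
print(array_range(7, 100))    # 0-6
```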

project_dir = '.' class-attribute

Sub-path, relative to the job directory, where ExperimentData is loaded and stored for this step.

f3dasm.Loop

A group of steps that are repeated multiple times.

Use Loop inside a Pipeline's steps list to express iterative workflows (e.g. train → evaluate loops).

In SLURM mode, a self-resubmitting orchestrator script is generated so that only one iteration's jobs are queued at a time, preventing cluster job-count limits from being exceeded.

Parameters:

n_iterations (int, default 1): Number of times to repeat the inner steps.

steps (list[Step], default: empty list): The steps to repeat each iteration.

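Examples:

from f3dasm import Loop, Pipeline, Step

# create_block, generator and update_block are user-defined blocks.
Pipeline(
    name="online_rl",
    steps=[
        Step(name="create", block=create_block),
        Loop(n_iterations=10, steps=[
            Step(name="run", block=generator,
                 parallel=True),
            Step(name="post", block=update_block),
        ]),
    ],
)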
Source code in src/f3dasm/_src/pipeline/loop.py
@dataclass
class Loop:
    """A group of steps that are repeated multiple times.

    Use ``Loop`` inside a :class:`Pipeline`'s ``steps`` list to
    express iterative workflows (e.g. train → evaluate loops).

    In SLURM mode, a self-resubmitting orchestrator script is
    generated so that only one iteration's jobs are queued at a
    time, preventing cluster job-count limits from being exceeded.

    Parameters
    ----------
    n_iterations : int
        Number of times to repeat the inner steps.
    steps : list[Step]
        The steps to repeat each iteration.

    Examples
    --------
    ::

        Pipeline(
            name="online_rl",
            steps=[
                Step(name="create", block=create_block),
                Loop(n_iterations=10, steps=[
                    Step(name="run", block=generator,
                         parallel=True),
                    Step(name="post", block=update_block),
                ]),
            ],
        )
    """

    n_iterations: int = 1
    steps: list[Step] = field(default_factory=list)
n_iterations = 1 class-attribute

Number of times to repeat the inner steps.

Resources

f3dasm.SlurmCluster

Configuration for a specific SLURM cluster.

The cluster is assumed to be a POSIX system (Linux): generated sbatch scripts use POSIX-style paths and bash shell syntax, and sbatch must be available on the submission host's PATH.

Parameters:

partition (str, default 'batch'): SLURM partition name.

account (str, default 'default'): SLURM account string.

env_setup (list[str], default: empty list): Shell commands to run before the Python command (e.g. module loads, unset LD_LIBRARY_PATH).

env_vars (dict[str, str], default: empty dict): Environment variables exported before execution.

runner (str, default 'python'): Command prefix for running Python scripts (e.g. "uv run" or "python").

log_dir (str, default 'logs/{project_job}'): Log directory template. May contain the {project_job} placeholder.
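To illustrate how these fields fit together, the sketch below renders a minimal sbatch preamble from a cluster-like configuration: partition and account become #SBATCH directives, log_dir is filled in with str.format(project_job=...), env_setup lines run verbatim, env_vars are exported, and runner prefixes the Python command. `ClusterConfig` and `render_preamble` are hypothetical; f3dasm's real templates are not shown on this page and will differ, so use Pipeline.generate_scripts to see the actual output.

```python
from __future__ import annotations

import shlex
from dataclasses import dataclass, field


@dataclass
class ClusterConfig:
    """Hypothetical stand-in with the same fields and defaults as SlurmCluster."""
    partition: str = "batch"
    account: str = "default"
    env_setup: list[str] = field(default_factory=list)
    env_vars: dict[str, str] = field(default_factory=dict)
    runner: str = "python"
    log_dir: str = "logs/{project_job}"


def render_preamble(cluster: ClusterConfig, project_job: str, script: str) -> str:
    """Render an illustrative bash/sbatch header for one Python command."""
    log_dir = cluster.log_dir.format(project_job=project_job)
    lines = [
        "#!/bin/bash",
        f"#SBATCH --partition={cluster.partition}",
        f"#SBATCH --account={cluster.account}",
        # %x = job name, %j = job ID (sbatch filename patterns).
        f"#SBATCH --output={log_dir}/%x_%j.out",
        *cluster.env_setup,
        *(f"export {k}={shlex.quote(v)}" for k, v in cluster.env_vars.items()),
        f"{cluster.runner} {shlex.quote(script)}",
    ]
    return "\n".join(lines) + "\n"


cluster = ClusterConfig(
    account="my_account",
    env_setup=["module load python", "unset LD_LIBRARY_PATH"],
    env_vars={"OMP_NUM_THREADS": "1"},
    runner="uv run",
)
print(render_preamble(cluster, "1700000000", "main.py"))
```

sbatch does not create the directory named in --output, so the log directory must exist before submission.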
Source code in src/f3dasm/_src/pipeline/resources.py
@dataclass
class SlurmCluster:
    """Configuration for a specific SLURM cluster.

    The cluster is assumed to be a POSIX system (Linux): generated
    sbatch scripts use POSIX-style paths and bash shell syntax,
    and ``sbatch`` must be available on the submission host's
    ``PATH``.

    Parameters
    ----------
    partition : str
        SLURM partition name.
    account : str
        SLURM account string.
    env_setup : list[str]
        Shell commands to run before the Python command
        (e.g. module loads, ``unset LD_LIBRARY_PATH``).
    env_vars : dict[str, str]
        Environment variables exported before execution.
    runner : str
        Command prefix for running Python scripts
        (e.g. ``"uv run"`` or ``"python"``).
    log_dir : str
        Log directory template. May contain ``{project_job}``.
    """

    partition: str = "batch"
    account: str = "default"
    env_setup: list[str] = field(default_factory=list)
    env_vars: dict[str, str] = field(default_factory=dict)
    runner: str = "python"
    log_dir: str = "logs/{project_job}"

    @classmethod
    def from_yaml(cls, config: DictConfig) -> SlurmCluster:
        """Create a SlurmCluster from a Hydra DictConfig.

        Parameters
        ----------
        config : DictConfig
            Hydra DictConfig for the cluster section, e.g. ``cfg.cluster``.

        Returns
        -------
        SlurmCluster

        Examples
        --------
        >>> cluster = SlurmCluster.from_yaml(cfg.cluster)
        """
        _dict = OmegaConf.to_container(config, resolve=True)
        _dict.pop("enabled", None)
        return cls(**_dict)
account = 'default' class-attribute

SLURM account to charge jobs to.

log_dir = 'logs/{project_job}' class-attribute

Log directory template. May contain {project_job}.

partition = 'batch' class-attribute

SLURM partition to submit jobs to.

runner = 'python' class-attribute

Command prefix for running Python scripts (e.g. "uv run" or "python").
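How these fields are turned into a job script is not documented on this page. As a rough sketch, assuming that `env_setup` lines are emitted verbatim, `env_vars` become `export` lines, `runner` prefixes the script invocation and `log_dir` is expanded with `str.format`, a job-script preamble could be assembled like this. `ClusterSettings` and `build_preamble` are hypothetical stand-ins, not part of f3dasm.

```python
from dataclasses import dataclass, field


# Hypothetical stand-in mirroring the SlurmCluster fields; not the f3dasm class.
@dataclass
class ClusterSettings:
    partition: str = "batch"
    account: str = "default"
    env_setup: list[str] = field(default_factory=list)
    env_vars: dict[str, str] = field(default_factory=dict)
    runner: str = "python"
    log_dir: str = "logs/{project_job}"


def build_preamble(cluster: ClusterSettings, project_job: str, script: str) -> list[str]:
    """Assemble job-script lines from cluster settings (illustrative only)."""
    # Expand the {project_job} placeholder in the log directory template.
    log_dir = cluster.log_dir.format(project_job=project_job)
    lines = [
        "#!/bin/bash",
        f"#SBATCH --partition={cluster.partition}",
        f"#SBATCH --account={cluster.account}",
        f"#SBATCH --output={log_dir}/%j.out",  # %j expands to the SLURM job ID
    ]
    # Environment setup commands (e.g. module loads) are emitted verbatim.
    lines.extend(cluster.env_setup)
    # Each environment variable becomes an export statement.
    lines.extend(f"export {key}={value}" for key, value in cluster.env_vars.items())
    # The runner is a command prefix placed in front of the Python script.
    lines.append(f"{cluster.runner} {script}")
    return lines


cluster = ClusterSettings(
    runner="uv run",
    env_setup=["module load python"],
    env_vars={"OMP_NUM_THREADS": "1"},
)
print("\n".join(build_preamble(cluster, "my_experiment", "main.py")))
```

Switching `runner` from `"python"` to `"uv run"` changes only the last line, which is why it is kept as a plain string prefix in this sketch.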

from_yaml(config: DictConfig) -> SlurmCluster classmethod

Create a SlurmCluster from a Hydra DictConfig.

Parameters:

config (DictConfig, required): Hydra DictConfig for the cluster section, e.g. cfg.cluster.

Returns:

SlurmCluster: A new instance built from the resolved config. An "enabled" key, if present, is dropped before construction.

Examples:

>>> cluster = SlurmCluster.from_yaml(cfg.cluster)
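from_yaml converts the config to a plain container, drops an optional `enabled` key, and passes every remaining key to the constructor as a keyword argument. The sketch below reproduces that logic with a plain dict in place of a Hydra DictConfig so it runs without OmegaConf or f3dasm. `ClusterStub` and `cluster_from_mapping` are hypothetical names. The sketch also shows the practical consequence: any other key that is not a field name raises `TypeError`.

```python
from dataclasses import dataclass
from typing import Any, Mapping


# Hypothetical stand-in with a subset of the SlurmCluster fields.
@dataclass
class ClusterStub:
    partition: str = "batch"
    account: str = "default"
    runner: str = "python"


def cluster_from_mapping(config: Mapping[str, Any]) -> ClusterStub:
    # Copy so the caller's mapping is not mutated.
    values = dict(config)
    # An "enabled" switch may live in the same config section; it is not a field.
    values.pop("enabled", None)
    # All remaining keys must match field names exactly.
    return ClusterStub(**values)


print(cluster_from_mapping({"enabled": True, "partition": "gpu", "account": "my_account"}))
# ClusterStub(partition='gpu', account='my_account', runner='python')
```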

f3dasm.SlurmResources

Resource requirements for a single pipeline step on SLURM.

Parameters:

time (str, default '01:00:00'): Wall-clock time limit (e.g. "01:00:00").
mem (str, default '4G'): Memory per node (e.g. "4G").
cpus_per_task (int, default 1): Number of CPUs per task.
nodes (int, default 1): Number of nodes.
max_array_size (int, default 900): Maximum SLURM array size (capped by cluster policy).
max_concurrent (int, default 64): Maximum number of concurrently running array tasks.
extra_sbatch (dict[str, str], default empty dict): Arbitrary extra #SBATCH directives as key-value pairs.
Source code in src/f3dasm/_src/pipeline/resources.py
@dataclass
class SlurmResources:
    """Resource requirements for a single pipeline step on SLURM.

    Parameters
    ----------
    time : str
        Wall-clock time limit (e.g. ``"01:00:00"``).
    mem : str
        Memory per node (e.g. ``"4G"``).
    cpus_per_task : int
        Number of CPUs per task.
    nodes : int
        Number of nodes.
    max_array_size : int
        Maximum SLURM array size (capped by cluster policy).
    max_concurrent : int
        Maximum number of concurrently running array tasks.
    extra_sbatch : dict[str, str]
        Arbitrary extra ``#SBATCH`` directives as key-value
        pairs.
    """

    time: str = "01:00:00"
    mem: str = "4G"
    cpus_per_task: int = 1
    nodes: int = 1
    max_array_size: int = 900
    max_concurrent: int = 64
    extra_sbatch: dict[str, str] = field(default_factory=dict)
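The exact mapping from these fields to the generated job script is internal to f3dasm. As an illustration only, assuming each field maps onto the standard sbatch option with the same meaning and that `extra_sbatch` keys are option names without the leading dashes, the per-job directives could be rendered like this. `ResourcesStub` and `sbatch_directives` are hypothetical.

```python
from dataclasses import dataclass, field


# Hypothetical stand-in mirroring the SlurmResources fields and defaults.
@dataclass
class ResourcesStub:
    time: str = "01:00:00"
    mem: str = "4G"
    cpus_per_task: int = 1
    nodes: int = 1
    max_array_size: int = 900
    max_concurrent: int = 64
    extra_sbatch: dict[str, str] = field(default_factory=dict)


def sbatch_directives(res: ResourcesStub) -> list[str]:
    """Render #SBATCH lines for per-job resources (illustrative mapping)."""
    lines = [
        f"#SBATCH --time={res.time}",
        f"#SBATCH --mem={res.mem}",
        f"#SBATCH --cpus-per-task={res.cpus_per_task}",
        f"#SBATCH --nodes={res.nodes}",
    ]
    # Assumption: extra_sbatch keys are option names without the leading "--".
    lines.extend(f"#SBATCH --{key}={value}" for key, value in res.extra_sbatch.items())
    # max_array_size and max_concurrent only matter for array jobs (--array).
    return lines


for line in sbatch_directives(ResourcesStub(time="02:00:00", mem="16G", extra_sbatch={"gres": "gpu:1"})):
    print(line)
```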
cpus_per_task = 1 class-attribute

Number of CPUs per task.

max_array_size = 900 class-attribute

Maximum SLURM array size (capped by cluster policy).

max_concurrent = 64 class-attribute

Maximum number of concurrently running array tasks.

mem = '4G' class-attribute

Memory per node (e.g. "4G").

nodes = 1 class-attribute

Number of nodes.

time = '01:00:00' class-attribute

Wall-clock time limit (e.g. "01:00:00").
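max_array_size and max_concurrent interact when a parallel step has more samples than one array allows. The splitting scheme below is an assumption, not taken from the f3dasm source. SLURM's MaxArraySize setting bounds the array index values themselves, so this sketch indexes every chunk from 0 and records an offset. It also throttles each chunk with SLURM's `%` array syntax (`--array=START-END%LIMIT`). `array_specs` is a hypothetical helper.

```python
import math


def array_specs(
    n_samples: int, max_array_size: int = 900, max_concurrent: int = 64
) -> list[tuple[int, str]]:
    """Split n_samples into SLURM array submissions (hypothetical scheme).

    Returns (offset, array_option) pairs. Each chunk is indexed from 0 so the
    task IDs stay below the array limit; a task would then process sample
    offset + SLURM_ARRAY_TASK_ID.
    """
    if n_samples <= 0:
        return []
    specs = []
    n_chunks = math.ceil(n_samples / max_array_size)
    for chunk in range(n_chunks):
        offset = chunk * max_array_size
        size = min(max_array_size, n_samples - offset)
        # "%N" caps how many tasks of this array run at the same time.
        throttle = min(max_concurrent, size)
        specs.append((offset, f"--array=0-{size - 1}%{throttle}"))
    return specs


print(array_specs(2000))
# [(0, '--array=0-899%64'), (900, '--array=0-899%64'), (1800, '--array=0-199%64')]
```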

Built-in blocks

f3dasm.CollectArrayResults

Collect results from SLURM array job JSON files.

This block wraps the common post-processing pattern of calling ExperimentData.update_from_experimentssample_json() and then cleaning up the experiment_sample directory. Cleanup only happens when the data has a project directory.

Parameters:

cleanup (bool, default True): Whether to remove the experiment_sample directory after collection.
Source code in src/f3dasm/_src/pipeline/blocks.py
class CollectArrayResults(Block):
    """Collect results from SLURM array job JSON files.

    This block wraps the common post-processing pattern of calling
    :meth:`ExperimentData.update_from_experimentssample_json` and
    then cleaning up the ``experiment_sample`` directory.

    Parameters
    ----------
    cleanup : bool
        Whether to remove the ``experiment_sample`` directory
        after collection. Defaults to ``True``.
    """

    def __init__(self, cleanup: bool = True) -> None:
        self.cleanup = cleanup

    def call(self, data: ExperimentData, **kwargs) -> ExperimentData:
        """Collect array results and optionally clean up.

        Parameters
        ----------
        data : ExperimentData
            The experiment data to update.
        **kwargs : dict
            Unused.

        Returns
        -------
        ExperimentData
            A new ExperimentData with collected results merged in.
        """
        logger.info("Collecting array job results from JSON files")

        # update_from_experimentssample_json returns a new
        # ExperimentData by default (in_place=False), so we
        # capture and return the result.
        data = data.update_from_experimentssample_json()

        if self.cleanup and data._project_dir is not None:
            sample_dir: Path = data._project_dir / "experiment_sample"
            if sample_dir.exists():
                logger.info(f"Cleaning up {sample_dir}")
                shutil.rmtree(sample_dir)

        return data
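Two details of call are easy to miss when writing a similar block. First, update_from_experimentssample_json returns a new object, so the result must be reassigned. Second, cleanup is skipped when there is no project directory. The sketch below imitates that control flow with a hypothetical `FakeData` class and a temporary directory, so it runs without f3dasm. It is not the f3dasm implementation.

```python
import shutil
import tempfile
from dataclasses import dataclass, replace
from pathlib import Path
from typing import Optional


# Hypothetical stand-in for ExperimentData: updates return copies.
@dataclass(frozen=True)
class FakeData:
    project_dir: Optional[Path]
    n_results: int = 0

    def update_from_json(self, n_new: int) -> "FakeData":
        # Returns a new object instead of modifying self (like in_place=False).
        return replace(self, n_results=self.n_results + n_new)


def collect(data: FakeData, n_new: int, cleanup: bool = True) -> FakeData:
    data = data.update_from_json(n_new)  # reassign, or the update is lost
    if cleanup and data.project_dir is not None:
        sample_dir = data.project_dir / "experiment_sample"
        if sample_dir.exists():
            shutil.rmtree(sample_dir)
    return data


# Usage: a temporary project directory containing an experiment_sample folder.
with tempfile.TemporaryDirectory() as tmp:
    project = Path(tmp)
    (project / "experiment_sample").mkdir()
    result = collect(FakeData(project_dir=project), n_new=3)
    print(result.n_results, (project / "experiment_sample").exists())  # 3 False
```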
arm(data: ExperimentData) -> None

Prepare the block with a given ExperimentData.

Parameters:

data (ExperimentData, required): The experiment data to be used by the block.

Notes:

Subclasses can override this method to prepare the block with the given experiment data. Overriding it is optional; the default implementation does nothing.

Source code in src/f3dasm/_src/core.py
def arm(self, data: ExperimentData) -> None:
    """
    Prepare the block with a given ExperimentData.

    Parameters
    ----------
    data : ExperimentData
        The experiment data to be used by the block.

    Notes
    -----
    This method can be overridden by subclasses to prepare the block
    with the given experiment data. It is not required to implement this
    method in the subclass.
    """
    pass
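arm is an optional hook for preparing state before call runs. The following is a minimal sketch of the override pattern. `BlockStub` is a hypothetical stand-in with only arm and call. The arm-then-call order is assumed rather than taken from the f3dasm source. A plain list of floats replaces ExperimentData.

```python
from abc import ABC, abstractmethod


# Hypothetical stand-in for f3dasm's Block: arm() is an optional hook,
# call() is the required work method.
class BlockStub(ABC):
    def arm(self, data) -> None:
        pass  # default: nothing to prepare

    @abstractmethod
    def call(self, data, **kwargs):
        ...


class Normalize(BlockStub):
    """Example block that computes its scale in arm() and reuses it in call()."""

    def arm(self, data) -> None:
        # Prepare state from the data before call() runs.
        self.scale = max(abs(x) for x in data) or 1.0

    def call(self, data, **kwargs):
        return [x / self.scale for x in data]


block = Normalize()
samples = [2.0, -4.0, 1.0]
block.arm(samples)
print(block.call(samples))  # [0.5, -1.0, 0.25]
```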
from_yaml(init_config: DictConfig, call_config: Optional[DictConfig] = None) -> Block classmethod

Create a block from a YAML configuration. The block is built by passing init_config to instantiate(); if call_config is given, its entries are pre-bound as keyword arguments of the block's call method.

Parameters:

init_config (DictConfig, required): The configuration for the block's initialization.
call_config (DictConfig, optional, default None): The configuration for the block's call method.

Returns:

Block: The block object created from the configuration.

Source code in src/f3dasm/_src/core.py
@classmethod
def from_yaml(
    cls, init_config: DictConfig, call_config: Optional[DictConfig] = None
) -> Block:
    """
    Create a block from a YAML configuration.

    Parameters
    ----------
    init_config : DictConfig
        The configuration for the block's initialization.
    call_config : DictConfig, optional
        The configuration for the block's call method, by default None

    Returns
    -------
    Block
        The block object created from the configuration.
    """
    block: Block = instantiate(init_config)
    if call_config is not None:
        block.call = partial(block.call, **call_config)

    return block
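The call_config handling relies on functools.partial. The configured values become pre-bound keyword arguments of call, and keywords passed at call time override them. The sketch below shows that behavior with a hypothetical `Sampler` class and a plain dict standing in for the DictConfig.

```python
from functools import partial


class Sampler:
    """Hypothetical block whose call method takes keyword options."""

    def call(self, data, n_samples=1, seed=None):
        return {"data": data, "n_samples": n_samples, "seed": seed}


call_config = {"n_samples": 10, "seed": 42}  # stand-in for a DictConfig

block = Sampler()
# Same pattern as Block.from_yaml: pre-bind the configured keyword arguments.
block.call = partial(block.call, **call_config)

print(block.call("x"))          # {'data': 'x', 'n_samples': 10, 'seed': 42}
print(block.call("x", seed=0))  # {'data': 'x', 'n_samples': 10, 'seed': 0}
```

Because the partial is stored on the instance, it shadows the method only for that block object; other instances of the class keep the unbound defaults.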
loop(n_iterations: int) -> LoopBlock

Repeat this block n_iterations times.

Parameters:

n_iterations (int, required): Number of times to run the block.

Returns:

LoopBlock: A new block that runs self n_iterations times, passing the output of each iteration as the input to the next.

Examples:

>>> step = update_step >> data_generator
>>> data = step.loop(50).call(initial_data)
Source code in src/f3dasm/_src/core.py
def loop(self, n_iterations: int) -> LoopBlock:
    """Repeat this block ``n_iterations`` times.

    Parameters
    ----------
    n_iterations : int
        Number of times to run the block.

    Returns
    -------
    LoopBlock
        A new block that runs ``self`` ``n_iterations`` times, passing
        the output of each iteration as the input to the next.

    Examples
    --------
    >>> step = update_step >> data_generator
    >>> data = step.loop(50).call(initial_data)
    """
    return LoopBlock(block=self, n_iterations=n_iterations)
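loop wraps the block so that each iteration's output feeds the next, and the docstring example composes two blocks with `>>`. The sketch below models both with a hypothetical `StubBlock` operating on integers. It assumes `a >> b` runs `a` before `b` and mirrors the documented loop semantics; it is not the f3dasm LoopBlock.

```python
from typing import Callable


class StubBlock:
    """Hypothetical block wrapping a function from data to data."""

    def __init__(self, fn: Callable[[int], int]) -> None:
        self.fn = fn

    def call(self, data: int) -> int:
        return self.fn(data)

    def __rshift__(self, other: "StubBlock") -> "StubBlock":
        # a >> b runs a first, then b on a's output (assumed ordering).
        return StubBlock(lambda data: other.call(self.call(data)))

    def loop(self, n_iterations: int) -> "StubBlock":
        def run(data: int) -> int:
            for _ in range(n_iterations):
                data = self.call(data)  # output of one iteration feeds the next
            return data

        return StubBlock(run)


update_step = StubBlock(lambda x: x + 1)
data_generator = StubBlock(lambda x: x * 2)
step = update_step >> data_generator
print(step.loop(3).call(0))  # 14
```

Each iteration maps x to (x + 1) * 2, so starting from 0 the values are 2, 6 and 14.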
call(data: ExperimentData, **kwargs) -> ExperimentData

Collect array results and optionally clean up.

Parameters:

data (ExperimentData, required): The experiment data to update.
**kwargs (dict): Unused.

Returns:

ExperimentData: A new ExperimentData with collected results merged in.
