Pipeline
The Pipeline chains multiple Blocks into an automated workflow. Use Step for sequential operations and Loop for iterative processes. A pipeline can run locally or on an HPC cluster through SlurmCluster.
See also
- Tutorial: Building a Pipeline
- Core Concepts: Pipeline
f3dasm.Pipeline
A composable, executable pipeline of f3dasm blocks.
A pipeline is an ordered sequence of Step and Loop objects. Steps run once; loops repeat their inner steps for a given number of iterations.
In SLURM mode, a single self-resubmitting orchestrator script manages the entire pipeline, submitting one step (or one loop iteration) at a time.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the pipeline (used for directory naming and SLURM job prefixes). | `''` |
| steps | list[Step \| Loop] | The ordered sequence of steps and loops. | `<factory>` |
| orchestrator_resources | SlurmResources | SLURM resource requirements for the orchestrator job. If | `None` |
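Examples:
Simple three-phase pipeline:
```python
from f3dasm import Pipeline, Step

# my_create_block, my_generator and my_post_block are your own blocks.
pipeline = Pipeline(
    name="my_experiment",
    steps=[
        Step(name="create", block=my_create_block),
        Step(name="run", block=my_generator, parallel=True),
        Step(name="post", block=my_post_block),
    ],
)
pipeline.run(mode="local")
```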
Pipeline with a loop:
```python
from f3dasm import Loop, Pipeline, SlurmCluster, Step

# create_block, generator and update_block are user-defined blocks.
pipeline = Pipeline(
    name="online_rl",
    steps=[
        Step(name="create", block=create_block),
        Loop(n_iterations=10, steps=[
            Step(name="run", block=generator, parallel=True),
            Step(name="post", block=update_block),
        ]),
    ],
)
pipeline.run(
    mode="slurm",
    cluster=SlurmCluster(partition="batch", account="my_account"),
    rootdir="/scratch/user",
)
```
Source code in src/f3dasm/_src/pipeline/pipeline.py
name = '' (class attribute)
_flatten() -> list[tuple[Step, int, int]]
Flatten the pipeline into an ordered list of steps.
Walks the steps list and expands any Loop elements into repeated step entries.
Returns:
| Type | Description |
|---|---|
| list[tuple[Step, int, int]] | Each entry is |
Source code in src/f3dasm/_src/pipeline/pipeline.py
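A simplified picture of this expansion, using hypothetical stand-ins `MiniStep` and `MiniLoop` rather than the real f3dasm classes. The meaning of the two integers in f3dasm's tuples is not spelled out above, so this sketch instead pairs each step name with its loop iteration (`None` outside a loop).

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple, Union


@dataclass
class MiniStep:
    # Hypothetical stand-in for f3dasm.Step; only the name is modelled.
    name: str


@dataclass
class MiniLoop:
    # Hypothetical stand-in for f3dasm.Loop.
    n_iterations: int = 1
    steps: List[MiniStep] = field(default_factory=list)


def flatten_names(
    steps: List[Union[MiniStep, MiniLoop]],
) -> List[Tuple[str, Optional[int]]]:
    """Expand loops into repeated (name, iteration) entries, keeping order."""
    flat: List[Tuple[str, Optional[int]]] = []
    for item in steps:
        if isinstance(item, MiniLoop):
            # Every iteration repeats the whole loop body in order.
            for iteration in range(item.n_iterations):
                for step in item.steps:
                    flat.append((step.name, iteration))
        else:
            # Top-level steps appear exactly once, outside any iteration.
            flat.append((item.name, None))
    return flat


pipeline = [
    MiniStep("create"),
    MiniLoop(n_iterations=2, steps=[MiniStep("run"), MiniStep("post")]),
]
print(flatten_names(pipeline))
# [('create', None), ('run', 0), ('post', 0), ('run', 1), ('post', 1)]
```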
from_step(step: int | str) -> Pipeline
Return a new Pipeline starting from the specified step.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| step | int \| str | Step index (int) or step name (str) to start from. When a name is given and the step lives inside a | required |
Returns:
| Type | Description |
|---|---|
| Pipeline | A new Pipeline containing only the steps from |
Raises:
| Type | Description |
|---|---|
| ValueError | If a step name is given and no matching step is found. |
Examples:
Resume a pipeline from the "run" step:
```python
# Original run; returns the project job ID
job_id = pipeline.run(mode="local")

# Resume from "run", reusing the same output directory
pipeline.from_step("run").run(mode="local", project_job=job_id)
```
Source code in src/f3dasm/_src/pipeline/pipeline.py
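The way `from_step` resolves an index or a name can be sketched over a plain list of step names. `slice_from` is a hypothetical helper that covers only top-level steps; the `Loop` case, whose description is cut off above, is left out.

```python
from typing import List, Union


def slice_from(names: List[str], step: Union[int, str]) -> List[str]:
    """Return the step names from `step` onwards (hypothetical helper)."""
    if isinstance(step, str):
        if step not in names:
            # Mirrors the documented ValueError for an unknown step name.
            raise ValueError(f"No step named {step!r}")
        step = names.index(step)
    return names[step:]


names = ["create", "run", "post"]
print(slice_from(names, "run"))  # ['run', 'post']
print(slice_from(names, 2))      # ['post']
```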
generate_scripts(cluster: SlurmCluster, project_job: str = 'PLACEHOLDER', rootdir: Path | str | None = None) -> dict[str, str]
Generate SLURM scripts without submitting them.
Useful for inspecting or manually editing scripts before
submission. For pipelines containing loops, the result
includes orchestrator scripts and loop body step scripts
with $F3DASM_ITERATION as the iteration placeholder.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| cluster | SlurmCluster | Cluster configuration. | required |
| project_job | str | Project job ID to embed in scripts. | `'PLACEHOLDER'` |
| rootdir | Path \| str | Root directory under which the job folder is created. Defaults to the current working directory. | `None` |
Returns:
| Type | Description |
|---|---|
| dict[str, str] | Mapping of label to rendered sbatch script content. |
Source code in src/f3dasm/_src/pipeline/pipeline.py
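Because the return value is a plain mapping from label to script text, saving the scripts for review needs only the standard library. `write_scripts` is a hypothetical helper; since the label format is not documented here, it replaces any character outside `[A-Za-z0-9_.-]` so each label becomes a safe file name.

```python
import re
from pathlib import Path
from typing import Dict, List, Union


def write_scripts(scripts: Dict[str, str], outdir: Union[str, Path]) -> List[Path]:
    """Write each rendered sbatch script to <outdir>/<label>.sh."""
    out = Path(outdir)
    out.mkdir(parents=True, exist_ok=True)
    written: List[Path] = []
    for label, content in scripts.items():
        # Keep file names portable: unusual characters become "_".
        safe = re.sub(r"[^A-Za-z0-9_.-]", "_", label)
        path = out / f"{safe}.sh"
        path.write_text(content)
        written.append(path)
    return written


# Typical use (requires f3dasm, a pipeline and a SlurmCluster):
# scripts = pipeline.generate_scripts(cluster, rootdir="/scratch/user")
# write_scripts(scripts, "sbatch_preview")
```

The files can then be edited by hand and submitted with `sbatch` once they look right.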
run(mode: Literal['local', 'slurm'] = 'local', cluster: SlurmCluster | None = None, project_job: str | None = None, rootdir: Path | str | None = None) -> str
Execute the pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| mode | Literal['local', 'slurm'] | Execution mode: | `'local'` |
| cluster | SlurmCluster | Cluster configuration (required when | `None` |
| project_job | str | Job identifier used as the top-level run folder ( | `None` |
| rootdir | Path \| str | Root directory under which the job folder is created. Defaults to the current working directory. | `None` |
Returns:
| Type | Description |
|---|---|
| str | The project job ID. |
Source code in src/f3dasm/_src/pipeline/pipeline.py
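The documented location settings (`rootdir`, `project_job`, and a step's `project_dir`) combine into one folder per step. A minimal sketch, assuming they are simply joined in that order; `step_data_dir` is a hypothetical helper, not part of f3dasm.

```python
from pathlib import Path
from typing import Optional, Union


def step_data_dir(
    project_job: str,
    project_dir: str = ".",
    rootdir: Optional[Union[str, Path]] = None,
) -> Path:
    """Resolve where a step's ExperimentData would live (assumed layout)."""
    # rootdir falls back to the current working directory, as documented.
    root = Path.cwd() if rootdir is None else Path(rootdir)
    # The job folder sits under rootdir; project_dir is relative to it.
    # pathlib collapses a "." component, so the default adds nothing.
    return root / project_job / project_dir


print(step_data_dir("job_1", rootdir="/scratch/user"))          # /scratch/user/job_1
print(step_data_dir("job_1", "post", rootdir="/scratch/user"))  # /scratch/user/job_1/post
```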
f3dasm.Step
A single step in a Pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| block | Block \| DataGenerator \| Callable | The operation to execute. Can be a | required |
| name | str | Human-readable name for this step (used in logs and SLURM job names). | `''` |
| parallel | bool | If | `False` |
| resources | SlurmResources | SLURM resource requirements for this step. | `<factory>` |
| dependency | Literal['afterok', 'afterany'] | SLURM dependency type on the previous step. Must be | `'afterok'` |
| project_dir | str | Sub-path relative to the job directory where ExperimentData is loaded and stored for this step. Defaults to | `'.'` |
| kwargs | dict[str, Any] | Extra keyword arguments forwarded to the block's | `<factory>` |
Source code in src/f3dasm/_src/pipeline/pipeline.py
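Both allowed `dependency` values are standard SLURM dependency types: `afterok` starts the next job only if the previous one completed successfully (exit code zero), while `afterany` starts it once the previous job has terminated for any reason. The sketch below shows the corresponding `sbatch --dependency` option; `dependency_flag` is a hypothetical helper, and whether f3dasm emits the option in exactly this form is an assumption.

```python
def dependency_flag(dependency: str, previous_job_id: str) -> str:
    """Build the sbatch option that chains this step after the previous job."""
    if dependency not in ("afterok", "afterany"):
        raise ValueError("dependency must be 'afterok' or 'afterany'")
    # SLURM syntax: --dependency=<type>:<job_id>
    return f"--dependency={dependency}:{previous_job_id}"


print(dependency_flag("afterok", "12345"))  # --dependency=afterok:12345
```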
dependency = 'afterok' (class attribute)
name = '' (class attribute)
parallel = False (class attribute)
project_dir = '.' (class attribute)
f3dasm.Loop
A group of steps that are repeated multiple times.
Use Loop inside a Pipeline's steps list to express iterative workflows (e.g. train → evaluate loops).
In SLURM mode, a self-resubmitting orchestrator script is generated so that only one iteration's jobs are queued at a time, preventing cluster job-count limits from being exceeded.
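The self-resubmitting pattern can be modelled in a few lines of plain Python. This is a simplified sketch, not f3dasm's generated orchestrator: `OrchestratorState` and `orchestrate_once` are hypothetical names, the state is kept in memory rather than on disk, and "submitting" just records the job names.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class OrchestratorState:
    # Index of the next unit of work: a top-level step or one loop iteration.
    next_unit: int = 0


def orchestrate_once(
    units: List[List[str]], state: OrchestratorState, queued: List[List[str]]
) -> bool:
    """Queue one unit of work; return True if the orchestrator should resubmit."""
    if state.next_unit >= len(units):
        return False  # nothing left, so the chain of orchestrator jobs ends
    # Stand-in for the sbatch calls that queue this unit's jobs.
    queued.append(units[state.next_unit])
    state.next_unit += 1
    # On a real cluster the resubmitted orchestrator would also wait for
    # this unit's jobs to finish before queueing the next one.
    return state.next_unit < len(units)


# Ten iterations of a two-step loop body: 20 jobs overall,
# but each orchestrator run queues only the 2 jobs of one iteration.
units = [["run", "post"] for _ in range(10)]
state = OrchestratorState()
queued: List[List[str]] = []
runs = 1
while orchestrate_once(units, state, queued):
    runs += 1  # each True models a fresh, resubmitted orchestrator job
print(runs, max(len(unit) for unit in queued))  # 10 2
```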
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| n_iterations | int | Number of times to repeat the inner steps. | `1` |
| steps | list[Step] | The steps to repeat each iteration. | `<factory>` |
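Examples:
```python
from f3dasm import Loop, Pipeline, Step

# create_block, generator and update_block are user-defined blocks.
Pipeline(
    name="online_rl",
    steps=[
        Step(name="create", block=create_block),
        Loop(n_iterations=10, steps=[
            Step(name="run", block=generator, parallel=True),
            Step(name="post", block=update_block),
        ]),
    ],
)
```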
Source code in src/f3dasm/_src/pipeline/loop.py
n_iterations = 1 (class attribute)
Resources
f3dasm.SlurmCluster
Configuration for a specific SLURM cluster.
The cluster is assumed to be a POSIX system (Linux): generated
sbatch scripts use POSIX-style paths and bash shell syntax,
and sbatch must be available on the submission host's
PATH.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| partition | str | SLURM partition name. | `'batch'` |
| account | str | SLURM account string. | `'default'` |
| env_setup | list[str] | Shell commands to run before the Python command (e.g. module loads, | `<factory>` |
| env_vars | dict[str, str] | Environment variables exported before execution. | `<factory>` |
| runner | str | Command prefix for running Python scripts (e.g. | `'python'` |
| log_dir | str | Log directory template. May contain | `'logs/{project_job}'` |
Source code in src/f3dasm/_src/pipeline/resources.py
account = 'default' (class attribute)
log_dir = 'logs/{project_job}' (class attribute)
partition = 'batch' (class attribute)
runner = 'python' (class attribute)
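The SlurmCluster fields map naturally onto the body of a bash job script. A minimal sketch, assuming `env_setup` lines run verbatim, `env_vars` become `export` statements, `runner` is placed in front of the script path, and the `log_dir` template is filled with `str.format`; `render_preamble`, `log_path` and the script name `main.py` are hypothetical, and f3dasm's actual template may differ.

```python
import shlex
from typing import Dict, List


def render_preamble(
    env_setup: List[str], env_vars: Dict[str, str], runner: str, script: str
) -> str:
    """Assemble the body of a bash job script (hypothetical layout)."""
    lines = ["#!/bin/bash"]
    # env_setup entries are shell commands (e.g. module loads), used verbatim.
    lines.extend(env_setup)
    # env_vars are exported; values are shell-quoted.
    lines.extend(f"export {key}={shlex.quote(value)}" for key, value in env_vars.items())
    # runner is a command prefix placed in front of the Python script.
    lines.append(f"{runner} {shlex.quote(script)}")
    return "\n".join(lines) + "\n"


def log_path(log_dir: str, project_job: str) -> str:
    """Fill the {project_job} placeholder of a log_dir template."""
    return log_dir.format(project_job=project_job)


print(render_preamble(["module load Python"], {"OMP_NUM_THREADS": "1"}, "python", "main.py"))
print(log_path("logs/{project_job}", "job_1"))  # logs/job_1
```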
from_yaml(config: DictConfig) -> SlurmCluster (classmethod)
Create a SlurmCluster from a Hydra DictConfig.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | DictConfig | Hydra DictConfig for the cluster section, e.g. | required |
Returns:
| Type | Description |
|---|---|
| SlurmCluster | |
Examples:
>>> cluster = SlurmCluster.from_yaml(cfg.cluster)
Source code in src/f3dasm/_src/pipeline/resources.py
f3dasm.SlurmResources
Resource requirements for a single pipeline step on SLURM.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| time | str | Wall-clock time limit (e.g. | `'01:00:00'` |
| mem | str | Memory per node (e.g. | `'4G'` |
| cpus_per_task | int | Number of CPUs per task. | `1` |
| nodes | int | Number of nodes. | `1` |
| max_array_size | int | Maximum SLURM array size (capped by cluster policy). | `900` |
| max_concurrent | int | Maximum number of concurrently running array tasks. | `64` |
| extra_sbatch | dict[str, str] | Arbitrary extra | `<factory>` |
Source code in src/f3dasm/_src/pipeline/resources.py
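Most SlurmResources fields correspond to standard `sbatch` options (`--time`, `--mem`, `--cpus-per-task`, `--nodes`). A sketch of rendering them as `#SBATCH` header lines, assuming `extra_sbatch` maps option names (without leading dashes) to values; `sbatch_directives` is a hypothetical helper.

```python
from typing import Dict, List, Optional


def sbatch_directives(
    time: str = "01:00:00",
    mem: str = "4G",
    cpus_per_task: int = 1,
    nodes: int = 1,
    extra_sbatch: Optional[Dict[str, str]] = None,
) -> List[str]:
    """Render SlurmResources-like settings as #SBATCH lines (hypothetical)."""
    options = {
        "time": time,  # wall-clock limit, HH:MM:SS
        "mem": mem,    # memory per node
        "cpus-per-task": str(cpus_per_task),
        "nodes": str(nodes),
    }
    # Assumed: extra_sbatch keys are sbatch option names without dashes.
    options.update(extra_sbatch or {})
    return [f"#SBATCH --{key}={value}" for key, value in options.items()]


print("\n".join(sbatch_directives(mem="16G", extra_sbatch={"qos": "short"})))
```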
cpus_per_task = 1 (class attribute)
max_array_size = 900 (class attribute)
max_concurrent = 64 (class attribute)
mem = '4G' (class attribute)
nodes = 1 (class attribute)
time = '01:00:00' (class attribute)
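`max_array_size` and `max_concurrent` bound how a parallel step is split into SLURM job arrays. The sketch below shows one plausible splitting scheme using SLURM's `--array=<first>-<last>%<limit>` syntax, where `%<limit>` caps the number of simultaneously running tasks; the chunk-and-offset strategy and the `array_specs` name are assumptions, not f3dasm's documented behaviour.

```python
from typing import List, Tuple


def array_specs(
    n_tasks: int, max_array_size: int = 900, max_concurrent: int = 64
) -> List[Tuple[int, str]]:
    """Split n_tasks into SLURM arrays; return (offset, --array value) pairs."""
    if max_array_size < 1 or max_concurrent < 1:
        raise ValueError("max_array_size and max_concurrent must be >= 1")
    specs: List[Tuple[int, str]] = []
    for offset in range(0, n_tasks, max_array_size):
        size = min(max_array_size, n_tasks - offset)
        # Indices restart at 0 in each array so they stay below the cap;
        # a task's global index is offset + SLURM_ARRAY_TASK_ID.
        # "%N" limits how many tasks of the array run at once.
        specs.append((offset, f"0-{size - 1}%{max_concurrent}"))
    return specs


print(array_specs(2000))
# [(0, '0-899%64'), (900, '0-899%64'), (1800, '0-199%64')]
```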
Built-in blocks
f3dasm.CollectArrayResults
Collect results from SLURM array job JSON files.
This block wraps the common post-processing pattern of calling ExperimentData.update_from_experimentssample_json and then cleaning up the experiment_sample directory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| cleanup | bool | Whether to remove the | `True` |
Source code in src/f3dasm/_src/pipeline/blocks.py
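The collect-then-clean-up pattern is easy to picture with the standard library alone. This sketch is illustrative only: it assumes one JSON file per array task inside an `experiment_sample` directory and returns a plain dict, whereas the real block merges results into ExperimentData through `update_from_experimentssample_json`; `collect_json_results` is a hypothetical name.

```python
import json
import shutil
from pathlib import Path
from typing import Any, Dict


def collect_json_results(sample_dir: Path, cleanup: bool = True) -> Dict[str, Any]:
    """Load every *.json file in sample_dir, then optionally delete the folder."""
    results: Dict[str, Any] = {}
    for path in sorted(sample_dir.glob("*.json")):
        # Assumed layout: one file per array task, keyed by its file stem.
        results[path.stem] = json.loads(path.read_text())
    if cleanup:
        # Mirrors the documented cleanup of the experiment_sample directory.
        shutil.rmtree(sample_dir)
    return results
```

Passing `cleanup=False` keeps the per-task files on disk, which helps when debugging failed array tasks.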
arm(data: ExperimentData) -> None
Prepare the block with a given ExperimentData.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | ExperimentData | The experiment data to be used by the block. | required |
Notes
Subclasses can override this method to prepare the block with the given experiment data; implementing it is optional.
Source code in src/f3dasm/_src/core.py
from_yaml(init_config: DictConfig, call_config: Optional[DictConfig] = None) -> Block (classmethod)
Create a block from a YAML configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| init_config | DictConfig | The configuration for the block's initialization. | required |
| call_config | DictConfig | The configuration for the block's call method. | `None` |
Returns:
| Type | Description |
|---|---|
| Block | The block object created from the configuration. |
Source code in src/f3dasm/_src/core.py
loop(n_iterations: int) -> LoopBlock
Repeat this block n_iterations times.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| n_iterations | int | Number of times to run the block. | required |
Returns:
| Type | Description |
|---|---|
| LoopBlock | A new block that runs |
Examples:
>>> step = update_step >> data_generator
>>> data = step.loop(50).call(initial_data)
Source code in src/f3dasm/_src/core.py
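The `>>` composition and `.loop()` call in the example above can be mimicked with a toy class to show the order of evaluation. `MiniBlock` is a hypothetical stand-in that passes an integer instead of ExperimentData, and it assumes that `a >> b` runs `a` before `b` and that each iteration's output feeds the next.

```python
from typing import Callable


class MiniBlock:
    """Hypothetical stand-in for an f3dasm Block wrapping a data -> data function."""

    def __init__(self, fn: Callable[[int], int]):
        self.fn = fn

    def call(self, data: int) -> int:
        return self.fn(data)

    def __rshift__(self, other: "MiniBlock") -> "MiniBlock":
        # a >> b: run a first, then feed its output to b.
        return MiniBlock(lambda d: other.call(self.call(d)))

    def loop(self, n_iterations: int) -> "MiniBlock":
        def repeated(d: int) -> int:
            # Each iteration receives the previous iteration's output.
            for _ in range(n_iterations):
                d = self.call(d)
            return d
        return MiniBlock(repeated)


add_one = MiniBlock(lambda x: x + 1)
double = MiniBlock(lambda x: x * 2)
print((add_one >> double).loop(3).call(0))  # 14
```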
call(data: ExperimentData, **kwargs) -> ExperimentData
Collect array results and optionally clean up.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | ExperimentData | The experiment data to update. | required |
| `**kwargs` | dict | Unused. | required |
Returns:
| Type | Description |
|---|---|
| ExperimentData | A new ExperimentData with collected results merged in. |
Source code in src/f3dasm/_src/pipeline/blocks.py