Data-driven process¤

Block is the base class for all computational units in f3dasm. Samplers, data generators, and optimizer update steps are all plain Block subclasses; they compose with the >> operator into a ChainedBlock and can be repeated with .loop(n) to produce a LoopBlock. Use the @datagenerator decorator to quickly create blocks from functions.

f3dasm.Block ¤

Abstract base class representing a single operation in a data-driven pipeline.

A Block transforms an ExperimentData object and returns the updated result. Subclasses must implement the call method. Optionally, they may override the arm method to perform any one-time setup (e.g. fitting a surrogate model) before the block is executed.

Blocks compose with the >> operator into a ChainedBlock, and can be repeated with the loop method to produce a LoopBlock. The built-in samplers (RandomUniform, Latin, Sobol, Grid), the DataGenerator, and the built-in optimizer update steps are all Block subclasses.
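
As a rough illustration of how >> and loop fit together, the sketch below re-implements just enough of Block, ChainedBlock and LoopBlock to run without f3dasm installed. The data is a plain list rather than an ExperimentData object, and AddOne and Double are hypothetical blocks invented for this example; only the chaining and looping logic follows the source shown on this page.

```python
from abc import ABC, abstractmethod


class Block(ABC):
    """Minimal stand-in for f3dasm.Block (illustration only)."""

    @abstractmethod
    def call(self, data, **kwargs):
        ...

    def __rshift__(self, other):
        # Chaining onto an existing chain keeps the result flat.
        if isinstance(other, ChainedBlock):
            return ChainedBlock([self, *other.blocks])
        return ChainedBlock([self, other])

    def loop(self, n_iterations):
        return LoopBlock(self, n_iterations)


class ChainedBlock(Block):
    def __init__(self, blocks):
        self.blocks = blocks

    def call(self, data, **kwargs):
        # Feed the output of each block into the next one.
        for block in self.blocks:
            data = block.call(data=data, **kwargs)
        return data


class LoopBlock(Block):
    def __init__(self, block, n_iterations):
        self.block = block
        self.n_iterations = n_iterations

    def call(self, data, **kwargs):
        # Feed the output of each iteration into the next one.
        for _ in range(self.n_iterations):
            data = self.block.call(data=data, **kwargs)
        return data


class AddOne(Block):  # hypothetical example block
    def call(self, data, **kwargs):
        return [x + 1 for x in data]


class Double(Block):  # hypothetical example block
    def call(self, data, **kwargs):
        return [x * 2 for x in data]


# Add one, then double, and repeat that pair three times.
pipeline = (AddOne() >> Double()).loop(3)
result = pipeline.call([0, 1])
print(result)  # [14, 22]
```

With real f3dasm blocks the same expression shape applies; only the stand-in classes would be replaced by imports from the library.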

Examples:

Define a custom block:

>>> class MyBlock(Block):
...     def call(self, data: ExperimentData, **kwargs) -> ExperimentData:
...         # transform data here
...         return data
Source code in src/f3dasm/_src/core.py
class Block(ABC):
    """Abstract base class representing a single operation in a data-driven
    pipeline.

    A Block transforms an :class:`ExperimentData` object and returns the
    updated result. Subclasses must implement the :meth:`call` method.
    Optionally, they may override :meth:`arm` to perform any one-time setup
    (e.g. fitting a surrogate model) before the block is executed.

    Blocks compose with the ``>>`` operator into a :class:`ChainedBlock`,
    and can be repeated with :meth:`loop` to produce a :class:`LoopBlock`.
    The built-in samplers (:class:`RandomUniform`, :class:`Latin`,
    :class:`Sobol`, :class:`Grid`), the :class:`DataGenerator`, and the
    built-in optimizer update steps are all Block subclasses.

    Examples
    --------
    Define a custom block:

    >>> class MyBlock(Block):
    ...     def call(self, data: ExperimentData, **kwargs) -> ExperimentData:
    ...         # transform data here
    ...         return data
    """

    def arm(self, data: ExperimentData) -> None:
        """
        Prepare the block with a given ExperimentData.

        Parameters
        ----------
        data : ExperimentData
            The experiment data to be used by the block.

        Notes
        -----
        Subclasses can override this method to prepare the block with the
        given experiment data. Implementing it is optional.
        """
        pass

    @abstractmethod
    def call(self, data: ExperimentData, **kwargs) -> ExperimentData:
        """
        Execute the block's operation on the ExperimentData.

        Parameters
        ----------
        data : ExperimentData
            The experiment data to process.
        **kwargs : dict
            Additional keyword arguments for the operation.

        Returns
        -------
        ExperimentData
            The processed experiment data.
        """
        pass

    def __rshift__(self, other: Block) -> ChainedBlock:
        """Chain two blocks with the ``>>`` operator.

        Parameters
        ----------
        other : Block
            The block to execute after this one.

        Returns
        -------
        ChainedBlock
            A new block that runs ``self`` then ``other``.
        """
        if isinstance(other, ChainedBlock):
            return ChainedBlock(blocks=[self, *other.blocks])
        return ChainedBlock(blocks=[self, other])

    def loop(self, n_iterations: int) -> LoopBlock:
        """Repeat this block ``n_iterations`` times.

        Parameters
        ----------
        n_iterations : int
            Number of times to run the block.

        Returns
        -------
        LoopBlock
            A new block that runs ``self`` ``n_iterations`` times, passing
            the output of each iteration as the input to the next.

        Examples
        --------
        >>> step = update_step >> data_generator
        >>> data = step.loop(50).call(initial_data)
        """
        return LoopBlock(block=self, n_iterations=n_iterations)

    @classmethod
    def from_yaml(
        cls, init_config: DictConfig, call_config: Optional[DictConfig] = None
    ) -> Block:
        """
        Create a block from a YAML configuration.

        Parameters
        ----------
        init_config : DictConfig
            The configuration for the block's initialization.
        call_config : DictConfig, optional
            The configuration for the block's call method, by default None

        Returns
        -------
        Block
            The block object created from the configuration.
        """
        block: Block = instantiate(init_config)
        if call_config is not None:
            block.call = partial(block.call, **call_config)

        return block
arm(data: ExperimentData) -> None ¤

Prepare the block with a given ExperimentData.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| data | ExperimentData | The experiment data to be used by the block. | required |

Notes

Subclasses can override this method to prepare the block with the given experiment data. Implementing it is optional.
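
A minimal sketch of the arm/call split, assuming the code that runs the block calls arm once before call (this page does not show who makes that call, so the example does it explicitly). CenterBlock is a hypothetical class and the data is a plain list; in real code the class would subclass f3dasm.Block and receive an ExperimentData object.

```python
class CenterBlock:
    """Hypothetical block; real code would subclass f3dasm.Block."""

    def __init__(self):
        self.mean = None

    def arm(self, data):
        # One-time setup: derive state from the full data set.
        self.mean = sum(data) / len(data)

    def call(self, data, **kwargs):
        # Use the state prepared in arm(); fail loudly if it is missing.
        if self.mean is None:
            raise RuntimeError("call() was used before arm()")
        return [x - self.mean for x in data]


block = CenterBlock()
block.arm([1.0, 2.0, 3.0])
centered = block.call([1.0, 2.0, 3.0])
print(centered)  # [-1.0, 0.0, 1.0]
```

Keeping the expensive step in arm means that repeated calls, for example inside a loop, reuse the prepared state instead of recomputing it.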

Source code in src/f3dasm/_src/core.py
def arm(self, data: ExperimentData) -> None:
    """
    Prepare the block with a given ExperimentData.

    Parameters
    ----------
    data : ExperimentData
        The experiment data to be used by the block.

    Notes
    -----
    Subclasses can override this method to prepare the block with the
    given experiment data. Implementing it is optional.
    """
    pass
call(data: ExperimentData, **kwargs) -> ExperimentData ¤

Execute the block's operation on the ExperimentData.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| data | ExperimentData | The experiment data to process. | required |
| **kwargs | dict | Additional keyword arguments for the operation. | optional |

Returns:

| Type | Description |
|------|-------------|
| ExperimentData | The processed experiment data. |

Source code in src/f3dasm/_src/core.py
@abstractmethod
def call(self, data: ExperimentData, **kwargs) -> ExperimentData:
    """
    Execute the block's operation on the ExperimentData.

    Parameters
    ----------
    data : ExperimentData
        The experiment data to process.
    **kwargs : dict
        Additional keyword arguments for the operation.

    Returns
    -------
    ExperimentData
        The processed experiment data.
    """
    pass
from_yaml(init_config: DictConfig, call_config: Optional[DictConfig] = None) -> Block classmethod ¤

Create a block from a YAML configuration.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| init_config | DictConfig | The configuration for the block's initialization. | required |
| call_config | DictConfig | The configuration for the block's call method. | None |

Returns:

| Type | Description |
|------|-------------|
| Block | The block object created from the configuration. |
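
The way call_config is applied can be sketched with functools.partial alone. In the sketch below plain dicts stand in for DictConfig and direct construction stands in for the instantiate call in the source; Scale and build are hypothetical names used only for illustration.

```python
from functools import partial


class Scale:
    """Hypothetical block-like class used only for this illustration."""

    def __init__(self, factor):
        self.factor = factor

    def call(self, data, offset=0.0, **kwargs):
        return [x * self.factor + offset for x in data]


def build(init_kwargs, call_kwargs=None):
    # Stand-in for from_yaml: build the object, then pre-bind call kwargs.
    block = Scale(**init_kwargs)
    if call_kwargs is not None:
        # Same pattern as the source: wrap the bound method in a partial.
        block.call = partial(block.call, **call_kwargs)
    return block


block = build({"factor": 2.0}, {"offset": 1.0})
out = block.call([1.0, 2.0])                     # offset=1.0 is pre-bound
overridden = block.call([1.0, 2.0], offset=0.0)  # explicit kwarg wins
print(out, overridden)  # [3.0, 5.0] [2.0, 4.0]
```

Because partial only supplies defaults, a keyword passed explicitly at call time still overrides the value taken from the configuration.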

Source code in src/f3dasm/_src/core.py
@classmethod
def from_yaml(
    cls, init_config: DictConfig, call_config: Optional[DictConfig] = None
) -> Block:
    """
    Create a block from a YAML configuration.

    Parameters
    ----------
    init_config : DictConfig
        The configuration for the block's initialization.
    call_config : DictConfig, optional
        The configuration for the block's call method, by default None

    Returns
    -------
    Block
        The block object created from the configuration.
    """
    block: Block = instantiate(init_config)
    if call_config is not None:
        block.call = partial(block.call, **call_config)

    return block
loop(n_iterations: int) -> LoopBlock ¤

Repeat this block n_iterations times.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| n_iterations | int | Number of times to run the block. | required |

Returns:

| Type | Description |
|------|-------------|
| LoopBlock | A new block that runs self n_iterations times, passing the output of each iteration as the input to the next. |

Examples:

>>> step = update_step >> data_generator
>>> data = step.loop(50).call(initial_data)
Source code in src/f3dasm/_src/core.py
def loop(self, n_iterations: int) -> LoopBlock:
    """Repeat this block ``n_iterations`` times.

    Parameters
    ----------
    n_iterations : int
        Number of times to run the block.

    Returns
    -------
    LoopBlock
        A new block that runs ``self`` ``n_iterations`` times, passing
        the output of each iteration as the input to the next.

    Examples
    --------
    >>> step = update_step >> data_generator
    >>> data = step.loop(50).call(initial_data)
    """
    return LoopBlock(block=self, n_iterations=n_iterations)

f3dasm.ChainedBlock ¤

A block that runs multiple blocks in sequence.

Created automatically by the >> operator on Block instances.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| blocks | list[Block] | The blocks to execute in order. | required |

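
One consequence of the two __rshift__ implementations is that chains stay flat regardless of how the expression is parenthesized. The sketch below copies only that chaining logic into stand-in classes; the name attribute is a hypothetical addition used to inspect the result.

```python
class Block:
    """Stand-in holding only the chaining logic shown in the source."""

    def __init__(self, name=""):
        self.name = name  # hypothetical attribute, for inspection only

    def __rshift__(self, other):
        if isinstance(other, ChainedBlock):
            return ChainedBlock(blocks=[self, *other.blocks])
        return ChainedBlock(blocks=[self, other])


class ChainedBlock(Block):
    def __init__(self, blocks):
        super().__init__(name="chain")
        self.blocks = blocks

    def __rshift__(self, other):
        if isinstance(other, ChainedBlock):
            return ChainedBlock(blocks=[*self.blocks, *other.blocks])
        return ChainedBlock(blocks=[*self.blocks, other])


a, b, c = Block("a"), Block("b"), Block("c")
left = (a >> b) >> c
right = a >> (b >> c)

left_names = [blk.name for blk in left.blocks]
right_names = [blk.name for blk in right.blocks]
print(left_names, right_names)  # ['a', 'b', 'c'] ['a', 'b', 'c']
```

Both groupings produce a single ChainedBlock holding three blocks, so nested chains never appear inside blocks.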
Source code in src/f3dasm/_src/core.py
class ChainedBlock(Block):
    """A block that runs multiple blocks in sequence.

    Created automatically by the ``>>`` operator on
    :class:`Block` instances.

    Parameters
    ----------
    blocks : list[Block]
        The blocks to execute in order.
    """

    def __init__(self, blocks: list[Block]) -> None:
        """Initialize the chained block.

        Parameters
        ----------
        blocks : list[Block]
            Ordered list of blocks to execute sequentially.
        """
        self.blocks = blocks

    def call(self, data: ExperimentData, **kwargs) -> ExperimentData:
        """Execute all chained blocks in order.

        Parameters
        ----------
        data : ExperimentData
            The experiment data to process.
        **kwargs : dict
            Additional keyword arguments for each block.

        Returns
        -------
        ExperimentData
            The processed experiment data.
        """
        for block in self.blocks:
            data = block.call(data=data, **kwargs)
        return data

    def arm(self, data: ExperimentData) -> None:
        """Arm all chained blocks."""
        for block in self.blocks:
            block.arm(data)

    def __rshift__(self, other: Block) -> ChainedBlock:
        if isinstance(other, ChainedBlock):
            return ChainedBlock(blocks=[*self.blocks, *other.blocks])
        return ChainedBlock(blocks=[*self.blocks, other])
arm(data: ExperimentData) -> None ¤

Arm all chained blocks.

Source code in src/f3dasm/_src/core.py
def arm(self, data: ExperimentData) -> None:
    """Arm all chained blocks."""
    for block in self.blocks:
        block.arm(data)
call(data: ExperimentData, **kwargs) -> ExperimentData ¤

Execute all chained blocks in order.

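Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| data | ExperimentData | The experiment data to process. | required |
| **kwargs | dict | Additional keyword arguments, forwarded unchanged to every block in the chain. | optional |

Returns:

| Type | Description |
|------|-------------|
| ExperimentData | The processed experiment data. |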
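
Since the same keyword arguments reach every block in the chain, each block's call should tolerate keywords meant for its neighbours. The sketch below mirrors the loop in ChainedBlock.call with a hypothetical run_chain helper and two hypothetical blocks operating on a plain list.

```python
class Shift:  # hypothetical block
    def call(self, data, shift=0, **kwargs):
        # "factor" arrives here too and is absorbed by **kwargs.
        return [x + shift for x in data]


class Scale:  # hypothetical block
    def call(self, data, factor=1, **kwargs):
        # "shift" arrives here too and is absorbed by **kwargs.
        return [x * factor for x in data]


def run_chain(blocks, data, **kwargs):
    # Same loop as ChainedBlock.call: identical kwargs for every block.
    for block in blocks:
        data = block.call(data=data, **kwargs)
    return data


result = run_chain([Shift(), Scale()], [1, 2], shift=1, factor=10)
print(result)  # [20, 30]
```

A block whose call signature lacks **kwargs would raise a TypeError as soon as a keyword intended for another block is passed through the chain.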

Source code in src/f3dasm/_src/core.py
def call(self, data: ExperimentData, **kwargs) -> ExperimentData:
    """Execute all chained blocks in order.

    Parameters
    ----------
    data : ExperimentData
        The experiment data to process.
    **kwargs : dict
        Additional keyword arguments for each block.

    Returns
    -------
    ExperimentData
        The processed experiment data.
    """
    for block in self.blocks:
        data = block.call(data=data, **kwargs)
    return data

f3dasm.LoopBlock ¤

A block that repeats an inner block n_iterations times.

Each iteration passes the output of the previous call as the input to the next. Typical use is to drive an optimizer update step chained with a data generator:

>>> loop = (update_step >> data_generator).loop(50)
>>> data = loop.call(initial_data)

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| block | Block | The inner block to repeat. | required |
| n_iterations | int | Number of iterations to run. | required |

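
The difference between arming and calling inside a loop can be seen with a counting stand-in. The LoopBlock below repeats the logic from the source on this page; Counter is a hypothetical inner block and the data is a plain integer.

```python
class Counter:
    """Hypothetical inner block that records how often it is used."""

    def __init__(self):
        self.armed = 0
        self.called = 0

    def arm(self, data):
        self.armed += 1

    def call(self, data, **kwargs):
        self.called += 1
        return data + 1


class LoopBlock:
    """Stand-in mirroring the LoopBlock source shown on this page."""

    def __init__(self, block, n_iterations):
        self.block = block
        self.n_iterations = n_iterations

    def arm(self, data):
        # Arms the inner block a single time, not once per iteration.
        self.block.arm(data)

    def call(self, data, **kwargs):
        for _ in range(self.n_iterations):
            data = self.block.call(data=data, **kwargs)
        return data


inner = Counter()
loop = LoopBlock(inner, n_iterations=5)
loop.arm(0)
final = loop.call(0)
print(final, inner.armed, inner.called)  # 5 1 5
```

Any setup done in arm therefore sees only the data available before the first iteration.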
Source code in src/f3dasm/_src/core.py
class LoopBlock(Block):
    """A block that repeats an inner block ``n_iterations`` times.

    Each iteration passes the output of the previous call as the input to
    the next. Typical use is to drive an optimizer update step chained with
    a data generator:

    >>> loop = (update_step >> data_generator).loop(50)
    >>> data = loop.call(initial_data)

    Parameters
    ----------
    block : Block
        The inner block to repeat.
    n_iterations : int
        Number of iterations to run.
    """

    def __init__(self, block: Block, n_iterations: int) -> None:
        self.block = block
        self.n_iterations = n_iterations

    def arm(self, data: ExperimentData) -> None:
        """Arm the inner block once before the loop starts."""
        self.block.arm(data)

    def call(self, data: ExperimentData, **kwargs) -> ExperimentData:
        """Run the inner block ``n_iterations`` times sequentially.

        Parameters
        ----------
        data : ExperimentData
            The experiment data to process.
        **kwargs : dict
            Additional keyword arguments forwarded to each iteration.

        Returns
        -------
        ExperimentData
            The experiment data after ``n_iterations`` iterations.
        """
        for _ in range(self.n_iterations):
            data = self.block.call(data=data, **kwargs)
        return data
arm(data: ExperimentData) -> None ¤

Arm the inner block once before the loop starts.

Source code in src/f3dasm/_src/core.py
def arm(self, data: ExperimentData) -> None:
    """Arm the inner block once before the loop starts."""
    self.block.arm(data)
call(data: ExperimentData, **kwargs) -> ExperimentData ¤

Run the inner block n_iterations times sequentially.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| data | ExperimentData | The experiment data to process. | required |
| **kwargs | dict | Additional keyword arguments forwarded to each iteration. | optional |

Returns:

| Type | Description |
|------|-------------|
| ExperimentData | The experiment data after n_iterations iterations. |

Source code in src/f3dasm/_src/core.py
def call(self, data: ExperimentData, **kwargs) -> ExperimentData:
    """Run the inner block ``n_iterations`` times sequentially.

    Parameters
    ----------
    data : ExperimentData
        The experiment data to process.
    **kwargs : dict
        Additional keyword arguments forwarded to each iteration.

    Returns
    -------
    ExperimentData
        The experiment data after ``n_iterations`` iterations.
    """
    for _ in range(self.n_iterations):
        data = self.block.call(data=data, **kwargs)
    return data

f3dasm.DataGenerator ¤

Base class for generating output data for experiment samples.

Subclasses must implement the execute method, which receives a single ExperimentSample and returns it after storing the computed outputs. The call method drives execution across a full ExperimentData object and supports several parallelisation backends (sequential, multiprocessing, cluster file-lock, MPI).

Examples:

Define a custom data generator:

>>> class MyDataGenerator(DataGenerator):
...     def execute(
...         self, experiment_sample: ExperimentSample, **kwargs
...     ) -> ExperimentSample:
...         x0 = experiment_sample.input_data['x0']
...         experiment_sample.store('y', x0 ** 2)
...         return experiment_sample
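
To show what the sequential mode amounts to conceptually, the sketch below applies an execute method to one sample at a time. Everything here is a stand-in: Sample imitates only the input_data and store members used in the example above (its output_data attribute is an assumption), Square would subclass DataGenerator in real code, and run_sequential is a hypothetical helper, not f3dasm's evaluate_sequential.

```python
from dataclasses import dataclass, field


@dataclass
class Sample:
    """Hypothetical stand-in for ExperimentSample."""

    input_data: dict
    output_data: dict = field(default_factory=dict)  # assumed attribute

    def store(self, name, value):
        self.output_data[name] = value


class Square:
    """Would subclass f3dasm.DataGenerator in real code."""

    def execute(self, experiment_sample, **kwargs):
        x0 = experiment_sample.input_data["x0"]
        experiment_sample.store("y", x0 ** 2)
        return experiment_sample


def run_sequential(execute_fn, samples, **kwargs):
    # Simplified driver: process one sample after another.
    return [execute_fn(sample, **kwargs) for sample in samples]


samples = [Sample({"x0": 2.0}), Sample({"x0": -3.0})]
results = run_sequential(Square().execute, samples)
outputs = [s.output_data["y"] for s in results]
print(outputs)  # [4.0, 9.0]
```

The user-facing contract stays the same in every mode: execute sees exactly one sample and returns it with its outputs stored.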
Source code in src/f3dasm/_src/core.py
class DataGenerator:
    """Base class for generating output data for experiment samples.

    Subclasses must implement the :meth:`execute` method, which receives a
    single :class:`ExperimentSample` and returns it after storing the computed
    outputs.  The :meth:`call` method drives execution across a full
    :class:`ExperimentData` object and supports several parallelisation
    backends (sequential, multiprocessing, cluster file-lock, MPI).

    Examples
    --------
    Define a custom data generator:

    >>> class MyDataGenerator(DataGenerator):
    ...     def execute(
    ...         self, experiment_sample: ExperimentSample, **kwargs
    ...     ) -> ExperimentSample:
    ...         x0 = experiment_sample.input_data['x0']
    ...         experiment_sample.store('y', x0 ** 2)
    ...         return experiment_sample
    """

    def arm(self, data: ExperimentData) -> None:
        """Prepare the data generator before execution.

        Parameters
        ----------
        data : ExperimentData
            The experiment data that the generator will operate on.

        Notes
        -----
        Override this method in a subclass to perform one-time setup that
        requires access to the full :class:`ExperimentData` (e.g. fitting a
        surrogate model). The default implementation does nothing.
        """
        pass

    # =========================================================================

    @abstractmethod
    def execute(
        self, experiment_sample: ExperimentSample, **kwargs
    ) -> ExperimentSample:
        """Interface function that handles the execution of the data generator

        Parameters
        ----------
        experiment_sample : ExperimentSample
            The experiment sample to run the data generator on
        kwargs : dict
            The optional keyword arguments to pass to the function

        Returns
        -------
        ExperimentSample
            The experiment sample with the response of the data generator
            saved in the experiment_sample

        Raises
        ------
        NotImplementedError
            If the function is not implemented by the user
        """
        raise NotImplementedError(
            "The execute function of the DataGenerator must be "
            "implemented by the user."
        )

    def call(
        self,
        data: ExperimentData,
        mode: str = "sequential",
        pass_id: bool = False,
        **kwargs,
    ) -> ExperimentData:
        """
        Evaluate the data generator.

        Parameters
        ----------
        data : ExperimentData
            The experiment data to process.
        mode : str, optional
            The mode of evaluation, by default 'sequential'
        pass_id : bool, optional
            Whether to pass the id to the execute function, by default False
        **kwargs : dict
            The keyword arguments to pass to execute function

        Returns
        -------
        ExperimentData
            The processed data

        Raises
        ------
        ValueError
            If an invalid mode is specified

        Notes
        -----
        The mode can be one of the following:
            - 'sequential': Run the data generator sequentially
            - 'parallel': Run the data generator in parallel
            - 'cluster': Run the data generator on a cluster
            - 'mpi': Run the data generator using MPI
            - 'cluster_array': Run the data generator on a cluster array

        The 'pass_id' parameter is used to pass the id of the experiment sample
        to the execute function. This is useful when the execute function
        requires the id of the experiment sample to run. By default, this is
        set to False. The id is passed through the 'id' keyword argument.
        """
        data = data._copy(in_place=False, deep=True)

        if mode == "sequential":
            return evaluate_sequential(
                execute_fn=self.execute, data=data, pass_id=pass_id, **kwargs
            )
        elif mode == "parallel":
            return evaluate_multiprocessing(
                execute_fn=self.execute, data=data, pass_id=pass_id, **kwargs
            )
        elif mode.lower() == "cluster":
            return evaluate_cluster(
                execute_fn=self.execute, data=data, pass_id=pass_id, **kwargs
            )
        elif mode.lower() == "mpi":
            return evaluate_mpi(
                execute_fn=self.execute, data=data, pass_id=pass_id, **kwargs
            )
        elif mode.lower() == "cluster_array":
            return evaluate_cluster_array(
                execute_fn=self.execute, data=data, pass_id=pass_id, **kwargs
            )
        else:
            raise ValueError(f"Invalid parallelization mode specified: {mode}")
arm(data: ExperimentData) -> None ¤

Prepare the data generator before execution.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| data | ExperimentData | The experiment data that the generator will operate on. | required |

Notes

Override this method in a subclass to perform one-time setup that requires access to the full ExperimentData (e.g. fitting a surrogate model). The default implementation does nothing.

Source code in src/f3dasm/_src/core.py
def arm(self, data: ExperimentData) -> None:
    """Prepare the data generator before execution.

    Parameters
    ----------
    data : ExperimentData
        The experiment data that the generator will operate on.

    Notes
    -----
    Override this method in a subclass to perform one-time setup that
    requires access to the full :class:`ExperimentData` (e.g. fitting a
    surrogate model). The default implementation does nothing.
    """
    pass
call(data: ExperimentData, mode: str = 'sequential', pass_id: bool = False, **kwargs) -> ExperimentData ¤

Evaluate the data generator.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| data | ExperimentData | The experiment data to process. | required |
| mode | str | The mode of evaluation. | 'sequential' |
| pass_id | bool | Whether to pass the id to the execute function. | False |
| **kwargs | dict | The keyword arguments to pass to the execute function. | optional |

Returns:

| Type | Description |
|------|-------------|
| ExperimentData | The processed data. |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If an invalid mode is specified. |

Notes

The mode can be one of the following:

- 'sequential': Run the data generator sequentially
- 'parallel': Run the data generator in parallel
- 'cluster': Run the data generator on a cluster
- 'mpi': Run the data generator using MPI
- 'cluster_array': Run the data generator on a cluster array

The 'pass_id' parameter is used to pass the id of the experiment sample to the execute function. This is useful when the execute function requires the id of the experiment sample to run. By default, this is set to False. The id is passed through the 'id' keyword argument.
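
The effect of pass_id can be sketched as follows. The evaluate function is a hypothetical, simplified driver (not one of f3dasm's evaluate_* functions), samples are plain dicts keyed by their id, and the only detail taken from this page is that the id arrives through the 'id' keyword argument.

```python
def evaluate(execute_fn, samples, pass_id=False, **kwargs):
    """Hypothetical simplified driver, for illustration only."""
    results = {}
    for sample_id, sample in samples.items():
        if pass_id:
            # Forward the sample id through the 'id' keyword argument.
            results[sample_id] = execute_fn(sample, id=sample_id, **kwargs)
        else:
            results[sample_id] = execute_fn(sample, **kwargs)
    return results


def execute(sample, id=None, **kwargs):
    # Example use of the id: derive a per-sample job tag.
    tag = "no-id" if id is None else f"job_{id}"
    return {"x": sample["x"], "tag": tag}


samples = {0: {"x": 1.0}, 1: {"x": 2.0}}
with_id = evaluate(execute, samples, pass_id=True)
without_id = evaluate(execute, samples)
print(with_id[1]["tag"], without_id[1]["tag"])  # job_1 no-id
```

An execute function that should work with either setting can give id a default value, as done here.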

Source code in src/f3dasm/_src/core.py
def call(
    self,
    data: ExperimentData,
    mode: str = "sequential",
    pass_id: bool = False,
    **kwargs,
) -> ExperimentData:
    """
    Evaluate the data generator.

    Parameters
    ----------
    data : ExperimentData
        The experiment data to process.
    mode : str, optional
        The mode of evaluation, by default 'sequential'
    pass_id : bool, optional
        Whether to pass the id to the execute function, by default False
    **kwargs : dict
        The keyword arguments to pass to execute function

    Returns
    -------
    ExperimentData
        The processed data

    Raises
    ------
    ValueError
        If an invalid mode is specified

    Notes
    -----
    The mode can be one of the following:
        - 'sequential': Run the data generator sequentially
        - 'parallel': Run the data generator in parallel
        - 'cluster': Run the data generator on a cluster
        - 'mpi': Run the data generator using MPI
        - 'cluster_array': Run the data generator on a cluster array

    The 'pass_id' parameter is used to pass the id of the experiment sample
    to the execute function. This is useful when the execute function
    requires the id of the experiment sample to run. By default, this is
    set to False. The id is passed through the 'id' keyword argument.
    """
    data = data._copy(in_place=False, deep=True)

    if mode == "sequential":
        return evaluate_sequential(
            execute_fn=self.execute, data=data, pass_id=pass_id, **kwargs
        )
    elif mode == "parallel":
        return evaluate_multiprocessing(
            execute_fn=self.execute, data=data, pass_id=pass_id, **kwargs
        )
    elif mode.lower() == "cluster":
        return evaluate_cluster(
            execute_fn=self.execute, data=data, pass_id=pass_id, **kwargs
        )
    elif mode.lower() == "mpi":
        return evaluate_mpi(
            execute_fn=self.execute, data=data, pass_id=pass_id, **kwargs
        )
    elif mode.lower() == "cluster_array":
        return evaluate_cluster_array(
            execute_fn=self.execute, data=data, pass_id=pass_id, **kwargs
        )
    else:
        raise ValueError(f"Invalid parallelization mode specified: {mode}")
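
The mode selection described in the Notes can be pictured as a lookup from a mode name to a backend function. The following is a minimal, self-contained sketch of that idea and not the f3dasm implementation: `run_sequential`, `run_parallel`, `BACKENDS` and `dispatch` are hypothetical names, the backends only report which path was taken, and lowercasing every mode name is an assumption of the sketch.

```python
from typing import Callable, Dict, List


# Hypothetical stand-ins for the evaluate_* helpers. They only report which
# backend was selected and are not part of f3dasm.
def run_sequential(items: List[int]) -> str:
    return f"sequential:{len(items)}"


def run_parallel(items: List[int]) -> str:
    return f"parallel:{len(items)}"


BACKENDS: Dict[str, Callable[[List[int]], str]] = {
    "sequential": run_sequential,
    "parallel": run_parallel,
}


def dispatch(items: List[int], mode: str = "sequential") -> str:
    """Pick a backend by name (case-insensitive) or raise ValueError."""
    try:
        backend = BACKENDS[mode.lower()]
    except KeyError:
        raise ValueError(
            f"Invalid parallelization mode specified: {mode}"
        ) from None
    return backend(items)


print(dispatch([1, 2, 3]))                # sequential:3
print(dispatch([1, 2], mode="PARALLEL"))  # parallel:2
```

A table keeps the set of valid modes in one place, so adding a backend does not require another `elif` branch.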
execute(experiment_sample: ExperimentSample, **kwargs) -> ExperimentSample ¤

Interface function that handles the execution of the data generator

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| experiment_sample | ExperimentSample | The experiment sample to run the data generator on | required |
| kwargs | dict | The optional keyword arguments to pass to the function | required |

Returns:

| Type | Description |
| --- | --- |
| ExperimentSample | The experiment sample with the response of the data generator saved in the experiment_sample |

Raises:

| Type | Description |
| --- | --- |
| NotImplementedError | If the function is not implemented by the user |

Source code in src/f3dasm/_src/core.py
@abstractmethod
def execute(
    self, experiment_sample: ExperimentSample, **kwargs
) -> ExperimentSample:
    """Interface function that handles the execution of the data generator

    Parameters
    ----------
    experiment_sample : ExperimentSample
        The experiment sample to run the data generator on
    kwargs : dict
        The optional keyword arguments to pass to the function

    Returns
    -------
    ExperimentSample
        The experiment sample with the response of the data generator
        saved in the experiment_sample

    Raises
    ------
    NotImplementedError
        If the function is not implemented by the user
    """
    raise NotImplementedError(
        "The execute function of the DataGenerator must be "
        "implemented by the user."
    )
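
As a rough illustration of the `execute` contract, the sketch below subclasses an abstract base and stores one output on the sample. It does not import f3dasm: `ToySample`, `ToyGenerator` and `SumOfSquares` are hypothetical stand-ins that only mimic the shape of `ExperimentSample` and `DataGenerator`. It also shows a general Python behavior: a class that still has an `@abstractmethod` cannot be instantiated at all, so the `NotImplementedError` in the base body is only reached when a subclass explicitly calls `super().execute(...)`.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class ToySample:
    """Hypothetical stand-in for ExperimentSample: inputs plus outputs."""

    def __init__(self, **inputs: float) -> None:
        self.input_data: Dict[str, float] = dict(inputs)
        self.output_data: Dict[str, Any] = {}

    def store(self, name: str, object: Any) -> None:
        self.output_data[name] = object


class ToyGenerator(ABC):
    """Hypothetical stand-in for DataGenerator with an abstract hook."""

    @abstractmethod
    def execute(self, experiment_sample: ToySample, **kwargs: Any) -> ToySample:
        raise NotImplementedError("execute must be implemented by the user.")


class SumOfSquares(ToyGenerator):
    def execute(self, experiment_sample: ToySample, **kwargs: Any) -> ToySample:
        # Read the inputs, compute a response, and save it on the sample.
        total = sum(v**2 for v in experiment_sample.input_data.values())
        experiment_sample.store(name="y", object=total)
        return experiment_sample


sample = SumOfSquares().execute(ToySample(x0=1.0, x1=2.0))
print(sample.output_data)  # {'y': 5.0}

try:
    ToyGenerator()  # still abstract, so Python refuses to instantiate it
except TypeError as err:
    print(type(err).__name__)  # TypeError
```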

f3dasm.datagenerator(output_names: Iterable[str], domain: Domain | None = None) -> Callable[[Callable[..., Any]], DataGenerator] ¤

Decorator to convert a function into a DataGenerator subclass.

The decorated function's parameter names must match either an input parameter name in the Domain or an explicit kwarg later passed to call(...); defaulted parameters are optional. A mismatch is caught at arm/call time with a clear ValueError (see issue #268). The function's return values will be stored in the ExperimentSample object under the names specified in output_names.
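
The matching rule above can be sketched with the standard `inspect` module. This is a simplified, self-contained illustration rather than the library code: `missing_arguments` is a hypothetical helper, and the Domain is reduced to a plain list of input names.

```python
import inspect
from typing import Callable, Iterable, Set


def missing_arguments(
    f: Callable[..., object],
    domain_inputs: Iterable[str],
    supplied_kwargs: Iterable[str] = (),
) -> Set[str]:
    """Return the required parameters of ``f`` that no source can provide."""
    required = {
        name
        for name, p in inspect.signature(f).parameters.items()
        # Parameters with a default, *args and **kwargs are never required.
        if p.default is inspect.Parameter.empty
        and p.kind
        not in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
    }
    return required - set(domain_inputs) - set(supplied_kwargs)


def my_function(x0: float, x1: float, scale: float, offset: float = 0.0) -> float:
    return scale * (x0**2 + x1**2) + offset


# `scale` is neither a domain input nor a supplied kwarg, so it is reported.
print(missing_arguments(my_function, domain_inputs=["x0", "x1"]))
# {'scale'}

# Supplying `scale` explicitly resolves the mismatch; `offset` has a default.
print(missing_arguments(my_function, ["x0", "x1"], supplied_kwargs=["scale"]))
# set()
```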

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| output_names | Iterable[str] | A list of names for the returned values of the decorated function. The function's return values will be stored in the ExperimentSample object under these names. | required |
| domain | Domain | The domain describing the input and output space. If provided, it can be used for saving the data to disk or other domain-specific operations. | None |

Returns:

| Type | Description |
| --- | --- |
| Callable[[Callable[..., Any]], DataGenerator] | A decorator that transforms a function into a DataGenerator subclass. |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If output_names is not provided. |

Examples:

>>> @datagenerator(output_names=['y'])
... def my_function(x0: float, x1: float, x2: float) -> float:
...     return x0**2 + x1**2 + x2**2
...
>>> experiment_data = ExperimentData(domain=domain)
>>> experiment_data = my_function.call(experiment_data)
Source code in src/f3dasm/_src/core.py
def datagenerator(
    output_names: Iterable[str], domain: Domain | None = None
) -> Callable[[Callable[..., Any]], DataGenerator]:
    """
    Decorator to convert a function into a `DataGenerator` subclass.

    The decorated function's parameter names must match either an input
    parameter name in the Domain or an explicit kwarg later passed to
    `call(...)`; defaulted parameters are optional. A mismatch is caught
    at `arm`/`call` time with a clear ``ValueError`` (see issue #268).
    The function's return values will be stored in the `ExperimentSample`
    object under the names specified in `output_names`.

    Parameters
    ----------
    output_names : Iterable[str]
        A list of names for the returned values of the decorated function.
        The function's return values will be stored in the `ExperimentSample`
        object under these names.
    domain : Domain, optional
        The domain describing the input and output space. If provided, it can
        be used for saving the data to disk or other domain-specific
        operations.

    Returns
    -------
    Callable[[Callable[..., Any]], DataGenerator]
        A decorator that transforms a function into a `DataGenerator` subclass.

    Raises
    ------
    ValueError
        If `output_names` is not provided.

    Examples
    --------
    >>> @datagenerator(output_names=['y'])
    ... def my_function(x0: float, x1: float, x2: float) -> float:
    ...     return x0**2 + x1**2 + x2**2
    ...
    >>> experiment_data = ExperimentData(domain=domain)
    >>> experiment_data = my_function.call(experiment_data)
    """

    if not output_names:
        raise ValueError(
            "If you provide a function as a data generator, you must "
            "provide the names of the return arguments with the "
            "`output_names` argument."
        )

    if domain is None:
        domain = Domain()

    # If the output names is a single string, convert it to a list
    if isinstance(output_names, str):
        output_names = [output_names]

    def decorator(f: Callable[..., Any]) -> DataGenerator:
        signature = inspect.signature(f)

        class FunctionDataGenerator(DataGenerator):
            """
            Auto-generated DataGenerator subclass from a decorated function.
            """

            def _validate_signature(
                self,
                data: ExperimentData,
                supplied_kwargs: Iterable[str],
            ) -> None:
                """Check ``f``'s required args resolve to a known source.

                A decorated data generator pulls each required argument
                of ``f`` from one of three places at execute time: the
                Domain's input parameters, an explicit kwarg on
                ``DataGenerator.call``, or (if defined) a parameter
                default. When none of those covers a required arg, the
                user previously got the confusing ``TypeError: cannot
                unpack non-iterable NoneType object`` from inside
                ``_run_sample``. Catch the mismatch eagerly instead with
                a message that names the offending arguments and lists
                what's available (see issue #268).
                """
                required = {
                    name
                    for name, p in signature.parameters.items()
                    if p.default is inspect.Parameter.empty
                    and p.kind
                    not in (
                        inspect.Parameter.VAR_POSITIONAL,
                        inspect.Parameter.VAR_KEYWORD,
                    )
                }
                domain_inputs = set(data.domain.input_names)
                missing = required - domain_inputs - set(supplied_kwargs)
                if missing:
                    raise ValueError(
                        f"DataGenerator function `{f.__name__}` declares "
                        f"required argument(s) {sorted(missing)} that "
                        "are not present in the Domain and were not "
                        f"supplied via `call(...)` kwargs. Domain "
                        f"inputs are {sorted(domain_inputs)}. Rename "
                        "your function arguments to match the Domain "
                        "parameter names, give them default values, or "
                        "pass them explicitly to `call(...)`."
                    )

            def arm(self, data: ExperimentData) -> None:
                """Validate that ``f``'s required args match the Domain.

                Equivalent to the validation that ``call`` runs, but
                without knowledge of any kwargs the user will pass
                later. Use this when you want the mismatch to surface
                eagerly during pipeline setup.
                """
                self._validate_signature(data, supplied_kwargs=())

            def call(
                self, data: ExperimentData, **kwargs: Any
            ) -> ExperimentData:
                # `DataGenerator.call` does not auto-invoke `arm`, so we
                # run the signature check here -- with the user's
                # kwargs in scope -- as a safety net for the common
                # `dg.call(data=ed, ...)` pattern (issue #268).
                self._validate_signature(data, supplied_kwargs=kwargs.keys())
                return super().call(data=data, **kwargs)

            def execute(
                self, experiment_sample: ExperimentSample, **kwargs: Any
            ) -> ExperimentSample:
                """
                Executes the data generation process by calling the decorated
                function.

                Parameters
                ----------
                experiment_sample : ExperimentSample
                    The experiment sample containing input data.
                **kwargs : dict
                    Additional keyword arguments forwarded to the decorated
                    function. On a name clash, values from
                    `experiment_sample.input_data` take precedence.

                Returns
                -------
                ExperimentSample
                    The modified `ExperimentSample` instance with new stored
                    outputs.
                """
                _input = {
                    name: val.default
                    for name, val in signature.parameters.items()
                    if val.default is not inspect.Parameter.empty
                }

                _input.update(kwargs)

                _input.update(
                    {
                        name: experiment_sample.input_data[name]
                        for name in experiment_sample.input_data
                        if name in signature.parameters
                    }
                )

                # Call the function
                _output = f(**_input)

                # Ensure the output is iterable
                if not isinstance(_output, tuple):
                    _output = (_output,)

                # Store outputs in the experiment sample
                for name, value in zip(output_names, _output, strict=False):
                    if name in domain.output_names:
                        parameter = domain.output_space[name]
                        experiment_sample.store(
                            name=name,
                            object=value,
                            to_disk=parameter.to_disk,
                            store_function=parameter.store_function,
                            load_function=parameter.load_function,
                            load_kwargs=parameter.load_kwargs,
                        )
                    else:
                        experiment_sample.store(name=name, object=value)

                return experiment_sample

        data_generator = FunctionDataGenerator()
        data_generator.output_names = output_names
        data_generator.f = f
        return data_generator

    return decorator
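
The way the generated `execute` method assembles arguments and pairs return values with `output_names` can be traced in isolation. The sketch below is a simplified reading of the listing above, not a replacement for it: `resolve_inputs` and `pair_outputs` are hypothetical helper names, and plain dictionaries stand in for `ExperimentSample`.

```python
import inspect
from typing import Any, Callable, Dict, Mapping, Sequence


def resolve_inputs(
    f: Callable[..., Any], input_data: Mapping[str, Any], **kwargs: Any
) -> Dict[str, Any]:
    """Layer the argument sources: defaults, then kwargs, then sample inputs."""
    params = inspect.signature(f).parameters
    resolved = {
        name: p.default
        for name, p in params.items()
        if p.default is not inspect.Parameter.empty
    }
    resolved.update(kwargs)
    # Sample inputs are applied last, so they win on a name clash.
    resolved.update(
        {name: value for name, value in input_data.items() if name in params}
    )
    return resolved


def pair_outputs(output_names: Sequence[str], output: Any) -> Dict[str, Any]:
    """Wrap a non-tuple result, then zip names to values without strictness."""
    if not isinstance(output, tuple):
        output = (output,)
    return dict(zip(output_names, output))


def f(x0: float, scale: float = 1.0):
    return scale * x0, x0 > 0


# Inputs the function does not declare ("unused") are ignored.
print(resolve_inputs(f, {"x0": 2.0, "unused": 9.0}))
# {'scale': 1.0, 'x0': 2.0}

# A kwarg overrides a default, but the sample's x0 overrides the kwarg x0.
print(resolve_inputs(f, {"x0": 2.0}, scale=3.0, x0=-1.0))
# {'scale': 3.0, 'x0': 2.0}

# Two values are returned but only one name is given: the extra is dropped.
print(pair_outputs(["y"], f(2.0)))
# {'y': 2.0}
```

Because the pairing is non-strict, a length mismatch between `output_names` and the returned tuple passes silently, so it is worth checking that the two line up.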