
Workflow class

Provides a Workflow class and a @stage decorator that allow workflows to be defined in a declarative fashion.

A Stage object is responsible for creating Hail Batch jobs and declaring outputs (files or Metamist analysis objects) that are expected to be produced. Each stage acts on a Target, which can be one of the following:

* SequencingGroup - an individual Sequencing Group (e.g. the CRAM of a single sample)
* Dataset - a stratification of SGs in this analysis by Metamist Project (e.g. all SGs in acute-care)
* Cohort - a stratification of SGs in this analysis by Metamist CustomCohort
* MultiCohort - the union of all Cohorts (and their SGs) in this analysis

A Workflow object plugs stages together by resolving dependencies between these different target levels. Stages are defined in this package and chained into Workflows via their inter-stage dependencies; a minimal sketch of a stage and workflow is shown below. Workflow names are defined in main.py, which provides a way to choose a workflow using a CLI argument.
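
As a rough sketch only: a stage is a class decorated with @stage that declares its expected outputs and queues Hail Batch jobs for a single target, and a workflow is run by passing the terminal stage classes to run_workflow(). The import paths, the StageInput/StageOutput types, the make_outputs() helper and the target attributes used below are assumptions based on the API described on this page and may differ from the actual package layout; the job body is a placeholder.

from cpg_flow.stage import stage, SequencingGroupStage, StageInput, StageOutput  # assumed import path
from cpg_flow.targets import SequencingGroup  # assumed import path
from cpg_flow.workflow import run_workflow
from cpg_utils import Path  # assumed Path alias
from cpg_utils.hail_batch import get_batch  # assumed import path

@stage
class Align(SequencingGroupStage):
    def expected_outputs(self, sequencing_group: SequencingGroup) -> Path:
        # Declare the file this stage is expected to produce for the target.
        return sequencing_group.dataset.prefix() / 'cram' / f'{sequencing_group.id}.cram'

    def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> StageOutput:
        # Create the Hail Batch job(s) for this sequencing group (command elided).
        j = get_batch().new_job(f'Align {sequencing_group.id}')
        return self.make_outputs(sequencing_group, data=self.expected_outputs(sequencing_group), jobs=[j])

@stage(required_stages=[Align])
class Genotype(SequencingGroupStage):
    ...

# Requesting only the final stage is enough: Align is resolved as an implicit dependency.
run_workflow(stages=[Genotype])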

cpg_flow.workflow.get_workflow

get_workflow(dry_run=False)
Source code in src/cpg_flow/workflow.py
def get_workflow(dry_run: bool = False) -> 'Workflow':
    global _workflow
    if _workflow is None:
        format_logger()
        _workflow = Workflow(dry_run=dry_run)
    return _workflow
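
For example, get_workflow() behaves as a lazily created singleton:

wfl = get_workflow(dry_run=True)   # the first call creates the Workflow
assert get_workflow() is wfl       # later calls return the same instance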

cpg_flow.workflow.run_workflow

run_workflow(stages=None, wait=False, dry_run=False)
Source code in src/cpg_flow/workflow.py
def run_workflow(
    stages: list['StageDecorator'] | None = None,
    wait: bool | None = False,
    dry_run: bool = False,
) -> 'Workflow':
    wfl = get_workflow(dry_run=dry_run)
    wfl.run(stages=stages, wait=wait)
    return wfl
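
Typical usage from a workflow entry point (MyFinalStage is a hypothetical @stage-decorated class):

from cpg_flow.workflow import run_workflow

run_workflow(stages=[MyFinalStage])  # queues MyFinalStage plus its required stages, then submits the batch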

cpg_flow.workflow.Workflow

Workflow(stages=None, dry_run=None)

Encapsulates a Hail Batch object, stages, and a cohort of datasets of sequencing groups. Responsible for orchestrating stages.

Source code in src/cpg_flow/workflow.py
def __init__(
    self,
    stages: list['StageDecorator'] | None = None,
    dry_run: bool | None = None,
):
    if _workflow is not None:
        raise ValueError(
            'Workflow already initialised. Use get_workflow() to get the instance',
        )

    self.dry_run = dry_run or get_config(True)['workflow'].get('dry_run')
    self.show_workflow = get_config()['workflow'].get('show_workflow', False)
    self.access_level = get_config()['workflow'].get('access_level', 'test')

    # TODO: should the ['dataset'] be a get? should we rename it to analysis dataset?
    analysis_dataset = get_config(True)['workflow']['dataset']
    name = get_config()['workflow'].get('name', analysis_dataset)
    description = get_config()['workflow'].get('description', name)
    self.name = slugify(name)

    self._output_version: str | None = None
    if output_version := get_config()['workflow'].get('output_version'):
        self._output_version = slugify(output_version)

    self.run_timestamp: str = get_config()['workflow'].get('run_timestamp') or timestamp()

    # Description
    if self._output_version:
        description += f': output_version={self._output_version}'
    description += f': run_timestamp={self.run_timestamp}'
    if sequencing_type := get_config()['workflow'].get('sequencing_type'):
        description += f' [{sequencing_type}]'
    if not self.dry_run:
        if ds_set := set(d.name for d in get_multicohort().get_datasets()):
            description += ' ' + ', '.join(sorted(ds_set))
        reset_batch()
        get_batch().name = description

    self.status_reporter = None
    if get_config()['workflow'].get('status_reporter') == 'metamist':
        self.status_reporter = MetamistStatusReporter()
    self._stages: list[StageDecorator] | None = stages
    self.queued_stages: list[Stage] = []
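
The constructor is driven by the [workflow] section of the TOML configuration (read through get_config()). A hedged sketch of the keys used above; dataset is the only required key here and all values are placeholders:

[workflow]
dataset = 'my-dataset'           # analysis dataset (required)
name = 'seqr_loader'             # defaults to the dataset name
description = 'Seqr loader run'  # defaults to the name
output_version = '1.0'           # optional, slugified and appended to the description
sequencing_type = 'genome'       # optional, appended to the batch description
status_reporter = 'metamist'     # optional, enables MetamistStatusReporter
dry_run = false
show_workflow = false
access_level = 'test'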

dry_run instance-attribute

dry_run = dry_run or get_config(True)['workflow'].get('dry_run')

show_workflow instance-attribute

show_workflow = get_config()['workflow'].get('show_workflow', False)

access_level instance-attribute

access_level = get_config()['workflow'].get('access_level', 'test')

run_timestamp instance-attribute

run_timestamp = get_config()['workflow'].get('run_timestamp') or timestamp()

cohort_prefix

cohort_prefix(cohort, category=None)

Takes a cohort and an optional category as arguments and builds the output prefix for that cohort. The result is in the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID, e.g. "gs://cpg-project-main/seqr_loader/COH123" or "gs://cpg-project-main-analysis/seqr_loader/COH123".

PARAMETERS

cohort (Cohort): we pull the analysis dataset and id from this Cohort
category (str | None, default None): sub-bucket for this project

RETURNS

Path

Source code in src/cpg_flow/workflow.py
def cohort_prefix(self, cohort: Cohort, category: str | None = None) -> Path:
    """
    Takes a cohort and category as an argument, calls through to the Workflow cohort_prefix method
    Result in the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID
    e.g. "gs://cpg-project-main/seqr_loader/COH123", or "gs://cpg-project-main-analysis/seqr_loader/COH123"

    Args:
        cohort (Cohort): we pull the analysis dataset and id from this Cohort
        category (str | None): sub-bucket for this project

    Returns:
        Path
    """
    return cohort.dataset.prefix(category=category) / self.name / cohort.id
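
An illustrative sketch only, assuming cohort is a Cohort with id "COH123" whose analysis dataset is "project":

prefix = get_workflow().cohort_prefix(cohort)
# e.g. gs://cpg-project-main/<workflow-name>/COH123
analysis_prefix = get_workflow().cohort_prefix(cohort, category='analysis')
# e.g. gs://cpg-project-main-analysis/<workflow-name>/COH123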

run

run(stages=None, wait=False)

Resolve stages, then add and submit Hail Batch jobs. When run_all_implicit_stages is set, all required stages that were not defined explicitly will still be executed.

Source code in src/cpg_flow/workflow.py
def run(
    self,
    stages: list['StageDecorator'] | None = None,
    wait: bool | None = False,
):
    """
    Resolve stages, add and submit Hail Batch jobs.
    When `run_all_implicit_stages` is set, all required stages that were not defined
    explicitly would still be executed.
    """
    stages_value = stages or self._stages
    if not stages_value:
        raise WorkflowError('No stages added')
    self.set_stages(stages_value)

    if not self.dry_run:
        get_batch().run(wait=wait)
    else:
        logger.info('Dry run: no jobs submitted')
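
Illustrative usage (MyFinalStage is a hypothetical @stage-decorated class); run_workflow() is a thin wrapper around this method:

wfl = get_workflow()
wfl.run(stages=[MyFinalStage], wait=False)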

set_stages

set_stages(requested_stages)

Iterate over stages and call their queue_for_cohort(cohort) methods; through that, all Hail Batch jobs are created via Stage.queue_jobs().

Source code in src/cpg_flow/workflow.py
def set_stages(
    self,
    requested_stages: list['StageDecorator'],
):
    """
    Iterate over stages and call their queue_for_cohort(cohort) methods;
    through that, creates all Hail Batch jobs through Stage.queue_jobs().
    """
    # TOML options to configure stages:
    skip_stages = get_config()['workflow'].get('skip_stages', [])
    only_stages = get_config()['workflow'].get('only_stages', [])
    first_stages = get_config()['workflow'].get('first_stages', [])
    last_stages = get_config()['workflow'].get('last_stages', [])

    # Only allow one of only_stages or first_stages/last_stages as they seem
    # to be mutually exclusive.
    if only_stages and (first_stages or last_stages or skip_stages):
        raise WorkflowError(
            "Workflow config parameter 'only_stages' is incompatible with "
            + "'first_stages', 'last_stages' and/or 'skip_stages'",
        )

    logger.info(
        f'End stages for the workflow "{self.name}": {[cls.__name__ for cls in requested_stages]}',
    )
    logger.info('Stages additional configuration:')
    logger.info(f'  workflow/skip_stages: {skip_stages}')
    logger.info(f'  workflow/only_stages: {only_stages}')
    logger.info(f'  workflow/first_stages: {first_stages}')
    logger.info(f'  workflow/last_stages: {last_stages}')

    # Round 1: initialising stage objects.
    stages_dict: dict[str, Stage] = {}
    for cls in requested_stages:
        if cls.__name__ in stages_dict:
            continue
        stages_dict[cls.__name__] = cls()

    # Round 2: depth search to find implicit stages.
    stages_dict = self._resolve_implicit_stages(
        stages_dict=stages_dict,
        skip_stages=skip_stages,
        only_stages=only_stages,
    )

    # Round 3: set "stage.required_stages" fields to each stage.
    for stg in stages_dict.values():
        stg.required_stages = [
            stages_dict[cls.__name__] for cls in stg.required_stages_classes if cls.__name__ in stages_dict
        ]

    # Round 4: determining order of execution.
    stages, dag = self._determine_order_of_execution(stages_dict)

    # Round 5: applying workflow options first_stages and last_stages.
    if first_stages or last_stages:
        logger.info('Applying workflow/first_stages and workflow/last_stages')
        self._process_first_last_stages(stages, dag, first_stages, last_stages)
    elif only_stages:
        logger.info('Applying workflow/only_stages')
        self._process_only_stages(stages, dag, only_stages)

    if all(s.skipped for s in stages):
        raise WorkflowError('No stages to run')

    logger.info('Final workflow graph:')
    for line in _render_graph(
        dag,
        target_stages=[cls.__name__ for cls in requested_stages],
        only_stages=only_stages,
        first_stages=first_stages,
        last_stages=last_stages,
    ):
        logger.info(line)
    # Round 6: actually adding jobs from the stages.
    if not self.dry_run:
        inputs = get_multicohort()  # Would communicate with metamist.
        for i, stg in enumerate(stages):
            logger.info('*' * 60)
            logger.info(f'Stage #{i + 1}: {stg}')
            # pipeline setup is now done in MultiCohort only
            # the legacy version (input_datasets) is still supported
            # that will create a MultiCohort with a single Cohort
            if isinstance(inputs, MultiCohort):
                stg.output_by_target = stg.queue_for_multicohort(inputs)
            else:
                raise WorkflowError(f'Unsupported input type: {inputs}')
            if errors := self._process_stage_errors(stg.output_by_target):
                raise WorkflowError(
                    f'Stage {stg} failed to queue jobs with errors: ' + '\n'.join(errors),
                )
    else:
        self.queued_stages = [stg for stg in stages_dict.values() if not stg.skipped]
        logger.info(f'Queued stages: {self.queued_stages}')

    # Round 7: show the workflow
    self._show_workflow(dag, skip_stages, only_stages, first_stages, last_stages)
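
These stage-selection options live in the [workflow] section of the TOML configuration. A hedged sketch with placeholder stage names (remember that only_stages cannot be combined with the other three):

[workflow]
skip_stages = ['Align']           # stages to mark as skipped
first_stages = ['Genotype']       # stages upstream of these are skipped
last_stages = ['JointCalling']    # stages downstream of these are not run
# only_stages = ['Genotype']      # alternatively: run only the listed stages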

cpg_flow.workflow.skip

skip(_fun=None, *, reason=None, assume_outputs_exist=False)

Decorator on top of @stage that sets the self.skipped field to True. By default, expected outputs of a skipped stage will be checked, unless assume_outputs_exist is True.

@skip
@stage
class MyStage1(SequencingGroupStage):
    ...

@skip
@stage(assume_outputs_exist=True)
class MyStage2(SequencingGroupStage):
    ...

Source code in src/cpg_flow/workflow.py
def skip(
    _fun: Optional['StageDecorator'] = None,
    *,
    reason: str | None = None,
    assume_outputs_exist: bool = False,
) -> Union['StageDecorator', Callable[..., 'StageDecorator']]:
    """
    Decorator on top of `@stage` that sets the `self.skipped` field to True.
    By default, expected outputs of a skipped stage will be checked,
    unless `assume_outputs_exist` is True.

    @skip
    @stage
    class MyStage1(SequencingGroupStage):
        ...

    @skip
    @stage(assume_outputs_exist=True)
    class MyStage2(SequencingGroupStage):
        ...
    """

    def decorator_stage(fun) -> 'StageDecorator':
        """Implements decorator."""

        @functools.wraps(fun)
        def wrapper_stage(*args, **kwargs) -> 'Stage':
            """Decorator helper function."""
            s = fun(*args, **kwargs)
            s.skipped = True
            s.assume_outputs_exist = assume_outputs_exist
            return s

        return wrapper_stage

    if _fun is None:
        return decorator_stage
    return decorator_stage(_fun)

cpg_flow.workflow.path_walk

path_walk(expected, collected=None)

Recursive walk of expected_out: if the object is iterable, walk it. This gets around the issue with nested lists and dicts, mainly around the use of Array outputs from Cromwell.

PARAMETERS

expected (Any): any type of object containing Paths
collected (set, default None): all collected paths so far

RETURNS

set[Path]: a set of all collected Path nodes

Examples:

>>> path_walk({'a': {'b': {'c': Path('d')}}})
{Path('d')}
>>> path_walk({'a': {'b': {'c': [Path('d'), Path('e')]}}})
{Path('d'), Path('e')}
>>> path_walk({'a': Path('b'), 'c': {'d': 'e'}, 'f': Path('g')})
{Path('b'), Path('g')}

Source code in src/cpg_flow/workflow.py
def path_walk(expected, collected: set | None = None) -> set[Path]:
    """
    recursive walk of expected_out
    if the object is iterable, walk it
    this gets around the issue with nested lists and dicts
    mainly around the use of Array outputs from Cromwell

    Args:
        expected (Any): any type of object containing Paths
        collected (set): all collected paths so far

    Returns:
        a set of all collected Path nodes

    Examples:

    >>> path_walk({'a': {'b': {'c': Path('d')}}})
    {Path('d')}
    >>> path_walk({'a': {'b': {'c': [Path('d'), Path('e')]}}})
    {Path('d'), Path('e')}
    >>> path_walk({'a': Path('b'), 'c': {'d': 'e'}, 'f': Path('g')})
    {Path('b'), Path('g')}
    """
    if collected is None:
        collected = set()

    if expected is None:
        return collected
    if isinstance(expected, dict):
        for value in expected.values():
            collected.update(path_walk(value, collected))
    if isinstance(expected, list | set):
        for value in expected:
            collected.update(path_walk(value, collected))
    if isinstance(expected, str):
        return collected
    if isinstance(expected, Path):
        if expected in collected:
            raise ValueError(f'Duplicate path {expected} in expected_out')
        collected.add(expected)
    return collected