Provides a `Workflow` class and a `@stage` decorator that allow workflows to be defined in a declarative fashion.
A `Stage` object is responsible for creating Hail Batch jobs and declaring outputs (files or Metamist analysis objects) that are expected to be produced. Each stage acts on a `Target`, which can be one of the following:
* SequencingGroup - an individual Sequencing Group (e.g. the CRAM of a single sample)
* Dataset - a stratification of SGs in this analysis by Metamist Project (e.g. all SGs in acute-care)
* Cohort - a stratification of SGs in this analysis by Metamist CustomCohort
* MultiCohort - a union of all SGs in this analysis by Metamist CustomCohort
A `Workflow` object plugs stages together by resolving the dependencies between them across these target levels. Stages are defined in this package and chained into workflows by their inter-stage dependencies. Workflow names are defined in main.py, which provides a way to choose a workflow via a CLI argument.
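To make the declarative model concrete, the sketch below shows a minimal stage acting on a SequencingGroup target. It assumes the stage API described elsewhere in this package (`SequencingGroupStage`, `expected_outputs`, `queue_jobs`, `make_outputs`); the import paths, helper calls and output location are illustrative rather than verbatim.

```python
# Minimal sketch of a declarative stage; import paths and helpers are illustrative.
from cpg_flow.stage import SequencingGroupStage, StageInput, StageOutput, stage
from cpg_flow.targets import SequencingGroup
from cpg_utils.hail_batch import get_batch


@stage
class Align(SequencingGroupStage):
    """Produce a CRAM for each sequencing group."""

    def expected_outputs(self, sequencing_group: SequencingGroup):
        # Declare the file this stage promises to produce for the target.
        return sequencing_group.dataset.prefix() / 'cram' / f'{sequencing_group.id}.cram'

    def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> StageOutput:
        # Create a Hail Batch job and register the declared output against it.
        job = get_batch().new_job(f'Align {sequencing_group.id}')
        job.command('echo "align reads here"')
        return self.make_outputs(sequencing_group, data=self.expected_outputs(sequencing_group), jobs=[job])
```

Dependencies between such stages (typically declared via a `required_stages` argument to `@stage`) are what the `Workflow` resolves when chaining them together.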
get_workflow(dry_run=False)
Source code in src/cpg_flow/workflow.py
```python
def get_workflow(dry_run: bool = False) -> 'Workflow':
    global _workflow
    if _workflow is None:
        format_logger()
        _workflow = Workflow(dry_run=dry_run)
    return _workflow
```
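As the source above shows, `get_workflow()` lazily constructs a single module-level `Workflow` and returns the same instance on subsequent calls:

```python
# The workflow object behaves as a module-level singleton.
wfl = get_workflow()          # constructs the Workflow on first use
assert get_workflow() is wfl  # later calls return the same instance
```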
run_workflow(stages=None, wait=False, dry_run=False)
Source code in src/cpg_flow/workflow.py
```python
def run_workflow(
    stages: list['StageDecorator'] | None = None,
    wait: bool | None = False,
    dry_run: bool = False,
) -> 'Workflow':
    wfl = get_workflow(dry_run=dry_run)
    wfl.run(stages=stages, wait=wait)
    return wfl
```
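A typical entry point builds the workflow and submits it in one call. A hedged usage sketch, reusing the illustrative `Align` stage defined earlier:

```python
# Sketch: run a workflow whose end point is the illustrative Align stage.
from cpg_flow.workflow import run_workflow

wfl = run_workflow(stages=[Align], wait=True)  # wait=True blocks until the Hail Batch run completes
```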
Workflow(stages=None, dry_run=None)
Encapsulates a Hail Batch object, stages, and a cohort of datasets of sequencing groups.
Responsible for orchestrating stages.
Source code in src/cpg_flow/workflow.py
```python
def __init__(
    self,
    stages: list['StageDecorator'] | None = None,
    dry_run: bool | None = None,
):
    if _workflow is not None:
        raise ValueError(
            'Workflow already initialised. Use get_workflow() to get the instance',
        )

    self.dry_run = dry_run or get_config(True)['workflow'].get('dry_run')
    self.show_workflow = get_config()['workflow'].get('show_workflow', False)
    self.access_level = get_config()['workflow'].get('access_level', 'test')

    # TODO: should the ['dataset'] be a get? should we rename it to analysis dataset?
    analysis_dataset = get_config(True)['workflow']['dataset']
    name = get_config()['workflow'].get('name', analysis_dataset)
    description = get_config()['workflow'].get('description', name)
    self.name = slugify(name)

    self._output_version: str | None = None
    if output_version := get_config()['workflow'].get('output_version'):
        self._output_version = slugify(output_version)

    self.run_timestamp: str = get_config()['workflow'].get('run_timestamp') or timestamp()

    # Description
    if self._output_version:
        description += f': output_version={self._output_version}'
    description += f': run_timestamp={self.run_timestamp}'
    if sequencing_type := get_config()['workflow'].get('sequencing_type'):
        description += f' [{sequencing_type}]'
    if not self.dry_run:
        if ds_set := set(d.name for d in get_multicohort().get_datasets()):
            description += ' ' + ', '.join(sorted(ds_set))

    reset_batch()
    get_batch().name = description

    self.status_reporter = None
    if get_config()['workflow'].get('status_reporter') == 'metamist':
        self.status_reporter = MetamistStatusReporter()
    self._stages: list[StageDecorator] | None = stages
    self.queued_stages: list[Stage] = []
```
Instance attributes:

* `dry_run = dry_run or get('dry_run')`
* `show_workflow = get('show_workflow', False)`
* `access_level = get('access_level', 'test')`
* `run_timestamp = get('run_timestamp') or timestamp()`
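These defaults, and the constructor above, are read from the `[workflow]` section of the configuration. A hedged sketch of that section, written as the Python mapping `get_config()['workflow']` would return (keys taken from the constructor; values are illustrative):

```python
# Illustrative values only; the keys mirror what Workflow.__init__ reads.
workflow_config = {
    'dataset': 'acute-care',          # required: the analysis dataset
    'name': 'seqr_loader',            # optional, defaults to the dataset name
    'description': 'Seqr loader',     # optional, defaults to the name
    'output_version': '1.0',          # optional, slugified when set
    'sequencing_type': 'genome',      # optional, appended to the batch description
    'access_level': 'test',           # optional, defaults to 'test'
    'dry_run': False,
    'show_workflow': False,
    'status_reporter': 'metamist',    # optional; 'metamist' enables MetamistStatusReporter
}
```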
cohort_prefix(cohort, category=None)
Takes a cohort and a category as arguments and calls through to the Workflow cohort_prefix method.
Result in the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID
e.g. "gs://cpg-project-main/seqr_loader/COH123", or "gs://cpg-project-main-analysis/seqr_loader/COH123"
| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| `cohort` | `Cohort` | we pull the analysis dataset and id from this Cohort | required |
| `category` | `str \| None` | sub-bucket for this project | `None` |
Source code in src/cpg_flow/workflow.py
```python
def cohort_prefix(self, cohort: Cohort, category: str | None = None) -> Path:
    """
    Takes a cohort and category as an argument, calls through to the Workflow cohort_prefix method
    Result in the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID
    e.g. "gs://cpg-project-main/seqr_loader/COH123", or "gs://cpg-project-main-analysis/seqr_loader/COH123"

    Args:
        cohort (Cohort): we pull the analysis dataset and id from this Cohort
        category (str | None): sub-bucket for this project

    Returns:
        Path
    """
    return cohort.dataset.prefix(category=category) / self.name / cohort.id
```
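A hedged usage sketch, assuming a `Cohort` with id `COH123` whose analysis dataset bucket is `gs://cpg-project-main` and a workflow named `seqr_loader` (matching the example in the docstring):

```python
# Per-cohort output prefixes for different bucket categories.
prefix = get_workflow().cohort_prefix(cohort)
# -> gs://cpg-project-main/seqr_loader/COH123
analysis_prefix = get_workflow().cohort_prefix(cohort, category='analysis')
# -> gs://cpg-project-main-analysis/seqr_loader/COH123
```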
run(stages=None, wait=False)
Resolve stages, add and submit Hail Batch jobs.
When `run_all_implicit_stages` is set, all required stages that were not defined explicitly will still be executed.
Source code in src/cpg_flow/workflow.py
```python
def run(
    self,
    stages: list['StageDecorator'] | None = None,
    wait: bool | None = False,
):
    """
    Resolve stages, add and submit Hail Batch jobs.
    When `run_all_implicit_stages` is set, all required stages that were not defined
    explicitly would still be executed.
    """
    stages_value = stages or self._stages
    if not stages_value:
        raise WorkflowError('No stages added')
    self.set_stages(stages_value)

    if not self.dry_run:
        get_batch().run(wait=wait)
    else:
        logger.info('Dry run: no jobs submitted')
```
set_stages(requested_stages)
Iterate over stages and call their queue_for_cohort(cohort) methods; through that, create all Hail Batch jobs via Stage.queue_jobs().
Source code in src/cpg_flow/workflow.py
```python
def set_stages(
    self,
    requested_stages: list['StageDecorator'],
):
    """
    Iterate over stages and call their queue_for_cohort(cohort) methods;
    through that, creates all Hail Batch jobs through Stage.queue_jobs().
    """
    # TOML options to configure stages:
    skip_stages = get_config()['workflow'].get('skip_stages', [])
    only_stages = get_config()['workflow'].get('only_stages', [])
    first_stages = get_config()['workflow'].get('first_stages', [])
    last_stages = get_config()['workflow'].get('last_stages', [])

    # Only allow one of only_stages or first_stages/last_stages as they seem
    # to be mutually exclusive.
    if only_stages and (first_stages or last_stages or skip_stages):
        raise WorkflowError(
            "Workflow config parameter 'only_stages' is incompatible with "
            + "'first_stages', 'last_stages' and/or 'skip_stages'",
        )

    logger.info(
        f'End stages for the workflow "{self.name}": {[cls.__name__ for cls in requested_stages]}',
    )
    logger.info('Stages additional configuration:')
    logger.info(f' workflow/skip_stages: {skip_stages}')
    logger.info(f' workflow/only_stages: {only_stages}')
    logger.info(f' workflow/first_stages: {first_stages}')
    logger.info(f' workflow/last_stages: {last_stages}')

    # Round 1: initialising stage objects.
    stages_dict: dict[str, Stage] = {}
    for cls in requested_stages:
        if cls.__name__ in stages_dict:
            continue
        stages_dict[cls.__name__] = cls()

    # Round 2: depth search to find implicit stages.
    stages_dict = self._resolve_implicit_stages(
        stages_dict=stages_dict,
        skip_stages=skip_stages,
        only_stages=only_stages,
    )

    # Round 3: set "stage.required_stages" fields to each stage.
    for stg in stages_dict.values():
        stg.required_stages = [
            stages_dict[cls.__name__] for cls in stg.required_stages_classes if cls.__name__ in stages_dict
        ]

    # Round 4: determining order of execution.
    stages, dag = self._determine_order_of_execution(stages_dict)

    # Round 5: applying workflow options first_stages and last_stages.
    if first_stages or last_stages:
        logger.info('Applying workflow/first_stages and workflow/last_stages')
        self._process_first_last_stages(stages, dag, first_stages, last_stages)
    elif only_stages:
        logger.info('Applying workflow/only_stages')
        self._process_only_stages(stages, dag, only_stages)

    if all(s.skipped for s in stages):
        raise WorkflowError('No stages to run')

    logger.info('Final workflow graph:')
    for line in _render_graph(
        dag,
        target_stages=[cls.__name__ for cls in requested_stages],
        only_stages=only_stages,
        first_stages=first_stages,
        last_stages=last_stages,
    ):
        logger.info(line)

    # Round 6: actually adding jobs from the stages.
    if not self.dry_run:
        inputs = get_multicohort()  # Would communicate with metamist.
        for i, stg in enumerate(stages):
            logger.info('*' * 60)
            logger.info(f'Stage #{i + 1}: {stg}')
            # pipeline setup is now done in MultiCohort only
            # the legacy version (input_datasets) is still supported
            # that will create a MultiCohort with a single Cohort
            if isinstance(inputs, MultiCohort):
                stg.output_by_target = stg.queue_for_multicohort(inputs)
            else:
                raise WorkflowError(f'Unsupported input type: {inputs}')
            if errors := self._process_stage_errors(stg.output_by_target):
                raise WorkflowError(
                    f'Stage {stg} failed to queue jobs with errors: ' + '\n'.join(errors),
                )
    else:
        self.queued_stages = [stg for stg in stages_dict.values() if not stg.skipped]
        logger.info(f'Queued stages: {self.queued_stages}')

    # Round 7: show the workflow
    self._show_workflow(dag, skip_stages, only_stages, first_stages, last_stages)
```
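The stage-selection options read at the top of `set_stages` come from the same `[workflow]` config section. A hedged sketch of the two mutually exclusive ways to use them (the stage names are the hypothetical ones used in the earlier examples):

```python
# Illustrative stage-selection settings, as read by set_stages() from get_config()['workflow'].
workflow_config = {
    # Either restrict the run to exactly these stages...
    'only_stages': ['Align'],
    # ...or (incompatible with only_stages) trim and prune the stage graph:
    # 'first_stages': ['Align'],     # skip stages upstream of these
    # 'last_stages': ['Genotype'],   # stop after these stages
    # 'skip_stages': ['Qc'],         # drop these stages entirely
}
```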
Bases: Enum
Indicates what a stage should do with a specific target.
skip(_fun=None, *, reason=None, assume_outputs_exist=False)
Decorator on top of `@stage` that sets the `self.skipped` field to True.
By default, expected outputs of a skipped stage will be checked, unless `assume_outputs_exist` is True.

```python
@skip
@stage
class MyStage1(SequencingGroupStage):
    ...


@skip
@stage(assume_outputs_exist=True)
class MyStage2(SequencingGroupStage):
    ...
```
Source code in src/cpg_flow/workflow.py
```python
def skip(
    _fun: Optional['StageDecorator'] = None,
    *,
    reason: str | None = None,
    assume_outputs_exist: bool = False,
) -> Union['StageDecorator', Callable[..., 'StageDecorator']]:
    """
    Decorator on top of `@stage` that sets the `self.skipped` field to True.
    By default, expected outputs of a skipped stage will be checked,
    unless `assume_outputs_exist` is True.

    @skip
    @stage
    class MyStage1(SequencingGroupStage):
        ...

    @skip
    @stage(assume_outputs_exist=True)
    class MyStage2(SequencingGroupStage):
        ...
    """

    def decorator_stage(fun) -> 'StageDecorator':
        """Implements decorator."""

        @functools.wraps(fun)
        def wrapper_stage(*args, **kwargs) -> 'Stage':
            """Decorator helper function."""
            s = fun(*args, **kwargs)
            s.skipped = True
            s.assume_outputs_exist = assume_outputs_exist
            return s

        return wrapper_stage

    if _fun is None:
        return decorator_stage
    return decorator_stage(_fun)
```
path_walk(expected, collected=None)
Recursive walk of `expected_out`: if the object is iterable, walk it too. This gets around the issue with nested lists and dicts, mainly around the use of Array outputs from Cromwell.
| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| `expected` | `Any` | any type of object containing Paths | required |
| `collected` | `set` | all collected paths so far | `None` |

| Returns | Description |
| --- | --- |
| `set[Path]` | a set of all collected Path nodes |
Examples:

```python
>>> path_walk({'a': {'b': {'c': Path('d')}}})
{Path('d')}
>>> path_walk({'a': {'b': {'c': [Path('d'), Path('e')]}}})
{Path('d'), Path('e')}
>>> path_walk({'a': Path('b'), 'c': {'d': 'e'}, 'f': Path('g')})
{Path('b'), Path('g')}
```
Source code in src/cpg_flow/workflow.py
```python
def path_walk(expected, collected: set | None = None) -> set[Path]:
    """
    recursive walk of expected_out
    if the object is iterable, walk it
    this gets around the issue with nested lists and dicts
    mainly around the use of Array outputs from Cromwell

    Args:
        expected (Any): any type of object containing Paths
        collected (set): all collected paths so far

    Returns:
        a set of all collected Path nodes

    Examples:
        >>> path_walk({'a': {'b': {'c': Path('d')}}})
        {Path('d')}
        >>> path_walk({'a': {'b': {'c': [Path('d'), Path('e')]}}})
        {Path('d'), Path('e')}
        >>> path_walk({'a': Path('b'),'c': {'d': 'e'}, {'f': Path('g')}})
        {Path('b'), Path('g')}
    """
    if collected is None:
        collected = set()

    if expected is None:
        return collected
    if isinstance(expected, dict):
        for value in expected.values():
            collected.update(path_walk(value, collected))
    if isinstance(expected, list | set):
        for value in expected:
            collected.update(path_walk(value, collected))
    if isinstance(expected, str):
        return collected
    if isinstance(expected, Path):
        if expected in collected:
            raise ValueError(f'Duplicate path {expected} in expected_out')
        collected.add(expected)
    return collected
```