Stage class

The following Stage classes are available for use:

  • Stage
  • DatasetStage
  • CohortStage
  • MultiCohortStage
  • SequencingGroupStage

You can import them from the cpg_flow package:

from cpg_flow.stage import Stage, DatasetStage, CohortStage, MultiCohortStage, SequencingGroupStage

cpg_flow.stage.Stage

Stage(
    name,
    required_stages=None,
    analysis_type=None,
    analysis_keys=None,
    update_analysis_meta=None,
    tolerate_missing_output=False,
    skipped=False,
    assume_outputs_exist=False,
    forced=False,
)

Bases: Generic[TargetT], ABC

Abstract class for a workflow stage, parametrised by a specific Target subclass: for example, SequencingGroupStage(Stage[SequencingGroup]) can only operate on SequencingGroup targets.
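
For example, a concrete stage picks its target type by subclassing the matching Stage variant. A minimal sketch (the class name is illustrative, and the cpg_flow.targets import path is an assumption, not confirmed by this page):

from cpg_flow.stage import SequencingGroupStage
from cpg_flow.targets import SequencingGroup  # assumed import path for the target

class MyStage(SequencingGroupStage):
    # Operates on one SequencingGroup at a time: the workflow calls
    # expected_outputs() and queue_jobs() once per active sequencing group.
    ...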

Source code in src/cpg_flow/stage.py, lines 317–361:
def __init__(
    self,
    name: str,
    required_stages: list[StageDecorator] | StageDecorator | None = None,
    analysis_type: str | None = None,
    analysis_keys: list[str] | None = None,
    update_analysis_meta: Callable[[str], dict] | None = None,
    tolerate_missing_output: bool = False,
    skipped: bool = False,
    assume_outputs_exist: bool = False,
    forced: bool = False,
):
    self._name = name
    self.required_stages_classes: list[StageDecorator] = []
    if required_stages:
        if isinstance(required_stages, list):
            self.required_stages_classes.extend(required_stages)
        else:
            self.required_stages_classes.append(required_stages)

    # Dependencies. Populated in workflow.run(), after we know all stages.
    self.required_stages: list[Stage] = []

    self.status_reporter = get_workflow().status_reporter
    # If `analysis_type` is defined, it will be used to create/update Analysis
    # entries in Metamist.
    self.analysis_type = analysis_type
    # If `analysis_keys` are defined, they will be used to extract the values
    # for `Analysis.output` if `Stage.expected_outputs()` returns a dict.
    self.analysis_keys = analysis_keys
    # If `update_analysis_meta` is defined, it is called on the `Analysis.output`
    # field, and the result is merged into the `Analysis.meta` dictionary.
    self.update_analysis_meta = update_analysis_meta

    self.tolerate_missing_output = tolerate_missing_output

    # Populated with the return value of `add_to_the_workflow()`
    self.output_by_target: dict[str, StageOutput | None] = dict()

    self.skipped = skipped
    self.forced = forced or self.name in get_config()['workflow'].get(
        'force_stages',
        [],
    )
    self.assume_outputs_exist = assume_outputs_exist

status_reporter instance-attribute

status_reporter = status_reporter

output_by_target instance-attribute

output_by_target = dict()

forced instance-attribute

forced = forced or name in get_config()['workflow'].get('force_stages', [])
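
In practice, forcing is usually driven by configuration rather than the constructor argument. A hedged sketch of the equivalent check (assuming the cpg_utils.config loader that provides get_config):

from cpg_utils.config import get_config  # assumed import path

# A stage named 'MyStage' is forced to re-run, even if its expected outputs
# already exist, when it is listed under force_stages in the [workflow]
# section of the TOML config.
is_forced = 'MyStage' in get_config()['workflow'].get('force_stages', [])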

name property

name

Stage name (unique and descriptive)

get_stage_cohort_prefix

get_stage_cohort_prefix(cohort, category=None)

Takes a cohort as an argument and calls through to the Workflow cohort_prefix method. The result is of the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME, e.g. "gs://cpg-project-main/seqr_loader/COH123/MyStage".

PARAMETER DESCRIPTION
cohort

we pull the analysis dataset and name from this Cohort

TYPE: Cohort

category

main, tmp, test, analysis, web

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
The cohort-and-stage-specific output prefix.

TYPE: Path

Source code in src/cpg_flow/stage.py, lines 379–396:
def get_stage_cohort_prefix(
    self,
    cohort: Cohort,
    category: str | None = None,
) -> Path:
    """
    Takes a cohort as an argument and calls through to the Workflow cohort_prefix method.
    The result is of the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME,
    e.g. "gs://cpg-project-main/seqr_loader/COH123/MyStage"

    Args:
        cohort (Cohort): we pull the analysis dataset and name from this Cohort
        category (str | None): main, tmp, test, analysis, web

    Returns:
        Path
    """
    return get_workflow().cohort_prefix(cohort, category=category) / self.name
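
For instance, a cohort-level stage can derive its output paths from this prefix inside expected_outputs() (a sketch; the stage and file names are illustrative):

def expected_outputs(self, cohort: Cohort) -> dict[str, Path]:
    # With workflow 'seqr_loader' and cohort 'COH123', this produces paths like
    # gs://cpg-project-main/seqr_loader/COH123/MyStage/cohort.vcf.gz
    prefix = self.get_stage_cohort_prefix(cohort)
    return {
        'vcf': prefix / 'cohort.vcf.gz',
        'tbi': prefix / 'cohort.vcf.gz.tbi',
    }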

queue_jobs abstractmethod

queue_jobs(target, inputs)

Adds Hail Batch jobs that process target. Assumes that all the housekeeping work is done: checking for missing inputs from required stages, and checking for possible reuse of existing outputs.

Source code in src/cpg_flow/stage.py, lines 417–423:
@abstractmethod
def queue_jobs(self, target: TargetT, inputs: StageInput) -> StageOutput | None:
    """
    Adds Hail Batch jobs that process `target`.
    Assumes that all the housekeeping work is done: checking for missing inputs
    from required stages, and checking for possible reuse of existing outputs.
    """

expected_outputs abstractmethod

expected_outputs(target)

Get path(s) to files that the stage is expected to generate for a target. Used within queue_jobs() to pass output paths to job commands, and by the workflow to check whether the stage's expected outputs already exist and can be reused.

Can be a str, a Path object, or a dictionary of str/Path objects.

Source code in src/cpg_flow/stage.py, lines 425–434:
@abstractmethod
def expected_outputs(self, target: TargetT) -> ExpectedResultT:
    """
    Get path(s) to files that the stage is expected to generate for a `target`.
    Used within `queue_jobs()` to pass output paths to job commands,
    as well as by the workflow to check whether the stage's expected outputs
    already exist and can be reused.

    Can be a str, a Path object, or a dictionary of str/Path objects.
    """

deprecated_queue_for_cohort

deprecated_queue_for_cohort(cohort)

Queues jobs for each corresponding target, as defined by the Stage subclass, and returns a dictionary of StageOutput objects indexed by target unique_id. Unused; ready for deletion.

Source code in src/cpg_flow/stage.py, lines 437–446:
def deprecated_queue_for_cohort(
    self,
    cohort: Cohort,
) -> dict[str, StageOutput | None]:
    """
    Queues jobs for each corresponding target, as defined by the Stage subclass.
    Returns a dictionary of `StageOutput` objects indexed by target unique_id.
    Unused; ready for deletion.
    """
    return {}

queue_for_multicohort abstractmethod

queue_for_multicohort(multicohort)

Queues jobs for each corresponding target, as defined by the Stage subclass.

Returns a dictionary of StageOutput objects indexed by target unique_id.

Source code in src/cpg_flow/stage.py, lines 448–457:
@abstractmethod
def queue_for_multicohort(
    self,
    multicohort: MultiCohort,
) -> dict[str, StageOutput | None]:
    """
    Queues jobs for each corresponding target, as defined by the Stage subclass.

    Returns a dictionary of `StageOutput` objects indexed by target unique_id.
    """

make_outputs

make_outputs(
    target,
    data=None,
    jobs=None,
    meta=None,
    reusable=False,
    skipped=False,
    error_msg=None,
)

Create StageOutput for this stage.

Source code in src/cpg_flow/stage.py, lines 470–492:
def make_outputs(
    self,
    target: Target,
    data: ExpectedResultT = None,  # TODO: ExpectedResultT is probably too broad; our code only really supports dict
    jobs: Sequence[Job | None] | Job | None = None,
    meta: dict | None = None,
    reusable: bool = False,
    skipped: bool = False,
    error_msg: str | None = None,
) -> StageOutput:
    """
    Create StageOutput for this stage.
    """
    return StageOutput(
        target=target,
        data=data,
        jobs=jobs,
        meta=meta,
        reusable=reusable,
        skipped=skipped,
        error_msg=error_msg,
        stage=self,
    )
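
make_outputs() also records reuse, skips, and failures, not just freshly queued jobs. A hedged sketch from inside queue_jobs() (outputs_already_exist and the missing-input condition are hypothetical):

def queue_jobs(self, target, inputs):
    outputs = self.expected_outputs(target)
    if outputs_already_exist(outputs):  # hypothetical existence check
        # Nothing queued; mark the existing outputs as reusable.
        return self.make_outputs(target, data=outputs, reusable=True)
    if some_required_input_is_missing:  # hypothetical condition
        # Record a per-target failure instead of raising.
        return self.make_outputs(target, error_msg=f'missing inputs for {target}')
    ...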

get_job_attrs

get_job_attrs(target=None)

Create a Hail Batch job attributes dictionary.

Source code in src/cpg_flow/stage.py, lines 717–726:
def get_job_attrs(self, target: TargetT | None = None) -> dict[str, str]:
    """
    Create Hail Batch Job attributes dictionary
    """
    job_attrs = dict(stage=self.name)
    if sequencing_type := get_config()['workflow'].get('sequencing_type'):
        job_attrs['sequencing_type'] = sequencing_type
    if target:
        job_attrs |= target.get_job_attrs()
    return job_attrs
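
These attributes are typically passed straight to Hail Batch when creating a job, so the stage name and sequencing type appear against the job in the Batch UI. A sketch from inside a stage method (get_batch() is an assumed helper):

from cpg_utils.hail_batch import get_batch  # assumed import path

j = get_batch().new_job(
    name='Align',
    attributes=self.get_job_attrs(sequencing_group),
)
# e.g. {'stage': 'Align', 'sequencing_type': 'genome', ...plus target attributes}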

cpg_flow.stage.DatasetStage

DatasetStage(
    name,
    required_stages=None,
    analysis_type=None,
    analysis_keys=None,
    update_analysis_meta=None,
    tolerate_missing_output=False,
    skipped=False,
    assume_outputs_exist=False,
    forced=False,
)

Bases: Stage, ABC

Dataset-level stage.

The constructor and its source (src/cpg_flow/stage.py, lines 317–361) are shared with Stage. DatasetStage inherits the status_reporter, output_by_target, and forced instance attributes, the name property, and the get_stage_cohort_prefix, deprecated_queue_for_cohort, make_outputs, and get_job_attrs methods from Stage; see their documentation above.

expected_outputs abstractmethod

expected_outputs(dataset)

Override to declare expected output paths.

Source code in src/cpg_flow/stage.py, lines 847–851:
@abstractmethod
def expected_outputs(self, dataset: Dataset) -> ExpectedResultT:
    """
    Override to declare expected output paths.
    """

queue_jobs abstractmethod

queue_jobs(dataset, inputs)

Override to add Hail Batch jobs.

Source code in src/cpg_flow/stage.py, lines 853–857:
@abstractmethod
def queue_jobs(self, dataset: Dataset, inputs: StageInput) -> StageOutput | None:
    """
    Override to add Hail Batch jobs.
    """

queue_for_multicohort

queue_for_multicohort(multicohort)

Plug the stage into the workflow.

Source code in src/cpg_flow/stage.py, lines 859–874:
def queue_for_multicohort(
    self,
    multicohort: MultiCohort,
) -> dict[str, StageOutput | None]:
    """
    Plug the stage into the workflow.
    """
    output_by_target: dict[str, StageOutput | None] = dict()
    # iterate directly over the datasets in this multicohort
    for dataset in multicohort.get_datasets():
        action = self._get_action(dataset)
        output_by_target[dataset.target_id] = self._queue_jobs_with_checks(
            dataset,
            action,
        )
    return output_by_target

cpg_flow.stage.CohortStage

CohortStage(
    name,
    required_stages=None,
    analysis_type=None,
    analysis_keys=None,
    update_analysis_meta=None,
    tolerate_missing_output=False,
    skipped=False,
    assume_outputs_exist=False,
    forced=False,
)

Bases: Stage, ABC

Cohort-level stage (all datasets of a workflow run).

The constructor and its source (src/cpg_flow/stage.py, lines 317–361) are shared with Stage. CohortStage inherits the status_reporter, output_by_target, and forced instance attributes, the name property, and the get_stage_cohort_prefix, deprecated_queue_for_cohort, make_outputs, and get_job_attrs methods from Stage; see their documentation above.

expected_outputs abstractmethod

expected_outputs(cohort)

Override to declare expected output paths.

Source code in src/cpg_flow/stage.py, lines 882–886:
@abstractmethod
def expected_outputs(self, cohort: Cohort) -> ExpectedResultT:
    """
    Override to declare expected output paths.
    """

queue_jobs abstractmethod

queue_jobs(cohort, inputs)

Override to add Hail Batch jobs.

Source code in src/cpg_flow/stage.py, lines 888–892:
@abstractmethod
def queue_jobs(self, cohort: Cohort, inputs: StageInput) -> StageOutput | None:
    """
    Override to add Hail Batch jobs.
    """

queue_for_multicohort

queue_for_multicohort(multicohort)

Plug the stage into the workflow.

Source code in src/cpg_flow/stage.py, lines 894–908:
def queue_for_multicohort(
    self,
    multicohort: MultiCohort,
) -> dict[str, StageOutput | None]:
    """
    Plug the stage into the workflow.
    """
    output_by_target: dict[str, StageOutput | None] = dict()
    for cohort in multicohort.get_cohorts():
        action = self._get_action(cohort)
        output_by_target[cohort.target_id] = self._queue_jobs_with_checks(
            cohort,
            action,
        )
    return output_by_target

cpg_flow.stage.MultiCohortStage

MultiCohortStage(
    name,
    required_stages=None,
    analysis_type=None,
    analysis_keys=None,
    update_analysis_meta=None,
    tolerate_missing_output=False,
    skipped=False,
    assume_outputs_exist=False,
    forced=False,
)

Bases: Stage, ABC

MultiCohort-level stage (all datasets of a workflow run).

The constructor and its source (src/cpg_flow/stage.py, lines 317–361) are shared with Stage. MultiCohortStage inherits the status_reporter, output_by_target, and forced instance attributes, the name property, and the get_stage_cohort_prefix, deprecated_queue_for_cohort, make_outputs, and get_job_attrs methods from Stage; see their documentation above.

expected_outputs abstractmethod

expected_outputs(multicohort)

Override to declare expected output paths.

Source code in src/cpg_flow/stage.py, lines 916–920:
@abstractmethod
def expected_outputs(self, multicohort: MultiCohort) -> ExpectedResultT:
    """
    Override to declare expected output paths.
    """

queue_jobs abstractmethod

queue_jobs(multicohort, inputs)

Override to add Hail Batch jobs.

Source code in src/cpg_flow/stage.py, lines 922–930:
@abstractmethod
def queue_jobs(
    self,
    multicohort: MultiCohort,
    inputs: StageInput,
) -> StageOutput | None:
    """
    Override to add Hail Batch jobs.
    """

queue_for_multicohort

queue_for_multicohort(multicohort)

Plug the stage into the workflow.

Source code in src/cpg_flow/stage.py, lines 932–945:
def queue_for_multicohort(
    self,
    multicohort: MultiCohort,
) -> dict[str, StageOutput | None]:
    """
    Plug the stage into the workflow.
    """
    output_by_target: dict[str, StageOutput | None] = dict()
    action = self._get_action(multicohort)
    output_by_target[multicohort.target_id] = self._queue_jobs_with_checks(
        multicohort,
        action,
    )
    return output_by_target

cpg_flow.stage.SequencingGroupStage

SequencingGroupStage(
    name,
    required_stages=None,
    analysis_type=None,
    analysis_keys=None,
    update_analysis_meta=None,
    tolerate_missing_output=False,
    skipped=False,
    assume_outputs_exist=False,
    forced=False,
)

Bases: Stage[SequencingGroup], ABC

Sequencing Group level stage.

The constructor and its source (src/cpg_flow/stage.py, lines 317–361) are shared with Stage. SequencingGroupStage inherits the status_reporter, output_by_target, and forced instance attributes, the name property, and the get_stage_cohort_prefix, deprecated_queue_for_cohort, make_outputs, and get_job_attrs methods from Stage; see their documentation above.

expected_outputs abstractmethod

expected_outputs(sequencing_group)

Override to declare expected output paths.

Source code in src/cpg_flow/stage.py, lines 800–804:
@abstractmethod
def expected_outputs(self, sequencing_group: SequencingGroup) -> ExpectedResultT:
    """
    Override to declare expected output paths.
    """

queue_jobs abstractmethod

queue_jobs(sequencing_group, inputs)

Override to add Hail Batch jobs.

Source code in src/cpg_flow/stage.py, lines 806–814:
@abstractmethod
def queue_jobs(
    self,
    sequencing_group: SequencingGroup,
    inputs: StageInput,
) -> StageOutput | None:
    """
    Override to add Hail Batch jobs.
    """

queue_for_multicohort

queue_for_multicohort(multicohort)

Plug the stage into the workflow.

Source code in src/cpg_flow/stage.py, lines 816–839:
def queue_for_multicohort(
    self,
    multicohort: MultiCohort,
) -> dict[str, StageOutput | None]:
    """
    Plug the stage into the workflow.
    """
    output_by_target: dict[str, StageOutput | None] = dict()
    if not (active_sgs := multicohort.get_sequencing_groups()):
        all_sgs = len(multicohort.get_sequencing_groups(only_active=False))
        LOGGER.warning(
            f'{len(active_sgs)}/{all_sgs} usable (active=True) SGs found in the multicohort. '
        'Check that `input_cohorts` or `input_datasets` are provided and not skipped',
        )
        return output_by_target

    # Queue jobs for each active sequencing group
    for sequencing_group in active_sgs:
        action = self._get_action(sequencing_group)
        output_by_target[sequencing_group.target_id] = self._queue_jobs_with_checks(
            sequencing_group,
            action,
        )
    return output_by_target