Stage classes

The following Stage classes are available to use:

  • Stage
  • DatasetStage
  • CohortStage
  • MultiCohortStage
  • SequencingGroupStage

You can import them from the cpg_flow package:

from cpg_flow.stage import Stage, DatasetStage, CohortStage, MultiCohortStage, SequencingGroupStage
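
Stages are defined by subclassing one of these classes and registering the subclass with the @stage decorator, which produces the StageDecorator values referenced throughout this page. A minimal sketch, assuming @stage is exported from cpg_flow.stage alongside the classes above and that sequencing groups expose a dataset.prefix() path helper; the output path and job body are purely illustrative:

from cpg_flow.stage import SequencingGroupStage, stage

@stage(analysis_type='qc')  # analysis_type is optional; used for Metamist Analysis entries
class MyQcStage(SequencingGroupStage):
    def expected_outputs(self, sequencing_group):
        # Illustrative location: one output file per sequencing group.
        return sequencing_group.dataset.prefix() / self.name / f'{sequencing_group.id}.txt'

    def queue_jobs(self, sequencing_group, inputs):
        outputs = self.expected_outputs(sequencing_group)
        # Queue one or more Hail Batch jobs that write `outputs`, then wrap them:
        return self.make_outputs(sequencing_group, data=outputs, jobs=None)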

cpg_flow.stage.Stage

Stage(
    *,
    name,
    required_stages=None,
    analysis_type=None,
    analysis_keys=None,
    update_analysis_meta=None,
    tolerate_missing_output=False,
    skipped=False,
    assume_outputs_exist=False,
    forced=False
)

Bases: ABC, Generic[TargetT]

Abstract class for a workflow stage, parametrised by a specific Target subclass: e.g. SequencingGroupStage(Stage[SequencingGroup]) can only operate on SequencingGroup targets.

Source code in src/cpg_flow/stage.py
def __init__(
    self,
    *,
    name: str,
    required_stages: list[StageDecorator] | StageDecorator | None = None,
    analysis_type: str | None = None,
    analysis_keys: list[str] | None = None,
    update_analysis_meta: Callable[[str], dict] | None = None,
    tolerate_missing_output: bool = False,
    skipped: bool = False,
    assume_outputs_exist: bool = False,
    forced: bool = False,
):
    self._name = name
    self.required_stages_classes: list[StageDecorator] = []
    if required_stages:
        if isinstance(required_stages, list):
            self.required_stages_classes.extend(required_stages)
        else:
            self.required_stages_classes.append(required_stages)

    # Dependencies. Populated in workflow.run(), after we know all stages.
    self.required_stages: list[Stage] = []

    self.status_reporter = get_workflow().status_reporter
    # If `analysis_type` is defined, it will be used to create/update Analysis
    # entries in Metamist.
    self.analysis_type = analysis_type
    # If `analysis_keys` are defined, it will be used to extract the value for
    # `Analysis.output` if the Stage.expected_outputs() returns a dict.
    self.analysis_keys = analysis_keys
    # if `update_analysis_meta` is defined, it is called on the `Analysis.output`
    # field, and result is merged into the `Analysis.meta` dictionary.
    self.update_analysis_meta = update_analysis_meta

    self.tolerate_missing_output = tolerate_missing_output

    # Populated with the return value of `add_to_the_workflow()`
    self.output_by_target: dict[str, StageOutput | None] = dict()

    self.skipped = skipped
    self.forced = forced or self.name in get_config()['workflow'].get(
        'force_stages',
        [],
    )
    self.assume_outputs_exist = assume_outputs_exist

status_reporter instance-attribute

status_reporter = status_reporter

analysis_type instance-attribute

analysis_type = analysis_type

analysis_keys instance-attribute

analysis_keys = analysis_keys

update_analysis_meta instance-attribute

update_analysis_meta = update_analysis_meta

tolerate_missing_output instance-attribute

tolerate_missing_output = tolerate_missing_output

output_by_target instance-attribute

output_by_target = dict()

forced instance-attribute

forced = forced or name in get_config()['workflow'].get('force_stages', [])

assume_outputs_exist instance-attribute

assume_outputs_exist = assume_outputs_exist

name property

name

Stage name (unique and descriptive).

get_stage_cohort_prefix

get_stage_cohort_prefix(cohort, category=None)

Takes a cohort as an argument and calls through to the Workflow cohort_prefix method. The result has the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME, e.g. "gs://cpg-project-main/seqr_loader/COH123/MyStage"

PARAMETER DESCRIPTION
cohort

we pull the analysis dataset and name from this Cohort

TYPE: Cohort

category

main, tmp, test, analysis, web

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
Path

Path

Source code in src/cpg_flow/stage.py
def get_stage_cohort_prefix(
    self,
    cohort: Cohort,
    category: str | None = None,
) -> Path:
    """
    Takes a cohort as an argument, calls through to the Workflow cohort_prefix method
    Result in the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME
    e.g. "gs://cpg-project-main/seqr_loader/COH123/MyStage"

    Args:
        cohort (Cohort): we pull the analysis dataset and name from this Cohort
        category (str | none): main, tmp, test, analysis, web

    Returns:
        Path
    """
    return get_workflow().cohort_prefix(cohort, category=category) / self.name
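
For example, a cohort-level stage might anchor its expected outputs under this prefix (a sketch; the file names are illustrative):

def expected_outputs(self, cohort):
    # e.g. gs://cpg-project-main/<workflow>/<cohort-id>/<stage-name>/joint.vcf.gz
    main_prefix = self.get_stage_cohort_prefix(cohort)
    tmp_prefix = self.get_stage_cohort_prefix(cohort, category='tmp')
    return {
        'vcf': main_prefix / 'joint.vcf.gz',
        'scratch': tmp_prefix / 'scratch',
    }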

queue_jobs abstractmethod

queue_jobs(target, inputs)

Adds Hail Batch jobs that process target. Assumes that all the housekeeping work is done: checking for missing inputs from required stages and checking for possible reuse of existing outputs.

Source code in src/cpg_flow/stage.py
@abstractmethod
def queue_jobs(self, target: TargetT, inputs: StageInput) -> StageOutput | None:
    """
    Adds Hail Batch jobs that process `target`.
    Assumes that all the housekeeping work is done: checking for missing inputs
    from required stages and checking for possible reuse of existing outputs.
    """

expected_outputs abstractmethod

expected_outputs(target)

Get path(s) to files that the stage is expected to generate for a target. Used within queue_jobs() to pass paths to outputs to job commands, as well as by the workflow to check if the stage's expected outputs already exist and can be reused.

Can be a str, a Path object, or a dictionary of str/Path objects.

Source code in src/cpg_flow/stage.py
@abstractmethod
def expected_outputs(self, target: TargetT) -> ExpectedResultT:
    """
    Get path(s) to files that the stage is expected to generate for a `target`.
    Used within `queue_jobs()` to pass paths to outputs to job commands,
    as well as by the workflow to check if the stage's expected outputs already
    exist and can be reused.

    Can be a str, a Path object, or a dictionary of str/Path objects.
    """

queue_for_multicohort abstractmethod

queue_for_multicohort(multicohort)

Queues jobs for each corresponding target, as defined by the Stage subclass.

Returns a dictionary of StageOutput objects indexed by target unique_id.

Source code in src/cpg_flow/stage.py
@abstractmethod
def queue_for_multicohort(
    self,
    multicohort: MultiCohort,
) -> dict[str, StageOutput | None]:
    """
    Queues jobs for each corresponding target, as defined by the Stage subclass.

    Returns a dictionary of `StageOutput` objects indexed by target unique_id.
    """

make_outputs

make_outputs(
    target,
    data=None,
    *,
    jobs=None,
    meta=None,
    reusable=False,
    skipped=False,
    error_msg=None
)

Create StageOutput for this stage.

Source code in src/cpg_flow/stage.py
def make_outputs(
    self,
    target: Target,
    data: ExpectedResultT = None,  # TODO: ExpectedResultT is probably too broad; our code only really supports dicts
    *,
    jobs: Sequence[Job | None] | Job | None = None,
    meta: dict | None = None,
    reusable: bool = False,
    skipped: bool = False,
    error_msg: str | None = None,
) -> StageOutput:
    """
    Create StageOutput for this stage.
    """
    return StageOutput(
        target=target,
        data=data,
        jobs=jobs,
        meta=meta,
        reusable=reusable,
        skipped=skipped,
        error_msg=error_msg,
        stage=self,
    )
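
Typical calls from within queue_jobs(), sketched as one override with three outcomes (both guard helpers are hypothetical):

def queue_jobs(self, target, inputs):
    if already_processed(target):  # hypothetical guard
        return self.make_outputs(target, skipped=True)
    if not find_input_cram(target):  # hypothetical lookup
        return self.make_outputs(target, error_msg='no input CRAM found')
    outputs = self.expected_outputs(target)
    # ...queue Hail Batch jobs that write `outputs` and pass them via `jobs=`...
    return self.make_outputs(target, data=outputs, jobs=None, meta={'source': 'sketch'})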

get_job_attrs

get_job_attrs(target=None)

Create Hail Batch Job attributes dictionary

Source code in src/cpg_flow/stage.py
def get_job_attrs(self, target: TargetT | None = None) -> dict[str, str]:
    """
    Create Hail Batch Job attributes dictionary
    """
    job_attrs = dict(stage=self.name)
    if sequencing_type := get_config()['workflow'].get('sequencing_type'):
        job_attrs['sequencing_type'] = sequencing_type
    if target:
        job_attrs |= target.get_job_attrs()
    return job_attrs
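
The resulting dictionary is attached to Hail Batch jobs so they can be identified in the Batch UI. A sketch, assuming get_batch() from cpg_utils.hail_batch:

attrs = self.get_job_attrs(sequencing_group)  # e.g. {'stage': 'MyQcStage', 'sequencing_type': 'genome'}
j = get_batch().new_job('align', attributes=attrs)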

cpg_flow.stage.DatasetStage

DatasetStage(
    *,
    name,
    required_stages=None,
    analysis_type=None,
    analysis_keys=None,
    update_analysis_meta=None,
    tolerate_missing_output=False,
    skipped=False,
    assume_outputs_exist=False,
    forced=False
)

Bases: Stage, ABC

Dataset-level stage

Source code in src/cpg_flow/stage.py
def __init__(
    self,
    *,
    name: str,
    required_stages: list[StageDecorator] | StageDecorator | None = None,
    analysis_type: str | None = None,
    analysis_keys: list[str] | None = None,
    update_analysis_meta: Callable[[str], dict] | None = None,
    tolerate_missing_output: bool = False,
    skipped: bool = False,
    assume_outputs_exist: bool = False,
    forced: bool = False,
):
    self._name = name
    self.required_stages_classes: list[StageDecorator] = []
    if required_stages:
        if isinstance(required_stages, list):
            self.required_stages_classes.extend(required_stages)
        else:
            self.required_stages_classes.append(required_stages)

    # Dependencies. Populated in workflow.run(), after we know all stages.
    self.required_stages: list[Stage] = []

    self.status_reporter = get_workflow().status_reporter
    # If `analysis_type` is defined, it will be used to create/update Analysis
    # entries in Metamist.
    self.analysis_type = analysis_type
    # If `analysis_keys` are defined, it will be used to extract the value for
    # `Analysis.output` if the Stage.expected_outputs() returns a dict.
    self.analysis_keys = analysis_keys
    # if `update_analysis_meta` is defined, it is called on the `Analysis.output`
    # field, and result is merged into the `Analysis.meta` dictionary.
    self.update_analysis_meta = update_analysis_meta

    self.tolerate_missing_output = tolerate_missing_output

    # Populated with the return value of `add_to_the_workflow()`
    self.output_by_target: dict[str, StageOutput | None] = dict()

    self.skipped = skipped
    self.forced = forced or self.name in get_config()['workflow'].get(
        'force_stages',
        [],
    )
    self.assume_outputs_exist = assume_outputs_exist

status_reporter instance-attribute

status_reporter = status_reporter

analysis_type instance-attribute

analysis_type = analysis_type

analysis_keys instance-attribute

analysis_keys = analysis_keys

update_analysis_meta instance-attribute

update_analysis_meta = update_analysis_meta

tolerate_missing_output instance-attribute

tolerate_missing_output = tolerate_missing_output

output_by_target instance-attribute

output_by_target = dict()

forced instance-attribute

forced = forced or name in get_config()['workflow'].get('force_stages', [])

assume_outputs_exist instance-attribute

assume_outputs_exist = assume_outputs_exist

name property

name

Stage name (unique and descriptive).

expected_outputs abstractmethod

expected_outputs(dataset)

Override to declare expected output paths.

Source code in src/cpg_flow/stage.py
@abstractmethod
def expected_outputs(self, dataset: Dataset) -> ExpectedResultT:
    """
    Override to declare expected output paths.
    """

queue_jobs abstractmethod

queue_jobs(dataset, inputs)

Override to add Hail Batch jobs.

Source code in src/cpg_flow/stage.py
@abstractmethod
def queue_jobs(self, dataset: Dataset, inputs: StageInput) -> StageOutput | None:
    """
    Override to add Hail Batch jobs.
    """

queue_for_multicohort

queue_for_multicohort(multicohort)

Plug the stage into the workflow.

Source code in src/cpg_flow/stage.py
def queue_for_multicohort(
    self,
    multicohort: MultiCohort,
) -> dict[str, StageOutput | None]:
    """
    Plug the stage into the workflow.
    """
    output_by_target: dict[str, StageOutput | None] = dict()
    # iterate directly over the datasets in this multicohort
    for dataset in multicohort.get_datasets():
        action = self._get_action(dataset)
        output_by_target[dataset.target_id] = self._queue_jobs_with_checks(
            dataset,
            action,
        )
    return output_by_target

get_stage_cohort_prefix

get_stage_cohort_prefix(cohort, category=None)

Takes a cohort as an argument and calls through to the Workflow cohort_prefix method. The result has the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME, e.g. "gs://cpg-project-main/seqr_loader/COH123/MyStage"

PARAMETER DESCRIPTION
cohort

we pull the analysis dataset and name from this Cohort

TYPE: Cohort

category

main, tmp, test, analysis, web

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
Path

Path

Source code in src/cpg_flow/stage.py
def get_stage_cohort_prefix(
    self,
    cohort: Cohort,
    category: str | None = None,
) -> Path:
    """
    Takes a cohort as an argument, calls through to the Workflow cohort_prefix method
    Result in the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME
    e.g. "gs://cpg-project-main/seqr_loader/COH123/MyStage"

    Args:
        cohort (Cohort): we pull the analysis dataset and name from this Cohort
        category (str | none): main, tmp, test, analysis, web

    Returns:
        Path
    """
    return get_workflow().cohort_prefix(cohort, category=category) / self.name

make_outputs

make_outputs(
    target,
    data=None,
    *,
    jobs=None,
    meta=None,
    reusable=False,
    skipped=False,
    error_msg=None
)

Create StageOutput for this stage.

Source code in src/cpg_flow/stage.py
def make_outputs(
    self,
    target: Target,
    data: ExpectedResultT = None,  # TODO: ExpectedResultT is probably too broad; our code only really supports dicts
    *,
    jobs: Sequence[Job | None] | Job | None = None,
    meta: dict | None = None,
    reusable: bool = False,
    skipped: bool = False,
    error_msg: str | None = None,
) -> StageOutput:
    """
    Create StageOutput for this stage.
    """
    return StageOutput(
        target=target,
        data=data,
        jobs=jobs,
        meta=meta,
        reusable=reusable,
        skipped=skipped,
        error_msg=error_msg,
        stage=self,
    )

get_job_attrs

get_job_attrs(target=None)

Create Hail Batch Job attributes dictionary

Source code in src/cpg_flow/stage.py
def get_job_attrs(self, target: TargetT | None = None) -> dict[str, str]:
    """
    Create Hail Batch Job attributes dictionary
    """
    job_attrs = dict(stage=self.name)
    if sequencing_type := get_config()['workflow'].get('sequencing_type'):
        job_attrs['sequencing_type'] = sequencing_type
    if target:
        job_attrs |= target.get_job_attrs()
    return job_attrs

cpg_flow.stage.CohortStage

CohortStage(
    *,
    name,
    required_stages=None,
    analysis_type=None,
    analysis_keys=None,
    update_analysis_meta=None,
    tolerate_missing_output=False,
    skipped=False,
    assume_outputs_exist=False,
    forced=False
)

Bases: Stage, ABC

Cohort-level stage (all datasets of a workflow run).

Source code in src/cpg_flow/stage.py
def __init__(
    self,
    *,
    name: str,
    required_stages: list[StageDecorator] | StageDecorator | None = None,
    analysis_type: str | None = None,
    analysis_keys: list[str] | None = None,
    update_analysis_meta: Callable[[str], dict] | None = None,
    tolerate_missing_output: bool = False,
    skipped: bool = False,
    assume_outputs_exist: bool = False,
    forced: bool = False,
):
    self._name = name
    self.required_stages_classes: list[StageDecorator] = []
    if required_stages:
        if isinstance(required_stages, list):
            self.required_stages_classes.extend(required_stages)
        else:
            self.required_stages_classes.append(required_stages)

    # Dependencies. Populated in workflow.run(), after we know all stages.
    self.required_stages: list[Stage] = []

    self.status_reporter = get_workflow().status_reporter
    # If `analysis_type` is defined, it will be used to create/update Analysis
    # entries in Metamist.
    self.analysis_type = analysis_type
    # If `analysis_keys` are defined, it will be used to extract the value for
    # `Analysis.output` if the Stage.expected_outputs() returns a dict.
    self.analysis_keys = analysis_keys
    # if `update_analysis_meta` is defined, it is called on the `Analysis.output`
    # field, and result is merged into the `Analysis.meta` dictionary.
    self.update_analysis_meta = update_analysis_meta

    self.tolerate_missing_output = tolerate_missing_output

    # Populated with the return value of `add_to_the_workflow()`
    self.output_by_target: dict[str, StageOutput | None] = dict()

    self.skipped = skipped
    self.forced = forced or self.name in get_config()['workflow'].get(
        'force_stages',
        [],
    )
    self.assume_outputs_exist = assume_outputs_exist

status_reporter instance-attribute

status_reporter = status_reporter

analysis_type instance-attribute

analysis_type = analysis_type

analysis_keys instance-attribute

analysis_keys = analysis_keys

update_analysis_meta instance-attribute

update_analysis_meta = update_analysis_meta

tolerate_missing_output instance-attribute

tolerate_missing_output = tolerate_missing_output

output_by_target instance-attribute

output_by_target = dict()

forced instance-attribute

forced = forced or name in get_config()['workflow'].get('force_stages', [])

assume_outputs_exist instance-attribute

assume_outputs_exist = assume_outputs_exist

name property

name

Stage name (unique and descriptive).

expected_outputs abstractmethod

expected_outputs(cohort)

Override to declare expected output paths.

Source code in src/cpg_flow/stage.py
@abstractmethod
def expected_outputs(self, cohort: Cohort) -> ExpectedResultT:
    """
    Override to declare expected output paths.
    """

queue_jobs abstractmethod

queue_jobs(cohort, inputs)

Override to add Hail Batch jobs.

Source code in src/cpg_flow/stage.py
@abstractmethod
def queue_jobs(self, cohort: Cohort, inputs: StageInput) -> StageOutput | None:
    """
    Override to add Hail Batch jobs.
    """

queue_for_multicohort

queue_for_multicohort(multicohort)

Plug the stage into the workflow.

Source code in src/cpg_flow/stage.py
def queue_for_multicohort(
    self,
    multicohort: MultiCohort,
) -> dict[str, StageOutput | None]:
    """
    Plug the stage into the workflow.
    """
    output_by_target: dict[str, StageOutput | None] = dict()
    for cohort in multicohort.get_cohorts():
        action = self._get_action(cohort)
        output_by_target[cohort.target_id] = self._queue_jobs_with_checks(
            cohort,
            action,
        )
    return output_by_target

get_stage_cohort_prefix

get_stage_cohort_prefix(cohort, category=None)

Takes a cohort as an argument and calls through to the Workflow cohort_prefix method. The result has the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME, e.g. "gs://cpg-project-main/seqr_loader/COH123/MyStage"

PARAMETER DESCRIPTION
cohort

we pull the analysis dataset and name from this Cohort

TYPE: Cohort

category

main, tmp, test, analysis, web

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
Path

Path

Source code in src/cpg_flow/stage.py
def get_stage_cohort_prefix(
    self,
    cohort: Cohort,
    category: str | None = None,
) -> Path:
    """
    Takes a cohort as an argument, calls through to the Workflow cohort_prefix method
    Result in the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME
    e.g. "gs://cpg-project-main/seqr_loader/COH123/MyStage"

    Args:
        cohort (Cohort): we pull the analysis dataset and name from this Cohort
        category (str | none): main, tmp, test, analysis, web

    Returns:
        Path
    """
    return get_workflow().cohort_prefix(cohort, category=category) / self.name

make_outputs

make_outputs(
    target,
    data=None,
    *,
    jobs=None,
    meta=None,
    reusable=False,
    skipped=False,
    error_msg=None
)

Create StageOutput for this stage.

Source code in src/cpg_flow/stage.py
def make_outputs(
    self,
    target: Target,
    data: ExpectedResultT = None,  # TODO: ExpectedResultT is probably too broad; our code only really supports dicts
    *,
    jobs: Sequence[Job | None] | Job | None = None,
    meta: dict | None = None,
    reusable: bool = False,
    skipped: bool = False,
    error_msg: str | None = None,
) -> StageOutput:
    """
    Create StageOutput for this stage.
    """
    return StageOutput(
        target=target,
        data=data,
        jobs=jobs,
        meta=meta,
        reusable=reusable,
        skipped=skipped,
        error_msg=error_msg,
        stage=self,
    )

get_job_attrs

get_job_attrs(target=None)

Create Hail Batch Job attributes dictionary

Source code in src/cpg_flow/stage.py
def get_job_attrs(self, target: TargetT | None = None) -> dict[str, str]:
    """
    Create Hail Batch Job attributes dictionary
    """
    job_attrs = dict(stage=self.name)
    if sequencing_type := get_config()['workflow'].get('sequencing_type'):
        job_attrs['sequencing_type'] = sequencing_type
    if target:
        job_attrs |= target.get_job_attrs()
    return job_attrs

cpg_flow.stage.MultiCohortStage

MultiCohortStage(
    *,
    name,
    required_stages=None,
    analysis_type=None,
    analysis_keys=None,
    update_analysis_meta=None,
    tolerate_missing_output=False,
    skipped=False,
    assume_outputs_exist=False,
    forced=False
)

Bases: Stage, ABC

MultiCohort-level stage (all datasets of a workflow run).

Source code in src/cpg_flow/stage.py
def __init__(
    self,
    *,
    name: str,
    required_stages: list[StageDecorator] | StageDecorator | None = None,
    analysis_type: str | None = None,
    analysis_keys: list[str] | None = None,
    update_analysis_meta: Callable[[str], dict] | None = None,
    tolerate_missing_output: bool = False,
    skipped: bool = False,
    assume_outputs_exist: bool = False,
    forced: bool = False,
):
    self._name = name
    self.required_stages_classes: list[StageDecorator] = []
    if required_stages:
        if isinstance(required_stages, list):
            self.required_stages_classes.extend(required_stages)
        else:
            self.required_stages_classes.append(required_stages)

    # Dependencies. Populated in workflow.run(), after we know all stages.
    self.required_stages: list[Stage] = []

    self.status_reporter = get_workflow().status_reporter
    # If `analysis_type` is defined, it will be used to create/update Analysis
    # entries in Metamist.
    self.analysis_type = analysis_type
    # If `analysis_keys` are defined, it will be used to extract the value for
    # `Analysis.output` if the Stage.expected_outputs() returns a dict.
    self.analysis_keys = analysis_keys
    # if `update_analysis_meta` is defined, it is called on the `Analysis.output`
    # field, and result is merged into the `Analysis.meta` dictionary.
    self.update_analysis_meta = update_analysis_meta

    self.tolerate_missing_output = tolerate_missing_output

    # Populated with the return value of `add_to_the_workflow()`
    self.output_by_target: dict[str, StageOutput | None] = dict()

    self.skipped = skipped
    self.forced = forced or self.name in get_config()['workflow'].get(
        'force_stages',
        [],
    )
    self.assume_outputs_exist = assume_outputs_exist

status_reporter instance-attribute

status_reporter = status_reporter

analysis_type instance-attribute

analysis_type = analysis_type

analysis_keys instance-attribute

analysis_keys = analysis_keys

update_analysis_meta instance-attribute

update_analysis_meta = update_analysis_meta

tolerate_missing_output instance-attribute

tolerate_missing_output = tolerate_missing_output

output_by_target instance-attribute

output_by_target = dict()

forced instance-attribute

forced = forced or name in get_config()['workflow'].get('force_stages', [])

assume_outputs_exist instance-attribute

assume_outputs_exist = assume_outputs_exist

name property

name

Stage name (unique and descriptive).

expected_outputs abstractmethod

expected_outputs(multicohort)

Override to declare expected output paths.

Source code in src/cpg_flow/stage.py
@abstractmethod
def expected_outputs(self, multicohort: MultiCohort) -> ExpectedResultT:
    """
    Override to declare expected output paths.
    """

queue_jobs abstractmethod

queue_jobs(multicohort, inputs)

Override to add Hail Batch jobs.

Source code in src/cpg_flow/stage.py
@abstractmethod
def queue_jobs(
    self,
    multicohort: MultiCohort,
    inputs: StageInput,
) -> StageOutput | None:
    """
    Override to add Hail Batch jobs.
    """

queue_for_multicohort

queue_for_multicohort(multicohort)

Plug the stage into the workflow.

Source code in src/cpg_flow/stage.py
def queue_for_multicohort(
    self,
    multicohort: MultiCohort,
) -> dict[str, StageOutput | None]:
    """
    Plug the stage into the workflow.
    """
    output_by_target: dict[str, StageOutput | None] = dict()
    action = self._get_action(multicohort)
    output_by_target[multicohort.target_id] = self._queue_jobs_with_checks(
        multicohort,
        action,
    )
    return output_by_target

get_stage_cohort_prefix

get_stage_cohort_prefix(cohort, category=None)

Takes a cohort as an argument and calls through to the Workflow cohort_prefix method. The result has the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME, e.g. "gs://cpg-project-main/seqr_loader/COH123/MyStage"

PARAMETER DESCRIPTION
cohort

we pull the analysis dataset and name from this Cohort

TYPE: Cohort

category

main, tmp, test, analysis, web

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
Path

Path

Source code in src/cpg_flow/stage.py
def get_stage_cohort_prefix(
    self,
    cohort: Cohort,
    category: str | None = None,
) -> Path:
    """
    Takes a cohort as an argument, calls through to the Workflow cohort_prefix method
    Result in the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME
    e.g. "gs://cpg-project-main/seqr_loader/COH123/MyStage"

    Args:
        cohort (Cohort): we pull the analysis dataset and name from this Cohort
        category (str | none): main, tmp, test, analysis, web

    Returns:
        Path
    """
    return get_workflow().cohort_prefix(cohort, category=category) / self.name

make_outputs

make_outputs(
    target,
    data=None,
    *,
    jobs=None,
    meta=None,
    reusable=False,
    skipped=False,
    error_msg=None
)

Create StageOutput for this stage.

Source code in src/cpg_flow/stage.py
def make_outputs(
    self,
    target: Target,
    data: ExpectedResultT = None,  # TODO: ExpectedResultT is probably too broad; our code only really supports dicts
    *,
    jobs: Sequence[Job | None] | Job | None = None,
    meta: dict | None = None,
    reusable: bool = False,
    skipped: bool = False,
    error_msg: str | None = None,
) -> StageOutput:
    """
    Create StageOutput for this stage.
    """
    return StageOutput(
        target=target,
        data=data,
        jobs=jobs,
        meta=meta,
        reusable=reusable,
        skipped=skipped,
        error_msg=error_msg,
        stage=self,
    )

get_job_attrs

get_job_attrs(target=None)

Create Hail Batch Job attributes dictionary

Source code in src/cpg_flow/stage.py
def get_job_attrs(self, target: TargetT | None = None) -> dict[str, str]:
    """
    Create Hail Batch Job attributes dictionary
    """
    job_attrs = dict(stage=self.name)
    if sequencing_type := get_config()['workflow'].get('sequencing_type'):
        job_attrs['sequencing_type'] = sequencing_type
    if target:
        job_attrs |= target.get_job_attrs()
    return job_attrs

cpg_flow.stage.SequencingGroupStage

SequencingGroupStage(
    *,
    name,
    required_stages=None,
    analysis_type=None,
    analysis_keys=None,
    update_analysis_meta=None,
    tolerate_missing_output=False,
    skipped=False,
    assume_outputs_exist=False,
    forced=False
)

Bases: Stage[SequencingGroup], ABC

Sequencing Group level stage.

Source code in src/cpg_flow/stage.py
def __init__(
    self,
    *,
    name: str,
    required_stages: list[StageDecorator] | StageDecorator | None = None,
    analysis_type: str | None = None,
    analysis_keys: list[str] | None = None,
    update_analysis_meta: Callable[[str], dict] | None = None,
    tolerate_missing_output: bool = False,
    skipped: bool = False,
    assume_outputs_exist: bool = False,
    forced: bool = False,
):
    self._name = name
    self.required_stages_classes: list[StageDecorator] = []
    if required_stages:
        if isinstance(required_stages, list):
            self.required_stages_classes.extend(required_stages)
        else:
            self.required_stages_classes.append(required_stages)

    # Dependencies. Populated in workflow.run(), after we know all stages.
    self.required_stages: list[Stage] = []

    self.status_reporter = get_workflow().status_reporter
    # If `analysis_type` is defined, it will be used to create/update Analysis
    # entries in Metamist.
    self.analysis_type = analysis_type
    # If `analysis_keys` are defined, it will be used to extract the value for
    # `Analysis.output` if the Stage.expected_outputs() returns a dict.
    self.analysis_keys = analysis_keys
    # if `update_analysis_meta` is defined, it is called on the `Analysis.output`
    # field, and result is merged into the `Analysis.meta` dictionary.
    self.update_analysis_meta = update_analysis_meta

    self.tolerate_missing_output = tolerate_missing_output

    # Populated with the return value of `add_to_the_workflow()`
    self.output_by_target: dict[str, StageOutput | None] = dict()

    self.skipped = skipped
    self.forced = forced or self.name in get_config()['workflow'].get(
        'force_stages',
        [],
    )
    self.assume_outputs_exist = assume_outputs_exist

status_reporter instance-attribute

status_reporter = status_reporter

analysis_type instance-attribute

analysis_type = analysis_type

analysis_keys instance-attribute

analysis_keys = analysis_keys

update_analysis_meta instance-attribute

update_analysis_meta = update_analysis_meta

tolerate_missing_output instance-attribute

tolerate_missing_output = tolerate_missing_output

output_by_target instance-attribute

output_by_target = dict()

forced instance-attribute

forced = forced or name in get_config()['workflow'].get('force_stages', [])

assume_outputs_exist instance-attribute

assume_outputs_exist = assume_outputs_exist

name property

name

Stage name (unique and descriptive).

expected_outputs abstractmethod

expected_outputs(sequencing_group)

Override to declare expected output paths.

Source code in src/cpg_flow/stage.py
@abstractmethod
def expected_outputs(self, sequencing_group: SequencingGroup) -> ExpectedResultT:
    """
    Override to declare expected output paths.
    """

queue_jobs abstractmethod

queue_jobs(sequencing_group, inputs)

Override to add Hail Batch jobs.

Source code in src/cpg_flow/stage.py
@abstractmethod
def queue_jobs(
    self,
    sequencing_group: SequencingGroup,
    inputs: StageInput,
) -> StageOutput | None:
    """
    Override to add Hail Batch jobs.
    """

queue_for_multicohort

queue_for_multicohort(multicohort)

Plug the stage into the workflow.

Source code in src/cpg_flow/stage.py
def queue_for_multicohort(
    self,
    multicohort: MultiCohort,
) -> dict[str, StageOutput | None]:
    """
    Plug the stage into the workflow.
    """
    output_by_target: dict[str, StageOutput | None] = dict()
    if not (active_sgs := multicohort.get_sequencing_groups()):
        all_sgs = len(multicohort.get_sequencing_groups(only_active=False))
        logger.warning(
            f'{len(active_sgs)}/{all_sgs} usable (active=True) SGs found in the multicohort. '
            'Check that `input_cohorts` or `input_datasets` are provided and not skipped',
        )
        return output_by_target

    # queue jobs for every active sequencing group
    for sequencing_group in active_sgs:
        action = self._get_action(sequencing_group)
        output_by_target[sequencing_group.target_id] = self._queue_jobs_with_checks(
            sequencing_group,
            action,
        )
    return output_by_target

get_stage_cohort_prefix

get_stage_cohort_prefix(cohort, category=None)

Takes a cohort as an argument and calls through to the Workflow cohort_prefix method. The result has the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME, e.g. "gs://cpg-project-main/seqr_loader/COH123/MyStage"

PARAMETER DESCRIPTION
cohort

we pull the analysis dataset and name from this Cohort

TYPE: Cohort

category

main, tmp, test, analysis, web

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
Path

Path

Source code in src/cpg_flow/stage.py
def get_stage_cohort_prefix(
    self,
    cohort: Cohort,
    category: str | None = None,
) -> Path:
    """
    Takes a cohort as an argument, calls through to the Workflow cohort_prefix method
    Result in the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME
    e.g. "gs://cpg-project-main/seqr_loader/COH123/MyStage"

    Args:
        cohort (Cohort): we pull the analysis dataset and name from this Cohort
        category (str | none): main, tmp, test, analysis, web

    Returns:
        Path
    """
    return get_workflow().cohort_prefix(cohort, category=category) / self.name

make_outputs

make_outputs(
    target,
    data=None,
    *,
    jobs=None,
    meta=None,
    reusable=False,
    skipped=False,
    error_msg=None
)

Create StageOutput for this stage.

Source code in src/cpg_flow/stage.py
def make_outputs(
    self,
    target: Target,
    data: ExpectedResultT = None,  # TODO: ExpectedResultT is probably too broad; our code only really supports dicts
    *,
    jobs: Sequence[Job | None] | Job | None = None,
    meta: dict | None = None,
    reusable: bool = False,
    skipped: bool = False,
    error_msg: str | None = None,
) -> StageOutput:
    """
    Create StageOutput for this stage.
    """
    return StageOutput(
        target=target,
        data=data,
        jobs=jobs,
        meta=meta,
        reusable=reusable,
        skipped=skipped,
        error_msg=error_msg,
        stage=self,
    )

get_job_attrs

get_job_attrs(target=None)

Create Hail Batch Job attributes dictionary

Source code in src/cpg_flow/stage.py
def get_job_attrs(self, target: TargetT | None = None) -> dict[str, str]:
    """
    Create Hail Batch Job attributes dictionary
    """
    job_attrs = dict(stage=self.name)
    if sequencing_type := get_config()['workflow'].get('sequencing_type'):
        job_attrs['sequencing_type'] = sequencing_type
    if target:
        job_attrs |= target.get_job_attrs()
    return job_attrs