
Stage class

The following Stage classes are available to use:

  • Stage
  • DatasetStage
  • CohortStage
  • MultiCohortStage
  • SequencingGroupStage

You can import them from the cpg_flow package:

from cpg_flow.stage import Stage, DatasetStage, CohortStage, MultiCohortStage, SequencingGroupStage

cpg_flow.stage.Stage

Stage(
    *,
    name,
    required_stages=None,
    analysis_type=None,
    analysis_keys=None,
    update_analysis_meta=None,
    tolerate_missing_output=False,
    skipped=False,
    assume_outputs_exist=False,
    forced=False
)

Bases: ABC, Generic[TargetT]

Abstract base class for a workflow stage, parametrised by a specific Target subclass: for example, SequencingGroupStage(Stage[SequencingGroup]) can only operate on SequencingGroup targets.
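In practice you subclass one of the target-specific bases below and implement expected_outputs() and queue_jobs(). The following minimal sketch is hypothetical: the stage name, paths, and job command are illustrative, cpg_flow.targets is assumed to provide the target classes, and get_batch() is assumed to come from cpg_utils.hail_batch.

from cpg_flow.stage import SequencingGroupStage, StageInput, StageOutput
from cpg_flow.targets import SequencingGroup  # assumption: target classes live here
from cpg_utils.hail_batch import get_batch  # assumption: standard Hail Batch accessor


class Align(SequencingGroupStage):
    def __init__(self):
        super().__init__(name='Align')

    def expected_outputs(self, sequencing_group: SequencingGroup):
        # Assumption: Dataset.prefix() returns the dataset's bucket path.
        return sequencing_group.dataset.prefix() / self.name / f'{sequencing_group.id}.cram'

    def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> StageOutput | None:
        j = get_batch().new_job('Align', self.get_job_attrs(sequencing_group))
        j.command(f'echo aligning {sequencing_group.id}')  # placeholder command
        return self.make_outputs(
            sequencing_group,
            data=self.expected_outputs(sequencing_group),
            jobs=j,
        )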

Source code in src/cpg_flow/stage.py
def __init__(
    self,
    *,
    name: str,
    required_stages: list[StageDecorator] | StageDecorator | None = None,
    analysis_type: str | None = None,
    analysis_keys: list[str] | None = None,
    update_analysis_meta: Callable[[str], dict] | None = None,
    tolerate_missing_output: bool = False,
    skipped: bool = False,
    assume_outputs_exist: bool = False,
    forced: bool = False,
):
    self._name = name
    self.required_stages_classes: list[StageDecorator] = []
    if required_stages:
        if isinstance(required_stages, list):
            self.required_stages_classes.extend(required_stages)
        else:
            self.required_stages_classes.append(required_stages)

    # Dependencies. Populated in workflow.run(), after we know all stages.
    self.required_stages: list[Stage] = []

    self.status_reporter = get_workflow().status_reporter
    # If `analysis_type` is defined, it will be used to create/update Analysis
    # entries in Metamist.
    self.analysis_type = analysis_type
    # If `analysis_keys` are defined, it will be used to extract the value for
    # `Analysis.output` if the Stage.expected_outputs() returns a dict.
    self.analysis_keys = analysis_keys
    # if `update_analysis_meta` is defined, it is called on the `Analysis.output`
    # field, and result is merged into the `Analysis.meta` dictionary.
    self.update_analysis_meta = update_analysis_meta

    self.tolerate_missing_output = tolerate_missing_output

    # Populated with the return value of `add_to_the_workflow()`
    self.output_by_target: dict[str, StageOutput | None] = dict()

    self.skipped = skipped
    self.forced = forced or self.name in get_config()['workflow'].get(
        'force_stages',
        [],
    )
    self.assume_outputs_exist = assume_outputs_exist

status_reporter instance-attribute

status_reporter = status_reporter

analysis_type instance-attribute

analysis_type = analysis_type

analysis_keys instance-attribute

analysis_keys = analysis_keys

update_analysis_meta instance-attribute

update_analysis_meta = update_analysis_meta

tolerate_missing_output instance-attribute

tolerate_missing_output = tolerate_missing_output

output_by_target instance-attribute

output_by_target = dict()

forced instance-attribute

forced = forced or name in get_config()['workflow'].get('force_stages', [])

assume_outputs_exist instance-attribute

assume_outputs_exist = assume_outputs_exist

name property

name

Stage name (unique and descriptive)

get_stage_cohort_prefix

get_stage_cohort_prefix(cohort, category=None)

Takes a cohort as an argument and calls through to the Workflow cohort_prefix method. The result has the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME, e.g. "gs://cpg-project-main/seqr_loader/COH123/MyStage"

PARAMETER DESCRIPTION
cohort

we pull the analysis dataset and name from this Cohort

TYPE: Cohort

category

main, tmp, test, analysis, web

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
Path

Path

Source code in src/cpg_flow/stage.py
def get_stage_cohort_prefix(
    self,
    cohort: Cohort,
    category: str | None = None,
) -> Path:
    """
    Takes a cohort as an argument and calls through to the Workflow cohort_prefix method.
    The result has the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME,
    e.g. "gs://cpg-project-main/seqr_loader/COH123/MyStage"

    Args:
        cohort (Cohort): we pull the analysis dataset and name from this Cohort
        category (str | None): main, tmp, test, analysis, web

    Returns:
        Path
    """
    return get_workflow().cohort_prefix(cohort, category=category) / self.name
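For example, a cohort-level stage can derive its output paths from this prefix; the category argument selects a non-default bucket (file names below are illustrative):

# Inside a CohortStage subclass (sketch):
def expected_outputs(self, cohort: Cohort):
    # e.g. gs://cpg-project-main/seqr_loader/COH123/MyStage/combined.vcf.gz
    return self.get_stage_cohort_prefix(cohort) / 'combined.vcf.gz'

def tmp_prefix(self, cohort: Cohort):
    # Assumption: category='tmp' resolves to the project's tmp bucket.
    return self.get_stage_cohort_prefix(cohort, category='tmp')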

queue_jobs abstractmethod

queue_jobs(target, inputs)

Adds Hail Batch jobs that process target. Assumes that all the housekeeping work is done: checking for missing inputs from required stages, and checking for possible reuse of existing outputs.

Source code in src/cpg_flow/stage.py
@abstractmethod
def queue_jobs(self, target: TargetT, inputs: StageInput) -> StageOutput | None:
    """
    Adds Hail Batch jobs that process `target`.
    Assumes that all the housekeeping work is done: checking for missing inputs
    from required stages, and checking for possible reuse of existing outputs.
    """

expected_outputs abstractmethod

expected_outputs(target)

Get path(s) to files that the stage is expected to generate for a target. Used within queue_jobs() to pass output paths to job commands, and by the workflow to check whether the stage's expected outputs already exist and can be reused.

Can be a str, a Path object, or a dictionary of str/Path objects.

Source code in src/cpg_flow/stage.py
@abstractmethod
def expected_outputs(self, target: TargetT) -> ExpectedResultT:
    """
    Get path(s) to files that the stage is expected to generate for a `target`.
    Used within `queue_jobs()` to pass output paths to job commands,
    as well as by the workflow to check if the stage's expected outputs already
    exist and can be reused.

    Can be a str, a Path object, or a dictionary of str/Path objects.
    """

queue_for_multicohort abstractmethod

queue_for_multicohort(multicohort)

Queues jobs for each corresponding target, defined by Stage subclass.

Returns a dictionary of StageOutput objects indexed by target unique_id.

Source code in src/cpg_flow/stage.py
@abstractmethod
def queue_for_multicohort(
    self,
    multicohort: MultiCohort,
) -> dict[str, StageOutput | None]:
    """
    Queues jobs for each corresponding target, defined by Stage subclass.

    Returns a dictionary of `StageOutput` objects indexed by target unique_id.
    """

make_outputs

make_outputs(
    target,
    data=None,
    *,
    jobs=None,
    meta=None,
    reusable=False,
    skipped=False,
    error_msg=None
)

Create StageOutput for this stage.

Source code in src/cpg_flow/stage.py
def make_outputs(
    self,
    target: Target,
    data: ExpectedResultT = None,  # TODO: ExpectedResultT is probably too broad, our code only really supports dict
    *,
    jobs: Sequence[Job | None] | Job | None = None,
    meta: dict | None = None,
    reusable: bool = False,
    skipped: bool = False,
    error_msg: str | None = None,
) -> StageOutput:
    """
    Create StageOutput for this stage.
    """
    return StageOutput(
        target=target,
        data=data,
        jobs=jobs,
        meta=meta,
        reusable=reusable,
        skipped=skipped,
        error_msg=error_msg,
        stage=self,
    )
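Typical calls from within queue_jobs() (a sketch; the job names, meta, and error message are illustrative):

# Success: record the expected outputs plus the jobs that produce them.
return self.make_outputs(target, data=self.expected_outputs(target), jobs=[align_j, index_j])

# Failure for this target, recorded without raising:
return self.make_outputs(target, error_msg='no FASTQs found for this sequencing group')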

get_job_attrs

get_job_attrs(target=None)

Create Hail Batch Job attributes dictionary

Source code in src/cpg_flow/stage.py
def get_job_attrs(self, target: TargetT | None = None) -> dict[str, str]:
    """
    Create Hail Batch Job attributes dictionary
    """
    job_attrs = dict(stage=self.name)
    if sequencing_type := get_config()['workflow'].get('sequencing_type'):
        job_attrs['sequencing_type'] = sequencing_type
    if target:
        job_attrs |= target.get_job_attrs()
    return job_attrs
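The returned dictionary combines the stage name, the configured sequencing type (if set), and the target's own attributes, and is typically passed straight to a new Hail Batch job (values illustrative):

attrs = self.get_job_attrs(sequencing_group)
# e.g. {'stage': 'Align', 'sequencing_type': 'genome', ...target-specific attributes}
j = get_batch().new_job('Align', attrs)  # assumption: get_batch() from cpg_utils.hail_batch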

cpg_flow.stage.DatasetStage

DatasetStage(
    *,
    name,
    required_stages=None,
    analysis_type=None,
    analysis_keys=None,
    update_analysis_meta=None,
    tolerate_missing_output=False,
    skipped=False,
    assume_outputs_exist=False,
    forced=False
)

Bases: Stage, ABC

Dataset-level stage

The constructor, instance attributes, and name property are inherited unchanged from Stage; see cpg_flow.stage.Stage above.

expected_outputs abstractmethod

expected_outputs(dataset)

Override to declare expected output paths.

Source code in src/cpg_flow/stage.py
@abstractmethod
def expected_outputs(self, dataset: Dataset) -> ExpectedResultT:
    """
    Override to declare expected output paths.
    """

queue_jobs abstractmethod

queue_jobs(dataset, inputs)

Override to add Hail Batch jobs.

Source code in src/cpg_flow/stage.py
@abstractmethod
def queue_jobs(self, dataset: Dataset, inputs: StageInput) -> StageOutput | None:
    """
    Override to add Hail Batch jobs.
    """

queue_for_multicohort

queue_for_multicohort(multicohort)

Plug the stage into the workflow.

Source code in src/cpg_flow/stage.py
def queue_for_multicohort(
    self,
    multicohort: MultiCohort,
) -> dict[str, StageOutput | None]:
    """
    Plug the stage into the workflow.
    """
    output_by_target: dict[str, StageOutput | None] = dict()
    # iterate directly over the datasets in this multicohort
    for dataset in multicohort.get_datasets():
        action = self._get_action(dataset)
        output_by_target[dataset.target_id] = self._queue_jobs_with_checks(
            dataset,
            action,
        )
    return output_by_target

The get_stage_cohort_prefix, make_outputs, and get_job_attrs methods are inherited unchanged from Stage; see cpg_flow.stage.Stage above.

cpg_flow.stage.CohortStage

CohortStage(
    *,
    name,
    required_stages=None,
    analysis_type=None,
    analysis_keys=None,
    update_analysis_meta=None,
    tolerate_missing_output=False,
    skipped=False,
    assume_outputs_exist=False,
    forced=False
)

Bases: Stage, ABC

Cohort-level stage (all datasets of a workflow run).

The constructor, instance attributes, and name property are inherited unchanged from Stage; see cpg_flow.stage.Stage above.

expected_outputs abstractmethod

expected_outputs(cohort)

Override to declare expected output paths.

Source code in src/cpg_flow/stage.py
@abstractmethod
def expected_outputs(self, cohort: Cohort) -> ExpectedResultT:
    """
    Override to declare expected output paths.
    """

queue_jobs abstractmethod

queue_jobs(cohort, inputs)

Override to add Hail Batch jobs.

Source code in src/cpg_flow/stage.py
@abstractmethod
def queue_jobs(self, cohort: Cohort, inputs: StageInput) -> StageOutput | None:
    """
    Override to add Hail Batch jobs.
    """

queue_for_multicohort

queue_for_multicohort(multicohort)

Plug the stage into the workflow.

Source code in src/cpg_flow/stage.py
def queue_for_multicohort(
    self,
    multicohort: MultiCohort,
) -> dict[str, StageOutput | None]:
    """
    Plug the stage into the workflow.
    """
    output_by_target: dict[str, StageOutput | None] = dict()
    for cohort in multicohort.get_cohorts():
        action = self._get_action(cohort)
        output_by_target[cohort.target_id] = self._queue_jobs_with_checks(
            cohort,
            action,
        )
    return output_by_target

The get_stage_cohort_prefix, make_outputs, and get_job_attrs methods are inherited unchanged from Stage; see cpg_flow.stage.Stage above.

cpg_flow.stage.MultiCohortStage

MultiCohortStage(
    *,
    name,
    required_stages=None,
    analysis_type=None,
    analysis_keys=None,
    update_analysis_meta=None,
    tolerate_missing_output=False,
    skipped=False,
    assume_outputs_exist=False,
    forced=False
)

Bases: Stage, ABC

MultiCohort-level stage (all datasets of a workflow run).

The constructor, instance attributes, and name property are inherited unchanged from Stage; see cpg_flow.stage.Stage above.

expected_outputs abstractmethod

expected_outputs(multicohort)

Override to declare expected output paths.

Source code in src/cpg_flow/stage.py
@abstractmethod
def expected_outputs(self, multicohort: MultiCohort) -> ExpectedResultT:
    """
    Override to declare expected output paths.
    """

queue_jobs abstractmethod

queue_jobs(multicohort, inputs)

Override to add Hail Batch jobs.

Source code in src/cpg_flow/stage.py
@abstractmethod
def queue_jobs(
    self,
    multicohort: MultiCohort,
    inputs: StageInput,
) -> StageOutput | None:
    """
    Override to add Hail Batch jobs.
    """

queue_for_multicohort

queue_for_multicohort(multicohort)

Plug the stage into the workflow.

Source code in src/cpg_flow/stage.py
def queue_for_multicohort(
    self,
    multicohort: MultiCohort,
) -> dict[str, StageOutput | None]:
    """
    Plug the stage into the workflow.
    """
    output_by_target: dict[str, StageOutput | None] = dict()
    action = self._get_action(multicohort)
    output_by_target[multicohort.target_id] = self._queue_jobs_with_checks(
        multicohort,
        action,
    )
    return output_by_target

The get_stage_cohort_prefix, make_outputs, and get_job_attrs methods are inherited unchanged from Stage; see cpg_flow.stage.Stage above.

cpg_flow.stage.SequencingGroupStage

SequencingGroupStage(
    *,
    name,
    required_stages=None,
    analysis_type=None,
    analysis_keys=None,
    update_analysis_meta=None,
    tolerate_missing_output=False,
    skipped=False,
    assume_outputs_exist=False,
    forced=False
)

Bases: Stage[SequencingGroup], ABC

Sequencing Group level stage.

The constructor, instance attributes, and name property are inherited unchanged from Stage; see cpg_flow.stage.Stage above.

expected_outputs abstractmethod

expected_outputs(sequencing_group)

Override to declare expected output paths.

Source code in src/cpg_flow/stage.py
@abstractmethod
def expected_outputs(self, sequencing_group: SequencingGroup) -> ExpectedResultT:
    """
    Override to declare expected output paths.
    """

queue_jobs abstractmethod

queue_jobs(sequencing_group, inputs)

Override to add Hail Batch jobs.

Source code in src/cpg_flow/stage.py
@abstractmethod
def queue_jobs(
    self,
    sequencing_group: SequencingGroup,
    inputs: StageInput,
) -> StageOutput | None:
    """
    Override to add Hail Batch jobs.
    """

queue_for_multicohort

queue_for_multicohort(multicohort)

Plug the stage into the workflow.

Source code in src/cpg_flow/stage.py
def queue_for_multicohort(
    self,
    multicohort: MultiCohort,
) -> dict[str, StageOutput | None]:
    """
    Plug the stage into the workflow.
    """
    output_by_target: dict[str, StageOutput | None] = dict()
    if not (active_sgs := multicohort.get_sequencing_groups()):
        all_sgs = len(multicohort.get_sequencing_groups(only_active=False))
        logger.warning(
            f'{len(active_sgs)}/{all_sgs} usable (active=True) SGs found in the multicohort. '
            'Check that `input_cohorts` or `input_datasets` are provided and not skipped',
        )
        return output_by_target

    # evaluate_stuff en masse
    for sequencing_group in active_sgs:
        action = self._get_action(sequencing_group)
        output_by_target[sequencing_group.target_id] = self._queue_jobs_with_checks(
            sequencing_group,
            action,
        )
    return output_by_target

The get_stage_cohort_prefix, make_outputs, and get_job_attrs methods are inherited unchanged from Stage; see cpg_flow.stage.Stage above.