Stage class
The following Stage classes are available to use:
Stage
DatasetStage
CohortStage
MultiCohortStage
SequencingGroupStage
You can import them from the cpg_flow package:
from cpg_flow.stage import Stage, DatasetStage, CohortStage, MultiCohortStage, SequencingGroupStage
cpg_flow.stage.Stage
Stage(
*,
name,
required_stages=None,
analysis_type=None,
analysis_keys=None,
update_analysis_meta=None,
tolerate_missing_output=False,
skipped=False,
assume_outputs_exist=False,
forced=False
)
Bases: ABC, Generic[TargetT]
Abstract class for a workflow stage. Parametrised by a specific Target subclass; for example, SequencingGroupStage(Stage[SequencingGroup]) should only be able to work on SequencingGroup(Target).
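The parametrisation described above can be illustrated with a minimal, self-contained sketch. It does not import cpg_flow: the `Target`, `SequencingGroup`, `Stage`, and `SequencingGroupStage` classes below are simplified stand-ins that only mirror the documented class names and bases, and `Align` and the single-argument `queue_jobs` are hypothetical.

```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar


class Target:
    """Stand-in for a workflow target (not the cpg_flow class)."""

    def __init__(self, unique_id: str) -> None:
        self.unique_id = unique_id


class SequencingGroup(Target):
    """Stand-in for a sequencing-group target."""


TargetT = TypeVar("TargetT", bound=Target)


class Stage(ABC, Generic[TargetT]):
    """Toy abstract stage, parametrised by the target type it processes."""

    def __init__(self, *, name: str) -> None:
        self.name = name

    @abstractmethod
    def queue_jobs(self, target: TargetT) -> str:
        """Subclasses describe the work to do for one target."""


class SequencingGroupStage(Stage[SequencingGroup], ABC):
    """Still abstract: only fixes the target type to SequencingGroup."""


class Align(SequencingGroupStage):
    """Hypothetical concrete stage."""

    def queue_jobs(self, target: SequencingGroup) -> str:
        # A real stage would add Hail Batch jobs here; the toy returns a label.
        return f"{self.name}:{target.unique_id}"


stage = Align(name="Align")
label = stage.queue_jobs(SequencingGroup("SG001"))
print(label)
```

Because the abstract method is left unimplemented on the intermediate class, only a concrete subclass such as `Align` can be instantiated, and a type checker can flag calls that pass the wrong kind of target.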
get_stage_cohort_prefix
get_stage_cohort_prefix(cohort, category=None)
Takes a cohort as an argument and calls through to the Workflow cohort_prefix method. The result has the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME, e.g. "gs://cpg-project-main/seqr_loader/COH123/MyStage".
PARAMETER | DESCRIPTION |
---|---|
cohort | The Cohort from which the analysis dataset and name are taken. |
category | One of: main, tmp, test, analysis, web. |
RETURNS | DESCRIPTION |
---|---|
Path | Path to the stage prefix for the given cohort. |
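To make the documented path layout concrete, here is a rough sketch that assembles a prefix of the same shape from plain strings. The helper `stage_cohort_prefix` and its parameters are hypothetical and exist only for illustration; the real method takes a Cohort, delegates to the Workflow, and returns a Path object.

```python
def stage_cohort_prefix(
    project_bucket: str,
    workflow_name: str,
    cohort_id: str,
    stage_name: str,
) -> str:
    """Join the four components in the documented order.

    Hypothetical helper: it mimics only the layout
    PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME.
    """
    parts = [project_bucket.rstrip("/"), workflow_name, cohort_id, stage_name]
    return "/".join(parts)


prefix = stage_cohort_prefix(
    "gs://cpg-project-main", "seqr_loader", "COH123", "MyStage"
)
print(prefix)  # gs://cpg-project-main/seqr_loader/COH123/MyStage
```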
queue_jobs abstractmethod
queue_jobs(target, inputs)
Adds Hail Batch jobs that process target.
Assumes that all the housekeeping work is done: checking for missing inputs from required stages and checking for possible reuse of existing outputs.
expected_outputs abstractmethod
expected_outputs(target)
Get path(s) to files that the stage is expected to generate for a target.
Used within queue_jobs() to pass output paths to job commands, as well as by the workflow to check whether the stage's expected outputs already exist and can be reused.
Can be a str, a Path object, or a dictionary of str/Path objects.
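The reuse check described above can be sketched as follows. This is an illustration under assumptions, not the library's implementation: `flatten_outputs` and `can_reuse` are hypothetical helpers, and local `pathlib.Path` objects stand in for whatever path type the workflow actually uses.

```python
import tempfile
from pathlib import Path


def flatten_outputs(outputs) -> list[Path]:
    """Normalise a str, a Path, or a dict of str/Path into a list of Paths."""
    if isinstance(outputs, dict):
        return [Path(p) for p in outputs.values()]
    return [Path(outputs)]


def can_reuse(outputs) -> bool:
    """Outputs are reusable only if every expected file already exists."""
    return all(p.exists() for p in flatten_outputs(outputs))


with tempfile.TemporaryDirectory() as tmp:
    vcf = Path(tmp) / "out.vcf.gz"
    index = Path(tmp) / "out.vcf.gz.tbi"
    # Mixed str/Path values, as the description allows.
    expected = {"vcf": vcf, "index": str(index)}

    before = can_reuse(expected)   # nothing written yet
    vcf.touch()
    partial = can_reuse(expected)  # index still missing
    index.touch()
    after = can_reuse(expected)    # both files present

print(before, partial, after)
```

Treating a partially written set of outputs as not reusable is the conservative choice here; whether the real workflow behaves the same way is not stated in this reference.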
queue_for_multicohort abstractmethod
queue_for_multicohort(multicohort)
Queues jobs for each corresponding target, as defined by the Stage subclass.
Returns a dictionary of StageOutput objects indexed by target unique_id.
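The shape of the return value, one output per target keyed by unique_id, can be sketched with toy types. `ToyTarget`, `ToyStageOutput`, and `queue_for_targets` are hypothetical stand-ins and do not reflect the fields of the real StageOutput class.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTarget:
    """Stand-in for a workflow target with a unique identifier."""

    unique_id: str


@dataclass
class ToyStageOutput:
    """Stand-in for a stage output: the target, its data, and queued jobs."""

    target: ToyTarget
    data: str
    jobs: list = field(default_factory=list)


def queue_for_targets(
    targets: list[ToyTarget], stage_name: str
) -> dict[str, ToyStageOutput]:
    """Build one output per target and index the results by unique_id."""
    outputs: dict[str, ToyStageOutput] = {}
    for target in targets:
        outputs[target.unique_id] = ToyStageOutput(
            target=target,
            data=f"{stage_name}/{target.unique_id}.out",
        )
    return outputs


result = queue_for_targets([ToyTarget("SG001"), ToyTarget("SG002")], "MyStage")
print(sorted(result))
```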
make_outputs
make_outputs(
target,
data=None,
*,
jobs=None,
meta=None,
reusable=False,
skipped=False,
error_msg=None
)
Create StageOutput for this stage.
get_job_attrs
get_job_attrs(target=None)
Create Hail Batch Job attributes dictionary
cpg_flow.stage.DatasetStage
DatasetStage(
*,
name,
required_stages=None,
analysis_type=None,
analysis_keys=None,
update_analysis_meta=None,
tolerate_missing_output=False,
skipped=False,
assume_outputs_exist=False,
forced=False
)
Bases: Stage, ABC
Dataset-level stage.
expected_outputs abstractmethod
expected_outputs(dataset)
Override to declare expected output paths.
queue_jobs abstractmethod
queue_jobs(dataset, inputs)
Override to add Hail Batch jobs.
queue_for_multicohort
queue_for_multicohort(multicohort)
Plug the stage into the workflow.
get_stage_cohort_prefix
get_stage_cohort_prefix(cohort, category=None)
Takes a cohort as an argument and calls through to the Workflow cohort_prefix method. The result has the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME, e.g. "gs://cpg-project-main/seqr_loader/COH123/MyStage".
PARAMETER | DESCRIPTION |
---|---|
cohort | The Cohort from which the analysis dataset and name are taken. |
category | One of: main, tmp, test, analysis, web. |
RETURNS | DESCRIPTION |
---|---|
Path | Path to the stage prefix for the given cohort. |
make_outputs
make_outputs(
target,
data=None,
*,
jobs=None,
meta=None,
reusable=False,
skipped=False,
error_msg=None
)
Create StageOutput for this stage.
get_job_attrs
get_job_attrs(target=None)
Create Hail Batch Job attributes dictionary
cpg_flow.stage.CohortStage
CohortStage(
*,
name,
required_stages=None,
analysis_type=None,
analysis_keys=None,
update_analysis_meta=None,
tolerate_missing_output=False,
skipped=False,
assume_outputs_exist=False,
forced=False
)
Bases: Stage, ABC
Cohort-level stage (all datasets of a workflow run).
expected_outputs abstractmethod
expected_outputs(cohort)
Override to declare expected output paths.
queue_jobs abstractmethod
queue_jobs(cohort, inputs)
Override to add Hail Batch jobs.
queue_for_multicohort
queue_for_multicohort(multicohort)
Plug the stage into the workflow.
get_stage_cohort_prefix
get_stage_cohort_prefix(cohort, category=None)
Takes a cohort as an argument and calls through to the Workflow cohort_prefix method. The result has the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME, e.g. "gs://cpg-project-main/seqr_loader/COH123/MyStage".
PARAMETER | DESCRIPTION |
---|---|
cohort | The Cohort from which the analysis dataset and name are taken. |
category | One of: main, tmp, test, analysis, web. |
RETURNS | DESCRIPTION |
---|---|
Path | Path to the stage prefix for the given cohort. |
make_outputs
make_outputs(
target,
data=None,
*,
jobs=None,
meta=None,
reusable=False,
skipped=False,
error_msg=None
)
Create StageOutput for this stage.
get_job_attrs
get_job_attrs(target=None)
Create Hail Batch Job attributes dictionary
cpg_flow.stage.MultiCohortStage
MultiCohortStage(
*,
name,
required_stages=None,
analysis_type=None,
analysis_keys=None,
update_analysis_meta=None,
tolerate_missing_output=False,
skipped=False,
assume_outputs_exist=False,
forced=False
)
Bases: Stage, ABC
MultiCohort-level stage (all datasets of a workflow run).
expected_outputs abstractmethod
expected_outputs(multicohort)
Override to declare expected output paths.
queue_jobs abstractmethod
queue_jobs(multicohort, inputs)
Override to add Hail Batch jobs.
queue_for_multicohort
queue_for_multicohort(multicohort)
Plug the stage into the workflow.
get_stage_cohort_prefix
get_stage_cohort_prefix(cohort, category=None)
Takes a cohort as an argument and calls through to the Workflow cohort_prefix method. The result has the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME, e.g. "gs://cpg-project-main/seqr_loader/COH123/MyStage".
PARAMETER | DESCRIPTION |
---|---|
cohort | The Cohort from which the analysis dataset and name are taken. |
category | One of: main, tmp, test, analysis, web. |
RETURNS | DESCRIPTION |
---|---|
Path | Path to the stage prefix for the given cohort. |
make_outputs
make_outputs(
target,
data=None,
*,
jobs=None,
meta=None,
reusable=False,
skipped=False,
error_msg=None
)
Create StageOutput for this stage.
get_job_attrs
get_job_attrs(target=None)
Create Hail Batch Job attributes dictionary
cpg_flow.stage.SequencingGroupStage
SequencingGroupStage(
*,
name,
required_stages=None,
analysis_type=None,
analysis_keys=None,
update_analysis_meta=None,
tolerate_missing_output=False,
skipped=False,
assume_outputs_exist=False,
forced=False
)
Bases: Stage[SequencingGroup], ABC
Sequencing Group level stage.
expected_outputs abstractmethod
expected_outputs(sequencing_group)
Override to declare expected output paths.
queue_jobs abstractmethod
queue_jobs(sequencing_group, inputs)
Override to add Hail Batch jobs.
queue_for_multicohort
queue_for_multicohort(multicohort)
Plug the stage into the workflow.
get_stage_cohort_prefix
get_stage_cohort_prefix(cohort, category=None)
Takes a cohort as an argument and calls through to the Workflow cohort_prefix method. The result has the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME, e.g. "gs://cpg-project-main/seqr_loader/COH123/MyStage".
PARAMETER | DESCRIPTION |
---|---|
cohort | The Cohort from which the analysis dataset and name are taken. |
category | One of: main, tmp, test, analysis, web. |
RETURNS | DESCRIPTION |
---|---|
Path | Path to the stage prefix for the given cohort. |
make_outputs
make_outputs(
target,
data=None,
*,
jobs=None,
meta=None,
reusable=False,
skipped=False,
error_msg=None
)
Create StageOutput for this stage.
get_job_attrs
get_job_attrs(target=None)
Create Hail Batch Job attributes dictionary