Stage class
The following Stage classes are available to use:
Stage
DatasetStage
CohortStage
MultiCohortStage
SequencingGroupStage
You can import them from the cpg_flow package:
from cpg_flow.stage import Stage, DatasetStage, CohortStage, MultiCohortStage, SequencingGroupStage
cpg_flow.stage.Stage
Stage(
*,
name,
required_stages=None,
analysis_type=None,
analysis_keys=None,
update_analysis_meta=None,
tolerate_missing_output=False,
skipped=False,
assume_outputs_exist=False,
forced=False
)
Bases: ABC, Generic[TargetT]
Abstract base class for a workflow stage. It is parametrised by a specific Target subclass, e.g. SequencingGroupStage(Stage[SequencingGroup]) should only be able to work on SequencingGroup(Target).
Source code in src/cpg_flow/stage.py
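A minimal sketch of the parametrisation pattern described above, using only the standard `abc` and `typing` modules. `Target`, `SequencingGroup`, `Dataset`, `ToyStage`, `ToySequencingGroupStage` and `CountReads` are hypothetical stand-ins, not the cpg_flow classes. Python does not enforce the type parameter at runtime, so the "only works on SequencingGroup" restriction is something a static type checker such as mypy reports.

```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar


class Target:
    """Hypothetical stand-in for a workflow target with a unique ID."""

    def __init__(self, unique_id: str) -> None:
        self.unique_id = unique_id


class SequencingGroup(Target):
    """Hypothetical stand-in for a sequencing-group target."""


class Dataset(Target):
    """Hypothetical stand-in for a dataset target."""


TargetT = TypeVar("TargetT", bound=Target)


class ToyStage(ABC, Generic[TargetT]):
    """Abstract stage parametrised by the kind of target it accepts."""

    @abstractmethod
    def expected_outputs(self, target: TargetT) -> str: ...


class ToySequencingGroupStage(ToyStage[SequencingGroup], ABC):
    """Narrows the target type to SequencingGroup, as a type checker sees it."""


class CountReads(ToySequencingGroupStage):
    def expected_outputs(self, target: SequencingGroup) -> str:
        return f"{target.unique_id}.counts.txt"


print(CountReads().expected_outputs(SequencingGroup("CPG123")))  # CPG123.counts.txt
# CountReads().expected_outputs(Dataset("fewgenomes")) still runs, but mypy rejects it.
```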
get_stage_cohort_prefix
get_stage_cohort_prefix(cohort, category=None)
Takes a cohort as an argument and calls through to the Workflow cohort_prefix method. The result has the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME, e.g. "gs://cpg-project-main/seqr_loader/COH123/MyStage".
PARAMETER | DESCRIPTION |
---|---|
cohort | The Cohort from which the analysis dataset and name are pulled. |
category | One of main, tmp, test, analysis, web. |
RETURNS | DESCRIPTION |
---|---|
Path | The stage's output prefix for this cohort. |
Source code in src/cpg_flow/stage.py
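The documented path layout can be illustrated with a small, self-contained sketch. `stage_cohort_prefix` is a hypothetical helper. The real method delegates to the Workflow's cohort_prefix method and derives the bucket from the cohort's analysis dataset, and this sketch replaces that with explicit strings. It also returns a plain str rather than a Path. The bucket naming `gs://cpg-<dataset>-<category>` and the "None means main" default are assumptions read off the example path above.

```python
from typing import Optional

# Categories listed in the parameter table above.
CATEGORIES = {"main", "tmp", "test", "analysis", "web"}


def stage_cohort_prefix(
    dataset: str,
    workflow_name: str,
    cohort_id: str,
    stage_name: str,
    category: Optional[str] = None,
) -> str:
    """Hypothetical helper: PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME.

    Assumes buckets are named gs://cpg-<dataset>-<category> and that a
    missing category means "main"; both are illustrative assumptions.
    """
    category = category or "main"
    if category not in CATEGORIES:
        raise ValueError(f"unknown category: {category!r}")
    bucket = f"gs://cpg-{dataset}-{category}"
    return "/".join([bucket, workflow_name, cohort_id, stage_name])


print(stage_cohort_prefix("project", "seqr_loader", "COH123", "MyStage"))
# gs://cpg-project-main/seqr_loader/COH123/MyStage
```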
queue_jobs (abstractmethod)
queue_jobs(target, inputs)
Adds Hail Batch jobs that process target. Assumes that all the housekeeping work has already been done: checking for missing inputs from required stages, and checking whether existing outputs can be reused.
Source code in src/cpg_flow/stage.py
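The split of responsibilities described above (housekeeping first, then queue_jobs) can be sketched as follows. `ToyStage`, `queue_with_checks`, `Align` and the `FastqStage` input key are hypothetical. In cpg_flow the checks are performed by the surrounding workflow machinery and real jobs are Hail Batch jobs, whereas here jobs are plain strings.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Set


class ToyStage(ABC):
    """Hypothetical stage: a wrapper does the housekeeping, subclasses only queue jobs."""

    def __init__(self, existing_outputs: Set[str]) -> None:
        self.existing_outputs = existing_outputs  # stands in for files already written

    @abstractmethod
    def expected_outputs(self, target: str) -> str: ...

    @abstractmethod
    def queue_jobs(self, target: str, inputs: Dict[str, str]) -> List[str]: ...

    def queue_with_checks(
        self, target: str, inputs: Dict[str, str], required: List[str]
    ) -> List[str]:
        # Housekeeping 1: every required stage must have provided an input.
        missing = [name for name in required if name not in inputs]
        if missing:
            raise ValueError(f"{target}: missing inputs from {missing}")
        # Housekeeping 2: reuse existing outputs instead of queuing new jobs.
        if self.expected_outputs(target) in self.existing_outputs:
            return []
        # Only now is the subclass asked to add jobs.
        return self.queue_jobs(target, inputs)


class Align(ToyStage):
    def expected_outputs(self, target: str) -> str:
        return f"{target}.cram"

    def queue_jobs(self, target: str, inputs: Dict[str, str]) -> List[str]:
        # A real stage would add Hail Batch jobs; this returns job descriptions.
        return [f"align {inputs['FastqStage']} -> {self.expected_outputs(target)}"]


stage = Align(existing_outputs={"CPG2.cram"})
print(stage.queue_with_checks("CPG1", {"FastqStage": "CPG1.fq"}, ["FastqStage"]))
# ['align CPG1.fq -> CPG1.cram']
print(stage.queue_with_checks("CPG2", {"FastqStage": "CPG2.fq"}, ["FastqStage"]))
# []
```

Because queue_jobs never sees a target whose outputs already exist, each subclass implementation can stay focused on building jobs.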
expected_outputs (abstractmethod)
expected_outputs(target)
Get path(s) to files that the stage is expected to generate for a target
.
Used within in queue_jobs()
to pass paths to outputs to job commands,
as well as by the workflow to check if the stage's expected outputs already
exist and can be reused.
Can be a str, a Path object, or a dictionary of str/Path objects.
Source code in src/cpg_flow/stage.py
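Because expected_outputs() may return a str, a Path, or a dict of str/Path values, a reuse check has to normalise all three shapes first. The sketch below does this with local `pathlib` paths. `output_paths` and `can_reuse` are hypothetical helpers, not cpg_flow functions, and real workflows typically point at cloud storage rather than a temporary directory.

```python
import tempfile
from pathlib import Path
from typing import Dict, List, Union

# Hypothetical alias mirroring the documented return shapes.
ExpectedOutputs = Union[str, Path, Dict[str, Union[str, Path]]]


def output_paths(outputs: ExpectedOutputs) -> List[Path]:
    """Flatten any supported expected_outputs() shape into a list of Paths."""
    if isinstance(outputs, dict):
        return [Path(p) for p in outputs.values()]
    return [Path(outputs)]


def can_reuse(outputs: ExpectedOutputs) -> bool:
    """Outputs are reusable only if every expected file already exists."""
    return all(p.exists() for p in output_paths(outputs))


with tempfile.TemporaryDirectory() as tmp:
    cram = Path(tmp) / "CPG1.cram"
    crai = Path(tmp) / "CPG1.cram.crai"
    cram.touch()
    outputs = {"cram": cram, "crai": str(crai)}
    print(can_reuse(outputs))  # False: the index file is missing
    crai.touch()
    print(can_reuse(outputs))  # True
```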
queue_for_multicohort (abstractmethod)
queue_for_multicohort(multicohort)
Queues jobs for each corresponding target, as defined by the Stage subclass. Returns a dictionary of StageOutput objects keyed by target unique_id.
Source code in src/cpg_flow/stage.py
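The return contract (one StageOutput per target, keyed by the target's unique_id) can be modelled with two dataclasses. `ToyTarget`, `ToyStageOutput` and `queue_for_targets` are hypothetical stand-ins. Which targets are visited depends on the Stage subclass, so this sketch simply takes an explicit list.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ToyTarget:
    """Hypothetical stand-in for a Target with a unique_id."""

    unique_id: str


@dataclass
class ToyStageOutput:
    """Hypothetical stand-in for StageOutput: the target plus its output data."""

    target: ToyTarget
    data: str
    jobs: List[str] = field(default_factory=list)


def queue_for_targets(targets: List[ToyTarget]) -> Dict[str, ToyStageOutput]:
    """Queue one (pretend) job per target and index the outputs by unique_id."""
    outputs: Dict[str, ToyStageOutput] = {}
    for target in targets:
        job = f"process {target.unique_id}"
        outputs[target.unique_id] = ToyStageOutput(target, f"{target.unique_id}.out", [job])
    return outputs


result = queue_for_targets([ToyTarget("CPG1"), ToyTarget("CPG2")])
print(sorted(result))  # ['CPG1', 'CPG2']
print(result["CPG2"].data)  # CPG2.out
```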
make_outputs
make_outputs(
target,
data=None,
*,
jobs=None,
meta=None,
reusable=False,
skipped=False,
error_msg=None
)
Create a StageOutput for this stage.
Source code in src/cpg_flow/stage.py
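To show how the parameters of make_outputs fit together, here is a sketch with a hypothetical `ToyStageOutput` whose fields simply mirror the documented signature. It is not cpg_flow's StageOutput. The detail worth noting is that every parameter after `data` is keyword-only, so jobs, meta and the status flags must be passed by name.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class ToyStageOutput:
    """Hypothetical container whose fields mirror make_outputs()'s parameters."""

    stage_name: str
    target_id: str
    data: Any = None
    jobs: List[str] = field(default_factory=list)
    meta: Dict[str, Any] = field(default_factory=dict)
    reusable: bool = False
    skipped: bool = False
    error_msg: Optional[str] = None


class ToyStage:
    def __init__(self, name: str) -> None:
        self.name = name

    def make_outputs(
        self,
        target_id: str,
        data: Any = None,
        *,  # everything below is keyword-only, as in the documented signature
        jobs: Optional[List[str]] = None,
        meta: Optional[Dict[str, Any]] = None,
        reusable: bool = False,
        skipped: bool = False,
        error_msg: Optional[str] = None,
    ) -> ToyStageOutput:
        return ToyStageOutput(
            stage_name=self.name,
            target_id=target_id,
            data=data,
            jobs=list(jobs or []),
            meta=dict(meta or {}),
            reusable=reusable,
            skipped=skipped,
            error_msg=error_msg,
        )


out = ToyStage("Align").make_outputs("CPG1", {"cram": "CPG1.cram"}, jobs=["align-CPG1"])
print(out.jobs, out.reusable)  # ['align-CPG1'] False
```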
get_job_attrs
get_job_attrs(target=None)
Create a Hail Batch job attributes dictionary.
Source code in src/cpg_flow/stage.py
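The following sketches how stage-level attributes might be combined with target-level ones. The attribute keys (`stage`, `target`, `dataset`) and the classes `ToyTarget`/`ToyStage` are hypothetical and are not the keys cpg_flow actually emits. For context, Hail Batch's `Batch.new_job` accepts a str-to-str dictionary through its `attributes` argument.

```python
from typing import Dict, Optional


class ToyTarget:
    """Hypothetical target that contributes its own job attributes."""

    def __init__(self, unique_id: str, dataset: str) -> None:
        self.unique_id = unique_id
        self.dataset = dataset

    def get_job_attrs(self) -> Dict[str, str]:
        return {"target": self.unique_id, "dataset": self.dataset}


class ToyStage:
    def __init__(self, name: str) -> None:
        self.name = name

    def get_job_attrs(self, target: Optional[ToyTarget] = None) -> Dict[str, str]:
        """Stage attributes, extended with the target's attributes when one is given."""
        attrs = {"stage": self.name}
        if target is not None:
            attrs.update(target.get_job_attrs())
        return attrs


stage = ToyStage("Align")
print(stage.get_job_attrs())  # {'stage': 'Align'}
print(stage.get_job_attrs(ToyTarget("CPG1", "fewgenomes")))
# {'stage': 'Align', 'target': 'CPG1', 'dataset': 'fewgenomes'}
```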
cpg_flow.stage.DatasetStage
DatasetStage(
*,
name,
required_stages=None,
analysis_type=None,
analysis_keys=None,
update_analysis_meta=None,
tolerate_missing_output=False,
skipped=False,
assume_outputs_exist=False,
forced=False
)
Bases: Stage, ABC
Dataset-level stage.
Source code in src/cpg_flow/stage.py
expected_outputs (abstractmethod)
expected_outputs(dataset)
Override to declare expected output paths.
Source code in src/cpg_flow/stage.py
queue_jobs (abstractmethod)
queue_jobs(dataset, inputs)
Override to add Hail Batch jobs.
Source code in src/cpg_flow/stage.py
queue_for_multicohort
queue_for_multicohort(multicohort)
Plug the stage into the workflow.
Source code in src/cpg_flow/stage.py
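For a dataset-level stage, plugging into the workflow means one set of jobs and one output per dataset. The sketch assumes, for illustration only, that a multicohort simply lists its datasets. `ToyDataset`, `ToyMultiCohort` and `queue_dataset_stage` are hypothetical, and cpg_flow's own accessors are not shown.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ToyDataset:
    name: str
    sequencing_groups: List[str] = field(default_factory=list)


@dataclass
class ToyMultiCohort:
    datasets: List[ToyDataset]


def queue_dataset_stage(multicohort: ToyMultiCohort) -> Dict[str, str]:
    """Dataset-level fan-out: one (pretend) output per dataset, keyed by its name."""
    outputs: Dict[str, str] = {}
    for dataset in multicohort.datasets:
        # A real stage would call queue_jobs(dataset, inputs) here.
        outputs[dataset.name] = f"{dataset.name}/summary.tsv"
    return outputs


mc = ToyMultiCohort([ToyDataset("fewgenomes", ["CPG1", "CPG2"]), ToyDataset("validation", ["CPG3"])])
print(queue_dataset_stage(mc))
# {'fewgenomes': 'fewgenomes/summary.tsv', 'validation': 'validation/summary.tsv'}
```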
Inherited from Stage (same source lines as documented above): get_stage_cohort_prefix(cohort, category=None), make_outputs(target, data=None, *, jobs=None, meta=None, reusable=False, skipped=False, error_msg=None) and get_job_attrs(target=None).
cpg_flow.stage.CohortStage
CohortStage(
*,
name,
required_stages=None,
analysis_type=None,
analysis_keys=None,
update_analysis_meta=None,
tolerate_missing_output=False,
skipped=False,
assume_outputs_exist=False,
forced=False
)
Bases: Stage, ABC
Cohort-level stage (runs once per cohort in a workflow run).
Source code in src/cpg_flow/stage.py
expected_outputs (abstractmethod)
expected_outputs(cohort)
Override to declare expected output paths.
Source code in src/cpg_flow/stage.py
queue_jobs (abstractmethod)
queue_jobs(cohort, inputs)
Override to add Hail Batch jobs.
Source code in src/cpg_flow/stage.py
queue_for_multicohort
queue_for_multicohort(multicohort)
Plug the stage into the workflow.
Source code in src/cpg_flow/stage.py
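A cohort-level stage produces one output per cohort, and its jobs see all of that cohort's sequencing groups at once (e.g. a joint-calling step). `ToyCohort`, `ToyMultiCohort` and `queue_cohort_stage` are hypothetical stand-ins, and the "joint-call" job string is illustrative only.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ToyCohort:
    id: str
    sequencing_groups: List[str]


@dataclass
class ToyMultiCohort:
    cohorts: List[ToyCohort]


def queue_cohort_stage(multicohort: ToyMultiCohort) -> Dict[str, str]:
    """Cohort-level fan-out: one (pretend) job per cohort over all its sequencing groups."""
    return {
        cohort.id: f"joint-call {','.join(cohort.sequencing_groups)}"
        for cohort in multicohort.cohorts
    }


mc = ToyMultiCohort([ToyCohort("COH1", ["CPG1", "CPG2"]), ToyCohort("COH2", ["CPG3"])])
print(queue_cohort_stage(mc))
# {'COH1': 'joint-call CPG1,CPG2', 'COH2': 'joint-call CPG3'}
```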
Inherited from Stage (same source lines as documented above): get_stage_cohort_prefix(cohort, category=None), make_outputs(target, data=None, *, jobs=None, meta=None, reusable=False, skipped=False, error_msg=None) and get_job_attrs(target=None).
cpg_flow.stage.MultiCohortStage
MultiCohortStage(
*,
name,
required_stages=None,
analysis_type=None,
analysis_keys=None,
update_analysis_meta=None,
tolerate_missing_output=False,
skipped=False,
assume_outputs_exist=False,
forced=False
)
Bases: Stage, ABC
MultiCohort-level stage (all datasets of a workflow run).
Source code in src/cpg_flow/stage.py
expected_outputs (abstractmethod)
expected_outputs(multicohort)
Override to declare expected output paths.
Source code in src/cpg_flow/stage.py
queue_jobs (abstractmethod)
queue_jobs(multicohort, inputs)
Override to add Hail Batch jobs.
Source code in src/cpg_flow/stage.py
queue_for_multicohort
queue_for_multicohort(multicohort)
Plug the stage into the workflow.
Source code in src/cpg_flow/stage.py
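A multicohort-level stage has exactly one target, the multicohort itself, so its output dictionary has a single entry. The sketch below also de-duplicates sequencing groups that appear in more than one cohort. That is a choice made for this toy example, not a documented cpg_flow behaviour. `ToyCohort`, `ToyMultiCohort` and `queue_multicohort_stage` are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ToyCohort:
    id: str
    sequencing_groups: List[str]


@dataclass
class ToyMultiCohort:
    id: str
    cohorts: List[ToyCohort]


def queue_multicohort_stage(multicohort: ToyMultiCohort) -> Dict[str, int]:
    """MultiCohort-level: a single target, so the result has exactly one entry."""
    # In this toy model, sequencing groups shared between cohorts are counted once.
    all_sgs = {sg for cohort in multicohort.cohorts for sg in cohort.sequencing_groups}
    return {multicohort.id: len(all_sgs)}


mc = ToyMultiCohort("MC1", [ToyCohort("COH1", ["CPG1", "CPG2"]), ToyCohort("COH2", ["CPG2", "CPG3"])])
print(queue_multicohort_stage(mc))  # {'MC1': 3}
```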
Inherited from Stage (same source lines as documented above): get_stage_cohort_prefix(cohort, category=None), make_outputs(target, data=None, *, jobs=None, meta=None, reusable=False, skipped=False, error_msg=None) and get_job_attrs(target=None).
cpg_flow.stage.SequencingGroupStage
SequencingGroupStage(
*,
name,
required_stages=None,
analysis_type=None,
analysis_keys=None,
update_analysis_meta=None,
tolerate_missing_output=False,
skipped=False,
assume_outputs_exist=False,
forced=False
)
Bases: Stage[SequencingGroup], ABC
Sequencing-group-level stage.
Source code in src/cpg_flow/stage.py
expected_outputs (abstractmethod)
expected_outputs(sequencing_group)
Override to declare expected output paths.
Source code in src/cpg_flow/stage.py
queue_jobs (abstractmethod)
queue_jobs(sequencing_group, inputs)
Override to add Hail Batch jobs.
Source code in src/cpg_flow/stage.py
queue_for_multicohort
queue_for_multicohort(multicohort)
Plug the stage into the workflow.
Source code in src/cpg_flow/stage.py
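A sequencing-group-level stage is the finest-grained fan-out: one output per sequencing group. The sketch assumes, for illustration, that sequencing groups are reached by walking each dataset of the multicohort. `ToyDataset`, `ToyMultiCohort` and `queue_sequencing_group_stage` are hypothetical, and the `<dataset>/<sg>.cram` output layout is made up.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ToyDataset:
    name: str
    sequencing_groups: List[str]


@dataclass
class ToyMultiCohort:
    datasets: List[ToyDataset]


def queue_sequencing_group_stage(multicohort: ToyMultiCohort) -> Dict[str, str]:
    """Finest-grained fan-out: one (pretend) output per sequencing group."""
    outputs: Dict[str, str] = {}
    for dataset in multicohort.datasets:
        for sg_id in dataset.sequencing_groups:
            # A real stage would call queue_jobs(sequencing_group, inputs) here.
            outputs[sg_id] = f"{dataset.name}/{sg_id}.cram"
    return outputs


mc = ToyMultiCohort([ToyDataset("fewgenomes", ["CPG1", "CPG2"]), ToyDataset("validation", ["CPG3"])])
print(queue_sequencing_group_stage(mc))
# {'CPG1': 'fewgenomes/CPG1.cram', 'CPG2': 'fewgenomes/CPG2.cram', 'CPG3': 'validation/CPG3.cram'}
```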
Inherited from Stage (same source lines as documented above): get_stage_cohort_prefix(cohort, category=None), make_outputs(target, data=None, *, jobs=None, meta=None, reusable=False, skipped=False, error_msg=None) and get_job_attrs(target=None).