## Key Considerations and Limitations
### No Forward Discovery

The framework relies exclusively on backward traversal. If a stage is not linked, directly or indirectly, to one of the final stages through the `required_stages` parameter of the `@stage` decorator, it will not be included in the workflow. In other words, stages that are not reachable from a final stage are effectively ignored. This backward-discovery approach ensures that only the stages directly required for the specified final stages are included, keeping the workflow free of irrelevant or unused stages.
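To make this concrete, here is a minimal sketch of two linked stages. The `@stage` decorator and its `required_stages` parameter are taken from the text above; the `SequencingGroupStage` base class, the import path, and the stage bodies are illustrative assumptions rather than exact framework API:

```python
# Import path is illustrative; use the framework's actual module.
from cpg_flow.stage import SequencingGroupStage, stage


@stage
class GeneratePrimes(SequencingGroupStage):
    """Produces primes per sequencing group (body elided)."""


@stage(required_stages=[GeneratePrimes])
class CumulativeCalc(SequencingGroupStage):
    """Reachable backwards from a final stage, so GeneratePrimes is pulled
    in as well. A stage that no final stage depends on, directly or
    transitively, is never discovered."""
```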
### Workflow Definition

The workflow definition serves as a lookup table for the final stages. If a final stage is not listed in this definition, it will not be part of the workflow, as there is no mechanism for forward discovery to identify it.
```python
workflow = [GeneratePrimes, CumulativeCalc, FilterEvens, BuildAPrimePyramid]
```
### Config Settings for `expected_outputs`

The `expected_outputs` method is called for every stage in the workflow, even if `config.toml` configures the stage to be skipped. This ensures that the workflow can validate or reference the expected outputs of all stages.

Since this method may depend on workflow-specific configuration settings, those settings must be present in the workflow configuration regardless of whether the stage will run. To avoid issues, it is common practice to include dummy values for such settings in the default configuration. This is not the intended behaviour and has been flagged as an area for improvement in a future release.
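As an illustration, here is a helper that an `expected_outputs` method might call, building a path from a workflow config value. The `get_config` accessor from `cpg_utils.config` and the `cumulative_calc_version` key are assumptions for this sketch; the point is that the key must exist in the configuration (even as a dummy value) whenever the stage is skipped:

```python
from cpg_utils import Path, to_path
from cpg_utils.config import get_config  # accessor assumed for this sketch


def expected_cumulative_output(sequencing_group_id: str) -> Path:
    # expected_outputs() runs for every stage, including skipped ones, so
    # this config key must be present in config.toml; a dummy value is
    # sufficient when the stage never runs.
    version = get_config()['workflow']['cumulative_calc_version']
    return to_path(f'gs://my-bucket/{version}/{sequencing_group_id}_cumulative.txt')
```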
### Verifying results of `expected_outputs`

The API uses the results of the `expected_outputs` method to determine whether a stage needs to run. A stage is scheduled for execution only if one or more `Path` objects returned by `expected_outputs` do not exist in Google Cloud Platform (GCP). If all returned `Path` objects exist, the stage is considered to have already run successfully and is therefore skipped.

For outputs such as Matrix Tables (`.mt`), Hail Tables (`.ht`), or Variant Datasets (`.vds`), which are complex structures made up of thousands of files, the check is performed on the `object/_SUCCESS` file to verify that the output was written completely. However, it has been observed that the `object/_SUCCESS` file may be written multiple times during processing, contrary to the expectation that it is written only once, after all associated files have been fully processed.
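Illustratively, the existence check reduces to something like the sketch below (not the framework's exact code; the bucket path is made up). For multi-file Hail outputs the check targets the `_SUCCESS` marker inside the object:

```python
from cpg_utils import to_path


def output_exists(path_str: str) -> bool:
    path = to_path(path_str)
    if path_str.endswith(('.mt', '.ht', '.vds')):
        # Completeness of multi-file outputs is judged by the _SUCCESS
        # marker, not by the directory merely existing.
        return (path / '_SUCCESS').exists()
    return path.exists()


# output_exists('gs://my-bucket/joint-called.mt') checks
# gs://my-bucket/joint-called.mt/_SUCCESS
```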
### `String` outputs from `expected_outputs`

String outputs from the `expected_outputs` method are not checked by the API. This is because string outputs cannot reliably be assumed to represent valid file paths and may instead correspond to other forms of output.
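For example, assuming a stage whose `expected_outputs` returns a mapping of named outputs (the key names here are arbitrary), only the `Path` value would be checked for existence; the plain string would be ignored by the check:

```python
from cpg_utils import to_path


def expected_outputs(sequencing_group):
    return {
        # Checked: a Path object pointing at a concrete location in GCP.
        'primes': to_path(f'gs://my-bucket/{sequencing_group.id}_primes.txt'),
        # Not checked: a plain string, which may not be a file path at all.
        'label': f'{sequencing_group.id}_primes',
    }
```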
### Behavior of `queue_jobs` in relation to `expected_outputs`

When the `expected_outputs` check determines that one or more required files do not exist, and the stage is not configured to be skipped, the `queue_jobs` method is invoked to define the specific work to be scheduled in the workflow.

The `queue_jobs` method runs within the driver image, before any jobs in the workflow are executed. Because of this, it cannot access or read files generated by earlier stages, as those outputs have not yet been created. The actual outputs from earlier jobs only become available as the jobs execute at runtime.
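In practice this means `queue_jobs` may wire the *paths* of an earlier stage's outputs into a job's command, but must not try to open or read those files itself. The sketch below assumes a `StageInput.as_path` accessor and a `get_batch()` helper, and reuses the stage names from the earlier sketch; these names are assumptions, not guaranteed framework API:

```python
def queue_jobs(self, sequencing_group, inputs):
    # Safe: only the path string is needed at scheduling time.
    primes_path = inputs.as_path(sequencing_group, GeneratePrimes)  # accessor assumed

    # NOT safe: the file does not exist yet while queue_jobs runs in the
    # driver image, so reading it here would fail.
    # primes = primes_path.open().read()

    b = get_batch()  # helper assumed
    job = b.new_job(f'CumulativeCalc: {sequencing_group.id}')
    job.command(f'my_tool --input {b.read_input(str(primes_path))} --output {job.out}')
    b.write_output(job.out, str(self.expected_outputs(sequencing_group)))
    ...  # construct and return the stage outputs/jobs as usual
```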
### Explicit dependency between all jobs from `queue_jobs`

When the `queue_jobs` method schedules a collection of jobs in Hail Batch, one or more jobs are returned from the method, and the framework sets an explicit dependency between these jobs and all jobs from the stages specified in the `required_stages` parameter. Therefore, all jobs that run in a stage must be returned from `queue_jobs` to ensure no job starts out of sequence. As an example:
```python
def filter_evens(
    b: Batch,
    inputs: StageInput,
    previous_stage: Stage,
    sequencing_groups: list[SequencingGroup],
    input_files: dict[str, dict[str, Any]],
    sg_outputs: dict[str, dict[str, Any]],
    output_file_path: str,
) -> list[Job]:
    title = 'Filter Evens'

    # Compute the no-evens list for each sequencing group
    sg_jobs = []
    sg_output_files = []
    for sg in sequencing_groups:  # type: ignore
        job = b.new_job(name=title + ': ' + sg.id)
        ...
        cmd = f"""
        ...
        """
        job.command(cmd)
        # Per-sequencing-group output path (key name illustrative)
        no_evens_output_file_path = sg_outputs[sg.id]['no_evens']
        b.write_output(job.sg_no_evens_file, no_evens_output_file_path)
        sg_output_files.append(no_evens_output_file_path)
        sg_jobs.append(job)

    # Merge the no-evens lists for all sequencing groups into a single file
    merge_job = b.new_job(name=title)
    merge_job.depends_on(*sg_jobs)
    merge_inputs = ' '.join([b.read_input(f) for f in sg_output_files])
    merge_job.command(f'cat {merge_inputs} >> {merge_job.no_evens_file}')
    b.write_output(merge_job.no_evens_file, output_file_path)

    # ALL jobs are returned back to `queue_jobs`,
    # including every job created within this function.
    all_jobs = [merge_job, *sg_jobs]
    return all_jobs
```