
😵‍💫 Key Considerations and Limitations

🚫 No Forward Discovery

The framework relies exclusively on backward traversal. If a stage is not directly or indirectly linked to one of the final stages through the required_stages parameter of the @stage decorator, it will not be included in the workflow. In other words, stages that are not reachable from a final stage are ignored. This backward discovery ensures that only the stages actually required for the specified final stages are included, keeping the workflow free of irrelevant or unused stages.
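
To make the backward traversal concrete, here is a minimal sketch with made-up stage names, assuming a SequencingGroupStage-style base class alongside the @stage decorator; import paths are omitted because they depend on the installed framework version.

# Sketch only: stage names are invented and the base class is assumed;
# imports for `stage` and `SequencingGroupStage` are omitted.
@stage
class StageA(SequencingGroupStage):
    ...


@stage(required_stages=StageA)
class StageB(SequencingGroupStage):
    ...


# If StageB is the only final stage, discovery walks backwards
# StageB -> StageA. A stage that neither of them requires is never
# visited and therefore never scheduled.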

📝 Workflow Definition

The workflow definition serves as a lookup table for the final stages. If a final stage is not listed in this definition, it will not be part of the workflow, because there is no forward-discovery mechanism to find it.

workflow = [GeneratePrimes, CumulativeCalc, FilterEvens, BuildAPrimePyramid]

📜 Config Settings for expected_outputs

The expected_outputs method is called for every stage in the workflow, even if the stage is configured to be skipped in config.toml. This ensures that the workflow can validate or reference the expected outputs of all stages.

Since this method may depend on workflow-specific configuration settings, those settings must be present in the workflow configuration regardless of whether the stage will run. To avoid issues, it is common practice to include dummy values for such settings in the default configuration. This is not the intended behaviour and is flagged as an area for improvement in a future release.
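
For example, an expected_outputs implementation might build its output path from a workflow-specific setting, and that setting then needs at least a placeholder value even for runs that skip the stage. In the sketch below, the stage name and the output_prefix key are invented, and get_config/to_path are assumed helpers.

# Sketch only: the stage name and config key are invented, and the helper
# imports are assumptions; base-class imports are omitted.
from cpg_utils import Path, to_path
from cpg_utils.config import get_config


class ExampleStage(SequencingGroupStage):
    def expected_outputs(self, sequencing_group: SequencingGroup) -> Path:
        # Called for every stage in the workflow, even if this stage is
        # configured to be skipped, so `output_prefix` must always be set.
        prefix = get_config()['workflow']['output_prefix']
        return to_path(prefix) / f'{sequencing_group.id}_example.txt'

In config.toml this translates into keeping a placeholder entry, for example output_prefix = "gs://placeholder/unused" under [workflow], even when the stage itself never runs.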

โ“ Verifying results of expected_outputs

The API uses the results of the expected_outputs method to determine whether a stage needs to run. A stage is scheduled for execution only if one or more Path objects returned by expected_outputs do not exist in Google Cloud Platform (GCP). If all returned Path objects exist, the stage is considered to have already run successfully and is skipped.

For outputs such as Matrix Tables (.mt), Hail Tables (.ht), or Variant Datasets (.vds), which are directory structures made up of thousands of files, the check is performed on the object/_SUCCESS file to verify that the output was written completely. However, it has been observed that the object/_SUCCESS file may be written multiple times during processing, contrary to the expectation that it is written once, after all associated files have been fully processed.
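
Conceptually, the check for such directory-style outputs only looks for the _SUCCESS marker rather than verifying every file. The helper below is a sketch of that idea, not the framework's actual implementation, and the cpg_utils.Path import is an assumption.

# Sketch of the existence check described above, not the framework's code.
from cpg_utils import Path


def output_exists(path: Path) -> bool:
    # Matrix Tables, Hail Tables and Variant Datasets are directories of
    # many files, so completeness is judged by the `_SUCCESS` marker.
    if path.suffix in ('.mt', '.ht', '.vds'):
        return (path / '_SUCCESS').exists()
    # Plain files are checked directly.
    return path.exists()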

String outputs from expected_outputs

String outputs from the expected_outputs method are not checked by the API, because a string cannot reliably be assumed to represent a valid file path and may instead correspond to some other form of output.
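
For example, if expected_outputs returns a mapping containing both Path objects and strings, only the Path entries influence whether the stage is re-run. The keys and paths below are invented, the method is shown outside its stage class for brevity, and imports follow the earlier sketches.

# Sketch only: keys, paths and the surrounding stage class are invented.
def expected_outputs(self, sequencing_group: SequencingGroup) -> dict[str, Path | str]:
    return {
        # Checked: the stage is skipped only if this Path exists.
        'evens_removed': to_path(f'gs://my-bucket/{sequencing_group.id}_no_evens.txt'),
        # Not checked: a plain string is never treated as a file path.
        'label': f'{sequencing_group.id}_no_evens',
    }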

🤔 Behavior of queue_jobs in relation to expected_outputs

When the expected_outputs check determines that one or more required files do not exist, and the stage is not configured to be skipped, the queue_jobs method is invoked to define the specific work that needs to be scheduled in the workflow.

The queue_jobs method runs within the driver image, before any jobs in the workflow are executed. Because of this, it cannot access or read files generated by earlier stages, as those outputs have not yet been created. The outputs of earlier jobs only become available at runtime, as those jobs execute.
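
In practice, queue_jobs should therefore only pass paths into job commands for the jobs to read at runtime, never open those paths itself. The sketch below uses invented stage names and assumes get_batch, StageInput.as_path and Stage.make_outputs helpers behaving as described in this section.

# Sketch only: stage names are invented and `get_batch`, `as_path` and
# `make_outputs` are assumptions about the framework's helpers.
@stage(required_stages=UpstreamStage)
class DownstreamStage(SequencingGroupStage):
    def expected_outputs(self, sequencing_group: SequencingGroup) -> Path:
        return to_path(f'gs://my-bucket/{sequencing_group.id}_line_count.txt')

    def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> StageOutput:
        # Runs on the driver before any job executes: we can look up *where*
        # the upstream output will land, but we cannot open or read it here.
        upstream_path = inputs.as_path(sequencing_group, UpstreamStage)
        output_path = self.expected_outputs(sequencing_group)

        b = get_batch()
        job = b.new_job(f'Line count: {sequencing_group.id}')
        # The upstream file is only read inside the job, at runtime.
        job.command(f'wc -l < {b.read_input(str(upstream_path))} > {job.out}')
        b.write_output(job.out, str(output_path))

        return self.make_outputs(sequencing_group, data=output_path, jobs=[job])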

⛓ Explicit dependency between all jobs from queue_jobs

When the queue_jobs method schedules a collection of jobs on Hail Batch, one or more jobs are returned from the method, and the framework sets an explicit dependency between these jobs and all jobs from the Stages listed in the required_stages parameter. Therefore, all jobs that run in a Stage must be returned from queue_jobs to ensure no jobs start out of sequence. As an example:

test_workflows_shared/cpg_flow_test/jobs/filter_evens.py
def filter_evens(
    b: Batch,
    inputs: StageInput,
    previous_stage: Stage,
    sequencing_groups: list[SequencingGroup],
    input_files: dict[str, dict[str, Any]],
    sg_outputs: dict[str, dict[str, Any]],
    output_file_path: str,
) -> list[Job]:
    title = 'Filter Evens'

    # Compute the no evens list for each sequencing group
    sg_jobs = []
    sg_output_files = []
    for sg in sequencing_groups:  # type: ignore
        job = b.new_job(name=title + ': ' + sg.id)
        ...

        cmd = f"""
        ...
        """

        job.command(cmd)
        # `no_evens_output_file_path` is defined, and `sg_output_files` is
        # populated, in the elided code above.
        b.write_output(job.sg_no_evens_file, no_evens_output_file_path)
        sg_jobs.append(job)

    # Merge the no evens lists for all sequencing groups into a single file
    job = b.new_job(name=title)
    job.depends_on(*sg_jobs)
    # Read each per-sequencing-group file back in; renamed from `inputs`
    # to avoid shadowing the `inputs: StageInput` parameter.
    merge_inputs = ' '.join([b.read_input(f) for f in sg_output_files])
    job.command(f'cat {merge_inputs} >> {job.no_evens_file}')
    b.write_output(job.no_evens_file, output_file_path)

    # ALL jobs are returned to `queue_jobs`,
    # including the merge job created within this function.
    all_jobs = [job, *sg_jobs]
    return all_jobs
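
On the stage side, the matching queue_jobs then hands every one of those jobs back to the framework so the dependency on the required stages can be applied. The sketch below is hedged: the base class, target type, helper calls and the way the input dicts are assembled are assumptions, and the real test stage may differ; the key point is that all jobs returned by filter_evens are passed back via jobs=.

# Sketch only: `MultiCohortStage`, `get_batch`, `make_outputs` and the dict
# assembly are assumptions; only the pattern of returning *all* jobs matters.
@stage(required_stages=CumulativeCalc)
class FilterEvens(MultiCohortStage):
    def queue_jobs(self, multicohort: MultiCohort, inputs: StageInput) -> StageOutput:
        output_path = self.expected_outputs(multicohort)

        input_files: dict = {}  # assembled from `inputs` in the real stage (not shown)
        sg_outputs: dict = {}   # assembled from this stage's expected outputs (not shown)

        jobs = filter_evens(
            b=get_batch(),
            inputs=inputs,
            previous_stage=CumulativeCalc,  # the real code passes a Stage reference here
            sequencing_groups=multicohort.get_sequencing_groups(),
            input_files=input_files,
            sg_outputs=sg_outputs,
            output_file_path=str(output_path),
        )

        # Every job in `jobs` gets an explicit dependency on all jobs from
        # CumulativeCalc; any job omitted here could start out of sequence.
        return self.make_outputs(multicohort, data=output_path, jobs=jobs)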