Status

Metamist wrappers to report analysis progress.

cpg_flow.status.complete_analysis_job

complete_analysis_job(
    output,
    analysis_type,
    cohort_ids,
    sg_ids,
    project_name,
    meta,
    update_analysis_meta=None,
    tolerate_missing=False,
)

A job to be called within the batch as a PythonJob; this will register the analysis outputs from a Stage in Metamist.

PARAMETER DESCRIPTION
output

path to the output file

TYPE: str

analysis_type

metamist analysis type

TYPE: str

cohort_ids

all cohort IDs relevant to this target

TYPE: list[str]

sg_ids

all CPG IDs relevant to this target

TYPE: list[str]

project_name

project/dataset name

TYPE: str

meta

any metadata to add

TYPE: dict

update_analysis_meta

function to update analysis meta

TYPE: Callable | None DEFAULT: None

tolerate_missing

if True, allow missing output

TYPE: bool DEFAULT: False
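
For illustration, update_analysis_meta can be any callable that takes the output path and returns extra metadata to merge into meta before the analysis is registered. A minimal, hypothetical sketch:

def add_index_path_meta(output: str) -> dict:
    # Hypothetical helper: record the matching index file alongside the output.
    # The '.tbi' suffix assumes a tabix-indexed VCF output.
    return {'index_path': f'{output}.tbi'}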

Source code in src/cpg_flow/status.py
def complete_analysis_job(
    output: str,
    analysis_type: str,
    cohort_ids: list[str],
    sg_ids: list[str],
    project_name: str,
    meta: dict,
    update_analysis_meta: Callable | None = None,
    tolerate_missing: bool = False,
):
    """
    a job to be called within the batch as a PythonJob;
    this will register the analysis outputs from a Stage in Metamist

    Args:
        output (str): path to the output file
        analysis_type (str): metamist analysis type
        cohort_ids (list[str]): all cohort IDs relevant to this target
        sg_ids (list[str]): all CPG IDs relevant to this target
        project_name (str): project/dataset name
        meta (dict): any metadata to add
        update_analysis_meta (Callable | None): function to update analysis meta
        tolerate_missing (bool): if True, allow missing output
    """

    from cpg_flow.metamist import AnalysisStatus, get_metamist
    from cpg_utils import to_path

    assert isinstance(output, str)
    output_cloudpath = to_path(output)

    if update_analysis_meta is not None:
        meta = meta | update_analysis_meta(output)

    # if SG IDs are listed in the meta, remove them
    # these are already captured in the sg_ids list
    meta.pop('sequencing_groups', None)

    # if the meta has a remove_sgids key, we need to remove those from the list
    # this occurs when samples are soft-filtered from joint-calls in a way that
    # doesn't percolate through to the dataset/cohorts
    # removal here prevents results being registered for samples that were omitted
    if 'remove_sgids' in meta and to_path(meta['remove_sgids']).exists():
        with to_path(meta['remove_sgids']).open() as f:
            exclusion_ids = set(f.read().splitlines())
            print(f'removing {len(exclusion_ids)} samples from analysis')
            print(f'samples for removal: {", ".join(exclusion_ids)}')
            sg_ids = [sg for sg in sg_ids if sg not in exclusion_ids]

    # we know that es indexes are registered names, not files/dirs
    # skip all relevant checks for this output type
    if analysis_type != 'es-index':
        if not output_cloudpath.exists():
            if tolerate_missing:
                print(f"Output {output} doesn't exist, allowing silent return")
                return
            raise ValueError(f"Output {output} doesn't exist")

        # add file size to meta
        if not output_cloudpath.is_dir():
            meta |= {'size': output_cloudpath.stat().st_size}

    a_id = get_metamist().create_analysis(
        output=output,
        type_=analysis_type,
        status=AnalysisStatus('completed'),
        cohort_ids=cohort_ids,
        sequencing_group_ids=sg_ids,
        dataset=project_name,
        meta=meta,
    )
    if a_id is None:
        _msg = f'Creation of Analysis failed (type={analysis_type}, output={output}) in {project_name}'
        print(_msg)
        raise ConnectionError(_msg)
    print(
        f'Created Analysis(id={a_id}, type={analysis_type}, output={output}) in {project_name}',
    )
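
As a rough usage sketch, the function can be called directly, or via a Hail Batch PythonJob's .call() as MetamistStatusReporter does below. The output path, analysis type, sequencing-group IDs, and dataset name here are hypothetical:

complete_analysis_job(
    output='gs://cpg-my-dataset-main/joint-calling/v1.vcf.gz',
    analysis_type='custom',  # any metamist analysis type; this value is illustrative
    cohort_ids=[],
    sg_ids=['CPGAAAAAA', 'CPGBBBBBB'],
    project_name='my-dataset',
    meta={'stage': 'JointCalling'},
    tolerate_missing=False,
)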

cpg_flow.status.StatusReporterError

Bases: Exception

Error thrown by StatusReporter.

cpg_flow.status.StatusReporter

Bases: ABC

Status reporter

create_analysis abstractmethod

create_analysis(
    b,
    output,
    analysis_type,
    target,
    jobs=None,
    job_attr=None,
    meta=None,
    update_analysis_meta=None,
    tolerate_missing_output=False,
    project_name=None,
)

Record analysis entry.

Source code in src/cpg_flow/status.py
@abstractmethod
def create_analysis(
    self,
    b: Batch,
    output: str,
    analysis_type: str,
    target: Target,
    jobs: list[Job] | None = None,
    job_attr: dict | None = None,
    meta: dict | None = None,
    update_analysis_meta: Callable | None = None,
    tolerate_missing_output: bool = False,
    project_name: str | None = None,
):
    """
    Record analysis entry.
    """

cpg_flow.status.MetamistStatusReporter

MetamistStatusReporter()

Bases: StatusReporter

Job status reporter. Works by creating Metamist Analysis entries.

Source code in src/cpg_flow/status.py
def __init__(self) -> None:
    super().__init__()

create_analysis

create_analysis(
    b,
    output,
    analysis_type,
    target,
    jobs=None,
    job_attr=None,
    meta=None,
    update_analysis_meta=None,
    tolerate_missing_output=False,
    project_name=None,
)

Create completed analysis job

Source code in src/cpg_flow/status.py
def create_analysis(
    self,
    b: Batch,
    output: str,
    analysis_type: str,
    target: Target,
    jobs: list[Job] | None = None,
    job_attr: dict | None = None,
    meta: dict | None = None,
    update_analysis_meta: Callable | None = None,
    tolerate_missing_output: bool = False,
    project_name: str | None = None,
):
    """
    Create completed analysis job
    """

    # no jobs means no output, so no need to create analysis
    if not jobs:
        return

    if meta is None:
        meta = {}

    # find all relevant SG IDs
    # Currently this implementation will only populate either SG IDs or cohort IDs;
    # the other list will be empty.
    # It is unclear whether metamist would accept both lists of IDs and succeed.
    cohort_ids = []
    sequencing_group_ids = []
    if target is None:
        raise ValueError('Target is required to create analysis')
    if isinstance(target, MultiCohort):
        cohort_ids = target.get_cohort_ids()
    elif isinstance(target, Cohort):
        cohort_ids = [target.get_cohort_id()]
    else:
        sequencing_group_ids = target.get_sequencing_group_ids()

    py_job = b.new_python_job(
        f'Register analysis output {output}',
        (job_attr or {}) | {'tool': 'metamist'},
    )
    py_job.image(get_config()['workflow']['driver_image'])
    py_job.call(
        complete_analysis_job,
        str(output),
        analysis_type,
        cohort_ids,
        sequencing_group_ids,
        project_name,
        meta,
        update_analysis_meta,
        tolerate_missing_output,
    )

    py_job.depends_on(*jobs)
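
A usage sketch of the Metamist reporter: the batch b, the upstream jobs, and the target are assumed to come from a running cpg_flow workflow, and the output path and dataset name are hypothetical.

from cpg_flow.status import MetamistStatusReporter

reporter = MetamistStatusReporter()
reporter.create_analysis(
    b=b,                                  # Hail Batch from the workflow
    output='gs://cpg-my-dataset-main/cram/CPGAAAAAA.cram',
    analysis_type='cram',
    target=sequencing_group,              # a Target, e.g. a SequencingGroup
    jobs=jobs,                            # upstream jobs the registration depends on
    meta={'stage': 'Align'},
    project_name='my-dataset',
)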