Skip to content

Target class

The following Target classes are available to use:

  • Target
  • Cohort
  • Dataset
  • MultiCohort
  • SequencingGroup

You can import them from the cpg_flow package:

from cpg_flow.targets import Cohort, Dataset, MultiCohort, SequencingGroup, Target

cpg_flow.targets.Target

Target()

Defines a target that a stage can act upon.

Source code in src/cpg_flow/targets/target.py
49
50
51
52
53
54
55
56
57
58
def __init__(self) -> None:
    # Whether to process even if outputs exist:
    self.forced: bool = False

    # If not set, exclude from the workflow:
    self.active: bool = True

    # create a self.alignment_inputs_hash variable to store the hash of the alignment inputs
    # this begins as None, and is set upon first calling
    self.alignment_inputs_hash: str | None = None

alignment_inputs_hash instance-attribute

alignment_inputs_hash = None

target_id property

target_id

ID should be unique across targets of all levels.

We are raising NotImplementedError instead of making it an abstract class, because mypy is not happy about binding TypeVar to abstract classes, see: https://stackoverflow.com/questions/48349054/how-do-you-annotate-the-type-of-an-abstract-class-with-mypy

Specifically,

TypeVar('TargetT', bound=Target)
Will raise:
Only concrete class can be given where "Type[Target]" is expected

get_sequencing_groups

get_sequencing_groups(only_active=True)

Get flat list of all sequencing groups corresponding to this target.

Source code in src/cpg_flow/targets/target.py
60
61
62
63
64
65
66
67
def get_sequencing_groups(
    self,
    only_active: bool = True,
) -> list['SequencingGroup']:
    """
    Get flat list of all sequencing groups corresponding to this target.
    """
    raise NotImplementedError

get_sequencing_group_ids

get_sequencing_group_ids(only_active=True)

Get flat list of all sequencing group IDs corresponding to this target.

Source code in src/cpg_flow/targets/target.py
69
70
71
72
73
def get_sequencing_group_ids(self, only_active: bool = True) -> list[str]:
    """
    Get flat list of all sequencing group IDs corresponding to this target.
    """
    return [s.id for s in self.get_sequencing_groups(only_active=only_active)]

get_alignment_inputs_hash

get_alignment_inputs_hash()

If this hash has been set, return it, otherwise set it, then return it This should be safe as it matches the current usage: - we set up the Targets in this workflow (populating SGs, Datasets, Cohorts) - at this point the targets are malleable (e.g. addition of an additional Cohort may add SGs to Datasets) - we then set up the Stages, where alignment input hashes are generated - at this point, the alignment inputs are fixed - all calls to get_alignment_inputs_hash() need to return the same value

Source code in src/cpg_flow/targets/target.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def get_alignment_inputs_hash(self) -> str:
    """
    If this hash has been set, return it, otherwise set it, then return it
    This should be safe as it matches the current usage:
    - we set up the Targets in this workflow (populating SGs, Datasets, Cohorts)
        - at this point the targets are malleable (e.g. addition of an additional Cohort may add SGs to Datasets)
    - we then set up the Stages, where alignment input hashes are generated
        - at this point, the alignment inputs are fixed
        - all calls to get_alignment_inputs_hash() need to return the same value
    """
    if self.alignment_inputs_hash is None:
        self.set_alignment_inputs_hash()
    if self.alignment_inputs_hash is None:
        raise TypeError('Alignment_inputs_hash was not populated by the setter method')
    return self.alignment_inputs_hash

set_alignment_inputs_hash

set_alignment_inputs_hash()

Unique hash string of sample alignment inputs. Useful to decide whether the analysis on the target needs to be rerun.

Source code in src/cpg_flow/targets/target.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def set_alignment_inputs_hash(self):
    """
    Unique hash string of sample alignment inputs. Useful to decide
    whether the analysis on the target needs to be rerun.
    """
    s = ' '.join(
        sorted(' '.join(str(s.alignment_input)) for s in self.get_sequencing_groups() if s.alignment_input),
    )
    h = hashlib.sha256(s.encode()).hexdigest()[:38]
    self.alignment_inputs_hash = f'{h}_{len(self.get_sequencing_group_ids())}'

get_job_attrs

get_job_attrs()

Attributes for Hail Batch job.

Source code in src/cpg_flow/targets/target.py
123
124
125
126
127
def get_job_attrs(self) -> dict:
    """
    Attributes for Hail Batch job.
    """
    raise NotImplementedError

get_job_prefix

get_job_prefix()

Prefix job names.

Source code in src/cpg_flow/targets/target.py
129
130
131
132
133
def get_job_prefix(self) -> str:
    """
    Prefix job names.
    """
    raise NotImplementedError

rich_id_map

rich_id_map()

Map of internal IDs to participant or external IDs, if the latter is provided.

Source code in src/cpg_flow/targets/target.py
135
136
137
138
139
def rich_id_map(self) -> dict[str, str]:
    """
    Map if internal IDs to participant or external IDs, if the latter is provided.
    """
    return {s.id: s.rich_id for s in self.get_sequencing_groups() if s.participant_id != s.id}

cpg_flow.targets.Cohort

Cohort(id=None, name=None)

Bases: Target

Represents a "cohort" target - all sequencing groups from a single CustomCohort (potentially spanning multiple datasets) in the workflow. Analysis dataset name is required and will be used as the default name for the cohort.

Source code in src/cpg_flow/targets/cohort.py
43
44
45
46
47
48
def __init__(self, id: str | None = None, name: str | None = None) -> None:
    super().__init__()
    self.id = id or get_config()['workflow']['dataset']
    self.name = name or get_config()['workflow']['dataset']
    self.analysis_dataset = Dataset(name=get_config()['workflow']['dataset'])
    self._sequencing_group_by_id: dict[str, SequencingGroup] = {}

alignment_inputs_hash instance-attribute

alignment_inputs_hash = None

id instance-attribute

id = id or get_config()['workflow']['dataset']

name instance-attribute

name = name or get_config()['workflow']['dataset']

analysis_dataset instance-attribute

analysis_dataset = Dataset(
    name=get_config()["workflow"]["dataset"]
)

target_id property

target_id

Unique target ID

get_sequencing_group_ids

get_sequencing_group_ids(only_active=True)

Get flat list of all sequencing group IDs corresponding to this target.

Source code in src/cpg_flow/targets/target.py
69
70
71
72
73
def get_sequencing_group_ids(self, only_active: bool = True) -> list[str]:
    """
    Get flat list of all sequencing group IDs corresponding to this target.
    """
    return [s.id for s in self.get_sequencing_groups(only_active=only_active)]

get_alignment_inputs_hash

get_alignment_inputs_hash()

If this hash has been set, return it, otherwise set it, then return it This should be safe as it matches the current usage: - we set up the Targets in this workflow (populating SGs, Datasets, Cohorts) - at this point the targets are malleable (e.g. addition of an additional Cohort may add SGs to Datasets) - we then set up the Stages, where alignment input hashes are generated - at this point, the alignment inputs are fixed - all calls to get_alignment_inputs_hash() need to return the same value

Source code in src/cpg_flow/targets/target.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def get_alignment_inputs_hash(self) -> str:
    """
    If this hash has been set, return it, otherwise set it, then return it
    This should be safe as it matches the current usage:
    - we set up the Targets in this workflow (populating SGs, Datasets, Cohorts)
        - at this point the targets are malleable (e.g. addition of an additional Cohort may add SGs to Datasets)
    - we then set up the Stages, where alignment input hashes are generated
        - at this point, the alignment inputs are fixed
        - all calls to get_alignment_inputs_hash() need to return the same value
    """
    if self.alignment_inputs_hash is None:
        self.set_alignment_inputs_hash()
    if self.alignment_inputs_hash is None:
        raise TypeError('Alignment_inputs_hash was not populated by the setter method')
    return self.alignment_inputs_hash

set_alignment_inputs_hash

set_alignment_inputs_hash()

Unique hash string of sample alignment inputs. Useful to decide whether the analysis on the target needs to be rerun.

Source code in src/cpg_flow/targets/target.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def set_alignment_inputs_hash(self):
    """
    Unique hash string of sample alignment inputs. Useful to decide
    whether the analysis on the target needs to be rerun.
    """
    s = ' '.join(
        sorted(' '.join(str(s.alignment_input)) for s in self.get_sequencing_groups() if s.alignment_input),
    )
    h = hashlib.sha256(s.encode()).hexdigest()[:38]
    self.alignment_inputs_hash = f'{h}_{len(self.get_sequencing_group_ids())}'

rich_id_map

rich_id_map()

Map of internal IDs to participant or external IDs, if the latter is provided.

Source code in src/cpg_flow/targets/target.py
135
136
137
138
139
def rich_id_map(self) -> dict[str, str]:
    """
    Map if internal IDs to participant or external IDs, if the latter is provided.
    """
    return {s.id: s.rich_id for s in self.get_sequencing_groups() if s.participant_id != s.id}

get_cohort_id

get_cohort_id()

Get the cohort ID

Source code in src/cpg_flow/targets/cohort.py
58
59
60
def get_cohort_id(self) -> str:
    """Get the cohort ID"""
    return self.id

write_ped_file

write_ped_file(out_path=None, use_participant_id=False)

Create a PED file for all samples in the whole cohort PED is written with no header line to be strict specification compliant

Source code in src/cpg_flow/targets/cohort.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def write_ped_file(
    self,
    out_path: Path | None = None,
    use_participant_id: bool = False,
) -> Path:
    """
    Create a PED file for all samples in the whole cohort
    PED is written with no header line to be strict specification compliant
    """
    datas = []
    for sequencing_group in self.get_sequencing_groups():
        datas.append(
            sequencing_group.pedigree.get_ped_dict(
                use_participant_id=use_participant_id,
            ),
        )
    if not datas:
        raise ValueError(f'No pedigree data found for {self.id}')
    df = pd.DataFrame(datas)

    if out_path is None:
        out_path = self.analysis_dataset.tmp_prefix() / 'ped' / f'{self.get_alignment_inputs_hash()}.ped'

    if not get_config()['workflow'].get('dry_run', False):
        with out_path.open('w') as fp:
            df.to_csv(fp, sep='\t', index=False, header=False)
    return out_path

add_sequencing_group_object

add_sequencing_group_object(s, allow_duplicates=True)

Add a sequencing group object to the Cohort. Args: s: SequencingGroup object allow_duplicates: if True, allow adding the same object twice

Source code in src/cpg_flow/targets/cohort.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def add_sequencing_group_object(
    self,
    s: 'SequencingGroup',
    allow_duplicates: bool = True,
):
    """
    Add a sequencing group object to the Cohort.
    Args:
        s: SequencingGroup object
        allow_duplicates: if True, allow adding the same object twice
    """
    if s.id in self._sequencing_group_by_id:
        if allow_duplicates:
            LOGGER.debug(
                f'SequencingGroup {s.id} already exists in the Cohort {self.name}',
            )
            return self._sequencing_group_by_id[s.id]
        raise ValueError(
            f'SequencingGroup {s.id} already exists in the Cohort {self.name}',
        )
    self._sequencing_group_by_id[s.id] = s

get_sequencing_groups

get_sequencing_groups(only_active=True)

Gets a flat list of all sequencing groups from all datasets. Include only "active" sequencing groups (unless only_active is False)

Source code in src/cpg_flow/targets/cohort.py
112
113
114
115
116
117
118
119
120
def get_sequencing_groups(
    self,
    only_active: bool = True,
) -> list['SequencingGroup']:
    """
    Gets a flat list of all sequencing groups from all datasets.
    Include only "active" sequencing groups (unless only_active is False)
    """
    return [s for s in self._sequencing_group_by_id.values() if (s.active or not only_active)]

get_job_attrs

get_job_attrs()

Attributes for Hail Batch job.

Source code in src/cpg_flow/targets/cohort.py
122
123
124
125
126
127
128
def get_job_attrs(self) -> dict:
    """
    Attributes for Hail Batch job.
    """
    return {
        # 'sequencing_groups': self.get_sequencing_group_ids(),
    }

get_job_prefix

get_job_prefix()

Prefix job names.

Source code in src/cpg_flow/targets/cohort.py
130
131
132
133
134
def get_job_prefix(self) -> str:
    """
    Prefix job names.
    """
    return ''

to_tsv

to_tsv()

Export to a parsable TSV file

Source code in src/cpg_flow/targets/cohort.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def to_tsv(self) -> str:
    """
    Export to a parsable TSV file
    """
    assert self.get_sequencing_groups()
    tsv_path = self.analysis_dataset.tmp_prefix() / 'samples.tsv'
    df = pd.DataFrame(
        {
            's': s.id,
            'gvcf': s.gvcf or '-',
            'sex': s.meta.get('sex') or '-',
            'continental_pop': s.meta.get('continental_pop') or '-',
            'subcontinental_pop': s.meta.get('subcontinental_pop') or '-',
        }
        for s in self.get_sequencing_groups()
    ).set_index('s', drop=False)
    with to_path(tsv_path).open('w') as f:
        df.to_csv(f, index=False, sep='\t', na_rep='NA')
    return tsv_path

cpg_flow.targets.Dataset

Dataset(name)

Bases: Target

Represents a CPG dataset.

Each dataset at the CPG corresponds to * a GCP project: https://github.com/populationgenomics/team-docs/tree/main/storage_policies * a Pulumi stack: https://github.com/populationgenomics/analysis-runner/tree/main/stack * a metamist project

Source code in src/cpg_flow/targets/dataset.py
47
48
49
50
51
52
53
54
def __init__(
    self,
    name: str,
):
    super().__init__()
    self._sequencing_group_by_id: dict[str, SequencingGroup] = {}
    self.name = name
    self.active = True

alignment_inputs_hash instance-attribute

alignment_inputs_hash = None

target_id property

target_id

Unique target ID

get_sequencing_group_ids

get_sequencing_group_ids(only_active=True)

Get flat list of all sequencing group IDs corresponding to this target.

Source code in src/cpg_flow/targets/target.py
69
70
71
72
73
def get_sequencing_group_ids(self, only_active: bool = True) -> list[str]:
    """
    Get flat list of all sequencing group IDs corresponding to this target.
    """
    return [s.id for s in self.get_sequencing_groups(only_active=only_active)]

get_alignment_inputs_hash

get_alignment_inputs_hash()

If this hash has been set, return it, otherwise set it, then return it This should be safe as it matches the current usage: - we set up the Targets in this workflow (populating SGs, Datasets, Cohorts) - at this point the targets are malleable (e.g. addition of an additional Cohort may add SGs to Datasets) - we then set up the Stages, where alignment input hashes are generated - at this point, the alignment inputs are fixed - all calls to get_alignment_inputs_hash() need to return the same value

Source code in src/cpg_flow/targets/target.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def get_alignment_inputs_hash(self) -> str:
    """
    If this hash has been set, return it, otherwise set it, then return it
    This should be safe as it matches the current usage:
    - we set up the Targets in this workflow (populating SGs, Datasets, Cohorts)
        - at this point the targets are malleable (e.g. addition of an additional Cohort may add SGs to Datasets)
    - we then set up the Stages, where alignment input hashes are generated
        - at this point, the alignment inputs are fixed
        - all calls to get_alignment_inputs_hash() need to return the same value
    """
    if self.alignment_inputs_hash is None:
        self.set_alignment_inputs_hash()
    if self.alignment_inputs_hash is None:
        raise TypeError('Alignment_inputs_hash was not populated by the setter method')
    return self.alignment_inputs_hash

set_alignment_inputs_hash

set_alignment_inputs_hash()

Unique hash string of sample alignment inputs. Useful to decide whether the analysis on the target needs to be rerun.

Source code in src/cpg_flow/targets/target.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def set_alignment_inputs_hash(self):
    """
    Unique hash string of sample alignment inputs. Useful to decide
    whether the analysis on the target needs to be rerun.
    """
    s = ' '.join(
        sorted(' '.join(str(s.alignment_input)) for s in self.get_sequencing_groups() if s.alignment_input),
    )
    h = hashlib.sha256(s.encode()).hexdigest()[:38]
    self.alignment_inputs_hash = f'{h}_{len(self.get_sequencing_group_ids())}'

rich_id_map

rich_id_map()

Map of internal IDs to participant or external IDs, if the latter is provided.

Source code in src/cpg_flow/targets/target.py
135
136
137
138
139
def rich_id_map(self) -> dict[str, str]:
    """
    Map if internal IDs to participant or external IDs, if the latter is provided.
    """
    return {s.id: s.rich_id for s in self.get_sequencing_groups() if s.participant_id != s.id}

create staticmethod

create(name)

Create a dataset.

Source code in src/cpg_flow/targets/dataset.py
56
57
58
59
60
61
@staticmethod
def create(name: str) -> 'Dataset':
    """
    Create a dataset.
    """
    return Dataset(name=name)

prefix

prefix(**kwargs)

The primary storage path.

Source code in src/cpg_flow/targets/dataset.py
74
75
76
77
78
79
80
81
82
83
84
def prefix(self, **kwargs) -> Path:
    """
    The primary storage path.
    """
    return to_path(
        dataset_path(
            seq_type_subdir(),
            dataset=self.name,
            **kwargs,
        ),
    )

tmp_prefix

tmp_prefix(**kwargs)

Storage path for temporary files.

Source code in src/cpg_flow/targets/dataset.py
86
87
88
89
90
91
92
93
94
95
96
97
def tmp_prefix(self, **kwargs) -> Path:
    """
    Storage path for temporary files.
    """
    return to_path(
        dataset_path(
            seq_type_subdir(),
            dataset=self.name,
            category='tmp',
            **kwargs,
        ),
    )

analysis_prefix

analysis_prefix(**kwargs)

Storage path for analysis files.

Source code in src/cpg_flow/targets/dataset.py
 99
100
101
102
103
104
105
106
107
108
109
110
def analysis_prefix(self, **kwargs) -> Path:
    """
    Storage path for analysis files.
    """
    return to_path(
        dataset_path(
            seq_type_subdir(),
            dataset=self.name,
            category='analysis',
            **kwargs,
        ),
    )

web_prefix

web_prefix(**kwargs)

Path for files served by an HTTP server. Matches the corresponding URLs returned by self.web_url().

Source code in src/cpg_flow/targets/dataset.py
112
113
114
115
116
117
118
119
120
121
122
123
124
def web_prefix(self, **kwargs) -> Path:
    """
    Path for files served by an HTTP server Matches corresponding URLs returns by
    self.web_url() URLs.
    """
    return to_path(
        dataset_path(
            seq_type_subdir(),
            dataset=self.name,
            category='web',
            **kwargs,
        ),
    )

web_url

web_url()

URLs matching self.storage_web_path() files served by an HTTP server.

Source code in src/cpg_flow/targets/dataset.py
126
127
128
129
130
131
132
133
def web_url(self) -> str | None:
    """
    URLs matching self.storage_web_path() files serverd by an HTTP server.
    """
    return web_url(
        seq_type_subdir(),
        dataset=self.name,
    )

add_sequencing_group

add_sequencing_group(
    id,
    *,
    sequencing_type,
    sequencing_technology,
    sequencing_platform,
    external_id=None,
    participant_id=None,
    meta=None,
    sex=None,
    pedigree=None,
    alignment_input=None
)

Create a new sequencing group and add it to the dataset.

Source code in src/cpg_flow/targets/dataset.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def add_sequencing_group(
    self,
    id: str,  # pylint: disable=redefined-builtin
    *,
    sequencing_type: str,
    sequencing_technology: str,
    sequencing_platform: str,
    external_id: str | None = None,
    participant_id: str | None = None,
    meta: dict | None = None,
    sex: Optional['Sex'] = None,
    pedigree: Optional['PedigreeInfo'] = None,
    alignment_input: AlignmentInput | None = None,
) -> 'SequencingGroup':
    """
    Create a new sequencing group and add it to the dataset.
    """
    if id in self._sequencing_group_by_id:
        LOGGER.debug(
            f'SequencingGroup {id} already exists in the dataset {self.name}',
        )
        return self._sequencing_group_by_id[id]

    force_sgs = get_config()['workflow'].get('force_sgs', set())
    forced = id in force_sgs or external_id in force_sgs or participant_id in force_sgs

    s = SequencingGroup(
        id=id,
        dataset=self,
        external_id=external_id,
        sequencing_type=sequencing_type,
        sequencing_technology=sequencing_technology,
        sequencing_platform=sequencing_platform,
        participant_id=participant_id,
        meta=meta,
        sex=sex,
        pedigree=pedigree,
        alignment_input=alignment_input,
        forced=forced,
    )
    self._sequencing_group_by_id[id] = s
    return s

add_sequencing_group_object

add_sequencing_group_object(s)

Add a sequencing group object to the dataset. Args: s: SequencingGroup object

Source code in src/cpg_flow/targets/dataset.py
178
179
180
181
182
183
184
185
186
187
188
189
def add_sequencing_group_object(self, s: 'SequencingGroup'):
    """
    Add a sequencing group object to the dataset.
    Args:
        s: SequencingGroup object
    """
    if s.id in self._sequencing_group_by_id:
        LOGGER.debug(
            f'SequencingGroup {s.id} already exists in the dataset {self.name}',
        )
    else:
        self._sequencing_group_by_id[s.id] = s

get_sequencing_group_by_id

get_sequencing_group_by_id(id)

Get sequencing group by ID

Source code in src/cpg_flow/targets/dataset.py
191
192
193
194
195
def get_sequencing_group_by_id(self, id: str) -> Optional['SequencingGroup']:
    """
    Get sequencing group by ID
    """
    return self._sequencing_group_by_id.get(id)

get_sequencing_groups

get_sequencing_groups(only_active=True)

Get dataset's sequencing groups. Include only "active" sequencing groups, unless only_active=False

Source code in src/cpg_flow/targets/dataset.py
197
198
199
200
201
202
203
204
def get_sequencing_groups(
    self,
    only_active: bool = True,
) -> list['SequencingGroup']:
    """
    Get dataset's sequencing groups. Include only "active" sequencing groups, unless only_active=False
    """
    return [s for sid, s in self._sequencing_group_by_id.items() if (s.active or not only_active)]

get_job_attrs

get_job_attrs()

Attributes for Hail Batch job.

Source code in src/cpg_flow/targets/dataset.py
206
207
208
209
210
211
212
213
def get_job_attrs(self) -> dict:
    """
    Attributes for Hail Batch job.
    """
    return {
        'dataset': self.name,
        # 'sequencing_groups': self.get_sequencing_group_ids(),
    }

get_job_prefix

get_job_prefix()

Prefix job names.

Source code in src/cpg_flow/targets/dataset.py
215
216
217
218
219
def get_job_prefix(self) -> str:
    """
    Prefix job names.
    """
    return f'{self.name}: '

write_ped_file

write_ped_file(out_path=None, use_participant_id=False)

Create a PED file for all sequencing groups PED is written with no header line to be strict specification compliant

Source code in src/cpg_flow/targets/dataset.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def write_ped_file(
    self,
    out_path: Path | None = None,
    use_participant_id: bool = False,
) -> Path:
    """
    Create a PED file for all sequencing groups
    PED is written with no header line to be strict specification compliant
    """
    datas = []
    for sequencing_group in self.get_sequencing_groups():
        datas.append(
            sequencing_group.pedigree.get_ped_dict(
                use_participant_id=use_participant_id,
            ),
        )
    if not datas:
        raise ValueError(f'No pedigree data found for {self.name}')
    df = pd.DataFrame(datas)

    if out_path is None:
        out_path = self.tmp_prefix() / 'ped' / f'{self.get_alignment_inputs_hash()}.ped'

    if not get_config()['workflow'].get('dry_run', False):
        with out_path.open('w') as fp:
            df.to_csv(fp, sep='\t', index=False, header=False)
    return out_path

cpg_flow.targets.MultiCohort

MultiCohort()

Bases: Target

Represents a "multi-cohort" target - multiple cohorts in the workflow.

Source code in src/cpg_flow/targets/multicohort.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(self) -> None:
    super().__init__()

    # Previously MultiCohort.name was an underscore-delimited string of all the input cohorts
    # this was expanding to the point where filenames including this String were too long for *nix
    # instead we can create a hash of the input cohorts, and use that as the name
    # the exact cohorts can be obtained from the config associated with the ar-guid
    input_cohorts = get_config()['workflow'].get('input_cohorts', [])
    if input_cohorts:
        self.name = hash_from_list_of_strings(sorted(input_cohorts), suffix='cohorts')
    else:
        self.name = get_config()['workflow']['dataset']

    assert self.name, 'Ensure cohorts or dataset is defined in the config file.'

    self._cohorts_by_id: dict[str, Cohort] = {}
    self._datasets_by_name: dict[str, Dataset] = {}
    self.analysis_dataset = Dataset(name=get_config()['workflow']['dataset'])

alignment_inputs_hash instance-attribute

alignment_inputs_hash = None

name instance-attribute

name = hash_from_list_of_strings(
    sorted(input_cohorts), suffix="cohorts"
)

analysis_dataset instance-attribute

analysis_dataset = Dataset(
    name=get_config()["workflow"]["dataset"]
)

target_id property

target_id

Unique target ID

get_sequencing_group_ids

get_sequencing_group_ids(only_active=True)

Get flat list of all sequencing group IDs corresponding to this target.

Source code in src/cpg_flow/targets/target.py
69
70
71
72
73
def get_sequencing_group_ids(self, only_active: bool = True) -> list[str]:
    """
    Get flat list of all sequencing group IDs corresponding to this target.
    """
    return [s.id for s in self.get_sequencing_groups(only_active=only_active)]

get_alignment_inputs_hash

get_alignment_inputs_hash()

If this hash has been set, return it, otherwise set it, then return it This should be safe as it matches the current usage: - we set up the Targets in this workflow (populating SGs, Datasets, Cohorts) - at this point the targets are malleable (e.g. addition of an additional Cohort may add SGs to Datasets) - we then set up the Stages, where alignment input hashes are generated - at this point, the alignment inputs are fixed - all calls to get_alignment_inputs_hash() need to return the same value

Source code in src/cpg_flow/targets/target.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def get_alignment_inputs_hash(self) -> str:
    """
    If this hash has been set, return it, otherwise set it, then return it
    This should be safe as it matches the current usage:
    - we set up the Targets in this workflow (populating SGs, Datasets, Cohorts)
        - at this point the targets are malleable (e.g. addition of an additional Cohort may add SGs to Datasets)
    - we then set up the Stages, where alignment input hashes are generated
        - at this point, the alignment inputs are fixed
        - all calls to get_alignment_inputs_hash() need to return the same value
    """
    if self.alignment_inputs_hash is None:
        self.set_alignment_inputs_hash()
    if self.alignment_inputs_hash is None:
        raise TypeError('Alignment_inputs_hash was not populated by the setter method')
    return self.alignment_inputs_hash

set_alignment_inputs_hash

set_alignment_inputs_hash()

Unique hash string of sample alignment inputs. Useful to decide whether the analysis on the target needs to be rerun.

Source code in src/cpg_flow/targets/target.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def set_alignment_inputs_hash(self):
    """
    Unique hash string of sample alignment inputs. Useful to decide
    whether the analysis on the target needs to be rerun.
    """
    s = ' '.join(
        sorted(' '.join(str(s.alignment_input)) for s in self.get_sequencing_groups() if s.alignment_input),
    )
    h = hashlib.sha256(s.encode()).hexdigest()[:38]
    self.alignment_inputs_hash = f'{h}_{len(self.get_sequencing_group_ids())}'

get_job_prefix

get_job_prefix()

Prefix job names.

Source code in src/cpg_flow/targets/target.py
129
130
131
132
133
def get_job_prefix(self) -> str:
    """
    Prefix job names.
    """
    raise NotImplementedError

rich_id_map

rich_id_map()

Map of internal IDs to participant or external IDs, if the latter is provided.

Source code in src/cpg_flow/targets/target.py
135
136
137
138
139
def rich_id_map(self) -> dict[str, str]:
    """
    Map if internal IDs to participant or external IDs, if the latter is provided.
    """
    return {s.id: s.rich_id for s in self.get_sequencing_groups() if s.participant_id != s.id}

create_dataset

create_dataset(name)

Create a dataset and add it to the cohort.

Source code in src/cpg_flow/targets/multicohort.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def create_dataset(self, name: str) -> 'Dataset':
    """
    Create a dataset and add it to the cohort.
    """
    if name in self._datasets_by_name:
        return self._datasets_by_name[name]

    if name == self.analysis_dataset.name:
        ds = self.analysis_dataset
    else:
        ds = Dataset(name=name)

    self._datasets_by_name[ds.name] = ds
    return ds

get_cohorts

get_cohorts(only_active=True)

Gets list of all cohorts. Include only "active" cohorts (unless only_active is False)

Source code in src/cpg_flow/targets/multicohort.py
79
80
81
82
83
84
85
86
87
def get_cohorts(self, only_active: bool = True) -> list['Cohort']:
    """
    Gets list of all cohorts.
    Include only "active" cohorts (unless only_active is False)
    """
    cohorts = list(self._cohorts_by_id.values())
    if only_active:
        cohorts = [c for c in cohorts if c.active]
    return cohorts

get_cohort_ids

get_cohort_ids(only_active=True)

Get list of cohort IDs. Include only "active" cohorts (unless only_active is False)

Source code in src/cpg_flow/targets/multicohort.py
89
90
91
92
93
94
def get_cohort_ids(self, only_active: bool = True) -> list['str']:
    """
    Get list of cohort IDs.
    Include only "active" cohorts (unless only_active is False)
    """
    return [c.get_cohort_id() for c in self.get_cohorts(only_active)]

get_cohort_by_id

get_cohort_by_id(id, only_active=True)

Get cohort by id. Include only "active" cohorts (unless only_active is False)

Source code in src/cpg_flow/targets/multicohort.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def get_cohort_by_id(
    self,
    id: str,
    only_active: bool = True,
) -> Optional['Cohort']:
    """
    Look up a cohort by its ID.

    Returns None when the ID is unknown, or — with only_active=True —
    when the cohort exists but is inactive.
    """
    cohort = self._cohorts_by_id.get(id)
    if not cohort:
        LOGGER.warning(f'Cohort {id} not found in the multi-cohort')

    # Without the active filter, hand back whatever we found (even None).
    if only_active and not (isinstance(cohort, Cohort) and cohort.active):
        return None
    return cohort

get_datasets

get_datasets(only_active=True)

Gets list of all datasets. Include only "active" datasets (unless only_active is False)

Source code in src/cpg_flow/targets/multicohort.py
115
116
117
118
119
120
121
122
123
def get_datasets(self, only_active: bool = True) -> list['Dataset']:
    """
    Return all datasets in this multi-cohort.

    With only_active=True (the default), only datasets that are active and
    contain at least one sequencing group are included.
    """
    datasets = list(self._datasets_by_name.values())
    if not only_active:
        return datasets
    return [ds for ds in datasets if ds.active and ds.get_sequencing_groups()]

get_sequencing_groups

get_sequencing_groups(only_active=True)

Gets a flat list of all sequencing groups from all datasets. Uses a dictionary to avoid duplicates (the same sequencing group may appear in multiple cohorts). Include only "active" sequencing groups (unless only_active is False)

Source code in src/cpg_flow/targets/multicohort.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def get_sequencing_groups(
    self,
    only_active: bool = True,
) -> list['SequencingGroup']:
    """
    Flat list of all sequencing groups across all datasets.

    Deduplicates by sequencing-group ID, since the same group may appear
    in multiple cohorts. Only "active" entries are included unless
    only_active=False.
    """
    unique: dict[str, 'SequencingGroup'] = {}
    for ds in self.get_datasets(only_active):
        unique.update((sg.id, sg) for sg in ds.get_sequencing_groups(only_active))
    return list(unique.values())

create_cohort

create_cohort(id, name)

Create a cohort and add it to the multi-cohort.

Source code in src/cpg_flow/targets/multicohort.py
140
141
142
143
144
145
146
147
148
149
150
def create_cohort(self, id: str, name: str) -> 'Cohort':
    """
    Create a cohort and add it to the multi-cohort.

    Returns the already-registered cohort when the ID is known.
    """
    if id in self._cohorts_by_id:
        LOGGER.debug(f'Cohort {id} already exists in the multi-cohort')
        return self._cohorts_by_id[id]

    cohort = Cohort(id=id, name=name)
    self._cohorts_by_id[cohort.id] = cohort
    return cohort

add_dataset

add_dataset(d)

Add a Dataset to the MultiCohort Args: d: Dataset object

Source code in src/cpg_flow/targets/multicohort.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def add_dataset(self, d: 'Dataset') -> 'Dataset':
    """
    Add a Dataset to the MultiCohort.

    Args:
        d: Dataset object

    Returns the MultiCohort's own Dataset instance for that name.
    """
    if d.name not in self._datasets_by_name:
        # We need to create a fresh Dataset here, to avoid manipulating
        # the cohort's dataset object at this point.
        self._datasets_by_name[d.name] = Dataset(d.name)
    else:
        LOGGER.debug(
            f'Dataset {d.name} already exists in the MultiCohort {self.name}',
        )
    return self._datasets_by_name[d.name]

get_dataset_by_name

get_dataset_by_name(name, only_active=True)

Get dataset by name. Include only "active" datasets (unless only_active is False)

Source code in src/cpg_flow/targets/multicohort.py
167
168
169
170
171
172
173
174
175
176
177
def get_dataset_by_name(
    self,
    name: str,
    only_active: bool = True,
) -> Optional['Dataset']:
    """
    Look up a dataset by name.

    Returns None when no (active, by default) dataset carries that name.
    """
    # Dataset names are unique (they key _datasets_by_name), so a direct
    # scan finds at most one match.
    for dataset in self.get_datasets(only_active):
        if dataset.name == name:
            return dataset
    return None

get_job_attrs

get_job_attrs()

Attributes for Hail Batch job.

Source code in src/cpg_flow/targets/multicohort.py
179
180
181
182
183
184
185
186
187
def get_job_attrs(self) -> dict:
    """
    Attributes attached to Hail Batch jobs operating on this multi-cohort.
    """
    dataset_names = [ds.name for ds in self.get_datasets()]
    cohort_ids = [cohort.id for cohort in self.get_cohorts()]
    return {
        # 'sequencing_groups': self.get_sequencing_group_ids(),
        'datasets': dataset_names,
        'cohorts': cohort_ids,
    }

write_ped_file

write_ped_file(out_path=None, use_participant_id=False)

Create a PED file for all samples in the whole MultiCohort (a duplication of the Cohort method). The PED is written with no header line, to be strict-specification compliant.

Source code in src/cpg_flow/targets/multicohort.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def write_ped_file(
    self,
    out_path: Path | None = None,
    use_participant_id: bool = False,
) -> Path:
    """
    Create a PED file for all samples in the whole MultiCohort
    (a duplication of the Cohort method). The PED is written with no
    header line, to be strict-specification compliant.

    Raises ValueError when there is no pedigree data at all.
    """
    rows = [
        sg.pedigree.get_ped_dict(use_participant_id=use_participant_id)
        for sg in self.get_sequencing_groups()
    ]
    if not rows:
        raise ValueError(f'No pedigree data found for {self.name}')
    ped_df = pd.DataFrame(rows)

    # Default destination keys on the alignment-inputs hash, so the file
    # tracks the current set of sequencing groups.
    if out_path is None:
        out_path = self.analysis_dataset.tmp_prefix() / 'ped' / f'{self.get_alignment_inputs_hash()}.ped'

    # Skip the actual write in dry-run mode; the path is still returned.
    if not get_config()['workflow'].get('dry_run', False):
        with out_path.open('w') as fp:
            ped_df.to_csv(fp, sep='\t', index=False, header=False)
    return out_path

cpg_flow.targets.SequencingGroup

SequencingGroup(
    id,
    dataset,
    *,
    sequencing_type,
    sequencing_technology,
    sequencing_platform,
    external_id=None,
    participant_id=None,
    meta=None,
    sex=None,
    pedigree=None,
    alignment_input=None,
    assays=None,
    forced=False
)

Bases: Target

Represents a sequencing group.

Source code in src/cpg_flow/targets/sequencing_group.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def __init__(
    self,
    id: str,
    dataset: 'Dataset',
    *,
    sequencing_type: str,
    sequencing_technology: str,
    sequencing_platform: str,
    external_id: str | None = None,
    participant_id: str | None = None,
    meta: dict | None = None,
    sex: Sex | None = None,
    pedigree: Optional['PedigreeInfo'] = None,
    alignment_input: AlignmentInput | None = None,
    assays: tuple[Assay, ...] | None = None,
    forced: bool = False,
):
    """
    Initialise a sequencing group.

    Args:
        id: internal sequencing-group ID (also used as the display name).
        dataset: the parent Dataset this group belongs to.
        sequencing_type: sequencing type string.
        sequencing_technology: sequencing technology string.
        sequencing_platform: sequencing platform string.
        external_id: external sample ID, when known.
        participant_id: participant ID, when known.
        meta: free-form metadata dict (defaults to an empty dict).
        sex: pedigree sex; overrides the sex on a supplied `pedigree`.
        pedigree: existing PedigreeInfo; a minimal one is built when absent.
        alignment_input: alignment input, when available.
        assays: assays backing this group, when available.
        forced: process this target even if outputs already exist.
    """
    super().__init__()
    self.id = id
    # Name mirrors the internal ID.
    self.name = id
    self._external_id = external_id
    self.sequencing_type = sequencing_type
    self.sequencing_technology = sequencing_technology
    self.sequencing_platform = sequencing_platform

    self.dataset = dataset
    self._participant_id = participant_id
    self.meta: dict = meta or dict()
    # Fallback pedigree: fam_id uses the participant_id property, which
    # substitutes the external (or internal) ID when no participant ID is
    # set — so _participant_id/_external_id must be assigned above.
    self.pedigree: PedigreeInfo = pedigree or PedigreeInfo(
        sequencing_group=self,
        fam_id=self.participant_id,
        sex=sex or Sex.UNKNOWN,
    )
    # An explicitly provided sex wins over whatever the pedigree carried.
    if sex:
        self.pedigree.sex = sex
    self.alignment_input: AlignmentInput | None = alignment_input
    self.assays: tuple[Assay, ...] | None = assays
    self.forced = forced
    self.active = True
    # Only set if the file exists / found in Metamist:
    self.gvcf: GvcfPath | None = None
    self.cram: CramPath | None = None

alignment_inputs_hash instance-attribute

alignment_inputs_hash = None

pedigree instance-attribute

pedigree = pedigree or PedigreeInfo(
    sequencing_group=self,
    fam_id=participant_id,
    sex=sex or UNKNOWN,
)

participant_id property writable

participant_id

Get ID of participant corresponding to this sequencing group, or substitute it with external ID.

external_id property

external_id

Get external sample ID, or substitute it with the internal ID.

rich_id property

rich_id

ID for reporting purposes: composed of internal as well as external or participant IDs.

make_sv_evidence_path property

make_sv_evidence_path

Path to the evidence root for GATK-SV evidence files.

target_id property

target_id

Unique target ID

get_sequencing_group_ids

get_sequencing_group_ids(only_active=True)

Get flat list of all sequencing group IDs corresponding to this target.

Source code in src/cpg_flow/targets/target.py
69
70
71
72
73
def get_sequencing_group_ids(self, only_active: bool = True) -> list[str]:
    """
    Flat list of the IDs of all sequencing groups for this target.
    """
    groups = self.get_sequencing_groups(only_active=only_active)
    return [sg.id for sg in groups]

get_alignment_inputs_hash

get_alignment_inputs_hash()

If this hash has been set, return it, otherwise set it, then return it This should be safe as it matches the current usage: - we set up the Targets in this workflow (populating SGs, Datasets, Cohorts) - at this point the targets are malleable (e.g. addition of an additional Cohort may add SGs to Datasets) - we then set up the Stages, where alignment input hashes are generated - at this point, the alignment inputs are fixed - all calls to get_alignment_inputs_hash() need to return the same value

Source code in src/cpg_flow/targets/target.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def get_alignment_inputs_hash(self) -> str:
    """
    Lazily computed hash of the alignment inputs.

    The first call computes and caches the hash; later calls return the
    cached value. This is safe under the current usage pattern:
    - Targets (SGs, Datasets, Cohorts) are populated first and are
      malleable (e.g. adding a Cohort may add SGs to Datasets)
    - Stages are set up afterwards, which is when hashes are generated;
      by then alignment inputs are fixed, so every call to
      get_alignment_inputs_hash() must return the same value

    Raises TypeError if the setter fails to populate the hash.
    """
    if self.alignment_inputs_hash is None:
        self.set_alignment_inputs_hash()
        if self.alignment_inputs_hash is None:
            raise TypeError('Alignment_inputs_hash was not populated by the setter method')
    return self.alignment_inputs_hash

set_alignment_inputs_hash

set_alignment_inputs_hash()

Unique hash string of sample alignment inputs. Useful to decide whether the analysis on the target needs to be rerun.

Source code in src/cpg_flow/targets/target.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def set_alignment_inputs_hash(self):
    """
    Unique hash string of sample alignment inputs. Useful to decide
    whether the analysis on the target needs to be rerun.

    Sets self.alignment_inputs_hash to '<sha256-prefix>_<n sequencing group IDs>'.
    """
    # NOTE(review): the inner ' '.join(str(s.alignment_input)) iterates the
    # *characters* of the stringified alignment input, interleaving them
    # with spaces. That looks unintended (plain str(s.alignment_input)
    # seems meant), but changing it would alter every existing hash-derived
    # path — confirm before touching.
    s = ' '.join(
        sorted(' '.join(str(s.alignment_input)) for s in self.get_sequencing_groups() if s.alignment_input),
    )
    # Digest truncated to 38 hex characters to keep derived paths short.
    h = hashlib.sha256(s.encode()).hexdigest()[:38]
    self.alignment_inputs_hash = f'{h}_{len(self.get_sequencing_group_ids())}'

rich_id_map

rich_id_map()

Map of internal IDs to participant or external IDs, if the latter is provided.

Source code in src/cpg_flow/targets/target.py
135
136
137
138
139
def rich_id_map(self) -> dict[str, str]:
    """
    Map of internal IDs to participant or external IDs, where the latter is provided.
    """
    return {
        sg.id: sg.rich_id
        for sg in self.get_sequencing_groups()
        if sg.participant_id != sg.id
    }

get_ped_dict

get_ped_dict(use_participant_id=False)

Returns a dictionary of pedigree fields for this sequencing group, corresponding a PED file entry.

Source code in src/cpg_flow/targets/sequencing_group.py
126
127
128
129
130
131
def get_ped_dict(self, use_participant_id: bool = False) -> dict[str, str]:
    """
    Pedigree fields for this sequencing group, as a single PED-file entry.
    Delegates to the attached PedigreeInfo.
    """
    ped_entry = self.pedigree.get_ped_dict(use_participant_id)
    return ped_entry

make_cram_path

make_cram_path()

Path to a CRAM file. Not checking its existence here.

Source code in src/cpg_flow/targets/sequencing_group.py
133
134
135
136
137
138
139
140
141
142
def make_cram_path(self) -> CramPath:
    """
    Expected location of this group's CRAM file (existence not checked).
    """
    cram = self.dataset.prefix() / 'cram' / f'{self.id}.cram'
    crai = cram.with_suffix('.cram.crai')
    return CramPath(
        path=cram,
        index_path=crai,
        reference_assembly=reference_path('broad/ref_fasta'),
    )

make_gvcf_path

make_gvcf_path()

Path to a GVCF file. Not checking its existence here.

Source code in src/cpg_flow/targets/sequencing_group.py
144
145
146
147
148
def make_gvcf_path(self) -> GvcfPath:
    """
    Expected location of this group's GVCF file (existence not checked).
    """
    gvcf_file = self.dataset.prefix() / 'gvcf' / f'{self.id}.g.vcf.gz'
    return GvcfPath(gvcf_file)

get_sequencing_groups

get_sequencing_groups(only_active=True)

Implementing the abstract method.

Source code in src/cpg_flow/targets/sequencing_group.py
162
163
164
165
166
167
168
169
170
171
def get_sequencing_groups(
    self,
    only_active: bool = True,
) -> list['SequencingGroup']:
    """
    Implementing the abstract method: a sequencing group is its own single
    member, unless it is inactive and only active groups were requested.
    """
    skip = only_active and not self.active
    return [] if skip else [self]

get_job_attrs

get_job_attrs()

Attributes for Hail Batch job.

Source code in src/cpg_flow/targets/sequencing_group.py
173
174
175
176
177
178
179
180
181
182
183
184
def get_job_attrs(self) -> dict:
    """
    Attributes attached to Hail Batch jobs operating on this group.
    Includes a participant ID only when one (or an external ID) is set.
    """
    participant: str | None = self._participant_id or self._external_id
    attrs = {
        'dataset': self.dataset.name,
        'sequencing_group': self.id,
    }
    if participant:
        attrs['participant_id'] = participant
    return attrs

get_job_prefix

get_job_prefix()

Prefix job names.

Source code in src/cpg_flow/targets/sequencing_group.py
186
187
188
189
190
def get_job_prefix(self) -> str:
    """
    Prefix for job names: '<dataset>/<sequencing group ID>: '.
    """
    prefix = f'{self.dataset.name}/{self.id}'
    return prefix + ': '