
Utils

Utility functions and constants.

cpg_flow.utils.get_logger

get_logger(
    logger_name="cpg_workflows",
    log_level=INFO,
    fmt_string=DEFAULT_LOG_FORMAT,
)

Creates a logger instance (so as not to use the root logger).

PARAMETER DESCRIPTION
logger_name

name of the logger to create or fetch

TYPE: str DEFAULT: 'cpg_workflows'

log_level

logging level, defaults to INFO. Can be overridden by config

TYPE: int DEFAULT: INFO

fmt_string

format string for this logger, defaults to DEFAULT_LOG_FORMAT

TYPE: str DEFAULT: DEFAULT_LOG_FORMAT

RETURNS DESCRIPTION

a logger instance, created first if required

Source code in src/cpg_flow/utils.py
def get_logger(
    logger_name: str = 'cpg_workflows',
    log_level: int = logging.INFO,
    fmt_string: str = DEFAULT_LOG_FORMAT,
) -> logging.Logger:
    """
    creates a logger instance (so as not to use the root logger)
    Args:
        logger_name (str):
        log_level (int): logging level, defaults to INFO. Can be overridden by config
        fmt_string (str): format string for this logger, defaults to DEFAULT_LOG_FORMAT
    Returns:
        a logger instance, if required create it first
    """

    if logger_name not in LOGGERS:
        # allow a log-level & format override on a name basis
        log_level = config_retrieve(['workflow', 'logger', logger_name, 'level'], log_level)
        fmt_string = config_retrieve(['workflow', 'logger', logger_name, 'format'], fmt_string)

        # create a named logger
        new_logger = logging.getLogger(logger_name)
        new_logger.setLevel(log_level)

        # unless otherwise specified, use coloredlogs
        if config_retrieve(['workflow', 'logger', logger_name, 'use_colored_logs'], True):
            coloredlogs.install(level=log_level, fmt=fmt_string, logger=new_logger)

        # create a stream handler to write output
        stream_handler = logging.StreamHandler()
        stream_handler.setLevel(log_level)

        # create format string for messages
        formatter = logging.Formatter(fmt_string)
        stream_handler.setFormatter(formatter)

        # set the logger to use this handler
        new_logger.addHandler(stream_handler)

        LOGGERS[logger_name] = new_logger

    return LOGGERS[logger_name]
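
For illustration, a minimal usage sketch (assuming a CPG config is available, since the level and format can be overridden per logger name under workflow.logger in config):

    from cpg_flow.utils import get_logger

    logger = get_logger('my_workflow')
    logger.info('Building workflow graph')

    # repeated calls with the same name return the same cached instance
    assert get_logger('my_workflow') is logger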

cpg_flow.utils.chunks

chunks(iterable, chunk_size)

Yield successive n-sized chunks from an iterable

PARAMETER DESCRIPTION
iterable

any iterable - tuple, str, list, set

chunk_size

size of intervals to return

RETURNS DESCRIPTION

intervals of requested size across the collection

Source code in src/cpg_flow/utils.py
def chunks(iterable, chunk_size):
    """
    Yield successive n-sized chunks from an iterable

    Args:
        iterable (): any iterable - tuple, str, list, set
        chunk_size (): size of intervals to return

    Returns:
        intervals of requested size across the collection
    """

    if isinstance(iterable, set):
        iterable = list(iterable)

    for i in range(0, len(iterable), chunk_size):
        yield iterable[i : (i + chunk_size)]
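
A small usage sketch (the IDs are hypothetical):

    from cpg_flow.utils import chunks

    # split sequencing group IDs into batches of three
    for batch in chunks(['CPG01', 'CPG02', 'CPG03', 'CPG04', 'CPG05'], chunk_size=3):
        print(batch)
    # ['CPG01', 'CPG02', 'CPG03']
    # ['CPG04', 'CPG05']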

cpg_flow.utils.generator_chunks

generator_chunks(generator, size)

Iterates across a generator, returning specifically sized chunks

PARAMETER DESCRIPTION
generator

any generator or method implementing yield

size

size of iterator to return

RETURNS DESCRIPTION

a subset of the generator results

Source code in src/cpg_flow/utils.py
def generator_chunks(generator, size):
    """
    Iterates across a generator, returning specifically sized chunks

    Args:
        generator (): any generator or method implementing yield
        size (): size of iterator to return

    Returns:
        a subset of the generator results
    """
    iterator = iter(generator)
    for first in iterator:
        yield list(chain([first], islice(iterator, size - 1)))
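
A usage sketch with a trivial generator:

    from cpg_flow.utils import generator_chunks

    def count_up(n: int):
        yield from range(n)

    for chunk in generator_chunks(count_up(7), size=3):
        print(chunk)
    # [0, 1, 2]
    # [3, 4, 5]
    # [6]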

cpg_flow.utils.read_hail

read_hail(path)

Read a Hail object using the appropriate method.

PARAMETER DESCRIPTION
path

path to the input object

TYPE: str

RETURNS DESCRIPTION

hail object (hl.MatrixTable or hl.Table)

Source code in src/cpg_flow/utils.py
def read_hail(path):
    """
    read a hail object using the appropriate method
    Args:
        path (str): path to the input object
    Returns:
        hail object (hl.MatrixTable or hl.Table)
    """
    if path.strip('/').endswith('.ht'):
        t = hl.read_table(str(path))
    else:
        assert path.strip('/').endswith('.mt')
        t = hl.read_matrix_table(str(path))
    LOGGER.info(f'Read data from {path}')
    return t
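
A usage sketch, assuming a Hail session has been initialised and using hypothetical paths; the extension decides whether a Table or MatrixTable reader is used:

    from cpg_flow.utils import read_hail

    ht = read_hail('gs://my-bucket/annotations.ht')  # hl.Table
    mt = read_hail('gs://my-bucket/cohort.mt')       # hl.MatrixTable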

cpg_flow.utils.checkpoint_hail

checkpoint_hail(
    t, file_name, checkpoint_prefix=None, allow_reuse=False
)

Checkpoint method: provide a file name and a checkpoint prefix (GCP directory, can be None). allow_reuse sets whether an existing checkpoint can be reused - we typically want to avoid reuse, as it means we're continuing a previous failure from an unknown state.

PARAMETER DESCRIPTION
t

TYPE: Table | MatrixTable

file_name

name for this checkpoint

TYPE: str

checkpoint_prefix

path to the checkpoint directory

TYPE: str DEFAULT: None

allow_reuse

whether to permit reuse of an existing checkpoint

TYPE: bool DEFAULT: False

Source code in src/cpg_flow/utils.py
def checkpoint_hail(
    t: hl.Table | hl.MatrixTable,
    file_name: str,
    checkpoint_prefix: str | None = None,
    allow_reuse=False,
):
    """
    checkpoint method
    provide with a path and a prefix (GCP directory, can be None)
    allow_reuse sets whether the checkpoint can be reused - we
    typically want to avoid reuse, as it means we're continuing a previous
    failure from an unknown state

    Args:
        t (hl.Table | hl.MatrixTable):
        file_name (str): name for this checkpoint
        checkpoint_prefix (str): path to the checkpoint directory
        allow_reuse (bool): whether to permit reuse of an existing checkpoint
    """

    # drop the schema here
    t.describe()

    # log the current number of partitions
    LOGGER.info(f'Checkpointing object as {t.n_partitions()} partitions')

    if checkpoint_prefix is None:
        return t

    path = join(checkpoint_prefix, file_name)
    if can_reuse(path) and allow_reuse:
        LOGGER.info(f'Re-using {path}')
        return read_hail(path)

    LOGGER.info(f'Checkpointing {path}')
    return t.checkpoint(path, overwrite=True)
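
A sketch of how this might be called mid-pipeline (prefix and file name are hypothetical); with allow_reuse=True an existing checkpoint at the path is read back instead of being rewritten:

    from cpg_flow.utils import checkpoint_hail

    mt = checkpoint_hail(
        mt,
        file_name='post_filtering.mt',
        checkpoint_prefix='gs://my-bucket/checkpoints',
        allow_reuse=False,  # default: don't resume from an unknown prior state
    )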

cpg_flow.utils.exists cached

exists(path, verbose=True)

exists_not_cached that caches the result.

The python code runtime happens entirely during workflow construction, without waiting for the workflow to finish, so there is no expectation that the object existence status would change during the runtime. Thus, this function uses @lru_cache to make sure that object existence is checked only once.

Source code in src/cpg_flow/utils.py
@lru_cache
def exists(path: Path | str, verbose: bool = True) -> bool:
    """
    `exists_not_cached` that caches the result.

    The python code runtime happens entirely during the workflow construction,
    without waiting for it to finish, so there is no expectation that the object
    existence status would change during the runtime. This, this function uses
    `@lru_cache` to make sure that object existence is checked only once.
    """
    return exists_not_cached(path, verbose)
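
For illustration (the path is hypothetical); because of the @lru_cache decorator, a second call with the same arguments returns the cached answer without touching storage again:

    from cpg_flow.utils import exists

    if exists('gs://my-bucket/cohort.mt'):
        ...  # the .mt has a _SUCCESS marker, safe to read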

cpg_flow.utils.exists_not_cached

exists_not_cached(path, verbose=True)

Check if the object by path exists, where the object can be:

* a local file,
* a local directory,
* a cloud object,
* cloud or local *.mt, *.ht, or *.vds Hail data, in which case it will check for the existence of a corresponding _SUCCESS object instead.

PARAMETER DESCRIPTION
path

path to the file/directory/object/mt/ht

verbose

print on each check

RETURNS DESCRIPTION

True if the object exists

Source code in src/cpg_flow/utils.py
def exists_not_cached(path: Path | str, verbose: bool = True) -> bool:
    """
    Check if the object by path exists, where the object can be:
        * local file,
        * local directory,
        * cloud object,
        * cloud or local *.mt, *.ht, or *.vds Hail data, in which case it will check
          for the existence of a corresponding _SUCCESS object instead.
    @param path: path to the file/directory/object/mt/ht
    @param verbose: print on each check
    @return: True if the object exists
    """
    path = cast(Path, to_path(path))

    if path.suffix in ['.mt', '.ht']:
        path /= '_SUCCESS'
    if path.suffix in ['.vds']:
        path /= 'variant_data/_SUCCESS'

    if verbose:
        # noinspection PyBroadException
        try:
            res = check_exists_path(path)

        # a failure to detect the parent folder causes a crash
        # instead stick to a core responsibility -
        # existence = False
        except FileNotFoundError as fnfe:
            LOGGER.error(f'Failed checking {path}')
            LOGGER.error(f'{fnfe}')
            return False
        except BaseException:
            traceback.print_exc()
            LOGGER.error(f'Failed checking {path}')
            sys.exit(1)
        LOGGER.debug(f'Checked {path} [' + ('exists' if res else 'missing') + ']')
        return res

    return check_exists_path(path)

cpg_flow.utils.check_exists_path

check_exists_path(test_path)

Check whether a path exists using a cached per-directory listing. NB: reverting to strings prevents a get call, which is typically forbidden to local users - such a call would prevent this method from being used in the metamist audit processes.

Source code in src/cpg_flow/utils.py
def check_exists_path(test_path: Path) -> bool:
    """
    Check whether a path exists using a cached per-directory listing.
    NB. reversion to Strings prevents a get call, which is typically
    forbidden to local users - this prevents this method being used in the
    metamist audit processes
    """
    return basename(str(test_path)) in get_contents_of_path(dirname(str(test_path)))

cpg_flow.utils.get_contents_of_path cached

get_contents_of_path(test_path)

Get the contents of a GCS path, returning non-complete paths, e.g.:

    get_contents_of_path('gs://my-bucket/my-dir/')
    'my-file.txt'
Source code in src/cpg_flow/utils.py
@lru_cache
def get_contents_of_path(test_path: str) -> set[str]:
    """
    Get the contents of a GCS path, returning non-complete paths, eg:

        get_contents_of_path('gs://my-bucket/my-dir/')
        'my-file.txt'

    """
    return {f.name for f in to_path(test_path.rstrip('/')).iterdir()}

cpg_flow.utils.can_reuse

can_reuse(path, overwrite=False)

Checks if the object at path is good to reuse:

* overwrite has the default value of False,
* check_intermediates has the default value of True,
* the object exists.

If path is a collection, it requires all paths to exist.

Source code in src/cpg_flow/utils.py
def can_reuse(
    path: list[Path] | Path | str | None,
    overwrite: bool = False,
) -> bool:
    """
    Checks if the object at `path` is good to reuse:
    * overwrite has the default value of False,
    * check_intermediates has the default value of True,
    * object exists.

    If `path` is a collection, it requires all paths to exist.
    """
    if overwrite:
        return False

    if not get_config()['workflow'].get('check_intermediates', True):
        return False

    if not path:
        return False

    paths = path if isinstance(path, list) else [path]
    if not all(exists(fp, overwrite) for fp in paths):
        return False

    LOGGER.debug(f'Reusing existing {path}')
    return True
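
A sketch of the typical pattern when deciding whether to schedule a job (output path hypothetical); note that setting workflow.check_intermediates to false in config makes this always return False:

    from cpg_flow.utils import can_reuse

    output = 'gs://my-bucket/outputs/joint_called.vcf.gz'
    if can_reuse(output):
        pass  # skip the job, the output already exists
    else:
        pass  # schedule the job that writes `output`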

cpg_flow.utils.timestamp

timestamp(rand_suffix_len=5)

Generate a timestamp string. If rand_suffix_len is set, adds a short random string of this length for uniqueness.

Source code in src/cpg_flow/utils.py
def timestamp(rand_suffix_len: int = 5) -> str:
    """
    Generate a timestamp string. If `rand_suffix_len` is set, adds a short random
    string of this length for uniqueness.
    """
    result = time.strftime('%Y_%m%d_%H%M')
    if rand_suffix_len:
        rand_bit = ''.join(
            choices(string.ascii_uppercase + string.digits, k=rand_suffix_len),
        )
        result += f'_{rand_bit}'
    return result
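
For illustration (actual values depend on the current time and the random suffix):

    from cpg_flow.utils import timestamp

    timestamp()                   # e.g. '2025_0114_1530_AB12C'
    timestamp(rand_suffix_len=0)  # e.g. '2025_0114_1530'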

cpg_flow.utils.slugify

slugify(line)

Slugify a string.

Example:

    >>> slugify(u'Héllø W.1')
    'hello-w-1'

Source code in src/cpg_flow/utils.py
def slugify(line: str):
    """
    Slugify a string.

    Example:
    >>> slugify(u'Héllø W.1')
    'hello-w-1'
    """

    line = unicodedata.normalize('NFKD', line).encode('ascii', 'ignore').decode()
    line = line.strip().lower()
    line = re.sub(
        r'[\s.]+',
        '-',
        line,
    )
    return line

cpg_flow.utils.rich_sequencing_group_id_seds

rich_sequencing_group_id_seds(rich_id_map, file_names)

Helper function to add seds into a command that would extend sequencing group IDs in each file in file_names with an external ID, only if external ID is different from the original.

PARAMETER DESCRIPTION
rich_id_map

map used to replace sequencing groups, e.g. {'CPGAA': 'CPGAA|EXTID'}

file_names

file names and Hail Batch Resource files where to replace IDs

RETURNS DESCRIPTION

bash command that does the replacement

Source code in src/cpg_flow/utils.py
def rich_sequencing_group_id_seds(
    rich_id_map: dict[str, str],
    file_names: list[str | ResourceFile],
) -> str:
    """
    Helper function to add seds into a command that would extend sequencing group IDs
    in each file in `file_names` with an external ID, only if external ID is
    different from the original.

    @param rich_id_map: map used to replace sequencing groups, e.g. {'CPGAA': 'CPGAA|EXTID'}
    @param file_names: file names and Hail Batch Resource files where to replace IDs
    @return: bash command that does replacement
    """
    cmd = ''
    for sgid, rich_sgid in rich_id_map.items():
        for fname in file_names:
            cmd += f"sed -iBAK 's/{sgid}/{rich_sgid}/g' {fname}"
            cmd += '\n'
    return cmd
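
A sketch of the command this builds (IDs and file name hypothetical):

    from cpg_flow.utils import rich_sequencing_group_id_seds

    cmd = rich_sequencing_group_id_seds(
        rich_id_map={'CPGAA': 'CPGAA|EXTID'},
        file_names=['samples.tsv'],
    )
    print(cmd)
    # sed -iBAK 's/CPGAA/CPGAA|EXTID/g' samples.tsv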

cpg_flow.utils.tshirt_mt_sizing

tshirt_mt_sizing(sequencing_type, cohort_size)

Takes the details we have (number of SGs, sequencing type) and produces an estimate (with padding) of the MT size on disc, used to determine VM provisioning during ES export and Talos.

PARAMETER DESCRIPTION
sequencing_type

the sequencing type, e.g. genome or exome

cohort_size

number of sequencing groups in the cohort

RETURNS DESCRIPTION
int

the value for job.storage(X)

Source code in src/cpg_flow/utils.py
def tshirt_mt_sizing(sequencing_type: str, cohort_size: int) -> int:
    """
    Some way of taking the details we have (#SGs, sequencing type)
    and producing an estimate (with padding) of the MT size on disc
    used to determine VM provision during ES export and Talos

    Args:
        sequencing_type ():
        cohort_size ():

    Returns:
        str, the value for job.storage(X)
    """

    # allow for an override from config
    if preset := config_retrieve(['workflow', 'es_storage'], False):
        return preset

    if (sequencing_type == 'genome' and cohort_size < 100) or (sequencing_type == 'exome' and cohort_size < 1000):
        return 50
    return 500
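
For illustration, assuming the workflow.es_storage config override is not set:

    from cpg_flow.utils import tshirt_mt_sizing

    tshirt_mt_sizing('genome', cohort_size=80)   # 50
    tshirt_mt_sizing('genome', cohort_size=250)  # 500
    tshirt_mt_sizing('exome', cohort_size=500)   # 50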

cpg_flow.utils.get_intervals_from_bed

get_intervals_from_bed(intervals_path)

Read genomic intervals from a bed file. Increment the start position of each interval by 1 to match the 1-based coordinate system used by GATK.

Returns a list of interval strings in the format 'chrN:start-end'.

Source code in src/cpg_flow/utils.py
def get_intervals_from_bed(intervals_path: Path) -> list[str]:
    """
    Read genomic intervals from a bed file.
    Increment the start position of each interval by 1 to match the 1-based
    coordinate system used by GATK.

    Returns a list of interval strings in the format 'chrN:start-end'.
    """
    with intervals_path.open('r') as f:
        intervals = []
        for line in f:
            chrom, start, end = line.strip().split('\t')
            intervals.append(f'{chrom}:{int(start)+1}-{end}')
    return intervals
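
For example, a BED line of chr1, 999, 2000 (tab-separated) becomes the 1-based interval 'chr1:1000-2000'. A usage sketch with a hypothetical path:

    from cpg_utils import to_path
    from cpg_flow.utils import get_intervals_from_bed

    intervals = get_intervals_from_bed(to_path('gs://my-bucket/regions.bed'))
    # e.g. ['chr1:1000-2000', 'chr1:3001-4000', ...]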

cpg_flow.utils.make_job_name

make_job_name(
    name,
    sequencing_group=None,
    participant_id=None,
    dataset=None,
    part=None,
)

Extend the descriptive job name to reflect job attributes.

Source code in src/cpg_flow/utils.py
def make_job_name(
    name: str,
    sequencing_group: str | None = None,
    participant_id: str | None = None,
    dataset: str | None = None,
    part: str | None = None,
) -> str:
    """
    Extend the descriptive job name to reflect job attributes.
    """
    if sequencing_group and participant_id:
        sequencing_group = f'{sequencing_group}/{participant_id}'
    if sequencing_group and dataset:
        name = f'{dataset}/{sequencing_group}: {name}'
    elif dataset:
        name = f'{dataset}: {name}'
    if part:
        name += f', {part}'
    return name
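
For illustration (names hypothetical):

    from cpg_flow.utils import make_job_name

    make_job_name('Align', sequencing_group='CPGAA', participant_id='P01', dataset='my-dataset')
    # 'my-dataset/CPGAA/P01: Align'

    make_job_name('VQSR', dataset='my-dataset', part='indels')
    # 'my-dataset: VQSR, indels'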

cpg_flow.utils.hash_from_list_of_strings

hash_from_list_of_strings(
    string_list, hash_length=10, suffix=None
)

Create a hash from a list of strings.

PARAMETER DESCRIPTION
string_list

the strings to hash

hash_length

how many characters to use from the hash

TYPE: int DEFAULT: 10

suffix

optional, clarify the type of value which was hashed

RETURNS DESCRIPTION

the truncated hash joined with the number of strings, plus the optional suffix

Source code in src/cpg_flow/utils.py
def hash_from_list_of_strings(string_list: list[str], hash_length: int = 10, suffix: str | None = None) -> str:
    """
    Create a hash from a list of strings
    Args:
        string_list ():
        hash_length (int): how many characters to use from the hash
        suffix (str): optional, clarify the type of value which was hashed
    Returns:
    """
    hash_portion = hashlib.sha256(' '.join(string_list).encode()).hexdigest()[:hash_length]
    full_hash = f'{hash_portion}_{len(string_list)}'

    if suffix:
        full_hash += f'_{suffix}'
    return full_hash
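
For illustration - the hash digits shown are hypothetical, but the shape of the result (hash prefix, list length, optional suffix) follows the code above:

    from cpg_flow.utils import hash_from_list_of_strings

    hash_from_list_of_strings(['CPGAA', 'CPGBB', 'CPGCC'], suffix='sgs')
    # e.g. '1f3c5a7b9d_3_sgs'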

cpg_flow.utils.write_to_gcs_bucket

write_to_gcs_bucket(contents, path)
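
Write contents to a blob at a gs:// path, returning the bucket name and blob name; raises ValueError if the path is not a GCS path or the bucket does not exist.
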
Source code in src/cpg_flow/utils.py
def write_to_gcs_bucket(contents, path: Path):
    client = storage.Client()

    if not str(path).startswith('gs:/'):
        raise ValueError(f'Path {path} must be a GCS path')

    path = str(path).removeprefix('gs:/').removeprefix('/')
    bucket_name, blob_name = path.split('/', 1)

    bucket = client.bucket(bucket_name)
    if not bucket.exists():
        raise ValueError(f'Bucket {bucket_name} does not exist')

    blob = bucket.blob(blob_name)
    blob.upload_from_string(contents)

    return bucket_name, blob_name
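
A usage sketch (bucket and object path hypothetical; requires GCS credentials and an existing bucket):

    from cpg_utils import to_path
    from cpg_flow.utils import write_to_gcs_bucket

    bucket_name, blob_name = write_to_gcs_bucket(
        'sample_id\tstatus\nCPGAA\tdone\n',
        to_path('gs://my-bucket/reports/status.tsv'),
    )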