
Utils

Utility functions and constants.

cpg_flow.utils.format_logger

format_logger(
    log_level=logger.level('INFO').no,
    fmt_string=DEFAULT_LOG_FORMAT,
    coloured=COLOURED_LOGS,
)

loguru is a cleaner interface than the standard logging module, but it doesn't allow for multiple instances. Instead of calling a get_logger function which returns a logger, we assume that any module using logging has imported from loguru import logger to get access to the logger.

loguru.logger is also resistant to deepcopy, so there really is only a single global instance, meaning that the display/formatting of the logger is global to the entire process, and should only be set once.

This helper method formats the logger instance with the given parameters, stripping out any previous handlers. Because the global logger instance is modified, there is no return value.

>>> from loguru import logger
>>> from cpg_flow.utils import format_logger
>>> format_logger(log_level=10, fmt_string='{time} {level} {message}', coloured=True)
>>> logger.info('This is an info message')

PARAMETER DESCRIPTION
log_level

logging level, defaults to INFO. Can be overridden by config

TYPE: int DEFAULT: logger.level('INFO').no

fmt_string

format string for this logger, defaults to DEFAULT_LOG_FORMAT

TYPE: str DEFAULT: DEFAULT_LOG_FORMAT

coloured

whether to colour the logger output

TYPE: bool DEFAULT: COLOURED_LOGS

Source code in src/cpg_flow/utils.py
def format_logger(
    log_level: int = logger.level('INFO').no,
    fmt_string: str = DEFAULT_LOG_FORMAT,
    coloured: bool = COLOURED_LOGS,
) -> None:
    """
    loguru is a cleaner interface than the standard logging module, but it doesn't allow for multiple instances.
    Instead of calling a get_logger function which returns a logger, we assume that any module using logging has
    imported `from loguru import logger` to get access to the logger.

    loguru.logger is also resistant to deepcopy, so there really is only a single global instance, meaning that the
    display/formatting of the logger is global to the entire process, and should only be set once.

    This helper method formats the logger instance with the given parameters, stripping out any previous handlers.
    Because the global logger instance is modified, there is no return value

    >>> from loguru import logger
    >>> from cpg_flow.utils import format_logger
    >>> format_logger(log_level=10, fmt_string='{time} {level} {message}', coloured=True)
    >>> logger.info('This is an info message')

    Args:
        log_level (int): logging level, defaults to INFO. Can be overridden by config
        fmt_string (str): format string for this logger, defaults to DEFAULT_LOG_FORMAT
        coloured (bool): whether to colour the logger output
    """

    # Remove any previous loguru handlers
    logger.remove()

    # Add loguru handler with given format and level
    logger.add(
        sys.stdout,
        level=log_level,
        format=fmt_string,
        colorize=coloured,
        enqueue=True,
    )

cpg_flow.utils.chunks

chunks(iterable, chunk_size)

Yield successive n-sized chunks from an iterable
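
For example, chunking a five-element list into pairs (illustrative values):

>>> from cpg_flow.utils import chunks
>>> list(chunks([1, 2, 3, 4, 5], chunk_size=2))
[[1, 2], [3, 4], [5]]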

PARAMETER DESCRIPTION
iterable

any iterable - tuple, str, list, set

chunk_size

size of intervals to return

RETURNS DESCRIPTION
Iterator[Any]

intervals of requested size across the collection

Source code in src/cpg_flow/utils.py
def chunks(iterable, chunk_size) -> Iterator[Any]:
    """
    Yield successive n-sized chunks from an iterable

    Args:
        iterable (): any iterable - tuple, str, list, set
        chunk_size (): size of intervals to return

    Returns:
        intervals of requested size across the collection
    """

    if isinstance(iterable, set):
        iterable = list(iterable)

    for i in range(0, len(iterable), chunk_size):
        yield iterable[i : (i + chunk_size)]

cpg_flow.utils.generator_chunks

generator_chunks(generator, size)

Iterates across a generator, returning specifically sized chunks
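
For example, consuming a generator in groups of two (illustrative values):

>>> from cpg_flow.utils import generator_chunks
>>> list(generator_chunks((i for i in range(5)), size=2))
[[0, 1], [2, 3], [4]]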

PARAMETER DESCRIPTION
generator

any generator or method implementing yield

size

size of iterator to return

RETURNS DESCRIPTION
Iterator[list[Any]]

a subset of the generator results

Source code in src/cpg_flow/utils.py
def generator_chunks(generator, size) -> Iterator[list[Any]]:
    """
    Iterates across a generator, returning specifically sized chunks

    Args:
        generator (): any generator or method implementing yield
        size (): size of iterator to return

    Returns:
        a subset of the generator results
    """
    iterator = iter(generator)
    for first in iterator:
        yield list(chain([first], islice(iterator, size - 1)))

cpg_flow.utils.read_hail

read_hail(path)

Read a hail object using the appropriate method.

Args:
    path (str): path to the input object

Returns:
    hail object (hl.MatrixTable or hl.Table)
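
For example (the paths are hypothetical; the object type returned depends on the extension):

>>> ht = read_hail('gs://my-bucket/annotations.ht')  # returns hl.Table
>>> mt = read_hail('gs://my-bucket/cohort.mt')       # returns hl.MatrixTable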

Source code in src/cpg_flow/utils.py
def read_hail(path):
    """
    read a hail object using the appropriate method
    Args:
        path (str): path to the input object
    Returns:
        hail object (hl.MatrixTable or hl.Table)
    """
    if path.strip('/').endswith('.ht'):
        t = hl.read_table(str(path))
    else:
        assert path.strip('/').endswith('.mt')
        t = hl.read_matrix_table(str(path))
    logger.info(f'Read data from {path}')
    return t

cpg_flow.utils.checkpoint_hail

checkpoint_hail(
    t, file_name, checkpoint_prefix=None, allow_reuse=False
)

Checkpoint method. Provide a path and a prefix (GCP directory, can be None). allow_reuse sets whether the checkpoint can be reused - we typically want to avoid reuse, as it means we're continuing a previous failure from an unknown state.
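
A sketch of typical usage (the input path, checkpoint prefix, and file name are hypothetical):

>>> mt = hl.read_matrix_table('gs://my-bucket/cohort.mt')
>>> mt = checkpoint_hail(mt, 'densified.mt', checkpoint_prefix='gs://my-bucket/checkpoints', allow_reuse=True)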

PARAMETER DESCRIPTION
t

TYPE: Table | MatrixTable

file_name

name for this checkpoint

TYPE: str

checkpoint_prefix

path to the checkpoint directory

TYPE: str DEFAULT: None

allow_reuse

whether to permit reuse of an existing checkpoint

TYPE: bool DEFAULT: False

Source code in src/cpg_flow/utils.py
def checkpoint_hail(
    t: hl.Table | hl.MatrixTable,
    file_name: str,
    checkpoint_prefix: str | None = None,
    allow_reuse=False,
):
    """
    checkpoint method
    provide with a path and a prefix (GCP directory, can be None)
    allow_reuse sets whether the checkpoint can be reused - we
    typically want to avoid reuse, as it means we're continuing a previous
    failure from an unknown state

    Args:
        t (hl.Table | hl.MatrixTable):
        file_name (str): name for this checkpoint
        checkpoint_prefix (str): path to the checkpoint directory
        allow_reuse (bool): whether to permit reuse of an existing checkpoint
    """

    # drop the schema here
    t.describe()

    # log the current number of partitions
    logger.info(f'Checkpointing object as {t.n_partitions()} partitions')

    if checkpoint_prefix is None:
        return t

    path = join(checkpoint_prefix, file_name)
    if can_reuse(path) and allow_reuse:
        logger.info(f'Re-using {path}')
        return read_hail(path)

    logger.info(f'Checkpointing {path}')
    return t.checkpoint(path, overwrite=True)

cpg_flow.utils.exists cached

exists(path, verbose=True)

exists_not_cached that caches the result.

The python code runtime happens entirely during the workflow construction, without waiting for it to finish, so there is no expectation that the object existence status would change during the runtime. Thus, this function uses @lru_cache to make sure that object existence is checked only once.
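
For example (the path is hypothetical; for .mt/.ht/.vds inputs the underlying check looks for the corresponding _SUCCESS object):

>>> from cpg_flow.utils import exists
>>> exists('gs://my-bucket/outputs/cohort.mt')
True  # i.e. gs://my-bucket/outputs/cohort.mt/_SUCCESS was found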

Source code in src/cpg_flow/utils.py
@lru_cache
def exists(path: Path | str, verbose: bool = True) -> bool:
    """
    `exists_not_cached` that caches the result.

    The python code runtime happens entirely during the workflow construction,
    without waiting for it to finish, so there is no expectation that the object
    existence status would change during the runtime. Thus, this function uses
    `@lru_cache` to make sure that object existence is checked only once.
    """
    return exists_not_cached(path, verbose)

cpg_flow.utils.exists_not_cached

exists_not_cached(path, verbose=True)

Check if the object by path exists, where the object can be:
* local file,
* local directory,
* cloud object,
* cloud or local *.mt, *.ht, or *.vds Hail data, in which case it will check for the existence of a corresponding _SUCCESS object instead.

@param path: path to the file/directory/object/mt/ht
@param verbose: print on each check
@return: True if the object exists

Source code in src/cpg_flow/utils.py
def exists_not_cached(path: Path | str, verbose: bool = True) -> bool:
    """
    Check if the object by path exists, where the object can be:
        * local file,
        * local directory,
        * cloud object,
        * cloud or local *.mt, *.ht, or *.vds Hail data, in which case it will check
          for the existence of a corresponding _SUCCESS object instead.
    @param path: path to the file/directory/object/mt/ht
    @param verbose: print on each check
    @return: True if the object exists
    """
    path = cast('Path', to_path(path))

    if path.suffix in {'.mt', '.ht'}:
        path /= '_SUCCESS'
    if path.suffix == '.vds':
        path /= 'variant_data/_SUCCESS'

    if verbose:
        # noinspection PyBroadException
        try:
            res = check_exists_path(path)

        # a failure to detect the parent folder causes a crash
        # instead stick to a core responsibility -
        # existence = False
        except FileNotFoundError as fnfe:
            logger.error(f'Failed checking {path}')
            logger.error(f'{fnfe}')
            return False
        except BaseException:
            traceback.print_exc()
            logger.error(f'Failed checking {path}')
            sys.exit(1)
        logger.debug(f'Checked {path} [' + ('exists' if res else 'missing') + ']')
        return res

    return check_exists_path(path)

cpg_flow.utils.check_exists_path

check_exists_path(test_path)

Check whether a path exists using a cached per-directory listing. NB. reversion to Strings prevents a get call, which is typically forbidden to local users - this prevents this method being used in the metamist audit processes

Source code in src/cpg_flow/utils.py
def check_exists_path(test_path: Path) -> bool:
    """
    Check whether a path exists using a cached per-directory listing.
    NB. reversion to Strings prevents a get call, which is typically
    forbidden to local users - this prevents this method being used in the
    metamist audit processes
    """
    return basename(str(test_path)) in get_contents_of_path(dirname(str(test_path)))

cpg_flow.utils.get_contents_of_path cached

get_contents_of_path(test_path)

Get the contents of a GCS path, returning non-complete paths, eg:

get_contents_of_path('gs://my-bucket/my-dir/')
'my-file.txt'
Source code in src/cpg_flow/utils.py
@lru_cache
def get_contents_of_path(test_path: str) -> set[str]:
    """
    Get the contents of a GCS path, returning non-complete paths, eg:

        get_contents_of_path('gs://my-bucket/my-dir/')
        'my-file.txt'

    """
    return {f.name for f in to_path(test_path.rstrip('/')).iterdir()}

cpg_flow.utils.can_reuse

can_reuse(path, overwrite=False)

Checks if the object at path is good to reuse:
* overwrite has the default value of False,
* check_intermediates has the default value of True,
* object exists.

If path is a collection, it requires all paths to exist.
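
A sketch of typical usage (output_path and the submit_job call are hypothetical):

>>> if can_reuse(output_path):
...     logger.info(f'Skipping job, output already exists: {output_path}')
... else:
...     submit_job(output_path)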

Source code in src/cpg_flow/utils.py
def can_reuse(
    path: list[Path] | Path | str | None,
    overwrite: bool = False,
) -> bool:
    """
    Checks if the object at `path` is good to reuse:
    * overwrite has the default value of False,
    * check_intermediates has the default value of True,
    * object exists.

    If `path` is a collection, it requires all paths to exist.
    """
    if overwrite:
        return False

    if not get_config()['workflow'].get('check_intermediates', True):
        return False

    if not path:
        return False

    paths = path if isinstance(path, list) else [path]
    if not all(exists(fp, overwrite) for fp in paths):
        return False

    logger.debug(f'Reusing existing {path}')
    return True

cpg_flow.utils.timestamp

timestamp(rand_suffix_len=5)

Generate a timestamp string. If rand_suffix_len is set, adds a short random string of this length for uniqueness.
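
For example (the exact output depends on the current time and the random suffix):

>>> timestamp()
'2025_0101_0930_7KQ2Z'
>>> timestamp(rand_suffix_len=0)
'2025_0101_0930'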

Source code in src/cpg_flow/utils.py
def timestamp(rand_suffix_len: int = 5) -> str:
    """
    Generate a timestamp string. If `rand_suffix_len` is set, adds a short random
    string of this length for uniqueness.
    """
    result = time.strftime('%Y_%m%d_%H%M')
    if rand_suffix_len:
        rand_bit = ''.join(
            choices(string.ascii_uppercase + string.digits, k=rand_suffix_len),
        )
        result += f'_{rand_bit}'
    return result

cpg_flow.utils.slugify

slugify(line)

Slugify a string.

Example:

>>> slugify(u'Héllø W.1')
'hello-w-1'

Source code in src/cpg_flow/utils.py
def slugify(line: str):
    """
    Slugify a string.

    Example:
    >>> slugify(u'Héllø W.1')
    'hello-w-1'
    """

    line = unicodedata.normalize('NFKD', line).encode('ascii', 'ignore').decode()
    line = line.strip().lower()
    line = re.sub(
        r'[\s.]+',
        '-',
        line,
    )
    return line

cpg_flow.utils.rich_sequencing_group_id_seds

rich_sequencing_group_id_seds(rich_id_map, file_names)

Helper function to add seds into a command that would extend sequencing group IDs in each file in file_names with an external ID, only if external ID is different from the original.

@param rich_id_map: map used to replace sequencing groups, e.g. {'CPGAA': 'CPGAA|EXTID'} @param file_names: file names and Hail Batch Resource files where to replace IDs @return: bash command that does replacement
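
For example (the ID mapping and file name are illustrative):

>>> cmd = rich_sequencing_group_id_seds({'CPGAA': 'CPGAA|EXTID'}, ['samples.tsv'])
>>> print(cmd)
sed -iBAK 's/CPGAA/CPGAA|EXTID/g' samples.tsv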

Source code in src/cpg_flow/utils.py
def rich_sequencing_group_id_seds(
    rich_id_map: dict[str, str],
    file_names: list[str | ResourceFile],
) -> str:
    """
    Helper function to add seds into a command that would extend sequencing group IDs
    in each file in `file_names` with an external ID, only if external ID is
    different from the original.

    @param rich_id_map: map used to replace sequencing groups, e.g. {'CPGAA': 'CPGAA|EXTID'}
    @param file_names: file names and Hail Batch Resource files where to replace IDs
    @return: bash command that does replacement
    """
    cmd = ''
    for sgid, rich_sgid in rich_id_map.items():
        for fname in file_names:
            cmd += f"sed -iBAK 's/{sgid}/{rich_sgid}/g' {fname}"
            cmd += '\n'
    return cmd

cpg_flow.utils.tshirt_mt_sizing

tshirt_mt_sizing(sequencing_type, cohort_size)

Some way of taking the details we have (number of SGs, sequencing type) and producing an estimate (with padding) of the MT size on disc, used to determine VM provisioning during ES export and Talos.
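
For example, assuming no workflow.es_storage override in config:

>>> tshirt_mt_sizing('genome', cohort_size=50)
50
>>> tshirt_mt_sizing('genome', cohort_size=500)
500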

PARAMETER DESCRIPTION
sequencing_type

cohort_size

RETURNS DESCRIPTION
int

int, the value for job.storage(X)

Source code in src/cpg_flow/utils.py
def tshirt_mt_sizing(sequencing_type: str, cohort_size: int) -> int:
    """
    Some way of taking the details we have (#SGs, sequencing type)
    and producing an estimate (with padding) of the MT size on disc
    used to determine VM provision during ES export and Talos

    Args:
        sequencing_type ():
        cohort_size ():

    Returns:
        int, the value for job.storage(X)
    """

    # allow for an override from config
    if preset := config_retrieve(['workflow', 'es_storage'], False):
        return preset

    if (sequencing_type == 'genome' and cohort_size < 100) or (sequencing_type == 'exome' and cohort_size < 1000):
        return 50
    return 500

cpg_flow.utils.get_intervals_from_bed

get_intervals_from_bed(intervals_path)

Read genomic intervals from a bed file. Increment the start position of each interval by 1 to match the 1-based coordinate system used by GATK.

Returns a list of interval strings in the format 'chrN:start-end'.
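
For example, given a hypothetical BED file containing the single tab-separated line chr1 999 2000:

>>> get_intervals_from_bed(intervals_path)  # intervals_path points at the file above
['chr1:1000-2000']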

Source code in src/cpg_flow/utils.py
def get_intervals_from_bed(intervals_path: Path) -> list[str]:
    """
    Read genomic intervals from a bed file.
    Increment the start position of each interval by 1 to match the 1-based
    coordinate system used by GATK.

    Returns a list of interval strings in the format 'chrN:start-end'.
    """
    with intervals_path.open('r') as f:
        intervals = []
        for line in f:
            chrom, start, end = line.strip().split('\t')
            intervals.append(f'{chrom}:{int(start) + 1}-{end}')
    return intervals

cpg_flow.utils.make_job_name

make_job_name(
    name,
    sequencing_group=None,
    participant_id=None,
    dataset=None,
    part=None,
)

Extend the descriptive job name to reflect job attributes.
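
For example (illustrative identifiers):

>>> make_job_name('Align', sequencing_group='CPGAA', participant_id='PART1', dataset='my-dataset', part='shard 1')
'my-dataset/CPGAA/PART1: Align, shard 1'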

Source code in src/cpg_flow/utils.py
def make_job_name(
    name: str,
    sequencing_group: str | None = None,
    participant_id: str | None = None,
    dataset: str | None = None,
    part: str | None = None,
) -> str:
    """
    Extend the descriptive job name to reflect job attributes.
    """
    if sequencing_group and participant_id:
        sequencing_group = f'{sequencing_group}/{participant_id}'
    if sequencing_group and dataset:
        name = f'{dataset}/{sequencing_group}: {name}'
    elif dataset:
        name = f'{dataset}: {name}'
    if part:
        name += f', {part}'
    return name

cpg_flow.utils.hash_from_list_of_strings

hash_from_list_of_strings(
    string_list, hash_length=10, suffix=None
)

Create a hash from a list of strings.

Args:
    string_list (list[str]): the strings to hash
    hash_length (int): how many characters to use from the hash
    suffix (str): optional, clarify the type of value which was hashed

Returns:
    the hash string, suffixed with the number of strings hashed and the optional suffix
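
For example (the hash digits shown are illustrative, not a real digest):

>>> hash_from_list_of_strings(['CPGAA', 'CPGBB'], hash_length=6, suffix='sgs')
'1a2b3c_2_sgs'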

Source code in src/cpg_flow/utils.py
def hash_from_list_of_strings(string_list: list[str], hash_length: int = 10, suffix: str | None = None) -> str:
    """
    Create a hash from a list of strings
    Args:
        string_list ():
        hash_length (int): how many characters to use from the hash
        suffix (str): optional, clarify the type of value which was hashed
    Returns:
    """
    hash_portion = hashlib.sha256(' '.join(string_list).encode()).hexdigest()[:hash_length]
    full_hash = f'{hash_portion}_{len(string_list)}'

    if suffix:
        full_hash += f'_{suffix}'
    return full_hash

cpg_flow.utils.write_to_gcs_bucket

write_to_gcs_bucket(contents, path)
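
From the source below: uploads contents to the given gs:// path and returns the (bucket_name, blob_name) pair. A minimal usage sketch (the bucket and object names are hypothetical; to_path is assumed to be the cpg_utils helper used elsewhere in this module):

>>> write_to_gcs_bucket('hello world', to_path('gs://my-bucket/outputs/hello.txt'))
('my-bucket', 'outputs/hello.txt')
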
Source code in src/cpg_flow/utils.py
def write_to_gcs_bucket(contents, path: Path):
    client = storage.Client()

    if not str(path).startswith('gs:/'):
        raise ValueError(f'Path {path} must be a GCS path')

    new_path = str(path).removeprefix('gs:/').removeprefix('/')
    bucket_name, blob_name = new_path.split('/', 1)

    bucket = client.bucket(bucket_name)
    if not bucket.exists():
        raise ValueError(f'Bucket {bucket_name} does not exist')

    blob = bucket.blob(blob_name)
    blob.upload_from_string(contents)

    return bucket_name, blob_name