Skip to content

Resources

The following resources are available for use:

cpg_flow.resources.gcp_machine_name

gcp_machine_name(name, ncpu)

Machine type name in the GCP world

Source code in src/cpg_flow/resources.py
17
18
19
20
21
22
23
def gcp_machine_name(name: str, ncpu: int) -> str:
    """
    Machine type name in the GCP world.

    @param name: N1 machine family, one of "standard", "highmem", "highcpu".
    @param ncpu: number of cores; GCP N1 machines come in power-of-two sizes.
    """
    supported_families = ("standard", "highmem", "highcpu")
    assert name in supported_families, name
    assert _is_power_of_two(ncpu), ncpu
    return f"n1-{name}-{ncpu}"

cpg_flow.resources.MachineType dataclass

MachineType(
    name,
    ncpu,
    mem_gb_per_core,
    price_per_hour,
    disk_size_gb,
)

Hail Batch machine type on GCP

Source code in src/cpg_flow/resources.py
37
38
39
40
41
42
43
44
45
46
47
48
49
def __init__(
    self,
    name: str,
    ncpu: int,
    mem_gb_per_core: float,
    price_per_hour: float,
    disk_size_gb: int,
):
    """
    @param name: GCP machine family name (e.g. "standard", "highmem")
    @param ncpu: total number of cores on this machine type; stored as
        `max_ncpu` since it is the upper bound when requesting fractions
    @param mem_gb_per_core: memory available per core, in GB
    @param price_per_hour: machine price, used for cost estimation
    @param disk_size_gb: default boot disk size of the instance
    """
    self.name = name
    # Note the rename: the instance attribute is `max_ncpu`.
    self.max_ncpu = ncpu
    self.disk_size_gb = disk_size_gb
    self.price_per_hour = price_per_hour
    self.mem_gb_per_core = mem_gb_per_core

max_threads

max_threads()

Number of available threads

Source code in src/cpg_flow/resources.py
51
52
53
54
55
def max_threads(self) -> int:
    """
    Number of available threads (cores times hardware threads per core).
    """
    total_threads = self.max_ncpu * self.threads_on_cpu
    return total_threads

calc_instance_disk_gb

calc_instance_disk_gb()

The maximum available storage on an instance is calculated in batch/batch/utils.py/unreserved_worker_data_disk_size_gib() as the disk size (375G) minus reserved image size (30G) minus reserved storage per core (5G*ncpu = 120G for a 32-core instance).

Source code in src/cpg_flow/resources.py
57
58
59
60
61
62
63
64
65
66
def calc_instance_disk_gb(self) -> int:
    """
    The maximum available storage on an instance is calculated
    in `batch/batch/utils.py/unreserved_worker_data_disk_size_gib()`
    as the disk size (375G) minus reserved image size (30G) minus
    reserved storage per core (5G*ncpu = 120G for a 32-core instance).
    """
    image_reserved_gb = 30
    per_core_reserved_gb = 5
    total_reserved_gb = image_reserved_gb + per_core_reserved_gb * self.max_ncpu
    return self.disk_size_gb - total_reserved_gb

set_resources

set_resources(
    j,
    fraction=None,
    ncpu=None,
    nthreads=None,
    mem_gb=None,
    storage_gb=None,
)

Set resources to a Job object. If any optional parameters are set, they will be used as a bound to request a fraction of an instance.

Source code in src/cpg_flow/resources.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def set_resources(
    self,
    j: Job,
    fraction: float | None = None,
    ncpu: int | None = None,
    nthreads: int | None = None,
    mem_gb: float | None = None,
    storage_gb: float | None = None,
) -> "JobResource":
    """
    Set resources to a Job object. If any optional parameters are set,
    they will be used as a bound to request a fraction of an instance.
    """
    # Work out the resource bundle first, then apply it to the job.
    resource = self.request_resources(
        fraction=fraction,
        ncpu=ncpu,
        nthreads=nthreads,
        mem_gb=mem_gb,
        storage_gb=storage_gb,
    )
    return resource.set_to_job(j)

request_resources

request_resources(
    fraction=None,
    ncpu=None,
    nthreads=None,
    mem_gb=None,
    storage_gb=None,
)

Request resources from the machine, satisfying all provided requirements. If no requirements are provided, the minimal amount of cores (self.MIN_NCPU) will be used.

Source code in src/cpg_flow/resources.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def request_resources(
    self,
    fraction: float | None = None,
    ncpu: int | None = None,
    nthreads: int | None = None,
    mem_gb: float | None = None,
    storage_gb: float | None = None,
) -> "JobResource":
    """
    Request resources from the machine, satisfying all provided requirements.
    If no requirements are provided, the minimal amount of cores
    (self.MIN_NCPU) will be used.
    """
    # Translate every provided requirement into a number of CPUs, then take
    # the largest so that all of them are satisfied at once.
    candidates = [
        self.adjust_ncpu(ncpu or self.min_cpu),
        self.fraction_to_ncpu(fraction) if fraction else None,
        self.nthreads_to_ncpu(nthreads) if nthreads else None,
        self.mem_gb_to_ncpu(mem_gb) if mem_gb else None,
        self.storage_gb_to_ncpu(storage_gb) if storage_gb else None,
    ]
    required_ncpu = max(c for c in candidates if c)

    # Attach a larger disk only when the requested storage does not fit on
    # the default instance disk.
    attach_gb = None
    if storage_gb and storage_gb > self.calc_instance_disk_gb():
        attach_gb = storage_gb

    return JobResource(
        machine_type=self,
        ncpu=required_ncpu,
        attach_disk_storage_gb=attach_gb,
    )

fraction_to_ncpu

fraction_to_ncpu(fraction)

Converts fraction to the number of CPU (e.g. fraction=1.0 to take the entire machine, fraction=0.5 to take half of it, etc.).

Source code in src/cpg_flow/resources.py
125
126
127
128
129
130
131
def fraction_to_ncpu(self, fraction: float) -> int:
    """
    Converts fraction to the number of CPU (e.g. fraction=1.0 to take the entire
    machine, fraction=0.5 to take half of it, etc.).
    """
    # Round the fractional core count up to a whole number of cores.
    raw_ncpu = int(math.ceil(self.max_ncpu * fraction))
    return self.adjust_ncpu(raw_ncpu)

mem_gb_to_ncpu

mem_gb_to_ncpu(mem_gb)

Converts memory requirement to the number of CPU requirement.

Source code in src/cpg_flow/resources.py
133
134
135
136
137
138
def mem_gb_to_ncpu(self, mem_gb: float) -> int:
    """
    Converts memory requirement to the number of CPU requirement.
    """
    # math.ceil already returns an int: round cores needed upwards.
    return self.adjust_ncpu(math.ceil(mem_gb / self.mem_gb_per_core))

storage_gb_to_ncpu

storage_gb_to_ncpu(storage_gb)

Converts storage requirement to the number of CPU requirement.

We want to avoid attaching disks: attaching a disk to an existing instance might fail with mkfs.ext4 ... error, see: https://batch.hail.populationgenomics.org.au/batches/7488/jobs/12 So this function will calculate the number of CPU to request so your jobs can be packed to fit the default instance's available storage (calculated with self.calc_instance_disk_gb()).

Source code in src/cpg_flow/resources.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def storage_gb_to_ncpu(self, storage_gb: float) -> int:
    """
    Converts storage requirement to the number of CPU requirement.

    We want to avoid attaching disks: attaching a disk to an existing instance
    might fail with `mkfs.ext4 ...` error, see:
    https://batch.hail.populationgenomics.org.au/batches/7488/jobs/12
    So this function will calculate the number of CPU to request so your jobs
    can be packed to fit the default instance's available storage
    (calculated with self.calc_instance_disk_gb()).
    """
    # Cap at a full machine; anything beyond the instance disk is handled
    # by attaching an extra disk elsewhere.
    needed_fraction = min(storage_gb / self.calc_instance_disk_gb(), 1.0)
    return self.fraction_to_ncpu(needed_fraction)

nthreads_to_ncpu

nthreads_to_ncpu(nthreads)

Convert number of threads into number of cores/CPU

Source code in src/cpg_flow/resources.py
155
156
157
158
159
def nthreads_to_ncpu(self, nthreads: int) -> int:
    """
    Convert number of threads into number of cores/CPU.

    Each core provides `threads_on_cpu` hardware threads (the same constant
    used by `max_threads`), so round the thread count up to whole cores.
    """
    # Was a hard-coded `2`; use the shared class constant so the conversion
    # stays consistent with max_threads()/get_nthreads().
    return self.adjust_ncpu(math.ceil(nthreads / self.threads_on_cpu))

adjust_ncpu

adjust_ncpu(ncpu)

Adjust request number of CPU to a number allowed by Hail, i.e. the nearest power of 2, not less than the minimal number of cores allowed.

Source code in src/cpg_flow/resources.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def adjust_ncpu(self, ncpu: int) -> int:
    """
    Adjust request number of CPU to a number allowed by Hail, i.e.
    the nearest power of 2, not less than the minimal number of cores allowed.
    """
    if ncpu > self.max_ncpu:
        raise ValueError(
            f"Requesting more cores than available on {self.name} machine: {ncpu}>{self.max_ncpu}",
        )

    # Never go below the smallest allocatable core count.
    bounded_ncpu = max(ncpu, MachineType.min_cpu)

    # Round up to the next power of two (15 -> 16, 16 -> 16, 17 -> 32).
    exponent = math.ceil(math.log2(bounded_ncpu))
    return int(pow(2, exponent))

cpg_flow.resources.STANDARD module-attribute

STANDARD = MachineType(
    "standard",
    ncpu=16,
    mem_gb_per_core=3.75,
    price_per_hour=1.0787,
    disk_size_gb=375,
)

cpg_flow.resources.HIGHMEM module-attribute

HIGHMEM = MachineType(
    "highmem",
    ncpu=16,
    mem_gb_per_core=6.5,
    price_per_hour=1.3431,
    disk_size_gb=375,
)

cpg_flow.resources.JobResource dataclass

JobResource(
    machine_type, ncpu=None, attach_disk_storage_gb=None
)

Represents a fraction of a Hail Batch instance.

@param machine_type: Hail Batch machine pool type @param ncpu: number of CPU request. Will be used to calculate the fraction of the machine to take. If not set, all machine's CPUs will be used. @param attach_disk_storage_gb: if set to > MachineType.max_default_storage_gb, a larger disk will be attached by Hail Batch.

Source code in src/cpg_flow/resources.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def __init__(
    self,
    machine_type: MachineType,
    ncpu: int | None = None,
    attach_disk_storage_gb: float | None = None,
):
    """
    @param machine_type: Hail Batch machine pool type
    @param ncpu: number of CPU request. Will be used to calculate the fraction of
        the machine to take. If not set, all machine's CPUs will be used.
    @param attach_disk_storage_gb: if set to > MachineType.max_default_storage_gb,
        a larger disk will be attached by Hail Batch.
    """
    self.machine_type = machine_type

    # Default: take the whole machine.
    self.fraction_of_full: float = 1.0
    if ncpu is not None:
        if ncpu > self.machine_type.max_ncpu:
            raise ValueError(
                f"Max number of CPU on machine {self.machine_type.name} "
                f"is {self.machine_type.max_ncpu}, requested {ncpu}",
            )
        self.fraction_of_full = ncpu / self.machine_type.max_ncpu

    self.attach_disk_storage_gb = None
    if attach_disk_storage_gb is not None:
        # Overriding storage is only meaningful for whole-machine jobs:
        # a fractional job shares the default instance disk.
        if self.fraction_of_full < 1:
            raise ValueError(
                f"Storage can be overridden only when the entire machine is used, "
                f"not a fraction ({self.fraction_of_full}). "
                f"override_storage_gb={attach_disk_storage_gb}",
            )
        self.attach_disk_storage_gb = attach_disk_storage_gb

attach_disk_storage_gb instance-attribute

attach_disk_storage_gb = None

get_mem_gb

get_mem_gb()

Memory resources in GB

Source code in src/cpg_flow/resources.py
241
242
243
244
245
def get_mem_gb(self) -> float:
    """
    Memory resources in GB (cores taken times memory per core).
    """
    per_core_gb = self.machine_type.mem_gb_per_core
    return self.get_ncpu() * per_core_gb

java_mem_options

java_mem_options(overhead_gb=1)

Returns -Xms -Xmx options to set Java JVM memory usage to use all the memory resources represented. @param overhead_gb: Amount of memory (in decimal GB) to leave available for other purposes.

Source code in src/cpg_flow/resources.py
247
248
249
250
251
252
253
254
255
256
257
258
def java_mem_options(self, overhead_gb: float = 1) -> str:
    """
    Returns -Xms -Xmx options to set Java JVM memory usage to use all the memory
    resources represented.
    @param overhead_gb: Amount of memory (in decimal GB) to leave available for
    other purposes.
    """
    usable_bytes = (self.get_mem_gb() - overhead_gb) * 1_000_000_000
    # Approximate as binary MiB (but not GiB as these options don't support
    # fractional values) so that logs are easier to read.
    usable_mib = math.floor(usable_bytes / 1_048_576)
    return f"-Xms{usable_mib}M -Xmx{usable_mib}M"

java_gc_thread_options

java_gc_thread_options(surplus=2)

Returns -XX options to set Java JVM garbage collection threading. @param surplus: Number of threads to leave available for other purposes.

Source code in src/cpg_flow/resources.py
260
261
262
263
264
265
266
def java_gc_thread_options(self, surplus: int = 2) -> str:
    """
    Returns -XX options to set Java JVM garbage collection threading.
    @param surplus: Number of threads to leave available for other purposes.
    """
    threads_for_gc = self.get_nthreads() - surplus
    return f"-XX:+UseParallelGC -XX:ParallelGCThreads={threads_for_gc}"

get_ncpu

get_ncpu()

Number of cores/CPU

Source code in src/cpg_flow/resources.py
268
269
270
271
272
def get_ncpu(self) -> int:
    """
    Number of cores/CPU: the machine's total scaled by the fraction taken.
    """
    whole_machine_cores = self.machine_type.max_ncpu
    return int(whole_machine_cores * self.fraction_of_full)

get_nthreads

get_nthreads()

Number of threads

Source code in src/cpg_flow/resources.py
274
275
276
277
278
def get_nthreads(self) -> int:
    """
    Number of threads (cores times hardware threads per core).
    """
    # Threads per core is a class-wide constant on MachineType.
    return MachineType.threads_on_cpu * self.get_ncpu()

get_storage_gb

get_storage_gb()

Calculate storage in GB

Source code in src/cpg_flow/resources.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
def get_storage_gb(self) -> float:
    """
    Calculate storage in GB
    """
    if self.attach_disk_storage_gb:
        storage_gb = self.attach_disk_storage_gb
    else:
        # A fractional job gets a proportional share of the default
        # instance disk.
        default_disk_gb = self.machine_type.calc_instance_disk_gb()
        storage_gb = default_disk_gb * self.fraction_of_full

    # Hail Batch actually requests 5% lower number than the
    # requested one (e.g. "req_storage: 46.25G, actual_storage: 44.0 GiB"),
    # so we will ask for a bigger number.
    return storage_gb * 1.05

set_to_job

set_to_job(j)

Set the resources to a Job object. Return self to allow chaining, e.g.:

nthreads = STANDARD.request_resources(nthreads=4).set_to_job(j).get_nthreads()

Source code in src/cpg_flow/resources.py
296
297
298
299
300
301
302
303
304
305
306
307
def set_to_job(self, j: Job) -> "JobResource":
    """
    Set the resources to a Job object. Return self to allow chaining, e.g.:
    >>> nthreads = STANDARD.request_resources(nthreads=4).set_to_job(j).get_nthreads()
    """
    # Hail Batch expects storage and memory as strings with a unit suffix.
    j.storage(f"{self.get_storage_gb()}G")
    j.cpu(self.get_ncpu())
    j.memory(f"{self.get_mem_gb()}G")

    # Returning self to allow command chaining.
    return self

cpg_flow.resources.storage_for_cram_qc_job

storage_for_cram_qc_job()

Get storage request for a CRAM QC processing job, gb

Source code in src/cpg_flow/resources.py
310
311
312
313
314
315
316
317
318
319
320
def storage_for_cram_qc_job() -> int | None:
    """
    Get storage request for a CRAM QC processing job, gb.
    Returns None (no extra disk) for sequencing types other than
    genome/exome.
    """
    storage_by_type = {"genome": 100, "exome": 20}
    sequencing_type = get_config()["workflow"]["sequencing_type"]
    # dict.get returns None for unknown types, i.e. no extra disk by default.
    return storage_by_type.get(sequencing_type)

cpg_flow.resources.joint_calling_scatter_count

joint_calling_scatter_count(sequencing_group_count)

Number of partitions for joint-calling jobs (GenotypeGVCFs, VQSR, VEP), as a function of the sequencing group number.

Source code in src/cpg_flow/resources.py
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
def joint_calling_scatter_count(sequencing_group_count: int) -> int:
    """
    Number of partitions for joint-calling jobs (GenotypeGVCFs, VQSR, VEP),
    as a function of the sequencing group number.
    """
    # An explicit config value always wins over the heuristic below.
    if scatter_count := get_config()["workflow"].get("scatter_count"):
        return scatter_count

    # Estimating this is challenging because GenotypeGVCFs does not scale
    # linearly with the number of genomes.
    # Values are adjusted based on experience with the actual number of genomes.
    # e.g. 1000 scatter count was too low for 3800 genomes.
    # Thresholds must stay in descending order: the first one that the
    # cohort size reaches determines the scatter count.
    tiers = [
        (4000, 1400),
        (3500, 1200),
        (3000, 1000),
        (2000, 600),
        (1000, 400),
        (500, 200),
        (250, 100),
    ]
    for threshold, count in tiers:
        if sequencing_group_count >= threshold:
            return count
    return 50

cpg_flow.resources.storage_for_joint_vcf

storage_for_joint_vcf(
    sequencing_group_count, site_only=True
)

Storage enough to fit and process a joint-called VCF

Source code in src/cpg_flow/resources.py
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
def storage_for_joint_vcf(
    sequencing_group_count: int | None,
    site_only: bool = True,
) -> float | None:
    """
    Storage enough to fit and process a joint-called VCF.
    Returns None when the cohort size is unknown or zero.
    """
    if not sequencing_group_count:
        return None

    # Exomes are much smaller; full (non-site-only) genome VCFs need the most.
    is_exome = get_config()["workflow"]["sequencing_type"] == "exome"
    if is_exome:
        gb_per_group = 0.1
    elif site_only:
        gb_per_group = 1.0
    else:
        gb_per_group = 1.5

    return sequencing_group_count * gb_per_group