TileDB-VCF Tutorial

This notebook covers how to:

- create a TileDB-VCF dataset and ingest VCF files in incremental batches
- query the dataset by genomic region and sample
- export query results back to (g)VCF
- partition queries and run them in parallel, serverlessly, with TileDB Cloud

1. Setup

Packages

import os
import tiledb
import tiledb.cloud
import tiledbvcf
import numpy as np

print(
    f"tiledb v{tiledb.version.version}\n"
    f"numpy v{np.__version__}\n"
    # f"tiledb-vcf v{tiledbvcf.version}\n"
    f"tiledb-cloud v{tiledb.cloud.version.version}\n"
)
tiledb v0.21.3
numpy v1.21.6
tiledb-cloud v0.10.0

Set up TileDB’s virtual file system.

vfs = tiledb.VFS(config=tiledb.Config())
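
The VFS object provides uniform file operations across local storage and object stores. A couple of quick examples (the S3 listing assumes the bucket is reachable from your environment):

vfs.is_dir("/tmp")  # check a local directory
# vfs.ls("s3://tiledb-inc-demo-data/examples/notebooks/vcfs/1kgp3-chr1")  # list objects in a bucket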

2. Storing VCF Data with TileDB

Specify a location for the new array.

array_uri = "/tmp/demo-arraylocal"

Have we run this notebook before? If so, delete the existing array to start fresh.

if vfs.is_dir(array_uri):
    print(f"Deleting existing array '{array_uri}'")
    vfs.remove_dir(array_uri)
    print("Done.")
Deleting existing array '/tmp/demo-arraylocal'
Done.

Example VCF Files

TileDB-VCF inherits TileDB’s native support for working with remote object stores, so we can ingest samples directly from remote services like AWS S3, Google Cloud, and Azure. Here, we’ll use single sample BCF files with chromosome 1 data for phase 3 samples from the 1KG project stored in a publicly readable S3 bucket.

vcf_bucket = "s3://tiledb-inc-demo-data/examples/notebooks/vcfs/1kgp3-chr1"
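
If your environment requires explicit S3 settings, such as a region or anonymous access for a public bucket, they can be passed through tiledb.Config. A minimal sketch; the region value here is an assumption for this demo bucket:

s3_cfg = tiledb.Config({
    "vfs.s3.region": "us-east-1",       # assumed region for the demo bucket
    "vfs.s3.no_sign_request": "true",   # public bucket: skip credential signing
})
vfs_s3 = tiledb.VFS(config=s3_cfg)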

For demonstration purposes we’ll ingest samples in small batches. To do this, we’ll create a list of S3 URIs pointing to 5 sample BCF files.

batch1_samples = ["HG00096.bcf", "HG00097.bcf", "HG00099.bcf", "HG00100.bcf", "HG00101.bcf"]
batch1_uris = [f"{vcf_bucket}/{s}" for s in batch1_samples]
batch1_uris
['s3://tiledb-inc-demo-data/examples/notebooks/vcfs/1kgp3-chr1/HG00096.bcf',
 's3://tiledb-inc-demo-data/examples/notebooks/vcfs/1kgp3-chr1/HG00097.bcf',
 's3://tiledb-inc-demo-data/examples/notebooks/vcfs/1kgp3-chr1/HG00099.bcf',
 's3://tiledb-inc-demo-data/examples/notebooks/vcfs/1kgp3-chr1/HG00100.bcf',
 's3://tiledb-inc-demo-data/examples/notebooks/vcfs/1kgp3-chr1/HG00101.bcf']

3. Create and Populate a TileDB-VCF Dataset

Create a New Dataset

Before ingesting any VCF data we must create the new dataset. Start by instantiating a TileDB-VCF Dataset that opens a connection to the desired array_uri in write mode.

ds = tiledbvcf.Dataset(uri=array_uri, mode="w")
ds
<tiledbvcf.dataset.Dataset at 0x7efc1c203350>

Next, we’ll create the empty TileDB-VCF dataset.

Note: We can optionally pass a VCF file to the vcf_attrs argument to automatically materialize all of the INFO and FORMAT fields as separate attributes in the array (rather than htslib-encoded blobs), which can improve query performance.

ds.create_dataset(vcf_attrs=batch1_uris[0], enable_allele_count=True, enable_variant_stats=True)

# verify the array exists
os.listdir(array_uri)
['allele_count',
 'metadata',
 '__meta',
 'variant_stats',
 '__group',
 '__tiledb_group.tdb',
 'data']

Ingest Initial Batch of 5 VCF Files

With the empty TileDB-VCF dataset created, we can now ingest our first batch of samples.

%%time
ds.ingest_samples(sample_uris = batch1_uris)
CPU times: user 10.2 s, sys: 1.55 s, total: 11.8 s
Wall time: 44.8 s

This takes roughly 30–45 s when ingesting directly from S3. Note that most of this time is spent streaming the data; ingesting local copies of the same files takes about 12 s.

We can verify the samples were ingested by listing the sample IDs contained within the new array:

ds = tiledbvcf.Dataset(array_uri, mode="r")
ds.samples()
['HG00096', 'HG00097', 'HG00099', 'HG00100', 'HG00101']

Ingest a Second Batch of 5 VCF Files

Update the existing TileDB-VCF dataset with a second batch of samples.

batch2_samples = ["HG00102.bcf", "HG00103.bcf", "HG00105.bcf", "HG00106.bcf", "HG00107.bcf"]
batch2_uris = [f"{vcf_bucket}/{s}" for s in batch2_samples]
batch2_uris
['s3://tiledb-inc-demo-data/examples/notebooks/vcfs/1kgp3-chr1/HG00102.bcf',
 's3://tiledb-inc-demo-data/examples/notebooks/vcfs/1kgp3-chr1/HG00103.bcf',
 's3://tiledb-inc-demo-data/examples/notebooks/vcfs/1kgp3-chr1/HG00105.bcf',
 's3://tiledb-inc-demo-data/examples/notebooks/vcfs/1kgp3-chr1/HG00106.bcf',
 's3://tiledb-inc-demo-data/examples/notebooks/vcfs/1kgp3-chr1/HG00107.bcf']

The process is identical: we need to reopen the Dataset in write mode and then pass another list of VCF URIs to ingest_samples().

%%time
ds = tiledbvcf.Dataset(uri=array_uri, mode="w")  # incremental update: previously ingested data is untouched
ds.ingest_samples(sample_uris=batch2_uris)
CPU times: user 10.2 s, sys: 1.57 s, total: 11.8 s
Wall time: 46.9 s
ds = tiledbvcf.Dataset(array_uri, mode="r")  # reopen in read mode to verify the newly ingested samples
ds.samples()
['HG00096',
 'HG00097',
 'HG00099',
 'HG00100',
 'HG00101',
 'HG00102',
 'HG00103',
 'HG00105',
 'HG00106',
 'HG00107']

Key takeaways:

- new samples are easily added to existing datasets
- incremental updates are extremely efficient

4. Basic TileDB-VCF Query

The array is already open in read mode, so we can list the attributes available for querying and then define the genomic region of interest.
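
A minimal sketch of listing the queryable attributes; in recent tiledbvcf releases the attributes() method takes an attr_type of "builtin", "info", or "fmt":

ds.attributes(attr_type="builtin")  # built-in fields like pos_start, alleles, fmt_GT
# ds.attributes(attr_type="info")   # materialized INFO fields
# ds.attributes(attr_type="fmt")    # materialized FORMAT fields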

%%time
regions = ["1:11106535-11262551"]
CPU times: user 4 µs, sys: 0 ns, total: 4 µs
Wall time: 9.54 µs

Query for all variants located within MTOR for a particular sample and return the results as a pandas.DataFrame, where each row is a variant indexed by chromosome, position, and sample.

%%time
df = ds.read(
    regions = regions,
    samples = ["HG00096"],
    attrs = ["sample_name", "contig", "pos_start", "pos_end", "fmt_GT"],
)
df
CPU times: user 19.9 ms, sys: 52.3 ms, total: 72.2 ms
Wall time: 61.2 ms
sample_name contig pos_start pos_end fmt_GT
0 HG00096 1 11107439 11107439 [1, 1]
1 HG00096 1 11111976 11111979 [0, 1]
2 HG00096 1 11112836 11112836 [0, 1]
3 HG00096 1 11115299 11115299 [0, 1]
4 HG00096 1 11117536 11117536 [0, 1]
... ... ... ... ... ...
119 HG00096 1 11254006 11254007 [0, 1]
120 HG00096 1 11260848 11260848 [0, 1]
121 HG00096 1 11261929 11261929 [0, 1]
122 HG00096 1 11262020 11262020 [0, 1]
123 HG00096 1 11262310 11262310 [0, 1]

124 rows × 5 columns
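
Because the result is a regular pandas DataFrame, downstream filtering is plain pandas. For example, a quick sketch that keeps only heterozygous calls (fmt_GT holds small integer arrays):

# keep rows where the two alleles differ, i.e. heterozygous genotypes
het = df[df["fmt_GT"].apply(lambda gt: len(set(gt)) > 1)]
het.head()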

5. Exporting to (g)VCF

Here we run a similar query for an individual sample, but export the results to a new VCF file; output_format='v' writes an uncompressed VCF named after the sample.

ds.export(
    regions = ['1:10177-24924'],
    samples = ['HG00101'],
    output_format = 'v',
    output_dir = '/tmp/'
)

We can use bcftools to examine the newly generated VCF.

!bcftools view --no-header /tmp/HG00101.vcf | head -10
1   10352   rs555500075 T   TA  100 PASS    END=10352;DP=88915;AN=2;VT=INDEL;AC=1;AA=|||unknown(NO_COVERAGE)    GT  1|0
1   10616   rs376342519 CCGCCGTTGCAAAGGCGCGCCG  C   100 PASS    END=10637;DP=2365;AN=2;VT=INDEL;AC=2    GT  1|1
1   13116   rs62635286  T   G   100 PASS    END=13116;DP=22340;AN=2;VT=SNP;AC=1;AA=t||| GT  1|0
1   13118   rs200579949 A   G   100 PASS    END=13118;DP=21395;AN=2;VT=SNP;AC=1;AA=a||| GT  1|0
1   14930   rs75454623  A   G   100 PASS    END=14930;DP=42231;AN=2;VT=SNP;AC=1;AA=a||| GT  1|0
1   15211   rs78601809  T   G   100 PASS    END=15211;DP=32245;AN=2;VT=SNP;AC=1;AA=t||| GT  1|0
1   15274   rs62636497  A   G,T 100 PASS    END=15274;MULTI_ALLELIC=SNP;DP=23255;AN=2;VT=SNP;AC=1,1;AA=g||| GT  1|2
1   15903   rs557514207 G   GC  100 PASS    END=15903;DP=7012;EX_TARGET=INDE;AN=2;VT=INDEL;AC=1;AA=ccc|CC|CCC|deletion  GT  0|1
1   16949   rs199745162 A   C   100 PASS    END=16949;DP=41674;EX_TARGET=SNP;AN=2;VT=SNP;AC=1;AA=a|||   GT  0|1
1   18849   rs533090414 C   G   100 PASS    END=18849;DP=4700;AN=2;VT=SNP;AC=2;AA=g|||  GT  1|1

Now let’s modify the query to include multiple samples and export the merged results as a single multi-sample VCF file.

%%time
ds.export(
    regions = ['1:10177-24924'],
    samples = ds.samples()[0:5],
    merge = True,
    output_format = 'v',
    output_path = '/tmp/combined.vcf',
)
CPU times: user 32.4 ms, sys: 84.5 ms, total: 117 ms
Wall time: 110 ms

Again, we can use bcftools to examine it.

!bcftools view --no-header /tmp/combined.vcf | head -10
1   10177   rs367896724 A   AC  100 PASS    AC=4;VT=INDEL;AN=8;DP=412608;AA=|||unknown(NO_COVERAGE);END=10177   GT  1|0 0|1 0|1 1|0 ./.
1   10352   rs555500075 T   TA  100 PASS    AC=5;VT=INDEL;AN=10;DP=444575;AA=|||unknown(NO_COVERAGE);END=10352  GT  1|0 1|0 0|1 0|1 1|0
1   10616   rs376342519 CCGCCGTTGCAAAGGCGCGCCG  C   100 PASS    AC=10;VT=INDEL;AN=10;DP=11825;END=10637 GT  1|1 1|1 1|1 1|1 1|1
1   13110   rs540538026 G   A   100 PASS    AC=1;VT=SNP;AN=2;DP=23422;AA=g|||;END=13110 GT  ./. 1|0 ./. ./. ./.
1   13116   rs62635286  T   G   100 PASS    AC=2;VT=SNP;AN=4;DP=44680;AA=t|||;END=13116 GT  ./. 1|0 ./. ./. 1|0
1   13118   rs200579949 A   G   100 PASS    AC=2;VT=SNP;AN=4;DP=42790;AA=a|||;END=13118 GT  ./. 1|0 ./. ./. 1|0
1   14464   rs546169444 A   T   100 PASS    AC=3;VT=SNP;AN=4;DP=53522;AA=a|||;END=14464 GT  1|1 ./. 1|0 ./. ./.
1   14599   rs531646671 T   A   100 PASS    AC=2;VT=SNP;AN=4;DP=64162;AA=t|||;END=14599 GT  ./. 0|1 1|0 ./. ./.
1   14604   rs541940975 A   G   100 PASS    AC=2;VT=SNP;AN=4;DP=58462;AA=a|||;END=14604 GT  ./. 0|1 1|0 ./. ./.
1   14930   rs75454623  A   G   100 PASS    AC=5;VT=SNP;AN=10;DP=211155;AA=a|||;END=14930   GT  1|0 0|1 0|1 1|0 1|0

Key takeaways:

- slice efficiently by genomic region and/or sample
- selectively retrieve specific VCF attributes
- queries are returned as pyarrow Tables or pandas DataFrames
- query and export results to single-sample and combined VCFs

6. Partitioned Queries

Large queries can be split into partitions of regions, samples, or both by setting the appropriate partitioning parameter, which consists of two integers:

1. the partition index
2. the total number of partitions
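
Genomic regions can be partitioned the same way via ReadConfig's region_partition option; a minimal sketch (not run in this notebook):

# first of four region partitions; can be combined with sample_partition
cfg = tiledbvcf.ReadConfig(region_partition=(0, 4))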

Query Sample Partition 1

Open the array in read mode with the sample_partition parameter set to query the first of two sample partitions.

cfg = tiledbvcf.ReadConfig(sample_partition=(0, 2))
ds = tiledbvcf.Dataset(array_uri, mode="r", cfg=cfg)
%%time

df = ds.read(
    attrs = ["sample_name", "contig", "pos_start", "pos_end", "alleles", "fmt_GT"],
    regions = ["1:43337848-43352772"],
)

df
CPU times: user 8.24 ms, sys: 72 ms, total: 80.2 ms
Wall time: 67.1 ms
sample_name contig pos_start pos_end alleles fmt_GT
0 HG00096 1 43337897 43337897 [A, G] [1, 0]
1 HG00096 1 43339092 43339092 [C, T] [1, 1]
2 HG00097 1 43339092 43339092 [C, T] [1, 1]
3 HG00099 1 43339092 43339092 [C, T] [1, 1]
4 HG00100 1 43339092 43339092 [C, T] [1, 1]
... ... ... ... ... ... ...
105 HG00099 1 43352685 43352685 [A, G] [1, 1]
106 HG00100 1 43352685 43352685 [A, G] [1, 1]
107 HG00101 1 43352685 43352685 [A, G] [1, 1]
108 HG00099 1 43352723 43352723 [C, G] [1, 0]
109 HG00100 1 43352723 43352723 [C, G] [1, 0]

110 rows × 6 columns

Query Sample Partition 2

Re-open the array with the sample_partition parameter set to query the second sample partition (i.e., samples 6–10).

cfg = tiledbvcf.ReadConfig(sample_partition=(1, 2))  # returns the second subset of samples
ds = tiledbvcf.Dataset(array_uri, mode="r", cfg=cfg)
%%time

df = ds.read(
    attrs = ["sample_name", "contig", "pos_start", "pos_end", "alleles", "fmt_GT"],
    regions = ["1:43337848-43352772"],
)
df
CPU times: user 14.2 ms, sys: 66.8 ms, total: 81 ms
Wall time: 67.7 ms
sample_name contig pos_start pos_end alleles fmt_GT
0 HG00102 1 43337978 43337978 [T, C] [0, 1]
1 HG00102 1 43339092 43339092 [C, T] [1, 1]
2 HG00103 1 43339092 43339092 [C, T] [1, 1]
3 HG00105 1 43339092 43339092 [C, T] [1, 1]
4 HG00106 1 43339092 43339092 [C, T] [1, 1]
... ... ... ... ... ... ...
112 HG00106 1 43352685 43352685 [A, G] [1, 1]
113 HG00107 1 43352685 43352685 [A, G] [1, 1]
114 HG00102 1 43352723 43352723 [C, G] [1, 0]
115 HG00103 1 43352723 43352723 [C, G] [1, 0]
116 HG00107 1 43352723 43352723 [C, G] [0, 1]

117 rows × 6 columns

Key takeaways:

- TileDB supports fast, parallel slicing
- TileDB-VCF provides a convenient API to partition queries across the sample and genomic dimensions

7. Parallel Queries with TileDB

Setup

Import Delayed from tiledb.cloud.compute. This allows us to convert a normal Python function into one whose execution is deferred and run serverlessly on TileDB Cloud.

from tiledb.cloud.compute import Delayed
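
As a quick illustration, any Python function can be wrapped this way; a minimal sketch (calling compute() requires being logged in to TileDB Cloud):

# nothing runs until .compute() is called
d = Delayed(lambda x, y: x + y)(2, 3)
# d.compute()  # executes serverlessly and returns 5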

Run a Basic Query Serverlessly

Create a UDF that wraps tiledbvcf.Dataset.read_arrow(), continuing any incomplete reads and concatenating the batches into a single Arrow table.

def vcf_query(uri, attrs, regions, samples=None, sample_partition=(0, 1)):
    # imports live inside the function so it is self-contained on remote workers
    import tiledbvcf
    import pyarrow

    cfg = tiledbvcf.ReadConfig(sample_partition=sample_partition)
    vcf_ds = tiledbvcf.Dataset(uri, mode="r", cfg=cfg)

    # a read may be incomplete if it exceeds the memory budget, so loop until done
    results = [vcf_ds.read_arrow(attrs=attrs, samples=samples, regions=regions)]
    while not vcf_ds.read_completed():
        results.append(vcf_ds.continue_read_arrow())

    return pyarrow.concat_tables(results, promote=False)

Create a delayed instance of this UDF and specify the parameters we want to run it with.

array_uri = "tiledb://TileDB-Inc/vcf-1kg-nygc"

query_attrs = ["sample_name", "contig", "pos_start", "pos_end", "fmt_GT"]
query_samples = ["HG00096", "HG00097", "HG00099", "HG00100", "HG00101", "HG00102", "HG00103", "HG00105", "HG00106", "HG00107", "HG00108", "HG00109", "HG00110", "HG00111", "HG00112", "HG00113", "HG00114", "HG00115", "HG00116", "HG00117", "HG00118", "HG00119", "HG00120", "HG00121", "HG00122", "HG00123", "HG00125", "HG00126", "HG00127", "HG00128", "HG00129", "HG00130", "HG00131", "HG00132", "HG00133", "HG00136", "HG00137", "HG00138", "HG00139", "HG00140", "HG00141", "HG00142", "HG00143", "HG00145", "HG00146", "HG00148", "HG00149", "HG00150", "HG00151", "HG00154", "HG00155", "HG00157", "HG00158", "HG00159", "HG00160", "HG00171", "HG00173", "HG00174", "HG00176", "HG00177", "HG00178", "HG00179", "HG00180", "HG00181", "HG00182", "HG00183", "HG00185", "HG00186", "HG00187", "HG00188", "HG00189", "HG00190", "HG00231", "HG00232", "HG00233", "HG00234", "HG00235", "HG00236", "HG00237", "HG00238", "HG00239", "HG00240", "HG00242", "HG00243", "HG00244", "HG00245", "HG00246", "HG00250", "HG00251", "HG00252", "HG00253", "HG00254", "HG00255", "HG00256", "HG00257", "HG00258", "HG00259", "HG00260", "HG00261", "HG00262"]
query_regions = ["chr1:43337848-43352772"]

delayed_read = Delayed(
    func_exec=vcf_query,
    result_format=tiledb.cloud.UDFResultType.ARROW,
)(
    uri=array_uri,
    attrs=query_attrs,
    regions=query_regions,
    samples=query_samples[:10],
)
delayed_read.visualize()

Call compute() to serverlessly execute the function.

%%time
res1 = delayed_read.compute()
res1.to_pandas()
CPU times: user 15.1 ms, sys: 3.82 ms, total: 19 ms
Wall time: 8.21 s
sample_name contig pos_start pos_end fmt_GT
0 HG00099 chr1 43337743 43338411 [0, 0]
1 HG00106 chr1 43337749 43338055 [0, 0]
2 HG00102 chr1 43337756 43338088 [0, 0]
3 HG00097 chr1 43337791 43338053 [0, 0]
4 HG00100 chr1 43337812 43337850 [0, 0]
... ... ... ... ... ...
22442 HG00105 chr1 43352768 43352779 [0, 0]
22443 HG00106 chr1 43352769 43352769 [0, 0]
22444 HG00106 chr1 43352770 43352791 [0, 0]
22445 HG00100 chr1 43352771 43352771 [0, 0]
22446 HG00100 chr1 43352772 43352781 [0, 0]

22447 rows × 5 columns

Key takeaways:

- TileDB-VCF datasets can be sliced directly from remote object stores
- TileDB provides a flexible platform for building serverless data processing pipelines

Run Multiple Batches of Queries in Parallel

Add another UDF to combine the outputs into a single result.

def combine_results(dfs):
    # concatenate the per-partition results into a single Arrow table
    import pyarrow as pa
    return pa.concat_tables([x for x in dfs if x is not None])

Assemble and visualize the task graph.

nparts = 10
delayed_results = []

for i in range(nparts):  # split the 100 samples into 10 partitions that run in parallel; only the partition index changes
    delayed_results.append(
        Delayed(
            func_exec=vcf_query, 
            result_format=tiledb.cloud.UDFResultType.ARROW,
        )(
            uri=array_uri, 
            attrs=query_attrs,
            regions=query_regions,
            samples=query_samples,
            sample_partition=(i, nparts),
        )
    )
delayed_results = Delayed(combine_results, local=True)(delayed_results)  # merge all partition results into a single table, locally

delayed_results.visualize()
%%time
res2 = delayed_results.compute()
res2.to_pandas()
CPU times: user 112 ms, sys: 26.7 ms, total: 139 ms
Wall time: 9.29 s
sample_name contig pos_start pos_end fmt_GT
0 HG00099 chr1 43337743 43338411 [0, 0]
1 HG00106 chr1 43337749 43338055 [0, 0]
2 HG00102 chr1 43337756 43338088 [0, 0]
3 HG00097 chr1 43337791 43338053 [0, 0]
4 HG00100 chr1 43337812 43337850 [0, 0]
... ... ... ... ... ...
216254 HG00260 chr1 43352762 43352785 [0, 0]
216255 HG00262 chr1 43352762 43352766 [0, 0]
216256 HG00261 chr1 43352766 43352769 [0, 0]
216257 HG00262 chr1 43352767 43352775 [0, 0]
216258 HG00261 chr1 43352770 43352779 [0, 0]

216259 rows × 5 columns

Key takeaways:

- TileDB’s serverless infrastructure makes it easy to set up and run large-scale distributed queries