Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 63 additions & 9 deletions align_trim/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -746,22 +746,74 @@ def read_pair_generator(bam, region_string=None):
del read_dict[qname]


def create_primer_lookup(ref_len_tuple, pools, amplicons: list[Amplicon], padding=35):
def create_primer_lookup(ref_len_tuple, amplicons: list[Amplicon], padding=35):
"""
Returns a dict of chroms, each containing a (max(pools), chrom_len) shaped array
Create a lookup table for efficient primer position queries across reference genomes.

Each chromosome gets its own 2D lookup array where:
- Rows represent non-overlapping "pools"* of amplicons at their corresponding positions.
- Columns represent genomic positions
- Values are Amplicon objects or None

The function automatically determines the minimum number of rows needed to ensure
no amplicons overlap within the same row when accounting for padding.

* Amplicons are placed in the first available row where they don't overlap, not their pool index.

Parameters
----------
ref_len_tuple : list[tuple[str, int]]
List of tuples containing (chromosome_name, chromosome_length) pairs
from the reference genome
amplicons : list[Amplicon]
List of Amplicon objects containing primer scheme information
padding : int, optional
Number of bases to extend amplicon boundaries on both sides to allow
for fuzzy matching of reads with barcodes/adapters (default: 35)

Returns
-------
dict[str, np.ndarray]
Dictionary mapping chromosome names to 2D numpy arrays of shape (N, chrom_len+1)
where N is the minimum number of rows needed to prevent amplicon overlap.
Array elements are either Amplicon objects or None.


"""
lookups = {}
for chrom, chromlen in ref_len_tuple:
a = np.empty_like(None, shape=(max(pools), chromlen + 1))
lookup_array = np.empty_like(None, shape=(1, chromlen + 1))
for amp in amplicons:
added = False
if amp.chrom == chrom:
a[
amp.ipool,
# If amplicon clashes with any in same pool add new row
amp_slice = lookup_array[
:,
max(amp.amplicon_start - padding, 0) : min(
amp.amplicon_end + padding, chromlen
),
] = amp
lookups[chrom] = a
]
for i, row in enumerate(amp_slice): # Check each row for collision
if row[row != None].size == 0:
lookup_array[
i,
max(amp.amplicon_start - padding, 0) : min(
amp.amplicon_end + padding, chromlen
),
] = amp
added = True
# If not added, create new row, add the amplicon to that then add back to original array
if not added:
new_row = np.empty_like(None, shape=(1, chromlen + 1))
new_row[
0,
max(amp.amplicon_start - padding, 0) : min(
amp.amplicon_end + padding, chromlen
),
] = amp
lookup_array = np.vstack((lookup_array, new_row))

lookups[chrom] = lookup_array
return lookups


Expand Down Expand Up @@ -878,7 +930,9 @@ def go(args):
# Create a lookup table for primer location
ref_lengths = [(r, infile.get_reference_length(r)) for r in infile.references]
primer_lookup = create_primer_lookup(
ref_len_tuple=ref_lengths, pools=pools, amplicons=amplicon_list, padding=35
ref_len_tuple=ref_lengths,
amplicons=amplicon_list,
padding=args.primer_match_threshold,
)

trimmed_segments = {x: {} for x in chroms}
Expand Down Expand Up @@ -1067,7 +1121,7 @@ def main():
"-p",
type=int,
default=35,
help="Fuzzy match primer positions within this threshold",
help="Add -p bases of padding to the outside (5' end of primer) of primer coordinates to allow fuzzy matching for reads with barcodes/adapters. (default: %(default)s)",
)
parser.add_argument(
"--report", "-r", type=Path, help="Output report TSV to filepath"
Expand Down
10 changes: 5 additions & 5 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def test_align_trim_trim_se_no_norm(self):
pools = set([bl.pool for bl in scheme.bedlines])

ref_lengths = [("MN908947.3", 29903)]
primer_lookup = create_primer_lookup(ref_lengths, pools, amplicons, 35)
primer_lookup = create_primer_lookup(ref_lengths, amplicons, 35)

# Check the out sam is as expected
for record in pysam.AlignmentFile(str(output_sam), "r"):
Expand Down Expand Up @@ -135,7 +135,7 @@ def test_align_trim_trim_se_norm(self):
pools = set([bl.pool for bl in scheme.bedlines])

ref_lengths = [("MN908947.3", 29903)]
primer_lookup = create_primer_lookup(ref_lengths, pools, amplicons, 35)
primer_lookup = create_primer_lookup(ref_lengths, amplicons, 35)

# Check the out sam is as expected
for record in pysam.AlignmentFile(str(output_sam), "r"):
Expand Down Expand Up @@ -225,7 +225,7 @@ def test_align_trim_require_full_length(self):
pools = set([bl.pool for bl in scheme.bedlines])

ref_lengths = [("MN908947.3", 29903)]
primer_lookup = create_primer_lookup(ref_lengths, pools, amplicons, 35)
primer_lookup = create_primer_lookup(ref_lengths, amplicons, 35)

# Check the out sam is as expected
for record in pysam.AlignmentFile(str(output_sam), "r"):
Expand Down Expand Up @@ -275,7 +275,7 @@ def test_align_trim_se_no_trim(self):
pools = set([bl.pool for bl in scheme.bedlines])

ref_lengths = [("MN908947.3", 29903)]
primer_lookup = create_primer_lookup(ref_lengths, pools, amplicons, 35)
primer_lookup = create_primer_lookup(ref_lengths, amplicons, 35)

# Check the out sam is as expected
for record in pysam.AlignmentFile(str(output_sam), "r"):
Expand Down Expand Up @@ -420,7 +420,7 @@ def test_align_trim_paired_full_length(self):
pools = set([bl.pool for bl in scheme.bedlines])

ref_lengths = [("MN908947.3", 29903)]
primer_lookup = create_primer_lookup(ref_lengths, pools, amplicons, 35)
primer_lookup = create_primer_lookup(ref_lengths, amplicons, 35)

# Check the out sam is as expected
for segment1, segment2 in read_pair_generator(
Expand Down
134 changes: 89 additions & 45 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,11 @@ def test_create_primer_lookup(self):
"""
Test that the primer lookup is created correctly
"""
# Create the primer lookup with 35 padding
# Create the primer lookup with 0 padding
padding = 0
primer_lookup = create_primer_lookup(
ref_len_tuple=self.ref_len,
amplicons=self.amplicons,
pools=self.pools,
padding=padding,
)
# Check that the primer lookup is a dictionary
Expand All @@ -51,42 +50,38 @@ def test_create_primer_lookup(self):
# Check the size of the primer lookup
self.assertEqual(
primer_lookup["MN908947.3"].shape,
(max(self.pools), 29903 + 1), # +1 for half open interval
(2, 29903 + 1), # +1 for half open interval
)

# Check each amplicon is present in the lookup and in correct pool
for amplicon in self.amplicons:
# Check amplicon spans correct region, and correct pool
self.assertEqual(
set(
primer_lookup[amplicon.chrom][
amplicon.ipool, amplicon.amplicon_start : amplicon.amplicon_end
]
),
set([amplicon]),
)
contents = []
for row in primer_lookup[amplicon.chrom][
:, amplicon.amplicon_start - padding : amplicon.amplicon_end + padding
]:
contents.append(set(row))
self.assertIn(set([amplicon]), contents)

# Check padding is aligned correctly
self.assertIsNone(
primer_lookup[amplicon.chrom][
amplicon.ipool, amplicon.amplicon_start - padding - 1
]
)
self.assertIsNone(
primer_lookup[amplicon.chrom][
amplicon.ipool, amplicon.amplicon_end + padding # -1 +1
]
)
for row in primer_lookup[amplicon.chrom][
:, amplicon.amplicon_start - padding - 1
]:
self.assertIsNot(row, amplicon)

for row in primer_lookup[amplicon.chrom][
:, amplicon.amplicon_end + padding # +1 -1
]:
self.assertIsNot(row, amplicon)

def test_create_primer_lookup_padding(self):
"""
Test that the primer lookup is created correctly
"""
# Create the primer lookup with 35 padding
padding = 10
padding = 35
primer_lookup = create_primer_lookup(
ref_len_tuple=self.ref_len,
amplicons=self.amplicons,
pools=self.pools,
padding=padding,
)
# Check that the primer lookup is a dictionary
Expand All @@ -100,31 +95,81 @@ def test_create_primer_lookup_padding(self):
# Check the size of the primer lookup
self.assertEqual(
primer_lookup["MN908947.3"].shape,
(max(self.pools), 29903 + 1), # +1 for half open interval
(2, 29903 + 1), # +1 for half open interval
)

# Check each amplicon is present in the lookup and in correct pool
# Check each amplicon is present in the lookup
for amplicon in self.amplicons:
# Check amplicon spans correct region, and correct pool
self.assertEqual(
set(
primer_lookup[amplicon.chrom][
amplicon.ipool, amplicon.amplicon_start : amplicon.amplicon_end
]
# Check amplicon spans correct region
contents = []
for row in primer_lookup[amplicon.chrom][
:,
max(0, amplicon.amplicon_start - padding) : min(
amplicon.amplicon_end + padding, 29903 + 1
),
set([amplicon]),
)
]:
contents.append(set(row))
self.assertIn(set([amplicon]), contents)

# Check padding is aligned correctly
self.assertIsNone(
primer_lookup[amplicon.chrom][
amplicon.ipool, amplicon.amplicon_start - padding - 1
]
)
self.assertIsNone(
primer_lookup[amplicon.chrom][
amplicon.ipool, amplicon.amplicon_end + padding # -1 +1
]
)
for row in primer_lookup[amplicon.chrom][
:, amplicon.amplicon_start - padding - 1
]:
self.assertIsNot(row, amplicon)

for row in primer_lookup[amplicon.chrom][
:, amplicon.amplicon_end + padding # +1 -1
]:
self.assertIsNot(row, amplicon)

def test_create_primer_lookup_no_overlap(self):
"""
Test that the primer lookup is created correctly
"""
# Create the primer lookup with a single amplicon
padding = 0
primer_lookup = create_primer_lookup(
ref_len_tuple=self.ref_len,
amplicons=[self.amplicons[0]],
padding=padding,
)

# Check the size of the primer lookup
self.assertEqual(
primer_lookup["MN908947.3"].shape,
(1, 29903 + 1), # +1 for half open interval
)

def test_create_primer_lookup_overlap(self):
"""
Test that the primer lookup is created correctly
"""
# Create some fake amplicon
# nCoV-2019_3 overlaps with nCoV-2019_1
scheme = Scheme.from_str(
"MN908947.3 30 54 nCoV-2019_1_LEFT_1 1 + ACCAACCAACTTTCGATCTCTTGT\n"
"MN908947.3 385 410 nCoV-2019_1_RIGHT_1 1 - CATCTTTAAGATGTTGACGTGCCTC\n"
"MN908947.3 320 342 nCoV-2019_2_LEFT_1 2 + CTGTTTTACAGGTTCGCGACGT\n"
"MN908947.3 704 726 nCoV-2019_2_RIGHT_1 2 - TAAGGATCAGTGCCAAGCTCGT\n"
"MN908947.3 385 400 nCoV-2019_3_LEFT_1 1 + CGGTAATAAAGGAGCTGGTGGC\n"
"MN908947.3 800 820 nCoV-2019_3_RIGHT_1 1 - AAGGTGTCTGCAATTCATAGCTCT\n"
)
scheme.bedlines = merge_primers(scheme.bedlines)
amplicons = create_amplicons(scheme.bedlines)

# Create the primer lookup with a single amplicon
padding = 0
primer_lookup = create_primer_lookup(
ref_len_tuple=self.ref_len,
amplicons=amplicons,
padding=padding,
)

# Check the size of the primer lookup
self.assertEqual(
primer_lookup["MN908947.3"].shape,
(3, 29903 + 1), # +1 for half open interval
)


class TestFindPrimerWithLookup(unittest.TestCase):
Expand All @@ -139,7 +184,6 @@ def setUp(self):
self.primer_lookup = create_primer_lookup(
ref_len_tuple=self.ref_len,
amplicons=self.amplicons,
pools=self.pools,
padding=0,
)

Expand Down
Loading