Skip to content
Open
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ napi = { version = "2", default-features = false, features = [
"napi5",
"compat-mode",
] }
nohash-hasher = "0.2.0"
notify = "8.1.0"
once_cell = "1.17.1"
owo-colors = "4.2.2"
Expand Down
6 changes: 4 additions & 2 deletions turbopack/crates/turbo-persistence-tools/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ fn main() -> Result<()> {
bail!("The provided path does not exist: {}", path.display());
}

let db: TurboPersistence<SerialScheduler> = TurboPersistence::open_read_only(path)?;
let db: TurboPersistence<SerialScheduler, 0> = TurboPersistence::open_read_only(path)?;
let meta_info = db
.meta_info()
.context("Failed to retrieve meta information")?;
Expand All @@ -34,12 +34,14 @@ fn main() -> Result<()> {
amqf_size,
amqf_entries,
sst_size,
flags,
key_compression_dictionary_size,
block_count,
} in meta_file.entries
{
println!(
" SST {sequence_number:08}.sst: {min_hash:016x} - {max_hash:016x} (p = 1/{})",
" SST {sequence_number:08}.sst: {flags} {min_hash:016x} - {max_hash:016x} (p = \
1/{})",
u64::MAX / (max_hash - min_hash + 1)
);
println!(" AMQF {amqf_entries} entries = {} KiB", amqf_size / 1024);
Expand Down
3 changes: 3 additions & 0 deletions turbopack/crates/turbo-persistence/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ verbose_log = []

[dependencies]
anyhow = { workspace = true }
bitfield = { workspace = true }
dashmap = { workspace = true}
either = { workspace = true }
pot = "3.0.0"
byteorder = { workspace = true }
jiff = "0.2.10"
lzzzz = "1.1.0"
memmap2 = "0.9.5"
nohash-hasher = { workspace = true }
parking_lot = { workspace = true }
qfilter = { version = "0.2.4", features = ["serde"] }
quick_cache = { workspace = true }
Expand Down
13 changes: 9 additions & 4 deletions turbopack/crates/turbo-persistence/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,14 @@ A meta file can contain metadata about multiple SST files. The metadata is store
- 8 bytes min hash
- 8 bytes max hash
- 8 bytes SST file size
- 4 bytes flags
- bit 0: cold (compacted and not recently accessed)
- bit 1: fresh (not yet compacted)
- 4 bytes end of AMQF offset relative to start of all AMQF data
- 4 bytes end of AMQF offset relative to start of all AMQF data of the "used key hashes" AMQF
- foreach described SST file
- serialized AMQF
- serialized "used key hashes" AMQF

### SST file

Expand Down Expand Up @@ -169,7 +174,7 @@ Compaction chooses a few SST files and runs the merge step of merge sort on them

Example:

```
``` text
key hash range: | 0 ... u64::MAX |
SST 1: |----------------|
SST 2: |----------------|
Expand All @@ -178,7 +183,7 @@ SST 3: |-----|

can be compacted into:

```
``` text
key hash range: | 0 ... u64::MAX |
SST 1': |-------|
SST 2': |------|
Expand Down Expand Up @@ -206,7 +211,7 @@ Full example:

Example:

```
``` text
key hash range: | 0 ... u64::MAX | Family
SST 1: |-| 1
SST 2: |----------------| 1
Expand Down Expand Up @@ -234,7 +239,7 @@ Then we delete SST files 2, 3, 6 and 4, 5, 8 and 7, 9. The

SST files 1 stays unchanged.

```
``` text
key hash range: | 0 ... u64::MAX | Family
SST 1: |-| 1
SST 10: |-----| 1
Expand Down
37 changes: 28 additions & 9 deletions turbopack/crates/turbo-persistence/src/compaction/selector.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::ops::RangeInclusive;

use rustc_hash::FxHashMap;
use smallvec::{SmallVec, smallvec};

use crate::compaction::interval_map::IntervalMap;
Expand All @@ -12,6 +13,12 @@ pub trait Compactable {

/// The size of the compactable database segment in bytes.
fn size(&self) -> u64;

/// The category of the compactable. Overlap between different categories is not considered for
/// compaction.
fn category(&self) -> u8 {
0
}
}

fn is_overlapping(a: &RangeInclusive<u64>, b: &RangeInclusive<u64>) -> bool {
Expand All @@ -37,6 +44,7 @@ fn extend_range(a: &mut RangeInclusive<u64>, b: &RangeInclusive<u64>) -> bool {
extended
}

#[cfg(test)]
#[derive(Debug)]
pub struct CompactableMetrics {
/// The total coverage of the compactables.
Expand All @@ -53,6 +61,7 @@ pub struct CompactableMetrics {
}

/// Computes metrics about the compactables.
#[cfg(test)]
pub fn compute_metrics<T: Compactable>(
compactables: &[T],
full_range: RangeInclusive<u64>,
Expand Down Expand Up @@ -155,7 +164,7 @@ impl DuplicationInfo {
/// Get a value in the range `0..=u64` that represents the estimated amount of duplication
/// across the given range. The units are arbitrary, but linear.
fn duplication(&self, range: &RangeInclusive<u64>) -> u64 {
if self.total_size == 0 {
if self.max_size == self.total_size {
return 0;
}
// the maximum numerator value is `u64::MAX + 1`
Expand All @@ -167,6 +176,7 @@ impl DuplicationInfo {
}

/// The estimated size (in bytes) of a database segment containing `range` keys.
#[cfg(test)]
fn size(&self, range: &RangeInclusive<u64>) -> u64 {
if self.total_size == 0 {
return 0;
Expand All @@ -190,11 +200,14 @@ impl DuplicationInfo {
}
}

fn total_duplication_size(duplication: &IntervalMap<Option<DuplicationInfo>>) -> u64 {
fn total_duplication_size(duplication: &IntervalMap<FxHashMap<u8, DuplicationInfo>>) -> u64 {
duplication
.iter()
.flat_map(|(range, info)| Some((range, info.as_ref()?)))
.map(|(range, info)| info.duplication(&range))
.map(|(range, info)| {
info.values()
.map(|info| info.duplication(&range))
.sum::<u64>()
})
.sum()
}

Expand Down Expand Up @@ -238,16 +251,18 @@ pub fn get_merge_segments<T: Compactable>(
}
let start_compactable_range = start_compactable.range();
let start_compactable_size = start_compactable.size();
let start_compactable_category = start_compactable.category();
let mut current_range = start_compactable_range.clone();

// We might need to restart the search if we need to extend the range.
'search: loop {
let mut current_set = smallvec![start_index];
let mut current_size = start_compactable_size;
let mut duplication = IntervalMap::<Option<DuplicationInfo>>::new();
let mut duplication = IntervalMap::<FxHashMap<u8, DuplicationInfo>>::new();
duplication.update(start_compactable_range.clone(), |dup_info| {
dup_info
.get_or_insert_default()
.entry(start_compactable_category)
.or_default()
.add(start_compactable_size, &start_compactable_range);
});
let mut current_skip = 0;
Expand Down Expand Up @@ -337,8 +352,9 @@ pub fn get_merge_segments<T: Compactable>(
// set.
current_set.push(next_index);
current_size += size;
let category = compactable.category();
duplication.update(range.clone(), |dup_info| {
dup_info.get_or_insert_default().add(size, &range);
dup_info.entry(category).or_default().add(size, &range);
});
}
}
Expand Down Expand Up @@ -633,13 +649,16 @@ mod tests {

let new_metrics = compute_metrics(&containers, 0..=KEY_RANGE);
println!(
"Compaction done: coverage: {} ({}), overlap: {} ({}), duplication: {} ({})",
"Compaction done: coverage: {} ({}), overlap: {} ({}), duplication: {} ({}), \
duplicated_size: {} ({})",
new_metrics.coverage,
new_metrics.coverage - metrics.coverage,
new_metrics.overlap,
new_metrics.overlap - metrics.overlap,
new_metrics.duplication,
new_metrics.duplication - metrics.duplication
new_metrics.duplication - metrics.duplication,
new_metrics.duplicated_size,
(new_metrics.duplicated_size as f32) - metrics.duplicated_size as f32,
);
} else {
println!("No compaction needed");
Expand Down
1 change: 0 additions & 1 deletion turbopack/crates/turbo-persistence/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{mem::MaybeUninit, sync::Arc};
use anyhow::{Context, Result};
use lzzzz::lz4::{ACC_LEVEL_DEFAULT, decompress, decompress_with_dict};

#[tracing::instrument(level = "trace", skip_all, name = "decompress database block")]
pub fn decompress_into_arc(
uncompressed_length: u32,
block: &[u8],
Expand Down
Loading
Loading