mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-03-20 09:01:05 +08:00
The commitlog so far assumed that the latest segment is never compressed and can be opened for writing (if it is intact). However, restoring the entire commitlog from cold storage results in all segments being compressed. Make it so the resumption logic reads the metadata from the potentially compressed last segment, and starts a new segment for writing if the latest one was indeed compressed. # Expected complexity level and risk 1.5 # Testing Added a test.
164 lines
4.9 KiB
Rust
164 lines
4.9 KiB
Rust
use log::info;
|
|
use spacetimedb_commitlog::repo::Repo;
|
|
use spacetimedb_commitlog::tests::helpers::enable_logging;
|
|
use spacetimedb_commitlog::{commitlog, payload, repo, Commitlog, Options};
|
|
use spacetimedb_paths::server::CommitLogDir;
|
|
use spacetimedb_paths::FromPathUnchecked;
|
|
use tempfile::tempdir;
|
|
|
|
pub fn gen_payload() -> [u8; 256] {
|
|
rand::random()
|
|
}
|
|
|
|
#[test]
|
|
fn smoke() {
|
|
let root = tempdir().unwrap();
|
|
let clog = Commitlog::open(
|
|
CommitLogDir::from_path_unchecked(root.path()),
|
|
Options {
|
|
max_segment_size: 8 * 1024,
|
|
..Options::default()
|
|
},
|
|
None,
|
|
)
|
|
.unwrap();
|
|
|
|
let n_txs = 500;
|
|
let payload = gen_payload();
|
|
for i in 0..n_txs {
|
|
clog.commit([(i, payload)]).unwrap();
|
|
}
|
|
let committed_offset = clog.flush_and_sync().unwrap();
|
|
|
|
assert_eq!(n_txs - 1, committed_offset.unwrap());
|
|
assert_eq!(
|
|
n_txs as usize,
|
|
clog.transactions(&payload::ArrayDecoder).map(Result::unwrap).count()
|
|
);
|
|
// We set max_records_in_commit to 1, so n_commits == n_txs
|
|
assert_eq!(n_txs as usize, clog.commits().map(Result::unwrap).count());
|
|
}
|
|
|
|
#[test]
|
|
fn resets() {
|
|
let root = tempdir().unwrap();
|
|
let mut clog = Commitlog::open(
|
|
CommitLogDir::from_path_unchecked(root.path()),
|
|
Options {
|
|
max_segment_size: 512,
|
|
..Options::default()
|
|
},
|
|
None,
|
|
)
|
|
.unwrap();
|
|
|
|
let payload = gen_payload();
|
|
for i in 0..50 {
|
|
clog.commit([(i, payload)]).unwrap();
|
|
}
|
|
clog.flush_and_sync().unwrap();
|
|
|
|
for offset in (0..50).rev() {
|
|
clog = clog.reset_to(offset).unwrap();
|
|
assert_eq!(
|
|
offset,
|
|
clog.transactions(&payload::ArrayDecoder)
|
|
.map(Result::unwrap)
|
|
.last()
|
|
.unwrap()
|
|
.offset
|
|
);
|
|
// We're counting from zero, so offset + 1 is the # of txs.
|
|
assert_eq!(
|
|
offset + 1,
|
|
clog.transactions(&payload::ArrayDecoder).map(Result::unwrap).count() as u64
|
|
);
|
|
}
|
|
}
|
|
|
|
/// Try to generate commitlogs that will be amenable to compression -
|
|
/// random data doesn't compress well, so try and have there be repetition
|
|
fn compressible_payloads() -> impl Iterator<Item = [u8; 256]> {
|
|
(0..4).map(|_| gen_payload()).cycle()
|
|
}
|
|
|
|
#[test]
|
|
fn compression() {
|
|
enable_logging();
|
|
|
|
let root = tempdir().unwrap();
|
|
let clog = Commitlog::open(
|
|
CommitLogDir::from_path_unchecked(root.path()),
|
|
Options {
|
|
max_segment_size: 8 * 1024,
|
|
..Options::default()
|
|
},
|
|
None,
|
|
)
|
|
.unwrap();
|
|
|
|
let payloads = compressible_payloads().take(1024).collect::<Vec<_>>();
|
|
for (i, payload) in payloads.iter().enumerate() {
|
|
clog.commit([(i as u64, *payload)]).unwrap();
|
|
}
|
|
clog.flush_and_sync().unwrap();
|
|
|
|
let uncompressed_size = clog.size_on_disk().unwrap();
|
|
|
|
let segments = clog.existing_segment_offsets().unwrap();
|
|
let segments_to_compress = &segments[..segments.len() / 2];
|
|
info!("segments: {segments:?} compressing: {segments_to_compress:?}");
|
|
clog.compress_segments(segments_to_compress).unwrap();
|
|
|
|
let compressed_size = clog.size_on_disk().unwrap();
|
|
assert!(compressed_size.total_bytes < uncompressed_size.total_bytes);
|
|
|
|
assert!(clog
|
|
.transactions(&payload::ArrayDecoder)
|
|
.map(Result::unwrap)
|
|
.enumerate()
|
|
.all(|(i, x)| x.offset == i as u64 && x.txdata == payloads[i]));
|
|
}
|
|
|
|
/// When restoring an archived commitlog, all segments are compressed and should
|
|
/// remain immutable.
|
|
///
|
|
/// Tests that this is upheld, i.e. a fresh segment is created when resuming
|
|
/// writes.
|
|
#[test]
|
|
fn all_segments_sealed() {
|
|
enable_logging();
|
|
|
|
let root = tempdir().unwrap();
|
|
let path = CommitLogDir::from_path_unchecked(root.path());
|
|
let opts = Options {
|
|
max_segment_size: 64 * 1024,
|
|
..<_>::default()
|
|
};
|
|
let num_commits = 1024;
|
|
let repo = repo::Fs::new(path, None).unwrap();
|
|
{
|
|
let mut clog = commitlog::Generic::open(&repo, opts).unwrap();
|
|
for (i, payload) in compressible_payloads().take(num_commits).enumerate() {
|
|
clog.commit([(i as u64, payload)]).unwrap();
|
|
}
|
|
clog.flush().unwrap();
|
|
clog.sync();
|
|
}
|
|
|
|
let segments = repo.existing_offsets().unwrap();
|
|
let num_segments = segments.len();
|
|
|
|
// Compress all segments via the `repo`,
|
|
// to not trigger the assert that the head segment cannot be compressed.
|
|
for segment in segments {
|
|
repo.compress_segment(segment).unwrap();
|
|
}
|
|
|
|
// Re-opening the commitlog should create a fresh segment at offset `num_commits`.
|
|
let _ = commitlog::Generic::<_, [u8; 256]>::open(&repo, opts).unwrap();
|
|
let segments = repo.existing_offsets().unwrap();
|
|
assert_eq!(num_segments + 1, segments.len());
|
|
assert_eq!(segments.last().copied(), Some(num_commits as u64));
|
|
}
|