Skip to content

Commit 88e76ca

Browse files
committed
Add redb, rocksdb and fs OpenDAL services to persistable layer; tests and benches.
- Initialize optional operators behind features and include in dual-write/read-fastest selection - Test that all configured profiles receive blobs and are enumerated - Add Criterion benches for op write/read per operator and key paths
1 parent c8db3cf commit 88e76ca

File tree

4 files changed

+127
-25
lines changed

4 files changed

+127
-25
lines changed

lib/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ directories = {version = ">= 2, < 5", optional = true}
2222
html2md = {version = "0.2.14", optional = true}
2323
kuchiki = {version = "0.8.1", optional = true}
2424
lol_html = {version = "0.3.1", optional = true}
25-
opendal = { version = "0.39", features = ["services-sled", "services-dashmap"] }
25+
opendal = { version = "0.39", features = ["services-sled", "services-dashmap", "services-rocksdb", "services-redb", "services-fs"] }
2626
rand = {version = "0.8"}
2727
regex = "1"
2828
ring = "0.16.19"
@@ -50,3 +50,6 @@ config = ["directories", "toml"]
5050
db = ["sled", "bincode"]
5151
html = ["kuchiki", "lol_html", "html2md"]
5252
rdf = ["rio_api", "rio_turtle"]
53+
persist-rocksdb = ["opendal/services-rocksdb"]
54+
persist-redb = ["opendal/services-redb"]
55+
persist-fs = ["opendal/services-fs"]

lib/benches/benchmarks.rs

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
55
use atomic_lib::utils::random_string;
66
use atomic_lib::*;
7-
use criterion::{criterion_group, criterion_main, Criterion};
7+
use criterion::{black_box, criterion_group, criterion_main, Criterion};
8+
use std::time::Duration;
89

910
fn random_atom() -> Atom {
1011
Atom::new(
@@ -21,17 +22,21 @@ fn random_resource(atom: &Atom) -> Resource {
2122
}
2223

2324
fn criterion_benchmark(c: &mut Criterion) {
25+
let mut g = c.benchmark_group("persistable");
26+
g.sample_size(20);
27+
g.measurement_time(Duration::from_secs(10));
28+
2429
let store = Db::init_temp("bench").unwrap();
2530

26-
c.bench_function("add_atom_to_index", |b| {
31+
g.bench_function("add_atom_to_index", |b| {
2732
b.iter(|| {
2833
let atom = random_atom();
2934
let resource = random_resource(&random_atom());
3035
store.add_atom_to_index(&atom, &resource).unwrap();
3136
})
3237
});
3338

34-
c.bench_function("add_resource", |b| {
39+
g.bench_function("add_resource", |b| {
3540
b.iter(|| {
3641
let resource = random_resource(&random_atom());
3742
store
@@ -40,7 +45,7 @@ fn criterion_benchmark(c: &mut Criterion) {
4045
})
4146
});
4247

43-
c.bench_function("resource.save()", |b| {
48+
g.bench_function("resource.save()", |b| {
4449
b.iter(|| {
4550
let mut resource = random_resource(&random_atom());
4651
resource.save(&store).unwrap();
@@ -51,35 +56,55 @@ fn criterion_benchmark(c: &mut Criterion) {
5156
.get_resource_extended("https://localhost/collections", false, None)
5257
.unwrap();
5358

54-
c.bench_function("resource.to_json_ad()", |b| {
59+
g.bench_function("resource.to_json_ad()", |b| {
5560
b.iter(|| {
5661
big_resource.to_json_ad().unwrap();
5762
})
5863
});
5964

60-
c.bench_function("resource.to_json_ld()", |b| {
65+
g.bench_function("resource.to_json_ld()", |b| {
6166
b.iter(|| {
6267
big_resource.to_json_ld(&store).unwrap();
6368
})
6469
});
6570

66-
c.bench_function("resource.to_json()", |b| {
71+
g.bench_function("resource.to_json()", |b| {
6772
b.iter(|| {
6873
big_resource.to_json(&store).unwrap();
6974
})
7075
});
7176

72-
c.bench_function("resource.to_n_triples()", |b| {
77+
g.bench_function("resource.to_n_triples()", |b| {
7378
b.iter(|| {
7479
big_resource.to_n_triples(&store).unwrap();
7580
})
7681
});
7782

78-
c.bench_function("all_resources()", |b| {
83+
g.bench_function("all_resources()", |b| {
7984
b.iter(|| {
80-
let _all = store.all_resources(false).collect::<Vec<Resource>>();
85+
let _all = black_box(store.all_resources(false).collect::<Vec<Resource>>());
8186
})
8287
});
88+
89+
// Persistable operator benchmarks: write/read single blob via each configured operator
90+
for (name, op) in store.dal_ops.iter() {
91+
let key = format!("bench_{}", name);
92+
let data = vec![0u8; 16 * 1024];
93+
g.bench_function(&format!("op_write_{}", name), |b| {
94+
b.iter(|| {
95+
// We ignore errors because operators may share keys across iterations
96+
let _ = store
97+
.runtime
98+
.block_on(async { op.write(&key, data.clone()).await });
99+
})
100+
});
101+
g.bench_function(&format!("op_read_{}", name), |b| {
102+
b.iter(|| {
103+
let _ = store.runtime.block_on(async { op.read(&key).await });
104+
})
105+
});
106+
}
107+
g.finish();
83108
}
84109

85110
criterion_group!(benches, criterion_benchmark);

lib/src/db.rs

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,39 @@ impl Db {
117117
.layer(opendal::layers::LoggingLayer::default())
118118
.finish();
119119

120+
// OpenDAL operator: RocksDB (on-disk kv)
121+
#[cfg(feature = "persist-rocksdb")]
122+
let rocks_op = {
123+
let mut b = opendal::services::Rocksdb::default();
124+
b.datadir(path.join("opendal_rocksdb").to_str().expect("rocks path"));
125+
opendal::Operator::new(b)
126+
.map_err(|_e| format!("Error operator: {}", _e))?
127+
.layer(opendal::layers::LoggingLayer::default())
128+
.finish()
129+
};
130+
131+
// OpenDAL operator: ReDB (embedded db)
132+
#[cfg(feature = "persist-redb")]
133+
let redb_op = {
134+
let mut b = opendal::services::Redb::default();
135+
b.datadir(path.join("opendal_redb").to_str().expect("redb path"));
136+
opendal::Operator::new(b)
137+
.map_err(|_e| format!("Error operator: {}", _e))?
138+
.layer(opendal::layers::LoggingLayer::default())
139+
.finish()
140+
};
141+
142+
// OpenDAL operator: FS (filesystem flat files)
143+
#[cfg(feature = "persist-fs")]
144+
let fs_op = {
145+
let mut b = opendal::services::Fs::default();
146+
b.root(path.join("opendal_fs").to_str().expect("fs path"));
147+
opendal::Operator::new(b)
148+
.map_err(|_e| format!("Error operator: {}", _e))?
149+
.layer(opendal::layers::LoggingLayer::default())
150+
.finish()
151+
};
152+
120153
// Simple speed check: write+read small payload and measure read latency
121154
fn measure_read_ns(
122155
rt: &tokio::runtime::Runtime,
@@ -144,16 +177,40 @@ impl Db {
144177

145178
let sled_ns = measure_read_ns(&rt, &sled_op)?;
146179
let dash_ns = measure_read_ns(&rt, &dash_op)?;
147-
148-
let fastest_op = if dash_ns <= sled_ns {
180+
#[cfg(feature = "persist-rocksdb")]
181+
let rocks_ns = measure_read_ns(&rt, &rocks_op)?;
182+
#[cfg(feature = "persist-redb")]
183+
let redb_ns = measure_read_ns(&rt, &redb_op)?;
184+
#[cfg(feature = "persist-fs")]
185+
let fs_ns = measure_read_ns(&rt, &fs_op)?;
186+
187+
let mut fastest_op = if dash_ns <= sled_ns {
149188
dash_op.clone()
150189
} else {
151190
sled_op.clone()
152191
};
192+
#[cfg(feature = "persist-rocksdb")]
193+
if rocks_ns < measure_read_ns(&rt, &fastest_op)? {
194+
fastest_op = rocks_op.clone();
195+
}
196+
#[cfg(feature = "persist-redb")]
197+
if redb_ns < measure_read_ns(&rt, &fastest_op)? {
198+
fastest_op = redb_op.clone();
199+
}
200+
#[cfg(feature = "persist-fs")]
201+
if fs_ns < measure_read_ns(&rt, &fastest_op)? {
202+
fastest_op = fs_op.clone();
203+
}
153204

154205
let mut dal_ops = std::collections::HashMap::new();
155206
dal_ops.insert("sled".to_string(), sled_op);
156207
dal_ops.insert("dashmap".to_string(), dash_op);
208+
#[cfg(feature = "persist-rocksdb")]
209+
dal_ops.insert("rocksdb".to_string(), rocks_op);
210+
#[cfg(feature = "persist-redb")]
211+
dal_ops.insert("redb".to_string(), redb_op);
212+
#[cfg(feature = "persist-fs")]
213+
dal_ops.insert("fs".to_string(), fs_op);
157214

158215
let db = sled::open(path).map_err(|e|format!("Failed opening DB at this location: {:?} . Is another instance of Atomic Server running? {}", path, e))?;
159216
let resources = db.open_tree("resources_v1").map_err(|e|format!("Failed building resources. Your DB might be corrupt. Go back to a previous version and export your data. {}", e))?;

lib/src/db/test.rs

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -434,17 +434,19 @@ fn persistable_multi_store_dual_write_and_fallback() {
434434
r.save(&store).unwrap();
435435
let subject = r.get_subject().to_string();
436436

437-
// Both OpenDAL operators should have the blob
438-
let dash = store.dal_ops.get("dashmap").expect("dashmap op");
439-
let sled = store.dal_ops.get("sled").expect("sled op");
440-
let dash_bytes = store
441-
.runtime
442-
.block_on(async { dash.read(&subject).await.expect("dash read") });
443-
let sled_bytes = store
444-
.runtime
445-
.block_on(async { sled.read(&subject).await.expect("sled read") });
446-
assert!(dash_bytes.len() > 0);
447-
assert_eq!(dash_bytes, sled_bytes, "stores should contain same bytes");
437+
// All configured OpenDAL operators should have the blob
438+
let mut blobs: Vec<Vec<u8>> = Vec::new();
439+
for (name, op) in store.dal_ops.iter() {
440+
let bytes = store
441+
.runtime
442+
.block_on(async { op.read(&subject).await.expect("op read") });
443+
assert!(bytes.len() > 0, "{} empty", name);
444+
blobs.push(bytes);
445+
}
446+
// Compare all blobs to the first
447+
for b in blobs.iter() {
448+
assert_eq!(b, &blobs[0], "stores should contain same bytes");
449+
}
448450

449451
// Delete from the fastest store, ensure fallback (sled tree) still serves resource
450452
store
@@ -453,7 +455,7 @@ fn persistable_multi_store_dual_write_and_fallback() {
453455
.ok();
454456
// Should still be able to fetch via sled resources fallback
455457
let fetched = store.get_resource(&subject).expect("fallback should work");
456-
assert_eq!(fetched.get_subject(), subject);
458+
assert_eq!(fetched.get_subject(), &subject);
457459

458460
// Remove from sled resources tree and ensure not found now
459461
store
@@ -506,6 +508,21 @@ fn persistable_collections_still_work() {
506508
assert!(res.count >= 8, "count should include all members");
507509
}
508510

511+
#[test]
512+
fn persistable_profiles_available_and_benchmarked() {
513+
let store = Db::init_temp("persistable_profiles").unwrap();
514+
// Expect core profiles
515+
assert!(store.dal_ops.contains_key("sled"));
516+
assert!(store.dal_ops.contains_key("dashmap"));
517+
// Optional ones if features enabled
518+
#[cfg(feature = "persist-rocksdb")]
519+
assert!(store.dal_ops.contains_key("rocksdb"));
520+
#[cfg(feature = "persist-redb")]
521+
assert!(store.dal_ops.contains_key("redb"));
522+
#[cfg(feature = "persist-fs")]
523+
assert!(store.dal_ops.contains_key("fs"));
524+
}
525+
509526
#[test]
510527
/// Changing these values actually correctly updates the index.
511528
fn index_invalidate_cache() {

0 commit comments

Comments
 (0)