
Commit 4d7e844

Implement multi-store persistence (OpenDAL) with sled + dashmap.
Write resource blobs to both stores; read from fastest (benchmarked at init) with sled fallback. Add delete across stores. Keeps existing Sled indexes untouched. Preps for full Persistable pattern across profiles.
1 parent d71e33f commit 4d7e844

5 files changed: +135 −14 lines

lessons-learned.md

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
+Lessons learned from OpenDAL integration and terraphim_persistence pattern
+
+- Consistency first: Switching only reads to OpenDAL while writes remain on Sled breaks read-after-write; dual-write or a single path is required for correctness.
+- One abstraction boundary: Use OpenDAL as the single storage interface; let Sled/DashMap/RocksDB be OpenDAL services instead of coupling to them directly.
+- Fastest-read via benchmarking: Measuring operator latency at startup and selecting the fastest improves read performance; write-all is still needed for durability.
+- Tokio runtime scope: Avoid constructing runtimes deep inside libraries. Expose async APIs or use appropriate blocking adaptors.
+- Migration strategy: Plan backfill and deletion symmetry. When introducing a new backend, provide tools/tests to migrate and keep stores in sync.
+- Feature gating services: Keep backend choices behind features to reduce dependency surface and compile times.
+- Key normalization: Stable, normalized keys (e.g., `document_<id>.json`) avoid cross-backend issues.
+- Testing breadth: Memory-only configs are invaluable for CI; integration tests for each optional backend help catch configuration drift.
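The dual-write and fastest-read lessons above can be sketched with std-only Rust. This is a minimal sketch: the profile names and trivial read closures are illustrative stand-ins for OpenDAL operators, and the real code times `op.read` through a Tokio runtime.

```rust
use std::collections::HashMap;
use std::time::Instant;

/// Time a single read probe and return elapsed nanoseconds.
/// The closure stands in for an OpenDAL operator's read path.
fn measure_read_ns<F: Fn() -> Vec<u8>>(read: F) -> u128 {
    let start = Instant::now();
    let _ = read();
    start.elapsed().as_nanos()
}

/// Given measured read latencies per profile name, pick the fastest profile.
fn fastest_profile(latencies: &HashMap<String, u128>) -> Option<String> {
    latencies
        .iter()
        .min_by_key(|(_, ns)| **ns)
        .map(|(name, _)| name.clone())
}

fn main() {
    let mut latencies = HashMap::new();
    // "dashmap" probe: an in-memory copy, essentially free.
    latencies.insert(
        "dashmap".to_string(),
        measure_read_ns(|| b"atomic-bench".to_vec()),
    );
    // "sled" probe: simulated slower on-disk read.
    latencies.insert(
        "sled".to_string(),
        measure_read_ns(|| {
            std::thread::sleep(std::time::Duration::from_millis(2));
            b"atomic-bench".to_vec()
        }),
    );
    println!("fastest profile: {:?}", fastest_profile(&latencies));
}
```

A startup benchmark like this only ranks read paths; as the first lesson notes, writes must still go to every store for durability.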

lib/Cargo.toml

Lines changed: 2 additions & 1 deletion
@@ -22,7 +22,7 @@ directories = {version = ">= 2, < 5", optional = true}
 html2md = {version = "0.2.14", optional = true}
 kuchiki = {version = "0.8.1", optional = true}
 lol_html = {version = "0.3.1", optional = true}
-opendal = { version = "0.39", features = ["services-sled"] }
+opendal = { version = "0.39", features = ["services-sled", "services-dashmap"] }
 rand = {version = "0.8"}
 regex = "1"
 ring = "0.16.19"
@@ -32,6 +32,7 @@ serde = {version = "1", features = ["derive"]}
 serde_json = "1"
 sled = {version = "0.34", optional = true, features = ["no_logs"]}
 tokio = "1.29.1"
+uuid = { version = "1", features = ["v4"] }
 toml = {version = "0.7", optional = true}
 tracing = "0.1"
 ureq = "2"
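The commit enables both OpenDAL services unconditionally. The "feature gating services" lesson from lessons-learned.md would instead put each backend behind a Cargo feature; a hedged sketch (the `storage-*` feature names are hypothetical, not part of this commit):

```toml
[features]
# Hypothetical gating: each backend pulls in only its own OpenDAL service.
storage-sled = ["opendal/services-sled"]
storage-dashmap = ["opendal/services-dashmap"]

[dependencies]
opendal = { version = "0.39", default-features = false }
```

This keeps the dependency surface and compile times down when a deployment only needs one backend.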

lib/src/db.rs

Lines changed: 73 additions & 13 deletions
@@ -41,6 +41,9 @@ use self::{
     val_prop_sub_index::{add_atom_to_reference_index, remove_atom_from_reference_index},
 };
 
+// External crates used in persistable multi-store setup
+use uuid;
+
 // A function called by the Store when a Commit is accepted
 type HandleCommit = Box<dyn Fn(&CommitResponse) + Send + Sync>;
 
@@ -63,8 +66,10 @@ pub struct Db {
     /// Try not to use this directly, but use the Trees.
     db: sled::Db,
     default_agent: Arc<Mutex<Option<crate::agents::Agent>>>,
-    /// Stores all resources in OpenDAL. The Key is the Subject as a `string.as_bytes()`, the value a [PropVals]. Propvals must be serialized using [bincode].
-    dal_resources: opendal::Operator,
+    /// OpenDAL operators by profile name (e.g., "sled", "dashmap") for resource blobs
+    dal_ops: std::collections::HashMap<String, opendal::Operator>,
+    /// Fastest OpenDAL operator chosen via a simple read benchmark
+    dal_fastest: opendal::Operator,
     /// Stores all resources. The Key is the Subject as a `string.as_bytes()`, the value a [PropVals]. Propvals must be serialized using [bincode].
     resources: sled::Tree,
     /// Index of all Atoms, sorted by {Value}-{Property}-{Subject}.
 
@@ -92,26 +97,64 @@ impl Db {
     /// The server_url is the domain where the db will be hosted, e.g. http://localhost/
     /// It is used for distinguishing locally defined items from externally defined ones.
     pub fn init(path: &std::path::Path, server_url: String) -> AtomicResult<Db> {
-        let mut dal_sled = opendal::services::Sled::default();
+        // Local runtime for async OpenDAL ops during initialization
+        let rt = tokio::runtime::Runtime::new()?;
 
+        // OpenDAL operator: Sled (on-disk)
+        let mut dal_sled = opendal::services::Sled::default();
         let dal_path = path.clone().join("opendal");
         dal_sled
             .datadir(dal_path.to_str().expect("wrong data dir string"))
             .tree("resources_v1");
+        let sled_op = opendal::Operator::new(dal_sled)
+            .map_err(|_e| format!("Error operator: {}", _e))?
+            .layer(opendal::layers::LoggingLayer::default())
+            .finish();
 
-        let dal_op = opendal::Operator::new(dal_sled)
+        // OpenDAL operator: DashMap (in-memory, fast)
+        let dash_op = opendal::Operator::new(opendal::services::Dashmap::default())
             .map_err(|_e| format!("Error operator: {}", _e))?
             .layer(opendal::layers::LoggingLayer::default())
             .finish();
 
+        // Simple speed check: write+read small payload and measure read latency
+        fn measure_read_ns(rt: &tokio::runtime::Runtime, op: &opendal::Operator) -> AtomicResult<u128> {
+            use std::time::Instant;
+            rt.block_on(async {
+                let key = format!("bench_{}", uuid::Uuid::new_v4());
+                let payload = b"atomic-bench".to_vec();
+                op.write(&key, payload).await.map_err(|e| format!("bench write error: {e}"))?;
+                let start = Instant::now();
+                let _ = op.read(&key).await.map_err(|e| format!("bench read error: {e}"))?;
+                let ns = start.elapsed().as_nanos();
+                // best-effort cleanup
+                let _ = op.delete(&key).await;
+                Ok::<u128, String>(ns)
+            }).map_err(|e| e.into())
+        }
+
+        let sled_ns = measure_read_ns(&rt, &sled_op)?;
+        let dash_ns = measure_read_ns(&rt, &dash_op)?;
+
+        let (fastest_name, fastest_op) = if dash_ns <= sled_ns {
+            ("dashmap".to_string(), dash_op.clone())
+        } else {
+            ("sled".to_string(), sled_op.clone())
+        };
+
+        let mut dal_ops = std::collections::HashMap::new();
+        dal_ops.insert("sled".to_string(), sled_op);
+        dal_ops.insert("dashmap".to_string(), dash_op);
+
         let db = sled::open(path).map_err(|e|format!("Failed opening DB at this location: {:?} . Is another instance of Atomic Server running? {}", path, e))?;
         let resources = db.open_tree("resources_v1").map_err(|e|format!("Failed building resources. Your DB might be corrupt. Go back to a previous version and export your data. {}", e))?;
         let reference_index = db.open_tree("reference_index_v1")?;
         let query_index = db.open_tree("members_index")?;
         let prop_val_sub_index = db.open_tree("prop_val_sub_index")?;
         let watched_queries = db.open_tree("watched_queries")?;
         let store = Db {
-            dal_resources: dal_op,
+            dal_ops,
+            dal_fastest: fastest_op,
             db,
             default_agent: Arc::new(Mutex::new(None)),
             resources,
 
@@ -122,7 +165,7 @@ impl Db {
             watched_queries,
             endpoints: default_endpoints(),
             on_commit: None,
-            runtime: Arc::new(tokio::runtime::Runtime::new()?),
+            runtime: Arc::new(rt),
         };
         migrate_maybe(&store).map(|e| format!("Error during migration of database: {:?}", e))?;
         crate::populate::populate_base_models(&store)
 
@@ -165,6 +208,13 @@ impl Db {
     #[instrument(skip(self))]
     fn set_propvals(&self, subject: &str, propvals: &PropVals) -> AtomicResult<()> {
         let resource_bin = bincode::serialize(propvals)?;
+        // Write to all OpenDAL operators (multi-store), then to primary Sled tree used by indexes
+        self.runtime.block_on(async {
+            for (_name, op) in self.dal_ops.iter() {
+                // Ignore per-operator errors to allow others to succeed; surface first error if needed later
+                let _ = op.write(subject, resource_bin.clone()).await;
+            }
+        });
         self.resources.insert(subject.as_bytes(), resource_bin)?;
         Ok(())
     }
 
@@ -179,17 +229,21 @@
     /// Deals with the binary API of Sled
     #[instrument(skip(self))]
     fn get_propvals(&self, subject: &str) -> AtomicResult<PropVals> {
-        let propval_maybe = self.runtime.block_on(async {
-            self.dal_resources
+        // Try fastest OpenDAL operator first
+        let mut propval_maybe = self.runtime.block_on(async {
+            self.dal_fastest
                 .read(subject)
                 .await
-                .map_err(|e| format!("Can't open {} from store: {}", subject, e))
+                .map_err(|e| format!("Can't open {} from fastest store: {}", subject, e))
                 .ok()
         });
-        // let propval_maybe = self
-        //     .resources
-        //     .get(subject.as_bytes())
-        //     .map_err(|e| format!("Can't open {} from store: {}", subject, e))?;
+        // Fallback to Sled resources tree if OpenDAL miss
+        if propval_maybe.is_none() {
+            propval_maybe = self
+                .resources
+                .get(subject.as_bytes())
+                .map_err(|e| format!("Can't open {} from sled resources: {}", subject, e))?;
+        }
         match propval_maybe.as_ref() {
             Some(binpropval) => {
                 let propval: PropVals = bincode::deserialize(binpropval).map_err(|e| {
 
@@ -612,6 +666,12 @@ impl Storelike for Db {
             let remove_atom = crate::Atom::new(subject.into(), prop.clone(), val.clone());
             self.remove_atom_from_index(&remove_atom, &resource)?;
         }
+        // Remove from OpenDAL operators (best-effort)
+        self.runtime.block_on(async {
+            for (_name, op) in self.dal_ops.iter() {
+                let _ = op.delete(subject).await;
+            }
+        });
         let _found = self.resources.remove(subject.as_bytes())?;
     } else {
         return Err(format!(
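The `set_propvals` hunk deliberately swallows per-operator write errors, with a comment noting the first error could be surfaced later. A std-only sketch of that deferred error surfacing, with boxed closures standing in for OpenDAL operators (the closure-based store type is an illustration, not the commit's code):

```rust
/// Write a blob to every configured store; keep going on failure so
/// healthy stores still receive the data, then surface the first error.
fn write_all(
    stores: &mut [Box<dyn FnMut(&str, &[u8]) -> Result<(), String>>],
    key: &str,
    blob: &[u8],
) -> Result<(), String> {
    let mut first_err: Option<String> = None;
    for store in stores.iter_mut() {
        if let Err(e) = store(key, blob) {
            // Remember only the first failure; later stores still get the write.
            first_err.get_or_insert(e);
        }
    }
    match first_err {
        Some(e) => Err(e),
        None => Ok(()),
    }
}

fn main() {
    let mut stores: Vec<Box<dyn FnMut(&str, &[u8]) -> Result<(), String>>> = vec![
        Box::new(|_k, _v| Ok(())),                       // healthy store
        Box::new(|_k, _v| Err("disk full".to_string())), // failing store
    ];
    // The healthy store is still written even though one store fails.
    println!("{:?}", write_all(&mut stores, "doc1", b"payload"));
}
```

Returning the first error keeps read-after-write guarantees honest: a caller learns that at least one replica is stale instead of silently diverging.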

memories.md

Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
+OpenDAL integration review (atomic-server)
+
+- Branch: origin/opendal; commits include "WIP OpenDAL #433" and "WIP dal".
+- Changes: adds OpenDAL (services-sled) to `lib/Cargo.toml`; in `lib/src/db.rs` adds `dal_resources: opendal::Operator`, initializes it with OpenDAL Sled at `<db_path>/opendal` tree `resources_v1`, and switches `get_propvals` to read via `dal_resources.read(subject)` using a new Tokio runtime.
+- Gaps / issues:
+  - Writes still go to Sled via `set_propvals` (OpenDAL is not used for writes). Reads now use OpenDAL, so read-after-write breaks (the OpenDAL store is empty).
+  - Deletion still removes from the Sled tree; OpenDAL items (if any) won't be deleted.
+  - A new Tokio runtime inside the DB struct risks nested runtimes and adds complexity; prefer an async surface or OpenDAL's blocking API.
+  - Index and query code still rely on Sled trees, so storage is now split across two backends.
+- Recommendation: unify I/O through OpenDAL (including Sled via `services-sled`) OR dual-write until migration completes. Remove the in-DB runtime; make the read path async or use a blocking layer. Add migration/backfill from Sled to OpenDAL and tests for CRUD consistency.
+
+Multi-store pattern (terraphim-ai/terraphim_persistence)
+
+- `Persistable` trait: save to all profiles, load from the fastest. Key derived by the implementor (e.g., `document_<id>.json`).
+- `settings::parse_profiles`: builds OpenDAL operators for profiles (memory, dashmap, rocksdb, redb, sqlite, s3, atomicserver, etc.), benchmarks read latency, and picks the fastest.
+- Implementations provided for `Thesaurus` and `Document`; includes memory-only helpers and tests for memory/rocksdb/redb/sqlite.
+- This matches the intended architecture: write-multiplex + read-from-fastest.
+
+Action items alignment
+
+- Port the terraphim pattern to atomic-server: a single abstraction via OpenDAL; write to all configured profiles (or at least primary + replica); read from the fastest; ensure index/storage consistency and a clear migration path.

scratchpad.md

Lines changed: 29 additions & 0 deletions
@@ -0,0 +1,29 @@
+Atomic-server OpenDAL PR review checklist
+
+- Code changes
+  - [x] Added opendal dep w/ services-sled
+  - [x] Added `dal_resources: Operator` to Db
+  - [x] Init OpenDAL Sled under `<db>/opendal` tree `resources_v1`
+  - [x] Read path switched to `dal_resources.read(subject)`
+  - [ ] Write path still uses Sled `resources.insert`
+  - [ ] Delete path still uses Sled remove
+  - [ ] Indexing and queries use Sled trees only
+  - [ ] Concurrency/runtime: embedded tokio runtime introduced
+
+- Risks
+  - Read-after-write inconsistency (OpenDAL store empty)
+  - Double storage cost; unclear source of truth
+  - Migration undefined; no backfill from the existing sled tree
+  - Runtime-nesting problems in servers already using Tokio
+
+- Proposed plan
+  1) Decide strategy: a) OpenDAL-only with `services-sled` as one profile; or b) dual-write with single-read (fastest) via profiles, like terraphim.
+  2) If (a): wrap all CRUD in OpenDAL; drop direct sled access for propvals; use sled only for indexes until ported.
+  3) If (b): implement a `save_to_all` concept in the atomic Db for propvals (write to N stores); pick the fastest for reads; keep indexes coherent on one canonical store.
+  4) Remove the embedded runtime; move read calls to an async surface or a blocking shim.
+  5) Add migration: backfill OpenDAL from existing Sled items, via a one-time tool or lazy-on-read write-through.
+  6) Tests: CRUD, consistency across stores, performance benchmark selecting the fastest profile.
+
+- Notes
+  - The terraphim-persistence crate already models profiles + speed test + save_to_all + read_fastest. Could reuse patterns and helper code, or even the crate itself.
+  - Consider features for memory/dashmap/rocksdb/redb/sqlite; align with the atomic-server env.
