Conversation

@zaidoon1
Contributor

@zaidoon1 zaidoon1 commented Nov 8, 2025

resolve #190

@marvin-j97 marvin-j97 added this to the 3.0.0 milestone Nov 8, 2025
@zaidoon1 zaidoon1 force-pushed the zaidoon/ingestion-api-inversion-control branch 3 times, most recently from a043736 to 14ef584 Compare November 10, 2025 03:45
@marvin-j97
Contributor

I'm currently massively simplifying the flush logic so that it becomes really easy to flush the active and sealed memtables in one go: just call Tree::rotate_memtable followed by Tree::flush while holding a lock.
That also simplifies the flushing logic in fjall.

@marvin-j97
Contributor

marvin-j97 commented Nov 11, 2025

(Forgot to submit comments)

But as I said, I am currently changing the Flush API which should make the ingestion API much simpler.

(WIP at #194)

@zaidoon1 zaidoon1 force-pushed the zaidoon/ingestion-api-inversion-control branch 3 times, most recently from 1af202a to 87c7d03 Compare November 15, 2025 08:10
@zaidoon1 zaidoon1 force-pushed the zaidoon/ingestion-api-inversion-control branch from 87c7d03 to 36b2651 Compare November 15, 2025 08:38
@zaidoon1 zaidoon1 requested a review from marvin-j97 November 15, 2025 08:48
@zaidoon1 zaidoon1 requested a review from marvin-j97 November 15, 2025 14:08
Comment on lines 150 to 151
let seq = seqno.next();
let mut ingestion = src.ingestion()?.with_seqno(seq);
Contributor

@marvin-j97 marvin-j97 Nov 15, 2025

Theoretically, if we take a sequence number x, we could have a race condition here where an insert is performed before the ingestion is set up.

Think:

let ingestion_seqno = next() = 1;

insert(next() = 2);

// ingestion now starts
  -> active memtable is flushed with the seqno=2 KV
  -> we finish the ingestion with seqno=1

the seqno=2 KV is now shadowed by seqno=1 -> invariant broken

It is extremely unlikely, but I think it's a possibility. I think that's why I initially let the ingestion take a seqno after getting the version write lock.

Contributor Author

As far as I understand, the tree internally uses MVCC with sequence numbers, so:

  • Reads pick the highest seqno < snapshot_seqno for a given key.
  • The pre-ingestion flush in Ingestion::new uses SeqNo::MAX as the GC threshold, so it does not drop newer versions.

That means even in the scenario:

  • insert at seq = 2,

  • ingestion run at seq = 1,

a read at SeqNo::MAX still sees the version with seq = 2, not 1.

So there isn’t an actual bug where a lower seq overwrites a higher one.

In any case, I've updated the tests just to be safe.
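
The visibility rule described above can be sketched in isolation. This is a minimal toy model, not lsm-tree's actual read path: the `visible` function and the version list are hypothetical, and the sketch assumes a read considers every stored version of a key, regardless of which table holds it.

```rust
/// Toy MVCC lookup for a single key (hypothetical helper, not an lsm-tree API).
/// `versions` holds (seqno, value) pairs in arbitrary order.
/// Returns the value with the highest seqno strictly below `snapshot_seqno`.
fn visible<'a>(versions: &[(u64, &'a str)], snapshot_seqno: u64) -> Option<&'a str> {
    versions
        .iter()
        .filter(|(seqno, _)| *seqno < snapshot_seqno)
        .max_by_key(|(seqno, _)| *seqno)
        .map(|(_, value)| *value)
}

fn main() {
    // The scenario from the thread: insert at seq = 2, ingestion at seq = 1.
    let versions = [(2, "inserted"), (1, "ingested")];

    // A read at the maximum snapshot sees the seq = 2 version.
    assert_eq!(visible(&versions, u64::MAX), Some("inserted"));
    // A snapshot taken at 2 only sees seq = 1.
    assert_eq!(visible(&versions, 2), Some("ingested"));
    // A snapshot taken at 1 sees nothing.
    assert_eq!(visible(&versions, 1), None);
}
```

Under that assumption the seq = 2 version wins at the maximum snapshot. Whether the real read path inspects all versions or stops at the first table containing the key is not modeled here.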

Contributor

@marvin-j97 marvin-j97 Nov 15, 2025

#[test]
fn ingestion_seqno_race_condition() -> lsm_tree::Result<()> {
    let folder = tempfile::tempdir()?;

    let seqno = SequenceNumberCounter::default();
    let tree = Config::new(&folder, seqno.clone()).open()?;

    let ingest = tree.ingestion()?;
    let mut ingest = ingest.with_seqno(seqno.next());

    {
        // Imagine another thread now writing before we get to call finish
        tree.insert("a", "a", seqno.next());
        assert_eq!(b"a", &*tree.get("a", seqno.get())?.unwrap());
        let flush_lock = tree.get_flush_lock();
        tree.rotate_memtable();
        tree.flush(&flush_lock, 0)?;
    }

    ingest.write("a".into(), "b".into())?;
    ingest.finish()?;

    assert_eq!(b"a", &*tree.get("a", seqno.get())?.unwrap());

    Ok(())
}

This reproduces what I meant. If we start an ingestion and take seqno=x, but are then, e.g., scheduled out by the OS,
another thread can acquire seqno=(x+1), then write and flush a table.
When we finally get to finish our ingestion, it shadows the seqno=(x+1) item, because we took our seqno "too early".

This is... very unlikely though. But it probably just shouldn't be possible.
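
The shadowing can be illustrated with a small standalone model. This is only a sketch under a stated assumption: that a point read probes tables from newest to oldest and returns the first hit visible to the snapshot. The `Table` type and the `table` and `get` helpers are hypothetical and do not correspond to lsm-tree's types or API.

```rust
use std::collections::BTreeMap;

/// Toy on-disk table: key -> (seqno, value). Hypothetical, not an lsm-tree type.
type Table = BTreeMap<&'static str, (u64, &'static str)>;

/// Builds a toy table holding a single entry.
fn table(key: &'static str, seqno: u64, value: &'static str) -> Table {
    BTreeMap::from([(key, (seqno, value))])
}

/// Assumed lookup order: probe tables newest first and stop at the first
/// entry for `key` whose seqno is below the snapshot.
fn get(tables_newest_first: &[Table], key: &str, snapshot_seqno: u64) -> Option<&'static str> {
    tables_newest_first.iter().find_map(|t| match t.get(key) {
        Some(&(seqno, value)) if seqno < snapshot_seqno => Some(value),
        _ => None,
    })
}

fn main() {
    // The ingestion reserved seqno=1 early. A concurrent insert then took
    // seqno=2 and was flushed before the ingestion finished.
    let flushed = table("a", 2, "a");
    let ingested = table("a", 1, "b");

    // The ingested table is installed last, so it is probed first.
    let tables = vec![ingested, flushed];

    // The read stops at the ingested table and returns the older write.
    assert_eq!(get(&tables, "a", u64::MAX), Some("b"));
}
```

In this model the stale value is returned even though a version with a higher seqno exists, which is the broken invariant described above.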

// path as normal writes: any dirty memtable content is moved into
// tables before building new tables from the ingestion stream.
// This keeps the lookup path ordered as active > sealed > tables.
tree.flush_active_memtable(SeqNo::MAX)?;
Contributor

#[test]
fn ingestion_dirty_snapshot() -> lsm_tree::Result<()> {
    let folder = tempfile::tempdir()?;

    let seqno = SequenceNumberCounter::default();
    let tree = Config::new(&folder, seqno.clone()).open()?;

    tree.insert("a", "a", seqno.next());
    tree.insert("a", "b", seqno.next());

    let snapshot_seqno = 1;
    assert_eq!(b"a", &*tree.get("a", snapshot_seqno)?.unwrap());

    let mut ingest = tree.ingestion()?.with_seqno(seqno.next());
    ingest.write("b".into(), "b".into())?;
    ingest.finish()?;

    assert_eq!(b"a", &*tree.get("a", snapshot_seqno)?.unwrap());

    Ok(())
}

Using SeqNo::MAX breaks snapshot reads.

The safer solution here would be to just use watermark = 0.
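
The effect of the watermark can be sketched with a toy flush. This assumes a simplified GC rule: the newest version of a key is always kept, and an older version is dropped when its seqno is below the watermark. The `flush_with_watermark` and `read` functions are hypothetical, and the seqnos 0 and 1 are chosen for illustration only.

```rust
/// Toy flush for a single key (hypothetical, simplified GC rule):
/// keep the newest version, drop older versions with seqno < watermark.
fn flush_with_watermark(
    mut versions: Vec<(u64, &'static str)>,
    watermark: u64,
) -> Vec<(u64, &'static str)> {
    // Sort newest first so that index 0 is the version that always survives.
    versions.sort_by(|a, b| b.0.cmp(&a.0));
    versions
        .into_iter()
        .enumerate()
        .filter(|(i, (seqno, _))| *i == 0 || *seqno >= watermark)
        .map(|(_, version)| version)
        .collect()
}

/// Snapshot read: highest seqno strictly below `snapshot_seqno`.
fn read(versions: &[(u64, &'static str)], snapshot_seqno: u64) -> Option<&'static str> {
    versions
        .iter()
        .filter(|(seqno, _)| *seqno < snapshot_seqno)
        .max_by_key(|(seqno, _)| *seqno)
        .map(|(_, value)| *value)
}

fn main() {
    // Two writes to the same key, as in the test above.
    let versions = vec![(0, "a"), (1, "b")];
    let snapshot_seqno = 1;
    assert_eq!(read(&versions, snapshot_seqno), Some("a"));

    // Watermark = MAX: the old version is dropped and the snapshot read breaks.
    let flushed = flush_with_watermark(versions.clone(), u64::MAX);
    assert_eq!(read(&flushed, snapshot_seqno), None);

    // Watermark = 0: nothing is dropped and the snapshot still sees "a".
    let flushed = flush_with_watermark(versions, 0);
    assert_eq!(read(&flushed, snapshot_seqno), Some("a"));
}
```

With watermark = 0 the flush keeps every version, at the cost of not reclaiming any space during that flush.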

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Expose Ingestion API with Inversion of Control

2 participants