Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions storage/rocksdb/rdb_locking_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
/* This C++ file's header file */
#include "./rdb_locking_iter.h"

static const ulonglong max_lock_count = 1000;

namespace myrocks {

rocksdb::Iterator* GetLockingIterator(
Expand Down Expand Up @@ -119,11 +121,12 @@ void LockingIterator::lock_up_to(bool scan_forward,
m_have_locked_until= true;
m_locked_until.assign(end_key.data(), end_key.size());
if (m_lock_count) (*m_lock_count)++;
m_self_lock_count++;
}
}

/*
Lock the range from target till the iterator end point that we are scaning
Lock the range from target till the iterator end point that we are scanning
towards. If there's no iterator bound, use index start (or end, depending
on the scan direction)
*/
Expand All @@ -141,7 +144,7 @@ void LockingIterator::lock_till_iterator_end(bool scan_forward,
else
m_kd->get_supremum_key(buf, &size);

DBUG_ASSERT(size == Rdb_key_def::INDEX_NUMBER_SIZE);
assert(size == Rdb_key_def::INDEX_NUMBER_SIZE);
end = rocksdb::Slice((const char*)buf, size);
}
} else {
Expand All @@ -153,7 +156,7 @@ void LockingIterator::lock_till_iterator_end(bool scan_forward,
else
m_kd->get_infimum_key(buf, &size);

DBUG_ASSERT(size == Rdb_key_def::INDEX_NUMBER_SIZE);
assert(size == Rdb_key_def::INDEX_NUMBER_SIZE);
end = rocksdb::Slice((const char*)buf, size);
}
}
Expand All @@ -171,6 +174,8 @@ void LockingIterator::lock_till_iterator_end(bool scan_forward,
*/
void LockingIterator::Scan(bool scan_forward,
const rocksdb::Slice& target, bool call_next) {


if (!m_iter->Valid()) {
m_status = m_iter->status();
m_valid = false;
Expand All @@ -180,6 +185,10 @@ void LockingIterator::Scan(bool scan_forward,
}
return;
}
if (m_self_lock_count > max_lock_count) {
// already locked the entire index instead
return;
}

while (1) {

Expand All @@ -197,11 +206,19 @@ void LockingIterator::Scan(bool scan_forward,
auto end_key = m_iter->key();
std::string end_key_copy= end_key.ToString();

lock_up_to(scan_forward, target, end_key);
if (!m_status.ok()) {
// Failed to get a lock (most likely lock wait timeout)
m_valid = false;
return;
if (m_self_lock_count == max_lock_count)
{
// we can't handle too many small locks, just lock everything
lock_till_iterator_end(scan_forward, target);
}
if (m_self_lock_count < max_lock_count)
{
lock_up_to(scan_forward, target, end_key);
if (!m_status.ok()) {
// Failed to get a lock (most likely lock wait timeout)
m_valid = false;
return;
}
}

//Ok, now we have a lock which is inhibiting modifications in the range
Expand Down Expand Up @@ -265,7 +282,7 @@ void LockingIterator::Scan(bool scan_forward,
*/

void LockingIterator::SeekToFirst() {
DBUG_ASSERT(0);
assert(0);
m_status = rocksdb::Status::NotSupported("Not implemented");
m_valid = false;
}
Expand All @@ -276,7 +293,7 @@ void LockingIterator::SeekToFirst() {
*/

void LockingIterator::SeekToLast() {
DBUG_ASSERT(0);
assert(0);
m_status = rocksdb::Status::NotSupported("Not implemented");
m_valid = false;
}
Expand Down
1 change: 1 addition & 0 deletions storage/rocksdb/rdb_locking_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class LockingIterator : public rocksdb::Iterator {
bool m_valid;

ulonglong *m_lock_count;
ulonglong m_self_lock_count = 0;

// If true, m_locked_until has a valid key value.
bool m_have_locked_until;
Expand Down