|
10 | 10 |
|
11 | 11 | namespace myrocks { |
12 | 12 |
|
13 | | -////////////////////////////////////////////////////////////////////////////// |
14 | | -// Locking iterator |
15 | | -////////////////////////////////////////////////////////////////////////////// |
16 | | - |
17 | 13 | // |
18 | 14 | // LockingIterator is an iterator that locks the rows before returning, as well |
19 | 15 | // as scanned gaps between the rows. |
@@ -100,183 +96,17 @@ class LockingIterator : public rocksdb::Iterator { |
100 | 96 | } |
101 | 97 |
|
102 | 98 | private: |
103 | | - /* |
104 | | - @brief |
105 | | - Lock range from target to end_key. |
106 | | -
|
107 | | - @detail |
108 | | - In forward-ordered scan, target < end_key. In backward-ordered scan, it's |
109 | | - vice versa. |
110 | | -
|
111 | | - We might have already locked a subset of this range, a subrange that |
112 | | - starts from target and extends to some point between target and end_key. |
113 | | - */ |
114 | | - template <bool forward> void lock_up_to(const rocksdb::Slice& target, |
115 | | - const rocksdb::Slice& end_key) { |
116 | | - const int inv = forward ? 1 : -1; |
117 | | - auto cmp= m_cfh->GetComparator(); |
118 | | - bool endp_arg= m_kd->m_is_reverse_cf; |
119 | | - |
120 | | - if (m_have_locked_until && |
121 | | - cmp->Compare(end_key, rocksdb::Slice(m_locked_until))*inv <= 0) { |
122 | | - // We've already locked this range. The following has happened: |
123 | | - // - m_iter->key() returned $KEY |
124 | | - // - other transaction(s) have inserted row $ROW before the $KEY. |
125 | | - // - we got a range lock on [range_start, $KEY] |
126 | | - // - we've read $ROW and returned. |
127 | | - // Now, we're looking to lock [$ROW, $KEY] but we don't need to, |
128 | | - // we already have a lock on this range. |
129 | | - } else { |
130 | | - if (forward) { |
131 | | - m_status = m_txn->GetRangeLock(m_cfh, |
132 | | - rocksdb::Endpoint(target, endp_arg), |
133 | | - rocksdb::Endpoint(end_key, endp_arg)); |
134 | | - } else { |
135 | | - m_status = m_txn->GetRangeLock(m_cfh, |
136 | | - rocksdb::Endpoint(end_key, endp_arg), |
137 | | - rocksdb::Endpoint(target, endp_arg)); |
138 | | - } |
139 | | - |
140 | | - if (!m_status.ok()) |
141 | | - return; |
142 | | - |
143 | | - // Save the bound where we locked until: |
144 | | - m_have_locked_until= true; |
145 | | - m_locked_until.assign(end_key.data(), end_key.size()); |
146 | | - if (m_lock_count) (*m_lock_count)++; |
147 | | - } |
148 | | - } |
149 | | - |
150 | | - template <bool forward> |
151 | | - void lock_till_iterator_end(const rocksdb::Slice& target) { |
152 | | - rocksdb::Slice end; |
153 | | - uchar buf[Rdb_key_def::INDEX_NUMBER_SIZE]; |
154 | | - uint size; |
155 | | - if (forward) { |
156 | | - if (m_read_opts.iterate_upper_bound) |
157 | | - end = *m_read_opts.iterate_upper_bound; |
158 | | - else { |
159 | | - if (m_kd->m_is_reverse_cf) |
160 | | - m_kd->get_infimum_key(buf, &size); |
161 | | - else |
162 | | - m_kd->get_supremum_key(buf, &size); |
163 | | - |
164 | | - DBUG_ASSERT(size == Rdb_key_def::INDEX_NUMBER_SIZE); |
165 | | - end = rocksdb::Slice((const char*)buf, size); |
166 | | - } |
167 | | - } else { |
168 | | - if (m_read_opts.iterate_lower_bound) |
169 | | - end = *m_read_opts.iterate_lower_bound; |
170 | | - else { |
171 | | - if (m_kd->m_is_reverse_cf) |
172 | | - m_kd->get_supremum_key(buf, &size); |
173 | | - else |
174 | | - m_kd->get_infimum_key(buf, &size); |
175 | | - |
176 | | - DBUG_ASSERT(size == Rdb_key_def::INDEX_NUMBER_SIZE); |
177 | | - end = rocksdb::Slice((const char*)buf, size); |
178 | | - } |
179 | | - } |
180 | | - // This will set m_status accordingly |
181 | | - lock_up_to<forward>(target, end); |
182 | | - } |
183 | | - |
184 | | - /* |
185 | | - Lock the range between [target, (current m_iter position)] and position |
186 | | - the iterator on the first record in it. |
187 | | -
|
188 | | - @param call_next false means current iterator position is achieved by |
189 | | - calling Seek(target). |
190 | | - true means one also needs to call Next() |
191 | | - */ |
192 | | - template <bool forward> void Scan(const rocksdb::Slice& target, |
193 | | - bool call_next) { |
194 | | - if (!m_iter->Valid()) { |
195 | | - m_status = m_iter->status(); |
196 | | - m_valid = false; |
197 | | - if (m_status.ok()) { |
198 | | - // m_iter has reached EOF |
199 | | - lock_till_iterator_end<forward>(target); |
200 | | - } |
201 | | - return; |
202 | | - } |
203 | | - |
204 | | - while (1) { |
205 | | - |
206 | | - DEBUG_SYNC(my_core::thd_get_current_thd(), "rocksdb.locking_iter_scan"); |
207 | | - |
208 | | - if (my_core::thd_killed(current_thd)) { |
209 | | - m_status = rocksdb::Status::Aborted(); |
210 | | - m_valid = false; |
211 | | - return; |
212 | | - } |
213 | | - |
214 | | - const int inv = forward ? 1 : -1; |
215 | | - auto cmp= m_cfh->GetComparator(); |
216 | | - |
217 | | - auto end_key = m_iter->key(); |
218 | | - std::string end_key_copy= end_key.ToString(); |
219 | | - |
220 | | - lock_up_to<forward>(target, end_key); |
221 | | - if (!m_status.ok()) { |
222 | | - // Failed to get a lock (most likely lock wait timeout) |
223 | | - m_valid = false; |
224 | | - return; |
225 | | - } |
226 | | - |
227 | | - //Ok, now we have a lock which is inhibiting modifications in the range |
228 | | - // Somebody might have done external modifications, though: |
229 | | - // - removed the key we've found |
230 | | - // - added a key before that key. |
231 | | - |
232 | | - // First, refresh the iterator: |
233 | | - delete m_iter; |
234 | | - m_iter = m_txn->GetIterator(m_read_opts, m_cfh); |
235 | | - |
236 | | - // Then, try seeking to the same row |
237 | | - if (forward) |
238 | | - m_iter->Seek(target); |
239 | | - else |
240 | | - m_iter->SeekForPrev(target); |
241 | | - |
242 | | - if (call_next && m_iter->Valid() && |
243 | | - !cmp->Compare(m_iter->key(), target)) { |
244 | | - if (forward) |
245 | | - m_iter->Next(); |
246 | | - else |
247 | | - m_iter->Prev(); |
248 | | - } |
249 | | - |
250 | | - if (m_iter->Valid()) { |
251 | | - if (cmp->Compare(m_iter->key(), rocksdb::Slice(end_key_copy))*inv <= 0) { |
252 | | - // Ok, the found key is within the locked range. |
253 | | - m_status = rocksdb::Status::OK(); |
254 | | - m_valid= true; |
255 | | - break; |
256 | | - } else { |
257 | | - // We've got a key but it is outside the range we've locked. |
258 | | - // Re-try the lock-and-read step. |
259 | | - continue; |
260 | | - } |
261 | | - } else { |
262 | | - // Some error |
263 | | - m_valid = false; |
264 | | - m_status = m_iter->status(); |
265 | | - if (m_status.ok()) { |
266 | | - // m_iter has reached EOF |
267 | | - lock_till_iterator_end<forward>(target); |
268 | | - } |
269 | | - break; |
270 | | - } |
271 | | - } |
272 | | - } |
| 99 | + void lock_up_to(bool scan_forward, const rocksdb::Slice& target, |
| 100 | + const rocksdb::Slice& end_key); |
| 101 | + void lock_till_iterator_end(bool scan_forward, const rocksdb::Slice& target); |
| 102 | + void Scan(bool scan_forward, const rocksdb::Slice& target, bool call_next); |
273 | 103 |
|
274 | 104 | inline void ScanForward(const rocksdb::Slice& target, bool call_next) { |
275 | | - Scan<true>(target, call_next); |
| 105 | + Scan(true, target, call_next); |
276 | 106 | } |
277 | 107 |
|
278 | 108 | inline void ScanBackward(const rocksdb::Slice& target, bool call_next) { |
279 | | - Scan<false>(target, call_next); |
| 109 | + Scan(false, target, call_next); |
280 | 110 | } |
281 | 111 | }; |
282 | 112 |
|
|
0 commit comments