Skip to content

Commit 577c1d8

Browse files
apkardongxinEric
authored andcommitted
Fixing FlowLock::take() calls. (#171)
`take(1)` doesn't just acquire one permit, instead it acquires permit with lowest priority possible. First argument of `take()` is not number of permits, but task priority.
1 parent 194d376 commit 577c1d8

File tree

2 files changed

+9
-9
lines changed

2 files changed

+9
-9
lines changed

src/QLContext.actor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ struct CompoundIndexPlugin : IndexPlugin, ReferenceCounted<CompoundIndexPlugin>,
363363
// one finished duplication detecting and index record writing. And thus the following section
364364
// before the `lock.release()` call, needs to be protected using a mutex lock.
365365
ASSERT(self->flowControlLock.present());
366-
Void _ = wait(self->flowControlLock.get()->take(1));
366+
Void _ = wait(self->flowControlLock.get()->take());
367367
state FlowLock::Releaser releaser(*self->flowControlLock.get(), 1);
368368

369369
for (; nvv; ++nvv) {
@@ -489,7 +489,7 @@ struct SimpleIndexPlugin : IndexPlugin, ReferenceCounted<SimpleIndexPlugin>, Fas
489489
// one finished duplication detecting and index record writing. And thus the following section
490490
// before the `lock.release()` call, needs to be protected using a mutex lock.
491491
ASSERT(self->flowControlLock.present());
492-
Void _ = wait(self->flowControlLock.get()->take(1));
492+
Void _ = wait(self->flowControlLock.get()->take());
493493
state FlowLock::Releaser releaser(*self->flowControlLock.get(), 1);
494494

495495
for (const DataValue& v : new_values) {

src/QLPlan.actor.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,7 @@ ACTOR static Future<Void> doSinglePKLookup(PlanCheckpoint* checkpoint,
520520
if (x >= scanBounds.begin && x < scanBounds.end) {
521521
Optional<DataValue> odv = wait(cx->cx->get(x));
522522
if (odv.present()) {
523-
Void _ = wait(flowControlLock->take(1));
523+
Void _ = wait(flowControlLock->take());
524524
dis.send(Reference<ScanReturnedContext>(new ScanReturnedContext(
525525
cx->cx->getSubContext(begin.encode_key_part()), scanID, StringRef(begin.encode_key_part()))));
526526
}
@@ -549,7 +549,7 @@ ACTOR static Future<Void> doPKScan(PlanCheckpoint* checkpoint,
549549
if (curPK.compare(lastPK)) {
550550
lastPK = Standalone<StringRef>(curPK, kv.arena());
551551
// We are adding a brand new document, so
552-
Void _ = wait(outputLock->take(1));
552+
Void _ = wait(outputLock->take());
553553
output.send(Reference<ScanReturnedContext>(
554554
new ScanReturnedContext(cx->cx->getSubContext(lastPK), scanID, Key(kv.key, kv.arena()))));
555555
}
@@ -715,7 +715,7 @@ ACTOR static Future<Void> doNonIsolatedRO(PlanCheckpoint* outerCheckpoint,
715715
loop choose {
716716
when(state Reference<ScanReturnedContext> doc =
717717
waitNext(docs)) { // throws end_of_stream when totally finished
718-
Void _ = wait(outerLock->take(1));
718+
Void _ = wait(outerLock->take());
719719
innerLock->release();
720720
output.send(doc);
721721
++nResults;
@@ -800,7 +800,7 @@ ACTOR static Future<Void> doNonIsolatedRW(PlanCheckpoint* outerCheckpoint,
800800
when(Void _ = wait(committingDocs.empty() ? Never() : committingDocs.front().second)) {
801801
bufferedDocs.push_back(committingDocs.front().first);
802802
committingDocs.pop_front();
803-
innerLock->release(1);
803+
innerLock->release();
804804
}
805805
when(Void _ = wait(timeout)) { break; }
806806
}
@@ -847,7 +847,7 @@ ACTOR static Future<Void> doNonIsolatedRW(PlanCheckpoint* outerCheckpoint,
847847
innerCheckpoint = next_checkpoint;
848848

849849
while (!bufferedDocs.empty()) {
850-
Void _ = wait(outerLock->take(1));
850+
Void _ = wait(outerLock->take());
851851
Reference<ScanReturnedContext> finishedDoc = bufferedDocs.front();
852852
output.send(finishedDoc);
853853
++oCount;
@@ -947,7 +947,7 @@ ACTOR static Future<Void> doRetry(Reference<Plan> subPlan,
947947
state Reference<ScanReturnedContext> r;
948948
for (const Reference<ScanReturnedContext>& loopThing : ret) {
949949
r = loopThing;
950-
Void _ = wait(outerLock->take(1));
950+
Void _ = wait(outerLock->take());
951951
output.send(r);
952952
}
953953
throw end_of_stream();
@@ -1390,7 +1390,7 @@ ACTOR static Future<Void> doIndexInsert(PlanCheckpoint* checkpoint,
13901390
Reference<MetadataManager> mm) {
13911391
state FlowLock* flowControlLock = checkpoint->getDocumentFinishedLock();
13921392
try {
1393-
Void _ = wait(flowControlLock->take(1));
1393+
Void _ = wait(flowControlLock->take());
13941394
state Reference<UnboundCollectionContext> mcx = wait(mm->getUnboundCollectionContext(tr, ns));
13951395
state Reference<UnboundCollectionContext> unbound = wait(mm->indexesCollection(tr, ns.first));
13961396
state Reference<Plan> getIndexesPlan = getIndexesForCollectionPlan(unbound, ns);

0 commit comments

Comments
 (0)