-
Notifications
You must be signed in to change notification settings - Fork 737
http_gateway: wake up poll when we released inflight buffers #28390
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
80a7702
1c29e04
c3e08d0
c69cc25
5aa2498
08c33b8
44239f4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -6,6 +6,7 @@ | |||||||||||||||||||||||||||||||||||||||||||
| #include <util/generic/yexception.h> | ||||||||||||||||||||||||||||||||||||||||||||
| #include <util/stream/str.h> | ||||||||||||||||||||||||||||||||||||||||||||
| #include <util/string/builder.h> | ||||||||||||||||||||||||||||||||||||||||||||
| #include <util/datetime/base.h> | ||||||||||||||||||||||||||||||||||||||||||||
| #include <yql/essentials/utils/log/log.h> | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| #include <thread> | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -480,6 +481,8 @@ class TEasyCurlStream : public TEasyCurl { | |||||||||||||||||||||||||||||||||||||||||||
| IHTTPGateway::TOnNewDataPart onNewData, | ||||||||||||||||||||||||||||||||||||||||||||
| IHTTPGateway::TOnDownloadFinish onFinish, | ||||||||||||||||||||||||||||||||||||||||||||
| const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter, | ||||||||||||||||||||||||||||||||||||||||||||
| std::weak_ptr<CURLM> handle, | ||||||||||||||||||||||||||||||||||||||||||||
| size_t threshold, | ||||||||||||||||||||||||||||||||||||||||||||
| const TCurlInitConfig& config = TCurlInitConfig(), | ||||||||||||||||||||||||||||||||||||||||||||
| TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) | ||||||||||||||||||||||||||||||||||||||||||||
| : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, std::move(headers), EMethod::GET, offset, sizeLimit, 0ULL, std::move(config), std::move(dnsCache)) | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -488,6 +491,8 @@ class TEasyCurlStream : public TEasyCurl { | |||||||||||||||||||||||||||||||||||||||||||
| , OnFinish(std::move(onFinish)) | ||||||||||||||||||||||||||||||||||||||||||||
| , Counter(std::make_shared<std::atomic_size_t>(0ULL)) | ||||||||||||||||||||||||||||||||||||||||||||
| , InflightCounter(inflightCounter) | ||||||||||||||||||||||||||||||||||||||||||||
| , Handle(std::move(handle)) | ||||||||||||||||||||||||||||||||||||||||||||
| , Threshold(threshold) | ||||||||||||||||||||||||||||||||||||||||||||
| {} | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| static TPtr Make( | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -502,10 +507,12 @@ class TEasyCurlStream : public TEasyCurl { | |||||||||||||||||||||||||||||||||||||||||||
| IHTTPGateway::TOnNewDataPart onNewData, | ||||||||||||||||||||||||||||||||||||||||||||
| IHTTPGateway::TOnDownloadFinish onFinish, | ||||||||||||||||||||||||||||||||||||||||||||
| const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter, | ||||||||||||||||||||||||||||||||||||||||||||
| std::weak_ptr<CURLM> handle = {}, | ||||||||||||||||||||||||||||||||||||||||||||
| size_t threshold = 0, | ||||||||||||||||||||||||||||||||||||||||||||
| const TCurlInitConfig& config = TCurlInitConfig(), | ||||||||||||||||||||||||||||||||||||||||||||
| TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) | ||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||
| return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, std::move(config), std::move(dnsCache)); | ||||||||||||||||||||||||||||||||||||||||||||
| return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, handle, threshold, std::move(config), std::move(dnsCache)); | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| enum class EAction : i8 { | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -565,8 +572,9 @@ class TEasyCurlStream : public TEasyCurl { | |||||||||||||||||||||||||||||||||||||||||||
| size_t Write(void* contents, size_t size, size_t nmemb) final { | ||||||||||||||||||||||||||||||||||||||||||||
| MaybeStart(CURLE_OK); | ||||||||||||||||||||||||||||||||||||||||||||
| const auto realsize = size * nmemb; | ||||||||||||||||||||||||||||||||||||||||||||
| if (!Cancelled) | ||||||||||||||||||||||||||||||||||||||||||||
| OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter, InflightCounter)); | ||||||||||||||||||||||||||||||||||||||||||||
| if (!Cancelled) { | ||||||||||||||||||||||||||||||||||||||||||||
| OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter, InflightCounter, Handle, Threshold)); | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| return realsize; | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -583,6 +591,8 @@ class TEasyCurlStream : public TEasyCurl { | |||||||||||||||||||||||||||||||||||||||||||
| bool Paused = false; | ||||||||||||||||||||||||||||||||||||||||||||
| bool Cancelled = false; | ||||||||||||||||||||||||||||||||||||||||||||
| long HttpResponseCode = 0L; | ||||||||||||||||||||||||||||||||||||||||||||
| std::weak_ptr<CURLM> Handle; | ||||||||||||||||||||||||||||||||||||||||||||
| size_t Threshold; | ||||||||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| using TKeyType = std::tuple<TString, size_t, IHTTPGateway::THeaders, TString, IHTTPGateway::TRetryPolicy::TPtr>; | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -676,7 +686,7 @@ friend class IHTTPGateway; | |||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| ~THTTPMultiGateway() { | ||||||||||||||||||||||||||||||||||||||||||||
| curl_multi_wakeup(Handle); | ||||||||||||||||||||||||||||||||||||||||||||
| curl_multi_wakeup(Handle.get()); | ||||||||||||||||||||||||||||||||||||||||||||
| IsStopped = true; | ||||||||||||||||||||||||||||||||||||||||||||
| if (Thread.joinable()) { | ||||||||||||||||||||||||||||||||||||||||||||
| Thread.join(); | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -695,17 +705,23 @@ friend class IHTTPGateway; | |||||||||||||||||||||||||||||||||||||||||||
| if (globalInitResult != CURLE_OK) { | ||||||||||||||||||||||||||||||||||||||||||||
| throw yexception() << "curl_global_init error " << int(globalInitResult) << ": " << curl_easy_strerror(globalInitResult) << Endl; | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| Handle = curl_multi_init(); | ||||||||||||||||||||||||||||||||||||||||||||
| Handle = std::shared_ptr<CURLM>(curl_multi_init(), [](auto handle) { | ||||||||||||||||||||||||||||||||||||||||||||
| const CURLMcode multiCleanupResult = curl_multi_cleanup(handle); | ||||||||||||||||||||||||||||||||||||||||||||
| if (multiCleanupResult != CURLM_OK) { | ||||||||||||||||||||||||||||||||||||||||||||
| Cerr << "curl_multi_cleanup error " << int(multiCleanupResult) << ": " << curl_multi_strerror(multiCleanupResult) << Endl; | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||
| if (!Handle) { | ||||||||||||||||||||||||||||||||||||||||||||
| throw yexception() << "curl_multi_init error"; | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| void UninitCurl() { | ||||||||||||||||||||||||||||||||||||||||||||
| Y_ABORT_UNLESS(Handle); | ||||||||||||||||||||||||||||||||||||||||||||
| const CURLMcode multiCleanupResult = curl_multi_cleanup(Handle); | ||||||||||||||||||||||||||||||||||||||||||||
| if (multiCleanupResult != CURLM_OK) { | ||||||||||||||||||||||||||||||||||||||||||||
| Cerr << "curl_multi_cleanup error " << int(multiCleanupResult) << ": " << curl_multi_strerror(multiCleanupResult) << Endl; | ||||||||||||||||||||||||||||||||||||||||||||
| auto weakHandle = std::weak_ptr<CURLM>(Handle); | ||||||||||||||||||||||||||||||||||||||||||||
| Handle.reset(); | ||||||||||||||||||||||||||||||||||||||||||||
| while (!weakHandle.expired()) { // short busy-wait in unlikely case of collision with TCountedContent | ||||||||||||||||||||||||||||||||||||||||||||
| Sleep(TDuration::MicroSeconds(1)); | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| curl_global_cleanup(); | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -722,22 +738,22 @@ friend class IHTTPGateway; | |||||||||||||||||||||||||||||||||||||||||||
| OutputMemory->Set(OutputSize); | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| int running = 0; | ||||||||||||||||||||||||||||||||||||||||||||
| if (const auto c = curl_multi_perform(Handle, &running); CURLM_OK != c) { | ||||||||||||||||||||||||||||||||||||||||||||
| if (const auto c = curl_multi_perform(Handle.get(), &running); CURLM_OK != c) { | ||||||||||||||||||||||||||||||||||||||||||||
| Fail(c); | ||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| if (running < int(handlers)) { | ||||||||||||||||||||||||||||||||||||||||||||
| for (int messages = int(handlers) - running; messages;) { | ||||||||||||||||||||||||||||||||||||||||||||
| if (const auto msg = curl_multi_info_read(Handle, &messages)) { | ||||||||||||||||||||||||||||||||||||||||||||
| if (const auto msg = curl_multi_info_read(Handle.get(), &messages)) { | ||||||||||||||||||||||||||||||||||||||||||||
| if(msg->msg == CURLMSG_DONE) { | ||||||||||||||||||||||||||||||||||||||||||||
| Done(msg->easy_handle, msg->data.result); | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||
| const int timeoutMs = 300; | ||||||||||||||||||||||||||||||||||||||||||||
| if (const auto c = curl_multi_poll(Handle, nullptr, 0, timeoutMs, nullptr); CURLM_OK != c) { | ||||||||||||||||||||||||||||||||||||||||||||
| if (const auto c = curl_multi_poll(Handle.get(), nullptr, 0, timeoutMs, nullptr); CURLM_OK != c) { | ||||||||||||||||||||||||||||||||||||||||||||
| Fail(c); | ||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -752,7 +768,7 @@ friend class IHTTPGateway; | |||||||||||||||||||||||||||||||||||||||||||
| const auto streamHandle = stream->GetHandle(); | ||||||||||||||||||||||||||||||||||||||||||||
| switch (stream->GetAction(BuffersSizePerStream)) { | ||||||||||||||||||||||||||||||||||||||||||||
| case TEasyCurlStream::EAction::Init: | ||||||||||||||||||||||||||||||||||||||||||||
| curl_multi_add_handle(Handle, streamHandle); | ||||||||||||||||||||||||||||||||||||||||||||
| curl_multi_add_handle(Handle.get(), streamHandle); | ||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||
| case TEasyCurlStream::EAction::Work: | ||||||||||||||||||||||||||||||||||||||||||||
| curl_easy_pause(streamHandle, CURLPAUSE_RECV_CONT); | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -761,7 +777,7 @@ friend class IHTTPGateway; | |||||||||||||||||||||||||||||||||||||||||||
| curl_easy_pause(streamHandle, CURL_WRITEFUNC_PAUSE); | ||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||
| case TEasyCurlStream::EAction::Drop: | ||||||||||||||||||||||||||||||||||||||||||||
| curl_multi_remove_handle(Handle, streamHandle); | ||||||||||||||||||||||||||||||||||||||||||||
| curl_multi_remove_handle(Handle.get(), streamHandle); | ||||||||||||||||||||||||||||||||||||||||||||
| Allocated.erase(streamHandle); | ||||||||||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||||||||||
| case TEasyCurlStream::EAction::None: | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -784,7 +800,7 @@ friend class IHTTPGateway; | |||||||||||||||||||||||||||||||||||||||||||
| const auto handle = Await.front()->GetHandle(); | ||||||||||||||||||||||||||||||||||||||||||||
| Allocated.emplace(handle, std::move(Await.front())); | ||||||||||||||||||||||||||||||||||||||||||||
| Await.pop(); | ||||||||||||||||||||||||||||||||||||||||||||
| curl_multi_add_handle(Handle, handle); | ||||||||||||||||||||||||||||||||||||||||||||
| curl_multi_add_handle(Handle.get(), handle); | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| AwaitQueue->Set(Await.size()); | ||||||||||||||||||||||||||||||||||||||||||||
| AllocatedMemory->Set(AllocatedSize); | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -859,7 +875,7 @@ friend class IHTTPGateway; | |||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| const TIssue error(curl_multi_strerror(result)); | ||||||||||||||||||||||||||||||||||||||||||||
| while (!works.empty()) { | ||||||||||||||||||||||||||||||||||||||||||||
| curl_multi_remove_handle(Handle, works.top()->GetHandle()); | ||||||||||||||||||||||||||||||||||||||||||||
| curl_multi_remove_handle(Handle.get(), works.top()->GetHandle()); | ||||||||||||||||||||||||||||||||||||||||||||
| works.top()->Fail(CURLE_OK, error); | ||||||||||||||||||||||||||||||||||||||||||||
| works.pop(); | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -914,7 +930,7 @@ friend class IHTTPGateway; | |||||||||||||||||||||||||||||||||||||||||||
| TOnDownloadFinish onFinish, | ||||||||||||||||||||||||||||||||||||||||||||
| const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) final | ||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||
| auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, InitConfig, DnsGateway.GetDNSCurlList()); | ||||||||||||||||||||||||||||||||||||||||||||
| auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, Handle, BuffersSizePerStream, InitConfig, DnsGateway.GetDNSCurlList()); | ||||||||||||||||||||||||||||||||||||||||||||
| const std::unique_lock lock(SyncRef()); | ||||||||||||||||||||||||||||||||||||||||||||
| const auto handle = stream->GetHandle(); | ||||||||||||||||||||||||||||||||||||||||||||
| TEasyCurlStream::TWeakPtr weak = stream; | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -942,20 +958,20 @@ friend class IHTTPGateway; | |||||||||||||||||||||||||||||||||||||||||||
| void Wakeup(size_t sizeLimit) { | ||||||||||||||||||||||||||||||||||||||||||||
| AwaitQueue->Set(Await.size()); | ||||||||||||||||||||||||||||||||||||||||||||
| if (Allocated.size() < MaxHandlers && AllocatedSize + sizeLimit + OutputSize.load() <= MaxSimulatenousDownloadsSize) { | ||||||||||||||||||||||||||||||||||||||||||||
| curl_multi_wakeup(Handle); | ||||||||||||||||||||||||||||||||||||||||||||
| curl_multi_wakeup(Handle.get()); | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| CURLM* GetHandle() const { | ||||||||||||||||||||||||||||||||||||||||||||
| return Handle; | ||||||||||||||||||||||||||||||||||||||||||||
| return Handle.get(); | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| private: | ||||||||||||||||||||||||||||||||||||||||||||
| std::mutex& SyncRef() { | ||||||||||||||||||||||||||||||||||||||||||||
| return *Sync; | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| CURLM* Handle = nullptr; | ||||||||||||||||||||||||||||||||||||||||||||
| std::shared_ptr<CURLM> Handle; | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| std::queue<TEasyCurlBuffer::TPtr> Await; | ||||||||||||||||||||||||||||||||||||||||||||
| std::vector<TEasyCurlStream::TWeakPtr> Streams; | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -1043,28 +1059,34 @@ IHTTPGateway::TContent::TContent(const TString& data, long httpResponseCode, con | |||||||||||||||||||||||||||||||||||||||||||
| {} | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| IHTTPGateway::TCountedContent::TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter, | ||||||||||||||||||||||||||||||||||||||||||||
| const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) | ||||||||||||||||||||||||||||||||||||||||||||
| const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter, std::weak_ptr<CURLM> handle, size_t threshold) | ||||||||||||||||||||||||||||||||||||||||||||
| : TContentBase(std::move(data)), Counter(counter), InflightCounter(inflightCounter) | ||||||||||||||||||||||||||||||||||||||||||||
| , Handle(handle), Threshold(threshold) | ||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||
| Counter->fetch_add(size()); | ||||||||||||||||||||||||||||||||||||||||||||
| if (InflightCounter) { | ||||||||||||||||||||||||||||||||||||||||||||
| InflightCounter->Add(size()); | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| IHTTPGateway::TCountedContent::~TCountedContent() | ||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||
| Counter->fetch_sub(size()); | ||||||||||||||||||||||||||||||||||||||||||||
| void IHTTPGateway::TCountedContent::BeforeRelease() { | ||||||||||||||||||||||||||||||||||||||||||||
| auto oldSize = Counter->fetch_sub(size()); | ||||||||||||||||||||||||||||||||||||||||||||
| if (oldSize >= Threshold && oldSize - size() < Threshold) { | ||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+1070
to
+1071
|
||||||||||||||||||||||||||||||||||||||||||||
| auto oldSize = Counter->fetch_sub(size()); | |
| if (oldSize >= Threshold && oldSize - size() < Threshold) { | |
| size_t expected = Counter->load(); | |
| bool triggered = false; | |
| while (true) { | |
| if (expected < Threshold || expected - size() >= Threshold) { | |
| // No threshold crossing, just subtract | |
| if (Counter->compare_exchange_weak(expected, expected - size())) { | |
| break; | |
| } | |
| // compare_exchange_weak updates expected, so loop | |
| } else { | |
| // Threshold crossing, only one thread should trigger | |
| if (Counter->compare_exchange_weak(expected, expected - size())) { | |
| triggered = true; | |
| break; | |
| } | |
| // compare_exchange_weak updates expected, so loop | |
| } | |
| } | |
| if (triggered) { |
Copilot
AI
Nov 7, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The threshold check logic has a potential race condition. Between checking oldSize >= Threshold and evaluating oldSize - size() < Threshold, another thread could modify the Counter, making the condition incorrect. This is compounded by the fact that oldSize is the value after the subtraction (fetch_sub returns the previous value), so oldSize - size() is actually the new value after this operation.
However, there's a more fundamental issue: if Threshold is 0 (the default), the condition oldSize >= Threshold && oldSize - size() < Threshold would trigger on every destruction where size() > 0, potentially causing excessive wake-up calls. Consider adding a check: if (Threshold > 0 && oldSize >= Threshold && oldSize - size() < Threshold)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Counter is sum of all size(), hence both should not be issue.
Anyway, rare spurious (or even missed) wake-up should not be issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
путь к зависаниям. давай таймаут сделаем
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Последствия досрочного выхода из этого цикла хуже, чем бесконечное ожидание. И тут никогда не может быть таймаут, обработка таймаута создаст ложное впечатление, что он возможен