Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <util/generic/yexception.h>
#include <util/stream/str.h>
#include <util/string/builder.h>
#include <util/datetime/base.h>
#include <yql/essentials/utils/log/log.h>

#include <thread>
Expand Down Expand Up @@ -480,6 +481,8 @@ class TEasyCurlStream : public TEasyCurl {
IHTTPGateway::TOnNewDataPart onNewData,
IHTTPGateway::TOnDownloadFinish onFinish,
const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter,
std::weak_ptr<CURLM> handle,
size_t threshold,
const TCurlInitConfig& config = TCurlInitConfig(),
TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr)
: TEasyCurl(counter, downloadedBytes, uploadededBytes, url, std::move(headers), EMethod::GET, offset, sizeLimit, 0ULL, std::move(config), std::move(dnsCache))
Expand All @@ -488,6 +491,8 @@ class TEasyCurlStream : public TEasyCurl {
, OnFinish(std::move(onFinish))
, Counter(std::make_shared<std::atomic_size_t>(0ULL))
, InflightCounter(inflightCounter)
, Handle(std::move(handle))
, Threshold(threshold)
{}

static TPtr Make(
Expand All @@ -502,10 +507,12 @@ class TEasyCurlStream : public TEasyCurl {
IHTTPGateway::TOnNewDataPart onNewData,
IHTTPGateway::TOnDownloadFinish onFinish,
const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter,
std::weak_ptr<CURLM> handle = {},
size_t threshold = 0,
const TCurlInitConfig& config = TCurlInitConfig(),
TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr)
{
return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, std::move(config), std::move(dnsCache));
return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, handle, threshold, std::move(config), std::move(dnsCache));
}

enum class EAction : i8 {
Expand Down Expand Up @@ -565,8 +572,9 @@ class TEasyCurlStream : public TEasyCurl {
size_t Write(void* contents, size_t size, size_t nmemb) final {
MaybeStart(CURLE_OK);
const auto realsize = size * nmemb;
if (!Cancelled)
OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter, InflightCounter));
if (!Cancelled) {
OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter, InflightCounter, Handle, Threshold));
}
return realsize;
}

Expand All @@ -583,6 +591,8 @@ class TEasyCurlStream : public TEasyCurl {
bool Paused = false;
bool Cancelled = false;
long HttpResponseCode = 0L;
std::weak_ptr<CURLM> Handle;
size_t Threshold;
};

using TKeyType = std::tuple<TString, size_t, IHTTPGateway::THeaders, TString, IHTTPGateway::TRetryPolicy::TPtr>;
Expand Down Expand Up @@ -676,8 +686,8 @@ friend class IHTTPGateway;
}

~THTTPMultiGateway() {
curl_multi_wakeup(Handle);
IsStopped = true;
curl_multi_wakeup(Handle.get());
if (Thread.joinable()) {
Thread.join();
}
Expand All @@ -691,23 +701,26 @@ friend class IHTTPGateway;
TCurlInitConfig InitConfig;

void InitCurl() {
// FIXME: NOT SAFE (see man libcurl(3))
const CURLcode globalInitResult = curl_global_init(CURL_GLOBAL_ALL);
Comment on lines +704 to 705
Copy link

Copilot AI Nov 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The FIXME comment indicates a known safety issue with curl_global_init but doesn't explain what the problem is or how it should be fixed. Consider documenting that curl_global_init is not thread-safe and should be called only once before any threads are started, or reference the specific safety concerns from the libcurl documentation.

Copilot uses AI. Check for mistakes.
if (globalInitResult != CURLE_OK) {
throw yexception() << "curl_global_init error " << int(globalInitResult) << ": " << curl_easy_strerror(globalInitResult) << Endl;
}
Handle = curl_multi_init();
Handle = std::shared_ptr<CURLM>(curl_multi_init(), [](auto handle) {
const CURLMcode multiCleanupResult = curl_multi_cleanup(handle);
if (multiCleanupResult != CURLM_OK) {
Cerr << "curl_multi_cleanup error " << int(multiCleanupResult) << ": " << curl_multi_strerror(multiCleanupResult) << Endl;
}
curl_global_cleanup(); // FIXME: NOT SAFE (see man libcurl(3))
Copy link

Copilot AI Nov 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The FIXME comment indicates a known safety issue with curl_global_cleanup but doesn't explain what the problem is. Consider documenting that curl_global_cleanup must be called only after all libcurl operations have completed and all handles are destroyed, and it's not thread-safe. This is particularly important since the cleanup is now in a shared_ptr deleter.

Suggested change
curl_global_cleanup(); // FIXME: NOT SAFE (see man libcurl(3))
// WARNING: curl_global_cleanup() must only be called after all libcurl operations have completed
// and all easy and multi handles have been destroyed. This function is not thread-safe and must not
// be called concurrently with any other libcurl function. Since this cleanup is performed in the
// shared_ptr deleter, it is critical to ensure that no other libcurl handles are in use and no other
// threads are using libcurl when this is called. See man libcurl(3) and the libcurl documentation for details.
curl_global_cleanup();

Copilot uses AI. Check for mistakes.
});
if (!Handle) {
throw yexception() << "curl_multi_init error";
}
}

void UninitCurl() {
Y_ABORT_UNLESS(Handle);
const CURLMcode multiCleanupResult = curl_multi_cleanup(Handle);
if (multiCleanupResult != CURLM_OK) {
Cerr << "curl_multi_cleanup error " << int(multiCleanupResult) << ": " << curl_multi_strerror(multiCleanupResult) << Endl;
}
curl_global_cleanup();
Handle.reset();
}

void Perform() {
Expand All @@ -722,22 +735,22 @@ friend class IHTTPGateway;
OutputMemory->Set(OutputSize);

int running = 0;
if (const auto c = curl_multi_perform(Handle, &running); CURLM_OK != c) {
if (const auto c = curl_multi_perform(Handle.get(), &running); CURLM_OK != c) {
Fail(c);
break;
}

if (running < int(handlers)) {
for (int messages = int(handlers) - running; messages;) {
if (const auto msg = curl_multi_info_read(Handle, &messages)) {
if (const auto msg = curl_multi_info_read(Handle.get(), &messages)) {
if(msg->msg == CURLMSG_DONE) {
Done(msg->easy_handle, msg->data.result);
}
}
}
} else {
const int timeoutMs = 300;
if (const auto c = curl_multi_poll(Handle, nullptr, 0, timeoutMs, nullptr); CURLM_OK != c) {
if (const auto c = curl_multi_poll(Handle.get(), nullptr, 0, timeoutMs, nullptr); CURLM_OK != c) {
Fail(c);
break;
}
Expand All @@ -752,7 +765,7 @@ friend class IHTTPGateway;
const auto streamHandle = stream->GetHandle();
switch (stream->GetAction(BuffersSizePerStream)) {
case TEasyCurlStream::EAction::Init:
curl_multi_add_handle(Handle, streamHandle);
curl_multi_add_handle(Handle.get(), streamHandle);
break;
case TEasyCurlStream::EAction::Work:
curl_easy_pause(streamHandle, CURLPAUSE_RECV_CONT);
Expand All @@ -761,7 +774,7 @@ friend class IHTTPGateway;
curl_easy_pause(streamHandle, CURL_WRITEFUNC_PAUSE);
break;
case TEasyCurlStream::EAction::Drop:
curl_multi_remove_handle(Handle, streamHandle);
curl_multi_remove_handle(Handle.get(), streamHandle);
Allocated.erase(streamHandle);
break;
case TEasyCurlStream::EAction::None:
Expand All @@ -784,7 +797,7 @@ friend class IHTTPGateway;
const auto handle = Await.front()->GetHandle();
Allocated.emplace(handle, std::move(Await.front()));
Await.pop();
curl_multi_add_handle(Handle, handle);
curl_multi_add_handle(Handle.get(), handle);
}
AwaitQueue->Set(Await.size());
AllocatedMemory->Set(AllocatedSize);
Expand Down Expand Up @@ -859,7 +872,7 @@ friend class IHTTPGateway;

const TIssue error(curl_multi_strerror(result));
while (!works.empty()) {
curl_multi_remove_handle(Handle, works.top()->GetHandle());
curl_multi_remove_handle(Handle.get(), works.top()->GetHandle());
works.top()->Fail(CURLE_OK, error);
works.pop();
}
Expand Down Expand Up @@ -914,7 +927,7 @@ friend class IHTTPGateway;
TOnDownloadFinish onFinish,
const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) final
{
auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, InitConfig, DnsGateway.GetDNSCurlList());
auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, Handle, BuffersSizePerStream, InitConfig, DnsGateway.GetDNSCurlList());
const std::unique_lock lock(SyncRef());
const auto handle = stream->GetHandle();
TEasyCurlStream::TWeakPtr weak = stream;
Expand Down Expand Up @@ -942,20 +955,20 @@ friend class IHTTPGateway;
void Wakeup(size_t sizeLimit) {
AwaitQueue->Set(Await.size());
if (Allocated.size() < MaxHandlers && AllocatedSize + sizeLimit + OutputSize.load() <= MaxSimulatenousDownloadsSize) {
curl_multi_wakeup(Handle);
curl_multi_wakeup(Handle.get());
}
}

CURLM* GetHandle() const {
return Handle;
return Handle.get();
}

private:
std::mutex& SyncRef() {
return *Sync;
}

CURLM* Handle = nullptr;
std::shared_ptr<CURLM> Handle;

std::queue<TEasyCurlBuffer::TPtr> Await;
std::vector<TEasyCurlStream::TWeakPtr> Streams;
Expand Down Expand Up @@ -1043,28 +1056,34 @@ IHTTPGateway::TContent::TContent(const TString& data, long httpResponseCode, con
{}

IHTTPGateway::TCountedContent::TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter,
const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter)
const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter, std::weak_ptr<CURLM> handle, size_t threshold)
: TContentBase(std::move(data)), Counter(counter), InflightCounter(inflightCounter)
, Handle(handle), Threshold(threshold)
{
Counter->fetch_add(size());
if (InflightCounter) {
InflightCounter->Add(size());
}
}

IHTTPGateway::TCountedContent::~TCountedContent()
{
Counter->fetch_sub(size());
void IHTTPGateway::TCountedContent::BeforeRelease() {
auto oldSize = Counter->fetch_sub(size());
if (oldSize >= Threshold && oldSize - size() < Threshold) {
Comment on lines +1070 to +1071
Copy link

Copilot AI Nov 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a potential race condition here. Between fetch_sub and checking the threshold, another thread could also call fetch_sub, causing multiple wakeup calls even though only one thread actually crossed the threshold. While extra wakeups are generally harmless, consider if this could cause performance issues under high concurrency. If so, consider using compare-and-exchange to ensure only one thread triggers the wakeup when crossing the threshold.

Suggested change
auto oldSize = Counter->fetch_sub(size());
if (oldSize >= Threshold && oldSize - size() < Threshold) {
size_t expected = Counter->load();
bool triggered = false;
while (true) {
if (expected < Threshold || expected - size() >= Threshold) {
// No threshold crossing, just subtract
if (Counter->compare_exchange_weak(expected, expected - size())) {
break;
}
// compare_exchange_weak updates expected, so loop
} else {
// Threshold crossing, only one thread should trigger
if (Counter->compare_exchange_weak(expected, expected - size())) {
triggered = true;
break;
}
// compare_exchange_weak updates expected, so loop
}
}
if (triggered) {

Copilot uses AI. Check for mistakes.
if (auto handle = Handle.lock()) {
curl_multi_wakeup(handle.get());
}
}
Comment on lines +1071 to +1075
Copy link

Copilot AI Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The threshold check logic has a potential race condition. Between checking oldSize >= Threshold and evaluating oldSize - size() < Threshold, another thread could modify the Counter, making the condition incorrect. This is compounded by the fact that oldSize is the value after the subtraction (fetch_sub returns the previous value), so oldSize - size() is actually the new value after this operation.

However, there's a more fundamental issue: if Threshold is 0 (the default), the condition oldSize >= Threshold && oldSize - size() < Threshold would trigger on every destruction where size() > 0, potentially causing excessive wake-up calls. Consider adding a check: if (Threshold > 0 && oldSize >= Threshold && oldSize - size() < Threshold)

Copilot uses AI. Check for mistakes.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Counter is sum of all size(), hence both should not be issue.
Anyway, rare spurious (or even missed) wake-up should not be issue.

if (InflightCounter) {
InflightCounter->Sub(size());
}
}

IHTTPGateway::TCountedContent::~TCountedContent() {
BeforeRelease();
}

TString IHTTPGateway::TCountedContent::Extract() {
Counter->fetch_sub(size());
if (InflightCounter) {
InflightCounter->Sub(size());
}
BeforeRelease();
return TContentBase::Extract();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,20 @@ class IHTTPGateway {

class TCountedContent : public TContentBase {
public:
TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter);
TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter, std::weak_ptr<CURLM> handle, size_t threshold);
~TCountedContent();

TCountedContent(TCountedContent&&) = default;
TCountedContent& operator=(TCountedContent&& src) = default;

TString Extract();
private:
void BeforeRelease();

const std::shared_ptr<std::atomic_size_t> Counter;
const ::NMonitoring::TDynamicCounters::TCounterPtr InflightCounter;
std::weak_ptr<CURLM> Handle;
const size_t Threshold;
};

using TOnDownloadStart = std::function<void(CURLcode, long)>; // http code.
Expand Down
Loading