diff --git a/.github/workflows/ci-build-binary-artifacts.yaml b/.github/workflows/ci-build-binary-artifacts.yaml index a2b7be84..63644e5a 100644 --- a/.github/workflows/ci-build-binary-artifacts.yaml +++ b/.github/workflows/ci-build-binary-artifacts.yaml @@ -148,6 +148,7 @@ jobs: mkdir -p $BUILD_DIR cmake -B $BUILD_DIR \ -G "${{ matrix.generator }}" ${{ matrix.arch }} \ + -DUSE_ASIO=ON \ -DBUILD_TESTS=OFF \ -DVCPKG_TRIPLET=${{ matrix.triplet }} \ -DCMAKE_INSTALL_PREFIX=${{ env.INSTALL_DIR }} \ @@ -174,6 +175,7 @@ jobs: mkdir -p $BUILD_DIR cmake -B $BUILD_DIR \ -G "${{ matrix.generator }}" ${{ matrix.arch }} \ + -DUSE_ASIO=ON \ -DBUILD_TESTS=OFF \ -DVCPKG_TRIPLET=${{ matrix.triplet }} \ -DCMAKE_INSTALL_PREFIX=$INSTALL_DIR_DEBUG \ diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml index 50a5c807..56309e9a 100644 --- a/.github/workflows/ci-pr-validation.yaml +++ b/.github/workflows/ci-pr-validation.yaml @@ -191,6 +191,7 @@ jobs: cmake \ -B ./build-1 \ -G "${{ matrix.generator }}" ${{ matrix.arch }} \ + -DUSE_ASIO=ON \ -DBUILD_TESTS=OFF \ -DVCPKG_TRIPLET="${{ matrix.triplet }}" \ -DCMAKE_INSTALL_PREFIX="${{ env.INSTALL_DIR }}" \ @@ -232,6 +233,7 @@ jobs: cmake \ -B ./build-2 \ -G "${{ matrix.generator }}" ${{ matrix.arch }} \ + -DUSE_ASIO=ON \ -DBUILD_TESTS=OFF \ -DVCPKG_TRIPLET="${{ matrix.triplet }}" \ -DCMAKE_INSTALL_PREFIX="${{ env.INSTALL_DIR }}" \ diff --git a/CMakeLists.txt b/CMakeLists.txt index 5bde2b77..662e84af 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -19,8 +19,11 @@ cmake_minimum_required(VERSION 3.13) +option(USE_ASIO "Use Asio instead of Boost.Asio" OFF) + option(INTEGRATE_VCPKG "Integrate with Vcpkg" OFF) if (INTEGRATE_VCPKG) + set(USE_ASIO ON) set(CMAKE_TOOLCHAIN_FILE "${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake") endif () @@ -129,6 +132,10 @@ if (INTEGRATE_VCPKG) $,zstd::libzstd_shared,zstd::libzstd_static> Snappy::snappy ) + if (USE_ASIO) + find_package(asio CONFIG REQUIRED) + set(COMMON_LIBS ${COMMON_LIBS} asio::asio) + endif () add_definitions(-DHAS_ZSTD -DHAS_SNAPPY) if (MSVC) find_package(dlfcn-win32 CONFIG REQUIRED) @@ -140,6 +147,10 @@ else () include(./LegacyFindPackages.cmake) endif () +if (USE_ASIO) + add_definitions(-DUSE_ASIO) +endif () + set(LIB_NAME $ENV{PULSAR_LIBRARY_NAME}) if (NOT LIB_NAME) set(LIB_NAME pulsar) diff --git a/LegacyFindPackages.cmake b/LegacyFindPackages.cmake index 10c9fa7c..5004545b 100644 --- a/LegacyFindPackages.cmake +++ b/LegacyFindPackages.cmake @@ -176,10 +176,6 @@ if (Boost_MAJOR_VERSION EQUAL 1 AND Boost_MINOR_VERSION LESS 69) MESSAGE(STATUS "Linking with Boost:System") endif() -if (MSVC) - set(BOOST_COMPONENTS ${BOOST_COMPONENTS} date_time) -endif() - if (CMAKE_COMPILER_IS_GNUCC AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 4.9) # GCC 4.8.2 implementation of std::regex is buggy set(BOOST_COMPONENTS ${BOOST_COMPONENTS} regex) diff --git a/lib/AckGroupingTrackerEnabled.cc b/lib/AckGroupingTrackerEnabled.cc index d90cc874..7233b2c9 100644 --- a/lib/AckGroupingTrackerEnabled.cc +++ b/lib/AckGroupingTrackerEnabled.cc @@ -117,7 +117,7 @@ void AckGroupingTrackerEnabled::close() { this->flush(); std::lock_guard lock(this->mutexTimer_); if (this->timer_) { - boost::system::error_code ec; + ASIO_ERROR ec; this->timer_->cancel(ec); } } @@ -168,9 +168,9 @@ void AckGroupingTrackerEnabled::scheduleTimer() { std::lock_guard lock(this->mutexTimer_); this->timer_ = this->executor_->createDeadlineTimer(); - this->timer_->expires_from_now(boost::posix_time::milliseconds(std::max(1L, this->ackGroupingTimeMs_))); + this->timer_->expires_from_now(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_))); auto self = shared_from_this(); - this->timer_->async_wait([this, self](const boost::system::error_code& ec) -> void { + this->timer_->async_wait([this, self](const ASIO_ERROR& ec) -> void { if (!ec) { this->flush(); this->scheduleTimer(); diff --git a/lib/AckGroupingTrackerEnabled.h b/lib/AckGroupingTrackerEnabled.h index ec1d66be..b04f4059 100644 --- a/lib/AckGroupingTrackerEnabled.h +++ b/lib/AckGroupingTrackerEnabled.h @@ -22,18 +22,17 @@ #include #include -#include #include #include #include #include "AckGroupingTracker.h" +#include "AsioTimer.h" namespace pulsar { class ClientImpl; using ClientImplPtr = std::shared_ptr; -using DeadlineTimerPtr = std::shared_ptr; class ExecutorService; using ExecutorServicePtr = std::shared_ptr; class HandlerBase; diff --git a/lib/AsioDefines.h b/lib/AsioDefines.h new file mode 100644 index 00000000..2e89812c --- /dev/null +++ b/lib/AsioDefines.h @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +// This header defines common macros to use Asio or Boost.Asio. +#pragma once + +#ifdef USE_ASIO +#define ASIO ::asio +#define ASIO_ERROR asio::error_code +#define ASIO_SUCCESS (ASIO_ERROR{}) +#define ASIO_SYSTEM_ERROR asio::system_error +#else +#define ASIO boost::asio +#define ASIO_ERROR boost::system::error_code +#define ASIO_SUCCESS boost::system::errc::make_error_code(boost::system::errc::success) +#define ASIO_SYSTEM_ERROR boost::system::system_error +#endif diff --git a/lib/TimeUtils.cc b/lib/AsioTimer.h similarity index 71% rename from lib/TimeUtils.cc rename to lib/AsioTimer.h index 7eecb86b..d0c3de58 100644 --- a/lib/TimeUtils.cc +++ b/lib/AsioTimer.h @@ -16,17 +16,16 @@ * specific language governing permissions and limitations * under the License. */ +#pragma once -#include "TimeUtils.h" +#ifdef USE_ASIO +#include +#else +#include +#endif -namespace pulsar { +#include -ptime TimeUtils::now() { return microsec_clock::universal_time(); } +#include "AsioDefines.h" -int64_t TimeUtils::currentTimeMillis() { - static ptime time_t_epoch(boost::gregorian::date(1970, 1, 1)); - - time_duration diff = now() - time_t_epoch; - return diff.total_milliseconds(); -} -} // namespace pulsar \ No newline at end of file +using DeadlineTimerPtr = std::shared_ptr; diff --git a/lib/Backoff.cc b/lib/Backoff.cc index fdb13592..e2c43d1c 100644 --- a/lib/Backoff.cc +++ b/lib/Backoff.cc @@ -21,6 +21,9 @@ #include /* time */ #include +#include + +#include "TimeUtils.h" namespace pulsar { @@ -33,8 +36,8 @@ TimeDuration Backoff::next() { // Check for mandatory stop if (!mandatoryStopMade_) { - const boost::posix_time::ptime& now = boost::posix_time::microsec_clock::universal_time(); - TimeDuration timeElapsedSinceFirstBackoff = boost::posix_time::milliseconds(0); + auto now = TimeUtils::now(); + TimeDuration timeElapsedSinceFirstBackoff = std::chrono::nanoseconds(0); if (initial_ == current) { firstBackoffTime_ = now; } else { diff --git a/lib/Backoff.h b/lib/Backoff.h index a59c00f5..d9d7fae3 100644 --- a/lib/Backoff.h +++ b/lib/Backoff.h @@ -20,12 +20,12 @@ #define _PULSAR_BACKOFF_HEADER_ #include -#include +#include #include -namespace pulsar { +#include "TimeUtils.h" -using TimeDuration = boost::posix_time::time_duration; +namespace pulsar { class PULSAR_PUBLIC Backoff { public: @@ -38,7 +38,7 @@ class PULSAR_PUBLIC Backoff { const TimeDuration max_; TimeDuration next_; TimeDuration mandatoryStop_; - boost::posix_time::ptime firstBackoffTime_; + decltype(std::chrono::high_resolution_clock::now()) firstBackoffTime_; std::mt19937 rng_; bool mandatoryStopMade_ = false; diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 61aa7f73..82ab4928 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -23,6 +23,7 @@ #include #include +#include "AsioDefines.h" #include "Commands.h" #include "ConnectionPool.h" #include "ConsumerImpl.h" @@ -39,7 +40,7 @@ DECLARE_LOG_OBJECT() -using namespace boost::asio::ip; +using namespace ASIO::ip; namespace pulsar { @@ -162,19 +163,13 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: const ClientConfiguration& clientConfiguration, const AuthenticationPtr& authentication, const std::string& clientVersion, ConnectionPool& pool, size_t poolIndex) - : operationsTimeout_(seconds(clientConfiguration.getOperationTimeoutSeconds())), + : operationsTimeout_(std::chrono::seconds(clientConfiguration.getOperationTimeoutSeconds())), authentication_(authentication), serverProtocolVersion_(proto::ProtocolVersion_MIN), executor_(executor), resolver_(executor_->createTcpResolver()), socket_(executor_->createSocket()), -#if BOOST_VERSION >= 107000 - strand_(boost::asio::make_strand(executor_->getIOService().get_executor())), -#elif BOOST_VERSION >= 106600 - strand_(executor_->getIOService().get_executor()), -#else - strand_(executor_->getIOService()), -#endif + strand_(ASIO::make_strand(executor_->getIOService().get_executor())), logicalAddress_(logicalAddress), physicalAddress_(physicalAddress), cnxString_("[ -> " + physicalAddress + "] "), @@ -203,11 +198,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: } if (clientConfiguration.isUseTls()) { -#if BOOST_VERSION >= 105400 - boost::asio::ssl::context ctx(boost::asio::ssl::context::tlsv12_client); -#else - boost::asio::ssl::context ctx(executor_->getIOService(), boost::asio::ssl::context::tlsv1_client); -#endif + ASIO::ssl::context ctx(ASIO::ssl::context::tlsv12_client); Url serviceUrl; Url proxyUrl; Url::parse(physicalAddress, serviceUrl); @@ -219,10 +210,10 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: LOG_INFO("Configuring SNI Proxy-url=" << proxyServiceUrl_); } if (clientConfiguration.isTlsAllowInsecureConnection()) { - ctx.set_verify_mode(boost::asio::ssl::context::verify_none); + ctx.set_verify_mode(ASIO::ssl::context::verify_none); isTlsAllowInsecureConnection_ = true; } else { - ctx.set_verify_mode(boost::asio::ssl::context::verify_peer); + ctx.set_verify_mode(ASIO::ssl::context::verify_peer); std::string trustCertFilePath = clientConfiguration.getTlsTrustCertsFilePath(); if (!trustCertFilePath.empty()) { @@ -252,12 +243,12 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: LOG_ERROR(tlsCertificates << ": No such tlsCertificates"); throw ResultAuthenticationError; } - ctx.use_private_key_file(tlsPrivateKey, boost::asio::ssl::context::pem); - ctx.use_certificate_file(tlsCertificates, boost::asio::ssl::context::pem); + ctx.use_private_key_file(tlsPrivateKey, ASIO::ssl::context::pem); + ctx.use_certificate_file(tlsCertificates, ASIO::ssl::context::pem); } else { if (file_exists(tlsPrivateKey) && file_exists(tlsCertificates)) { - ctx.use_private_key_file(tlsPrivateKey, boost::asio::ssl::context::pem); - ctx.use_certificate_file(tlsCertificates, boost::asio::ssl::context::pem); + ctx.use_private_key_file(tlsPrivateKey, ASIO::ssl::context::pem); + ctx.use_certificate_file(tlsCertificates, ASIO::ssl::context::pem); } } @@ -266,14 +257,13 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: if (!clientConfiguration.isTlsAllowInsecureConnection() && clientConfiguration.isValidateHostName()) { LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" << serviceUrl.port()); std::string urlHost = isSniProxy_ ? proxyUrl.host() : serviceUrl.host(); - tlsSocket_->set_verify_callback(boost::asio::ssl::rfc2818_verification(urlHost)); + tlsSocket_->set_verify_callback(ASIO::ssl::rfc2818_verification(urlHost)); } LOG_DEBUG("TLS SNI Host: " << serviceUrl.host()); if (!SSL_set_tlsext_host_name(tlsSocket_->native_handle(), serviceUrl.host().c_str())) { - boost::system::error_code ec{static_cast(::ERR_get_error()), - boost::asio::error::get_ssl_category()}; - LOG_ERROR(boost::system::system_error{ec}.what() << ": Error while setting TLS SNI"); + ASIO_ERROR ec{static_cast(::ERR_get_error()), ASIO::error::get_ssl_category()}; + LOG_ERROR(ec.message() << ": Error while setting TLS SNI"); return; } } @@ -310,9 +300,9 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC // Only send keep-alive probes if the broker supports it keepAliveTimer_ = executor_->createDeadlineTimer(); if (keepAliveTimer_) { - keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds)); + keepAliveTimer_->expires_from_now(std::chrono::seconds(KeepAliveIntervalInSeconds)); auto weakSelf = weak_from_this(); - keepAliveTimer_->async_wait([weakSelf](const boost::system::error_code&) { + keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) { auto self = weakSelf.lock(); if (self) { self->handleKeepAliveTimeout(); @@ -357,13 +347,12 @@ void ClientConnection::startConsumerStatsTimer(std::vector consumerSta if (consumerStatsRequestTimer_) { consumerStatsRequestTimer_->expires_from_now(operationsTimeout_); auto weakSelf = weak_from_this(); - consumerStatsRequestTimer_->async_wait( - [weakSelf, consumerStatsRequests](const boost::system::error_code& err) { - auto self = weakSelf.lock(); - if (self) { - self->handleConsumerStatsTimeout(err, consumerStatsRequests); - } - }); + consumerStatsRequestTimer_->async_wait([weakSelf, consumerStatsRequests](const ASIO_ERROR& err) { + auto self = weakSelf.lock(); + if (self) { + self->handleConsumerStatsTimeout(err, consumerStatsRequests); + } + }); } lock.unlock(); // Complex logic since promises need to be fulfilled outside the lock @@ -375,19 +364,19 @@ void ClientConnection::startConsumerStatsTimer(std::vector consumerSta /// The number of unacknowledged probes to send before considering the connection dead and notifying the /// application layer -typedef boost::asio::detail::socket_option::integer tcp_keep_alive_count; +typedef ASIO::detail::socket_option::integer tcp_keep_alive_count; /// The interval between subsequential keepalive probes, regardless of what the connection has exchanged in /// the meantime -typedef boost::asio::detail::socket_option::integer tcp_keep_alive_interval; +typedef ASIO::detail::socket_option::integer tcp_keep_alive_interval; /// The interval between the last data packet sent (simple ACKs are not considered data) and the first /// keepalive /// probe; after the connection is marked to need keepalive, this counter is not used any further #ifdef __APPLE__ -typedef boost::asio::detail::socket_option::integer tcp_keep_alive_idle; +typedef ASIO::detail::socket_option::integer tcp_keep_alive_idle; #else -typedef boost::asio::detail::socket_option::integer tcp_keep_alive_idle; +typedef ASIO::detail::socket_option::integer tcp_keep_alive_idle; #endif /* @@ -396,15 +385,14 @@ typedef boost::asio::detail::socket_option::integer t * if async_connect without any error, connected_ would be set to true * at this point the connection is deemed valid to be used by clients of this class */ -void ClientConnection::handleTcpConnected(const boost::system::error_code& err, - tcp::resolver::iterator endpointIterator) { +void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) { if (!err) { std::stringstream cnxStringStream; try { cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() << "] "; cnxString_ = cnxStringStream.str(); - } catch (const boost::system::system_error& e) { + } catch (const ASIO_SYSTEM_ERROR& e) { LOG_ERROR("Failed to get endpoint: " << e.what()); close(ResultRetryable); return; @@ -424,7 +412,7 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, state_ = TcpConnected; lock.unlock(); - boost::system::error_code error; + ASIO_ERROR error; socket_->set_option(tcp::no_delay(true), error); if (error) { LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message()); @@ -457,7 +445,7 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, if (tlsSocket_) { if (!isTlsAllowInsecureConnection_) { - boost::system::error_code err; + ASIO_ERROR err; Url service_url; if (!Url::parse(physicalAddress_, service_url)) { LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message()); @@ -466,26 +454,21 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, } } auto weakSelf = weak_from_this(); - auto callback = [weakSelf](const boost::system::error_code& err) { + auto callback = [weakSelf](const ASIO_ERROR& err) { auto self = weakSelf.lock(); if (self) { self->handleHandshake(err); } }; -#if BOOST_VERSION >= 106600 - tlsSocket_->async_handshake(boost::asio::ssl::stream::client, - boost::asio::bind_executor(strand_, callback)); -#else - tlsSocket_->async_handshake(boost::asio::ssl::stream::client, - strand_.wrap(callback)); -#endif + tlsSocket_->async_handshake(ASIO::ssl::stream::client, + ASIO::bind_executor(strand_, callback)); } else { - handleHandshake(boost::system::errc::make_error_code(boost::system::errc::success)); + handleHandshake(ASIO_SUCCESS); } } else if (endpointIterator != tcp::resolver::iterator()) { LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message()); // The connection failed. Try the next endpoint in the list. - boost::system::error_code closeError; + ASIO_ERROR closeError; socket_->close(closeError); // ignore the error of close if (closeError) { LOG_WARN(cnxString_ << "Failed to close socket: " << err.message()); @@ -497,15 +480,14 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, connectTimeoutTask_->start(); tcp::endpoint endpoint = *endpointIterator; auto weakSelf = weak_from_this(); - socket_->async_connect(endpoint, - [weakSelf, endpointIterator](const boost::system::error_code& err) { - auto self = weakSelf.lock(); - if (self) { - self->handleTcpConnected(err, endpointIterator); - } - }); + socket_->async_connect(endpoint, [weakSelf, endpointIterator](const ASIO_ERROR& err) { + auto self = weakSelf.lock(); + if (self) { + self->handleTcpConnected(err, endpointIterator); + } + }); } else { - if (err == boost::asio::error::operation_aborted) { + if (err == ASIO::error::operation_aborted) { // TCP connect timeout, which is not retryable close(); } else { @@ -518,7 +500,7 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, } } -void ClientConnection::handleHandshake(const boost::system::error_code& err) { +void ClientConnection::handleHandshake(const ASIO_ERROR& err) { if (err) { LOG_ERROR(cnxString_ << "Handshake failed: " << err.message()); close(); @@ -537,13 +519,12 @@ void ClientConnection::handleHandshake(const boost::system::error_code& err) { // Send CONNECT command to broker auto self = shared_from_this(); asyncWrite(buffer.const_asio_buffer(), - customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err, size_t) { + customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) { handleSentPulsarConnect(err, buffer); })); } -void ClientConnection::handleSentPulsarConnect(const boost::system::error_code& err, - const SharedBuffer& buffer) { +void ClientConnection::handleSentPulsarConnect(const ASIO_ERROR& err, const SharedBuffer& buffer) { if (isClosed()) { return; } @@ -557,8 +538,7 @@ void ClientConnection::handleSentPulsarConnect(const boost::system::error_code& readNextCommand(); } -void ClientConnection::handleSentAuthResponse(const boost::system::error_code& err, - const SharedBuffer& buffer) { +void ClientConnection::handleSentAuthResponse(const ASIO_ERROR& err, const SharedBuffer& buffer) { if (isClosed()) { return; } @@ -580,7 +560,7 @@ void ClientConnection::tcpConnectAsync() { return; } - boost::system::error_code err; + ASIO_ERROR err; Url service_url; std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_; if (!Url::parse(hostUrl, service_url)) { @@ -599,17 +579,15 @@ void ClientConnection::tcpConnectAsync() { LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port()); tcp::resolver::query query(service_url.host(), std::to_string(service_url.port())); auto weakSelf = weak_from_this(); - resolver_->async_resolve( - query, [weakSelf](const boost::system::error_code& err, tcp::resolver::iterator iterator) { - auto self = weakSelf.lock(); - if (self) { - self->handleResolve(err, iterator); - } - }); + resolver_->async_resolve(query, [weakSelf](const ASIO_ERROR& err, tcp::resolver::iterator iterator) { + auto self = weakSelf.lock(); + if (self) { + self->handleResolve(err, iterator); + } + }); } -void ClientConnection::handleResolve(const boost::system::error_code& err, - tcp::resolver::iterator endpointIterator) { +void ClientConnection::handleResolve(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) { if (err) { std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_; LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message()); @@ -642,13 +620,12 @@ void ClientConnection::handleResolve(const boost::system::error_code& err, if (endpointIterator != tcp::resolver::iterator()) { LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name() // << " to " << endpointIterator->endpoint()); - socket_->async_connect(*endpointIterator, - [weakSelf, endpointIterator](const boost::system::error_code& err) { - auto self = weakSelf.lock(); - if (self) { - self->handleTcpConnected(err, endpointIterator); - } - }); + socket_->async_connect(*endpointIterator, [weakSelf, endpointIterator](const ASIO_ERROR& err) { + auto self = weakSelf.lock(); + if (self) { + self->handleTcpConnected(err, endpointIterator); + } + }); } else { LOG_WARN(cnxString_ << "No IP address found"); close(); @@ -659,15 +636,13 @@ void ClientConnection::handleResolve(const boost::system::error_code& err, void ClientConnection::readNextCommand() { const static uint32_t minReadSize = sizeof(uint32_t); auto self = shared_from_this(); - asyncReceive( - incomingBuffer_.asio_buffer(), - customAllocReadHandler([this, self](const boost::system::error_code& err, size_t bytesTransferred) { - handleRead(err, bytesTransferred, minReadSize); - })); + asyncReceive(incomingBuffer_.asio_buffer(), + customAllocReadHandler([this, self](const ASIO_ERROR& err, size_t bytesTransferred) { + handleRead(err, bytesTransferred, minReadSize); + })); } -void ClientConnection::handleRead(const boost::system::error_code& err, size_t bytesTransferred, - uint32_t minReadSize) { +void ClientConnection::handleRead(const ASIO_ERROR& err, size_t bytesTransferred, uint32_t minReadSize) { if (isClosed()) { return; } @@ -675,9 +650,9 @@ void ClientConnection::handleRead(const boost::system::error_code& err, size_t b incomingBuffer_.bytesWritten(bytesTransferred); if (err || bytesTransferred == 0) { - if (err == boost::asio::error::operation_aborted) { + if (err == ASIO::error::operation_aborted) { LOG_DEBUG(cnxString_ << "Read operation was canceled: " << err.message()); - } else if (bytesTransferred == 0 || err == boost::asio::error::eof) { + } else if (bytesTransferred == 0 || err == ASIO::error::eof) { LOG_DEBUG(cnxString_ << "Server closed the connection: " << err.message()); } else { LOG_ERROR(cnxString_ << "Read operation failed: " << err.message()); @@ -689,11 +664,11 @@ void ClientConnection::handleRead(const boost::system::error_code& err, size_t b SharedBuffer buffer = incomingBuffer_.slice(bytesTransferred); auto self = shared_from_this(); auto nextMinReadSize = minReadSize - bytesTransferred; - asyncReceive(buffer.asio_buffer(), customAllocReadHandler([this, self, nextMinReadSize]( - const boost::system::error_code& err, - size_t bytesTransferred) { - handleRead(err, bytesTransferred, nextMinReadSize); - })); + asyncReceive(buffer.asio_buffer(), + customAllocReadHandler( + [this, self, nextMinReadSize](const ASIO_ERROR& err, size_t bytesTransferred) { + handleRead(err, bytesTransferred, nextMinReadSize); + })); } else { processIncomingBuffer(); } @@ -720,12 +695,11 @@ void ClientConnection::processIncomingBuffer() { incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, newBufferSize); } auto self = shared_from_this(); - asyncReceive( - incomingBuffer_.asio_buffer(), - customAllocReadHandler([this, self, bytesToReceive](const boost::system::error_code& err, - size_t bytesTransferred) { - handleRead(err, bytesTransferred, bytesToReceive); - })); + asyncReceive(incomingBuffer_.asio_buffer(), + customAllocReadHandler( + [this, self, bytesToReceive](const ASIO_ERROR& err, size_t bytesTransferred) { + handleRead(err, bytesTransferred, bytesToReceive); + })); return; } @@ -803,11 +777,11 @@ void ClientConnection::processIncomingBuffer() { uint32_t minReadSize = sizeof(uint32_t) - incomingBuffer_.readableBytes(); auto self = shared_from_this(); - asyncReceive(incomingBuffer_.asio_buffer(), - customAllocReadHandler([this, self, minReadSize](const boost::system::error_code& err, - size_t bytesTransferred) { - handleRead(err, bytesTransferred, minReadSize); - })); + asyncReceive( + incomingBuffer_.asio_buffer(), + customAllocReadHandler([this, self, minReadSize](const ASIO_ERROR& err, size_t bytesTransferred) { + handleRead(err, bytesTransferred, minReadSize); + })); return; } @@ -1056,7 +1030,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, const uint64_t request requestData.timer = executor_->createDeadlineTimer(); requestData.timer->expires_from_now(operationsTimeout_); auto weakSelf = weak_from_this(); - requestData.timer->async_wait([weakSelf, requestData](const boost::system::error_code& ec) { + requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (self) { self->handleLookupTimeout(ec, requestData); @@ -1082,11 +1056,7 @@ void ClientConnection::sendCommand(const SharedBuffer& cmd) { self->sendCommandInternal(cmd); } }; -#if BOOST_VERSION >= 106600 - boost::asio::post(strand_, callback); -#else - strand_.post(callback); -#endif + ASIO::post(strand_, callback); } else { sendCommandInternal(cmd); } @@ -1099,8 +1069,9 @@ void ClientConnection::sendCommand(const SharedBuffer& cmd) { void ClientConnection::sendCommandInternal(const SharedBuffer& cmd) { auto self = shared_from_this(); asyncWrite(cmd.const_asio_buffer(), - customAllocWriteHandler([this, self, cmd](const boost::system::error_code& err, - size_t bytesTransferred) { handleSend(err, cmd); })); + customAllocWriteHandler([this, self, cmd](const ASIO_ERROR& err, size_t bytesTransferred) { + handleSend(err, cmd); + })); } void ClientConnection::sendMessage(const std::shared_ptr& args) { @@ -1116,21 +1087,18 @@ void ClientConnection::sendMessage(const std::shared_ptr& args) { // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the // callback is called, an invalid buffer range might be passed to the underlying socket send. asyncWrite(buffer, customAllocWriteHandler( - [this, self, buffer](const boost::system::error_code& err, - size_t bytesTransferred) { handleSendPair(err); })); + [this, self, buffer](const ASIO_ERROR& err, size_t bytesTransferred) { + handleSendPair(err); + })); }; if (tlsSocket_) { -#if BOOST_VERSION >= 106600 - boost::asio::post(strand_, sendMessageInternal); -#else - strand_.post(sendMessageInternal); -#endif + ASIO::post(strand_, sendMessageInternal); } else { sendMessageInternal(); } } -void ClientConnection::handleSend(const boost::system::error_code& err, const SharedBuffer&) { +void ClientConnection::handleSend(const ASIO_ERROR& err, const SharedBuffer&) { if (isClosed()) { return; } @@ -1142,7 +1110,7 @@ void ClientConnection::handleSend(const boost::system::error_code& err, const Sh } } -void ClientConnection::handleSendPair(const boost::system::error_code& err) { +void ClientConnection::handleSendPair(const ASIO_ERROR& err) { if (isClosed()) { return; } @@ -1166,8 +1134,8 @@ void ClientConnection::sendPendingCommands() { if (any.type() == typeid(SharedBuffer)) { SharedBuffer buffer = boost::any_cast(any); asyncWrite(buffer.const_asio_buffer(), - customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err, - size_t) { handleSend(err, buffer); })); + customAllocWriteHandler( + [this, self, buffer](const ASIO_ERROR& err, size_t) { handleSend(err, buffer); })); } else { assert(any.type() == typeid(std::shared_ptr)); @@ -1178,9 +1146,9 @@ void ClientConnection::sendPendingCommands() { // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the // callback is called, an invalid buffer range might be passed to the underlying socket send. - asyncWrite(buffer, - customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err, - size_t) { handleSendPair(err); })); + asyncWrite(buffer, customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) { + handleSendPair(err); + })); } } else { // No more pending writes @@ -1202,7 +1170,7 @@ Future ClientConnection::sendRequestWithId(SharedBuffer cm requestData.timer = executor_->createDeadlineTimer(); requestData.timer->expires_from_now(operationsTimeout_); auto weakSelf = weak_from_this(); - requestData.timer->async_wait([weakSelf, requestData](const boost::system::error_code& ec) { + requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (self) { self->handleRequestTimeout(ec, requestData); @@ -1216,21 +1184,19 @@ Future ClientConnection::sendRequestWithId(SharedBuffer cm return requestData.promise.getFuture(); } -void ClientConnection::handleRequestTimeout(const boost::system::error_code& ec, - PendingRequestData pendingRequestData) { +void ClientConnection::handleRequestTimeout(const ASIO_ERROR& ec, PendingRequestData pendingRequestData) { if (!ec && !pendingRequestData.hasGotResponse->load()) { pendingRequestData.promise.setFailed(ResultTimeout); } } -void ClientConnection::handleLookupTimeout(const boost::system::error_code& ec, - LookupRequestData pendingRequestData) { +void ClientConnection::handleLookupTimeout(const ASIO_ERROR& ec, LookupRequestData pendingRequestData) { if (!ec) { pendingRequestData.promise->setFailed(ResultTimeout); } } -void ClientConnection::handleGetLastMessageIdTimeout(const boost::system::error_code& ec, +void ClientConnection::handleGetLastMessageIdTimeout(const ASIO_ERROR& ec, ClientConnection::LastMessageIdRequestData data) { if (!ec) { data.promise->setFailed(ResultTimeout); @@ -1255,9 +1221,9 @@ void ClientConnection::handleKeepAliveTimeout() { // be zero And we do not attempt to dereference the pointer. Lock lock(mutex_); if (keepAliveTimer_) { - keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds)); + keepAliveTimer_->expires_from_now(std::chrono::seconds(KeepAliveIntervalInSeconds)); auto weakSelf = weak_from_this(); - keepAliveTimer_->async_wait([weakSelf](const boost::system::error_code&) { + keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) { auto self = weakSelf.lock(); if (self) { self->handleKeepAliveTimeout(); @@ -1268,7 +1234,7 @@ void ClientConnection::handleKeepAliveTimeout() { } } -void ClientConnection::handleConsumerStatsTimeout(const boost::system::error_code& ec, +void ClientConnection::handleConsumerStatsTimeout(const ASIO_ERROR& ec, std::vector consumerStatsRequests) { if (ec) { LOG_DEBUG(cnxString_ << " Ignoring timer cancelled event, code[" << ec << "]"); @@ -1285,15 +1251,15 @@ void ClientConnection::close(Result result, bool detach) { state_ = Disconnected; if (socket_) { - boost::system::error_code err; - socket_->shutdown(boost::asio::socket_base::shutdown_both, err); + ASIO_ERROR err; + socket_->shutdown(ASIO::socket_base::shutdown_both, err); socket_->close(err); if (err) { LOG_WARN(cnxString_ << "Failed to close socket: " << err.message()); } } if (tlsSocket_) { - boost::system::error_code err; + ASIO_ERROR err; tlsSocket_->lowest_layer().close(err); if (err) { LOG_WARN(cnxString_ << "Failed to close TLS socket: " << err.message()); @@ -1432,7 +1398,7 @@ Future ClientConnection::newGetLastMessageId(u requestData.timer = executor_->createDeadlineTimer(); requestData.timer->expires_from_now(operationsTimeout_); auto weakSelf = weak_from_this(); - requestData.timer->async_wait([weakSelf, requestData](const boost::system::error_code& ec) { + requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (self) { self->handleGetLastMessageIdTimeout(ec, requestData); @@ -1823,7 +1789,7 @@ void ClientConnection::handleAuthChallenge() { } auto self = shared_from_this(); asyncWrite(buffer.const_asio_buffer(), - customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err, size_t) { + customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) { handleSentAuthResponse(err, buffer); })); } diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 1bc1bd85..69155fdd 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -23,13 +23,20 @@ #include #include -#include +#ifdef USE_ASIO +#include +#include +#include +#include +#include +#else #include -#include #include #include #include #include +#endif +#include #include #include #include @@ -37,19 +44,18 @@ #include #include +#include "AsioTimer.h" #include "Commands.h" #include "GetLastMessageIdResponse.h" #include "LookupDataResult.h" #include "SharedBuffer.h" +#include "TimeUtils.h" #include "UtilAllocator.h" - namespace pulsar { class PulsarFriend; -using DeadlineTimerPtr = std::shared_ptr; -using TimeDuration = boost::posix_time::time_duration; -using TcpResolverPtr = std::shared_ptr; +using TcpResolverPtr = std::shared_ptr; class ExecutorService; using ExecutorServicePtr = std::shared_ptr; @@ -114,10 +120,10 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this SocketPtr; - typedef std::shared_ptr> TlsSocketPtr; + typedef std::shared_ptr SocketPtr; + typedef std::shared_ptr> TlsSocketPtr; typedef std::shared_ptr ConnectionPtr; - typedef std::function ConnectionListener; + typedef std::function ConnectionListener; typedef std::vector::iterator ListenerIterator; /* @@ -224,17 +230,16 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this= 106600 - boost::asio::async_write(*tlsSocket_, buffers, boost::asio::bind_executor(strand_, handler)); -#else - boost::asio::async_write(*tlsSocket_, buffers, strand_.wrap(handler)); -#endif + ASIO::async_write(*tlsSocket_, buffers, ASIO::bind_executor(strand_, handler)); } else { - boost::asio::async_write(*socket_, buffers, handler); + ASIO::async_write(*socket_, buffers, handler); } } @@ -296,11 +296,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this= 106600 - tlsSocket_->async_read_some(buffers, boost::asio::bind_executor(strand_, handler)); -#else - tlsSocket_->async_read_some(buffers, strand_.wrap(handler)); -#endif + tlsSocket_->async_read_some(buffers, ASIO::bind_executor(strand_, handler)); } else { socket_->async_receive(buffers, handler); } @@ -321,11 +317,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this= 106600 - boost::asio::strand strand_; -#else - boost::asio::io_service::strand strand_; -#endif + ASIO::strand strand_; const std::string logicalAddress_; /* @@ -343,7 +335,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this consumerStatsRequests); + void handleConsumerStatsTimeout(const ASIO_ERROR& ec, std::vector consumerStatsRequests); void startConsumerStatsTimer(std::vector consumerStatsRequests); uint32_t maxPendingLookupRequest_; diff --git a/lib/CompressionCodec.cc b/lib/CompressionCodec.cc index 991d52c0..6105887a 100644 --- a/lib/CompressionCodec.cc +++ b/lib/CompressionCodec.cc @@ -45,7 +45,6 @@ CompressionCodec& CompressionCodecProvider::getCodec(CompressionType compression default: return compressionCodecNone_; } - BOOST_THROW_EXCEPTION(std::logic_error("Invalid CompressionType enumeration value")); } SharedBuffer CompressionCodecNone::encode(const SharedBuffer& raw) { return raw; } diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc index 95170a94..4cc8883a 100644 --- a/lib/ConnectionPool.cc +++ b/lib/ConnectionPool.cc @@ -18,15 +18,20 @@ */ #include "ConnectionPool.h" +#ifdef USE_ASIO +#include +#include +#else #include #include +#endif #include "ClientConnection.h" #include "ExecutorService.h" #include "LogUtils.h" -using boost::asio::ip::tcp; -namespace ssl = boost::asio::ssl; +using ASIO::ip::tcp; +namespace ssl = ASIO::ssl; typedef ssl::stream ssl_socket; DECLARE_LOG_OBJECT() diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index f9770d80..52162189 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -26,6 +26,7 @@ #include "AckGroupingTracker.h" #include "AckGroupingTrackerDisabled.h" #include "AckGroupingTrackerEnabled.h" +#include "AsioDefines.h" #include "BatchMessageAcker.h" #include "BatchedMessageIdImpl.h" #include "BitSet.h" @@ -55,6 +56,9 @@ namespace pulsar { DECLARE_LOG_OBJECT() +using std::chrono::milliseconds; +using std::chrono::seconds; + ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, bool isPersistent, const ConsumerInterceptorsPtr& interceptors, @@ -402,10 +406,9 @@ void ConsumerImpl::discardChunkMessages(std::string uuid, MessageId messageId, b } void ConsumerImpl::triggerCheckExpiredChunkedTimer() { - checkExpiredChunkedTimer_->expires_from_now( - boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_)); + checkExpiredChunkedTimer_->expires_from_now(milliseconds(expireTimeOfIncompleteChunkedMessageMs_)); std::weak_ptr weakSelf{shared_from_this()}; - checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void { + checkExpiredChunkedTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void { auto self = weakSelf.lock(); if (!self) { return; @@ -1581,7 +1584,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time } } else { TimeDuration next = std::min(remainTime, backoff->next()); - if (next.total_milliseconds() <= 0) { + if (toMillis(next) <= 0) { LOG_ERROR(getName() << " Client Connection not ready for Consumer"); callback(ResultNotConnected, MessageId()); return; @@ -1592,8 +1595,8 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time auto self = shared_from_this(); timer->async_wait([this, backoff, remainTime, timer, next, callback, - self](const boost::system::error_code& ec) -> void { - if (ec == boost::asio::error::operation_aborted) { + self](const ASIO_ERROR& ec) -> void { + if (ec == ASIO::error::operation_aborted) { LOG_DEBUG(getName() << " Get last message id operation was cancelled, code[" << ec << "]."); return; } @@ -1602,7 +1605,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time return; } LOG_WARN(getName() << " Could not get connection while getLastMessageId -- Will try again in " - << next.total_milliseconds() << " ms") + << toMillis(next) << " ms") this->internalGetLastMessageIdAsync(backoff, remainTime, timer, callback); }); } @@ -1693,7 +1696,7 @@ std::shared_ptr ConsumerImpl::get_shared_this_ptr() { } void ConsumerImpl::cancelTimers() noexcept { - boost::system::error_code ec; + ASIO_ERROR ec; batchReceiveTimer_->cancel(ec); checkExpiredChunkedTimer_->cancel(ec); unAckedMessageTrackerPtr_->stop(); diff --git a/lib/ConsumerImplBase.cc b/lib/ConsumerImplBase.cc index 39212371..851d41e8 100644 --- a/lib/ConsumerImplBase.cc +++ b/lib/ConsumerImplBase.cc @@ -51,9 +51,9 @@ ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topi void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) { if (timeoutMs > 0) { - batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs)); + batchReceiveTimer_->expires_from_now(std::chrono::milliseconds(timeoutMs)); std::weak_ptr weakSelf{shared_from_this()}; - batchReceiveTimer_->async_wait([weakSelf](const boost::system::error_code& ec) { + batchReceiveTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (self && !ec) { self->doBatchReceiveTimeTask(); diff --git a/lib/ExecutorService.cc b/lib/ExecutorService.cc index 53be8eaf..794e3619 100644 --- a/lib/ExecutorService.cc +++ b/lib/ExecutorService.cc @@ -32,7 +32,7 @@ void ExecutorService::start() { auto self = shared_from_this(); std::thread t{[this, self] { LOG_DEBUG("Run io_service in a single thread"); - boost::system::error_code ec; + ASIO_ERROR ec; while (!closed_) { io_service_.restart(); IOService::work work{getIOService()}; @@ -63,22 +63,22 @@ ExecutorServicePtr ExecutorService::create() { } /* - * factory method of boost::asio::ip::tcp::socket associated with io_service_ instance + * factory method of ASIO::ip::tcp::socket associated with io_service_ instance * @ returns shared_ptr to this socket */ SocketPtr ExecutorService::createSocket() { try { - return SocketPtr(new boost::asio::ip::tcp::socket(io_service_)); - } catch (const boost::system::system_error &e) { + return SocketPtr(new ASIO::ip::tcp::socket(io_service_)); + } catch (const ASIO_SYSTEM_ERROR &e) { restart(); auto error = std::string("Failed to create socket: ") + e.what(); throw std::runtime_error(error); } } -TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, boost::asio::ssl::context &ctx) { - return std::shared_ptr>( - new boost::asio::ssl::stream(*socket, ctx)); +TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, ASIO::ssl::context &ctx) { + return std::shared_ptr>( + new ASIO::ssl::stream(*socket, ctx)); } /* @@ -87,8 +87,8 @@ TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, boost::asio::ss */ TcpResolverPtr ExecutorService::createTcpResolver() { try { - return TcpResolverPtr(new boost::asio::ip::tcp::resolver(io_service_)); - } catch (const boost::system::system_error &e) { + return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_service_)); + } catch (const ASIO_SYSTEM_ERROR &e) { restart(); auto error = std::string("Failed to create resolver: ") + e.what(); throw std::runtime_error(error); @@ -97,10 +97,10 @@ TcpResolverPtr ExecutorService::createTcpResolver() { DeadlineTimerPtr ExecutorService::createDeadlineTimer() { try { - return DeadlineTimerPtr(new boost::asio::deadline_timer(io_service_)); - } catch (const boost::system::system_error &e) { + return DeadlineTimerPtr(new ASIO::steady_timer(io_service_)); + } catch (const ASIO_SYSTEM_ERROR &e) { restart(); - auto error = std::string("Failed to create deadline_timer: ") + e.what(); + auto error = std::string("Failed to create steady_timer: ") + e.what(); throw std::runtime_error(error); } } diff --git a/lib/ExecutorService.h b/lib/ExecutorService.h index a373c0af..89d06d30 100644 --- a/lib/ExecutorService.h +++ b/lib/ExecutorService.h @@ -22,10 +22,15 @@ #include #include -#include +#ifdef USE_ASIO +#include +#include +#include +#else #include #include #include +#endif #include #include #include @@ -33,14 +38,15 @@ #include #include +#include "AsioTimer.h" + namespace pulsar { -typedef std::shared_ptr SocketPtr; -typedef std::shared_ptr > TlsSocketPtr; -typedef std::shared_ptr TcpResolverPtr; -typedef std::shared_ptr DeadlineTimerPtr; +typedef std::shared_ptr SocketPtr; +typedef std::shared_ptr > TlsSocketPtr; +typedef std::shared_ptr TcpResolverPtr; class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this { public: - using IOService = boost::asio::io_service; + using IOService = ASIO::io_service; using SharedPtr = std::shared_ptr; static SharedPtr create(); @@ -51,7 +57,7 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_thisgetIOExecutorProvider()->get()), mutex_(), creationTimestamp_(TimeUtils::now()), - operationTimeut_(seconds(client->conf().getOperationTimeoutSeconds())), + operationTimeut_(std::chrono::seconds(client->conf().getOperationTimeoutSeconds())), state_(NotStarted), backoff_(backoff), epoch_(0), @@ -147,13 +148,13 @@ void HandlerBase::scheduleReconnection() { if (state == Pending || state == Ready) { TimeDuration delay = backoff_.next(); - LOG_INFO(getName() << "Schedule reconnection in " << (delay.total_milliseconds() / 1000.0) << " s"); + LOG_INFO(getName() << "Schedule reconnection in " << (toMillis(delay) / 1000.0) << " s"); timer_->expires_from_now(delay); // passing shared_ptr here since time_ will get destroyed, so tasks will be cancelled // so we will not run into the case where grabCnx is invoked on out of scope handler auto name = getName(); std::weak_ptr weakSelf{shared_from_this()}; - timer_->async_wait([name, weakSelf](const boost::system::error_code& ec) { + timer_->async_wait([name, weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (self) { self->handleTimeout(ec); @@ -164,7 +165,7 @@ void HandlerBase::scheduleReconnection() { } } -void HandlerBase::handleTimeout(const boost::system::error_code& ec) { +void HandlerBase::handleTimeout(const ASIO_ERROR& ec) { if (ec) { LOG_DEBUG(getName() << "Ignoring timer cancelled event, code[" << ec << "]"); return; diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index f62c4df0..68c0b6a6 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -20,20 +20,17 @@ #define _PULSAR_HANDLER_BASE_HEADER_ #include -#include #include #include #include +#include "AsioTimer.h" #include "Backoff.h" #include "Future.h" +#include "TimeUtils.h" namespace pulsar { -using namespace boost::posix_time; -using boost::posix_time::milliseconds; -using boost::posix_time::seconds; - class ClientImpl; using ClientImplPtr = std::shared_ptr; using ClientImplWeakPtr = std::weak_ptr; @@ -42,7 +39,6 @@ using ClientConnectionPtr = std::shared_ptr; using ClientConnectionWeakPtr = std::weak_ptr; class ExecutorService; using ExecutorServicePtr = std::shared_ptr; -using DeadlineTimerPtr = std::shared_ptr; class HandlerBase : public std::enable_shared_from_this { public: @@ -95,7 +91,7 @@ class HandlerBase : public std::enable_shared_from_this { void handleDisconnection(Result result, const ClientConnectionPtr& cnx); - void handleTimeout(const boost::system::error_code& ec); + void handleTimeout(const ASIO_ERROR& ec); protected: ClientImplWeakPtr client_; diff --git a/lib/Int64SerDes.h b/lib/Int64SerDes.h index dbc5d8a7..f1f5eef3 100644 --- a/lib/Int64SerDes.h +++ b/lib/Int64SerDes.h @@ -20,7 +20,12 @@ #include -#include // for ntohl +// for ntohl +#ifdef USE_ASIO +#include +#else +#include +#endif namespace pulsar { diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 15f9d9b8..af70623d 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -18,6 +18,7 @@ */ #include "MultiTopicsConsumerImpl.h" +#include #include #include "ClientImpl.h" @@ -37,6 +38,9 @@ DECLARE_LOG_OBJECT() using namespace pulsar; +using std::chrono::milliseconds; +using std::chrono::seconds; + MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName, int numPartitions, const std::string& subscriptionName, const ConsumerConfiguration& conf, @@ -90,7 +94,7 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std auto partitionsUpdateInterval = static_cast(client->conf().getPartitionsUpdateInterval()); if (partitionsUpdateInterval > 0) { partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer(); - partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval); + partitionsUpdateInterval_ = seconds(partitionsUpdateInterval); lookupServicePtr_ = client->getLookup(); } @@ -936,7 +940,7 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() { void MultiTopicsConsumerImpl::runPartitionUpdateTask() { partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_); auto weakSelf = weak_from_this(); - partitionsUpdateTimer_->async_wait([weakSelf](const boost::system::error_code& ec) { + partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) { // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it // cannot continue at this time, and the request needs to be ignored. auto self = weakSelf.lock(); @@ -1087,7 +1091,7 @@ void MultiTopicsConsumerImpl::beforeConnectionChange(ClientConnection& cnx) { void MultiTopicsConsumerImpl::cancelTimers() noexcept { if (partitionsUpdateTimer_) { - boost::system::error_code ec; + ASIO_ERROR ec; partitionsUpdateTimer_->cancel(ec); } } diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index d4127f63..c5834eaa 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -32,6 +32,7 @@ #include "LookupDataResult.h" #include "SynchronizedHashMap.h" #include "TestUtil.h" +#include "TimeUtils.h" #include "UnboundedBlockingQueue.h" namespace pulsar { @@ -119,7 +120,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { std::atomic_int incomingMessagesSize_ = {0}; MessageListener messageListener_; DeadlineTimerPtr partitionsUpdateTimer_; - boost::posix_time::time_duration partitionsUpdateInterval_; + TimeDuration partitionsUpdateInterval_; LookupServicePtr lookupServicePtr_; std::shared_ptr> numberTopicPartitions_; std::atomic failedResult{ResultOk}; diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc index 0dd73589..e443496d 100644 --- a/lib/NegativeAcksTracker.cc +++ b/lib/NegativeAcksTracker.cc @@ -40,9 +40,9 @@ NegativeAcksTracker::NegativeAcksTracker(ClientImplPtr client, ConsumerImpl &con nackDelay_ = std::chrono::milliseconds(std::max(conf.getNegativeAckRedeliveryDelayMs(), MIN_NACK_DELAY_MILLIS)); - timerInterval_ = boost::posix_time::milliseconds((long)(nackDelay_.count() / 3)); - LOG_DEBUG("Created negative ack tracker with delay: " << nackDelay_.count() - << " ms - Timer interval: " << timerInterval_); + timerInterval_ = std::chrono::milliseconds((long)(nackDelay_.count() / 3)); + LOG_DEBUG("Created negative ack tracker with delay: " << nackDelay_.count() << " ms - Timer interval: " + << timerInterval_.count()); } void NegativeAcksTracker::scheduleTimer() { @@ -51,14 +51,14 @@ void NegativeAcksTracker::scheduleTimer() { } std::weak_ptr weakSelf{shared_from_this()}; timer_->expires_from_now(timerInterval_); - timer_->async_wait([weakSelf](const boost::system::error_code &ec) { + timer_->async_wait([weakSelf](const ASIO_ERROR &ec) { if (auto self = weakSelf.lock()) { self->handleTimer(ec); } }); } -void NegativeAcksTracker::handleTimer(const boost::system::error_code &ec) { +void NegativeAcksTracker::handleTimer(const ASIO_ERROR &ec) { if (ec) { // Ignore cancelled events return; @@ -107,7 +107,7 @@ void NegativeAcksTracker::add(const MessageId &m) { void NegativeAcksTracker::close() { closed_ = true; - boost::system::error_code ec; + ASIO_ERROR ec; timer_->cancel(ec); std::lock_guard lock(mutex_); nackedMessages_.clear(); diff --git a/lib/NegativeAcksTracker.h b/lib/NegativeAcksTracker.h index 4b489844..472e9763 100644 --- a/lib/NegativeAcksTracker.h +++ b/lib/NegativeAcksTracker.h @@ -23,12 +23,13 @@ #include #include -#include #include #include #include #include +#include "AsioDefines.h" +#include "AsioTimer.h" #include "TestUtil.h" namespace pulsar { @@ -36,7 +37,6 @@ namespace pulsar { class ConsumerImpl; class ClientImpl; using ClientImplPtr = std::shared_ptr; -using DeadlineTimerPtr = std::shared_ptr; class ExecutorService; using ExecutorServicePtr = std::shared_ptr; @@ -56,13 +56,13 @@ class NegativeAcksTracker : public std::enable_shared_from_this nackedMessages_; diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h index a1319e10..46fd9c1c 100644 --- a/lib/OpSendMsg.h +++ b/lib/OpSendMsg.h @@ -23,8 +23,6 @@ #include #include -#include - #include "ChunkMessageIdImpl.h" #include "PulsarApi.pb.h" #include "SharedBuffer.h" @@ -53,7 +51,7 @@ struct OpSendMsg { const int32_t numChunks; const uint32_t messagesCount; const uint64_t messagesSize; - const boost::posix_time::ptime timeout; + const ptime timeout; const SendCallback sendCallback; std::vector> trackerCallbacks; ChunkMessageIdListPtr chunkMessageIdList; @@ -98,7 +96,7 @@ struct OpSendMsg { numChunks(metadata.num_chunks_from_msg()), messagesCount(messagesCount), messagesSize(messagesSize), - timeout(TimeUtils::now() + boost::posix_time::milliseconds(sendTimeoutMs)), + timeout(TimeUtils::now() + std::chrono::milliseconds(sendTimeoutMs)), sendCallback(std::move(callback)), chunkMessageIdList(std::move(chunkMessageIdList)), sendArgs(new SendArguments(producerId, metadata.sequence_id(), metadata, payload)) {} diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc index 54e96c84..4178096c 100644 --- a/lib/PartitionedProducerImpl.cc +++ b/lib/PartitionedProducerImpl.cc @@ -58,7 +58,7 @@ PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const Top if (partitionsUpdateInterval > 0) { listenerExecutor_ = client->getListenerExecutorProvider()->get(); partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer(); - partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval); + partitionsUpdateInterval_ = std::chrono::seconds(partitionsUpdateInterval); lookupServicePtr_ = client->getLookup(); } } @@ -69,7 +69,7 @@ MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() { return std::make_shared( conf_.getHashingScheme(), conf_.getBatchingEnabled(), conf_.getBatchingMaxMessages(), conf_.getBatchingMaxAllowedSizeInBytes(), - boost::posix_time::milliseconds(conf_.getBatchingMaxPublishDelayMs())); + std::chrono::milliseconds(conf_.getBatchingMaxPublishDelayMs())); case ProducerConfiguration::CustomPartition: return conf_.getMessageRouterPtr(); case ProducerConfiguration::UseSinglePartition: @@ -422,7 +422,7 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) { void PartitionedProducerImpl::runPartitionUpdateTask() { auto weakSelf = weak_from_this(); partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_); - partitionsUpdateTimer_->async_wait([weakSelf](const boost::system::error_code& ec) { + partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (self) { self->getPartitionMetadata(); @@ -524,7 +524,7 @@ uint64_t PartitionedProducerImpl::getNumberOfConnectedProducer() { void PartitionedProducerImpl::cancelTimers() noexcept { if (partitionsUpdateTimer_) { - boost::system::error_code ec; + ASIO_ERROR ec; partitionsUpdateTimer_->cancel(ec); } } diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h index 2d07a81a..610c74ed 100644 --- a/lib/PartitionedProducerImpl.h +++ b/lib/PartitionedProducerImpl.h @@ -20,21 +20,21 @@ #include #include -#include #include #include #include +#include "AsioTimer.h" #include "LookupDataResult.h" #include "ProducerImplBase.h" #include "ProducerInterceptors.h" +#include "TimeUtils.h" namespace pulsar { class ClientImpl; using ClientImplPtr = std::shared_ptr; using ClientImplWeakPtr = std::weak_ptr; -using DeadlineTimerPtr = std::shared_ptr; class ExecutorService; using ExecutorServicePtr = std::shared_ptr; class LookupService; @@ -128,7 +128,7 @@ class PartitionedProducerImpl : public ProducerImplBase, ExecutorServicePtr listenerExecutor_; DeadlineTimerPtr partitionsUpdateTimer_; - boost::posix_time::time_duration partitionsUpdateInterval_; + TimeDuration partitionsUpdateInterval_; LookupServicePtr lookupServicePtr_; ProducerInterceptorsPtr interceptors_; diff --git a/lib/PatternMultiTopicsConsumerImpl.cc b/lib/PatternMultiTopicsConsumerImpl.cc index 23e445ee..4fc7bb61 100644 --- a/lib/PatternMultiTopicsConsumerImpl.cc +++ b/lib/PatternMultiTopicsConsumerImpl.cc @@ -27,6 +27,8 @@ DECLARE_LOG_OBJECT() using namespace pulsar; +using std::chrono::seconds; + PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl( ClientImplPtr client, const std::string pattern, CommandGetTopicsOfNamespace_Mode getTopicsMode, const std::vector& topics, const std::string& subscriptionName, @@ -49,15 +51,15 @@ void PatternMultiTopicsConsumerImpl::resetAutoDiscoveryTimer() { autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod())); auto weakSelf = weak_from_this(); - autoDiscoveryTimer_->async_wait([weakSelf](const boost::system::error_code& err) { + autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) { if (auto self = weakSelf.lock()) { self->autoDiscoveryTimerTask(err); } }); } -void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system::error_code& err) { - if (err == boost::asio::error::operation_aborted) { +void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const ASIO_ERROR& err) { + if (err == ASIO::error::operation_aborted) { LOG_DEBUG(getName() << "Timer cancelled: " << err.message()); return; } else if (err) { @@ -228,7 +230,7 @@ void PatternMultiTopicsConsumerImpl::start() { if (conf_.getPatternAutoDiscoveryPeriod() > 0) { autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod())); auto weakSelf = weak_from_this(); - autoDiscoveryTimer_->async_wait([weakSelf](const boost::system::error_code& err) { + autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) { if (auto self = weakSelf.lock()) { self->autoDiscoveryTimerTask(err); } @@ -247,6 +249,6 @@ void PatternMultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { } void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept { - boost::system::error_code ec; + ASIO_ERROR ec; autoDiscoveryTimer_->cancel(ec); } diff --git a/lib/PatternMultiTopicsConsumerImpl.h b/lib/PatternMultiTopicsConsumerImpl.h index 5d3ba9ec..f272df22 100644 --- a/lib/PatternMultiTopicsConsumerImpl.h +++ b/lib/PatternMultiTopicsConsumerImpl.h @@ -22,6 +22,7 @@ #include #include +#include "AsioTimer.h" #include "LookupDataResult.h" #include "MultiTopicsConsumerImpl.h" #include "NamespaceName.h" @@ -56,7 +57,7 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl { const PULSAR_REGEX_NAMESPACE::regex getPattern(); - void autoDiscoveryTimerTask(const boost::system::error_code& err); + void autoDiscoveryTimerTask(const ASIO_ERROR& err); // filter input `topics` with given `pattern`, return matched topics. Do not match topic domain. static NamespaceTopicsPtr topicsPatternFilter(const std::vector& topics, @@ -74,7 +75,7 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl { const std::string patternString_; const PULSAR_REGEX_NAMESPACE::regex pattern_; const CommandGetTopicsOfNamespace_Mode getTopicsMode_; - typedef std::shared_ptr TimerPtr; + typedef std::shared_ptr TimerPtr; TimerPtr autoDiscoveryTimer_; bool autoDiscoveryRunning_; NamespaceNamePtr namespaceName_; diff --git a/lib/PeriodicTask.cc b/lib/PeriodicTask.cc index 6046eae2..9fde012a 100644 --- a/lib/PeriodicTask.cc +++ b/lib/PeriodicTask.cc @@ -18,6 +18,8 @@ */ #include "PeriodicTask.h" +#include + namespace pulsar { void PeriodicTask::start() { @@ -27,7 +29,7 @@ void PeriodicTask::start() { state_ = Ready; if (periodMs_ >= 0) { std::weak_ptr weakSelf{shared_from_this()}; - timer_->expires_from_now(boost::posix_time::millisec(periodMs_)); + timer_->expires_from_now(std::chrono::milliseconds(periodMs_)); timer_->async_wait([weakSelf](const ErrorCode& ec) { auto self = weakSelf.lock(); if (self) { @@ -48,7 +50,7 @@ void PeriodicTask::stop() noexcept { } void PeriodicTask::handleTimeout(const ErrorCode& ec) { - if (state_ != Ready || ec.value() == boost::system::errc::operation_canceled) { + if (state_ != Ready || ec == ASIO::error::operation_aborted) { return; } @@ -57,7 +59,7 @@ void PeriodicTask::handleTimeout(const ErrorCode& ec) { // state_ may be changed in handleTimeout, so we check state_ again if (state_ == Ready) { auto self = shared_from_this(); - timer_->expires_from_now(boost::posix_time::millisec(periodMs_)); + timer_->expires_from_now(std::chrono::milliseconds(periodMs_)); timer_->async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); }); } } diff --git a/lib/PeriodicTask.h b/lib/PeriodicTask.h index 5de81ae4..bc186348 100644 --- a/lib/PeriodicTask.h +++ b/lib/PeriodicTask.h @@ -36,7 +36,7 @@ namespace pulsar { */ class PeriodicTask : public std::enable_shared_from_this { public: - using ErrorCode = boost::system::error_code; + using ErrorCode = ASIO_ERROR; using CallbackType = std::function; enum State : std::uint8_t diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 0a129259..fc39b231 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -20,7 +20,7 @@ #include -#include +#include #include "BatchMessageContainer.h" #include "BatchMessageKeyBasedContainer.h" @@ -46,6 +46,8 @@ namespace pulsar { DECLARE_LOG_OBJECT() +using std::chrono::milliseconds; + ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName, const ProducerConfiguration& conf, const ProducerInterceptorsPtr& interceptors, int32_t partition, bool retryOnCreationError) @@ -465,7 +467,7 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) { Producer producer = Producer(shared_from_this()); auto interceptorMessage = interceptors_->beforeSend(producer, msg); - const auto now = boost::posix_time::microsec_clock::universal_time(); + const auto now = TimeUtils::now(); auto self = shared_from_this(); sendAsyncWithStatsUpdate(interceptorMessage, [this, self, now, callback, producer, interceptorMessage]( Result result, const MessageId& messageId) { @@ -564,10 +566,9 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg); bool isFull = batchMessageContainer_->add(msg, callback); if (isFirstMessage) { - batchTimer_->expires_from_now( - boost::posix_time::milliseconds(conf_.getBatchingMaxPublishDelayMs())); + batchTimer_->expires_from_now(milliseconds(conf_.getBatchingMaxPublishDelayMs())); auto weakSelf = weak_from_this(); - batchTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) { + batchTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (!self) { return; @@ -824,14 +825,14 @@ Future ProducerImpl::getProducerCreatedFuture() uint64_t ProducerImpl::getProducerId() const { return producerId_; } -void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) { +void ProducerImpl::handleSendTimeout(const ASIO_ERROR& err) { const auto state = state_.load(); if (state != Pending && state != Ready) { return; } Lock lock(mutex_); - if (err == boost::asio::error::operation_aborted) { + if (err == ASIO::error::operation_aborted) { LOG_DEBUG(getName() << "Timer cancelled: " << err.message()); return; } else if (err) { @@ -847,8 +848,8 @@ void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) { } else { // If there is at least one message, calculate the diff between the message timeout and // the current time. - time_duration diff = pendingMessagesQueue_.front()->timeout - TimeUtils::now(); - if (diff.total_milliseconds() <= 0) { + auto diff = pendingMessagesQueue_.front()->timeout - TimeUtils::now(); + if (toMillis(diff) <= 0) { // The diff is less than or equal to zero, meaning that the message has been expired. LOG_DEBUG(getName() << "Timer expired. Calling timeout callbacks."); pendingMessages = getPendingCallbacksWhenFailed(); @@ -856,7 +857,7 @@ void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) { asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout())); } else { // The diff is greater than zero, set the timeout to the diff value - LOG_DEBUG(getName() << "Timer hasn't expired yet, setting new timeout " << diff); + LOG_DEBUG(getName() << "Timer hasn't expired yet, setting new timeout " << diff.count()); asyncWaitSendTimeout(diff); } } @@ -1000,7 +1001,7 @@ void ProducerImpl::shutdown() { void ProducerImpl::cancelTimers() noexcept { dataKeyRefreshTask_.stop(); - boost::system::error_code ec; + ASIO_ERROR ec; batchTimer_->cancel(ec); sendTimer_->cancel(ec); } @@ -1026,7 +1027,7 @@ void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) { sendTimer_->expires_from_now(expiryTime); auto weakSelf = weak_from_this(); - sendTimer_->async_wait([weakSelf](const boost::system::error_code& err) { + sendTimer_->async_wait([weakSelf](const ASIO_ERROR& err) { auto self = weakSelf.lock(); if (self) { std::static_pointer_cast(self)->handleSendTimeout(err); diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h index b467458d..97816050 100644 --- a/lib/ProducerImpl.h +++ b/lib/ProducerImpl.h @@ -19,6 +19,12 @@ #ifndef LIB_PRODUCERIMPL_H_ #define LIB_PRODUCERIMPL_H_ +#include "TimeUtils.h" +#ifdef USE_ASIO +#include +#else +#include +#endif #include #include #include @@ -30,6 +36,7 @@ #if defined(_MSC_VER) || defined(__APPLE__) #include "OpSendMsg.h" #endif +#include "AsioDefines.h" #include "PendingFailures.h" #include "PeriodicTask.h" #include "ProducerImplBase.h" @@ -39,7 +46,7 @@ namespace pulsar { class BatchMessageContainerBase; class ClientImpl; using ClientImplPtr = std::shared_ptr; -using DeadlineTimerPtr = std::shared_ptr; +using DeadlineTimerPtr = std::shared_ptr; class MessageCrypto; using MessageCryptoPtr = std::shared_ptr; class ProducerImpl; @@ -137,7 +144,7 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase { void resendMessages(ClientConnectionPtr cnx); - void refreshEncryptionKey(const boost::system::error_code& ec); + void refreshEncryptionKey(const ASIO_ERROR& ec); bool encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload, SharedBuffer& encryptedPayload); @@ -183,8 +190,8 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase { std::string schemaVersion_; DeadlineTimerPtr sendTimer_; - void handleSendTimeout(const boost::system::error_code& err); - using DurationType = typename boost::asio::deadline_timer::duration_type; + void handleSendTimeout(const ASIO_ERROR& err); + using DurationType = TimeDuration; void asyncWaitSendTimeout(DurationType expiryTime); Promise producerCreatedPromise_; diff --git a/lib/RetryableOperation.h b/lib/RetryableOperation.h index d026e424..9c920da1 100644 --- a/lib/RetryableOperation.h +++ b/lib/RetryableOperation.h @@ -30,6 +30,7 @@ #include "Future.h" #include "LogUtils.h" #include "ResultUtils.h" +#include "TimeUtils.h" namespace pulsar { @@ -43,9 +44,8 @@ class RetryableOperation : public std::enable_shared_from_thiscancel(ec); } @@ -100,7 +100,7 @@ class RetryableOperation : public std::enable_shared_from_thisexpires_from_now(delay); auto nextRemainingTime = remainingTime - delay; - LOG_INFO("Reschedule " << name_ << " for " << delay.total_milliseconds() - << " ms, remaining time: " << nextRemainingTime.total_milliseconds() - << " ms"); - timer_->async_wait([this, weakSelf, nextRemainingTime](const boost::system::error_code& ec) { + LOG_INFO("Reschedule " << name_ << " for " << toMillis(delay) + << " ms, remaining time: " << toMillis(nextRemainingTime) << " ms"); + timer_->async_wait([this, weakSelf, nextRemainingTime](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (!self) { return; } if (ec) { - if (ec == boost::asio::error::operation_aborted) { + if (ec == ASIO::error::operation_aborted) { LOG_DEBUG("Timer for " << name_ << " is cancelled"); promise_.setFailed(ResultTimeout); } else { LOG_WARN("Timer for " << name_ << " failed: " << ec.message()); } } else { - LOG_DEBUG("Run operation " << name_ << ", remaining time: " - << nextRemainingTime.total_milliseconds() << " ms"); + LOG_DEBUG("Run operation " << name_ << ", remaining time: " << toMillis(nextRemainingTime) + << " ms"); runImpl(nextRemainingTime); } }); diff --git a/lib/RoundRobinMessageRouter.cc b/lib/RoundRobinMessageRouter.cc index 9693cc25..1bc0b306 100644 --- a/lib/RoundRobinMessageRouter.cc +++ b/lib/RoundRobinMessageRouter.cc @@ -26,8 +26,7 @@ namespace pulsar { RoundRobinMessageRouter::RoundRobinMessageRouter(ProducerConfiguration::HashingScheme hashingScheme, bool batchingEnabled, uint32_t maxBatchingMessages, - uint32_t maxBatchingSize, - boost::posix_time::time_duration maxBatchingDelay) + uint32_t maxBatchingSize, TimeDuration maxBatchingDelay) : MessageRouterBase(hashingScheme), batchingEnabled_(batchingEnabled), maxBatchingMessages_(maxBatchingMessages), @@ -74,7 +73,7 @@ int RoundRobinMessageRouter::getPartition(const Message& msg, const TopicMetadat int64_t now = TimeUtils::currentTimeMillis(); if (messageCount >= maxBatchingMessages_ || (messageSize >= maxBatchingSize_ - batchSize) || - (now - lastPartitionChange >= maxBatchingDelay_.total_milliseconds())) { + (now - lastPartitionChange >= toMillis(maxBatchingDelay_))) { uint32_t currentPartitionCursor = ++currentPartitionCursor_; lastPartitionChange_ = now; cumulativeBatchSize_ = messageSize; diff --git a/lib/RoundRobinMessageRouter.h b/lib/RoundRobinMessageRouter.h index 753573a1..03cfac80 100644 --- a/lib/RoundRobinMessageRouter.h +++ b/lib/RoundRobinMessageRouter.h @@ -23,16 +23,16 @@ #include #include -#include #include "MessageRouterBase.h" +#include "TimeUtils.h" namespace pulsar { class PULSAR_PUBLIC RoundRobinMessageRouter : public MessageRouterBase { public: RoundRobinMessageRouter(ProducerConfiguration::HashingScheme hashingScheme, bool batchingEnabled, uint32_t maxBatchingMessages, uint32_t maxBatchingSize, - boost::posix_time::time_duration maxBatchingDelay); + TimeDuration maxBatchingDelay); virtual ~RoundRobinMessageRouter(); virtual int getPartition(const Message& msg, const TopicMetadata& topicMetadata); @@ -40,7 +40,7 @@ class PULSAR_PUBLIC RoundRobinMessageRouter : public MessageRouterBase { const bool batchingEnabled_; const uint32_t maxBatchingMessages_; const uint32_t maxBatchingSize_; - const boost::posix_time::time_duration maxBatchingDelay_; + const TimeDuration maxBatchingDelay_; std::atomic currentPartitionCursor_; std::atomic lastPartitionChange_; diff --git a/lib/SharedBuffer.h b/lib/SharedBuffer.h index 7ee26184..26fc59ed 100644 --- a/lib/SharedBuffer.h +++ b/lib/SharedBuffer.h @@ -22,12 +22,19 @@ #include #include +#ifdef USE_ASIO +#include +#include +#else #include #include +#endif #include #include #include +#include "AsioDefines.h" + namespace pulsar { class SharedBuffer { @@ -144,13 +151,13 @@ class SharedBuffer { inline bool writable() const { return writableBytes() > 0; } - boost::asio::const_buffers_1 const_asio_buffer() const { - return boost::asio::const_buffers_1(ptr_ + readIdx_, readableBytes()); + ASIO::const_buffers_1 const_asio_buffer() const { + return ASIO::const_buffers_1(ptr_ + readIdx_, readableBytes()); } - boost::asio::mutable_buffers_1 asio_buffer() { + ASIO::mutable_buffers_1 asio_buffer() { assert(data_); - return boost::asio::buffer(ptr_ + writeIdx_, writableBytes()); + return ASIO::buffer(ptr_ + writeIdx_, writableBytes()); } void write(const char* data, uint32_t size) { @@ -239,17 +246,17 @@ class CompositeSharedBuffer { } // Implement the ConstBufferSequence requirements. - typedef boost::asio::const_buffer value_type; - typedef boost::asio::const_buffer* iterator; - typedef const boost::asio::const_buffer* const_iterator; + typedef ASIO::const_buffer value_type; + typedef ASIO::const_buffer* iterator; + typedef const ASIO::const_buffer* const_iterator; - const boost::asio::const_buffer* begin() const { return &(asioBuffers_.at(0)); } + const ASIO::const_buffer* begin() const { return &(asioBuffers_.at(0)); } - const boost::asio::const_buffer* end() const { return begin() + Size; } + const ASIO::const_buffer* end() const { return begin() + Size; } private: std::array sharedBuffers_; - std::array asioBuffers_; + std::array asioBuffers_; }; typedef CompositeSharedBuffer<2> PairSharedBuffer; diff --git a/lib/TimeUtils.h b/lib/TimeUtils.h index a55773d9..03f563c3 100644 --- a/lib/TimeUtils.h +++ b/lib/TimeUtils.h @@ -21,19 +21,23 @@ #include #include -#include #include namespace pulsar { -using namespace boost::posix_time; -using boost::posix_time::milliseconds; -using boost::posix_time::seconds; +using ptime = decltype(std::chrono::high_resolution_clock::now()); +using TimeDuration = std::chrono::nanoseconds; + +inline decltype(std::chrono::milliseconds(0).count()) toMillis(TimeDuration duration) { + return std::chrono::duration_cast(duration).count(); +} class PULSAR_PUBLIC TimeUtils { public: - static ptime now(); - static int64_t currentTimeMillis(); + static ptime now() { return std::chrono::high_resolution_clock::now(); } + static int64_t currentTimeMillis() { + return std::chrono::duration_cast(now().time_since_epoch()).count(); + } }; // This class processes a timeout with the following semantics: diff --git a/lib/UnAckedMessageTrackerEnabled.cc b/lib/UnAckedMessageTrackerEnabled.cc index 061a1409..e371af99 100644 --- a/lib/UnAckedMessageTrackerEnabled.cc +++ b/lib/UnAckedMessageTrackerEnabled.cc @@ -34,9 +34,9 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() { timeoutHandlerHelper(); ExecutorServicePtr executorService = client_->getIOExecutorProvider()->get(); timer_ = executorService->createDeadlineTimer(); - timer_->expires_from_now(boost::posix_time::milliseconds(tickDurationInMs_)); + timer_->expires_from_now(std::chrono::milliseconds(tickDurationInMs_)); std::weak_ptr weakSelf{shared_from_this()}; - timer_->async_wait([weakSelf](const boost::system::error_code& ec) { + timer_->async_wait([weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (self && !ec) { self->timeoutHandler(); @@ -173,7 +173,7 @@ void UnAckedMessageTrackerEnabled::clear() { } void UnAckedMessageTrackerEnabled::stop() { - boost::system::error_code ec; + ASIO_ERROR ec; if (timer_) { timer_->cancel(ec); } diff --git a/lib/UnAckedMessageTrackerEnabled.h b/lib/UnAckedMessageTrackerEnabled.h index 6181a8a3..83edc4cb 100644 --- a/lib/UnAckedMessageTrackerEnabled.h +++ b/lib/UnAckedMessageTrackerEnabled.h @@ -18,13 +18,13 @@ */ #ifndef LIB_UNACKEDMESSAGETRACKERENABLED_H_ #define LIB_UNACKEDMESSAGETRACKERENABLED_H_ -#include #include #include #include #include #include +#include "AsioTimer.h" #include "TestUtil.h" #include "UnAckedMessageTrackerInterface.h" @@ -33,7 +33,6 @@ namespace pulsar { class ClientImpl; class ConsumerImplBase; using ClientImplPtr = std::shared_ptr; -using DeadlineTimerPtr = std::shared_ptr; class UnAckedMessageTrackerEnabled : public std::enable_shared_from_this, public UnAckedMessageTrackerInterface { diff --git a/lib/auth/athenz/ZTSClient.cc b/lib/auth/athenz/ZTSClient.cc index 230713ea..35387d96 100644 --- a/lib/auth/athenz/ZTSClient.cc +++ b/lib/auth/athenz/ZTSClient.cc @@ -44,8 +44,6 @@ namespace ptree = boost::property_tree; #pragma clang diagnostic ignored "-Wunknown-warning-option" #endif -#include - #if defined(__clang__) #pragma clang diagnostic pop #endif diff --git a/lib/stats/ConsumerStatsImpl.cc b/lib/stats/ConsumerStatsImpl.cc index 056dbf6e..0eefabdc 100644 --- a/lib/stats/ConsumerStatsImpl.cc +++ b/lib/stats/ConsumerStatsImpl.cc @@ -46,7 +46,7 @@ ConsumerStatsImpl::ConsumerStatsImpl(const ConsumerStatsImpl& stats) totalAckedMsgMap_(stats.totalAckedMsgMap_), statsIntervalInSeconds_(stats.statsIntervalInSeconds_) {} -void ConsumerStatsImpl::flushAndReset(const boost::system::error_code& ec) { +void ConsumerStatsImpl::flushAndReset(const ASIO_ERROR& ec) { if (ec) { LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]"); return; @@ -85,9 +85,9 @@ void ConsumerStatsImpl::messageAcknowledged(Result res, CommandAck_AckType ackTy } void ConsumerStatsImpl::scheduleTimer() { - timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_)); + timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_)); std::weak_ptr weakSelf{shared_from_this()}; - timer_->async_wait([this, weakSelf](const boost::system::error_code& ec) { + timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (!self) { return; diff --git a/lib/stats/ConsumerStatsImpl.h b/lib/stats/ConsumerStatsImpl.h index 44f927f5..03f3a474 100644 --- a/lib/stats/ConsumerStatsImpl.h +++ b/lib/stats/ConsumerStatsImpl.h @@ -20,17 +20,16 @@ #ifndef PULSAR_CONSUMER_STATS_IMPL_H_ #define PULSAR_CONSUMER_STATS_IMPL_H_ -#include #include #include #include #include #include "ConsumerStatsBase.h" +#include "lib/AsioTimer.h" #include "lib/ExecutorService.h" namespace pulsar { -using DeadlineTimerPtr = std::shared_ptr; class ExecutorService; using ExecutorServicePtr = std::shared_ptr; @@ -58,7 +57,7 @@ class ConsumerStatsImpl : public std::enable_shared_from_this public: ConsumerStatsImpl(std::string, ExecutorServicePtr, unsigned int); ConsumerStatsImpl(const ConsumerStatsImpl& stats); - void flushAndReset(const boost::system::error_code&); + void flushAndReset(const ASIO_ERROR&); void start() override; void receivedMessage(Message&, Result) override; void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums) override; diff --git a/lib/stats/ProducerStatsBase.h b/lib/stats/ProducerStatsBase.h index fe0ba0a5..b24266e8 100644 --- a/lib/stats/ProducerStatsBase.h +++ b/lib/stats/ProducerStatsBase.h @@ -22,14 +22,14 @@ #include #include -#include +#include "lib/TimeUtils.h" namespace pulsar { class ProducerStatsBase { public: virtual void start() {} virtual void messageSent(const Message& msg) = 0; - virtual void messageReceived(Result, const boost::posix_time::ptime&) = 0; + virtual void messageReceived(Result, const ptime&) = 0; virtual ~ProducerStatsBase(){}; }; diff --git a/lib/stats/ProducerStatsDisabled.h b/lib/stats/ProducerStatsDisabled.h index df1df0f8..df1da783 100644 --- a/lib/stats/ProducerStatsDisabled.h +++ b/lib/stats/ProducerStatsDisabled.h @@ -25,7 +25,7 @@ namespace pulsar { class ProducerStatsDisabled : public ProducerStatsBase { public: virtual void messageSent(const Message& msg){}; - virtual void messageReceived(Result, const boost::posix_time::ptime&){}; + virtual void messageReceived(Result, const ptime&){}; }; } // namespace pulsar #endif // PULSAR_PRODUCER_STATS_DISABLED_HEADER diff --git a/lib/stats/ProducerStatsImpl.cc b/lib/stats/ProducerStatsImpl.cc index 3d3629db..15e9e67e 100644 --- a/lib/stats/ProducerStatsImpl.cc +++ b/lib/stats/ProducerStatsImpl.cc @@ -20,9 +20,11 @@ #include "ProducerStatsImpl.h" #include +#include #include "lib/ExecutorService.h" #include "lib/LogUtils.h" +#include "lib/TimeUtils.h" #include "lib/Utils.h" namespace pulsar { @@ -65,7 +67,7 @@ ProducerStatsImpl::ProducerStatsImpl(const ProducerStatsImpl& stats) void ProducerStatsImpl::start() { scheduleTimer(); } -void ProducerStatsImpl::flushAndReset(const boost::system::error_code& ec) { +void ProducerStatsImpl::flushAndReset(const ASIO_ERROR& ec) { if (ec) { LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]"); return; @@ -93,9 +95,10 @@ void ProducerStatsImpl::messageSent(const Message& msg) { totalBytesSent_ += msg.getLength(); } -void ProducerStatsImpl::messageReceived(Result res, const boost::posix_time::ptime& publishTime) { - boost::posix_time::ptime currentTime = boost::posix_time::microsec_clock::universal_time(); - double diffInMicros = (currentTime - publishTime).total_microseconds(); +void ProducerStatsImpl::messageReceived(Result res, const ptime& publishTime) { + auto currentTime = TimeUtils::now(); + double diffInMicros = + std::chrono::duration_cast(currentTime - publishTime).count(); std::lock_guard lock(mutex_); totalLatencyAccumulator_(diffInMicros); latencyAccumulator_(diffInMicros); @@ -106,9 +109,9 @@ void ProducerStatsImpl::messageReceived(Result res, const boost::posix_time::pti ProducerStatsImpl::~ProducerStatsImpl() { timer_->cancel(); } void ProducerStatsImpl::scheduleTimer() { - timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_)); + timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_)); std::weak_ptr weakSelf{shared_from_this()}; - timer_->async_wait([this, weakSelf](const boost::system::error_code& ec) { + timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (!self) { return; diff --git a/lib/stats/ProducerStatsImpl.h b/lib/stats/ProducerStatsImpl.h index 8cd10992..5d445c60 100644 --- a/lib/stats/ProducerStatsImpl.h +++ b/lib/stats/ProducerStatsImpl.h @@ -30,20 +30,18 @@ #include #include #include -#include -#include #include #include #include #include #include "ProducerStatsBase.h" +#include "lib/AsioTimer.h" namespace pulsar { class ExecutorService; using ExecutorServicePtr = std::shared_ptr; -using DeadlineTimerPtr = std::shared_ptr; typedef boost::accumulators::accumulator_set< double, @@ -83,11 +81,11 @@ class ProducerStatsImpl : public std::enable_shared_from_this void start() override; - void flushAndReset(const boost::system::error_code&); + void flushAndReset(const ASIO_ERROR&); void messageSent(const Message&) override; - void messageReceived(Result, const boost::posix_time::ptime&) override; + void messageReceived(Result, const ptime&) override; ~ProducerStatsImpl(); diff --git a/tests/AuthPluginTest.cc b/tests/AuthPluginTest.cc index 9fd048b7..b091f973 100644 --- a/tests/AuthPluginTest.cc +++ b/tests/AuthPluginTest.cc @@ -21,9 +21,14 @@ #include #include +#ifdef USE_ASIO +#include +#else #include +#endif #include +#include "lib/AsioDefines.h" #include "lib/Future.h" #include "lib/Latch.h" #include "lib/LogUtils.h" @@ -287,10 +292,9 @@ namespace testAthenz { std::string principalToken; void mockZTS(Latch& latch, int port) { LOG_INFO("-- MockZTS started"); - boost::asio::io_service io; - boost::asio::ip::tcp::iostream stream; - boost::asio::ip::tcp::acceptor acceptor(io, - boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)); + ASIO::io_service io; + ASIO::ip::tcp::iostream stream; + ASIO::ip::tcp::acceptor acceptor(io, ASIO::ip::tcp::endpoint(ASIO::ip::tcp::v4(), port)); LOG_INFO("-- MockZTS waiting for connnection"); latch.countdown(); diff --git a/tests/AuthTokenTest.cc b/tests/AuthTokenTest.cc index a04da085..7595f44e 100644 --- a/tests/AuthTokenTest.cc +++ b/tests/AuthTokenTest.cc @@ -22,7 +22,6 @@ #include #include -#include #include #include #include diff --git a/tests/BackoffTest.cc b/tests/BackoffTest.cc index d066b944..5fe4f71a 100644 --- a/tests/BackoffTest.cc +++ b/tests/BackoffTest.cc @@ -26,42 +26,42 @@ #include "lib/stats/ProducerStatsImpl.h" using namespace pulsar; -using boost::posix_time::milliseconds; -using boost::posix_time::seconds; +using std::chrono::milliseconds; +using std::chrono::seconds; static bool checkExactAndDecrementTimer(Backoff& backoff, const unsigned int& t2) { - const unsigned int& t1 = backoff.next().total_milliseconds(); - boost::posix_time::ptime& firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff); + auto t1 = toMillis(backoff.next()); + auto& firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff); firstBackOffTime -= milliseconds(t2); return t1 == t2; } static bool withinTenPercentAndDecrementTimer(Backoff& backoff, const unsigned int& t2) { - const unsigned int& t1 = backoff.next().total_milliseconds(); - boost::posix_time::ptime& firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff); + auto t1 = toMillis(backoff.next()); + auto& firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff); firstBackOffTime -= milliseconds(t2); return (t1 >= t2 * 0.9 && t1 <= t2); } TEST(BackoffTest, mandatoryStopTestNegativeTest) { Backoff backoff(milliseconds(100), seconds(60), milliseconds(1900)); - ASSERT_EQ(backoff.next().total_milliseconds(), 100); - backoff.next().total_milliseconds(); // 200 - backoff.next().total_milliseconds(); // 400 - backoff.next().total_milliseconds(); // 800 + ASSERT_EQ(toMillis(backoff.next()), 100); + backoff.next(); // 200 + backoff.next(); // 400 + backoff.next(); // 800 ASSERT_FALSE(withinTenPercentAndDecrementTimer(backoff, 400)); } TEST(BackoffTest, firstBackoffTimerTest) { Backoff backoff(milliseconds(100), seconds(60), milliseconds(1900)); - ASSERT_EQ(backoff.next().total_milliseconds(), 100); - boost::posix_time::ptime firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff); + ASSERT_EQ(toMillis(backoff.next()), 100); + auto firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff); std::this_thread::sleep_for(std::chrono::milliseconds(300)); TimeDuration diffBackOffTime = PulsarFriend::getFirstBackoffTime(backoff) - firstBackOffTime; ASSERT_EQ(diffBackOffTime, milliseconds(0)); // no change since reset not called backoff.reset(); - ASSERT_EQ(backoff.next().total_milliseconds(), 100); + ASSERT_EQ(toMillis(backoff.next()), 100); diffBackOffTime = PulsarFriend::getFirstBackoffTime(backoff) - firstBackOffTime; ASSERT_TRUE(diffBackOffTime >= milliseconds(300) && diffBackOffTime < seconds(1)); } diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index c7e98bce..f97457fe 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -955,7 +955,7 @@ TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) { auto elapsed = TimeUtils::now() - start; // getLastMessageIdAsync should be blocked until operationTimeout when the connection is disconnected. - ASSERT_GE(elapsed.seconds(), operationTimeout); + ASSERT_GE(std::chrono::duration_cast(elapsed).count(), operationTimeout); } TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) { diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index c2863e8c..73778842 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -170,7 +170,7 @@ class PulsarFriend { handler.connection_ = conn; } - static boost::posix_time::ptime& getFirstBackoffTime(Backoff& backoff) { + static auto getFirstBackoffTime(Backoff& backoff) -> decltype(backoff.firstBackoffTime_)& { return backoff.firstBackoffTime_; } diff --git a/tests/RoundRobinMessageRouterTest.cc b/tests/RoundRobinMessageRouterTest.cc index 56a76050..145c45ae 100644 --- a/tests/RoundRobinMessageRouterTest.cc +++ b/tests/RoundRobinMessageRouterTest.cc @@ -31,7 +31,7 @@ TEST(RoundRobinMessageRouterTest, onePartition) { const int numPartitions = 1; RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, false, 1, 1, - boost::posix_time::milliseconds(0)); + std::chrono::milliseconds(0)); Message msg1 = MessageBuilder().setPartitionKey("my-key-1").setContent("one").build(); Message msg2 = MessageBuilder().setPartitionKey("my-key-2").setContent("two").build(); @@ -49,7 +49,7 @@ TEST(RoundRobinMessageRouterTest, sameKey) { const int numPartitions = 13; RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, false, 1, 1, - boost::posix_time::milliseconds(0)); + std::chrono::milliseconds(0)); Message msg1 = MessageBuilder().setPartitionKey("my-key").setContent("one").build(); Message msg2 = MessageBuilder().setPartitionKey("my-key").setContent("two").build(); @@ -63,7 +63,7 @@ TEST(RoundRobinMessageRouterTest, batchingDisabled) { const int numPartitions = 13; RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, false, 1, 1, - boost::posix_time::milliseconds(0)); + std::chrono::milliseconds(0)); Message msg1 = MessageBuilder().setContent("one").build(); Message msg2 = MessageBuilder().setContent("two").build(); @@ -77,7 +77,7 @@ TEST(RoundRobinMessageRouterTest, batchingEnabled) { const int numPartitions = 13; RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 1000, 100000, - boost::posix_time::seconds(1)); + std::chrono::seconds(1)); int p = -1; for (int i = 0; i < 100; i++) { @@ -96,7 +96,7 @@ TEST(RoundRobinMessageRouterTest, maxDelay) { const int numPartitions = 13; RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 1000, 100000, - boost::posix_time::seconds(1)); + std::chrono::seconds(1)); int p1 = -1; for (int i = 0; i < 100; i++) { @@ -132,8 +132,7 @@ TEST(RoundRobinMessageRouterTest, maxDelay) { TEST(RoundRobinMessageRouterTest, maxNumberOfMessages) { const int numPartitions = 13; - RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 2, 1000, - boost::posix_time::seconds(1)); + RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 2, 1000, std::chrono::seconds(1)); Message msg1 = MessageBuilder().setContent("one").build(); Message msg2 = MessageBuilder().setContent("two").build(); @@ -150,8 +149,7 @@ TEST(RoundRobinMessageRouterTest, maxNumberOfMessages) { TEST(RoundRobinMessageRouterTest, maxBatchSize) { const int numPartitions = 13; - RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 10, 8, - boost::posix_time::seconds(1)); + RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 10, 8, std::chrono::seconds(1)); Message msg1 = MessageBuilder().setContent("one").build(); Message msg2 = MessageBuilder().setContent("two").build(); diff --git a/vcpkg.json b/vcpkg.json index d023a406..5ff44100 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -5,45 +5,20 @@ "builtin-baseline": "b051745c68faa6f65c493371d564c4eb8af34dad", "dependencies": [ { - "name": "boost-accumulators", - "version>=": "1.83.0" - }, - { - "name": "boost-algorithm", - "version>=": "1.83.0" - }, - { - "name": "boost-any", - "version>=": "1.83.0" - }, - { - "name": "boost-asio", - "version>=": "1.83.0" - }, - { - "name": "boost-circular-buffer", - "version>=": "1.83.0" - }, - { - "name": "boost-date-time", - "version>=": "1.83.0" + "name": "asio", + "features": [ + "openssl" + ], + "version>=": "1.28.2" }, { - "name": "boost-predef", + "name": "boost-accumulators", "version>=": "1.83.0" }, { "name": "boost-property-tree", "version>=": "1.83.0" }, - { - "name": "boost-serialization", - "version>=": "1.83.0" - }, - { - "name": "boost-xpressive", - "version>=": "1.83.0" - }, { "name": "curl", "default-features": false,