From fbe7df37e278431261a90ddaddbfb5275904344c Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 5 Jan 2024 17:19:30 +0800 Subject: [PATCH] Depend on the independent Asio instead of Boost.Asio by default Fixes https://github.com/apache/pulsar-client-cpp/issues/367 ### Motivation See the difference of Asio and Boost.Asio here: https://think-async.com/Asio/AsioAndBoostAsio.html Asio is updated more frequently than Boost.Asio and its release does not need to be synchronous with other Boost components. Depending on the independent Asio could make it easier for a newer Asio release. ### Modifications Import `asio` 1.28.2 as the dependency and remove the `boost-asio` dependency from the vcpkg.json. Since the latest Asio already removed the `deadline_timer`, this patch replaces all `deadline_timer` with `steady_timer`, which uses `std::chrono` rather than Boost.Date_Time component to compute the timeout. Add a `USE_ASIO` CMake option to determine whether Asio or Boost.Asio is depended. For vcpkg users, the option is always enabled. Finally, simplify the vcpkg.json by removing some `boost-*` dependencies depended indirectly by the rest two major dependencies: - boost-accumulators: latency percentiles computation - boost-property-tree: JSON operations These two dependencies are hard to remove for now unless introducing other dependencies so they will be kept from some time. --- .../workflows/ci-build-binary-artifacts.yaml | 2 + .github/workflows/ci-pr-validation.yaml | 2 + CMakeLists.txt | 11 + LegacyFindPackages.cmake | 4 - lib/AckGroupingTrackerEnabled.cc | 6 +- lib/AckGroupingTrackerEnabled.h | 3 +- lib/AsioDefines.h | 32 +++ lib/{TimeUtils.cc => AsioTimer.h} | 19 +- lib/Backoff.cc | 7 +- lib/Backoff.h | 8 +- lib/ClientConnection.cc | 254 ++++++++---------- lib/ClientConnection.h | 73 +++-- lib/CompressionCodec.cc | 1 - lib/ConnectionPool.cc | 9 +- lib/ConsumerImpl.cc | 19 +- lib/ConsumerImplBase.cc | 4 +- lib/ExecutorService.cc | 24 +- lib/ExecutorService.h | 20 +- lib/HandlerBase.cc | 9 +- lib/HandlerBase.h | 10 +- lib/Int64SerDes.h | 7 +- lib/MultiTopicsConsumerImpl.cc | 10 +- lib/MultiTopicsConsumerImpl.h | 3 +- lib/NegativeAcksTracker.cc | 12 +- lib/NegativeAcksTracker.h | 8 +- lib/OpSendMsg.h | 6 +- lib/PartitionedProducerImpl.cc | 8 +- lib/PartitionedProducerImpl.h | 6 +- lib/PatternMultiTopicsConsumerImpl.cc | 12 +- lib/PatternMultiTopicsConsumerImpl.h | 5 +- lib/PeriodicTask.cc | 8 +- lib/PeriodicTask.h | 2 +- lib/ProducerImpl.cc | 25 +- lib/ProducerImpl.h | 15 +- lib/RetryableOperation.h | 23 +- lib/RoundRobinMessageRouter.cc | 5 +- lib/RoundRobinMessageRouter.h | 6 +- lib/SharedBuffer.h | 27 +- lib/TimeUtils.h | 16 +- lib/UnAckedMessageTrackerEnabled.cc | 6 +- lib/UnAckedMessageTrackerEnabled.h | 3 +- lib/auth/athenz/ZTSClient.cc | 2 - lib/stats/ConsumerStatsImpl.cc | 6 +- lib/stats/ConsumerStatsImpl.h | 5 +- lib/stats/ProducerStatsBase.h | 4 +- lib/stats/ProducerStatsDisabled.h | 2 +- lib/stats/ProducerStatsImpl.cc | 15 +- lib/stats/ProducerStatsImpl.h | 8 +- tests/AuthPluginTest.cc | 12 +- tests/AuthTokenTest.cc | 1 - tests/BackoffTest.cc | 26 +- tests/ConsumerTest.cc | 2 +- tests/PulsarFriend.h | 2 +- tests/RoundRobinMessageRouterTest.cc | 16 +- vcpkg.json | 37 +-- 55 files changed, 441 insertions(+), 427 deletions(-) create mode 100644 lib/AsioDefines.h rename lib/{TimeUtils.cc => AsioTimer.h} (71%) diff --git a/.github/workflows/ci-build-binary-artifacts.yaml b/.github/workflows/ci-build-binary-artifacts.yaml index a2b7be84..63644e5a 100644 --- a/.github/workflows/ci-build-binary-artifacts.yaml +++ b/.github/workflows/ci-build-binary-artifacts.yaml @@ -148,6 +148,7 @@ jobs: mkdir -p $BUILD_DIR cmake -B $BUILD_DIR \ -G "${{ matrix.generator }}" ${{ matrix.arch }} \ + -DUSE_ASIO=ON \ -DBUILD_TESTS=OFF \ -DVCPKG_TRIPLET=${{ matrix.triplet }} \ -DCMAKE_INSTALL_PREFIX=${{ env.INSTALL_DIR }} \ @@ -174,6 +175,7 @@ jobs: mkdir -p $BUILD_DIR cmake -B $BUILD_DIR \ -G "${{ matrix.generator }}" ${{ matrix.arch }} \ + -DUSE_ASIO=ON \ -DBUILD_TESTS=OFF \ -DVCPKG_TRIPLET=${{ matrix.triplet }} \ -DCMAKE_INSTALL_PREFIX=$INSTALL_DIR_DEBUG \ diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml index 50a5c807..56309e9a 100644 --- a/.github/workflows/ci-pr-validation.yaml +++ b/.github/workflows/ci-pr-validation.yaml @@ -191,6 +191,7 @@ jobs: cmake \ -B ./build-1 \ -G "${{ matrix.generator }}" ${{ matrix.arch }} \ + -DUSE_ASIO=ON \ -DBUILD_TESTS=OFF \ -DVCPKG_TRIPLET="${{ matrix.triplet }}" \ -DCMAKE_INSTALL_PREFIX="${{ env.INSTALL_DIR }}" \ @@ -232,6 +233,7 @@ jobs: cmake \ -B ./build-2 \ -G "${{ matrix.generator }}" ${{ matrix.arch }} \ + -DUSE_ASIO=ON \ -DBUILD_TESTS=OFF \ -DVCPKG_TRIPLET="${{ matrix.triplet }}" \ -DCMAKE_INSTALL_PREFIX="${{ env.INSTALL_DIR }}" \ diff --git a/CMakeLists.txt b/CMakeLists.txt index 5bde2b77..662e84af 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -19,8 +19,11 @@ cmake_minimum_required(VERSION 3.13) +option(USE_ASIO "Use Asio instead of Boost.Asio" OFF) + option(INTEGRATE_VCPKG "Integrate with Vcpkg" OFF) if (INTEGRATE_VCPKG) + set(USE_ASIO ON) set(CMAKE_TOOLCHAIN_FILE "${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake") endif () @@ -129,6 +132,10 @@ if (INTEGRATE_VCPKG) $,zstd::libzstd_shared,zstd::libzstd_static> Snappy::snappy ) + if (USE_ASIO) + find_package(asio CONFIG REQUIRED) + set(COMMON_LIBS ${COMMON_LIBS} asio::asio) + endif () add_definitions(-DHAS_ZSTD -DHAS_SNAPPY) if (MSVC) find_package(dlfcn-win32 CONFIG REQUIRED) @@ -140,6 +147,10 @@ else () include(./LegacyFindPackages.cmake) endif () +if (USE_ASIO) + add_definitions(-DUSE_ASIO) +endif () + set(LIB_NAME $ENV{PULSAR_LIBRARY_NAME}) if (NOT LIB_NAME) set(LIB_NAME pulsar) diff --git a/LegacyFindPackages.cmake b/LegacyFindPackages.cmake index 10c9fa7c..5004545b 100644 --- a/LegacyFindPackages.cmake +++ b/LegacyFindPackages.cmake @@ -176,10 +176,6 @@ if (Boost_MAJOR_VERSION EQUAL 1 AND Boost_MINOR_VERSION LESS 69) MESSAGE(STATUS "Linking with Boost:System") endif() -if (MSVC) - set(BOOST_COMPONENTS ${BOOST_COMPONENTS} date_time) -endif() - if (CMAKE_COMPILER_IS_GNUCC AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 4.9) # GCC 4.8.2 implementation of std::regex is buggy set(BOOST_COMPONENTS ${BOOST_COMPONENTS} regex) diff --git a/lib/AckGroupingTrackerEnabled.cc b/lib/AckGroupingTrackerEnabled.cc index d90cc874..7233b2c9 100644 --- a/lib/AckGroupingTrackerEnabled.cc +++ b/lib/AckGroupingTrackerEnabled.cc @@ -117,7 +117,7 @@ void AckGroupingTrackerEnabled::close() { this->flush(); std::lock_guard lock(this->mutexTimer_); if (this->timer_) { - boost::system::error_code ec; + ASIO_ERROR ec; this->timer_->cancel(ec); } } @@ -168,9 +168,9 @@ void AckGroupingTrackerEnabled::scheduleTimer() { std::lock_guard lock(this->mutexTimer_); this->timer_ = this->executor_->createDeadlineTimer(); - this->timer_->expires_from_now(boost::posix_time::milliseconds(std::max(1L, this->ackGroupingTimeMs_))); + this->timer_->expires_from_now(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_))); auto self = shared_from_this(); - this->timer_->async_wait([this, self](const boost::system::error_code& ec) -> void { + this->timer_->async_wait([this, self](const ASIO_ERROR& ec) -> void { if (!ec) { this->flush(); this->scheduleTimer(); diff --git a/lib/AckGroupingTrackerEnabled.h b/lib/AckGroupingTrackerEnabled.h index ec1d66be..b04f4059 100644 --- a/lib/AckGroupingTrackerEnabled.h +++ b/lib/AckGroupingTrackerEnabled.h @@ -22,18 +22,17 @@ #include #include -#include #include #include #include #include "AckGroupingTracker.h" +#include "AsioTimer.h" namespace pulsar { class ClientImpl; using ClientImplPtr = std::shared_ptr; -using DeadlineTimerPtr = std::shared_ptr; class ExecutorService; using ExecutorServicePtr = std::shared_ptr; class HandlerBase; diff --git a/lib/AsioDefines.h b/lib/AsioDefines.h new file mode 100644 index 00000000..2e89812c --- /dev/null +++ b/lib/AsioDefines.h @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +// This header defines common macros to use Asio or Boost.Asio. +#pragma once + +#ifdef USE_ASIO +#define ASIO ::asio +#define ASIO_ERROR asio::error_code +#define ASIO_SUCCESS (ASIO_ERROR{}) +#define ASIO_SYSTEM_ERROR asio::system_error +#else +#define ASIO boost::asio +#define ASIO_ERROR boost::system::error_code +#define ASIO_SUCCESS boost::system::errc::make_error_code(boost::system::errc::success) +#define ASIO_SYSTEM_ERROR boost::system::system_error +#endif diff --git a/lib/TimeUtils.cc b/lib/AsioTimer.h similarity index 71% rename from lib/TimeUtils.cc rename to lib/AsioTimer.h index 7eecb86b..d0c3de58 100644 --- a/lib/TimeUtils.cc +++ b/lib/AsioTimer.h @@ -16,17 +16,16 @@ * specific language governing permissions and limitations * under the License. */ +#pragma once -#include "TimeUtils.h" +#ifdef USE_ASIO +#include +#else +#include +#endif -namespace pulsar { +#include -ptime TimeUtils::now() { return microsec_clock::universal_time(); } +#include "AsioDefines.h" -int64_t TimeUtils::currentTimeMillis() { - static ptime time_t_epoch(boost::gregorian::date(1970, 1, 1)); - - time_duration diff = now() - time_t_epoch; - return diff.total_milliseconds(); -} -} // namespace pulsar \ No newline at end of file +using DeadlineTimerPtr = std::shared_ptr; diff --git a/lib/Backoff.cc b/lib/Backoff.cc index fdb13592..e2c43d1c 100644 --- a/lib/Backoff.cc +++ b/lib/Backoff.cc @@ -21,6 +21,9 @@ #include /* time */ #include +#include + +#include "TimeUtils.h" namespace pulsar { @@ -33,8 +36,8 @@ TimeDuration Backoff::next() { // Check for mandatory stop if (!mandatoryStopMade_) { - const boost::posix_time::ptime& now = boost::posix_time::microsec_clock::universal_time(); - TimeDuration timeElapsedSinceFirstBackoff = boost::posix_time::milliseconds(0); + auto now = TimeUtils::now(); + TimeDuration timeElapsedSinceFirstBackoff = std::chrono::nanoseconds(0); if (initial_ == current) { firstBackoffTime_ = now; } else { diff --git a/lib/Backoff.h b/lib/Backoff.h index a59c00f5..d9d7fae3 100644 --- a/lib/Backoff.h +++ b/lib/Backoff.h @@ -20,12 +20,12 @@ #define _PULSAR_BACKOFF_HEADER_ #include -#include +#include #include -namespace pulsar { +#include "TimeUtils.h" -using TimeDuration = boost::posix_time::time_duration; +namespace pulsar { class PULSAR_PUBLIC Backoff { public: @@ -38,7 +38,7 @@ class PULSAR_PUBLIC Backoff { const TimeDuration max_; TimeDuration next_; TimeDuration mandatoryStop_; - boost::posix_time::ptime firstBackoffTime_; + decltype(std::chrono::high_resolution_clock::now()) firstBackoffTime_; std::mt19937 rng_; bool mandatoryStopMade_ = false; diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 61aa7f73..82ab4928 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -23,6 +23,7 @@ #include #include +#include "AsioDefines.h" #include "Commands.h" #include "ConnectionPool.h" #include "ConsumerImpl.h" @@ -39,7 +40,7 @@ DECLARE_LOG_OBJECT() -using namespace boost::asio::ip; +using namespace ASIO::ip; namespace pulsar { @@ -162,19 +163,13 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: const ClientConfiguration& clientConfiguration, const AuthenticationPtr& authentication, const std::string& clientVersion, ConnectionPool& pool, size_t poolIndex) - : operationsTimeout_(seconds(clientConfiguration.getOperationTimeoutSeconds())), + : operationsTimeout_(std::chrono::seconds(clientConfiguration.getOperationTimeoutSeconds())), authentication_(authentication), serverProtocolVersion_(proto::ProtocolVersion_MIN), executor_(executor), resolver_(executor_->createTcpResolver()), socket_(executor_->createSocket()), -#if BOOST_VERSION >= 107000 - strand_(boost::asio::make_strand(executor_->getIOService().get_executor())), -#elif BOOST_VERSION >= 106600 - strand_(executor_->getIOService().get_executor()), -#else - strand_(executor_->getIOService()), -#endif + strand_(ASIO::make_strand(executor_->getIOService().get_executor())), logicalAddress_(logicalAddress), physicalAddress_(physicalAddress), cnxString_("[ -> " + physicalAddress + "] "), @@ -203,11 +198,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: } if (clientConfiguration.isUseTls()) { -#if BOOST_VERSION >= 105400 - boost::asio::ssl::context ctx(boost::asio::ssl::context::tlsv12_client); -#else - boost::asio::ssl::context ctx(executor_->getIOService(), boost::asio::ssl::context::tlsv1_client); -#endif + ASIO::ssl::context ctx(ASIO::ssl::context::tlsv12_client); Url serviceUrl; Url proxyUrl; Url::parse(physicalAddress, serviceUrl); @@ -219,10 +210,10 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: LOG_INFO("Configuring SNI Proxy-url=" << proxyServiceUrl_); } if (clientConfiguration.isTlsAllowInsecureConnection()) { - ctx.set_verify_mode(boost::asio::ssl::context::verify_none); + ctx.set_verify_mode(ASIO::ssl::context::verify_none); isTlsAllowInsecureConnection_ = true; } else { - ctx.set_verify_mode(boost::asio::ssl::context::verify_peer); + ctx.set_verify_mode(ASIO::ssl::context::verify_peer); std::string trustCertFilePath = clientConfiguration.getTlsTrustCertsFilePath(); if (!trustCertFilePath.empty()) { @@ -252,12 +243,12 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: LOG_ERROR(tlsCertificates << ": No such tlsCertificates"); throw ResultAuthenticationError; } - ctx.use_private_key_file(tlsPrivateKey, boost::asio::ssl::context::pem); - ctx.use_certificate_file(tlsCertificates, boost::asio::ssl::context::pem); + ctx.use_private_key_file(tlsPrivateKey, ASIO::ssl::context::pem); + ctx.use_certificate_file(tlsCertificates, ASIO::ssl::context::pem); } else { if (file_exists(tlsPrivateKey) && file_exists(tlsCertificates)) { - ctx.use_private_key_file(tlsPrivateKey, boost::asio::ssl::context::pem); - ctx.use_certificate_file(tlsCertificates, boost::asio::ssl::context::pem); + ctx.use_private_key_file(tlsPrivateKey, ASIO::ssl::context::pem); + ctx.use_certificate_file(tlsCertificates, ASIO::ssl::context::pem); } } @@ -266,14 +257,13 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: if (!clientConfiguration.isTlsAllowInsecureConnection() && clientConfiguration.isValidateHostName()) { LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" << serviceUrl.port()); std::string urlHost = isSniProxy_ ? proxyUrl.host() : serviceUrl.host(); - tlsSocket_->set_verify_callback(boost::asio::ssl::rfc2818_verification(urlHost)); + tlsSocket_->set_verify_callback(ASIO::ssl::rfc2818_verification(urlHost)); } LOG_DEBUG("TLS SNI Host: " << serviceUrl.host()); if (!SSL_set_tlsext_host_name(tlsSocket_->native_handle(), serviceUrl.host().c_str())) { - boost::system::error_code ec{static_cast(::ERR_get_error()), - boost::asio::error::get_ssl_category()}; - LOG_ERROR(boost::system::system_error{ec}.what() << ": Error while setting TLS SNI"); + ASIO_ERROR ec{static_cast(::ERR_get_error()), ASIO::error::get_ssl_category()}; + LOG_ERROR(ec.message() << ": Error while setting TLS SNI"); return; } } @@ -310,9 +300,9 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC // Only send keep-alive probes if the broker supports it keepAliveTimer_ = executor_->createDeadlineTimer(); if (keepAliveTimer_) { - keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds)); + keepAliveTimer_->expires_from_now(std::chrono::seconds(KeepAliveIntervalInSeconds)); auto weakSelf = weak_from_this(); - keepAliveTimer_->async_wait([weakSelf](const boost::system::error_code&) { + keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) { auto self = weakSelf.lock(); if (self) { self->handleKeepAliveTimeout(); @@ -357,13 +347,12 @@ void ClientConnection::startConsumerStatsTimer(std::vector consumerSta if (consumerStatsRequestTimer_) { consumerStatsRequestTimer_->expires_from_now(operationsTimeout_); auto weakSelf = weak_from_this(); - consumerStatsRequestTimer_->async_wait( - [weakSelf, consumerStatsRequests](const boost::system::error_code& err) { - auto self = weakSelf.lock(); - if (self) { - self->handleConsumerStatsTimeout(err, consumerStatsRequests); - } - }); + consumerStatsRequestTimer_->async_wait([weakSelf, consumerStatsRequests](const ASIO_ERROR& err) { + auto self = weakSelf.lock(); + if (self) { + self->handleConsumerStatsTimeout(err, consumerStatsRequests); + } + }); } lock.unlock(); // Complex logic since promises need to be fulfilled outside the lock @@ -375,19 +364,19 @@ void ClientConnection::startConsumerStatsTimer(std::vector consumerSta /// The number of unacknowledged probes to send before considering the connection dead and notifying the /// application layer -typedef boost::asio::detail::socket_option::integer tcp_keep_alive_count; +typedef ASIO::detail::socket_option::integer tcp_keep_alive_count; /// The interval between subsequential keepalive probes, regardless of what the connection has exchanged in /// the meantime -typedef boost::asio::detail::socket_option::integer tcp_keep_alive_interval; +typedef ASIO::detail::socket_option::integer tcp_keep_alive_interval; /// The interval between the last data packet sent (simple ACKs are not considered data) and the first /// keepalive /// probe; after the connection is marked to need keepalive, this counter is not used any further #ifdef __APPLE__ -typedef boost::asio::detail::socket_option::integer tcp_keep_alive_idle; +typedef ASIO::detail::socket_option::integer tcp_keep_alive_idle; #else -typedef boost::asio::detail::socket_option::integer tcp_keep_alive_idle; +typedef ASIO::detail::socket_option::integer tcp_keep_alive_idle; #endif /* @@ -396,15 +385,14 @@ typedef boost::asio::detail::socket_option::integer t * if async_connect without any error, connected_ would be set to true * at this point the connection is deemed valid to be used by clients of this class */ -void ClientConnection::handleTcpConnected(const boost::system::error_code& err, - tcp::resolver::iterator endpointIterator) { +void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) { if (!err) { std::stringstream cnxStringStream; try { cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() << "] "; cnxString_ = cnxStringStream.str(); - } catch (const boost::system::system_error& e) { + } catch (const ASIO_SYSTEM_ERROR& e) { LOG_ERROR("Failed to get endpoint: " << e.what()); close(ResultRetryable); return; @@ -424,7 +412,7 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, state_ = TcpConnected; lock.unlock(); - boost::system::error_code error; + ASIO_ERROR error; socket_->set_option(tcp::no_delay(true), error); if (error) { LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message()); @@ -457,7 +445,7 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, if (tlsSocket_) { if (!isTlsAllowInsecureConnection_) { - boost::system::error_code err; + ASIO_ERROR err; Url service_url; if (!Url::parse(physicalAddress_, service_url)) { LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message()); @@ -466,26 +454,21 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, } } auto weakSelf = weak_from_this(); - auto callback = [weakSelf](const boost::system::error_code& err) { + auto callback = [weakSelf](const ASIO_ERROR& err) { auto self = weakSelf.lock(); if (self) { self->handleHandshake(err); } }; -#if BOOST_VERSION >= 106600 - tlsSocket_->async_handshake(boost::asio::ssl::stream::client, - boost::asio::bind_executor(strand_, callback)); -#else - tlsSocket_->async_handshake(boost::asio::ssl::stream::client, - strand_.wrap(callback)); -#endif + tlsSocket_->async_handshake(ASIO::ssl::stream::client, + ASIO::bind_executor(strand_, callback)); } else { - handleHandshake(boost::system::errc::make_error_code(boost::system::errc::success)); + handleHandshake(ASIO_SUCCESS); } } else if (endpointIterator != tcp::resolver::iterator()) { LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message()); // The connection failed. Try the next endpoint in the list. - boost::system::error_code closeError; + ASIO_ERROR closeError; socket_->close(closeError); // ignore the error of close if (closeError) { LOG_WARN(cnxString_ << "Failed to close socket: " << err.message()); @@ -497,15 +480,14 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, connectTimeoutTask_->start(); tcp::endpoint endpoint = *endpointIterator; auto weakSelf = weak_from_this(); - socket_->async_connect(endpoint, - [weakSelf, endpointIterator](const boost::system::error_code& err) { - auto self = weakSelf.lock(); - if (self) { - self->handleTcpConnected(err, endpointIterator); - } - }); + socket_->async_connect(endpoint, [weakSelf, endpointIterator](const ASIO_ERROR& err) { + auto self = weakSelf.lock(); + if (self) { + self->handleTcpConnected(err, endpointIterator); + } + }); } else { - if (err == boost::asio::error::operation_aborted) { + if (err == ASIO::error::operation_aborted) { // TCP connect timeout, which is not retryable close(); } else { @@ -518,7 +500,7 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, } } -void ClientConnection::handleHandshake(const boost::system::error_code& err) { +void ClientConnection::handleHandshake(const ASIO_ERROR& err) { if (err) { LOG_ERROR(cnxString_ << "Handshake failed: " << err.message()); close(); @@ -537,13 +519,12 @@ void ClientConnection::handleHandshake(const boost::system::error_code& err) { // Send CONNECT command to broker auto self = shared_from_this(); asyncWrite(buffer.const_asio_buffer(), - customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err, size_t) { + customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) { handleSentPulsarConnect(err, buffer); })); } -void ClientConnection::handleSentPulsarConnect(const boost::system::error_code& err, - const SharedBuffer& buffer) { +void ClientConnection::handleSentPulsarConnect(const ASIO_ERROR& err, const SharedBuffer& buffer) { if (isClosed()) { return; } @@ -557,8 +538,7 @@ void ClientConnection::handleSentPulsarConnect(const boost::system::error_code& readNextCommand(); } -void ClientConnection::handleSentAuthResponse(const boost::system::error_code& err, - const SharedBuffer& buffer) { +void ClientConnection::handleSentAuthResponse(const ASIO_ERROR& err, const SharedBuffer& buffer) { if (isClosed()) { return; } @@ -580,7 +560,7 @@ void ClientConnection::tcpConnectAsync() { return; } - boost::system::error_code err; + ASIO_ERROR err; Url service_url; std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_; if (!Url::parse(hostUrl, service_url)) { @@ -599,17 +579,15 @@ void ClientConnection::tcpConnectAsync() { LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port()); tcp::resolver::query query(service_url.host(), std::to_string(service_url.port())); auto weakSelf = weak_from_this(); - resolver_->async_resolve( - query, [weakSelf](const boost::system::error_code& err, tcp::resolver::iterator iterator) { - auto self = weakSelf.lock(); - if (self) { - self->handleResolve(err, iterator); - } - }); + resolver_->async_resolve(query, [weakSelf](const ASIO_ERROR& err, tcp::resolver::iterator iterator) { + auto self = weakSelf.lock(); + if (self) { + self->handleResolve(err, iterator); + } + }); } -void ClientConnection::handleResolve(const boost::system::error_code& err, - tcp::resolver::iterator endpointIterator) { +void ClientConnection::handleResolve(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) { if (err) { std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_; LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message()); @@ -642,13 +620,12 @@ void ClientConnection::handleResolve(const boost::system::error_code& err, if (endpointIterator != tcp::resolver::iterator()) { LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name() // << " to " << endpointIterator->endpoint()); - socket_->async_connect(*endpointIterator, - [weakSelf, endpointIterator](const boost::system::error_code& err) { - auto self = weakSelf.lock(); - if (self) { - self->handleTcpConnected(err, endpointIterator); - } - }); + socket_->async_connect(*endpointIterator, [weakSelf, endpointIterator](const ASIO_ERROR& err) { + auto self = weakSelf.lock(); + if (self) { + self->handleTcpConnected(err, endpointIterator); + } + }); } else { LOG_WARN(cnxString_ << "No IP address found"); close(); @@ -659,15 +636,13 @@ void ClientConnection::handleResolve(const boost::system::error_code& err, void ClientConnection::readNextCommand() { const static uint32_t minReadSize = sizeof(uint32_t); auto self = shared_from_this(); - asyncReceive( - incomingBuffer_.asio_buffer(), - customAllocReadHandler([this, self](const boost::system::error_code& err, size_t bytesTransferred) { - handleRead(err, bytesTransferred, minReadSize); - })); + asyncReceive(incomingBuffer_.asio_buffer(), + customAllocReadHandler([this, self](const ASIO_ERROR& err, size_t bytesTransferred) { + handleRead(err, bytesTransferred, minReadSize); + })); } -void ClientConnection::handleRead(const boost::system::error_code& err, size_t bytesTransferred, - uint32_t minReadSize) { +void ClientConnection::handleRead(const ASIO_ERROR& err, size_t bytesTransferred, uint32_t minReadSize) { if (isClosed()) { return; } @@ -675,9 +650,9 @@ void ClientConnection::handleRead(const boost::system::error_code& err, size_t b incomingBuffer_.bytesWritten(bytesTransferred); if (err || bytesTransferred == 0) { - if (err == boost::asio::error::operation_aborted) { + if (err == ASIO::error::operation_aborted) { LOG_DEBUG(cnxString_ << "Read operation was canceled: " << err.message()); - } else if (bytesTransferred == 0 || err == boost::asio::error::eof) { + } else if (bytesTransferred == 0 || err == ASIO::error::eof) { LOG_DEBUG(cnxString_ << "Server closed the connection: " << err.message()); } else { LOG_ERROR(cnxString_ << "Read operation failed: " << err.message()); @@ -689,11 +664,11 @@ void ClientConnection::handleRead(const boost::system::error_code& err, size_t b SharedBuffer buffer = incomingBuffer_.slice(bytesTransferred); auto self = shared_from_this(); auto nextMinReadSize = minReadSize - bytesTransferred; - asyncReceive(buffer.asio_buffer(), customAllocReadHandler([this, self, nextMinReadSize]( - const boost::system::error_code& err, - size_t bytesTransferred) { - handleRead(err, bytesTransferred, nextMinReadSize); - })); + asyncReceive(buffer.asio_buffer(), + customAllocReadHandler( + [this, self, nextMinReadSize](const ASIO_ERROR& err, size_t bytesTransferred) { + handleRead(err, bytesTransferred, nextMinReadSize); + })); } else { processIncomingBuffer(); } @@ -720,12 +695,11 @@ void ClientConnection::processIncomingBuffer() { incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, newBufferSize); } auto self = shared_from_this(); - asyncReceive( - incomingBuffer_.asio_buffer(), - customAllocReadHandler([this, self, bytesToReceive](const boost::system::error_code& err, - size_t bytesTransferred) { - handleRead(err, bytesTransferred, bytesToReceive); - })); + asyncReceive(incomingBuffer_.asio_buffer(), + customAllocReadHandler( + [this, self, bytesToReceive](const ASIO_ERROR& err, size_t bytesTransferred) { + handleRead(err, bytesTransferred, bytesToReceive); + })); return; } @@ -803,11 +777,11 @@ void ClientConnection::processIncomingBuffer() { uint32_t minReadSize = sizeof(uint32_t) - incomingBuffer_.readableBytes(); auto self = shared_from_this(); - asyncReceive(incomingBuffer_.asio_buffer(), - customAllocReadHandler([this, self, minReadSize](const boost::system::error_code& err, - size_t bytesTransferred) { - handleRead(err, bytesTransferred, minReadSize); - })); + asyncReceive( + incomingBuffer_.asio_buffer(), + customAllocReadHandler([this, self, minReadSize](const ASIO_ERROR& err, size_t bytesTransferred) { + handleRead(err, bytesTransferred, minReadSize); + })); return; } @@ -1056,7 +1030,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, const uint64_t request requestData.timer = executor_->createDeadlineTimer(); requestData.timer->expires_from_now(operationsTimeout_); auto weakSelf = weak_from_this(); - requestData.timer->async_wait([weakSelf, requestData](const boost::system::error_code& ec) { + requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (self) { self->handleLookupTimeout(ec, requestData); @@ -1082,11 +1056,7 @@ void ClientConnection::sendCommand(const SharedBuffer& cmd) { self->sendCommandInternal(cmd); } }; -#if BOOST_VERSION >= 106600 - boost::asio::post(strand_, callback); -#else - strand_.post(callback); -#endif + ASIO::post(strand_, callback); } else { sendCommandInternal(cmd); } @@ -1099,8 +1069,9 @@ void ClientConnection::sendCommand(const SharedBuffer& cmd) { void ClientConnection::sendCommandInternal(const SharedBuffer& cmd) { auto self = shared_from_this(); asyncWrite(cmd.const_asio_buffer(), - customAllocWriteHandler([this, self, cmd](const boost::system::error_code& err, - size_t bytesTransferred) { handleSend(err, cmd); })); + customAllocWriteHandler([this, self, cmd](const ASIO_ERROR& err, size_t bytesTransferred) { + handleSend(err, cmd); + })); } void ClientConnection::sendMessage(const std::shared_ptr& args) { @@ -1116,21 +1087,18 @@ void ClientConnection::sendMessage(const std::shared_ptr& args) { // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the // callback is called, an invalid buffer range might be passed to the underlying socket send. asyncWrite(buffer, customAllocWriteHandler( - [this, self, buffer](const boost::system::error_code& err, - size_t bytesTransferred) { handleSendPair(err); })); + [this, self, buffer](const ASIO_ERROR& err, size_t bytesTransferred) { + handleSendPair(err); + })); }; if (tlsSocket_) { -#if BOOST_VERSION >= 106600 - boost::asio::post(strand_, sendMessageInternal); -#else - strand_.post(sendMessageInternal); -#endif + ASIO::post(strand_, sendMessageInternal); } else { sendMessageInternal(); } } -void ClientConnection::handleSend(const boost::system::error_code& err, const SharedBuffer&) { +void ClientConnection::handleSend(const ASIO_ERROR& err, const SharedBuffer&) { if (isClosed()) { return; } @@ -1142,7 +1110,7 @@ void ClientConnection::handleSend(const boost::system::error_code& err, const Sh } } -void ClientConnection::handleSendPair(const boost::system::error_code& err) { +void ClientConnection::handleSendPair(const ASIO_ERROR& err) { if (isClosed()) { return; } @@ -1166,8 +1134,8 @@ void ClientConnection::sendPendingCommands() { if (any.type() == typeid(SharedBuffer)) { SharedBuffer buffer = boost::any_cast(any); asyncWrite(buffer.const_asio_buffer(), - customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err, - size_t) { handleSend(err, buffer); })); + customAllocWriteHandler( + [this, self, buffer](const ASIO_ERROR& err, size_t) { handleSend(err, buffer); })); } else { assert(any.type() == typeid(std::shared_ptr)); @@ -1178,9 +1146,9 @@ void ClientConnection::sendPendingCommands() { // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the // callback is called, an invalid buffer range might be passed to the underlying socket send. - asyncWrite(buffer, - customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err, - size_t) { handleSendPair(err); })); + asyncWrite(buffer, customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) { + handleSendPair(err); + })); } } else { // No more pending writes @@ -1202,7 +1170,7 @@ Future ClientConnection::sendRequestWithId(SharedBuffer cm requestData.timer = executor_->createDeadlineTimer(); requestData.timer->expires_from_now(operationsTimeout_); auto weakSelf = weak_from_this(); - requestData.timer->async_wait([weakSelf, requestData](const boost::system::error_code& ec) { + requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (self) { self->handleRequestTimeout(ec, requestData); @@ -1216,21 +1184,19 @@ Future ClientConnection::sendRequestWithId(SharedBuffer cm return requestData.promise.getFuture(); } -void ClientConnection::handleRequestTimeout(const boost::system::error_code& ec, - PendingRequestData pendingRequestData) { +void ClientConnection::handleRequestTimeout(const ASIO_ERROR& ec, PendingRequestData pendingRequestData) { if (!ec && !pendingRequestData.hasGotResponse->load()) { pendingRequestData.promise.setFailed(ResultTimeout); } } -void ClientConnection::handleLookupTimeout(const boost::system::error_code& ec, - LookupRequestData pendingRequestData) { +void ClientConnection::handleLookupTimeout(const ASIO_ERROR& ec, LookupRequestData pendingRequestData) { if (!ec) { pendingRequestData.promise->setFailed(ResultTimeout); } } -void ClientConnection::handleGetLastMessageIdTimeout(const boost::system::error_code& ec, +void ClientConnection::handleGetLastMessageIdTimeout(const ASIO_ERROR& ec, ClientConnection::LastMessageIdRequestData data) { if (!ec) { data.promise->setFailed(ResultTimeout); @@ -1255,9 +1221,9 @@ void ClientConnection::handleKeepAliveTimeout() { // be zero And we do not attempt to dereference the pointer. Lock lock(mutex_); if (keepAliveTimer_) { - keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds)); + keepAliveTimer_->expires_from_now(std::chrono::seconds(KeepAliveIntervalInSeconds)); auto weakSelf = weak_from_this(); - keepAliveTimer_->async_wait([weakSelf](const boost::system::error_code&) { + keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) { auto self = weakSelf.lock(); if (self) { self->handleKeepAliveTimeout(); @@ -1268,7 +1234,7 @@ void ClientConnection::handleKeepAliveTimeout() { } } -void ClientConnection::handleConsumerStatsTimeout(const boost::system::error_code& ec, +void ClientConnection::handleConsumerStatsTimeout(const ASIO_ERROR& ec, std::vector consumerStatsRequests) { if (ec) { LOG_DEBUG(cnxString_ << " Ignoring timer cancelled event, code[" << ec << "]"); @@ -1285,15 +1251,15 @@ void ClientConnection::close(Result result, bool detach) { state_ = Disconnected; if (socket_) { - boost::system::error_code err; - socket_->shutdown(boost::asio::socket_base::shutdown_both, err); + ASIO_ERROR err; + socket_->shutdown(ASIO::socket_base::shutdown_both, err); socket_->close(err); if (err) { LOG_WARN(cnxString_ << "Failed to close socket: " << err.message()); } } if (tlsSocket_) { - boost::system::error_code err; + ASIO_ERROR err; tlsSocket_->lowest_layer().close(err); if (err) { LOG_WARN(cnxString_ << "Failed to close TLS socket: " << err.message()); @@ -1432,7 +1398,7 @@ Future ClientConnection::newGetLastMessageId(u requestData.timer = executor_->createDeadlineTimer(); requestData.timer->expires_from_now(operationsTimeout_); auto weakSelf = weak_from_this(); - requestData.timer->async_wait([weakSelf, requestData](const boost::system::error_code& ec) { + requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (self) { self->handleGetLastMessageIdTimeout(ec, requestData); @@ -1823,7 +1789,7 @@ void ClientConnection::handleAuthChallenge() { } auto self = shared_from_this(); asyncWrite(buffer.const_asio_buffer(), - customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err, size_t) { + customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) { handleSentAuthResponse(err, buffer); })); } diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 1bc1bd85..69155fdd 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -23,13 +23,20 @@ #include #include -#include +#ifdef USE_ASIO +#include +#include +#include +#include +#include +#else #include -#include #include #include #include #include +#endif +#include #include #include #include @@ -37,19 +44,18 @@ #include #include +#include "AsioTimer.h" #include "Commands.h" #include "GetLastMessageIdResponse.h" #include "LookupDataResult.h" #include "SharedBuffer.h" +#include "TimeUtils.h" #include "UtilAllocator.h" - namespace pulsar { class PulsarFriend; -using DeadlineTimerPtr = std::shared_ptr; -using TimeDuration = boost::posix_time::time_duration; -using TcpResolverPtr = std::shared_ptr; +using TcpResolverPtr = std::shared_ptr; class ExecutorService; using ExecutorServicePtr = std::shared_ptr; @@ -114,10 +120,10 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this SocketPtr; - typedef std::shared_ptr> TlsSocketPtr; + typedef std::shared_ptr SocketPtr; + typedef std::shared_ptr> TlsSocketPtr; typedef std::shared_ptr ConnectionPtr; - typedef std::function ConnectionListener; + typedef std::function ConnectionListener; typedef std::vector::iterator ListenerIterator; /* @@ -224,17 +230,16 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this= 106600 - boost::asio::async_write(*tlsSocket_, buffers, boost::asio::bind_executor(strand_, handler)); -#else - boost::asio::async_write(*tlsSocket_, buffers, strand_.wrap(handler)); -#endif + ASIO::async_write(*tlsSocket_, buffers, ASIO::bind_executor(strand_, handler)); } else { - boost::asio::async_write(*socket_, buffers, handler); + ASIO::async_write(*socket_, buffers, handler); } } @@ -296,11 +296,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this= 106600 - tlsSocket_->async_read_some(buffers, boost::asio::bind_executor(strand_, handler)); -#else - tlsSocket_->async_read_some(buffers, strand_.wrap(handler)); -#endif + tlsSocket_->async_read_some(buffers, ASIO::bind_executor(strand_, handler)); } else { socket_->async_receive(buffers, handler); } @@ -321,11 +317,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this= 106600 - boost::asio::strand strand_; -#else - boost::asio::io_service::strand strand_; -#endif + ASIO::strand strand_; const std::string logicalAddress_; /* @@ -343,7 +335,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this consumerStatsRequests); + void handleConsumerStatsTimeout(const ASIO_ERROR& ec, std::vector consumerStatsRequests); void startConsumerStatsTimer(std::vector consumerStatsRequests); uint32_t maxPendingLookupRequest_; diff --git a/lib/CompressionCodec.cc b/lib/CompressionCodec.cc index 991d52c0..6105887a 100644 --- a/lib/CompressionCodec.cc +++ b/lib/CompressionCodec.cc @@ -45,7 +45,6 @@ CompressionCodec& CompressionCodecProvider::getCodec(CompressionType compression default: return compressionCodecNone_; } - BOOST_THROW_EXCEPTION(std::logic_error("Invalid CompressionType enumeration value")); } SharedBuffer CompressionCodecNone::encode(const SharedBuffer& raw) { return raw; } diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc index 95170a94..4cc8883a 100644 --- a/lib/ConnectionPool.cc +++ b/lib/ConnectionPool.cc @@ -18,15 +18,20 @@ */ #include "ConnectionPool.h" +#ifdef USE_ASIO +#include +#include +#else #include #include +#endif #include "ClientConnection.h" #include "ExecutorService.h" #include "LogUtils.h" -using boost::asio::ip::tcp; -namespace ssl = boost::asio::ssl; +using ASIO::ip::tcp; +namespace ssl = ASIO::ssl; typedef ssl::stream ssl_socket; DECLARE_LOG_OBJECT() diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index f9770d80..52162189 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -26,6 +26,7 @@ #include "AckGroupingTracker.h" #include "AckGroupingTrackerDisabled.h" #include "AckGroupingTrackerEnabled.h" +#include "AsioDefines.h" #include "BatchMessageAcker.h" #include "BatchedMessageIdImpl.h" #include "BitSet.h" @@ -55,6 +56,9 @@ namespace pulsar { DECLARE_LOG_OBJECT() +using std::chrono::milliseconds; +using std::chrono::seconds; + ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, bool isPersistent, const ConsumerInterceptorsPtr& interceptors, @@ -402,10 +406,9 @@ void ConsumerImpl::discardChunkMessages(std::string uuid, MessageId messageId, b } void ConsumerImpl::triggerCheckExpiredChunkedTimer() { - checkExpiredChunkedTimer_->expires_from_now( - boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_)); + checkExpiredChunkedTimer_->expires_from_now(milliseconds(expireTimeOfIncompleteChunkedMessageMs_)); std::weak_ptr weakSelf{shared_from_this()}; - checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void { + checkExpiredChunkedTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void { auto self = weakSelf.lock(); if (!self) { return; @@ -1581,7 +1584,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time } } else { TimeDuration next = std::min(remainTime, backoff->next()); - if (next.total_milliseconds() <= 0) { + if (toMillis(next) <= 0) { LOG_ERROR(getName() << " Client Connection not ready for Consumer"); callback(ResultNotConnected, MessageId()); return; @@ -1592,8 +1595,8 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time auto self = shared_from_this(); timer->async_wait([this, backoff, remainTime, timer, next, callback, - self](const boost::system::error_code& ec) -> void { - if (ec == boost::asio::error::operation_aborted) { + self](const ASIO_ERROR& ec) -> void { + if (ec == ASIO::error::operation_aborted) { LOG_DEBUG(getName() << " Get last message id operation was cancelled, code[" << ec << "]."); return; } @@ -1602,7 +1605,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time return; } LOG_WARN(getName() << " Could not get connection while getLastMessageId -- Will try again in " - << next.total_milliseconds() << " ms") + << toMillis(next) << " ms") this->internalGetLastMessageIdAsync(backoff, remainTime, timer, callback); }); } @@ -1693,7 +1696,7 @@ std::shared_ptr ConsumerImpl::get_shared_this_ptr() { } void ConsumerImpl::cancelTimers() noexcept { - boost::system::error_code ec; + ASIO_ERROR ec; batchReceiveTimer_->cancel(ec); checkExpiredChunkedTimer_->cancel(ec); unAckedMessageTrackerPtr_->stop(); diff --git a/lib/ConsumerImplBase.cc b/lib/ConsumerImplBase.cc index 39212371..851d41e8 100644 --- a/lib/ConsumerImplBase.cc +++ b/lib/ConsumerImplBase.cc @@ -51,9 +51,9 @@ ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topi void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) { if (timeoutMs > 0) { - batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs)); + batchReceiveTimer_->expires_from_now(std::chrono::milliseconds(timeoutMs)); std::weak_ptr weakSelf{shared_from_this()}; - batchReceiveTimer_->async_wait([weakSelf](const boost::system::error_code& ec) { + batchReceiveTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (self && !ec) { self->doBatchReceiveTimeTask(); diff --git a/lib/ExecutorService.cc b/lib/ExecutorService.cc index 53be8eaf..794e3619 100644 --- a/lib/ExecutorService.cc +++ b/lib/ExecutorService.cc @@ -32,7 +32,7 @@ void ExecutorService::start() { auto self = shared_from_this(); std::thread t{[this, self] { LOG_DEBUG("Run io_service in a single thread"); - boost::system::error_code ec; + ASIO_ERROR ec; while (!closed_) { io_service_.restart(); IOService::work work{getIOService()}; @@ -63,22 +63,22 @@ ExecutorServicePtr ExecutorService::create() { } /* - * factory method of boost::asio::ip::tcp::socket associated with io_service_ instance + * factory method of ASIO::ip::tcp::socket associated with io_service_ instance * @ returns shared_ptr to this socket */ SocketPtr ExecutorService::createSocket() { try { - return SocketPtr(new boost::asio::ip::tcp::socket(io_service_)); - } catch (const boost::system::system_error &e) { + return SocketPtr(new ASIO::ip::tcp::socket(io_service_)); + } catch (const ASIO_SYSTEM_ERROR &e) { restart(); auto error = std::string("Failed to create socket: ") + e.what(); throw std::runtime_error(error); } } -TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, boost::asio::ssl::context &ctx) { - return std::shared_ptr>( - new boost::asio::ssl::stream(*socket, ctx)); +TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, ASIO::ssl::context &ctx) { + return std::shared_ptr>( + new ASIO::ssl::stream(*socket, ctx)); } /* @@ -87,8 +87,8 @@ TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, boost::asio::ss */ TcpResolverPtr ExecutorService::createTcpResolver() { try { - return TcpResolverPtr(new boost::asio::ip::tcp::resolver(io_service_)); - } catch (const boost::system::system_error &e) { + return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_service_)); + } catch (const ASIO_SYSTEM_ERROR &e) { restart(); auto error = std::string("Failed to create resolver: ") + e.what(); throw std::runtime_error(error); @@ -97,10 +97,10 @@ TcpResolverPtr ExecutorService::createTcpResolver() { DeadlineTimerPtr ExecutorService::createDeadlineTimer() { try { - return DeadlineTimerPtr(new boost::asio::deadline_timer(io_service_)); - } catch (const boost::system::system_error &e) { + return DeadlineTimerPtr(new ASIO::steady_timer(io_service_)); + } catch (const ASIO_SYSTEM_ERROR &e) { restart(); - auto error = std::string("Failed to create deadline_timer: ") + e.what(); + auto error = std::string("Failed to create steady_timer: ") + e.what(); throw std::runtime_error(error); } } diff --git a/lib/ExecutorService.h b/lib/ExecutorService.h index a373c0af..89d06d30 100644 --- a/lib/ExecutorService.h +++ b/lib/ExecutorService.h @@ -22,10 +22,15 @@ #include #include -#include +#ifdef USE_ASIO +#include +#include +#include +#else #include #include #include +#endif #include #include #include @@ -33,14 +38,15 @@ #include #include +#include "AsioTimer.h" + namespace pulsar { -typedef std::shared_ptr SocketPtr; -typedef std::shared_ptr > TlsSocketPtr; -typedef std::shared_ptr TcpResolverPtr; -typedef std::shared_ptr DeadlineTimerPtr; +typedef std::shared_ptr SocketPtr; +typedef std::shared_ptr > TlsSocketPtr; +typedef std::shared_ptr TcpResolverPtr; class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this { public: - using IOService = boost::asio::io_service; + using IOService = ASIO::io_service; using SharedPtr = std::shared_ptr; static SharedPtr create(); @@ -51,7 +57,7 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_thisgetIOExecutorProvider()->get()), mutex_(), creationTimestamp_(TimeUtils::now()), - operationTimeut_(seconds(client->conf().getOperationTimeoutSeconds())), + operationTimeut_(std::chrono::seconds(client->conf().getOperationTimeoutSeconds())), state_(NotStarted), backoff_(backoff), epoch_(0), @@ -147,13 +148,13 @@ void HandlerBase::scheduleReconnection() { if (state == Pending || state == Ready) { TimeDuration delay = backoff_.next(); - LOG_INFO(getName() << "Schedule reconnection in " << (delay.total_milliseconds() / 1000.0) << " s"); + LOG_INFO(getName() << "Schedule reconnection in " << (toMillis(delay) / 1000.0) << " s"); timer_->expires_from_now(delay); // passing shared_ptr here since time_ will get destroyed, so tasks will be cancelled // so we will not run into the case where grabCnx is invoked on out of scope handler auto name = getName(); std::weak_ptr weakSelf{shared_from_this()}; - timer_->async_wait([name, weakSelf](const boost::system::error_code& ec) { + timer_->async_wait([name, weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (self) { self->handleTimeout(ec); @@ -164,7 +165,7 @@ void HandlerBase::scheduleReconnection() { } } -void HandlerBase::handleTimeout(const boost::system::error_code& ec) { +void HandlerBase::handleTimeout(const ASIO_ERROR& ec) { if (ec) { LOG_DEBUG(getName() << "Ignoring timer cancelled event, code[" << ec << "]"); return; diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index f62c4df0..68c0b6a6 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -20,20 +20,17 @@ #define _PULSAR_HANDLER_BASE_HEADER_ #include -#include #include #include #include +#include "AsioTimer.h" #include "Backoff.h" #include "Future.h" +#include "TimeUtils.h" namespace pulsar { -using namespace boost::posix_time; -using boost::posix_time::milliseconds; -using boost::posix_time::seconds; - class ClientImpl; using ClientImplPtr = std::shared_ptr; using ClientImplWeakPtr = std::weak_ptr; @@ -42,7 +39,6 @@ using ClientConnectionPtr = std::shared_ptr; using ClientConnectionWeakPtr = std::weak_ptr; class ExecutorService; using ExecutorServicePtr = std::shared_ptr; -using DeadlineTimerPtr = std::shared_ptr; class HandlerBase : public std::enable_shared_from_this { public: @@ -95,7 +91,7 @@ class HandlerBase : public std::enable_shared_from_this { void handleDisconnection(Result result, const ClientConnectionPtr& cnx); - void handleTimeout(const boost::system::error_code& ec); + void handleTimeout(const ASIO_ERROR& ec); protected: ClientImplWeakPtr client_; diff --git a/lib/Int64SerDes.h b/lib/Int64SerDes.h index dbc5d8a7..f1f5eef3 100644 --- a/lib/Int64SerDes.h +++ b/lib/Int64SerDes.h @@ -20,7 +20,12 @@ #include -#include // for ntohl +// for ntohl +#ifdef USE_ASIO +#include +#else +#include +#endif namespace pulsar { diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 15f9d9b8..af70623d 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -18,6 +18,7 @@ */ #include "MultiTopicsConsumerImpl.h" +#include #include #include "ClientImpl.h" @@ -37,6 +38,9 @@ DECLARE_LOG_OBJECT() using namespace pulsar; +using std::chrono::milliseconds; +using std::chrono::seconds; + MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName, int numPartitions, const std::string& subscriptionName, const ConsumerConfiguration& conf, @@ -90,7 +94,7 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std auto partitionsUpdateInterval = static_cast(client->conf().getPartitionsUpdateInterval()); if (partitionsUpdateInterval > 0) { partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer(); - partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval); + partitionsUpdateInterval_ = seconds(partitionsUpdateInterval); lookupServicePtr_ = client->getLookup(); } @@ -936,7 +940,7 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() { void MultiTopicsConsumerImpl::runPartitionUpdateTask() { partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_); auto weakSelf = weak_from_this(); - partitionsUpdateTimer_->async_wait([weakSelf](const boost::system::error_code& ec) { + partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) { // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it // cannot continue at this time, and the request needs to be ignored. auto self = weakSelf.lock(); @@ -1087,7 +1091,7 @@ void MultiTopicsConsumerImpl::beforeConnectionChange(ClientConnection& cnx) { void MultiTopicsConsumerImpl::cancelTimers() noexcept { if (partitionsUpdateTimer_) { - boost::system::error_code ec; + ASIO_ERROR ec; partitionsUpdateTimer_->cancel(ec); } } diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index d4127f63..c5834eaa 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -32,6 +32,7 @@ #include "LookupDataResult.h" #include "SynchronizedHashMap.h" #include "TestUtil.h" +#include "TimeUtils.h" #include "UnboundedBlockingQueue.h" namespace pulsar { @@ -119,7 +120,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { std::atomic_int incomingMessagesSize_ = {0}; MessageListener messageListener_; DeadlineTimerPtr partitionsUpdateTimer_; - boost::posix_time::time_duration partitionsUpdateInterval_; + TimeDuration partitionsUpdateInterval_; LookupServicePtr lookupServicePtr_; std::shared_ptr> numberTopicPartitions_; std::atomic failedResult{ResultOk}; diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc index 0dd73589..e443496d 100644 --- a/lib/NegativeAcksTracker.cc +++ b/lib/NegativeAcksTracker.cc @@ -40,9 +40,9 @@ NegativeAcksTracker::NegativeAcksTracker(ClientImplPtr client, ConsumerImpl &con nackDelay_ = std::chrono::milliseconds(std::max(conf.getNegativeAckRedeliveryDelayMs(), MIN_NACK_DELAY_MILLIS)); - timerInterval_ = boost::posix_time::milliseconds((long)(nackDelay_.count() / 3)); - LOG_DEBUG("Created negative ack tracker with delay: " << nackDelay_.count() - << " ms - Timer interval: " << timerInterval_); + timerInterval_ = std::chrono::milliseconds((long)(nackDelay_.count() / 3)); + LOG_DEBUG("Created negative ack tracker with delay: " << nackDelay_.count() << " ms - Timer interval: " + << timerInterval_.count()); } void NegativeAcksTracker::scheduleTimer() { @@ -51,14 +51,14 @@ void NegativeAcksTracker::scheduleTimer() { } std::weak_ptr weakSelf{shared_from_this()}; timer_->expires_from_now(timerInterval_); - timer_->async_wait([weakSelf](const boost::system::error_code &ec) { + timer_->async_wait([weakSelf](const ASIO_ERROR &ec) { if (auto self = weakSelf.lock()) { self->handleTimer(ec); } }); } -void NegativeAcksTracker::handleTimer(const boost::system::error_code &ec) { +void NegativeAcksTracker::handleTimer(const ASIO_ERROR &ec) { if (ec) { // Ignore cancelled events return; @@ -107,7 +107,7 @@ void NegativeAcksTracker::add(const MessageId &m) { void NegativeAcksTracker::close() { closed_ = true; - boost::system::error_code ec; + ASIO_ERROR ec; timer_->cancel(ec); std::lock_guard lock(mutex_); nackedMessages_.clear(); diff --git a/lib/NegativeAcksTracker.h b/lib/NegativeAcksTracker.h index 4b489844..472e9763 100644 --- a/lib/NegativeAcksTracker.h +++ b/lib/NegativeAcksTracker.h @@ -23,12 +23,13 @@ #include #include -#include #include #include #include #include +#include "AsioDefines.h" +#include "AsioTimer.h" #include "TestUtil.h" namespace pulsar { @@ -36,7 +37,6 @@ namespace pulsar { class ConsumerImpl; class ClientImpl; using ClientImplPtr = std::shared_ptr; -using DeadlineTimerPtr = std::shared_ptr; class ExecutorService; using ExecutorServicePtr = std::shared_ptr; @@ -56,13 +56,13 @@ class NegativeAcksTracker : public std::enable_shared_from_this nackedMessages_; diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h index a1319e10..46fd9c1c 100644 --- a/lib/OpSendMsg.h +++ b/lib/OpSendMsg.h @@ -23,8 +23,6 @@ #include #include -#include - #include "ChunkMessageIdImpl.h" #include "PulsarApi.pb.h" #include "SharedBuffer.h" @@ -53,7 +51,7 @@ struct OpSendMsg { const int32_t numChunks; const uint32_t messagesCount; const uint64_t messagesSize; - const boost::posix_time::ptime timeout; + const ptime timeout; const SendCallback sendCallback; std::vector> trackerCallbacks; ChunkMessageIdListPtr chunkMessageIdList; @@ -98,7 +96,7 @@ struct OpSendMsg { numChunks(metadata.num_chunks_from_msg()), messagesCount(messagesCount), messagesSize(messagesSize), - timeout(TimeUtils::now() + boost::posix_time::milliseconds(sendTimeoutMs)), + timeout(TimeUtils::now() + std::chrono::milliseconds(sendTimeoutMs)), sendCallback(std::move(callback)), chunkMessageIdList(std::move(chunkMessageIdList)), sendArgs(new SendArguments(producerId, metadata.sequence_id(), metadata, payload)) {} diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc index 54e96c84..4178096c 100644 --- a/lib/PartitionedProducerImpl.cc +++ b/lib/PartitionedProducerImpl.cc @@ -58,7 +58,7 @@ PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const Top if (partitionsUpdateInterval > 0) { listenerExecutor_ = client->getListenerExecutorProvider()->get(); partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer(); - partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval); + partitionsUpdateInterval_ = std::chrono::seconds(partitionsUpdateInterval); lookupServicePtr_ = client->getLookup(); } } @@ -69,7 +69,7 @@ MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() { return std::make_shared( conf_.getHashingScheme(), conf_.getBatchingEnabled(), conf_.getBatchingMaxMessages(), conf_.getBatchingMaxAllowedSizeInBytes(), - boost::posix_time::milliseconds(conf_.getBatchingMaxPublishDelayMs())); + std::chrono::milliseconds(conf_.getBatchingMaxPublishDelayMs())); case ProducerConfiguration::CustomPartition: return conf_.getMessageRouterPtr(); case ProducerConfiguration::UseSinglePartition: @@ -422,7 +422,7 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) { void PartitionedProducerImpl::runPartitionUpdateTask() { auto weakSelf = weak_from_this(); partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_); - partitionsUpdateTimer_->async_wait([weakSelf](const boost::system::error_code& ec) { + partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (self) { self->getPartitionMetadata(); @@ -524,7 +524,7 @@ uint64_t PartitionedProducerImpl::getNumberOfConnectedProducer() { void PartitionedProducerImpl::cancelTimers() noexcept { if (partitionsUpdateTimer_) { - boost::system::error_code ec; + ASIO_ERROR ec; partitionsUpdateTimer_->cancel(ec); } } diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h index 2d07a81a..610c74ed 100644 --- a/lib/PartitionedProducerImpl.h +++ b/lib/PartitionedProducerImpl.h @@ -20,21 +20,21 @@ #include #include -#include #include #include #include +#include "AsioTimer.h" #include "LookupDataResult.h" #include "ProducerImplBase.h" #include "ProducerInterceptors.h" +#include "TimeUtils.h" namespace pulsar { class ClientImpl; using ClientImplPtr = std::shared_ptr; using ClientImplWeakPtr = std::weak_ptr; -using DeadlineTimerPtr = std::shared_ptr; class ExecutorService; using ExecutorServicePtr = std::shared_ptr; class LookupService; @@ -128,7 +128,7 @@ class PartitionedProducerImpl : public ProducerImplBase, ExecutorServicePtr listenerExecutor_; DeadlineTimerPtr partitionsUpdateTimer_; - boost::posix_time::time_duration partitionsUpdateInterval_; + TimeDuration partitionsUpdateInterval_; LookupServicePtr lookupServicePtr_; ProducerInterceptorsPtr interceptors_; diff --git a/lib/PatternMultiTopicsConsumerImpl.cc b/lib/PatternMultiTopicsConsumerImpl.cc index 23e445ee..4fc7bb61 100644 --- a/lib/PatternMultiTopicsConsumerImpl.cc +++ b/lib/PatternMultiTopicsConsumerImpl.cc @@ -27,6 +27,8 @@ DECLARE_LOG_OBJECT() using namespace pulsar; +using std::chrono::seconds; + PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl( ClientImplPtr client, const std::string pattern, CommandGetTopicsOfNamespace_Mode getTopicsMode, const std::vector& topics, const std::string& subscriptionName, @@ -49,15 +51,15 @@ void PatternMultiTopicsConsumerImpl::resetAutoDiscoveryTimer() { autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod())); auto weakSelf = weak_from_this(); - autoDiscoveryTimer_->async_wait([weakSelf](const boost::system::error_code& err) { + autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) { if (auto self = weakSelf.lock()) { self->autoDiscoveryTimerTask(err); } }); } -void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system::error_code& err) { - if (err == boost::asio::error::operation_aborted) { +void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const ASIO_ERROR& err) { + if (err == ASIO::error::operation_aborted) { LOG_DEBUG(getName() << "Timer cancelled: " << err.message()); return; } else if (err) { @@ -228,7 +230,7 @@ void PatternMultiTopicsConsumerImpl::start() { if (conf_.getPatternAutoDiscoveryPeriod() > 0) { autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod())); auto weakSelf = weak_from_this(); - autoDiscoveryTimer_->async_wait([weakSelf](const boost::system::error_code& err) { + autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) { if (auto self = weakSelf.lock()) { self->autoDiscoveryTimerTask(err); } @@ -247,6 +249,6 @@ void PatternMultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { } void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept { - boost::system::error_code ec; + ASIO_ERROR ec; autoDiscoveryTimer_->cancel(ec); } diff --git a/lib/PatternMultiTopicsConsumerImpl.h b/lib/PatternMultiTopicsConsumerImpl.h index 5d3ba9ec..f272df22 100644 --- a/lib/PatternMultiTopicsConsumerImpl.h +++ b/lib/PatternMultiTopicsConsumerImpl.h @@ -22,6 +22,7 @@ #include #include +#include "AsioTimer.h" #include "LookupDataResult.h" #include "MultiTopicsConsumerImpl.h" #include "NamespaceName.h" @@ -56,7 +57,7 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl { const PULSAR_REGEX_NAMESPACE::regex getPattern(); - void autoDiscoveryTimerTask(const boost::system::error_code& err); + void autoDiscoveryTimerTask(const ASIO_ERROR& err); // filter input `topics` with given `pattern`, return matched topics. Do not match topic domain. static NamespaceTopicsPtr topicsPatternFilter(const std::vector& topics, @@ -74,7 +75,7 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl { const std::string patternString_; const PULSAR_REGEX_NAMESPACE::regex pattern_; const CommandGetTopicsOfNamespace_Mode getTopicsMode_; - typedef std::shared_ptr TimerPtr; + typedef std::shared_ptr TimerPtr; TimerPtr autoDiscoveryTimer_; bool autoDiscoveryRunning_; NamespaceNamePtr namespaceName_; diff --git a/lib/PeriodicTask.cc b/lib/PeriodicTask.cc index 6046eae2..9fde012a 100644 --- a/lib/PeriodicTask.cc +++ b/lib/PeriodicTask.cc @@ -18,6 +18,8 @@ */ #include "PeriodicTask.h" +#include + namespace pulsar { void PeriodicTask::start() { @@ -27,7 +29,7 @@ void PeriodicTask::start() { state_ = Ready; if (periodMs_ >= 0) { std::weak_ptr weakSelf{shared_from_this()}; - timer_->expires_from_now(boost::posix_time::millisec(periodMs_)); + timer_->expires_from_now(std::chrono::milliseconds(periodMs_)); timer_->async_wait([weakSelf](const ErrorCode& ec) { auto self = weakSelf.lock(); if (self) { @@ -48,7 +50,7 @@ void PeriodicTask::stop() noexcept { } void PeriodicTask::handleTimeout(const ErrorCode& ec) { - if (state_ != Ready || ec.value() == boost::system::errc::operation_canceled) { + if (state_ != Ready || ec == ASIO::error::operation_aborted) { return; } @@ -57,7 +59,7 @@ void PeriodicTask::handleTimeout(const ErrorCode& ec) { // state_ may be changed in handleTimeout, so we check state_ again if (state_ == Ready) { auto self = shared_from_this(); - timer_->expires_from_now(boost::posix_time::millisec(periodMs_)); + timer_->expires_from_now(std::chrono::milliseconds(periodMs_)); timer_->async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); }); } } diff --git a/lib/PeriodicTask.h b/lib/PeriodicTask.h index 5de81ae4..bc186348 100644 --- a/lib/PeriodicTask.h +++ b/lib/PeriodicTask.h @@ -36,7 +36,7 @@ namespace pulsar { */ class PeriodicTask : public std::enable_shared_from_this { public: - using ErrorCode = boost::system::error_code; + using ErrorCode = ASIO_ERROR; using CallbackType = std::function; enum State : std::uint8_t diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 0a129259..fc39b231 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -20,7 +20,7 @@ #include -#include +#include #include "BatchMessageContainer.h" #include "BatchMessageKeyBasedContainer.h" @@ -46,6 +46,8 @@ namespace pulsar { DECLARE_LOG_OBJECT() +using std::chrono::milliseconds; + ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName, const ProducerConfiguration& conf, const ProducerInterceptorsPtr& interceptors, int32_t partition, bool retryOnCreationError) @@ -465,7 +467,7 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) { Producer producer = Producer(shared_from_this()); auto interceptorMessage = interceptors_->beforeSend(producer, msg); - const auto now = boost::posix_time::microsec_clock::universal_time(); + const auto now = TimeUtils::now(); auto self = shared_from_this(); sendAsyncWithStatsUpdate(interceptorMessage, [this, self, now, callback, producer, interceptorMessage]( Result result, const MessageId& messageId) { @@ -564,10 +566,9 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg); bool isFull = batchMessageContainer_->add(msg, callback); if (isFirstMessage) { - batchTimer_->expires_from_now( - boost::posix_time::milliseconds(conf_.getBatchingMaxPublishDelayMs())); + batchTimer_->expires_from_now(milliseconds(conf_.getBatchingMaxPublishDelayMs())); auto weakSelf = weak_from_this(); - batchTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) { + batchTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (!self) { return; @@ -824,14 +825,14 @@ Future ProducerImpl::getProducerCreatedFuture() uint64_t ProducerImpl::getProducerId() const { return producerId_; } -void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) { +void ProducerImpl::handleSendTimeout(const ASIO_ERROR& err) { const auto state = state_.load(); if (state != Pending && state != Ready) { return; } Lock lock(mutex_); - if (err == boost::asio::error::operation_aborted) { + if (err == ASIO::error::operation_aborted) { LOG_DEBUG(getName() << "Timer cancelled: " << err.message()); return; } else if (err) { @@ -847,8 +848,8 @@ void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) { } else { // If there is at least one message, calculate the diff between the message timeout and // the current time. - time_duration diff = pendingMessagesQueue_.front()->timeout - TimeUtils::now(); - if (diff.total_milliseconds() <= 0) { + auto diff = pendingMessagesQueue_.front()->timeout - TimeUtils::now(); + if (toMillis(diff) <= 0) { // The diff is less than or equal to zero, meaning that the message has been expired. LOG_DEBUG(getName() << "Timer expired. Calling timeout callbacks."); pendingMessages = getPendingCallbacksWhenFailed(); @@ -856,7 +857,7 @@ void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) { asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout())); } else { // The diff is greater than zero, set the timeout to the diff value - LOG_DEBUG(getName() << "Timer hasn't expired yet, setting new timeout " << diff); + LOG_DEBUG(getName() << "Timer hasn't expired yet, setting new timeout " << diff.count()); asyncWaitSendTimeout(diff); } } @@ -1000,7 +1001,7 @@ void ProducerImpl::shutdown() { void ProducerImpl::cancelTimers() noexcept { dataKeyRefreshTask_.stop(); - boost::system::error_code ec; + ASIO_ERROR ec; batchTimer_->cancel(ec); sendTimer_->cancel(ec); } @@ -1026,7 +1027,7 @@ void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) { sendTimer_->expires_from_now(expiryTime); auto weakSelf = weak_from_this(); - sendTimer_->async_wait([weakSelf](const boost::system::error_code& err) { + sendTimer_->async_wait([weakSelf](const ASIO_ERROR& err) { auto self = weakSelf.lock(); if (self) { std::static_pointer_cast(self)->handleSendTimeout(err); diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h index b467458d..97816050 100644 --- a/lib/ProducerImpl.h +++ b/lib/ProducerImpl.h @@ -19,6 +19,12 @@ #ifndef LIB_PRODUCERIMPL_H_ #define LIB_PRODUCERIMPL_H_ +#include "TimeUtils.h" +#ifdef USE_ASIO +#include +#else +#include +#endif #include #include #include @@ -30,6 +36,7 @@ #if defined(_MSC_VER) || defined(__APPLE__) #include "OpSendMsg.h" #endif +#include "AsioDefines.h" #include "PendingFailures.h" #include "PeriodicTask.h" #include "ProducerImplBase.h" @@ -39,7 +46,7 @@ namespace pulsar { class BatchMessageContainerBase; class ClientImpl; using ClientImplPtr = std::shared_ptr; -using DeadlineTimerPtr = std::shared_ptr; +using DeadlineTimerPtr = std::shared_ptr; class MessageCrypto; using MessageCryptoPtr = std::shared_ptr; class ProducerImpl; @@ -137,7 +144,7 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase { void resendMessages(ClientConnectionPtr cnx); - void refreshEncryptionKey(const boost::system::error_code& ec); + void refreshEncryptionKey(const ASIO_ERROR& ec); bool encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload, SharedBuffer& encryptedPayload); @@ -183,8 +190,8 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase { std::string schemaVersion_; DeadlineTimerPtr sendTimer_; - void handleSendTimeout(const boost::system::error_code& err); - using DurationType = typename boost::asio::deadline_timer::duration_type; + void handleSendTimeout(const ASIO_ERROR& err); + using DurationType = TimeDuration; void asyncWaitSendTimeout(DurationType expiryTime); Promise producerCreatedPromise_; diff --git a/lib/RetryableOperation.h b/lib/RetryableOperation.h index d026e424..9c920da1 100644 --- a/lib/RetryableOperation.h +++ b/lib/RetryableOperation.h @@ -30,6 +30,7 @@ #include "Future.h" #include "LogUtils.h" #include "ResultUtils.h" +#include "TimeUtils.h" namespace pulsar { @@ -43,9 +44,8 @@ class RetryableOperation : public std::enable_shared_from_thiscancel(ec); } @@ -100,7 +100,7 @@ class RetryableOperation : public std::enable_shared_from_thisexpires_from_now(delay); auto nextRemainingTime = remainingTime - delay; - LOG_INFO("Reschedule " << name_ << " for " << delay.total_milliseconds() - << " ms, remaining time: " << nextRemainingTime.total_milliseconds() - << " ms"); - timer_->async_wait([this, weakSelf, nextRemainingTime](const boost::system::error_code& ec) { + LOG_INFO("Reschedule " << name_ << " for " << toMillis(delay) + << " ms, remaining time: " << toMillis(nextRemainingTime) << " ms"); + timer_->async_wait([this, weakSelf, nextRemainingTime](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (!self) { return; } if (ec) { - if (ec == boost::asio::error::operation_aborted) { + if (ec == ASIO::error::operation_aborted) { LOG_DEBUG("Timer for " << name_ << " is cancelled"); promise_.setFailed(ResultTimeout); } else { LOG_WARN("Timer for " << name_ << " failed: " << ec.message()); } } else { - LOG_DEBUG("Run operation " << name_ << ", remaining time: " - << nextRemainingTime.total_milliseconds() << " ms"); + LOG_DEBUG("Run operation " << name_ << ", remaining time: " << toMillis(nextRemainingTime) + << " ms"); runImpl(nextRemainingTime); } }); diff --git a/lib/RoundRobinMessageRouter.cc b/lib/RoundRobinMessageRouter.cc index 9693cc25..1bc0b306 100644 --- a/lib/RoundRobinMessageRouter.cc +++ b/lib/RoundRobinMessageRouter.cc @@ -26,8 +26,7 @@ namespace pulsar { RoundRobinMessageRouter::RoundRobinMessageRouter(ProducerConfiguration::HashingScheme hashingScheme, bool batchingEnabled, uint32_t maxBatchingMessages, - uint32_t maxBatchingSize, - boost::posix_time::time_duration maxBatchingDelay) + uint32_t maxBatchingSize, TimeDuration maxBatchingDelay) : MessageRouterBase(hashingScheme), batchingEnabled_(batchingEnabled), maxBatchingMessages_(maxBatchingMessages), @@ -74,7 +73,7 @@ int RoundRobinMessageRouter::getPartition(const Message& msg, const TopicMetadat int64_t now = TimeUtils::currentTimeMillis(); if (messageCount >= maxBatchingMessages_ || (messageSize >= maxBatchingSize_ - batchSize) || - (now - lastPartitionChange >= maxBatchingDelay_.total_milliseconds())) { + (now - lastPartitionChange >= toMillis(maxBatchingDelay_))) { uint32_t currentPartitionCursor = ++currentPartitionCursor_; lastPartitionChange_ = now; cumulativeBatchSize_ = messageSize; diff --git a/lib/RoundRobinMessageRouter.h b/lib/RoundRobinMessageRouter.h index 753573a1..03cfac80 100644 --- a/lib/RoundRobinMessageRouter.h +++ b/lib/RoundRobinMessageRouter.h @@ -23,16 +23,16 @@ #include #include -#include #include "MessageRouterBase.h" +#include "TimeUtils.h" namespace pulsar { class PULSAR_PUBLIC RoundRobinMessageRouter : public MessageRouterBase { public: RoundRobinMessageRouter(ProducerConfiguration::HashingScheme hashingScheme, bool batchingEnabled, uint32_t maxBatchingMessages, uint32_t maxBatchingSize, - boost::posix_time::time_duration maxBatchingDelay); + TimeDuration maxBatchingDelay); virtual ~RoundRobinMessageRouter(); virtual int getPartition(const Message& msg, const TopicMetadata& topicMetadata); @@ -40,7 +40,7 @@ class PULSAR_PUBLIC RoundRobinMessageRouter : public MessageRouterBase { const bool batchingEnabled_; const uint32_t maxBatchingMessages_; const uint32_t maxBatchingSize_; - const boost::posix_time::time_duration maxBatchingDelay_; + const TimeDuration maxBatchingDelay_; std::atomic currentPartitionCursor_; std::atomic lastPartitionChange_; diff --git a/lib/SharedBuffer.h b/lib/SharedBuffer.h index 7ee26184..26fc59ed 100644 --- a/lib/SharedBuffer.h +++ b/lib/SharedBuffer.h @@ -22,12 +22,19 @@ #include #include +#ifdef USE_ASIO +#include +#include +#else #include #include +#endif #include #include #include +#include "AsioDefines.h" + namespace pulsar { class SharedBuffer { @@ -144,13 +151,13 @@ class SharedBuffer { inline bool writable() const { return writableBytes() > 0; } - boost::asio::const_buffers_1 const_asio_buffer() const { - return boost::asio::const_buffers_1(ptr_ + readIdx_, readableBytes()); + ASIO::const_buffers_1 const_asio_buffer() const { + return ASIO::const_buffers_1(ptr_ + readIdx_, readableBytes()); } - boost::asio::mutable_buffers_1 asio_buffer() { + ASIO::mutable_buffers_1 asio_buffer() { assert(data_); - return boost::asio::buffer(ptr_ + writeIdx_, writableBytes()); + return ASIO::buffer(ptr_ + writeIdx_, writableBytes()); } void write(const char* data, uint32_t size) { @@ -239,17 +246,17 @@ class CompositeSharedBuffer { } // Implement the ConstBufferSequence requirements. - typedef boost::asio::const_buffer value_type; - typedef boost::asio::const_buffer* iterator; - typedef const boost::asio::const_buffer* const_iterator; + typedef ASIO::const_buffer value_type; + typedef ASIO::const_buffer* iterator; + typedef const ASIO::const_buffer* const_iterator; - const boost::asio::const_buffer* begin() const { return &(asioBuffers_.at(0)); } + const ASIO::const_buffer* begin() const { return &(asioBuffers_.at(0)); } - const boost::asio::const_buffer* end() const { return begin() + Size; } + const ASIO::const_buffer* end() const { return begin() + Size; } private: std::array sharedBuffers_; - std::array asioBuffers_; + std::array asioBuffers_; }; typedef CompositeSharedBuffer<2> PairSharedBuffer; diff --git a/lib/TimeUtils.h b/lib/TimeUtils.h index a55773d9..03f563c3 100644 --- a/lib/TimeUtils.h +++ b/lib/TimeUtils.h @@ -21,19 +21,23 @@ #include #include -#include #include namespace pulsar { -using namespace boost::posix_time; -using boost::posix_time::milliseconds; -using boost::posix_time::seconds; +using ptime = decltype(std::chrono::high_resolution_clock::now()); +using TimeDuration = std::chrono::nanoseconds; + +inline decltype(std::chrono::milliseconds(0).count()) toMillis(TimeDuration duration) { + return std::chrono::duration_cast(duration).count(); +} class PULSAR_PUBLIC TimeUtils { public: - static ptime now(); - static int64_t currentTimeMillis(); + static ptime now() { return std::chrono::high_resolution_clock::now(); } + static int64_t currentTimeMillis() { + return std::chrono::duration_cast(now().time_since_epoch()).count(); + } }; // This class processes a timeout with the following semantics: diff --git a/lib/UnAckedMessageTrackerEnabled.cc b/lib/UnAckedMessageTrackerEnabled.cc index 061a1409..e371af99 100644 --- a/lib/UnAckedMessageTrackerEnabled.cc +++ b/lib/UnAckedMessageTrackerEnabled.cc @@ -34,9 +34,9 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() { timeoutHandlerHelper(); ExecutorServicePtr executorService = client_->getIOExecutorProvider()->get(); timer_ = executorService->createDeadlineTimer(); - timer_->expires_from_now(boost::posix_time::milliseconds(tickDurationInMs_)); + timer_->expires_from_now(std::chrono::milliseconds(tickDurationInMs_)); std::weak_ptr weakSelf{shared_from_this()}; - timer_->async_wait([weakSelf](const boost::system::error_code& ec) { + timer_->async_wait([weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (self && !ec) { self->timeoutHandler(); @@ -173,7 +173,7 @@ void UnAckedMessageTrackerEnabled::clear() { } void UnAckedMessageTrackerEnabled::stop() { - boost::system::error_code ec; + ASIO_ERROR ec; if (timer_) { timer_->cancel(ec); } diff --git a/lib/UnAckedMessageTrackerEnabled.h b/lib/UnAckedMessageTrackerEnabled.h index 6181a8a3..83edc4cb 100644 --- a/lib/UnAckedMessageTrackerEnabled.h +++ b/lib/UnAckedMessageTrackerEnabled.h @@ -18,13 +18,13 @@ */ #ifndef LIB_UNACKEDMESSAGETRACKERENABLED_H_ #define LIB_UNACKEDMESSAGETRACKERENABLED_H_ -#include #include #include #include #include #include +#include "AsioTimer.h" #include "TestUtil.h" #include "UnAckedMessageTrackerInterface.h" @@ -33,7 +33,6 @@ namespace pulsar { class ClientImpl; class ConsumerImplBase; using ClientImplPtr = std::shared_ptr; -using DeadlineTimerPtr = std::shared_ptr; class UnAckedMessageTrackerEnabled : public std::enable_shared_from_this, public UnAckedMessageTrackerInterface { diff --git a/lib/auth/athenz/ZTSClient.cc b/lib/auth/athenz/ZTSClient.cc index 230713ea..35387d96 100644 --- a/lib/auth/athenz/ZTSClient.cc +++ b/lib/auth/athenz/ZTSClient.cc @@ -44,8 +44,6 @@ namespace ptree = boost::property_tree; #pragma clang diagnostic ignored "-Wunknown-warning-option" #endif -#include - #if defined(__clang__) #pragma clang diagnostic pop #endif diff --git a/lib/stats/ConsumerStatsImpl.cc b/lib/stats/ConsumerStatsImpl.cc index 056dbf6e..0eefabdc 100644 --- a/lib/stats/ConsumerStatsImpl.cc +++ b/lib/stats/ConsumerStatsImpl.cc @@ -46,7 +46,7 @@ ConsumerStatsImpl::ConsumerStatsImpl(const ConsumerStatsImpl& stats) totalAckedMsgMap_(stats.totalAckedMsgMap_), statsIntervalInSeconds_(stats.statsIntervalInSeconds_) {} -void ConsumerStatsImpl::flushAndReset(const boost::system::error_code& ec) { +void ConsumerStatsImpl::flushAndReset(const ASIO_ERROR& ec) { if (ec) { LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]"); return; @@ -85,9 +85,9 @@ void ConsumerStatsImpl::messageAcknowledged(Result res, CommandAck_AckType ackTy } void ConsumerStatsImpl::scheduleTimer() { - timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_)); + timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_)); std::weak_ptr weakSelf{shared_from_this()}; - timer_->async_wait([this, weakSelf](const boost::system::error_code& ec) { + timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (!self) { return; diff --git a/lib/stats/ConsumerStatsImpl.h b/lib/stats/ConsumerStatsImpl.h index 44f927f5..03f3a474 100644 --- a/lib/stats/ConsumerStatsImpl.h +++ b/lib/stats/ConsumerStatsImpl.h @@ -20,17 +20,16 @@ #ifndef PULSAR_CONSUMER_STATS_IMPL_H_ #define PULSAR_CONSUMER_STATS_IMPL_H_ -#include #include #include #include #include #include "ConsumerStatsBase.h" +#include "lib/AsioTimer.h" #include "lib/ExecutorService.h" namespace pulsar { -using DeadlineTimerPtr = std::shared_ptr; class ExecutorService; using ExecutorServicePtr = std::shared_ptr; @@ -58,7 +57,7 @@ class ConsumerStatsImpl : public std::enable_shared_from_this public: ConsumerStatsImpl(std::string, ExecutorServicePtr, unsigned int); ConsumerStatsImpl(const ConsumerStatsImpl& stats); - void flushAndReset(const boost::system::error_code&); + void flushAndReset(const ASIO_ERROR&); void start() override; void receivedMessage(Message&, Result) override; void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums) override; diff --git a/lib/stats/ProducerStatsBase.h b/lib/stats/ProducerStatsBase.h index fe0ba0a5..b24266e8 100644 --- a/lib/stats/ProducerStatsBase.h +++ b/lib/stats/ProducerStatsBase.h @@ -22,14 +22,14 @@ #include #include -#include +#include "lib/TimeUtils.h" namespace pulsar { class ProducerStatsBase { public: virtual void start() {} virtual void messageSent(const Message& msg) = 0; - virtual void messageReceived(Result, const boost::posix_time::ptime&) = 0; + virtual void messageReceived(Result, const ptime&) = 0; virtual ~ProducerStatsBase(){}; }; diff --git a/lib/stats/ProducerStatsDisabled.h b/lib/stats/ProducerStatsDisabled.h index df1df0f8..df1da783 100644 --- a/lib/stats/ProducerStatsDisabled.h +++ b/lib/stats/ProducerStatsDisabled.h @@ -25,7 +25,7 @@ namespace pulsar { class ProducerStatsDisabled : public ProducerStatsBase { public: virtual void messageSent(const Message& msg){}; - virtual void messageReceived(Result, const boost::posix_time::ptime&){}; + virtual void messageReceived(Result, const ptime&){}; }; } // namespace pulsar #endif // PULSAR_PRODUCER_STATS_DISABLED_HEADER diff --git a/lib/stats/ProducerStatsImpl.cc b/lib/stats/ProducerStatsImpl.cc index 3d3629db..15e9e67e 100644 --- a/lib/stats/ProducerStatsImpl.cc +++ b/lib/stats/ProducerStatsImpl.cc @@ -20,9 +20,11 @@ #include "ProducerStatsImpl.h" #include +#include #include "lib/ExecutorService.h" #include "lib/LogUtils.h" +#include "lib/TimeUtils.h" #include "lib/Utils.h" namespace pulsar { @@ -65,7 +67,7 @@ ProducerStatsImpl::ProducerStatsImpl(const ProducerStatsImpl& stats) void ProducerStatsImpl::start() { scheduleTimer(); } -void ProducerStatsImpl::flushAndReset(const boost::system::error_code& ec) { +void ProducerStatsImpl::flushAndReset(const ASIO_ERROR& ec) { if (ec) { LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]"); return; @@ -93,9 +95,10 @@ void ProducerStatsImpl::messageSent(const Message& msg) { totalBytesSent_ += msg.getLength(); } -void ProducerStatsImpl::messageReceived(Result res, const boost::posix_time::ptime& publishTime) { - boost::posix_time::ptime currentTime = boost::posix_time::microsec_clock::universal_time(); - double diffInMicros = (currentTime - publishTime).total_microseconds(); +void ProducerStatsImpl::messageReceived(Result res, const ptime& publishTime) { + auto currentTime = TimeUtils::now(); + double diffInMicros = + std::chrono::duration_cast(currentTime - publishTime).count(); std::lock_guard lock(mutex_); totalLatencyAccumulator_(diffInMicros); latencyAccumulator_(diffInMicros); @@ -106,9 +109,9 @@ void ProducerStatsImpl::messageReceived(Result res, const boost::posix_time::pti ProducerStatsImpl::~ProducerStatsImpl() { timer_->cancel(); } void ProducerStatsImpl::scheduleTimer() { - timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_)); + timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_)); std::weak_ptr weakSelf{shared_from_this()}; - timer_->async_wait([this, weakSelf](const boost::system::error_code& ec) { + timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (!self) { return; diff --git a/lib/stats/ProducerStatsImpl.h b/lib/stats/ProducerStatsImpl.h index 8cd10992..5d445c60 100644 --- a/lib/stats/ProducerStatsImpl.h +++ b/lib/stats/ProducerStatsImpl.h @@ -30,20 +30,18 @@ #include #include #include -#include -#include #include #include #include #include #include "ProducerStatsBase.h" +#include "lib/AsioTimer.h" namespace pulsar { class ExecutorService; using ExecutorServicePtr = std::shared_ptr; -using DeadlineTimerPtr = std::shared_ptr; typedef boost::accumulators::accumulator_set< double, @@ -83,11 +81,11 @@ class ProducerStatsImpl : public std::enable_shared_from_this void start() override; - void flushAndReset(const boost::system::error_code&); + void flushAndReset(const ASIO_ERROR&); void messageSent(const Message&) override; - void messageReceived(Result, const boost::posix_time::ptime&) override; + void messageReceived(Result, const ptime&) override; ~ProducerStatsImpl(); diff --git a/tests/AuthPluginTest.cc b/tests/AuthPluginTest.cc index 9fd048b7..b091f973 100644 --- a/tests/AuthPluginTest.cc +++ b/tests/AuthPluginTest.cc @@ -21,9 +21,14 @@ #include #include +#ifdef USE_ASIO +#include +#else #include +#endif #include +#include "lib/AsioDefines.h" #include "lib/Future.h" #include "lib/Latch.h" #include "lib/LogUtils.h" @@ -287,10 +292,9 @@ namespace testAthenz { std::string principalToken; void mockZTS(Latch& latch, int port) { LOG_INFO("-- MockZTS started"); - boost::asio::io_service io; - boost::asio::ip::tcp::iostream stream; - boost::asio::ip::tcp::acceptor acceptor(io, - boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)); + ASIO::io_service io; + ASIO::ip::tcp::iostream stream; + ASIO::ip::tcp::acceptor acceptor(io, ASIO::ip::tcp::endpoint(ASIO::ip::tcp::v4(), port)); LOG_INFO("-- MockZTS waiting for connnection"); latch.countdown(); diff --git a/tests/AuthTokenTest.cc b/tests/AuthTokenTest.cc index a04da085..7595f44e 100644 --- a/tests/AuthTokenTest.cc +++ b/tests/AuthTokenTest.cc @@ -22,7 +22,6 @@ #include #include -#include #include #include #include diff --git a/tests/BackoffTest.cc b/tests/BackoffTest.cc index d066b944..5fe4f71a 100644 --- a/tests/BackoffTest.cc +++ b/tests/BackoffTest.cc @@ -26,42 +26,42 @@ #include "lib/stats/ProducerStatsImpl.h" using namespace pulsar; -using boost::posix_time::milliseconds; -using boost::posix_time::seconds; +using std::chrono::milliseconds; +using std::chrono::seconds; static bool checkExactAndDecrementTimer(Backoff& backoff, const unsigned int& t2) { - const unsigned int& t1 = backoff.next().total_milliseconds(); - boost::posix_time::ptime& firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff); + auto t1 = toMillis(backoff.next()); + auto& firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff); firstBackOffTime -= milliseconds(t2); return t1 == t2; } static bool withinTenPercentAndDecrementTimer(Backoff& backoff, const unsigned int& t2) { - const unsigned int& t1 = backoff.next().total_milliseconds(); - boost::posix_time::ptime& firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff); + auto t1 = toMillis(backoff.next()); + auto& firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff); firstBackOffTime -= milliseconds(t2); return (t1 >= t2 * 0.9 && t1 <= t2); } TEST(BackoffTest, mandatoryStopTestNegativeTest) { Backoff backoff(milliseconds(100), seconds(60), milliseconds(1900)); - ASSERT_EQ(backoff.next().total_milliseconds(), 100); - backoff.next().total_milliseconds(); // 200 - backoff.next().total_milliseconds(); // 400 - backoff.next().total_milliseconds(); // 800 + ASSERT_EQ(toMillis(backoff.next()), 100); + backoff.next(); // 200 + backoff.next(); // 400 + backoff.next(); // 800 ASSERT_FALSE(withinTenPercentAndDecrementTimer(backoff, 400)); } TEST(BackoffTest, firstBackoffTimerTest) { Backoff backoff(milliseconds(100), seconds(60), milliseconds(1900)); - ASSERT_EQ(backoff.next().total_milliseconds(), 100); - boost::posix_time::ptime firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff); + ASSERT_EQ(toMillis(backoff.next()), 100); + auto firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff); std::this_thread::sleep_for(std::chrono::milliseconds(300)); TimeDuration diffBackOffTime = PulsarFriend::getFirstBackoffTime(backoff) - firstBackOffTime; ASSERT_EQ(diffBackOffTime, milliseconds(0)); // no change since reset not called backoff.reset(); - ASSERT_EQ(backoff.next().total_milliseconds(), 100); + ASSERT_EQ(toMillis(backoff.next()), 100); diffBackOffTime = PulsarFriend::getFirstBackoffTime(backoff) - firstBackOffTime; ASSERT_TRUE(diffBackOffTime >= milliseconds(300) && diffBackOffTime < seconds(1)); } diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index c7e98bce..f97457fe 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -955,7 +955,7 @@ TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) { auto elapsed = TimeUtils::now() - start; // getLastMessageIdAsync should be blocked until operationTimeout when the connection is disconnected. - ASSERT_GE(elapsed.seconds(), operationTimeout); + ASSERT_GE(std::chrono::duration_cast(elapsed).count(), operationTimeout); } TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages) { diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index c2863e8c..73778842 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -170,7 +170,7 @@ class PulsarFriend { handler.connection_ = conn; } - static boost::posix_time::ptime& getFirstBackoffTime(Backoff& backoff) { + static auto getFirstBackoffTime(Backoff& backoff) -> decltype(backoff.firstBackoffTime_)& { return backoff.firstBackoffTime_; } diff --git a/tests/RoundRobinMessageRouterTest.cc b/tests/RoundRobinMessageRouterTest.cc index 56a76050..145c45ae 100644 --- a/tests/RoundRobinMessageRouterTest.cc +++ b/tests/RoundRobinMessageRouterTest.cc @@ -31,7 +31,7 @@ TEST(RoundRobinMessageRouterTest, onePartition) { const int numPartitions = 1; RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, false, 1, 1, - boost::posix_time::milliseconds(0)); + std::chrono::milliseconds(0)); Message msg1 = MessageBuilder().setPartitionKey("my-key-1").setContent("one").build(); Message msg2 = MessageBuilder().setPartitionKey("my-key-2").setContent("two").build(); @@ -49,7 +49,7 @@ TEST(RoundRobinMessageRouterTest, sameKey) { const int numPartitions = 13; RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, false, 1, 1, - boost::posix_time::milliseconds(0)); + std::chrono::milliseconds(0)); Message msg1 = MessageBuilder().setPartitionKey("my-key").setContent("one").build(); Message msg2 = MessageBuilder().setPartitionKey("my-key").setContent("two").build(); @@ -63,7 +63,7 @@ TEST(RoundRobinMessageRouterTest, batchingDisabled) { const int numPartitions = 13; RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, false, 1, 1, - boost::posix_time::milliseconds(0)); + std::chrono::milliseconds(0)); Message msg1 = MessageBuilder().setContent("one").build(); Message msg2 = MessageBuilder().setContent("two").build(); @@ -77,7 +77,7 @@ TEST(RoundRobinMessageRouterTest, batchingEnabled) { const int numPartitions = 13; RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 1000, 100000, - boost::posix_time::seconds(1)); + std::chrono::seconds(1)); int p = -1; for (int i = 0; i < 100; i++) { @@ -96,7 +96,7 @@ TEST(RoundRobinMessageRouterTest, maxDelay) { const int numPartitions = 13; RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 1000, 100000, - boost::posix_time::seconds(1)); + std::chrono::seconds(1)); int p1 = -1; for (int i = 0; i < 100; i++) { @@ -132,8 +132,7 @@ TEST(RoundRobinMessageRouterTest, maxDelay) { TEST(RoundRobinMessageRouterTest, maxNumberOfMessages) { const int numPartitions = 13; - RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 2, 1000, - boost::posix_time::seconds(1)); + RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 2, 1000, std::chrono::seconds(1)); Message msg1 = MessageBuilder().setContent("one").build(); Message msg2 = MessageBuilder().setContent("two").build(); @@ -150,8 +149,7 @@ TEST(RoundRobinMessageRouterTest, maxNumberOfMessages) { TEST(RoundRobinMessageRouterTest, maxBatchSize) { const int numPartitions = 13; - RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 10, 8, - boost::posix_time::seconds(1)); + RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 10, 8, std::chrono::seconds(1)); Message msg1 = MessageBuilder().setContent("one").build(); Message msg2 = MessageBuilder().setContent("two").build(); diff --git a/vcpkg.json b/vcpkg.json index d023a406..5ff44100 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -5,45 +5,20 @@ "builtin-baseline": "b051745c68faa6f65c493371d564c4eb8af34dad", "dependencies": [ { - "name": "boost-accumulators", - "version>=": "1.83.0" - }, - { - "name": "boost-algorithm", - "version>=": "1.83.0" - }, - { - "name": "boost-any", - "version>=": "1.83.0" - }, - { - "name": "boost-asio", - "version>=": "1.83.0" - }, - { - "name": "boost-circular-buffer", - "version>=": "1.83.0" - }, - { - "name": "boost-date-time", - "version>=": "1.83.0" + "name": "asio", + "features": [ + "openssl" + ], + "version>=": "1.28.2" }, { - "name": "boost-predef", + "name": "boost-accumulators", "version>=": "1.83.0" }, { "name": "boost-property-tree", "version>=": "1.83.0" }, - { - "name": "boost-serialization", - "version>=": "1.83.0" - }, - { - "name": "boost-xpressive", - "version>=": "1.83.0" - }, { "name": "curl", "default-features": false,