Skip to content

Commit 58223e7

Browse files
committed
Do not close the socket when the broker failed due to MetadataStoreException
### Motivation When the broker failed to acquire the ownership of a namespace bundle by `LockBusyException`. It means there is another broker that has acquired the metadata store path and didn't release that path. For example: Broker 1: ``` 2024-01-24T23:35:36,626+0000 [metadata-store-10-1] WARN org.apache.pulsar.broker.lookup.TopicLookupBase - Failed to lookup <role> for topic persistent://<tenant>/<ns>/<topic> with error org.apache.pulsar.broker.PulsarServerException: Failed to acquire ownership for namespace bundle <tenant>/<ns>/0x50000000_0x51000000 Caused by: java.util.concurrent.CompletionException: org.apache.pulsar.metadata.api.MetadataStoreException$LockBusyException: Resource at /namespace/<tenant>/<ns>/0x50000000_0x51000000 is already locked ``` Broker 2: ``` 2024-01-24T23:35:36,650+0000 [broker-topic-workers-OrderedExecutor-3-0] INFO org.apache.pulsar.broker.PulsarService - Loaded 1 topics on <tenant>/<ns>/0x50000000_0x51000000 -- time taken: 0.044 seconds ``` After broker 2 released the lock at 23:35:36,650, the lookup request to broker 1 should tell the client that namespace bundle 0x50000000_0x51000000 is currently being unloaded and in the next retry the client will connect to the new owner broker. Here is another typical error: ``` 2024-01-24T23:57:57,264+0000 [pulsar-io-4-5] INFO org.apache.pulsar.broker.lookup.TopicLookupBase - Failed to lookup <role> for topic persistent://<tenant>/<ns>/<topic> with error Namespace bundle <tenant>/<ns>/0x0d000000_0x0e000000 is being unloaded ``` Though after apache/pulsar#21211, the server error becomes `MetadataError` rather than `ServiceNotReady`. However, since the `ServerError` is `ServiceNotReady`, the client will close the connection. If there are many other producers or consumers on the same connection, they will all reestablish connection to the broker, which is unnecessary and brings much pressure to broker side. ### Modifications In `checkServerError`, when the error code is `ServiceNotReady`, check the error message as well, if it hit the case in `handleLookupError`, do not close the connection. Add `ConnectionTest` on a mocked `ClientConnection` object to verify `close()` will not be called.
1 parent d1dd08b commit 58223e7

File tree

4 files changed

+116
-19
lines changed

4 files changed

+116
-19
lines changed

lib/ClientConnection.cc

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <fstream>
2525

2626
#include "AsioDefines.h"
27+
#include "ClientConnectionAdaptor.h"
2728
#include "ClientImpl.h"
2829
#include "Commands.h"
2930
#include "ConnectionPool.h"
@@ -80,9 +81,7 @@ static Result getResult(ServerError serverError, const std::string& message) {
8081
return ResultConsumerBusy;
8182

8283
case ServiceNotReady:
83-
// If the error is not caused by a PulsarServerException, treat it as retryable.
84-
return (message.find("PulsarServerException") == std::string::npos) ? ResultRetryable
85-
: ResultServiceUnitNotReady;
84+
return ResultServiceUnitNotReady;
8685

8786
case ProducerBlockedQuotaExceededError:
8887
return ResultProducerBlockedQuotaExceededError;
@@ -1469,19 +1468,8 @@ Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& top
14691468
return promise.getFuture();
14701469
}
14711470

1472-
void ClientConnection::checkServerError(ServerError error) {
1473-
switch (error) {
1474-
case proto::ServerError::ServiceNotReady:
1475-
close(ResultDisconnected);
1476-
break;
1477-
case proto::ServerError::TooManyRequests:
1478-
// TODO: Implement maxNumberOfRejectedRequestPerConnection like
1479-
// https://github.com/apache/pulsar/pull/274
1480-
close(ResultDisconnected);
1481-
break;
1482-
default:
1483-
break;
1484-
}
1471+
void ClientConnection::checkServerError(ServerError error, const std::string& message) {
1472+
pulsar::adaptor::checkServerError(*this, error, message);
14851473
}
14861474

14871475
void ClientConnection::handleSendReceipt(const proto::CommandSendReceipt& sendReceipt) {
@@ -1573,7 +1561,7 @@ void ClientConnection::handlePartitionedMetadataResponse(
15731561
<< partitionMetadataResponse.request_id()
15741562
<< " error: " << partitionMetadataResponse.error()
15751563
<< " msg: " << partitionMetadataResponse.message());
1576-
checkServerError(partitionMetadataResponse.error());
1564+
checkServerError(partitionMetadataResponse.error(), partitionMetadataResponse.message());
15771565
lookupDataPromise->setFailed(
15781566
getResult(partitionMetadataResponse.error(), partitionMetadataResponse.message()));
15791567
} else {
@@ -1650,7 +1638,7 @@ void ClientConnection::handleLookupTopicRespose(
16501638
LOG_ERROR(cnxString_ << "Failed lookup req_id: " << lookupTopicResponse.request_id()
16511639
<< " error: " << lookupTopicResponse.error()
16521640
<< " msg: " << lookupTopicResponse.message());
1653-
checkServerError(lookupTopicResponse.error());
1641+
checkServerError(lookupTopicResponse.error(), lookupTopicResponse.message());
16541642
lookupDataPromise->setFailed(
16551643
getResult(lookupTopicResponse.error(), lookupTopicResponse.message()));
16561644
} else {

lib/ClientConnection.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
404404

405405
friend class PulsarFriend;
406406

407-
void checkServerError(ServerError error);
407+
void checkServerError(ServerError error, const std::string& message);
408408

409409
void handleSendReceipt(const proto::CommandSendReceipt&);
410410
void handleSendError(const proto::CommandSendError&);

lib/ClientConnectionAdaptor.h

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
// This file is used to mock ClientConnection's methods
20+
#pragma once
21+
22+
#include <pulsar/Result.h>
23+
24+
#include "ProtoApiEnums.h"
25+
#include "PulsarApi.pb.h"
26+
27+
namespace pulsar {
28+
29+
namespace adaptor {
30+
31+
template <typename Connection>
32+
inline void checkServerError(Connection& connection, ServerError error, const std::string& message) {
33+
switch (error) {
34+
case proto::ServerError::ServiceNotReady:
35+
// There are some typical error messages that should not trigger the
36+
// close() of the connection.
37+
// "Namespace bundle ... is being unloaded"
38+
// "KeeperException$..."
39+
// "Failed to acquire ownership for namespace bundle ..."
40+
// Before https://github.com/apache/pulsar/pull/21211, the error of the 1st and 2nd messages
41+
// is ServiceNotReady. Before https://github.com/apache/pulsar/pull/21993, the error of the 3rd
42+
// message is ServiceNotReady.
43+
if (message.find("Failed to acquire ownership") == std::string::npos &&
44+
message.find("KeeperException") == std::string::npos &&
45+
message.find("is being unloaded") == std::string::npos) {
46+
connection.close(ResultDisconnected);
47+
}
48+
break;
49+
case proto::ServerError::TooManyRequests:
50+
// TODO: Implement maxNumberOfRejectedRequestPerConnection like
51+
// https://github.com/apache/pulsar/pull/274
52+
connection.close(ResultDisconnected);
53+
break;
54+
default:
55+
break;
56+
}
57+
}
58+
59+
} // namespace adaptor
60+
61+
} // namespace pulsar

tests/ConnectionTest.cc

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
*/
15+
#include <gmock/gmock.h>
16+
#include <gtest/gtest.h>
17+
18+
#include <vector>
19+
20+
#include "lib/ClientConnection.h"
21+
#include "lib/ClientConnectionAdaptor.h"
22+
23+
using namespace pulsar;
24+
25+
class MockClientConnection {
26+
public:
27+
MOCK_METHOD(void, close, (Result));
28+
29+
void checkServerError(ServerError error, const std::string& message) {
30+
::pulsar::adaptor::checkServerError(*this, error, message);
31+
}
32+
};
33+
34+
// These error messages come from
35+
// https://github.com/apache/pulsar/blob/a702e5a582eaa8292720f9e25fc2dcf858078813/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java#L334-L351
36+
static const std::vector<std::string> retryableErrorMessages{
37+
"Namespace bundle public/default/0x00000000_0xffffffff is being unloaded",
38+
"org.apache.zookeeper.KeeperException$OperationTimeoutException: KeeperErrorCode = OperationTimeout for "
39+
"/namespace/public/default/0x00000000_0xffffffff",
40+
"Failed to acquire ownership for namespace bundle public/default/0x00000000_0xffffffff"};
41+
42+
TEST(ConnectionTest, testCheckServerError) {
43+
MockClientConnection conn;
44+
EXPECT_CALL(conn, close(ResultDisconnected)).Times(0);
45+
for (auto&& msg : retryableErrorMessages) {
46+
conn.checkServerError(pulsar::proto::ServiceNotReady, msg);
47+
}
48+
}

0 commit comments

Comments
 (0)