Skip to content

Commit a1e2b4a

Browse files
Fix multi-topics consumer could receive old messages after seek (#388)
### Motivation See apache/pulsar#21945 ### Modifications In C++ client, the multi-topics consumer receives messages by configuring internal consumers with a message listener that adds messages to `incomingMessages_`. So this patch pauses the listeners before seek and resumes them after seek. Add `MultiTopicsConsumerTest.testSeekToNewerPosition` for test.
1 parent d107d32 commit a1e2b4a

File tree

6 files changed

+215
-57
lines changed

6 files changed

+215
-57
lines changed

lib/ConsumerImpl.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -333,8 +333,6 @@ class ConsumerImpl : public ConsumerImplBase {
333333
const ClientConnectionPtr& cnx, MessageId& messageId);
334334

335335
friend class PulsarFriend;
336-
337-
// these two declared friend to access setNegativeAcknowledgeEnabledForTesting
338336
friend class MultiTopicsConsumerImpl;
339337

340338
FRIEND_TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages);

lib/MultiResultCallback.h

Lines changed: 0 additions & 51 deletions
This file was deleted.

lib/MultiTopicsConsumerImpl.cc

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
#include "LookupService.h"
2929
#include "MessageImpl.h"
3030
#include "MessagesImpl.h"
31-
#include "MultiResultCallback.h"
3231
#include "MultiTopicsBrokerConsumerStatsImpl.h"
3332
#include "TopicName.h"
3433
#include "UnAckedMessageTrackerDisabled.h"
@@ -521,6 +520,9 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
521520
}
522521

523522
void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
523+
if (PULSAR_UNLIKELY(duringSeek_.load(std::memory_order_acquire))) {
524+
return;
525+
}
524526
LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
525527
<< " message:" << msg.getDataAsString());
526528
msg.impl_->setTopicName(consumer.impl_->getTopicPtr());
@@ -907,9 +909,37 @@ void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callb
907909
return;
908910
}
909911

910-
MultiResultCallback multiResultCallback(callback, consumers_.size());
911-
consumers_.forEachValue([&timestamp, &multiResultCallback](ConsumerImplPtr consumer) {
912-
consumer->seekAsync(timestamp, multiResultCallback);
912+
duringSeek_.store(true, std::memory_order_release);
913+
consumers_.forEachValue([](const ConsumerImplPtr& consumer) { consumer->pauseMessageListener(); });
914+
unAckedMessageTrackerPtr_->clear();
915+
incomingMessages_.clear();
916+
incomingMessagesSize_ = 0L;
917+
918+
auto weakSelf = weak_from_this();
919+
auto numConsumersLeft = std::make_shared<std::atomic<int64_t>>(consumers_.size());
920+
auto wrappedCallback = [this, weakSelf, callback, numConsumersLeft](Result result) {
921+
auto self = weakSelf.lock();
922+
if (PULSAR_UNLIKELY(!self)) {
923+
callback(result);
924+
return;
925+
}
926+
if (result != ResultOk) {
927+
*numConsumersLeft = 0; // skip the following callbacks
928+
callback(result);
929+
return;
930+
}
931+
if (--*numConsumersLeft > 0) {
932+
return;
933+
}
934+
duringSeek_.store(false, std::memory_order_release);
935+
listenerExecutor_->postWork([this, self] {
936+
consumers_.forEachValue(
937+
[](const ConsumerImplPtr& consumer) { consumer->resumeMessageListener(); });
938+
});
939+
callback(ResultOk);
940+
};
941+
consumers_.forEachValue([timestamp, &wrappedCallback](const ConsumerImplPtr& consumer) {
942+
consumer->seekAsync(timestamp, wrappedCallback);
913943
});
914944
}
915945

lib/MultiTopicsConsumerImpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
131131
const Commands::SubscriptionMode subscriptionMode_;
132132
boost::optional<MessageId> startMessageId_;
133133
ConsumerInterceptorsPtr interceptors_;
134+
std::atomic_bool duringSeek_{false};
134135

135136
/* methods */
136137
void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,

tests/MultiTopicsConsumerTest.cc

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#include <gtest/gtest.h>
20+
#include <pulsar/Client.h>
21+
22+
#include <chrono>
23+
24+
#include "ThreadSafeMessages.h"
25+
#include "lib/LogUtils.h"
26+
27+
static const std::string lookupUrl = "pulsar://localhost:6650";
28+
29+
DECLARE_LOG_OBJECT()
30+
31+
using namespace pulsar;
32+
33+
extern std::string unique_str();
34+
35+
TEST(MultiTopicsConsumerTest, testSeekToNewerPosition) {
36+
const std::string topicPrefix = "multi-topics-consumer-seek-to-newer-position";
37+
Client client{lookupUrl};
38+
std::vector<std::string> topics{topicPrefix + unique_str(), topicPrefix + unique_str()};
39+
Producer producer1;
40+
ASSERT_EQ(ResultOk, client.createProducer(topics[0], producer1));
41+
Producer producer2;
42+
ASSERT_EQ(ResultOk, client.createProducer(topics[1], producer2));
43+
producer1.send(MessageBuilder().setContent("1-0").build());
44+
producer2.send(MessageBuilder().setContent("2-0").build());
45+
producer1.send(MessageBuilder().setContent("1-1").build());
46+
producer2.send(MessageBuilder().setContent("2-1").build());
47+
48+
Consumer consumer;
49+
ConsumerConfiguration conf;
50+
conf.setSubscriptionInitialPosition(InitialPositionEarliest);
51+
ASSERT_EQ(ResultOk, client.subscribe(topics, "sub", conf, consumer));
52+
std::vector<int64_t> timestamps;
53+
Message msg;
54+
for (int i = 0; i < 4; i++) {
55+
ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
56+
timestamps.emplace_back(msg.getPublishTimestamp());
57+
}
58+
std::sort(timestamps.begin(), timestamps.end());
59+
const auto timestamp = timestamps[2];
60+
consumer.close();
61+
62+
ThreadSafeMessages messages{2};
63+
64+
// Test synchronous receive after seek
65+
ASSERT_EQ(ResultOk, client.subscribe(topics, "sub-2", conf, consumer));
66+
consumer.seek(timestamp);
67+
for (int i = 0; i < 2; i++) {
68+
ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
69+
messages.add(msg);
70+
}
71+
ASSERT_EQ(messages.getSortedValues(), (std::vector<std::string>{"1-1", "2-1"}));
72+
consumer.close();
73+
74+
// Test asynchronous receive after seek
75+
ASSERT_EQ(ResultOk, client.subscribe(topics, "sub-3", conf, consumer));
76+
messages.clear();
77+
consumer.seek(timestamp);
78+
for (int i = 0; i < 2; i++) {
79+
consumer.receiveAsync([&messages](Result result, const Message& msg) {
80+
if (result == ResultOk) {
81+
messages.add(msg);
82+
} else {
83+
LOG_ERROR("Failed to receive: " << result);
84+
}
85+
});
86+
}
87+
ASSERT_TRUE(messages.wait(std::chrono::seconds(3)));
88+
ASSERT_EQ(messages.getSortedValues(), (std::vector<std::string>{"1-1", "2-1"}));
89+
consumer.close();
90+
91+
// Test message listener
92+
conf.setMessageListener([&messages](Consumer consumer, Message msg) { messages.add(msg); });
93+
messages.clear();
94+
messages.setMinNumMsgs(4);
95+
ASSERT_EQ(ResultOk, client.subscribe(topics, "sub-4", conf, consumer));
96+
ASSERT_TRUE(messages.wait(std::chrono::seconds(3)));
97+
ASSERT_EQ(messages.getSortedValues(), (std::vector<std::string>{"1-0", "1-1", "2-0", "2-1"}));
98+
messages.clear();
99+
messages.setMinNumMsgs(2);
100+
consumer.seek(timestamp);
101+
ASSERT_TRUE(messages.wait(std::chrono::seconds(3)));
102+
ASSERT_EQ(messages.getSortedValues(), (std::vector<std::string>{"1-1", "2-1"}));
103+
104+
client.close();
105+
}

tests/ThreadSafeMessages.h

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#pragma once
20+
21+
#include <pulsar/Message.h>
22+
23+
#include <algorithm>
24+
#include <atomic>
25+
#include <condition_variable>
26+
#include <mutex>
27+
#include <vector>
28+
29+
namespace pulsar {
30+
31+
// When we receive messages in the message listener or the callback of receiveAsync(), we need to verify the
32+
// received messages in the test thread. This class is a helper class for thread-safe access to the messages.
33+
class ThreadSafeMessages {
34+
public:
35+
ThreadSafeMessages(size_t minNumMsgs) : minNumMsgs_(minNumMsgs) {}
36+
37+
template <typename Duration>
38+
bool wait(Duration duration) {
39+
std::unique_lock<std::mutex> lock{mutex_};
40+
return cond_.wait_for(lock, duration, [this] { return msgs_.size() >= minNumMsgs_; });
41+
}
42+
43+
void add(const Message& msg) {
44+
std::lock_guard<std::mutex> lock{mutex_};
45+
msgs_.emplace_back(msg);
46+
if (msgs_.size() >= minNumMsgs_) {
47+
cond_.notify_all();
48+
}
49+
}
50+
51+
void clear() {
52+
std::lock_guard<std::mutex> lock{mutex_};
53+
msgs_.clear();
54+
}
55+
56+
std::vector<std::string> getSortedValues() const {
57+
std::unique_lock<std::mutex> lock{mutex_};
58+
std::vector<std::string> values(msgs_.size());
59+
std::transform(msgs_.cbegin(), msgs_.cend(), values.begin(),
60+
[](const Message& msg) { return msg.getDataAsString(); });
61+
lock.unlock();
62+
std::sort(values.begin(), values.end());
63+
return values;
64+
}
65+
66+
void setMinNumMsgs(size_t minNumMsgs) noexcept { minNumMsgs_ = minNumMsgs; }
67+
68+
private:
69+
std::atomic_size_t minNumMsgs_;
70+
std::vector<Message> msgs_;
71+
mutable std::mutex mutex_;
72+
mutable std::condition_variable cond_;
73+
};
74+
75+
} // namespace pulsar

0 commit comments

Comments
 (0)