2222#include " behaviortree_cpp/bt_factory.h"
2323
2424#include " behaviortree_ros2/ros_node_params.hpp"
25+ #include < boost/signals2.hpp>
2526
2627namespace BT
2728{
2829
30+
2931/* *
3032 * @brief Abstract class to wrap a Topic subscriber.
3133 * Considering the example in the tutorial:
@@ -43,11 +45,44 @@ namespace BT
4345template <class TopicT >
4446class RosTopicSubNode : public BT ::ConditionNode
4547{
46-
47- public:
48+ public:
4849 // Type definitions
4950 using Subscriber = typename rclcpp::Subscription<TopicT>;
5051
52+ protected:
53+ struct SubscriberInstance
54+ {
55+ std::shared_ptr<Subscriber> subscriber;
56+ rclcpp::CallbackGroup::SharedPtr callback_group;
57+ rclcpp::executors::SingleThreadedExecutor callback_group_executor;
58+ boost::signals2::signal<void (const std::shared_ptr<TopicT>&)> broadcaster;
59+ };
60+
61+ static std::mutex& registryMutex ()
62+ {
63+ static std::mutex sub_mutex;
64+ return sub_mutex;
65+ }
66+
67+ // contains the fully-qualified name of the node and the name of the topic
68+ static SubscriberInstance& getRegisteredInstance (const std::string& key)
69+ {
70+ static std::unordered_map<std::string, SubscriberInstance> subscribers_registry;
71+ return subscribers_registry[key];
72+ }
73+
74+ std::shared_ptr<rclcpp::Node> node_;
75+ SubscriberInstance* sub_instance_ = nullptr ;
76+ std::shared_ptr<TopicT> last_msg_;
77+ std::string topic_name_;
78+
79+ rclcpp::Logger logger ()
80+ {
81+ return node_->get_logger ();
82+ }
83+
84+ public:
85+
5186 /* * You are not supposed to instantiate this class directly, the factory will do it.
5287 * To register this class into the factory, use:
5388 *
@@ -87,32 +122,17 @@ class RosTopicSubNode : public BT::ConditionNode
87122
88123 NodeStatus tick () override final ;
89124
90- /* * topicCallback is the callback invoked when a message is received.
91- */
92- virtual void topicCallback (const std::shared_ptr<TopicT> msg);
93-
94125 /* * Callback invoked in the tick. You must return either SUCCESS of FAILURE
95126 *
96- * @param last_msg the latest message received since the last tick.
97- * it might be empty.
127+ * @param last_msg the latest message received, since the last tick.
128+ * Will be empty if no new message received.
129+ *
98130 * @return the new status of the Node, based on last_msg
99131 */
100- virtual BT::NodeStatus onTick (const typename TopicT::SharedPtr& last_msg) = 0;
101-
102- protected:
103-
104- std::shared_ptr<rclcpp::Node> node_;
105- std::string prev_topic_name_;
106- bool topic_name_may_change_ = false ;
132+ virtual NodeStatus onTick (const std::shared_ptr<TopicT>& last_msg) = 0;
107133
108134private:
109135
110- std::shared_ptr<Subscriber> subscriber_;
111- typename TopicT::SharedPtr last_msg_;
112- bool is_message_received_;
113- rclcpp::CallbackGroup::SharedPtr callback_group_;
114- rclcpp::executors::SingleThreadedExecutor callback_group_executor_;
115-
116136 bool createSubscriber (const std::string& topic_name);
117137};
118138
@@ -151,7 +171,7 @@ template<class T> inline
151171 createSubscriber (bb_topic_name);
152172 }
153173 else {
154- topic_name_may_change_ = true ;
174+ // do nothing
155175 // createSubscriber will be invoked in the first tick().
156176 }
157177 }
@@ -173,48 +193,74 @@ template<class T> inline
173193 {
174194 throw RuntimeError (" topic_name is empty" );
175195 }
176-
177- callback_group_ = node_->create_callback_group (rclcpp::CallbackGroupType::MutuallyExclusive, false );
178- callback_group_executor_.add_callback_group (callback_group_, node_->get_node_base_interface ());
179- rclcpp::SubscriptionOptions sub_option;
180- sub_option.callback_group = callback_group_;
181- auto callback = std::bind (&RosTopicSubNode::topicCallback, this , std::placeholders::_1);
182- subscriber_ = node_->create_subscription <T>(topic_name, 1 , callback, sub_option);
183- prev_topic_name_ = topic_name;
196+ if (sub_instance_)
197+ {
198+ throw RuntimeError (" Can't call createSubscriber more than once" );
199+ }
200+
201+ // find SubscriberInstance in the registry
202+ std::unique_lock lk (registryMutex ());
203+ const auto key = topic_name + " @" + node_->get_fully_qualified_name ();
204+ sub_instance_ = &(getRegisteredInstance (key));
205+
206+ // just created (subscriber is not initialized)
207+ if (!sub_instance_->subscriber )
208+ {
209+ // create a callback group for this particular instance
210+ sub_instance_->callback_group =
211+ node_->create_callback_group (rclcpp::CallbackGroupType::MutuallyExclusive, false );
212+ sub_instance_->callback_group_executor .add_callback_group (
213+ sub_instance_->callback_group , node_->get_node_base_interface ());
214+
215+ rclcpp::SubscriptionOptions sub_option;
216+ sub_option.callback_group = sub_instance_->callback_group ;
217+
218+ // The callback will broadcast to all the instances of RosTopicSubNode<T>
219+ auto callback = [this ](const std::shared_ptr<T> msg) {
220+ sub_instance_->broadcaster (msg);
221+ };
222+ sub_instance_->subscriber =
223+ node_->create_subscription <T>(topic_name, 1 , callback, sub_option);
224+
225+ RCLCPP_INFO (logger (),
226+ " Node [%s] created Subscriber to topic [%s]" ,
227+ name ().c_str (), topic_name.c_str () );
228+ }
229+
230+ // add "this" as received of the broadcaster
231+ sub_instance_->broadcaster .connect ([this ](const std::shared_ptr<T> msg)
232+ {
233+ last_msg_ = msg;
234+ });
235+
236+ topic_name_ = topic_name;
184237 return true ;
185238}
186239
187- template <class T > inline
188- void RosTopicSubNode<T>::topicCallback(const std::shared_ptr<T> msg)
189- {
190- last_msg_ = msg;
191- }
192240
193241template <class T > inline
194242 NodeStatus RosTopicSubNode<T>::tick()
195243{
196244 // First, check if the subscriber_ is valid and that the name of the
197245 // topic_name in the port didn't change.
198246 // otherwise, create a new subscriber
199- if (!subscriber_ || ( status () == NodeStatus::IDLE && topic_name_may_change_) )
247+ if (!sub_instance_ )
200248 {
201249 std::string topic_name;
202250 getInput (" topic_name" , topic_name);
203- if (prev_topic_name_ != topic_name)
204- {
205- createSubscriber (topic_name);
206- }
251+ createSubscriber (topic_name);
207252 }
208253
209254 auto CheckStatus = [](NodeStatus status)
210255 {
211256 if ( !isStatusCompleted (status) )
212257 {
213- throw std::logic_error (" RosTopicSubNode: the callback must return either SUCCESS or FAILURE" );
258+ throw std::logic_error (" RosTopicSubNode: the callback must return"
259+ " either SUCCESS or FAILURE" );
214260 }
215261 return status;
216262 };
217- callback_group_executor_ .spin_some ();
263+ sub_instance_-> callback_group_executor .spin_some ();
218264 auto status = CheckStatus (onTick (last_msg_));
219265 last_msg_ = nullptr ;
220266
0 commit comments