Skip to content

Commit 8129810

Browse files
rtt_roscomm: shortcut ROS spinner callback queue
The patch introduces a new "queue" without a queue to bypass the global CallbackQueue of ROS for the subscriptions of Orocos messages. Instead, the network thread will immediately execute the callback, and will pass it to the global CallbackQueue only in case that it fails to execute with TryAgain status. Helper functions are provided to generate a subscriber with a custom implementation of CallbackQueueInterface. This method is chosen for the subscribers of the message transporter.
1 parent c65cf80 commit 8129810

File tree

4 files changed

+266
-4
lines changed

4 files changed

+266
-4
lines changed

rtt_roscomm/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ orocos_library(rtt_rostopic
1717
src/rtt_rostopic.cpp)
1818
target_link_libraries(rtt_rostopic ${catkin_LIBRARIES})
1919

20+
orocos_library(passthrough_callback_queue
21+
src/passthrough_callback_queue.cpp)
22+
target_link_libraries(passthrough_callback_queue ${catkin_LIBRARIES})
23+
2024
orocos_service(rtt_rostopic_service
2125
src/rtt_rostopic_service.cpp
2226
src/rtt_rostopic_ros_publish_activity.cpp)
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
2+
#ifndef RTT_ROSCOMM__PASSTHROUGH_CALLBACK_QUEUE_HPP_
3+
#define RTT_ROSCOMM__PASSTHROUGH_CALLBACK_QUEUE_HPP_
4+
5+
#include <ros/ros.h>
6+
#include <ros/callback_queue_interface.h>
7+
#include <ros/subscription_queue.h>
8+
#include <ros/callback_queue.h>
9+
10+
namespace rtt_roscomm {
11+
class PassthroughCallbackQueue: public ros::CallbackQueueInterface
12+
{
13+
public:
14+
PassthroughCallbackQueue();
15+
16+
/**
17+
* Implementation of ros::CallbackQueueInterface::addCallback()
18+
*
19+
* @param callback aaa
20+
* @param owner_id aaa
21+
*
22+
* @return void
23+
*/
24+
virtual void addCallback(
25+
const ros::CallbackInterfacePtr &callback,
26+
uint64_t owner_id=0);
27+
28+
/**
29+
* Implementation of ros::CallbackQueueInterface::removeByID()
30+
*
31+
* @param owner_id aaa
32+
*
33+
* @return void
34+
*/
35+
virtual void removeByID(uint64_t owner_id);
36+
37+
private:
38+
39+
/**
40+
* Port where to write the message
41+
*/
42+
// base::PortInterface* output_port_ptr_;
43+
44+
}; // class PassthroughCallbackQueue
45+
46+
/**
47+
* Helpers to generate subscribers with a custom ros::CallbackQueueInterface
48+
*/
49+
50+
// template<class M, class T>
51+
// ros::Subscriber subscribe(const std::string& topic, uint32_t queue_size,
52+
// void(T::*fp)(const boost::shared_ptr<M const>&) const, T* obj,
53+
// ros::CallbackQueueInterface* cq, ros::NodeHandle& nh,
54+
// const TransportHints& transport_hints = ros::TransportHints())
55+
// {
56+
// ros::SubscribeOptions ops;
57+
// ops.template init<M>(topic, queue_size, boost::bind(fp, obj, _1));
58+
// ops.transport_hints = transport_hints;
59+
// ops.callback_queue = cq;
60+
// return nh.subscribe(ops);
61+
// }
62+
63+
template<class M, class T>
64+
ros::Subscriber subscribe(
65+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
66+
const std::string& topic, uint32_t queue_size, void(T::*fp)(M), T* obj,
67+
const ros::TransportHints& transport_hints = ros::TransportHints())
68+
{
69+
ros::SubscribeOptions ops;
70+
ops.template initByFullCallbackType<M>(topic, queue_size, boost::bind(fp, obj, _1));
71+
ops.transport_hints = transport_hints;
72+
ops.callback_queue = cq;
73+
return nh.subscribe(ops);
74+
}
75+
76+
/// and the const version
77+
template<class M, class T>
78+
ros::Subscriber subscribe(
79+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
80+
const std::string& topic, uint32_t queue_size, void(T::*fp)(M) const, T* obj,
81+
const ros::TransportHints& transport_hints = ros::TransportHints())
82+
{
83+
ros::SubscribeOptions ops;
84+
ops.template initByFullCallbackType<M>(topic, queue_size, boost::bind(fp, obj, _1));
85+
ops.transport_hints = transport_hints;
86+
ops.callback_queue = cq;
87+
return nh.subscribe(ops);
88+
}
89+
template<class M, class T>
90+
ros::Subscriber subscribe(
91+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
92+
const std::string& topic, uint32_t queue_size,
93+
void(T::*fp)(const boost::shared_ptr<M const>&), T* obj,
94+
const ros::TransportHints& transport_hints = ros::TransportHints())
95+
{
96+
ros::SubscribeOptions ops;
97+
ops.template init<M>(topic, queue_size, boost::bind(fp, obj, _1));
98+
ops.transport_hints = transport_hints;
99+
ops.callback_queue = cq;
100+
return nh.subscribe(ops);
101+
}
102+
template<class M, class T>
103+
ros::Subscriber subscribe(
104+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
105+
const std::string& topic, uint32_t queue_size,
106+
void(T::*fp)(const boost::shared_ptr<M const>&) const, T* obj,
107+
const ros::TransportHints& transport_hints = ros::TransportHints())
108+
{
109+
ros::SubscribeOptions ops;
110+
ops.template init<M>(topic, queue_size, boost::bind(fp, obj, _1));
111+
ops.transport_hints = transport_hints;
112+
ops.callback_queue = cq;
113+
return nh.subscribe(ops);
114+
}
115+
template<class M, class T>
116+
ros::Subscriber subscribe(
117+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
118+
const std::string& topic, uint32_t queue_size, void(T::*fp)(M),
119+
const boost::shared_ptr<T>& obj,
120+
const ros::TransportHints& transport_hints = ros::TransportHints())
121+
{
122+
ros::SubscribeOptions ops;
123+
ops.template initByFullCallbackType<M>(topic, queue_size, boost::bind(fp, obj.get(), _1));
124+
ops.tracked_object = obj;
125+
ops.transport_hints = transport_hints;
126+
ops.callback_queue = cq;
127+
return nh.subscribe(ops);
128+
}
129+
130+
template<class M, class T>
131+
ros::Subscriber subscribe(
132+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
133+
const std::string& topic, uint32_t queue_size, void(T::*fp)(M) const,
134+
const boost::shared_ptr<T>& obj,
135+
const ros::TransportHints& transport_hints = ros::TransportHints())
136+
{
137+
ros::SubscribeOptions ops;
138+
ops.template initByFullCallbackType<M>(topic, queue_size, boost::bind(fp, obj.get(), _1));
139+
ops.tracked_object = obj;
140+
ops.transport_hints = transport_hints;
141+
ops.callback_queue = cq;
142+
return nh.subscribe(ops);
143+
}
144+
template<class M, class T>
145+
ros::Subscriber subscribe(
146+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
147+
const std::string& topic, uint32_t queue_size,
148+
void(T::*fp)(const boost::shared_ptr<M const>&),
149+
const boost::shared_ptr<T>& obj,
150+
const ros::TransportHints& transport_hints = ros::TransportHints())
151+
{
152+
ros::SubscribeOptions ops;
153+
ops.template init<M>(topic, queue_size, boost::bind(fp, obj.get(), _1));
154+
ops.tracked_object = obj;
155+
ops.transport_hints = transport_hints;
156+
ops.callback_queue = cq;
157+
return nh.subscribe(ops);
158+
}
159+
template<class M, class T>
160+
ros::Subscriber subscribe(
161+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
162+
const std::string& topic, uint32_t queue_size,
163+
void(T::*fp)(const boost::shared_ptr<M const>&) const,
164+
const boost::shared_ptr<T>& obj,
165+
const ros::TransportHints& transport_hints = ros::TransportHints())
166+
{
167+
ros::SubscribeOptions ops;
168+
ops.template init<M>(topic, queue_size, boost::bind(fp, obj.get(), _1));
169+
ops.tracked_object = obj;
170+
ops.transport_hints = transport_hints;
171+
ops.callback_queue = cq;
172+
return nh.subscribe(ops);
173+
}
174+
template<class M>
175+
ros::Subscriber subscribe(
176+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
177+
const std::string& topic, uint32_t queue_size, void(*fp)(M),
178+
const ros::TransportHints& transport_hints = ros::TransportHints())
179+
{
180+
ros::SubscribeOptions ops;
181+
ops.template initByFullCallbackType<M>(topic, queue_size, fp);
182+
ops.transport_hints = transport_hints;
183+
ops.callback_queue = cq;
184+
return nh.subscribe(ops);
185+
}
186+
template<class M>
187+
ros::Subscriber subscribe(
188+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
189+
const std::string& topic, uint32_t queue_size,
190+
void(*fp)(const boost::shared_ptr<M const>&),
191+
const ros::TransportHints& transport_hints = ros::TransportHints())
192+
{
193+
ros::SubscribeOptions ops;
194+
ops.template init<M>(topic, queue_size, fp);
195+
ops.transport_hints = transport_hints;
196+
ops.callback_queue = cq;
197+
return nh.subscribe(ops);
198+
}
199+
template<class M>
200+
ros::Subscriber subscribe(
201+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
202+
const std::string& topic, uint32_t queue_size,
203+
const boost::function<void (const boost::shared_ptr<M const>&)>& callback,
204+
const ros::VoidConstPtr& tracked_object = ros::VoidConstPtr(),
205+
const ros::TransportHints& transport_hints = ros::TransportHints())
206+
{
207+
ros::SubscribeOptions ops;
208+
ops.template init<M>(topic, queue_size, callback);
209+
ops.tracked_object = tracked_object;
210+
ops.transport_hints = transport_hints;
211+
ops.callback_queue = cq;
212+
return nh.subscribe(ops);
213+
}
214+
template<class M, class C>
215+
ros::Subscriber subscribe(
216+
ros::NodeHandle& nh, ros::CallbackQueueInterface* cq,
217+
const std::string& topic, uint32_t queue_size,
218+
const boost::function<void (C)>& callback,
219+
const ros::VoidConstPtr& tracked_object = ros::VoidConstPtr(),
220+
const ros::TransportHints& transport_hints = ros::TransportHints())
221+
{
222+
ros::SubscribeOptions ops;
223+
ops.template initByFullCallbackType<C>(topic, queue_size, callback);
224+
ops.tracked_object = tracked_object;
225+
ops.transport_hints = transport_hints;
226+
ops.callback_queue = cq;
227+
return nh.subscribe(ops);
228+
}
229+
230+
} // namespace rtt_roscomm
231+
232+
#endif // RTT_ROSCOMM__PASSTHROUGH_CALLBACK_QUEUE_HPP_

rtt_roscomm/include/rtt_roscomm/rtt_rostopic_ros_msg_transporter.hpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
#include <rtt/internal/ConnFactory.hpp>
5757
#include <ros/ros.h>
5858

59+
#include <rtt_roscomm/passthrough_callback_queue.hpp>
5960
#include <rtt_roscomm/rtt_rostopic_ros_publish_activity.hpp>
6061

6162
#ifndef RTT_VERSION_GTE
@@ -241,6 +242,7 @@ namespace rtt_roscomm {
241242
ros::NodeHandle ros_node;
242243
ros::NodeHandle ros_node_private;
243244
ros::Subscriber ros_sub;
245+
PassthroughCallbackQueue prassthrough_callback_queue;
244246

245247
public:
246248
/**
@@ -264,9 +266,9 @@ namespace rtt_roscomm {
264266
RTT::log(RTT::Debug)<<"Creating ROS subscriber for port "<<port->getName()<<" on topic "<<policy.name_id<<RTT::endlog();
265267
}
266268
if(topicname.length() > 1 && topicname.at(0) == '~') {
267-
ros_sub = ros_node_private.subscribe(policy.name_id.substr(1), policy.size > 0 ? policy.size : 1, &RosSubChannelElement::newData, this); // minimum queue_size 1
269+
ros_sub = subscribe(ros_node_private, &prassthrough_callback_queue, policy.name_id.substr(1), policy.size > 0 ? policy.size : 1, &RosSubChannelElement::newData, this); // minimum queue_size 1
268270
} else {
269-
ros_sub = ros_node.subscribe(policy.name_id, policy.size > 0 ? policy.size : 1, &RosSubChannelElement::newData, this); // minimum queue_size 1
271+
ros_sub = subscribe(ros_node, &prassthrough_callback_queue, policy.name_id, policy.size > 0 ? policy.size : 1, &RosSubChannelElement::newData, this); // minimum queue_size 1
270272
}
271273
}
272274

@@ -350,6 +352,6 @@ namespace rtt_roscomm {
350352

351353
return channel;
352354
}
353-
};
354-
}
355+
}; // class RosMsgTransporter
356+
} // namespace rtt_roscomm
355357
#endif
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
2+
#include "rtt_roscomm/passthrough_callback_queue.hpp"
3+
4+
namespace rtt_roscomm {
5+
6+
PassthroughCallbackQueue::PassthroughCallbackQueue()
7+
{}
8+
9+
void PassthroughCallbackQueue::addCallback(
10+
const ros::CallbackInterfacePtr &callback,
11+
uint64_t owner_id)
12+
{
13+
// Call CallbackInterface::CallResult SubscriptionQueue::call()
14+
if (ros::CallbackInterface::TryAgain == callback->call()) {
15+
ros::getGlobalCallbackQueue()->addCallback(callback, owner_id);
16+
}
17+
}
18+
19+
void PassthroughCallbackQueue::removeByID(uint64_t owner_id)
20+
{
21+
}
22+
23+
} // namespace rtt_roscomm
24+

0 commit comments

Comments
 (0)