@@ -17,37 +17,31 @@ function publish{MsgType<:MsgT}(p::Publisher{MsgType}, msg::MsgType)
1717 pycall (p. o[" publish" ], PyAny, convert (PyObject, msg))
1818end
1919
20- if _threads_enabled () # callbacks are broken
21-
22- type Subscriber{T}
23- end
24- Subscriber (args... ) = error (
25- """ Subscribing to a topic is currently broken on julia v0.5 and above. See
26- https://github.com/jdlangs/RobotOS.jl/issues/15 for ongoing efforts to fix this.""" )
27-
28- else # callbacks not broken
29-
3020type Subscriber{MsgType<: MsgT }
31- o:: PyObject
3221 callback
22+ callback_args:: Tuple
23+ sub_obj:: PyObject
24+ queue:: PyObject
25+ async_loop:: Task
3326
3427 function Subscriber (topic:: AbstractString , cb, cb_args:: Tuple = (); kwargs... )
3528 @debug (" Creating <$(string (MsgType)) > subscriber on topic: '$topic '" )
3629 rospycls = _get_rospy_class (MsgType)
37- jl_cb (msg:: PyObject ) = cb (convert (MsgType, msg), cb_args... )
38- return new (
39- __rospy__[:Subscriber ](ascii (topic), rospycls, jl_cb; kwargs... ),
40- jl_cb
41- )
30+
31+ cond = Base. AsyncCondition ()
32+ mqueue = _py_ros_callbacks[" MessageQueue" ](CB_NOTIFY_PTR, cond. handle)
33+ subobj = __rospy__[:Subscriber ](ascii (topic), rospycls, mqueue[" storemsg" ]; kwargs... )
34+
35+ rosobj = new (cb, cb_args, subobj, mqueue)
36+ cbloop = Task (() -> _callback_async_loop (rosobj, cond))
37+ schedule (cbloop)
38+
39+ rosobj. async_loop = cbloop
40+ return rosobj
4241 end
4342end
4443
45- Subscriber {MsgType<:MsgT} (
46- topic:: AbstractString ,
47- :: Type{MsgType} ,
48- cb,
49- cb_args:: Tuple = ();
50- kwargs...
51- ) = Subscriber {MsgType} (ascii (topic), cb, cb_args; kwargs... )
44+ function Subscriber {MsgType<:MsgT} (topic, :: Type{MsgType} , cb, cb_args:: Tuple = (); kwargs... )
45+ Subscriber {MsgType} (topic, cb, cb_args; kwargs... )
46+ end
5247
53- end # check
0 commit comments