@@ -14,11 +14,23 @@ use tokio::sync::mpsc;
1414pub struct LuaChannel {
1515 name : String ,
1616 id : i64 ,
17+ pub receiver : Arc < Mutex < mpsc:: Receiver < i64 > > > ,
18+ pub sender : Arc < Mutex < mpsc:: Sender < i64 > > > ,
1719}
1820
1921impl LuaChannel {
20- fn new ( name : String , id : i64 ) -> LuaChannel {
21- LuaChannel { name, id }
22+ fn new (
23+ name : String ,
24+ id : i64 ,
25+ receiver : Arc < Mutex < mpsc:: Receiver < i64 > > > ,
26+ sender : Arc < Mutex < mpsc:: Sender < i64 > > > ,
27+ ) -> LuaChannel {
28+ LuaChannel {
29+ name,
30+ id,
31+ receiver,
32+ sender,
33+ }
2234 }
2335
2436 pub fn mt_string ( & self ) -> String {
@@ -28,17 +40,13 @@ impl LuaChannel {
2840
2941pub struct LuaChannelMgr {
3042 channels : HashMap < String , LuaChannel > ,
31- receivers : HashMap < i64 , Arc < Mutex < mpsc:: Receiver < i64 > > > > ,
32- senders : HashMap < i64 , Arc < Mutex < mpsc:: Sender < i64 > > > > ,
3343 id_counter : i64 ,
3444}
3545
3646impl LuaChannelMgr {
3747 pub fn new ( ) -> LuaChannelMgr {
3848 LuaChannelMgr {
3949 channels : HashMap :: new ( ) ,
40- receivers : HashMap :: new ( ) ,
41- senders : HashMap :: new ( ) ,
4250 id_counter : 0 ,
4351 }
4452 }
@@ -47,23 +55,18 @@ impl LuaChannelMgr {
4755 let ( sender, receiver) = mpsc:: channel ( 100 ) ;
4856 let id = self . id_counter ;
4957 self . id_counter += 1 ;
50- let channel = LuaChannel :: new ( name. clone ( ) , id) ;
58+ let channel = LuaChannel :: new (
59+ name. clone ( ) ,
60+ id,
61+ Arc :: new ( Mutex :: new ( receiver) ) ,
62+ Arc :: new ( Mutex :: new ( sender) ) ,
63+ ) ;
5164 self . channels . insert ( name. clone ( ) , channel) ;
52- self . receivers . insert ( id, Arc :: new ( Mutex :: new ( receiver) ) ) ;
53- self . senders . insert ( id, Arc :: new ( Mutex :: new ( sender) ) ) ;
5465 }
5566
5667 pub fn get_channel ( & self , name : & str ) -> Option < LuaChannel > {
5768 self . channels . get ( name) . cloned ( )
5869 }
59-
60- pub fn get_sender ( & self , id : i64 ) -> Option < Arc < Mutex < mpsc:: Sender < i64 > > > > {
61- self . senders . get ( & id) . cloned ( )
62- }
63-
64- pub fn get_receiver ( & self , id : i64 ) -> Option < Arc < Mutex < mpsc:: Receiver < i64 > > > > {
65- self . receivers . get ( & id) . cloned ( )
66- }
6770}
6871
6972lazy_static ! {
@@ -77,28 +80,20 @@ impl UserData for LuaChannel {
7780 } ) ;
7881
7982 methods. add_async_method ( "push" , |lua, this, args : mlua:: MultiValue | async move {
80- let id = this. id ;
8183 let lua_seri_pack = lua. globals ( ) . get :: < LuaFunction > ( "lua_seri_pack" ) ?;
8284 let ptr = lua_seri_pack. call :: < i64 > ( args) . unwrap ( ) ;
83- let opt_sender = { ChannelMgr . lock ( ) . unwrap ( ) . get_sender ( id) } ;
84- if let Some ( sender) = opt_sender {
85- let sender = sender. lock ( ) . unwrap ( ) ;
86- sender. send ( ptr) . await . unwrap ( ) ;
87- }
85+ let sender = this. sender . lock ( ) . unwrap ( ) ;
86+ sender. send ( ptr) . await . unwrap ( ) ;
8887 Ok ( ( ) )
8988 } ) ;
9089
9190 methods. add_method ( "pop" , |lua, this, ( ) | {
92- let id = this. id ;
93- let opt_receiver = { ChannelMgr . lock ( ) . unwrap ( ) . get_receiver ( id) } ;
94- if let Some ( receiver) = opt_receiver {
95- let data = receiver. lock ( ) . unwrap ( ) . try_recv ( ) ;
96- if let Ok ( data) = data {
97- let lua_seri_unpack = lua. globals ( ) . get :: < LuaFunction > ( "lua_seri_unpack" ) ?;
98- let mut returns = lua_seri_unpack. call :: < mlua:: MultiValue > ( data) . unwrap ( ) ;
99- returns. insert ( 0 , mlua:: Value :: Boolean ( true ) ) ;
100- return Ok ( returns) ;
101- }
91+ let data = this. receiver . lock ( ) . unwrap ( ) . try_recv ( ) ;
92+ if let Ok ( data) = data {
93+ let lua_seri_unpack = lua. globals ( ) . get :: < LuaFunction > ( "lua_seri_unpack" ) ?;
94+ let mut returns = lua_seri_unpack. call :: < mlua:: MultiValue > ( data) . unwrap ( ) ;
95+ returns. insert ( 0 , mlua:: Value :: Boolean ( true ) ) ;
96+ return Ok ( returns) ;
10297 }
10398
10499 let mut returns = mlua:: MultiValue :: new ( ) ;
@@ -107,15 +102,11 @@ impl UserData for LuaChannel {
107102 } ) ;
108103
109104 methods. add_async_method ( "bpop" , |lua, this, ( ) | async move {
110- let id = this. id ;
111- let opt_receiver = { ChannelMgr . lock ( ) . unwrap ( ) . get_receiver ( id) } ;
112- if let Some ( receiver) = opt_receiver {
113- let data = receiver. lock ( ) . unwrap ( ) . recv ( ) . await ;
114- if let Some ( data) = data {
115- let lua_seri_unpack = lua. globals ( ) . get :: < LuaFunction > ( "lua_seri_unpack" ) ?;
116- let returns = lua_seri_unpack. call :: < mlua:: MultiValue > ( data) . unwrap ( ) ;
117- return Ok ( returns) ;
118- }
105+ let data = { this. receiver . lock ( ) . unwrap ( ) . recv ( ) . await } ;
106+ if let Some ( data) = data {
107+ let lua_seri_unpack = lua. globals ( ) . get :: < LuaFunction > ( "lua_seri_unpack" ) ?;
108+ let returns = lua_seri_unpack. call :: < mlua:: MultiValue > ( data) . unwrap ( ) ;
109+ return Ok ( returns) ;
119110 }
120111
121112 let returns = mlua:: MultiValue :: new ( ) ;
0 commit comments