@@ -180,6 +180,13 @@ func (c *_Conn) subscribe(msg lp.Subscribe, topic *security.Topic, sub *lp.Subsc
180180 defer c .Unlock ()
181181
182182 key := string (topic .Key )
183+ if key == "" {
184+ key , err = security .GenerateKey (c .clientID .Contract (), []byte (topic .Topic ), security .AllowNone )
185+ if err != nil {
186+ log .ErrLogger .Err (err ).Str ("context" , "conn.subscribe" )
187+ return err
188+ }
189+ }
183190 if exists := c .subs .Exist (key ); exists && ! msg .IsForwarded && Globals .Cluster .isRemoteContract (string (c .clientID .Contract ())) {
184191 // The contract is handled by a remote node. Forward message to it.
185192 if err := Globals .Cluster .routeToContract (& msg , topic , message .SUBSCRIBE , & message.Message {}, c ); err != nil {
@@ -242,7 +249,7 @@ func (c *_Conn) publish(pkt lp.Publish, topic *security.Topic, m *lp.PublishMess
242249 // subscription count
243250 msgCount := 0
244251
245- subs , err := store .Subscription .Get (c .clientID .Contract (), topic .Topic )
252+ subscriptions , err := store .Subscription .Get (c .clientID .Contract (), topic .Topic )
246253 if err != nil {
247254 log .ErrLogger .Err (err ).Str ("context" , "conn.publish" )
248255 }
@@ -251,11 +258,11 @@ func (c *_Conn) publish(pkt lp.Publish, topic *security.Topic, m *lp.PublishMess
251258 Topic : topic .Topic [:topic .Size ],
252259 Payload : m .Payload ,
253260 }
254- for _ , sub := range subs {
255- pubMsg .DeliveryMode = sub [0 ]
256- lid := uid .LID (binary .LittleEndian .Uint32 (sub [1 :5 ]))
257- pubMsg .Delay = int32 (uid .LID (binary .LittleEndian .Uint32 (sub [5 :9 ])))
258- sub := Globals .connCache .get (lid )
261+ for _ , subscription := range subscriptions {
262+ pubMsg .DeliveryMode = subscription [0 ]
263+ connID := uid .LID (binary .LittleEndian .Uint32 (subscription [1 :5 ]))
264+ pubMsg .Delay = int32 (uid .LID (binary .LittleEndian .Uint32 (subscription [5 :9 ])))
265+ sub := Globals .connCache .get (connID )
259266 if sub != nil {
260267 if pubMsg .MessageID == 0 {
261268 pubMsg .MessageID = c .MessageIds .NextID (lp .PUBLISH .Value ())
0 commit comments