66 "errors"
77 "log"
88 "net"
9+ "strconv"
910 "sync"
1011 "sync/atomic"
1112 "time"
@@ -38,12 +39,16 @@ type Client interface {
3839 // Publish will publish a message with the specified DeliveryMode and content
3940 // to the specified topic.
4041 Publish (topic string , payload []byte , pubOpts ... PubOptions ) Result
42+ // Relay sends a request to relay messages for one or more topics those are persisted on the server.
43+ // Provide a MessageHandler to be executed when a message is published on the topic provided,
44+ // or nil for the default handler.
45+ Relay (topics []string , relOpts ... RelOptions ) Result
4146 // Subscribe starts a new subscription. Provide a MessageHandler to be executed when
4247 // a message is published on the topic provided, or nil for the default handler.
43- // Relay sends a relay request to server. Provide a MessageHandler to be executed when
44- // a message is published on the topic provided, or nil for the default handler.
45- Relay (topic string , relOpts ... RelOptions ) Result
4648 Subscribe (topic string , subOpts ... SubOptions ) Result
49+ // SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to be executed when
50+ // a message is published on the topic provided, or nil for the default handler.
51+ SubscribeMultiple (subs []string , subOpts ... SubOptions ) Result
4752 // Unsubscribe will end the subscription from each of the topics provided.
4853 // Messages published to those topics from other clients will no longer be
4954 // received.
@@ -312,7 +317,7 @@ func (c *client) serverDisconnect(err error) {
312317
313318// Publish will publish a message with the specified DeliveryMode and content
314319// to the specified topic.
315- func (c * client ) Publish (topic string , payload []byte , pubOpts ... PubOptions ) Result {
320+ func (c * client ) Publish (pubTopic string , payload []byte , pubOpts ... PubOptions ) Result {
316321 r := & PublishResult {result : result {complete : make (chan struct {})}}
317322 if err := c .ok (); err != nil {
318323 r .setError (errors .New ("error not connected" ))
@@ -324,22 +329,52 @@ func (c *client) Publish(topic string, payload []byte, pubOpts ...PubOptions) Re
324329 opt .set (opts )
325330 }
326331
332+ deliveryMode := opts .deliveryMode
333+ delay := opts .delay
334+ ttl := opts .ttl
335+ t := new (topic )
336+
337+ // parse the topic.
338+ if ok := t .parse (pubTopic ); ! ok {
339+ r .setError (errors .New ("publish: unable to parse topic" ))
340+ return r
341+ }
342+
343+ if dMode , ok := t .getOption ("delivery_mode" ); ok {
344+ val , err := strconv .ParseInt (dMode , 10 , 64 )
345+ if err == nil {
346+ deliveryMode = uint8 (val )
347+ }
348+ }
349+
350+ if d , ok := t .getOption ("delay" ); ok {
351+ val , err := strconv .ParseInt (d , 10 , 64 )
352+ if err == nil {
353+ delay = int32 (val )
354+ }
355+ }
356+
357+ if dur , ok := t .getOption ("ttl" ); ok {
358+ ttl = dur
359+ }
360+
327361 pubMsg := & utp.PublishMessage {
328- Topic : topic ,
362+ Topic : t . topic ,
329363 Payload : payload ,
330- Ttl : opts . ttl ,
364+ Ttl : ttl ,
331365 }
332366
333367 // Check batch or delay delivery.
334- if opts .deliveryMode == 2 || opts .delay > 0 {
335- // timeID := c.TimeID(opts.delay)
336- return c .batchManager .add (opts .delay , pubMsg )
368+ if deliveryMode == 2 || delay > 0 {
369+ return c .batchManager .add (delay , pubMsg )
337370 }
338- pub := & utp.Publish {DeliveryMode : opts .deliveryMode , Messages : []* utp.PublishMessage {pubMsg }}
371+ pub := & utp.Publish {DeliveryMode : deliveryMode , Messages : []* utp.PublishMessage {pubMsg }}
372+
339373 if pub .MessageID == 0 {
340374 mID := c .nextID (r )
341375 pub .MessageID = c .outboundID (mID )
342376 }
377+
343378 publishWaitTimeout := c .opts .writeTimeout
344379 if publishWaitTimeout == 0 {
345380 publishWaitTimeout = time .Second * 30
@@ -354,35 +389,56 @@ func (c *client) Publish(topic string, payload []byte, pubOpts ...PubOptions) Re
354389 r .setError (errors .New ("publish timeout error occurred" ))
355390 return r
356391 }
392+
357393 return r
358394}
359395
360396// Relay send a new relay request. Provide a MessageHandler to be executed when
361397// a message is published on the topic provided.
362- func (c * client ) Relay (topic string , relOpts ... RelOptions ) Result {
398+ func (c * client ) Relay (topics [] string , relOpts ... RelOptions ) Result {
363399 r := & RelayResult {result : result {complete : make (chan struct {})}}
364400 if err := c .ok (); err != nil {
365401 r .setError (errors .New ("error not connected" ))
366402 return r
367403 }
404+
368405 opts := new (relOptions )
369406 for _ , opt := range relOpts {
370407 opt .set (opts )
371408 }
372409
373410 relMsg := & utp.Relay {}
374- relMsg .RelayRequests = append (relMsg .RelayRequests , & utp.RelayRequest {Topic : topic , Last : opts .last })
411+
412+ for _ , relTopic := range topics {
413+ last := opts .last
414+ t := new (topic )
415+
416+ // parse the topic.
417+ if ok := t .parse (relTopic ); ! ok {
418+ r .setError (errors .New ("relay: unable to parse topic" ))
419+ return r
420+ }
421+
422+ if dur , ok := t .getOption ("last" ); ok {
423+ last = dur
424+ }
425+
426+ relMsg .RelayRequests = append (relMsg .RelayRequests , & utp.RelayRequest {Topic : t .topic , Last : last })
427+ }
375428
376429 if relMsg .MessageID == 0 {
377430 mID := c .nextID (r )
378431 relMsg .MessageID = c .outboundID (mID )
379432 }
433+
380434 relayWaitTimeout := c .opts .writeTimeout
381435 if relayWaitTimeout == 0 {
382436 relayWaitTimeout = time .Second * 30
383437 }
438+
384439 // persist outbound
385440 c .storeOutbound (relMsg )
441+
386442 select {
387443 case c .send <- & MessageAndResult {m : relMsg , r : r }:
388444 case <- time .After (relayWaitTimeout ):
@@ -395,30 +451,125 @@ func (c *client) Relay(topic string, relOpts ...RelOptions) Result {
395451
396452// Subscribe starts a new subscription. Provide a MessageHandler to be executed when
397453// a message is published on the topic provided.
398- func (c * client ) Subscribe (topic string , subOpts ... SubOptions ) Result {
454+ func (c * client ) Subscribe (subTopic string , subOpts ... SubOptions ) Result {
399455 r := & SubscribeResult {result : result {complete : make (chan struct {})}}
400456 if err := c .ok (); err != nil {
401457 r .setError (errors .New ("error not connected" ))
402458 return r
403459 }
460+
404461 opts := new (subOptions )
405462 for _ , opt := range subOpts {
406463 opt .set (opts )
407464 }
408465
409466 subMsg := & utp.Subscribe {}
410- subMsg .Subscriptions = append (subMsg .Subscriptions , & utp.Subscription {DeliveryMode : opts .deliveryMode , Delay : opts .delay , Topic : topic })
467+
468+ deliveryMode := opts .deliveryMode
469+ delay := opts .delay
470+ t := new (topic )
471+
472+ // parse the topic.
473+ if ok := t .parse (subTopic ); ! ok {
474+ r .setError (errors .New ("subscribe: unable to parse topic" ))
475+ return r
476+ }
477+
478+ if dMode , ok := t .getOption ("delivery_mode" ); ok {
479+ val , err := strconv .ParseInt (dMode , 10 , 64 )
480+ if err == nil {
481+ deliveryMode = uint8 (val )
482+ }
483+ }
484+
485+ if d , ok := t .getOption ("delay" ); ok {
486+ val , err := strconv .ParseInt (d , 10 , 64 )
487+ if err == nil {
488+ delay = int32 (val )
489+ }
490+ }
491+
492+ subMsg .Subscriptions = append (subMsg .Subscriptions , & utp.Subscription {DeliveryMode : deliveryMode , Delay : delay , Topic : t .topic })
411493
412494 if subMsg .MessageID == 0 {
413495 mID := c .nextID (r )
414496 subMsg .MessageID = c .outboundID (mID )
415497 }
498+
416499 subscribeWaitTimeout := c .opts .writeTimeout
417500 if subscribeWaitTimeout == 0 {
418501 subscribeWaitTimeout = time .Second * 30
419502 }
503+
420504 // persist outbound
421505 c .storeOutbound (subMsg )
506+
507+ select {
508+ case c .send <- & MessageAndResult {m : subMsg , r : r }:
509+ case <- time .After (subscribeWaitTimeout ):
510+ r .setError (errors .New ("subscribe timeout error occurred" ))
511+ return r
512+ }
513+
514+ return r
515+ }
516+
517+ // SubscribeMultiple starts a new subscription. Provide a MessageHandler to be executed when
518+ // a message is published on the topic provided.
519+ func (c * client ) SubscribeMultiple (topics []string , subOpts ... SubOptions ) Result {
520+ r := & SubscribeResult {result : result {complete : make (chan struct {})}}
521+ if err := c .ok (); err != nil {
522+ r .setError (errors .New ("error not connected" ))
523+ return r
524+ }
525+
526+ opts := new (subOptions )
527+ for _ , opt := range subOpts {
528+ opt .set (opts )
529+ }
530+
531+ subMsg := & utp.Subscribe {}
532+ for _ , subTopic := range topics {
533+ deliveryMode := opts .deliveryMode
534+ delay := opts .delay
535+ t := new (topic )
536+
537+ // parse the topic.
538+ if ok := t .parse (subTopic ); ! ok {
539+ r .setError (errors .New ("SubscribeMultiple: unable to parse topic" ))
540+ return r
541+ }
542+
543+ if dMode , ok := t .getOption ("delivery_mode" ); ok {
544+ val , err := strconv .ParseInt (dMode , 10 , 64 )
545+ if err == nil {
546+ deliveryMode = uint8 (val )
547+ }
548+ }
549+
550+ if d , ok := t .getOption ("delay" ); ok {
551+ val , err := strconv .ParseInt (d , 10 , 64 )
552+ if err == nil {
553+ delay = int32 (val )
554+ }
555+ }
556+
557+ subMsg .Subscriptions = append (subMsg .Subscriptions , & utp.Subscription {DeliveryMode : deliveryMode , Delay : delay , Topic : t .topic })
558+ }
559+
560+ if subMsg .MessageID == 0 {
561+ mID := c .nextID (r )
562+ subMsg .MessageID = c .outboundID (mID )
563+ }
564+
565+ subscribeWaitTimeout := c .opts .writeTimeout
566+ if subscribeWaitTimeout == 0 {
567+ subscribeWaitTimeout = time .Second * 30
568+ }
569+
570+ // persist outbound
571+ c .storeOutbound (subMsg )
572+
422573 select {
423574 case c .send <- & MessageAndResult {m : subMsg , r : r }:
424575 case <- time .After (subscribeWaitTimeout ):
@@ -434,29 +585,35 @@ func (c *client) Subscribe(topic string, subOpts ...SubOptions) Result {
434585// received.
435586func (c * client ) Unsubscribe (topics ... string ) Result {
436587 r := & SubscribeResult {result : result {complete : make (chan struct {})}}
588+
437589 unsubMsg := & utp.Unsubscribe {}
438590 var subs []* utp.Subscription
439591 for _ , topic := range topics {
440592 sub := & utp.Subscription {Topic : topic }
441593 subs = append (subs , sub )
442594 }
443595 unsubMsg .Subscriptions = subs
596+
444597 if unsubMsg .MessageID == 0 {
445598 mID := c .nextID (r )
446599 unsubMsg .MessageID = c .outboundID (mID )
447600 }
601+
448602 unsubscribeWaitTimeout := c .opts .writeTimeout
449603 if unsubscribeWaitTimeout == 0 {
450604 unsubscribeWaitTimeout = time .Second * 30
451605 }
606+
452607 // persist outbound
453608 c .storeOutbound (unsubMsg )
609+
454610 select {
455611 case c .send <- & MessageAndResult {m : unsubMsg , r : r }:
456612 case <- time .After (unsubscribeWaitTimeout ):
457613 r .setError (errors .New ("unsubscribe timeout error occurred" ))
458614 return r
459615 }
616+
460617 return r
461618}
462619
0 commit comments