@@ -20,6 +20,7 @@ import (
2020)
2121
2222const requestsMap = 128
23+ const ignoreStreamId = 0
2324const (
2425 connDisconnected = 0
2526 connConnected = 1
@@ -143,6 +144,8 @@ type Connection struct {
143144 state uint32
144145 dec * msgpack.Decoder
145146 lenbuf [PacketLengthBytes ]byte
147+
148+ lastStreamId uint64
146149}
147150
148151var _ = Connector (& Connection {}) // Check compatibility with connector interface.
@@ -528,16 +531,27 @@ func (conn *Connection) dial() (err error) {
528531 return
529532}
530533
531- func pack (h * smallWBuf , enc * msgpack.Encoder , reqid uint32 , req Request , res SchemaResolver ) (err error ) {
534+ func pack (h * smallWBuf , enc * msgpack.Encoder , reqid uint32 ,
535+ req Request , streamId uint64 , res SchemaResolver ) (err error ) {
532536 hl := h .Len ()
533- h .Write ([]byte {
537+
538+ hMapLen := byte (0x82 ) // 2 element map.
539+ if streamId != ignoreStreamId {
540+ hMapLen = byte (0x83 ) // 3 element map.
541+ }
542+ hBytes := []byte {
534543 0xce , 0 , 0 , 0 , 0 , // Length.
535- 0x82 , // 2 element map.
544+ hMapLen ,
536545 KeyCode , byte (req .Code ()), // Request code.
537546 KeySync , 0xce ,
538547 byte (reqid >> 24 ), byte (reqid >> 16 ),
539548 byte (reqid >> 8 ), byte (reqid ),
540- })
549+ }
550+ if streamId != ignoreStreamId {
551+ hBytes = append (hBytes , KeyStreamId , byte (streamId ))
552+ }
553+
554+ h .Write (hBytes )
541555
542556 if err = req .Body (res , enc ); err != nil {
543557 return
@@ -555,7 +569,7 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res Sch
555569func (conn * Connection ) writeAuthRequest (w * bufio.Writer , scramble []byte ) (err error ) {
556570 var packet smallWBuf
557571 req := newAuthRequest (conn .opts .User , string (scramble ))
558- err = pack (& packet , msgpack .NewEncoder (& packet ), 0 , req , conn .Schema )
572+ err = pack (& packet , msgpack .NewEncoder (& packet ), 0 , req , ignoreStreamId , conn .Schema )
559573
560574 if err != nil {
561575 return errors .New ("auth: pack error " + err .Error ())
@@ -869,7 +883,7 @@ func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) {
869883 }
870884}
871885
872- func (conn * Connection ) send (req Request ) * Future {
886+ func (conn * Connection ) send (req Request , streamId uint64 ) * Future {
873887 fut := conn .newFuture (req .Ctx ())
874888 if fut .ready == nil {
875889 return fut
@@ -882,14 +896,14 @@ func (conn *Connection) send(req Request) *Future {
882896 default :
883897 }
884898 }
885- conn .putFuture (fut , req )
899+ conn .putFuture (fut , req , streamId )
886900 if req .Ctx () != nil {
887901 go conn .contextWatchdog (fut , req .Ctx ())
888902 }
889903 return fut
890904}
891905
892- func (conn * Connection ) putFuture (fut * Future , req Request ) {
906+ func (conn * Connection ) putFuture (fut * Future , req Request , streamId uint64 ) {
893907 shardn := fut .requestId & (conn .opts .Concurrency - 1 )
894908 shard := & conn .shard [shardn ]
895909 shard .bufmut .Lock ()
@@ -906,7 +920,7 @@ func (conn *Connection) putFuture(fut *Future, req Request) {
906920 }
907921 blen := shard .buf .Len ()
908922 reqid := fut .requestId
909- if err := pack (& shard .buf , shard .enc , reqid , req , conn .Schema ); err != nil {
923+ if err := pack (& shard .buf , shard .enc , reqid , req , streamId , conn .Schema ); err != nil {
910924 shard .buf .Trunc (blen )
911925 shard .bufmut .Unlock ()
912926 if f := conn .fetchFuture (reqid ); f == fut {
@@ -1095,7 +1109,7 @@ func (conn *Connection) Do(req Request) *Future {
10951109 default :
10961110 }
10971111 }
1098- return conn .send (req )
1112+ return conn .send (req , ignoreStreamId )
10991113}
11001114
11011115// ConfiguredTimeout returns a timeout from connection config.
@@ -1121,3 +1135,16 @@ func (conn *Connection) NewPrepared(expr string) (*Prepared, error) {
11211135 }
11221136 return NewPreparedFromResponse (conn , resp )
11231137}
1138+
1139+ // NewStream creates new Stream object for connection.
1140+ //
1141+ // Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
1142+ // To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
1143+ // Since 1.7.0
1144+ func (conn * Connection ) NewStream () (* Stream , error ) {
1145+ next := atomic .AddUint64 (& conn .lastStreamId , 1 )
1146+ return & Stream {
1147+ Id : next ,
1148+ Conn : conn ,
1149+ }, nil
1150+ }
0 commit comments