@@ -13,7 +13,6 @@ type Future struct {
1313 next * Future
1414 timeout time.Duration
1515 mutex sync.Mutex
16- pushes []Response
1716 resp Response
1817 err error
1918 ready chan struct {}
@@ -39,131 +38,15 @@ func (fut *Future) isDone() bool {
3938 }
4039}
4140
42- type asyncResponseIterator struct {
43- fut * Future
44- timeout time.Duration
45- resp Response
46- err error
47- curPos int
48- done bool
49- }
50-
51- func (it * asyncResponseIterator ) Next () bool {
52- if it .done || it .err != nil {
53- it .resp = nil
54- return false
55- }
56-
57- var last = false
58- var exit = false
59- for ! exit {
60- // We try to read at least once.
61- it .fut .mutex .Lock ()
62- it .resp = it .nextResponse ()
63- it .err = it .fut .err
64- last = it .resp == it .fut .resp
65- it .fut .mutex .Unlock ()
66-
67- if it .timeout == 0 || it .resp != nil || it .err != nil {
68- break
69- }
70-
71- select {
72- case <- it .fut .ready :
73- case <- time .After (it .timeout ):
74- exit = true
75- }
76- }
77-
78- if it .resp == nil {
79- return false
80- }
81-
82- if last {
83- it .done = true
84- } else {
85- it .err = nil
86- it .curPos += 1
87- }
88-
89- return true
90- }
91-
92- func (it * asyncResponseIterator ) Value () Response {
93- return it .resp
94- }
95-
96- func (it * asyncResponseIterator ) IsPush () bool {
97- return ! it .done
98- }
99-
100- func (it * asyncResponseIterator ) Err () error {
101- return it .err
102- }
103-
104- func (it * asyncResponseIterator ) WithTimeout (timeout time.Duration ) TimeoutResponseIterator {
105- it .timeout = timeout
106- return it
107- }
108-
109- func (it * asyncResponseIterator ) nextResponse () (resp Response ) {
110- fut := it .fut
111- pushesLen := len (fut .pushes )
112-
113- if it .curPos < pushesLen {
114- resp = fut .pushes [it .curPos ]
115- } else if it .curPos == pushesLen {
116- resp = fut .resp
117- }
118-
119- return resp
120- }
121-
122- // PushResponse is used for push requests for the Future.
123- type PushResponse struct {
124- baseResponse
125- }
126-
127- func createPushResponse (header Header , body io.Reader ) (Response , error ) {
128- resp , err := createBaseResponse (header , body )
129- if err != nil {
130- return nil , err
131- }
132- return & PushResponse {resp }, nil
133- }
134-
13541// NewFuture creates a new empty Future for a given Request.
13642func NewFuture (req Request ) (fut * Future ) {
13743 fut = & Future {}
13844 fut .ready = make (chan struct {}, 1000000000 )
13945 fut .done = make (chan struct {})
140- fut .pushes = make ([]Response , 0 )
14146 fut .req = req
14247 return fut
14348}
14449
145- // AppendPush appends the push response to the future.
146- // Note: it works only before SetResponse() or SetError()
147- //
148- // Deprecated: the method will be removed in the next major version,
149- // use Connector.NewWatcher() instead of box.session.push().
150- func (fut * Future ) AppendPush (header Header , body io.Reader ) error {
151- fut .mutex .Lock ()
152- defer fut .mutex .Unlock ()
153-
154- if fut .isDone () {
155- return nil
156- }
157- resp , err := createPushResponse (header , body )
158- if err != nil {
159- return err
160- }
161- fut .pushes = append (fut .pushes , resp )
162-
163- fut .ready <- struct {}{}
164- return nil
165- }
166-
16750// SetResponse sets a response for the future and finishes the future.
16851func (fut * Future ) SetResponse (header Header , body io.Reader ) error {
16952 fut .mutex .Lock ()
@@ -235,24 +118,6 @@ func (fut *Future) GetTyped(result interface{}) error {
235118 return fut .resp .DecodeTyped (result )
236119}
237120
238- // GetIterator returns an iterator for iterating through push messages
239- // and a response. Push messages and the response will contain deserialized
240- // result in Data field as for the Get() function.
241- //
242- // # See also
243- //
244- // - box.session.push():
245- // https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_session/push/
246- //
247- // Deprecated: the method will be removed in the next major version,
248- // use Connector.NewWatcher() instead of box.session.push().
249- func (fut * Future ) GetIterator () (it TimeoutResponseIterator ) {
250- futit := & asyncResponseIterator {
251- fut : fut ,
252- }
253- return futit
254- }
255-
256121var closedChan = make (chan struct {})
257122
258123func init () {
0 commit comments