Skip to content

Commit feb0ec8

Browse files
committed
pushAPI: removed deprecated methods
Removed `box.session.push()` usage: `Future.AppendPush()`, `Future.GetIterator()` methods, `ResponseIterator` and `TimeoutResponseIterator` types, `pushes[]` field in `Future` and related methods. Removed tests which became unnecessary. Closes #480
1 parent c972ce4 commit feb0ec8

File tree

8 files changed

+8
-623
lines changed

8 files changed

+8
-623
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
2020
* Now cases of `<-ctx.Done()` returns wrapped error provided by `ctx.Cause()`.
2121
Allows you compare it using `errors.Is/As` (#457).
2222
* Removed deprecated `pool` methods, related interfaces and tests are updated (#478).
23+
* Removed deprecated `box.session.push()` usage: Future.AppendPush()
24+
and Future.GetIterator() methods, ResponseIterator and TimeoutResponseIterator types,
25+
Future.pushes[] (#480).
26+
* Removed `code == iproto.IPROTO_CHUNK` checking in `Connection.reader` due to removing
27+
`box.session.push()` usage (#480)
2328

2429
### Fixed
2530

MIGRATION.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ TODO
1212
* `box.New` returns an error instead of panic
1313
* Added `box.MustNew` wrapper for `box.New` without an error
1414
* Removed deprecated `pool` methods, related interfaces and tests are updated.
15+
* Removed `box.session.push()` usage: Future.AppendPush() and Future.GetIterator()
16+
methods, ResponseIterator and TimeoutResponseIterator types, `Future.pushes[]`.
1517

1618
## Migration from v1.x.x to v2.x.x
1719

connection.go

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -882,15 +882,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
882882
}
883883
continue
884884
} else if code == iproto.IPROTO_CHUNK {
885-
if fut = conn.peekFuture(header.RequestId); fut != nil {
886-
if err := fut.AppendPush(header, &buf); err != nil {
887-
err = ClientError{
888-
ErrProtocolError,
889-
fmt.Sprintf("failed to append push response: %s", err),
890-
}
891-
conn.opts.Logger.Report(LogAppendPushFailed, conn, err)
892-
}
893-
}
885+
// not implemented
894886
} else {
895887
if fut = conn.fetchFuture(header.RequestId); fut != nil {
896888
if err := fut.SetResponse(header, &buf); err != nil {
@@ -1131,26 +1123,6 @@ func (conn *Connection) markDone(fut *Future) {
11311123
conn.decrementRequestCnt()
11321124
}
11331125

1134-
func (conn *Connection) peekFuture(reqid uint32) (fut *Future) {
1135-
shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
1136-
pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1)
1137-
shard.rmut.Lock()
1138-
defer shard.rmut.Unlock()
1139-
1140-
if conn.opts.Timeout > 0 {
1141-
if fut = conn.getFutureImp(reqid, true); fut != nil {
1142-
pair := &shard.requests[pos]
1143-
*pair.last = fut
1144-
pair.last = &fut.next
1145-
fut.timeout = time.Since(epoch) + conn.opts.Timeout
1146-
}
1147-
} else {
1148-
fut = conn.getFutureImp(reqid, false)
1149-
}
1150-
1151-
return fut
1152-
}
1153-
11541126
func (conn *Connection) fetchFuture(reqid uint32) (fut *Future) {
11551127
shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
11561128
shard.rmut.Lock()

example_test.go

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,41 +1102,6 @@ func ExampleErrorNo() {
11021102
// Success.
11031103
}
11041104

1105-
func ExampleFuture_GetIterator() {
1106-
conn := exampleConnect(dialer, opts)
1107-
defer conn.Close()
1108-
1109-
const timeout = 3 * time.Second
1110-
fut := conn.Do(tarantool.NewCallRequest("push_func").
1111-
Args([]interface{}{4}),
1112-
)
1113-
1114-
var it tarantool.ResponseIterator
1115-
for it = fut.GetIterator().WithTimeout(timeout); it.Next(); {
1116-
resp := it.Value()
1117-
data, _ := resp.Decode()
1118-
if it.IsPush() {
1119-
// It is a push message.
1120-
fmt.Printf("push message: %v\n", data[0])
1121-
} else if resp.Header().Error == tarantool.ErrorNo {
1122-
// It is a regular response.
1123-
fmt.Printf("response: %v", data[0])
1124-
} else {
1125-
fmt.Printf("an unexpected response code %d", resp.Header().Error)
1126-
}
1127-
}
1128-
if err := it.Err(); err != nil {
1129-
fmt.Printf("error in call of push_func is %v", err)
1130-
return
1131-
}
1132-
// Output:
1133-
// push message: 1
1134-
// push message: 2
1135-
// push message: 3
1136-
// push message: 4
1137-
// response: 4
1138-
}
1139-
11401105
func ExampleConnect() {
11411106
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
11421107
defer cancel()

future.go

Lines changed: 0 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ type Future struct {
1313
next *Future
1414
timeout time.Duration
1515
mutex sync.Mutex
16-
pushes []Response
1716
resp Response
1817
err error
1918
ready chan struct{}
@@ -39,131 +38,15 @@ func (fut *Future) isDone() bool {
3938
}
4039
}
4140

42-
type asyncResponseIterator struct {
43-
fut *Future
44-
timeout time.Duration
45-
resp Response
46-
err error
47-
curPos int
48-
done bool
49-
}
50-
51-
func (it *asyncResponseIterator) Next() bool {
52-
if it.done || it.err != nil {
53-
it.resp = nil
54-
return false
55-
}
56-
57-
var last = false
58-
var exit = false
59-
for !exit {
60-
// We try to read at least once.
61-
it.fut.mutex.Lock()
62-
it.resp = it.nextResponse()
63-
it.err = it.fut.err
64-
last = it.resp == it.fut.resp
65-
it.fut.mutex.Unlock()
66-
67-
if it.timeout == 0 || it.resp != nil || it.err != nil {
68-
break
69-
}
70-
71-
select {
72-
case <-it.fut.ready:
73-
case <-time.After(it.timeout):
74-
exit = true
75-
}
76-
}
77-
78-
if it.resp == nil {
79-
return false
80-
}
81-
82-
if last {
83-
it.done = true
84-
} else {
85-
it.err = nil
86-
it.curPos += 1
87-
}
88-
89-
return true
90-
}
91-
92-
func (it *asyncResponseIterator) Value() Response {
93-
return it.resp
94-
}
95-
96-
func (it *asyncResponseIterator) IsPush() bool {
97-
return !it.done
98-
}
99-
100-
func (it *asyncResponseIterator) Err() error {
101-
return it.err
102-
}
103-
104-
func (it *asyncResponseIterator) WithTimeout(timeout time.Duration) TimeoutResponseIterator {
105-
it.timeout = timeout
106-
return it
107-
}
108-
109-
func (it *asyncResponseIterator) nextResponse() (resp Response) {
110-
fut := it.fut
111-
pushesLen := len(fut.pushes)
112-
113-
if it.curPos < pushesLen {
114-
resp = fut.pushes[it.curPos]
115-
} else if it.curPos == pushesLen {
116-
resp = fut.resp
117-
}
118-
119-
return resp
120-
}
121-
122-
// PushResponse is used for push requests for the Future.
123-
type PushResponse struct {
124-
baseResponse
125-
}
126-
127-
func createPushResponse(header Header, body io.Reader) (Response, error) {
128-
resp, err := createBaseResponse(header, body)
129-
if err != nil {
130-
return nil, err
131-
}
132-
return &PushResponse{resp}, nil
133-
}
134-
13541
// NewFuture creates a new empty Future for a given Request.
13642
func NewFuture(req Request) (fut *Future) {
13743
fut = &Future{}
13844
fut.ready = make(chan struct{}, 1000000000)
13945
fut.done = make(chan struct{})
140-
fut.pushes = make([]Response, 0)
14146
fut.req = req
14247
return fut
14348
}
14449

145-
// AppendPush appends the push response to the future.
146-
// Note: it works only before SetResponse() or SetError()
147-
//
148-
// Deprecated: the method will be removed in the next major version,
149-
// use Connector.NewWatcher() instead of box.session.push().
150-
func (fut *Future) AppendPush(header Header, body io.Reader) error {
151-
fut.mutex.Lock()
152-
defer fut.mutex.Unlock()
153-
154-
if fut.isDone() {
155-
return nil
156-
}
157-
resp, err := createPushResponse(header, body)
158-
if err != nil {
159-
return err
160-
}
161-
fut.pushes = append(fut.pushes, resp)
162-
163-
fut.ready <- struct{}{}
164-
return nil
165-
}
166-
16750
// SetResponse sets a response for the future and finishes the future.
16851
func (fut *Future) SetResponse(header Header, body io.Reader) error {
16952
fut.mutex.Lock()
@@ -235,24 +118,6 @@ func (fut *Future) GetTyped(result interface{}) error {
235118
return fut.resp.DecodeTyped(result)
236119
}
237120

238-
// GetIterator returns an iterator for iterating through push messages
239-
// and a response. Push messages and the response will contain deserialized
240-
// result in Data field as for the Get() function.
241-
//
242-
// # See also
243-
//
244-
// - box.session.push():
245-
// https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_session/push/
246-
//
247-
// Deprecated: the method will be removed in the next major version,
248-
// use Connector.NewWatcher() instead of box.session.push().
249-
func (fut *Future) GetIterator() (it TimeoutResponseIterator) {
250-
futit := &asyncResponseIterator{
251-
fut: fut,
252-
}
253-
return futit
254-
}
255-
256121
var closedChan = make(chan struct{})
257122

258123
func init() {

0 commit comments

Comments
 (0)