From e0f51d9d55a70acfe59a15ba2296484803fe11a0 Mon Sep 17 00:00:00 2001 From: Viktor Tsapovskiy Date: Fri, 7 Nov 2025 23:15:02 +0300 Subject: [PATCH] api: removed deprecated methods Removed `box.session.push()` usage: * `push_func()` function in `config.lua`; * `Future.AppendPush()`, `Future.GetIterator()` methods; * `ResponseIterator` and `TimeoutResponseIterator` types; * `pushes[]` field in `Future` and related methods. Closes #480 --- CHANGELOG.md | 5 + MIGRATION.md | 3 + config.lua | 8 -- connection.go | 36 +------ example_test.go | 35 ------ future.go | 135 ----------------------- future_test.go | 269 ---------------------------------------------- response_it.go | 34 ------ tarantool_test.go | 121 --------------------- 9 files changed, 10 insertions(+), 636 deletions(-) delete mode 100644 response_it.go diff --git a/CHANGELOG.md b/CHANGELOG.md index a0d43749b..32253ec4f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,11 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. * Now cases of `<-ctx.Done()` returns wrapped error provided by `ctx.Cause()`. Allows you compare it using `errors.Is/As` (#457). * Removed deprecated `pool` methods, related interfaces and tests are updated (#478). +* Removed deprecated `box.session.push()` support: Future.AppendPush() + and Future.GetIterator() methods, ResponseIterator and TimeoutResponseIterator types, + Future.pushes[] (#480). +* Removed `push_func()` function in `config.lua` + and `code == iproto.IPROTO_CHUNK` checking in `Connection.reader` (#480). ### Fixed diff --git a/MIGRATION.md b/MIGRATION.md index 3044dbffc..5374ff721 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -12,6 +12,9 @@ TODO * `box.New` returns an error instead of panic * Added `box.MustNew` wrapper for `box.New` without an error * Removed deprecated `pool` methods, related interfaces and tests are updated. +* Removed `box.session.push()` support: Future.AppendPush() and Future.GetIterator() + methods, ResponseIterator and TimeoutResponseIterator types, `Future.pushes[]`, +* Removed `push_func()` function in `config.lua` ## Migration from v1.x.x to v2.x.x diff --git a/config.lua b/config.lua index f0bf05b65..8a35cd174 100644 --- a/config.lua +++ b/config.lua @@ -174,14 +174,6 @@ local function simple_concat(a) end rawset(_G, 'simple_concat', simple_concat) -local function push_func(cnt) - for i = 1, cnt do - box.session.push(i) - end - return cnt -end -rawset(_G, 'push_func', push_func) - local function create_spaces() for i=1,10 do local s = box.schema.space.create('test' .. tostring(i), { diff --git a/connection.go b/connection.go index f8f04d014..93afb9f47 100644 --- a/connection.go +++ b/connection.go @@ -61,8 +61,6 @@ const ( LogUnexpectedResultId // LogWatchEventReadFailed is logged when failed to read a watch event. LogWatchEventReadFailed - // LogAppendPushFailed is logged when failed to append a push response. - LogAppendPushFailed ) // ConnEvent is sent throw Notify channel specified in Opts. @@ -118,9 +116,6 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac case LogWatchEventReadFailed: err := v[0].(error) log.Printf("tarantool: unable to parse watch event: %s", err) - case LogAppendPushFailed: - err := v[0].(error) - log.Printf("tarantool: unable to append a push response: %s", err) default: args := append([]interface{}{"tarantool: unexpected event ", event, conn}, v...) log.Print(args...) @@ -882,15 +877,8 @@ func (conn *Connection) reader(r io.Reader, c Conn) { } continue } else if code == iproto.IPROTO_CHUNK { - if fut = conn.peekFuture(header.RequestId); fut != nil { - if err := fut.AppendPush(header, &buf); err != nil { - err = ClientError{ - ErrProtocolError, - fmt.Sprintf("failed to append push response: %s", err), - } - conn.opts.Logger.Report(LogAppendPushFailed, conn, err) - } - } + // IPROTO_CHUNK is unexpected because box.session.push is unsupported. + log.Printf("unexpected IPROTO_CHUNK for request %d", header.RequestId) } else { if fut = conn.fetchFuture(header.RequestId); fut != nil { if err := fut.SetResponse(header, &buf); err != nil { @@ -1131,26 +1119,6 @@ func (conn *Connection) markDone(fut *Future) { conn.decrementRequestCnt() } -func (conn *Connection) peekFuture(reqid uint32) (fut *Future) { - shard := &conn.shard[reqid&(conn.opts.Concurrency-1)] - pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1) - shard.rmut.Lock() - defer shard.rmut.Unlock() - - if conn.opts.Timeout > 0 { - if fut = conn.getFutureImp(reqid, true); fut != nil { - pair := &shard.requests[pos] - *pair.last = fut - pair.last = &fut.next - fut.timeout = time.Since(epoch) + conn.opts.Timeout - } - } else { - fut = conn.getFutureImp(reqid, false) - } - - return fut -} - func (conn *Connection) fetchFuture(reqid uint32) (fut *Future) { shard := &conn.shard[reqid&(conn.opts.Concurrency-1)] shard.rmut.Lock() diff --git a/example_test.go b/example_test.go index b4411fb69..9eadf5971 100644 --- a/example_test.go +++ b/example_test.go @@ -1102,41 +1102,6 @@ func ExampleErrorNo() { // Success. } -func ExampleFuture_GetIterator() { - conn := exampleConnect(dialer, opts) - defer conn.Close() - - const timeout = 3 * time.Second - fut := conn.Do(tarantool.NewCallRequest("push_func"). - Args([]interface{}{4}), - ) - - var it tarantool.ResponseIterator - for it = fut.GetIterator().WithTimeout(timeout); it.Next(); { - resp := it.Value() - data, _ := resp.Decode() - if it.IsPush() { - // It is a push message. - fmt.Printf("push message: %v\n", data[0]) - } else if resp.Header().Error == tarantool.ErrorNo { - // It is a regular response. - fmt.Printf("response: %v", data[0]) - } else { - fmt.Printf("an unexpected response code %d", resp.Header().Error) - } - } - if err := it.Err(); err != nil { - fmt.Printf("error in call of push_func is %v", err) - return - } - // Output: - // push message: 1 - // push message: 2 - // push message: 3 - // push message: 4 - // response: 4 -} - func ExampleConnect() { ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() diff --git a/future.go b/future.go index 64e2805f1..ed3d89cdc 100644 --- a/future.go +++ b/future.go @@ -13,7 +13,6 @@ type Future struct { next *Future timeout time.Duration mutex sync.Mutex - pushes []Response resp Response err error ready chan struct{} @@ -39,131 +38,15 @@ func (fut *Future) isDone() bool { } } -type asyncResponseIterator struct { - fut *Future - timeout time.Duration - resp Response - err error - curPos int - done bool -} - -func (it *asyncResponseIterator) Next() bool { - if it.done || it.err != nil { - it.resp = nil - return false - } - - var last = false - var exit = false - for !exit { - // We try to read at least once. - it.fut.mutex.Lock() - it.resp = it.nextResponse() - it.err = it.fut.err - last = it.resp == it.fut.resp - it.fut.mutex.Unlock() - - if it.timeout == 0 || it.resp != nil || it.err != nil { - break - } - - select { - case <-it.fut.ready: - case <-time.After(it.timeout): - exit = true - } - } - - if it.resp == nil { - return false - } - - if last { - it.done = true - } else { - it.err = nil - it.curPos += 1 - } - - return true -} - -func (it *asyncResponseIterator) Value() Response { - return it.resp -} - -func (it *asyncResponseIterator) IsPush() bool { - return !it.done -} - -func (it *asyncResponseIterator) Err() error { - return it.err -} - -func (it *asyncResponseIterator) WithTimeout(timeout time.Duration) TimeoutResponseIterator { - it.timeout = timeout - return it -} - -func (it *asyncResponseIterator) nextResponse() (resp Response) { - fut := it.fut - pushesLen := len(fut.pushes) - - if it.curPos < pushesLen { - resp = fut.pushes[it.curPos] - } else if it.curPos == pushesLen { - resp = fut.resp - } - - return resp -} - -// PushResponse is used for push requests for the Future. -type PushResponse struct { - baseResponse -} - -func createPushResponse(header Header, body io.Reader) (Response, error) { - resp, err := createBaseResponse(header, body) - if err != nil { - return nil, err - } - return &PushResponse{resp}, nil -} - // NewFuture creates a new empty Future for a given Request. func NewFuture(req Request) (fut *Future) { fut = &Future{} fut.ready = make(chan struct{}, 1000000000) fut.done = make(chan struct{}) - fut.pushes = make([]Response, 0) fut.req = req return fut } -// AppendPush appends the push response to the future. -// Note: it works only before SetResponse() or SetError() -// -// Deprecated: the method will be removed in the next major version, -// use Connector.NewWatcher() instead of box.session.push(). -func (fut *Future) AppendPush(header Header, body io.Reader) error { - fut.mutex.Lock() - defer fut.mutex.Unlock() - - if fut.isDone() { - return nil - } - resp, err := createPushResponse(header, body) - if err != nil { - return err - } - fut.pushes = append(fut.pushes, resp) - - fut.ready <- struct{}{} - return nil -} - // SetResponse sets a response for the future and finishes the future. func (fut *Future) SetResponse(header Header, body io.Reader) error { fut.mutex.Lock() @@ -235,24 +118,6 @@ func (fut *Future) GetTyped(result interface{}) error { return fut.resp.DecodeTyped(result) } -// GetIterator returns an iterator for iterating through push messages -// and a response. Push messages and the response will contain deserialized -// result in Data field as for the Get() function. -// -// # See also -// -// - box.session.push(): -// https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_session/push/ -// -// Deprecated: the method will be removed in the next major version, -// use Connector.NewWatcher() instead of box.session.push(). -func (fut *Future) GetIterator() (it TimeoutResponseIterator) { - futit := &asyncResponseIterator{ - fut: fut, - } - return futit -} - var closedChan = make(chan struct{}) func init() { diff --git a/future_test.go b/future_test.go index fe13bc103..47f4e3c20 100644 --- a/future_test.go +++ b/future_test.go @@ -3,16 +3,12 @@ package tarantool_test import ( "bytes" "context" - "errors" "io" - "sync" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/tarantool/go-iproto" . "github.com/tarantool/go-tarantool/v3" - "github.com/tarantool/go-tarantool/v3/test_helpers" "github.com/vmihailenco/msgpack/v5" ) @@ -80,271 +76,6 @@ func createFutureMockResponse(header Header, body io.Reader) (Response, error) { return &futureMockResponse{header: header, data: data}, nil } -func assertResponseIteratorValue(t testing.TB, it ResponseIterator, - isPush bool, resp Response) { - t.Helper() - - if it.Err() != nil { - t.Errorf("An unexpected iteration error: %q", it.Err().Error()) - } - - if it.Value() == nil { - t.Errorf("An unexpected nil value") - } else if it.IsPush() != isPush { - if isPush { - t.Errorf("An unexpected response type, expected to be push") - } else { - t.Errorf("An unexpected response type, expected not to be push") - } - } - - assert.Equalf(t, it.Value(), resp, "An unexpected response %v, expected %v", it.Value(), resp) -} - -func assertResponseIteratorFinished(t testing.TB, it ResponseIterator) { - t.Helper() - - if it.Err() != nil { - t.Errorf("An unexpected iteration error: %q", it.Err().Error()) - } - if it.Value() != nil { - t.Errorf("An unexpected value %v", it.Value()) - } -} - -func TestFutureGetIteratorNoItems(t *testing.T) { - fut := NewFuture(test_helpers.NewMockRequest()) - - it := fut.GetIterator() - if it.Next() { - t.Errorf("An unexpected next value.") - } else { - assertResponseIteratorFinished(t, it) - } -} - -func TestFutureGetIteratorNoResponse(t *testing.T) { - pushHeader := Header{} - push := &PushResponse{} - fut := NewFuture(test_helpers.NewMockRequest()) - fut.AppendPush(pushHeader, nil) - - if it := fut.GetIterator(); it.Next() { - assertResponseIteratorValue(t, it, true, push) - if it.Next() == true { - t.Errorf("An unexpected next value.") - } - assertResponseIteratorFinished(t, it) - } else { - t.Errorf("A push message expected.") - } -} - -func TestFutureGetIteratorNoResponseTimeout(t *testing.T) { - pushHeader := Header{} - push := &PushResponse{} - fut := NewFuture(test_helpers.NewMockRequest()) - fut.AppendPush(pushHeader, nil) - - if it := fut.GetIterator().WithTimeout(1 * time.Nanosecond); it.Next() { - assertResponseIteratorValue(t, it, true, push) - if it.Next() == true { - t.Errorf("An unexpected next value.") - } - assertResponseIteratorFinished(t, it) - } else { - t.Errorf("A push message expected.") - } -} - -func TestFutureGetIteratorResponseOnTimeout(t *testing.T) { - pushHeader := Header{} - respHeader := Header{} - push := &PushResponse{} - resp := &test_helpers.MockResponse{} - fut := NewFuture(test_helpers.NewMockRequest()) - fut.AppendPush(pushHeader, nil) - - var done sync.WaitGroup - var wait sync.WaitGroup - wait.Add(1) - done.Add(1) - - go func() { - defer done.Done() - - var it ResponseIterator - var cnt = 0 - for it = fut.GetIterator().WithTimeout(5 * time.Second); it.Next(); { - var r Response - isPush := true - r = push - if cnt == 1 { - isPush = false - r = resp - } - assertResponseIteratorValue(t, it, isPush, r) - cnt += 1 - if cnt == 1 { - wait.Done() - } - } - assertResponseIteratorFinished(t, it) - - if cnt != 2 { - t.Errorf("An unexpected count of responses %d != %d", cnt, 2) - } - }() - - wait.Wait() - - fut.SetResponse(respHeader, nil) - done.Wait() -} - -func TestFutureGetIteratorFirstResponse(t *testing.T) { - resp := &test_helpers.MockResponse{} - fut := NewFuture(test_helpers.NewMockRequest()) - fut.SetResponse(Header{}, nil) - fut.SetResponse(Header{}, nil) - - if it := fut.GetIterator(); it.Next() { - assertResponseIteratorValue(t, it, false, resp) - if it.Next() == true { - t.Errorf("An unexpected next value.") - } - assertResponseIteratorFinished(t, it) - } else { - t.Errorf("A response expected.") - } -} - -func TestFutureGetIteratorFirstError(t *testing.T) { - const errMsg1 = "error1" - const errMsg2 = "error2" - - fut := NewFuture(test_helpers.NewMockRequest()) - fut.SetError(errors.New(errMsg1)) - fut.SetError(errors.New(errMsg2)) - - it := fut.GetIterator() - if it.Next() { - t.Errorf("An unexpected value.") - } else if it.Err() == nil { - t.Errorf("An error expected.") - } else if it.Err().Error() != errMsg1 { - t.Errorf("An unexpected error %q, expected %q", it.Err().Error(), errMsg1) - } -} - -func TestFutureGetIteratorResponse(t *testing.T) { - responses := []Response{ - &PushResponse{}, - &PushResponse{}, - &test_helpers.MockResponse{}, - } - header := Header{} - fut := NewFuture(test_helpers.NewMockRequest()) - for i := range responses { - if i == len(responses)-1 { - fut.SetResponse(header, nil) - } else { - fut.AppendPush(header, nil) - } - } - - var its = []ResponseIterator{ - fut.GetIterator(), - fut.GetIterator().WithTimeout(5 * time.Second), - } - for _, it := range its { - var cnt = 0 - for it.Next() { - isPush := true - if cnt == len(responses)-1 { - isPush = false - } - assertResponseIteratorValue(t, it, isPush, responses[cnt]) - cnt += 1 - } - assertResponseIteratorFinished(t, it) - - if cnt != len(responses) { - t.Errorf("An unexpected count of responses %d != %d", cnt, len(responses)) - } - } -} - -func TestFutureGetIteratorError(t *testing.T) { - const errMsg = "error message" - responses := []*PushResponse{ - {}, - {}, - } - err := errors.New(errMsg) - fut := NewFuture(test_helpers.NewMockRequest()) - for range responses { - fut.AppendPush(Header{}, nil) - } - fut.SetError(err) - - var its = []ResponseIterator{ - fut.GetIterator(), - fut.GetIterator().WithTimeout(5 * time.Second), - } - for _, it := range its { - var cnt = 0 - for it.Next() { - assertResponseIteratorValue(t, it, true, responses[cnt]) - cnt += 1 - } - if err = it.Err(); err != nil { - if err.Error() != errMsg { - t.Errorf("An unexpected error %q, expected %q", err.Error(), errMsg) - } - } else { - t.Errorf("An error expected.") - } - - if cnt != len(responses) { - t.Errorf("An unexpected count of responses %d != %d", cnt, len(responses)) - } - } -} - -func TestFutureSetStateRaceCondition(t *testing.T) { - err := errors.New("any error") - - for i := 0; i < 1000; i++ { - fut := NewFuture(test_helpers.NewMockRequest()) - for j := 0; j < 9; j++ { - go func(opt int) { - if opt%3 == 0 { - fut.AppendPush(Header{}, nil) - } else if opt%3 == 1 { - fut.SetError(err) - } else { - fut.SetResponse(Header{}, nil) - } - }(j) - } - } - // It may be false-positive, but very rarely - it's ok for such very - // simple race conditions tests. -} - -func TestFutureGetIteratorIsPush(t *testing.T) { - fut := NewFuture(test_helpers.NewMockRequest()) - fut.AppendPush(Header{}, nil) - fut.SetResponse(Header{}, nil) - it := fut.GetIterator() - - it.Next() - assert.True(t, it.IsPush()) - it.Next() - assert.False(t, it.IsPush()) -} - func TestFuture_Get(t *testing.T) { fut := NewFuture(&futureMockRequest{}) fut.SetResponse(Header{}, bytes.NewReader([]byte{'v', '2'})) diff --git a/response_it.go b/response_it.go deleted file mode 100644 index f5a4517e0..000000000 --- a/response_it.go +++ /dev/null @@ -1,34 +0,0 @@ -package tarantool - -import ( - "time" -) - -// ResponseIterator is an interface for iteration over a set of responses. -// -// Deprecated: the method will be removed in the next major version, -// use Connector.NewWatcher() instead of box.session.push(). -type ResponseIterator interface { - // Next tries to switch to a next Response and returns true if it exists. - Next() bool - // Value returns a current Response if it exists, nil otherwise. - Value() Response - // IsPush returns true if the current response is a push response. - IsPush() bool - // Err returns error if it happens. - Err() error -} - -// TimeoutResponseIterator is an interface that extends ResponseIterator -// and adds the ability to change a timeout for the Next() call. -// -// Deprecated: the method will be removed in the next major version, -// use Connector.NewWatcher() instead of box.session.push(). -type TimeoutResponseIterator interface { - ResponseIterator - // WithTimeout allows to set up a timeout for the Next() call. - // Note: in the current implementation, there is a timeout for each - // response (the timeout for the request is reset by each push message): - // Connection's Opts.Timeout. You need to increase the value if necessary. - WithTimeout(timeout time.Duration) TimeoutResponseIterator -} diff --git a/tarantool_test.go b/tarantool_test.go index 08cbd4a89..67ba64418 100644 --- a/tarantool_test.go +++ b/tarantool_test.go @@ -873,127 +873,6 @@ func TestClient(t *testing.T) { } } -func TestClientSessionPush(t *testing.T) { - conn := test_helpers.ConnectWithValidation(t, dialer, opts) - defer conn.Close() - - var it ResponseIterator - const pushMax = 3 - // It will be iterated immediately. - fut0 := conn.Call17Async("push_func", []interface{}{pushMax}) - respCnt := 0 - for it = fut0.GetIterator(); it.Next(); { - err := it.Err() - resp := it.Value() - if err != nil { - t.Errorf("Unexpected error after it.Next() == true: %q", err.Error()) - break - } - if resp == nil { - t.Errorf("Response is empty after it.Next() == true") - break - } - respCnt += 1 - } - if err := it.Err(); err != nil { - t.Errorf("An unexpected iteration error: %s", err.Error()) - } - if respCnt > pushMax+1 { - t.Errorf("Unexpected respCnt = %d, expected 0 <= respCnt <= %d", respCnt, pushMax+1) - } - _, _ = fut0.Get() - - // It will wait a response before iteration. - fut1 := conn.Call17Async("push_func", []interface{}{pushMax}) - // Future.Get ignores push messages. - data, err := fut1.Get() - if err != nil { - t.Errorf("Failed to Call17: %s", err) - } else if len(data) < 1 { - t.Errorf("Response.Data is empty after Call17Async") - } else if val, err := test_helpers.ConvertUint64(data[0]); err != nil || val != pushMax { - t.Errorf("Result is not %d: %v", pushMax, data) - } - - // It will will be iterated with a timeout. - fut2 := conn.Call17Async("push_func", []interface{}{pushMax}) - - var its = []ResponseIterator{ - fut1.GetIterator(), - fut2.GetIterator().WithTimeout(5 * time.Second), - } - - for i := 0; i < len(its); i++ { - pushCnt := uint64(0) - respCnt := uint64(0) - - it = its[i] - for it.Next() { - resp := it.Value() - if resp == nil { - t.Errorf("Response is empty after it.Next() == true") - break - } - data, err := resp.Decode() - if err != nil { - t.Errorf("Failed to Decode: %s", err) - break - } - if len(data) < 1 { - t.Errorf("Response.Data is empty after CallAsync") - break - } - if it.IsPush() { - pushCnt += 1 - val, err := test_helpers.ConvertUint64(data[0]) - if err != nil || val != pushCnt { - t.Errorf("Unexpected push data = %v", data) - } - } else { - respCnt += 1 - val, err := test_helpers.ConvertUint64(data[0]) - if err != nil || val != pushMax { - t.Errorf("Result is not %d: %v", pushMax, data) - } - } - } - - if err = it.Err(); err != nil { - t.Errorf("An unexpected iteration error: %s", err.Error()) - } - - if pushCnt != pushMax { - t.Errorf("Expect %d pushes but got %d", pushMax, pushCnt) - } - - if respCnt != 1 { - t.Errorf("Expect %d responses but got %d", 1, respCnt) - } - } - - // We can collect original responses after iterations. - for _, fut := range []*Future{fut0, fut1, fut2} { - data, err := fut.Get() - if err != nil { - t.Errorf("Unable to call fut.Get(): %s", err) - } - val, err := test_helpers.ConvertUint64(data[0]) - if err != nil || val != pushMax { - t.Errorf("Result is not %d: %v", pushMax, data) - } - - tpl := struct { - Val int - }{} - err = fut.GetTyped(&tpl) - if err != nil { - t.Errorf("Unable to call fut.GetTyped(): %s", err) - } else if tpl.Val != pushMax { - t.Errorf("Result is not %d: %d", pushMax, tpl.Val) - } - } -} - const ( createTableQuery = "CREATE TABLE SQL_SPACE (ID STRING PRIMARY KEY, NAME " + "STRING COLLATE \"unicode\" DEFAULT NULL);"