Skip to content

Commit 3540852

Browse files
committed
api: updated response/future methods
Added method Release to Future and Response's interface, that allows to free used data directly by calling. Closes #493
1 parent 6a24a64 commit 3540852

File tree

11 files changed

+121
-24
lines changed

11 files changed

+121
-24
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1515
* Added missing IPROTO feature flags to greeting negotiation
1616
(iproto.IPROTO_FEATURE_IS_SYNC, iproto.IPROTO_FEATURE_INSERT_ARROW) (#466).
1717
* Added Future.cond (sync.Cond) and Future.finished bool. Added Future.finish() marks Future as done (#496).
18+
* Method `Release` for `Future` and `Response` interface that allows
19+
to free used data directly by calling (#493).
1820

1921
### Changed
2022

connection.go

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,18 @@ var (
3939
"to the current connection or connection pool")
4040
)
4141

42+
var smallBufPool = &sync.Pool{
43+
New: func() interface{} {
44+
return &smallBuf{}
45+
},
46+
}
47+
48+
var slicePool = &sync.Pool{
49+
New: func() interface{} {
50+
return make([]byte, 1024)
51+
},
52+
}
53+
4254
const (
4355
// Connected signals that connection is established or reestablished.
4456
Connected ConnEventKind = iota + 1
@@ -373,7 +385,6 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e
373385
}
374386

375387
conn.cond = sync.NewCond(&conn.mutex)
376-
377388
if conn.opts.Reconnect > 0 {
378389
// We don't need these mutex.Lock()/mutex.Unlock() here, but
379390
// runReconnects() expects mutex.Lock() to be set, so it's
@@ -860,8 +871,9 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
860871
conn.reconnect(err, c)
861872
return
862873
}
863-
buf := smallBuf{b: respBytes}
864-
header, code, err := decodeHeader(conn.dec, &buf)
874+
buf := smallBufPool.Get().(*smallBuf)
875+
buf.b = respBytes
876+
header, code, err := decodeHeader(conn.dec, buf)
865877
if err != nil {
866878
err = ClientError{
867879
ErrProtocolError,
@@ -873,7 +885,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
873885

874886
var fut *Future = nil
875887
if code == iproto.IPROTO_EVENT {
876-
if event, err := readWatchEvent(&buf); err == nil {
888+
if event, err := readWatchEvent(buf); err == nil {
877889
events <- event
878890
} else {
879891
err = ClientError{
@@ -887,7 +899,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
887899
conn.opts.Logger.Report(LogBoxSessionPushUnsupported, conn, header)
888900
} else {
889901
if fut = conn.fetchFuture(header.RequestId); fut != nil {
890-
if err := fut.SetResponse(header, &buf); err != nil {
902+
if err := fut.SetResponse(header, buf); err != nil {
891903
fut.SetError(fmt.Errorf("failed to set response: %w", err))
892904
}
893905
conn.markDone(fut)
@@ -1190,6 +1202,8 @@ func (conn *Connection) timeouts() {
11901202
}
11911203
}
11921204

1205+
// read uses args to allocate slices for responses using sync.Pool.
1206+
// data could be released later using Release.
11931207
func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
11941208
var length uint64
11951209

@@ -1214,7 +1228,14 @@ func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
12141228
return
12151229
}
12161230

1217-
response = make([]byte, length)
1231+
ptr := slicePool.Get().([]byte)
1232+
if cap(ptr) < int(length) {
1233+
response = make([]byte, length)
1234+
slicePool.Put(ptr) // nolint
1235+
} else {
1236+
response = ptr
1237+
response = response[:length]
1238+
}
12181239
_, err = io.ReadFull(r, response)
12191240

12201241
return

dial.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,7 @@ func identify(ctx context.Context, conn Conn) (ProtocolInfo, error) {
481481
}
482482
return info, err
483483
}
484+
defer resp.Release()
484485
data, err := resp.Decode()
485486
if err != nil {
486487
return info, err
@@ -536,6 +537,7 @@ func checkProtocolInfo(required ProtocolInfo, actual ProtocolInfo) error {
536537
func authenticate(ctx context.Context, c Conn, auth Auth, user, pass, salt string) error {
537538
var req Request
538539
var err error
540+
var resp Response
539541

540542
switch auth {
541543
case ChapSha1Auth:
@@ -552,9 +554,10 @@ func authenticate(ctx context.Context, c Conn, auth Auth, user, pass, salt strin
552554
if err = writeRequest(ctx, c, req); err != nil {
553555
return err
554556
}
555-
if _, err = readResponse(ctx, c, req); err != nil {
557+
if resp, err = readResponse(ctx, c, req); err != nil {
556558
return err
557559
}
560+
resp.Release()
558561
return nil
559562
}
560563

@@ -620,17 +623,18 @@ func readResponse(ctx context.Context, conn Conn, req Request) (Response, error)
620623
return nil, fmt.Errorf("read error: %w", err)
621624
}
622625

623-
buf := smallBuf{b: respBytes}
626+
buf := smallBufPool.Get().(*smallBuf)
627+
buf.b = respBytes
624628

625-
d := getDecoder(&buf)
629+
d := getDecoder(buf)
626630
defer putDecoder(d)
627631

628-
header, _, err := decodeHeader(d, &buf)
632+
header, _, err := decodeHeader(d, buf)
629633
if err != nil {
630634
return nil, fmt.Errorf("decode response header error: %w", err)
631635
}
632636

633-
resp, err := req.Response(header, &buf)
637+
resp, err := req.Response(header, buf)
634638
if err != nil {
635639
return nil, fmt.Errorf("creating response error: %w", err)
636640
}

example_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,6 +1313,7 @@ func ExampleConnection_Do_failure() {
13131313

13141314
// We got a future, the request actually not performed yet.
13151315
future := conn.Do(req)
1316+
defer future.Release()
13161317

13171318
// When the future receives the response, the result of the Future is set
13181319
// and becomes available. We could wait for that moment with Future.Get(),

future.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,3 +155,11 @@ func (fut *Future) WaitChan() <-chan struct{} {
155155

156156
return fut.done
157157
}
158+
159+
// Release is freeing the Future resources.
160+
// After this, using this Future becomes invalid.
161+
func (fut *Future) Release() {
162+
if fut.resp != nil {
163+
fut.resp.Release()
164+
}
165+
}

future_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ func (resp *futureMockResponse) Header() Header {
5353
return resp.header
5454
}
5555

56+
func (resp *futureMockResponse) Release() {
57+
// Releasing futureMockResponse data.
58+
}
59+
5660
func (resp *futureMockResponse) Decode() ([]interface{}, error) {
5761
resp.decodeCnt++
5862

prepared.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func (req *PrepareRequest) Response(header Header, body io.Reader) (Response, er
9090
if err != nil {
9191
return nil, err
9292
}
93-
return &PrepareResponse{baseResponse: baseResp}, nil
93+
return &PrepareResponse{baseResponse: *baseResp}, nil
9494
}
9595

9696
// UnprepareRequest helps you to create an unprepare request object for
@@ -204,5 +204,5 @@ func (req *ExecutePreparedRequest) Response(header Header, body io.Reader) (Resp
204204
if err != nil {
205205
return nil, err
206206
}
207-
return &ExecuteResponse{baseResponse: baseResp}, nil
207+
return &ExecuteResponse{baseResponse: *baseResp}, nil
208208
}

request.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -620,11 +620,11 @@ func (req *SelectRequest) Context(ctx context.Context) *SelectRequest {
620620

621621
// Response creates a response for the SelectRequest.
622622
func (req *SelectRequest) Response(header Header, body io.Reader) (Response, error) {
623-
baseResp, err := createBaseResponse(header, body)
623+
SelectResp, err := createSelectResponse(header, body)
624624
if err != nil {
625625
return nil, err
626626
}
627-
return &SelectResponse{baseResponse: baseResp}, nil
627+
return SelectResp, nil
628628
}
629629

630630
// InsertRequest helps you to create an insert request object for execution
@@ -1154,7 +1154,7 @@ func (req *ExecuteRequest) Response(header Header, body io.Reader) (Response, er
11541154
if err != nil {
11551155
return nil, err
11561156
}
1157-
return &ExecuteResponse{baseResponse: baseResp}, nil
1157+
return &ExecuteResponse{baseResponse: *baseResp}, nil
11581158
}
11591159

11601160
// WatchOnceRequest synchronously fetches the value currently associated with a

response.go

Lines changed: 58 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package tarantool
33
import (
44
"fmt"
55
"io"
6+
"sync"
67

78
"github.com/tarantool/go-iproto"
89
"github.com/vmihailenco/msgpack/v5"
@@ -12,6 +13,8 @@ import (
1213
type Response interface {
1314
// Header returns a response header.
1415
Header() Header
16+
// Release free responses data.
17+
Release()
1518
// Decode decodes a response.
1619
Decode() ([]interface{}, error)
1720
// DecodeTyped decodes a response into a given container res.
@@ -31,24 +34,38 @@ type baseResponse struct {
3134
err error
3235
}
3336

34-
func createBaseResponse(header Header, body io.Reader) (baseResponse, error) {
37+
func createBaseResponse(header Header, body io.Reader) (*baseResponse, error) {
38+
resp := &baseResponse{}
3539
if body == nil {
36-
return baseResponse{header: header}, nil
40+
resp.header = header
41+
return resp, nil
3742
}
3843
if buf, ok := body.(*smallBuf); ok {
39-
return baseResponse{header: header, buf: *buf}, nil
44+
resp.header = header
45+
resp.buf.b = buf.b
46+
resp.buf.p = buf.p
47+
return resp, nil
4048
}
4149
data, err := io.ReadAll(body)
4250
if err != nil {
43-
return baseResponse{}, err
51+
return resp, err
4452
}
45-
return baseResponse{header: header, buf: smallBuf{b: data}}, nil
53+
resp.header = header
54+
resp.buf.b = data
55+
return resp, nil
56+
}
57+
58+
func (resp *baseResponse) Release() {
59+
slicePool.Put(resp.buf.b) // nolint
60+
resp.buf.b = nil
61+
resp.buf.p = 0
62+
smallBufPool.Put(&resp.buf)
4663
}
4764

4865
// DecodeBaseResponse parse response header and body.
4966
func DecodeBaseResponse(header Header, body io.Reader) (Response, error) {
5067
resp, err := createBaseResponse(header, body)
51-
return &resp, err
68+
return resp, err
5269
}
5370

5471
// SelectResponse is used for the select requests.
@@ -670,6 +687,41 @@ func (resp *baseResponse) Header() Header {
670687
return resp.header
671688
}
672689

690+
var selectResponsePool *sync.Pool = &sync.Pool{
691+
New: func() interface{} {
692+
return &SelectResponse{}
693+
},
694+
}
695+
696+
func createSelectResponse(header Header, body io.Reader) (*SelectResponse, error) {
697+
resp := selectResponsePool.Get().(*SelectResponse)
698+
if body == nil {
699+
resp.header = header
700+
return resp, nil
701+
}
702+
if buf, ok := body.(*smallBuf); ok {
703+
resp.header = header
704+
resp.buf.b = buf.b
705+
resp.buf.p = buf.p
706+
return resp, nil
707+
}
708+
data, err := io.ReadAll(body)
709+
if err != nil {
710+
return resp, err
711+
}
712+
resp.header = header
713+
resp.buf.b = data
714+
return resp, nil
715+
}
716+
717+
func (resp *SelectResponse) Release() {
718+
resp.baseResponse.Release()
719+
resp.baseResponse = baseResponse{}
720+
resp.pos = nil
721+
722+
selectResponsePool.Put(resp)
723+
}
724+
673725
// Pos returns a position descriptor of the last selected tuple for the SelectResponse.
674726
// If the response was not decoded, this method will call Decode().
675727
func (resp *SelectResponse) Pos() ([]byte, error) {

tarantool_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,14 +220,15 @@ func BenchmarkSync_naive_with_custom_type(b *testing.B) {
220220
b.ResetTimer()
221221

222222
for b.Loop() {
223-
err := conn.Do(req).GetTyped(&tuple)
224-
if err != nil {
223+
fut := conn.Do(req)
224+
if err := fut.GetTyped(&tuple); err != nil {
225225
b.Errorf("request error: %s", err)
226226
}
227227

228228
if tuple.id != 1111 {
229229
b.Errorf("invalid result")
230230
}
231+
fut.Release()
231232
}
232233
}
233234

0 commit comments

Comments
 (0)