@@ -3,6 +3,7 @@ package rawtopicreader
33import (
44 "errors"
55 "fmt"
6+ "github.com/ydb-platform/ydb-go-sdk/v3/trace"
67 "io"
78 "reflect"
89
@@ -22,16 +23,36 @@ type GrpcStream interface {
2223}
2324
2425type StreamReader struct {
25- Stream GrpcStream
26+ Stream GrpcStream
27+ ReaderID int64
28+
29+ Tracer * trace.Topic
30+ sessionID string
31+ sentMessageCount int
32+ receiveMessageCount int
2633}
2734
2835func (s StreamReader ) CloseSend () error {
2936 return s .Stream .CloseSend ()
3037}
3138
3239//nolint:funlen
33- func (s StreamReader ) Recv () (ServerMessage , error ) {
40+ func (s StreamReader ) Recv () (_ ServerMessage , resErr error ) {
3441 grpcMess , err := s .Stream .Recv ()
42+
43+ defer func () {
44+ s .receiveMessageCount ++
45+
46+ trace .TopicOnReaderReceiveGRPCMessage (
47+ s .Tracer ,
48+ s .ReaderID ,
49+ s .sessionID ,
50+ s .receiveMessageCount ,
51+ grpcMess ,
52+ resErr ,
53+ )
54+ }()
55+
3556 if xerrors .Is (err , io .EOF ) {
3657 return nil , err
3758 }
@@ -53,6 +74,8 @@ func (s StreamReader) Recv() (ServerMessage, error) {
5374
5475 switch m := grpcMess .GetServerMessage ().(type ) {
5576 case * Ydb_Topic.StreamReadMessage_FromServer_InitResponse :
77+ s .sessionID = m .InitResponse .GetSessionId ()
78+
5679 resp := & InitResponse {}
5780 resp .ServerMessageMetadata = meta
5881 resp .fromProto (m .InitResponse )
@@ -122,66 +145,74 @@ func (s StreamReader) Recv() (ServerMessage, error) {
122145 }
123146}
124147
125- func (s StreamReader ) Send (msg ClientMessage ) (err error ) {
148+ func (s StreamReader ) Send (msg ClientMessage ) (resErr error ) {
126149 defer func () {
127- err = xerrors .Transport (err )
150+ resErr = xerrors .Transport (resErr )
128151 }()
152+
153+ var grpcMess * Ydb_Topic.StreamReadMessage_FromClient
129154 switch m := msg .(type ) {
130155 case * InitRequest :
131- grpcMess : = & Ydb_Topic.StreamReadMessage_FromClient {
156+ grpcMess = & Ydb_Topic.StreamReadMessage_FromClient {
132157 ClientMessage : & Ydb_Topic.StreamReadMessage_FromClient_InitRequest {InitRequest : m .toProto ()},
133158 }
134159
135- return s .Stream .Send (grpcMess )
136160 case * ReadRequest :
137- grpcMess : = & Ydb_Topic.StreamReadMessage_FromClient {
161+ grpcMess = & Ydb_Topic.StreamReadMessage_FromClient {
138162 ClientMessage : & Ydb_Topic.StreamReadMessage_FromClient_ReadRequest {ReadRequest : m .toProto ()},
139163 }
140-
141- return s .Stream .Send (grpcMess )
142164 case * StartPartitionSessionResponse :
143- grpcMess : = & Ydb_Topic.StreamReadMessage_FromClient {
165+ grpcMess = & Ydb_Topic.StreamReadMessage_FromClient {
144166 ClientMessage : & Ydb_Topic.StreamReadMessage_FromClient_StartPartitionSessionResponse {
145167 StartPartitionSessionResponse : m .toProto (),
146168 },
147169 }
148-
149- return s .Stream .Send (grpcMess )
150170 case * StopPartitionSessionResponse :
151- grpcMess : = & Ydb_Topic.StreamReadMessage_FromClient {
171+ grpcMess = & Ydb_Topic.StreamReadMessage_FromClient {
152172 ClientMessage : & Ydb_Topic.StreamReadMessage_FromClient_StopPartitionSessionResponse {
153173 StopPartitionSessionResponse : m .toProto (),
154174 },
155175 }
156176
157- return s .Stream .Send (grpcMess )
158177 case * CommitOffsetRequest :
159- grpcMess : = & Ydb_Topic.StreamReadMessage_FromClient {
178+ grpcMess = & Ydb_Topic.StreamReadMessage_FromClient {
160179 ClientMessage : & Ydb_Topic.StreamReadMessage_FromClient_CommitOffsetRequest {
161180 CommitOffsetRequest : m .toProto (),
162181 },
163182 }
164183
165- return s .Stream .Send (grpcMess )
166184 case * PartitionSessionStatusRequest :
167- grpcMess : = & Ydb_Topic.StreamReadMessage_FromClient {
185+ grpcMess = & Ydb_Topic.StreamReadMessage_FromClient {
168186 ClientMessage : & Ydb_Topic.StreamReadMessage_FromClient_PartitionSessionStatusRequest {
169187 PartitionSessionStatusRequest : m .toProto (),
170188 },
171189 }
172190
173- return s .Stream .Send (grpcMess )
174191 case * UpdateTokenRequest :
175- grpcMess : = & Ydb_Topic.StreamReadMessage_FromClient {
192+ grpcMess = & Ydb_Topic.StreamReadMessage_FromClient {
176193 ClientMessage : & Ydb_Topic.StreamReadMessage_FromClient_UpdateTokenRequest {
177194 UpdateTokenRequest : m .ToProto (),
178195 },
179196 }
197+ }
180198
181- return s .Stream .Send (grpcMess )
182- default :
183- return xerrors .WithStackTrace (fmt .Errorf ("ydb: send unexpected message type: %v" , reflect .TypeOf (msg )))
199+ if grpcMess == nil {
200+ resErr = xerrors .WithStackTrace (fmt .Errorf ("ydb: send unexpected message type: %v" , reflect .TypeOf (msg )))
201+ } else {
202+ resErr = s .Stream .Send (grpcMess )
184203 }
204+
205+ s .sentMessageCount ++
206+ trace .TopicOnReaderSentGRPCMessage (
207+ s .Tracer ,
208+ s .ReaderID ,
209+ s .sessionID ,
210+ s .sentMessageCount ,
211+ grpcMess ,
212+ resErr ,
213+ )
214+
215+ return resErr
185216}
186217
187218type ClientMessage interface {
0 commit comments