Skip to content

Commit 1edb132

Browse files
committed
refactor log/topic and upload with off gocognit linter
1 parent 50d2eae commit 1edb132

File tree

2 files changed

+181
-27
lines changed

2 files changed

+181
-27
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Refactored `log/topic` and extract funcs
12
* Refactored `internal/table/scanner/scanner_test.go` and extract funcs
23
* Refactored `log/sql.go` and extract funcs
34
* Refactored `metrics/driver.go` and extract funcs

log/topic.go

Lines changed: 180 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,42 @@ func Topic(l Logger, d trace.Detailer, opts ...Option) (t trace.Topic) {
1212
return internalTopic(wrapLogger(l, opts...), d)
1313
}
1414

15-
func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocyclo
16-
t.OnReaderReconnect = func(
15+
func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
16+
t.OnReaderReconnect = onReaderReconnect(l, d)
17+
t.OnReaderReconnectRequest = onReaderReconnectRequest(l, d)
18+
t.OnReaderPartitionReadStartResponse = onReaderPartitionReadStartResponse(l, d)
19+
t.OnReaderPartitionReadStopResponse = onReaderPartitionReadStopResponse(l, d)
20+
t.OnReaderCommit = onReaderCommit(l, d)
21+
t.OnReaderSendCommitMessage = onReaderSendCommitMessage(l, d)
22+
t.OnReaderCommittedNotify = onReaderCommittedNotify(l, d)
23+
t.OnReaderClose = onReaderClose(l, d)
24+
25+
t.OnReaderInit = onReaderInit(l, d)
26+
t.OnReaderError = onReaderError(l, d)
27+
t.OnReaderUpdateToken = onReaderUpdateToken(l, d)
28+
t.OnReaderSentDataRequest = onReaderSentDataRequest(l, d)
29+
t.OnReaderReceiveDataResponse = onReaderReceiveDataResponse(l, d)
30+
t.OnReaderReadMessages = onReaderReadMessages(l, d)
31+
t.OnReaderUnknownGrpcMessage = onReaderUnknownGrpcMessage(l, d)
32+
33+
///
34+
/// Topic writer
35+
///
36+
t.OnWriterReconnect = onWriterReconnect(l, d)
37+
t.OnWriterInitStream = onWriterInitStream(l, d)
38+
t.OnWriterClose = onWriterClose(l, d)
39+
t.OnWriterCompressMessages = onWriterCompressMessages(l, d)
40+
t.OnWriterSendMessages = onWriterSendMessages(l, d)
41+
t.OnWriterReadUnknownGrpcMessage = onWriterReadUnknownGrpcMessage(l, d)
42+
43+
return t
44+
}
45+
46+
func onReaderReconnect(
47+
l Logger,
48+
d trace.Detailer,
49+
) func(info trace.TopicReaderReconnectStartInfo) func(doneInfo trace.TopicReaderReconnectDoneInfo) {
50+
return func(
1751
info trace.TopicReaderReconnectStartInfo,
1852
) func(doneInfo trace.TopicReaderReconnectDoneInfo) {
1953
if d.Details()&trace.TopicReaderStreamLifeCycleEvents == 0 {
@@ -30,7 +64,13 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocycl
3064
)
3165
}
3266
}
33-
t.OnReaderReconnectRequest = func(info trace.TopicReaderReconnectRequestInfo) {
67+
}
68+
69+
func onReaderReconnectRequest(
70+
l Logger,
71+
d trace.Detailer,
72+
) func(info trace.TopicReaderReconnectRequestInfo) {
73+
return func(info trace.TopicReaderReconnectRequestInfo) {
3474
if d.Details()&trace.TopicReaderStreamLifeCycleEvents == 0 {
3575
return
3676
}
@@ -40,7 +80,15 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocycl
4080
Bool("was_sent", info.WasSent),
4181
)
4282
}
43-
t.OnReaderPartitionReadStartResponse = func(
83+
}
84+
85+
func onReaderPartitionReadStartResponse(
86+
l Logger,
87+
d trace.Detailer,
88+
) func(
89+
info trace.TopicReaderPartitionReadStartResponseStartInfo) func(
90+
stopInfo trace.TopicReaderPartitionReadStartResponseDoneInfo) {
91+
return func(
4492
info trace.TopicReaderPartitionReadStartResponseStartInfo,
4593
) func(stopInfo trace.TopicReaderPartitionReadStartResponseDoneInfo) {
4694
if d.Details()&trace.TopicReaderPartitionEvents == 0 {
@@ -85,7 +133,15 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocycl
85133
}
86134
}
87135
}
88-
t.OnReaderPartitionReadStopResponse = func(
136+
}
137+
138+
func onReaderPartitionReadStopResponse(
139+
l Logger,
140+
d trace.Detailer,
141+
) func(
142+
info trace.TopicReaderPartitionReadStopResponseStartInfo) func(
143+
trace.TopicReaderPartitionReadStopResponseDoneInfo) {
144+
return func(
89145
info trace.TopicReaderPartitionReadStopResponseStartInfo,
90146
) func(trace.TopicReaderPartitionReadStopResponseDoneInfo) {
91147
if d.Details()&trace.TopicReaderPartitionEvents == 0 {
@@ -123,7 +179,13 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocycl
123179
}
124180
}
125181
}
126-
t.OnReaderCommit = func(info trace.TopicReaderCommitStartInfo) func(doneInfo trace.TopicReaderCommitDoneInfo) {
182+
}
183+
184+
func onReaderCommit(
185+
l Logger,
186+
d trace.Detailer,
187+
) func(info trace.TopicReaderCommitStartInfo) func(doneInfo trace.TopicReaderCommitDoneInfo) {
188+
return func(info trace.TopicReaderCommitStartInfo) func(doneInfo trace.TopicReaderCommitDoneInfo) {
127189
if d.Details()&trace.TopicReaderStreamEvents == 0 {
128190
return nil
129191
}
@@ -158,7 +220,13 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocycl
158220
}
159221
}
160222
}
161-
t.OnReaderSendCommitMessage = func(
223+
}
224+
225+
func onReaderSendCommitMessage(
226+
l Logger,
227+
d trace.Detailer,
228+
) func(info trace.TopicReaderSendCommitMessageStartInfo) func(trace.TopicReaderSendCommitMessageDoneInfo) {
229+
return func(
162230
info trace.TopicReaderSendCommitMessageStartInfo,
163231
) func(trace.TopicReaderSendCommitMessageDoneInfo) {
164232
if d.Details()&trace.TopicReaderStreamEvents == 0 {
@@ -201,7 +269,13 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocycl
201269
}
202270
}
203271
}
204-
t.OnReaderCommittedNotify = func(info trace.TopicReaderCommittedNotifyInfo) {
272+
}
273+
274+
func onReaderCommittedNotify(
275+
l Logger,
276+
d trace.Detailer,
277+
) func(info trace.TopicReaderCommittedNotifyInfo) {
278+
return func(info trace.TopicReaderCommittedNotifyInfo) {
205279
if d.Details()&trace.TopicReaderStreamEvents == 0 {
206280
return
207281
}
@@ -214,7 +288,13 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocycl
214288
Int64("committed_offset", info.CommittedOffset),
215289
)
216290
}
217-
t.OnReaderClose = func(info trace.TopicReaderCloseStartInfo) func(doneInfo trace.TopicReaderCloseDoneInfo) {
291+
}
292+
293+
func onReaderClose(
294+
l Logger,
295+
d trace.Detailer,
296+
) func(info trace.TopicReaderCloseStartInfo) func(doneInfo trace.TopicReaderCloseDoneInfo) {
297+
return func(info trace.TopicReaderCloseStartInfo) func(doneInfo trace.TopicReaderCloseDoneInfo) {
218298
if d.Details()&trace.TopicReaderStreamEvents == 0 {
219299
return nil
220300
}
@@ -242,8 +322,13 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocycl
242322
}
243323
}
244324
}
325+
}
245326

246-
t.OnReaderInit = func(info trace.TopicReaderInitStartInfo) func(doneInfo trace.TopicReaderInitDoneInfo) {
327+
func onReaderInit(
328+
l Logger,
329+
d trace.Detailer,
330+
) func(info trace.TopicReaderInitStartInfo) func(doneInfo trace.TopicReaderInitDoneInfo) {
331+
return func(info trace.TopicReaderInitStartInfo) func(doneInfo trace.TopicReaderInitDoneInfo) {
247332
if d.Details()&trace.TopicReaderStreamEvents == 0 {
248333
return nil
249334
}
@@ -274,7 +359,13 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocycl
274359
}
275360
}
276361
}
277-
t.OnReaderError = func(info trace.TopicReaderErrorInfo) {
362+
}
363+
364+
func onReaderError(
365+
l Logger,
366+
d trace.Detailer,
367+
) func(info trace.TopicReaderErrorInfo) {
368+
return func(info trace.TopicReaderErrorInfo) {
278369
if d.Details()&trace.TopicReaderStreamEvents == 0 {
279370
return
280371
}
@@ -285,7 +376,15 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocycl
285376
versionField(),
286377
)
287378
}
288-
t.OnReaderUpdateToken = func(
379+
}
380+
381+
func onReaderUpdateToken(
382+
l Logger,
383+
d trace.Detailer,
384+
) func(info trace.OnReadUpdateTokenStartInfo) func(
385+
updateTokenInfo trace.OnReadUpdateTokenMiddleTokenReceivedInfo) func(
386+
doneInfo trace.OnReadStreamUpdateTokenDoneInfo) {
387+
return func(
289388
info trace.OnReadUpdateTokenStartInfo,
290389
) func(
291390
updateTokenInfo trace.OnReadUpdateTokenMiddleTokenReceivedInfo,
@@ -337,7 +436,13 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocycl
337436
}
338437
}
339438
}
340-
t.OnReaderSentDataRequest = func(info trace.TopicReaderSentDataRequestInfo) {
439+
}
440+
441+
func onReaderSentDataRequest(
442+
l Logger,
443+
d trace.Detailer,
444+
) func(info trace.TopicReaderSentDataRequestInfo) {
445+
return func(info trace.TopicReaderSentDataRequestInfo) {
341446
if d.Details()&trace.TopicReaderMessageEvents == 0 {
342447
return
343448
}
@@ -348,7 +453,13 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocycl
348453
Int("local_capacity", info.LocalBufferSizeAfterSent),
349454
)
350455
}
351-
t.OnReaderReceiveDataResponse = func(
456+
}
457+
458+
func onReaderReceiveDataResponse(
459+
l Logger,
460+
d trace.Detailer,
461+
) func(info trace.TopicReaderReceiveDataResponseStartInfo) func(trace.TopicReaderReceiveDataResponseDoneInfo) {
462+
return func(
352463
info trace.TopicReaderReceiveDataResponseStartInfo,
353464
) func(trace.TopicReaderReceiveDataResponseDoneInfo) {
354465
if d.Details()&trace.TopicReaderMessageEvents == 0 {
@@ -392,7 +503,13 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocycl
392503
}
393504
}
394505
}
395-
t.OnReaderReadMessages = func(
506+
}
507+
508+
func onReaderReadMessages(
509+
l Logger,
510+
d trace.Detailer,
511+
) func(info trace.TopicReaderReadMessagesStartInfo) func(doneInfo trace.TopicReaderReadMessagesDoneInfo) {
512+
return func(
396513
info trace.TopicReaderReadMessagesStartInfo,
397514
) func(doneInfo trace.TopicReaderReadMessagesDoneInfo) {
398515
if d.Details()&trace.TopicReaderMessageEvents == 0 {
@@ -426,7 +543,13 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocycl
426543
}
427544
}
428545
}
429-
t.OnReaderUnknownGrpcMessage = func(info trace.OnReadUnknownGrpcMessageInfo) {
546+
}
547+
548+
func onReaderUnknownGrpcMessage(
549+
l Logger,
550+
d trace.Detailer,
551+
) func(info trace.OnReadUnknownGrpcMessageInfo) {
552+
return func(info trace.OnReadUnknownGrpcMessageInfo) {
430553
if d.Details()&trace.TopicReaderMessageEvents == 0 {
431554
return
432555
}
@@ -436,11 +559,13 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocycl
436559
String("reader_connection_id", info.ReaderConnectionID),
437560
)
438561
}
562+
}
439563

440-
///
441-
/// Topic writer
442-
///
443-
t.OnWriterReconnect = func(
564+
func onWriterReconnect(
565+
l Logger,
566+
d trace.Detailer,
567+
) func(info trace.TopicWriterReconnectStartInfo) func(doneInfo trace.TopicWriterReconnectDoneInfo) {
568+
return func(
444569
info trace.TopicWriterReconnectStartInfo,
445570
) func(doneInfo trace.TopicWriterReconnectDoneInfo) {
446571
if d.Details()&trace.TopicWriterStreamLifeCycleEvents == 0 {
@@ -476,7 +601,13 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocycl
476601
}
477602
}
478603
}
479-
t.OnWriterInitStream = func(
604+
}
605+
606+
func onWriterInitStream(
607+
l Logger,
608+
d trace.Detailer,
609+
) func(info trace.TopicWriterInitStreamStartInfo) func(doneInfo trace.TopicWriterInitStreamDoneInfo) {
610+
return func(
480611
info trace.TopicWriterInitStreamStartInfo,
481612
) func(doneInfo trace.TopicWriterInitStreamDoneInfo) {
482613
if d.Details()&trace.TopicWriterStreamLifeCycleEvents == 0 {
@@ -512,7 +643,13 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocycl
512643
}
513644
}
514645
}
515-
t.OnWriterClose = func(info trace.TopicWriterCloseStartInfo) func(doneInfo trace.TopicWriterCloseDoneInfo) {
646+
}
647+
648+
func onWriterClose(
649+
l Logger,
650+
d trace.Detailer,
651+
) func(info trace.TopicWriterCloseStartInfo) func(doneInfo trace.TopicWriterCloseDoneInfo) {
652+
return func(info trace.TopicWriterCloseStartInfo) func(doneInfo trace.TopicWriterCloseDoneInfo) {
516653
if d.Details()&trace.TopicWriterStreamLifeCycleEvents == 0 {
517654
return nil
518655
}
@@ -541,7 +678,13 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocycl
541678
}
542679
}
543680
}
544-
t.OnWriterCompressMessages = func(
681+
}
682+
683+
func onWriterCompressMessages(
684+
l Logger,
685+
d trace.Detailer,
686+
) func(info trace.TopicWriterCompressMessagesStartInfo) func(doneInfo trace.TopicWriterCompressMessagesDoneInfo) {
687+
return func(
545688
info trace.TopicWriterCompressMessagesStartInfo,
546689
) func(doneInfo trace.TopicWriterCompressMessagesDoneInfo) {
547690
if d.Details()&trace.TopicWriterStreamEvents == 0 {
@@ -584,7 +727,13 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocycl
584727
}
585728
}
586729
}
587-
t.OnWriterSendMessages = func(
730+
}
731+
732+
func onWriterSendMessages(
733+
l Logger,
734+
d trace.Detailer,
735+
) func(info trace.TopicWriterSendMessagesStartInfo) func(doneInfo trace.TopicWriterSendMessagesDoneInfo) {
736+
return func(
588737
info trace.TopicWriterSendMessagesStartInfo,
589738
) func(doneInfo trace.TopicWriterSendMessagesDoneInfo) {
590739
if d.Details()&trace.TopicWriterStreamEvents == 0 {
@@ -623,7 +772,13 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocycl
623772
}
624773
}
625774
}
626-
t.OnWriterReadUnknownGrpcMessage = func(info trace.TopicOnWriterReadUnknownGrpcMessageInfo) {
775+
}
776+
777+
func onWriterReadUnknownGrpcMessage(
778+
l Logger,
779+
d trace.Detailer,
780+
) func(info trace.TopicOnWriterReadUnknownGrpcMessageInfo) {
781+
return func(info trace.TopicOnWriterReadUnknownGrpcMessageInfo) {
627782
if d.Details()&trace.TopicWriterStreamEvents == 0 {
628783
return
629784
}
@@ -634,6 +789,4 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { //nolint:gocycl
634789
String("session_id", info.SessionID),
635790
)
636791
}
637-
638-
return t
639792
}

0 commit comments

Comments
 (0)