55 "fmt"
66 "io"
77 "net"
8+ "time"
89
910 "github.com/rs/zerolog"
1011
@@ -73,7 +74,10 @@ func (link *ToxicLink) Start(
7374 dest io.WriteCloser ,
7475) {
7576 logger := link .Logger
76- logger .Debug ().Msg ("Setup connection" )
77+ logger .
78+ Debug ().
79+ Str ("direction" , link .Direction ()).
80+ Msg ("Setup connection" )
7781
7882 labels := []string {
7983 link .Direction (),
@@ -133,23 +137,33 @@ func (link *ToxicLink) read(
133137func (link * ToxicLink ) write (
134138 metricLabels []string ,
135139 name string ,
136- server * ApiServer ,
140+ server * ApiServer , // TODO: Replace with AppConfig for Metrics and Logger
137141 dest io.WriteCloser ,
138142) {
139- logger := link .Logger
143+ logger := link .Logger .
144+ With ().
145+ Str ("component" , "ToxicLink" ).
146+ Str ("method" , "write" ).
147+ Str ("link" , name ).
148+ Str ("proxy" , link .proxy .Name ).
149+ Str ("link_addr" , fmt .Sprintf ("%p" , link )).
150+ Logger ()
151+
140152 bytes , err := io .Copy (dest , link .output )
141153 if err != nil {
142154 logger .Warn ().
143155 Int64 ("bytes" , bytes ).
144156 Err (err ).
145- Msg ("Destination terminated" )
146- }
147- if server .Metrics .proxyMetricsEnabled () {
157+ Msg ("Could not write to destination" )
158+ } else if server .Metrics .proxyMetricsEnabled () {
148159 server .Metrics .ProxyMetrics .SentBytesTotal .
149160 WithLabelValues (metricLabels ... ).Add (float64 (bytes ))
150161 }
162+
151163 dest .Close ()
164+ logger .Trace ().Msgf ("Remove link %s from ToxicCollection" , name )
152165 link .toxics .RemoveLink (name )
166+ logger .Trace ().Msgf ("RemoveConnection %s from Proxy %s" , name , link .proxy .Name )
153167 link .proxy .RemoveConnection (name )
154168}
155169
@@ -211,11 +225,11 @@ func (link *ToxicLink) RemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapp
211225 }
212226 }
213227
214- log .Trace ().Msg ("Interrupt the previous toxic to update its output" )
228+ log .Trace ().Msg ("Interrupting the previous toxic to update its output" )
215229 stop := make (chan bool )
216- go func () {
217- stop <- link . stubs [ toxic_index - 1 ] .InterruptToxic ()
218- }()
230+ go func (stub * toxics. ToxicStub , stop chan bool ) {
231+ stop <- stub .InterruptToxic ()
232+ }(link . stubs [ toxic_index - 1 ], stop )
219233
220234 // Unblock the previous toxic if it is trying to flush
221235 // If the previous toxic is closed, continue flusing until we reach the end.
@@ -231,9 +245,14 @@ func (link *ToxicLink) RemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapp
231245 if ! stopped {
232246 <- stop
233247 }
234- return
248+ return // TODO: There are some steps after this to clean buffer
249+ }
250+
251+ err := link .stubs [toxic_index ].WriteOutput (tmp , 5 * time .Second )
252+ if err != nil {
253+ log .Err (err ).
254+ Msg ("Could not write last packets after interrupt to Output" )
235255 }
236- link .stubs [toxic_index ].Output <- tmp
237256 }
238257 }
239258
@@ -244,7 +263,11 @@ func (link *ToxicLink) RemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapp
244263 link .stubs [toxic_index ].Close ()
245264 return
246265 }
247- link .stubs [toxic_index ].Output <- tmp
266+ err := link .stubs [toxic_index ].WriteOutput (tmp , 5 * time .Second )
267+ if err != nil {
268+ log .Err (err ).
269+ Msg ("Could not write last packets after interrupt to Output" )
270+ }
248271 }
249272
250273 link .stubs [toxic_index - 1 ].Output = link .stubs [toxic_index ].Output
0 commit comments