66 * LICENSE file in the root directory of this source tree.
77 */
88
9- use std:: mem:: swap;
109use std:: mem:: take;
1110use std:: ops:: DerefMut ;
1211use std:: sync:: Arc ;
@@ -19,6 +18,9 @@ use tokio::io::AsyncWrite;
1918use tokio:: io:: AsyncWriteExt ;
2019use tokio:: io:: BufReader ;
2120
21+ /// Maximum byte size of a single log line before truncation
22+ const MAX_BYTE_SIZE_LOG_LINE : usize = 256 * 1024 ;
23+
2224/// A tailer (ring buffer) of (text) log lines.
2325pub struct LogTailer {
2426 state : Arc < Mutex < State > > ,
@@ -32,6 +34,25 @@ struct State {
3234}
3335
3436impl LogTailer {
37+ /// Helper method to push a line to the ring buffer
38+ fn push_line_to_buffer ( state : & Arc < Mutex < State > > , byte_buffer : & mut [ u8 ] , max : usize ) {
39+ // use lossy string rather than truncated valid utf8
40+ // from_utf8_lossy(b"Hello\xFF\xFEWorld") returns "Hello��World"
41+ let mut buffer: String = String :: from_utf8_lossy ( byte_buffer) . to_string ( ) ;
42+ // Remove trailing newline if present
43+ while buffer. ends_with ( '\n' ) {
44+ buffer. pop ( ) ;
45+ }
46+ let mut locked = state. lock ( ) . unwrap ( ) ;
47+ let next = locked. next ;
48+ if next < locked. lines . len ( ) {
49+ locked. lines [ next] = buffer;
50+ } else {
51+ locked. lines . push ( buffer. clone ( ) ) ;
52+ }
53+ locked. next = ( next + 1 ) % max;
54+ }
55+
3556 /// Create a new tailer given a `stream`. The tailer tails the reader in the
3657 /// background, while keeping at most `max` log lines in its buffer. The tailer
3758 /// stops when the stream is ended (i.e., returns an EOF).
@@ -58,25 +79,66 @@ impl LogTailer {
5879 // and make this awaitable, etc
5980 let handle = tokio:: spawn ( async move {
6081 let mut reader = BufReader :: new ( stream) ;
61- let mut buffer = String :: new ( ) ;
82+ let mut skip_until_newline = false ;
83+ let mut byte_buffer: Vec < u8 > = Vec :: new ( ) ;
6284 loop {
63- buffer. clear ( ) ; // clear retains the buffer
64- // TODO: we should probably limit line length
65- if reader. read_line ( & mut buffer) . await ? == 0 {
85+ // this gives at most a reference to 8KB of data in the internal buffer
86+ // based on internal implementation of BufReader's `DEFAULT_BUF_SIZE`
87+ let reader_buf = reader. fill_buf ( ) . await ?;
88+
89+ if reader_buf. is_empty ( ) {
90+ // EOF reached, write any remaining buffer content as a line
91+ if !byte_buffer. is_empty ( ) {
92+ Self :: push_line_to_buffer ( & state, & mut byte_buffer, max) ;
93+ }
6694 break Ok ( ( ) ) ;
6795 }
68- let _ = tee. write_all ( buffer. as_bytes ( ) ) . await ;
69- while buffer. ends_with ( '\n' ) {
70- buffer. pop ( ) ;
96+
97+ // find newline pos or the end of buffer if no newline found
98+ let new_line_pos = reader_buf
99+ . iter ( )
100+ . position ( |& b| b == b'\n' )
101+ . unwrap_or ( reader_buf. len ( ) ) ;
102+
103+ if skip_until_newline {
104+ // funnel through the tee stream
105+ let mut to_consume = reader_buf. len ( ) ;
106+ if new_line_pos != reader_buf. len ( ) {
107+ to_consume = new_line_pos + 1 ;
108+ skip_until_newline = false ;
109+ }
110+ tee. write_all ( & reader_buf[ ..to_consume] ) . await ?;
111+ reader. consume ( to_consume) ;
112+ continue ;
71113 }
72- let mut locked = state. lock ( ) . unwrap ( ) ;
73- let next = locked. next ;
74- if next < locked. lines . len ( ) {
75- swap ( & mut locked. lines [ next] , & mut buffer) ;
114+
115+ let to_be_consumed = if new_line_pos != reader_buf. len ( ) {
116+ new_line_pos + 1
76117 } else {
77- locked. lines . push ( buffer. clone ( ) ) ;
118+ reader_buf. len ( )
119+ } ;
120+
121+ byte_buffer. extend ( & reader_buf[ ..to_be_consumed] ) ;
122+ tee. write_all ( & reader_buf[ ..to_be_consumed] ) . await ?;
123+ if byte_buffer. len ( ) >= MAX_BYTE_SIZE_LOG_LINE || new_line_pos != reader_buf. len ( ) {
124+ skip_until_newline = byte_buffer. len ( ) >= MAX_BYTE_SIZE_LOG_LINE
125+ && new_line_pos == reader_buf. len ( ) ;
126+ // Truncate to MAX_BYTE_SIZE_LOG_LINE if necessary before pushing
127+ if byte_buffer. len ( ) > MAX_BYTE_SIZE_LOG_LINE {
128+ byte_buffer. truncate ( MAX_BYTE_SIZE_LOG_LINE ) ;
129+ }
130+
131+ // we are pushing a line that doesnt have a newline
132+ if byte_buffer. len ( ) == MAX_BYTE_SIZE_LOG_LINE
133+ && new_line_pos == reader_buf. len ( )
134+ {
135+ byte_buffer. extend_from_slice ( "<TRUNCATED>" . as_bytes ( ) ) ;
136+ }
137+ Self :: push_line_to_buffer ( & state, & mut byte_buffer, max) ;
138+ byte_buffer. clear ( ) ;
78139 }
79- locked. next = ( next + 1 ) % max;
140+
141+ reader. consume ( to_be_consumed) ;
80142 }
81143 } ) ;
82144
@@ -92,7 +154,6 @@ impl LogTailer {
92154 lines. rotate_left ( next) ;
93155 lines
94156 }
95-
96157 /// Abort the tailer. This will stop any ongoing reads, and drop the
97158 /// stream. Abort is complete after `join` returns.
98159 pub fn abort ( & self ) {
@@ -143,6 +204,83 @@ mod tests {
143204 assert_eq ! ( lines. next_line( ) . await . unwrap( ) . unwrap( ) , "world" ) ;
144205 }
145206
207+ #[ tokio:: test]
208+ async fn test_read_buffer_boundary ( ) {
209+ let mut input_bytes = Vec :: new ( ) ;
210+ // reader buffer's default size is 8KB. We assert that the tee function reads
211+ // correctly when the lines are exactly 8KB and 8KB + 1 bytes
212+ input_bytes. extend ( vec ! [ b'a' ; 8191 ] ) ;
213+ input_bytes. extend ( [ b'\n' ] ) ;
214+ input_bytes. extend ( vec ! [ b'b' ; 8192 ] ) ;
215+ let reader = Cursor :: new ( input_bytes) ;
216+
217+ let ( lines, result) = LogTailer :: new ( 5 , reader) . join ( ) . await ;
218+ assert ! ( result. is_ok( ) ) ;
219+
220+ // Should have 3 lines
221+ assert_eq ! ( lines. len( ) , 2 ) ;
222+
223+ assert_eq ! ( lines[ 0 ] , format!( "{}" , "a" . repeat( 8191 ) ) ) ;
224+
225+ assert_eq ! ( lines[ 1 ] , format!( "{}" , "b" . repeat( 8192 ) ) ) ;
226+ }
227+
228+ #[ tokio:: test]
229+ async fn test_line_truncation ( ) {
230+ // Create input with 3 MAX_BYTE_SIZE_LOG_LINE-byte lines
231+ let mut input_bytes = Vec :: new ( ) ;
232+ // first line is exactly `MAX_BYTE_SIZE_LOG_LINE` bytes including `\n`
233+ input_bytes. extend ( vec ! [ b'a' ; MAX_BYTE_SIZE_LOG_LINE - 1 ] ) ;
234+ input_bytes. extend ( [ b'\n' ] ) ;
235+
236+ // second line is more than `MAX_BYTE_SIZE_LOG_LINE` bytes including `\n`
237+ input_bytes. extend ( vec ! [ b'b' ; MAX_BYTE_SIZE_LOG_LINE ] ) ;
238+ input_bytes. extend ( [ b'\n' ] ) ;
239+
240+ // last line of the input stream is < `MAX_BYTE_SIZE_LOG_LINE` bytes to ensure complete flush
241+ input_bytes. extend ( vec ! [ b'c' ; MAX_BYTE_SIZE_LOG_LINE - 1 ] ) ;
242+
243+ let reader = Cursor :: new ( input_bytes) ;
244+
245+ let ( lines, result) = LogTailer :: new ( 5 , reader) . join ( ) . await ;
246+ assert ! ( result. is_ok( ) ) ;
247+
248+ // Should have 3 lines
249+ assert_eq ! ( lines. len( ) , 3 ) ;
250+
251+ // First line should be MAX_BYTE_SIZE_LOG_LINE-1 'a's
252+ assert_eq ! (
253+ lines[ 0 ] ,
254+ format!( "{}" , "a" . repeat( MAX_BYTE_SIZE_LOG_LINE - 1 ) )
255+ ) ;
256+
257+ // Second line should be `MAX_BYTE_SIZE_LOG_LINE` 'b's + "<TRUNCATED>"
258+ assert_eq ! (
259+ lines[ 1 ] ,
260+ format!( "{}<TRUNCATED>" , "b" . repeat( MAX_BYTE_SIZE_LOG_LINE ) )
261+ ) ;
262+
263+ // last line before stream closes should be MAX_BYTE_SIZE_LOG_LINE-1 c's
264+ assert_eq ! ( lines[ 2 ] , "c" . repeat( MAX_BYTE_SIZE_LOG_LINE - 1 ) ) ;
265+ }
266+
267+ #[ tokio:: test]
268+ async fn test_ring_buffer_behavior ( ) {
269+ let input = "line1\n line2\n line3\n line4\n line5\n line6\n line7\n " ;
270+ let reader = Cursor :: new ( input. as_bytes ( ) ) ;
271+ let max_lines = 3 ; // Small ring buffer for easy testing
272+
273+ let ( lines, result) = LogTailer :: new ( max_lines, reader) . join ( ) . await ;
274+ assert ! ( result. is_ok( ) ) ;
275+
276+ // Should only have the last 3 lines (ring buffer behavior)
277+ // Lines 1-4 should be overwritten (lost due to ring buffer)
278+ assert_eq ! ( lines. len( ) , 3 ) ;
279+ assert_eq ! ( lines[ 0 ] , "line5" ) ; // oldest in current buffer
280+ assert_eq ! ( lines[ 1 ] , "line6" ) ; // middle
281+ assert_eq ! ( lines[ 2 ] , "line7" ) ; // newest
282+ }
283+
146284 #[ tokio:: test]
147285 async fn test_streaming_logtailer ( ) {
148286 let ( reader, mut writer) = tokio:: io:: simplex ( 1 ) ;
@@ -184,4 +322,56 @@ mod tests {
184322 tailer. abort ( ) ;
185323 tailer. join ( ) . await . 1 . unwrap_err ( ) ;
186324 }
325+
326+ #[ tokio:: test]
327+ async fn test_multibyte_character_on_internal_buffer_boundary ( ) {
328+ // Test: Multi-byte characters split across internal buffer boundaries
329+ let mut input_bytes = Vec :: new ( ) ;
330+ input_bytes. extend ( vec ! [ b'a' ; 8191 ] ) ;
331+ let euro_bytes = "€" . as_bytes ( ) ; // [0xE2, 0x82, 0xAC]
332+ // add 3 bytes of the euro sign, but across internal buffer
333+ // 1st byte will be part of the first buffer call but remaining will spillover
334+ // to the next buffer call
335+ input_bytes. extend ( euro_bytes) ;
336+ input_bytes. push ( b'\n' ) ;
337+ input_bytes. extend ( vec ! [ b'b' ; 8192 ] ) ;
338+ let reader = Cursor :: new ( input_bytes) ;
339+ let ( lines, result) = LogTailer :: new ( 5 , reader) . join ( ) . await ;
340+
341+ assert ! ( result. is_ok( ) ) ;
342+ assert_eq ! ( lines. len( ) , 2 ) ;
343+ assert_eq ! ( lines[ 0 ] , format!( "{}€" , "a" . repeat( 8191 ) ) ) ;
344+ assert_eq ! ( lines[ 1 ] , format!( "{}" , "b" . repeat( 8192 ) ) ) ;
345+ }
346+
347+ #[ tokio:: test]
348+ async fn test_truncation_with_utf8_errors ( ) {
349+ // Test: UTF-8 errors interacting with line length limits
350+ let mut input_bytes = Vec :: new ( ) ;
351+
352+ // Fill near max capacity, then add invalid bytes
353+ input_bytes. extend ( vec ! [ b'a' ; MAX_BYTE_SIZE_LOG_LINE - 1 ] ) ;
354+ input_bytes. push ( 0xFF ) ; // Invalid byte at the boundary of the limit
355+ input_bytes. extend ( vec ! [ b'b' ; 100 ] ) ; // Exceed limit, so skipped
356+ input_bytes. push ( b'\n' ) ;
357+ input_bytes. extend ( vec ! [ b'c' ; 100 ] ) ; // new string after newline
358+ input_bytes. push ( b'\n' ) ;
359+ input_bytes. push ( 0xFF ) ; // Invalid byte at the start, expect <INVALID_UTF8>
360+
361+ let reader = Cursor :: new ( input_bytes) ;
362+ let ( lines, result) = LogTailer :: new ( 5 , reader) . join ( ) . await ;
363+
364+ assert ! ( result. is_ok( ) ) ;
365+ assert_eq ! ( lines. len( ) , 3 ) ;
366+ assert_eq ! (
367+ lines[ 0 ] ,
368+ format!(
369+ "{}{}" ,
370+ "a" . repeat( MAX_BYTE_SIZE_LOG_LINE - 1 ) ,
371+ "�<TRUNCATED>"
372+ )
373+ ) ;
374+ assert_eq ! ( lines[ 1 ] , format!( "{}" , "c" . repeat( 100 ) ) ) ;
375+ assert_eq ! ( lines[ 2 ] , "�" ) ;
376+ }
187377}
0 commit comments