Skip to content

Commit 7612a09

Browse files
cosmo0920edsiper
authored andcommitted
in_tail: Implement long ling truncation
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
1 parent aea80e6 commit 7612a09

File tree

4 files changed

+143
-9
lines changed

4 files changed

+143
-9
lines changed

plugins/in_tail/tail.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,12 @@ static struct flb_config_map config_map[] = {
719719
0, FLB_TRUE, offsetof(struct flb_tail_config, skip_empty_lines),
720720
"Allows to skip empty lines."
721721
},
722+
723+
{
724+
FLB_CONFIG_MAP_BOOL, "truncate_long_lines", "false",
725+
0, FLB_TRUE, offsetof(struct flb_tail_config, truncate_long_lines),
726+
"Truncate overlong lines after input encoding to UTF-8"
727+
},
722728
#ifdef __linux__
723729
{
724730
FLB_CONFIG_MAP_BOOL, "file_cache_advise", "true",

plugins/in_tail/tail_config.c

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
170170
if (sec == 0 && nsec == 0) {
171171
flb_plg_error(ctx->ins, "invalid 'refresh_interval' config "
172172
"value (%s)", tmp);
173-
flb_free(ctx);
173+
flb_tail_config_destroy(ctx);
174174
return NULL;
175175
}
176176

@@ -192,7 +192,7 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
192192
/* Config: seconds interval to monitor file after rotation */
193193
if (ctx->rotate_wait <= 0) {
194194
flb_plg_error(ctx->ins, "invalid 'rotate_wait' config value");
195-
flb_free(ctx);
195+
flb_tail_config_destroy(ctx);
196196
return NULL;
197197
}
198198

@@ -215,7 +215,7 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
215215
}
216216
else {
217217
flb_plg_error(ctx->ins, "invalid encoding 'unicode.encoding' value");
218-
flb_free(ctx);
218+
flb_tail_config_destroy(ctx);
219219
return NULL;
220220
}
221221
}
@@ -230,11 +230,20 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
230230
}
231231
else {
232232
flb_plg_error(ctx->ins, "invalid encoding 'generic.encoding' value %s", tmp);
233-
flb_free(ctx);
233+
flb_tail_config_destroy(ctx);
234234
return NULL;
235235
}
236236
}
237237

238+
#ifdef FLB_HAVE_UNICODE_ENCODER
239+
if (ctx->preferred_input_encoding != FLB_UNICODE_ENCODING_UNSPECIFIED &&
240+
ctx->generic_input_encoding_type != FLB_GENERIC_UNSPECIFIED) {
241+
flb_plg_error(ctx->ins,
242+
"'unicode.encoding' and 'generic.encoding' cannot be specified at the same time");
243+
flb_tail_config_destroy(ctx);
244+
return NULL;
245+
}
246+
#endif
238247
#ifdef FLB_HAVE_PARSER
239248
/* Config: multi-line support */
240249
if (ctx->multiline == FLB_TRUE) {
@@ -258,7 +267,7 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
258267
/* Validate buffer limit */
259268
if (ctx->buf_chunk_size > ctx->buf_max_size) {
260269
flb_plg_error(ctx->ins, "buffer_max_size must be >= buffer_chunk");
261-
flb_free(ctx);
270+
flb_tail_config_destroy(ctx);
262271
return NULL;
263272
}
264273

@@ -485,6 +494,13 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
485494
"multiline_truncated_total",
486495
"Total number of truncated occurences for multilines",
487496
1, (char *[]) {"name"});
497+
ctx->cmt_long_line_truncated = \
498+
cmt_counter_create(ins->cmt,
499+
"fluentbit", "input",
500+
"long_line_truncated_total",
501+
"Total number of truncated occurences for long lines",
502+
1, (char *[]) {"name"});
503+
488504
/* OLD metrics */
489505
flb_metrics_add(FLB_TAIL_METRIC_F_OPENED,
490506
"files_opened", ctx->ins->metrics);
@@ -494,6 +510,8 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
494510
"files_rotated", ctx->ins->metrics);
495511
flb_metrics_add(FLB_TAIL_METRIC_M_TRUNCATED,
496512
"multiline_truncated", ctx->ins->metrics);
513+
flb_metrics_add(FLB_TAIL_METRIC_L_TRUNCATED,
514+
"long_line_truncated", ctx->ins->metrics);
497515
#endif
498516

499517
return ctx;

plugins/in_tail/tail_config.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
#define FLB_TAIL_METRIC_F_CLOSED 101 /* number of closed files */
4343
#define FLB_TAIL_METRIC_F_ROTATED 102 /* number of rotated files */
4444
#define FLB_TAIL_METRIC_M_TRUNCATED 103 /* number of truncated occurrences of multiline */
45+
#define FLB_TAIL_METRIC_L_TRUNCATED 104 /* number of truncated occurrences of long lines */
4546
#endif
4647

4748
struct flb_tail_config {
@@ -54,6 +55,7 @@ struct flb_tail_config {
5455
/* Buffer Config */
5556
size_t buf_chunk_size; /* allocation chunks */
5657
size_t buf_max_size; /* max size of a buffer */
58+
int truncate_long_lines; /* truncate long lines after re-encode */
5759

5860
/* Static files processor */
5961
size_t static_batch_size;
@@ -169,6 +171,7 @@ struct flb_tail_config {
169171
struct cmt_counter *cmt_files_closed;
170172
struct cmt_counter *cmt_files_rotated;
171173
struct cmt_counter *cmt_multiline_truncated;
174+
struct cmt_counter *cmt_long_line_truncated;
172175

173176
/* Hash: hash tables for quick acess to registered files */
174177
struct flb_hash_table *static_hash;

plugins/in_tail/tail_file.c

Lines changed: 111 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,24 @@ static FLB_INLINE const char *flb_skip_leading_zeros_simd(const char *data, cons
457457
return data;
458458
}
459459

460+
/* Return a UTF-8 safe cut position <= max */
461+
static size_t utf8_safe_truncate_pos(const char *s, size_t len, size_t max)
462+
{
463+
size_t cut = 0;
464+
465+
cut = (len <= max) ? len : max;
466+
if (cut == len) {
467+
return cut;
468+
}
469+
470+
/* backtrack over continuation bytes 10xxxxxx */
471+
while (cut > 0 && ((unsigned char)s[cut] & 0xC0) == 0x80) {
472+
cut--;
473+
}
474+
475+
return cut;
476+
}
477+
460478
static int process_content(struct flb_tail_file *file, size_t *bytes)
461479
{
462480
size_t len;
@@ -481,6 +499,13 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
481499
#ifdef FLB_HAVE_UNICODE_ENCODER
482500
size_t decoded_len;
483501
#endif
502+
size_t cut = 0;
503+
size_t eff_max = 0;
504+
size_t dec_len = 0;
505+
size_t window = 0;
506+
int truncation_happened = FLB_FALSE;
507+
size_t bytes_override = 0;
508+
void *nl = NULL;
484509
#ifdef FLB_HAVE_METRICS
485510
uint64_t ts;
486511
char *name;
@@ -530,7 +555,8 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
530555
end - data);
531556
if (ret > 0) {
532557
data = decoded;
533-
end = data + strlen(decoded);
558+
/* Generic encoding conversion returns decoded length precisely with ret. */
559+
end = data + (size_t) ret;
534560
}
535561
else {
536562
flb_plg_error(ctx->ins, "encoding failed '%.*s' with status %d", end - data, data, ret);
@@ -542,6 +568,58 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
542568
data = (char *)flb_skip_leading_zeros_simd(data, end, &processed_bytes);
543569
}
544570

571+
if (ctx->truncate_long_lines == FLB_TRUE &&
572+
file->buf_size >= ctx->buf_max_size) {
573+
/* Use buf_max_size as the truncation threshold */
574+
if (ctx->buf_max_size > 0) {
575+
eff_max = ctx->buf_max_size - 1;
576+
}
577+
else {
578+
eff_max = 0;
579+
}
580+
dec_len = (size_t)(end - data);
581+
window = ctx->buf_max_size + 1;
582+
if (window > dec_len) {
583+
window = dec_len;
584+
}
585+
586+
nl = memchr(data, '\n', window);
587+
if (nl == NULL && eff_max > 0 && dec_len >= eff_max) {
588+
if (file->skip_next == FLB_TRUE) {
589+
bytes_override = (original_len > 0) ? original_len : file->buf_len;
590+
goto truncation_end;
591+
}
592+
cut = utf8_safe_truncate_pos(data, dec_len, eff_max);
593+
594+
if (cut > 0) {
595+
if (ctx->multiline == FLB_TRUE) {
596+
flb_tail_mult_flush(file, ctx);
597+
}
598+
599+
flb_tail_file_pack_line(NULL, data, cut, file, processed_bytes);
600+
file->skip_next = FLB_TRUE;
601+
602+
#ifdef FLB_HAVE_METRICS
603+
cmt_counter_inc(ctx->cmt_long_line_truncated,
604+
cfl_time_now(), 1,
605+
(char*[]){ (char*) flb_input_name(ctx->ins) });
606+
/* Old api */
607+
flb_metrics_sum(FLB_TAIL_METRIC_L_TRUNCATED, 1, ctx->ins->metrics);
608+
#endif
609+
if (original_len > 0) {
610+
bytes_override = original_len;
611+
}
612+
else {
613+
bytes_override = file->buf_len;
614+
}
615+
truncation_happened = FLB_TRUE;
616+
617+
lines++;
618+
goto truncation_end;
619+
}
620+
}
621+
}
622+
545623
while (data < end && (p = memchr(data, '\n', end - data))) {
546624
len = (p - data);
547625
crlf = 0;
@@ -700,6 +778,7 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
700778
file->last_processed_bytes = processed_bytes;
701779
}
702780

781+
truncation_end:
703782
if (decoded) {
704783
flb_free(decoded);
705784
decoded = NULL;
@@ -709,9 +788,13 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
709788

710789
if (lines > 0) {
711790
/* Append buffer content to a chunk */
712-
if (original_len > 0) {
791+
if (truncation_happened) {
792+
*bytes = bytes_override;
793+
}
794+
else if (original_len > 0) {
713795
*bytes = original_len;
714-
} else {
796+
}
797+
else {
715798
*bytes = processed_bytes;
716799
}
717800

@@ -1506,12 +1589,13 @@ int flb_tail_file_chunk(struct flb_tail_file *file)
15061589
size_t file_buffer_capacity;
15071590
size_t stream_data_length;
15081591
ssize_t raw_data_length;
1509-
size_t processed_bytes;
1592+
size_t processed_bytes = 0;
15101593
uint8_t *read_buffer;
15111594
size_t read_size;
15121595
size_t size;
15131596
char *tmp;
15141597
int ret;
1598+
int lines;
15151599
struct flb_tail_config *ctx;
15161600

15171601
/* Check if we the engine issued a pause */
@@ -1529,6 +1613,29 @@ int flb_tail_file_chunk(struct flb_tail_file *file)
15291613
* If there is no more room for more data, try to increase the
15301614
* buffer under the limit of buffer_max_size.
15311615
*/
1616+
if (ctx->truncate_long_lines == FLB_TRUE) {
1617+
lines = process_content(file, &processed_bytes);
1618+
if (lines < 0) {
1619+
flb_plg_debug(ctx->ins, "inode=%"PRIu64" file=%s process content ERROR",
1620+
file->inode, file->name);
1621+
return FLB_TAIL_ERROR;
1622+
}
1623+
1624+
if (lines > 0) {
1625+
file->stream_offset += processed_bytes;
1626+
file->last_processed_bytes = 0;
1627+
consume_bytes(file->buf_data, processed_bytes, file->buf_len);
1628+
file->buf_len -= processed_bytes;
1629+
file->buf_data[file->buf_len] = '\0';
1630+
1631+
#ifdef FLB_HAVE_SQLDB
1632+
if (file->config->db) {
1633+
flb_tail_db_file_offset(file, file->config);
1634+
}
1635+
#endif
1636+
return adjust_counters(ctx, file);
1637+
}
1638+
}
15321639
if (file->buf_size >= ctx->buf_max_size) {
15331640
if (ctx->skip_long_lines == FLB_FALSE) {
15341641
flb_plg_error(ctx->ins, "file=%s requires a larger buffer size, "

0 commit comments

Comments
 (0)