From 9e90bf34ac822a70195b3b98015005211a27b2c7 Mon Sep 17 00:00:00 2001 From: Leonardo Albertovich Date: Sun, 9 Nov 2025 15:02:34 +0100 Subject: [PATCH] in_forward: fix connection release on pause memory corruption This change fixes a use after free issue related to connection disposal which caused the event handler to access invalid memory when the memory limits were exceeded during ingestion. In order to overcome this issue we track the plugin instances state and delay the connection cleanup process. Signed-off-by: Leonardo Albertovich --- plugins/in_forward/fw.c | 27 +++++++++++++++++++++++++- plugins/in_forward/fw.h | 8 ++++++++ plugins/in_forward/fw_conn.c | 37 +++++++++++++++++++++++++++++++----- 3 files changed, 66 insertions(+), 6 deletions(-) diff --git a/plugins/in_forward/fw.c b/plugins/in_forward/fw.c index 77da340d81e..d33b36fbadc 100644 --- a/plugins/in_forward/fw.c +++ b/plugins/in_forward/fw.c @@ -124,28 +124,37 @@ static int fw_unix_create(struct flb_in_fw_config *ctx) static int in_fw_collect(struct flb_input_instance *ins, struct flb_config *config, void *in_context) { + int state_backup; struct flb_connection *connection; struct fw_conn *conn; struct flb_in_fw_config *ctx; ctx = in_context; + state_backup = ctx->state; + ctx->state = FW_INSTANCE_STATE_ACCEPTING_CLIENT; + connection = flb_downstream_conn_get(ctx->downstream); if (connection == NULL) { flb_plg_error(ctx->ins, "could not accept new connection"); + ctx->state = state_backup; return -1; } if (!config->is_ingestion_active) { flb_downstream_conn_release(connection); + ctx->state = state_backup; + return -1; } if(ctx->is_paused) { flb_downstream_conn_release(connection); flb_plg_trace(ins, "TCP connection will be closed FD=%i", connection->fd); + ctx->state = state_backup; + return -1; } @@ -154,9 +163,17 @@ static int in_fw_collect(struct flb_input_instance *ins, conn = fw_conn_add(connection, ctx); if (!conn) { flb_downstream_conn_release(connection); + ctx->state = state_backup; + return -1; } + ctx->state = state_backup; + + if (ctx->state == FW_INSTANCE_STATE_PAUSED) { + fw_conn_del_all(ctx); + } + return 0; } @@ -263,6 +280,7 @@ static int in_fw_init(struct flb_input_instance *ins, return -1; } + ctx->state = FW_INSTANCE_STATE_RUNNING; ctx->coll_fd = -1; ctx->ins = ins; mk_list_init(&ctx->connections); @@ -386,7 +404,10 @@ static void in_fw_pause(void *data, struct flb_config *config) return; } - fw_conn_del_all(ctx); + if (ctx->state == FW_INSTANCE_STATE_RUNNING) { + fw_conn_del_all(ctx); + } + ctx->is_paused = FLB_TRUE; ret = pthread_mutex_unlock(&ctx->conn_mutex); if (ret != 0) { @@ -406,6 +427,8 @@ static void in_fw_pause(void *data, struct flb_config *config) if (config->is_ingestion_active == FLB_FALSE) { fw_conn_del_all(ctx); } + + ctx->state = FW_INSTANCE_STATE_PAUSED; } static void in_fw_resume(void *data, struct flb_config *config) { @@ -427,6 +450,8 @@ static void in_fw_resume(void *data, struct flb_config *config) { flb_plg_error(ctx->ins, "cannot unlock collector mutex"); return; } + + ctx->state = FW_INSTANCE_STATE_RUNNING; } } diff --git a/plugins/in_forward/fw.h b/plugins/in_forward/fw.h index 527b4859dd9..42557ef4ee1 100644 --- a/plugins/in_forward/fw.h +++ b/plugins/in_forward/fw.h @@ -25,6 +25,12 @@ #include #include +#define FW_INSTANCE_STATE_RUNNING 0 +#define FW_INSTANCE_STATE_ACCEPTING_CLIENT 1 +#define FW_INSTANCE_STATE_PROCESSING_PACKET 2 +#define FW_INSTANCE_STATE_PAUSED 3 + + enum { FW_HANDSHAKE_HELO = 1, FW_HANDSHAKE_PINGPONG = 2, @@ -76,6 +82,8 @@ struct flb_in_fw_config { pthread_mutex_t conn_mutex; + int state; + /* Plugin is paused */ int is_paused; }; diff --git a/plugins/in_forward/fw_conn.c b/plugins/in_forward/fw_conn.c index ce83f3ce582..580425b915c 100644 --- a/plugins/in_forward/fw_conn.c +++ b/plugins/in_forward/fw_conn.c @@ -28,8 +28,7 @@ #include "fw_prot.h" #include "fw_conn.h" -/* Callback invoked every time an event is triggered for a connection */ -int fw_conn_event(void *data) +static int fw_conn_event_internal(struct flb_connection *connection) { int ret; int bytes; @@ -39,9 +38,6 @@ int fw_conn_event(void *data) struct fw_conn *conn; struct mk_event *event; struct flb_in_fw_config *ctx; - struct flb_connection *connection; - - connection = (struct flb_connection *) data; conn = connection->user_data; @@ -127,6 +123,37 @@ int fw_conn_event(void *data) return 0; } +/* Callback invoked every time an event is triggered for a connection */ +int fw_conn_event(void *data) +{ + struct flb_in_fw_config *ctx; + struct fw_conn *conn; + int result; + struct flb_connection *connection; + int state_backup; + + connection = (struct flb_connection *) data; + + conn = connection->user_data; + + ctx = conn->ctx; + + state_backup = ctx->state; + + ctx->state = FW_INSTANCE_STATE_PROCESSING_PACKET; + + result = fw_conn_event_internal(connection); + + if (ctx->state == FW_INSTANCE_STATE_PROCESSING_PACKET) { + ctx->state = state_backup; + } + else if (ctx->state == FW_INSTANCE_STATE_PAUSED) { + fw_conn_del_all(ctx); + } + + return result; +} + /* Create a new Forward request instance */ struct fw_conn *fw_conn_add(struct flb_connection *connection, struct flb_in_fw_config *ctx) {