From 0e51ca03bfb8288717de78c92c02f9e40ddcc9bb Mon Sep 17 00:00:00 2001
From: Karan
Date: Mon, 1 Dec 2025 15:59:35 +0530
Subject: [PATCH] Add AgentTrafficLog functionality for bulk writing and storage

---
 .../main/java/com/akto/action/DbAction.java   |  20 ++++
 .../java/com/akto/hybrid_runtime/Main.java    |  39 +++++++
 .../java/com/akto/dao/AgentTrafficLogDao.java |  59 ++++++++++
 .../java/com/akto/dto/AgentTrafficLog.java    | 105 ++++++++++++++++++
 .../java/com/akto/data_actor/ClientActor.java |  25 +++++
 .../java/com/akto/data_actor/DataActor.java   |   2 +
 .../java/com/akto/data_actor/DbLayer.java     |   5 +
 7 files changed, 255 insertions(+)
 create mode 100644 libs/dao/src/main/java/com/akto/dao/AgentTrafficLogDao.java
 create mode 100644 libs/dao/src/main/java/com/akto/dto/AgentTrafficLog.java

diff --git a/apps/database-abstractor/src/main/java/com/akto/action/DbAction.java b/apps/database-abstractor/src/main/java/com/akto/action/DbAction.java
index 597ed65cd5..080f16c5e3 100644
--- a/apps/database-abstractor/src/main/java/com/akto/action/DbAction.java
+++ b/apps/database-abstractor/src/main/java/com/akto/action/DbAction.java
@@ -80,6 +80,7 @@ public class DbAction extends ActionSupport {
     List writesForTestingRunIssues;
     List writesForOverageInfo;
     List dependencyNodeList;
+    List<AgentTrafficLog> agentTrafficLogs;
     TestScript testScript;
     @Setter @Getter
     McpAuditInfo auditInfo;
@@ -1831,6 +1832,17 @@ public String insertMCPAuditDataLog() {
         return Action.SUCCESS.toUpperCase();
     }
 
+    public String bulkWriteAgentTrafficLogs() {
+        try {
+            DbLayer.bulkWriteAgentTrafficLogs(agentTrafficLogs);
+        } catch (Exception e) {
+            e.printStackTrace();
+            loggerMaker.errorAndAddToDb(e, "Error bulkWriteAgentTrafficLogs " + e.toString());
+            return Action.ERROR.toUpperCase();
+        }
+        return Action.SUCCESS.toUpperCase();
+    }
+
     public List getCustomDataTypes() {
         return customDataTypes;
     }
@@ -2693,6 +2705,14 @@ public void setDependencyNodeList(List dependencyNodeList) {
         this.dependencyNodeList = dependencyNodeList;
     }
 
+    public List<AgentTrafficLog> getAgentTrafficLogs() {
+        return agentTrafficLogs;
+    }
+
+    public void setAgentTrafficLogs(List<AgentTrafficLog> agentTrafficLogs) {
+        this.agentTrafficLogs = agentTrafficLogs;
+    }
+
     public int getStartTimestamp() {
         return startTimestamp;
     }
diff --git a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/Main.java b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/Main.java
index 19a5537237..ceadeed40a 100644
--- a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/Main.java
+++ b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/Main.java
@@ -585,6 +585,13 @@ public static void handleResponseParams(Map> re
             }
         }
 
+        // Save raw agent traffic logs to MongoDB for future training
+        try {
+            saveAgentTrafficLogs(accWiseResponse);
+        } catch (Exception e) {
+            loggerMaker.errorAndAddToDb(e, "Error saving agent traffic logs: " + e.getMessage());
+        }
+
         accWiseResponse = filterBasedOnHeaders(accWiseResponse, accountInfo.accountSettings);
         loggerMaker.infoAndAddToDb("Initiating sync function for account: " + accountId);
         parser.syncFunction(accWiseResponse, syncImmediately, fetchAllSTI, accountInfo.accountSettings);
@@ -846,10 +853,42 @@ private static void sendToProtobufKafka(HttpResponseParams httpResponseParams) {
         }
     }
 
+    /**
+     * Save raw agent traffic logs to MongoDB for future training and analysis.
+     * Stores unprocessed request/response data with collection context.
+     */
+    private static void saveAgentTrafficLogs(List<HttpResponseParams> responseParamsList) {
+        if (responseParamsList == null || responseParamsList.isEmpty()) {
+            return;
+        }
+
+        try {
+            List<AgentTrafficLog> trafficLogs = new ArrayList<>();
+
+            for (HttpResponseParams params : responseParamsList) {
+                // Convert HttpResponseParams to AgentTrafficLog
+                // Note: isBlocked and threatInfo can be enhanced later when threat detection is integrated
+                AgentTrafficLog log = AgentTrafficLog.fromHttpResponseParams(params, null, null);
+                trafficLogs.add(log);
+            }
+
+            // Use DataActor to save to MongoDB via cyborg
+            if (!trafficLogs.isEmpty()) {
+                List<Object> writesForAgentTrafficLogs = new ArrayList<>(trafficLogs);
+                dataActor.bulkWriteAgentTrafficLogs(writesForAgentTrafficLogs);
+                loggerMaker.infoAndAddToDb("Saved " + trafficLogs.size() + " agent traffic logs to MongoDB");
+            }
+
+        } catch (Exception e) {
+            loggerMaker.errorAndAddToDb(e, "Error in saveAgentTrafficLogs: " + e.getMessage());
+        }
+    }
+
     public static void createIndices() {
         SingleTypeInfoDao.instance.createIndicesIfAbsent();
         SensitiveSampleDataDao.instance.createIndicesIfAbsent();
         SampleDataDao.instance.createIndicesIfAbsent();
+        AgentTrafficLogDao.instance.createIndicesIfAbsent();
     }
 
 
diff --git a/libs/dao/src/main/java/com/akto/dao/AgentTrafficLogDao.java b/libs/dao/src/main/java/com/akto/dao/AgentTrafficLogDao.java
new file mode 100644
index 0000000000..ea209a5012
--- /dev/null
+++ b/libs/dao/src/main/java/com/akto/dao/AgentTrafficLogDao.java
@@ -0,0 +1,59 @@
+package com.akto.dao;
+
+import com.akto.dto.AgentTrafficLog;
+import com.mongodb.client.model.IndexOptions;
+import com.mongodb.client.model.Indexes;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * DAO for agent_traffic_logs collection.
+ * Stores raw agent traffic data for future training and analysis.
+ */
+public class AgentTrafficLogDao extends AccountsContextDao<AgentTrafficLog> {
+
+    public static final AgentTrafficLogDao instance = new AgentTrafficLogDao();
+
+    @Override
+    public String getCollName() {
+        return "agent_traffic_logs";
+    }
+
+    @Override
+    public Class<AgentTrafficLog> getClassT() {
+        return AgentTrafficLog.class;
+    }
+
+    /**
+     * Create indexes for the collection:
+     * 1. TTL index on expiresAt - auto-deletes documents after expiry
+     * 2. Compound index on apiCollectionId + timestamp for efficient queries
+     * 3. Index on accountId for account-level queries
+     * 4. Compound index on apiCollectionId + isBlocked + timestamp for filtering blocked/allowed traffic
+     */
+    public void createIndicesIfAbsent() {
+        // TTL index - MongoDB will automatically delete documents after expiresAt
+        IndexOptions ttlOptions = new IndexOptions()
+            .name("expiresAt_ttl_idx")
+            .expireAfter(0L, TimeUnit.SECONDS); // No extra delay past expiresAt; the TTL monitor is periodic, so removal may lag
+        MCollection.createIndexIfAbsent(
+            getDBName(),
+            getCollName(),
+            Indexes.ascending("expiresAt"),
+            ttlOptions
+        );
+
+        // Compound index for collection-based training queries
+        String[] collectionTimestampFields = {"apiCollectionId", "timestamp"};
+        MCollection.createIndexIfAbsent(getDBName(), getCollName(), collectionTimestampFields, false);
+
+        // Index on accountId for account isolation
+        String[] accountIdField = {"accountId"};
+        MCollection.createIndexIfAbsent(getDBName(), getCollName(), accountIdField, false);
+
+        // Compound index for collection + blocked status (useful for training)
+        String[] collectionBlockedFields = {"apiCollectionId", "isBlocked", "timestamp"};
+        MCollection.createIndexIfAbsent(getDBName(), getCollName(), collectionBlockedFields, false);
+    }
+}
+
diff --git a/libs/dao/src/main/java/com/akto/dto/AgentTrafficLog.java b/libs/dao/src/main/java/com/akto/dto/AgentTrafficLog.java
new file mode 100644
index 0000000000..175c07c5f5
--- /dev/null
+++ b/libs/dao/src/main/java/com/akto/dto/AgentTrafficLog.java
@@ -0,0 +1,105 @@
+package com.akto.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DTO for storing raw agent traffic logs for future training and analysis.
+ * This stores unprocessed request/response data with collection context.
+ */
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class AgentTrafficLog {
+
+    // Collection and account context
+    private int apiCollectionId;
+    private String accountId;
+
+    // Core request/response data (raw/unprocessed)
+    private String requestPayload;
+    private String responsePayload;
+    private String method;
+    private String url;
+    private int statusCode;
+
+    // Headers (for context)
+    private Map<String, List<String>> requestHeaders;
+    private Map<String, List<String>> responseHeaders;
+
+    // Metadata
+    private int timestamp;
+    private String sourceIP;
+    private String destIP;
+    private String source; // HAR, PCAP, MIRRORING, SDK, OTHER, POSTMAN
+
+    // Threat/Guardrail information
+    private Boolean isBlocked;
+    private String threatInfo; // JSON string with threat details
+
+    // MCP context
+    private List<String> parentMcpToolNames;
+
+    // TTL for auto-cleanup (MongoDB will auto-delete based on this)
+    private Date expiresAt;
+
+    /**
+     * Create AgentTrafficLog from HttpResponseParams
+     */
+    public static AgentTrafficLog fromHttpResponseParams(HttpResponseParams params, Boolean isBlocked, String threatInfo) {
+        AgentTrafficLog log = new AgentTrafficLog();
+
+        // Collection and account
+        if (params.getRequestParams() != null) {
+            log.setApiCollectionId(params.getRequestParams().getApiCollectionId());
+        }
+        log.setAccountId(params.getAccountId());
+
+        // Request data
+        if (params.getRequestParams() != null) {
+            log.setRequestPayload(params.getRequestParams().getPayload());
+            log.setMethod(params.getRequestParams().getMethod());
+            log.setUrl(params.getRequestParams().getURL());
+            log.setRequestHeaders(params.getRequestParams().getHeaders());
+        }
+
+        // Response data
+        log.setResponsePayload(params.getPayload());
+        log.setStatusCode(params.getStatusCode());
+        log.setResponseHeaders(params.getHeaders());
+
+        // Metadata
+        log.setTimestamp(params.getTime());
+        log.setSourceIP(params.getSourceIP());
+        log.setDestIP(params.getDestIP());
+        log.setSource(params.getSource() != null ? params.getSource().name() : null);
+
+        // Threat info
+        log.setIsBlocked(isBlocked);
+        log.setThreatInfo(threatInfo);
+
+        // MCP context
+        log.setParentMcpToolNames(params.getParentMcpToolNames());
+
+        // Set TTL - 2 days from now (2 * 24h expressed in milliseconds); keep this comment in sync with the multiplier
+        long expiryMillis = System.currentTimeMillis() + (2L * 24 * 60 * 60 * 1000);
+        log.setExpiresAt(new Date(expiryMillis));
+
+        return log;
+    }
+
+    /**
+     * Create AgentTrafficLog from HttpResponseParams with default values
+     */
+    public static AgentTrafficLog fromHttpResponseParams(HttpResponseParams params) {
+        return fromHttpResponseParams(params, null, null);
+    }
+}
+
diff --git a/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java b/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java
index 6c4159fa86..0c08cc384b 100644
--- a/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java
+++ b/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java
@@ -4190,4 +4190,29 @@ public void storeConversationResults(List conversationR
         }
     }
 
+    @Override
+    public void bulkWriteAgentTrafficLogs(List<Object> writesForAgentTrafficLogs) {
+        // Convert List<Object> to List<AgentTrafficLog>
+        List<AgentTrafficLog> agentTrafficLogs = new ArrayList<>();
+        for (Object obj : writesForAgentTrafficLogs) {
+            agentTrafficLogs.add((AgentTrafficLog) obj);
+        }
+
+        BasicDBObject obj = new BasicDBObject();
+        obj.put("agentTrafficLogs", agentTrafficLogs);
+
+        String objString = gson.toJson(obj);
+
+        Map<String, List<String>> headers = buildHeaders();
+        OriginalHttpRequest request = new OriginalHttpRequest(url + "/bulkWriteAgentTrafficLogs", "", "POST", objString, headers, "");
+        try {
+            OriginalHttpResponse response = ApiExecutor.sendRequestBackOff(request, true, null, false, null);
+            if (response.getStatusCode() != 200) {
+                loggerMaker.errorAndAddToDb("non 2xx response in bulkWriteAgentTrafficLogs", LoggerMaker.LogDb.RUNTIME);
+            }
+        } catch (Exception e) {
+            loggerMaker.errorAndAddToDb("error in bulkWriteAgentTrafficLogs" + e, LoggerMaker.LogDb.RUNTIME);
+        }
+    }
+
 }
diff --git a/libs/utils/src/main/java/com/akto/data_actor/DataActor.java b/libs/utils/src/main/java/com/akto/data_actor/DataActor.java
index b15a62f49e..04c612c0a3 100644
--- a/libs/utils/src/main/java/com/akto/data_actor/DataActor.java
+++ b/libs/utils/src/main/java/com/akto/data_actor/DataActor.java
@@ -316,4 +316,6 @@ public abstract class DataActor {
     public abstract void storeMcpReconResultsBatch(List serverDataList);
 
     public abstract void storeConversationResults(List conversationResults);
+
+    public abstract void bulkWriteAgentTrafficLogs(List<Object> writesForAgentTrafficLogs);
 }
diff --git a/libs/utils/src/main/java/com/akto/data_actor/DbLayer.java b/libs/utils/src/main/java/com/akto/data_actor/DbLayer.java
index 6a293816c7..9fbbb7ed23 100644
--- a/libs/utils/src/main/java/com/akto/data_actor/DbLayer.java
+++ b/libs/utils/src/main/java/com/akto/data_actor/DbLayer.java
@@ -36,6 +36,7 @@
 import com.akto.dao.test_editor.YamlTemplateDao;
 import com.akto.dao.testing.AccessMatrixTaskInfosDao;
 import com.akto.dao.testing.AccessMatrixUrlToRolesDao;
+import com.akto.dao.AgentTrafficLogDao;
 import com.akto.dao.testing.AgentConversationResultDao;
 import com.akto.dao.testing.EndpointLogicalGroupDao;
 import com.akto.dao.testing.LoginFlowStepsDao;
@@ -1227,4 +1228,8 @@ public static void storeMcpReconResultsBatch(List serverDataList
     public static void storeConversationResults(List conversationResults) {
         AgentConversationResultDao.instance.insertMany(conversationResults);
     }
+
+    public static void bulkWriteAgentTrafficLogs(List<AgentTrafficLog> agentTrafficLogs) {
+        AgentTrafficLogDao.instance.insertMany(agentTrafficLogs);
+    }
 }