-
Notifications
You must be signed in to change notification settings - Fork 266
Add AgentTrafficLog functionality for bulk writing and storage #3670
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/mini-runtime-container-release
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -585,6 +585,13 @@ public static void handleResponseParams(Map<String, List<HttpResponseParams>> re | |
| } | ||
| } | ||
|
|
||
| // Save raw agent traffic logs to MongoDB for future training | ||
| try { | ||
| saveAgentTrafficLogs(accWiseResponse); | ||
| } catch (Exception e) { | ||
| loggerMaker.errorAndAddToDb(e, "Error saving agent traffic logs: " + e.getMessage()); | ||
| } | ||
|
|
||
| accWiseResponse = filterBasedOnHeaders(accWiseResponse, accountInfo.accountSettings); | ||
| loggerMaker.infoAndAddToDb("Initiating sync function for account: " + accountId); | ||
| parser.syncFunction(accWiseResponse, syncImmediately, fetchAllSTI, accountInfo.accountSettings); | ||
|
|
@@ -846,10 +853,42 @@ private static void sendToProtobufKafka(HttpResponseParams httpResponseParams) { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Persists raw agent traffic for later training and analysis. | ||
| * | ||
| * Each non-null {@link HttpResponseParams} is converted with | ||
| * {@code AgentTrafficLog.fromHttpResponseParams(params, null, null)} and the resulting batch | ||
| * is handed to {@code dataActor.bulkWriteAgentTrafficLogs}. Any exception is logged and | ||
| * swallowed here, so a storage problem does not propagate to the caller. | ||
| * | ||
| * @param responseParamsList traffic records to store; {@code null} or empty is a no-op | ||
| */ | ||
| private static void saveAgentTrafficLogs(List<HttpResponseParams> responseParamsList) { | ||
| if (responseParamsList == null || responseParamsList.isEmpty()) { | ||
| return; | ||
| } | ||
|
|
||
| try { | ||
| // Typed as List<Object> because that is the parameter type of bulkWriteAgentTrafficLogs. | ||
| List<Object> trafficLogs = new ArrayList<>(responseParamsList.size()); | ||
|
|
||
| for (HttpResponseParams params : responseParamsList) { | ||
| // A null entry would throw inside the converter and discard the whole batch. | ||
| if (params == null) { | ||
| continue; | ||
| } | ||
| // Note: isBlocked and threatInfo can be enhanced later when threat detection is integrated | ||
| trafficLogs.add(AgentTrafficLog.fromHttpResponseParams(params, null, null)); | ||
| } | ||
|
|
||
| // Use DataActor to save to MongoDB via cyborg | ||
| if (!trafficLogs.isEmpty()) { | ||
| dataActor.bulkWriteAgentTrafficLogs(trafficLogs); | ||
| loggerMaker.infoAndAddToDb("Saved " + trafficLogs.size() + " agent traffic logs to MongoDB"); | ||
| } | ||
|
|
||
| } catch (Exception e) { | ||
| loggerMaker.errorAndAddToDb(e, "Error in saveAgentTrafficLogs: " + e.getMessage()); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Ensures collection indexes exist by calling createIndicesIfAbsent() on the | ||
| * SingleTypeInfo, SensitiveSampleData, SampleData and AgentTrafficLog DAO singletons, | ||
| * in that order. | ||
| */ | ||
| public static void createIndices() { | ||
| SingleTypeInfoDao.instance.createIndicesIfAbsent(); | ||
| SensitiveSampleDataDao.instance.createIndicesIfAbsent(); | ||
| SampleDataDao.instance.createIndicesIfAbsent(); | ||
| AgentTrafficLogDao.instance.createIndicesIfAbsent(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should add it to |
||
| } | ||
|
|
||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| package com.akto.dao; | ||
|
|
||
| import com.akto.dto.AgentTrafficLog; | ||
| import com.mongodb.client.model.IndexOptions; | ||
| import com.mongodb.client.model.Indexes; | ||
|
|
||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| /** | ||
| * DAO for the agent_traffic_logs collection. | ||
| * Stores raw agent traffic data for future training and analysis. | ||
| * Access goes through the shared {@link #instance} singleton. | ||
| */ | ||
| public class AgentTrafficLogDao extends AccountsContextDao<AgentTrafficLog> { | ||
|
|
||
| public static final AgentTrafficLogDao instance = new AgentTrafficLogDao(); | ||
|
|
||
| @Override | ||
| public String getCollName() { | ||
| return "agent_traffic_logs"; | ||
| } | ||
|
|
||
| @Override | ||
| public Class<AgentTrafficLog> getClassT() { | ||
| return AgentTrafficLog.class; | ||
| } | ||
|
|
||
| /** | ||
| * Create indexes for the collection (each via MCollection.createIndexIfAbsent): | ||
| * 1. TTL index "expiresAt_ttl_idx" on expiresAt with a 0-second offset, so a document | ||
| *    expires at its own expiresAt value | ||
| * 2. Compound index on apiCollectionId + timestamp for collection-scoped queries | ||
| * 3. Index on accountId for account-level queries | ||
| * 4. Compound index on apiCollectionId + isBlocked + timestamp for filtering | ||
| *    blocked/allowed traffic within a collection | ||
| */ | ||
| public void createIndicesIfAbsent() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sync with cyborg |
||
| // TTL index - MongoDB removes documents once their expiresAt date has passed | ||
| IndexOptions ttlOptions = new IndexOptions() | ||
| .name("expiresAt_ttl_idx") | ||
| .expireAfter(0L, TimeUnit.SECONDS); // 0s offset: expire at expiresAt itself (removal is done by MongoDB's periodic TTL task, so it may lag slightly) | ||
| MCollection.createIndexIfAbsent( | ||
| getDBName(), | ||
| getCollName(), | ||
| Indexes.ascending("expiresAt"), | ||
| ttlOptions | ||
| ); | ||
|
|
||
| // Compound index for collection-based training queries | ||
| String[] collectionTimestampFields = {"apiCollectionId", "timestamp"}; | ||
| MCollection.createIndexIfAbsent(getDBName(), getCollName(), collectionTimestampFields, false); | ||
|
|
||
| // Index on accountId for account isolation | ||
| String[] accountIdField = {"accountId"}; | ||
| MCollection.createIndexIfAbsent(getDBName(), getCollName(), accountIdField, false); | ||
|
|
||
| // Compound index for collection + blocked status + timestamp (useful for training) | ||
| String[] collectionBlockedFields = {"apiCollectionId", "isBlocked", "timestamp"}; | ||
| MCollection.createIndexIfAbsent(getDBName(), getCollName(), collectionBlockedFields, false); | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,105 @@ | ||
| package com.akto.dto; | ||
|
|
||
| import lombok.AllArgsConstructor; | ||
| import lombok.Getter; | ||
| import lombok.NoArgsConstructor; | ||
| import lombok.Setter; | ||
|
|
||
| import java.util.Date; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| /** | ||
| * DTO for storing raw agent traffic logs for future training and analysis. | ||
| * This stores unprocessed request/response data with collection context. | ||
| */ | ||
| @Getter | ||
| @Setter | ||
| @NoArgsConstructor | ||
| @AllArgsConstructor | ||
| public class AgentTrafficLog { | ||
|
|
||
| // Collection and account context | ||
| private int apiCollectionId; | ||
| private String accountId; | ||
|
|
||
| // Core request/response data (raw/unprocessed) | ||
| private String requestPayload; | ||
| private String responsePayload; | ||
| private String method; | ||
| private String url; | ||
| private int statusCode; | ||
|
|
||
| // Headers (for context) | ||
| private Map<String, List<String>> requestHeaders; | ||
| private Map<String, List<String>> responseHeaders; | ||
|
|
||
| // Metadata | ||
| private int timestamp; | ||
| private String sourceIP; | ||
| private String destIP; | ||
| private String source; // HAR, PCAP, MIRRORING, SDK, OTHER, POSTMAN | ||
|
|
||
| // Threat/Guardrail information | ||
| private Boolean isBlocked; | ||
| private String threatInfo; // JSON string with threat details | ||
|
|
||
| // MCP context | ||
| private List<String> parentMcpToolNames; | ||
|
|
||
| // TTL for auto-cleanup (MongoDB will auto-delete based on this) | ||
| private Date expiresAt; | ||
|
|
||
| /** | ||
| * Create AgentTrafficLog from HttpResponseParams. | ||
| * | ||
| * Request-side fields (apiCollectionId, payload, method, URL, request headers) are copied | ||
| * only when params.getRequestParams() is non-null; response fields and metadata are copied | ||
| * directly from params. expiresAt is computed from the current system time. | ||
| * | ||
| * @param params source record to copy from | ||
| * @param isBlocked blocked flag, stored as given (may be null) | ||
| * @param threatInfo threat details, stored as given (may be null) | ||
| * @return a newly populated AgentTrafficLog | ||
| */ | ||
| public static AgentTrafficLog fromHttpResponseParams(HttpResponseParams params, Boolean isBlocked, String threatInfo) { | ||
| AgentTrafficLog log = new AgentTrafficLog(); | ||
|
|
||
| // Collection and account | ||
| if (params.getRequestParams() != null) { | ||
| log.setApiCollectionId(params.getRequestParams().getApiCollectionId()); | ||
| } | ||
| log.setAccountId(params.getAccountId()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sync with cyborg here as well |
||
|
|
||
| // Request data | ||
| if (params.getRequestParams() != null) { | ||
| log.setRequestPayload(params.getRequestParams().getPayload()); | ||
| log.setMethod(params.getRequestParams().getMethod()); | ||
| log.setUrl(params.getRequestParams().getURL()); | ||
| log.setRequestHeaders(params.getRequestParams().getHeaders()); | ||
| } | ||
|
|
||
| // Response data | ||
| log.setResponsePayload(params.getPayload()); | ||
| log.setStatusCode(params.getStatusCode()); | ||
| log.setResponseHeaders(params.getHeaders()); | ||
|
|
||
| // Metadata | ||
| log.setTimestamp(params.getTime()); | ||
| log.setSourceIP(params.getSourceIP()); | ||
| log.setDestIP(params.getDestIP()); | ||
| log.setSource(params.getSource() != null ? params.getSource().name() : null); | ||
|
|
||
| // Threat info | ||
| log.setIsBlocked(isBlocked); | ||
| log.setThreatInfo(threatInfo); | ||
|
|
||
| // MCP context | ||
| log.setParentMcpToolNames(params.getParentMcpToolNames()); | ||
|
|
||
| // Set TTL - 2 days from now (the multiplier below is 2L days in millis, not 90). NOTE(review): confirm the intended retention period | ||
| long expiryMillis = System.currentTimeMillis() + (2L * 24 * 60 * 60 * 1000); | ||
| log.setExpiresAt(new Date(expiryMillis)); | ||
|
|
||
| return log; | ||
| } | ||
|
|
||
| /** | ||
| * Create AgentTrafficLog from HttpResponseParams with isBlocked and threatInfo left null. | ||
| * Delegates to the three-argument overload. | ||
| */ | ||
| public static AgentTrafficLog fromHttpResponseParams(HttpResponseParams params) { | ||
| return fromHttpResponseParams(params, null, null); | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4190,4 +4190,29 @@ public void storeConversationResults(List<AgentConversationResult> conversationR | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Sends a batch of agent traffic logs to the "/bulkWriteAgentTrafficLogs" endpoint. | ||
| * | ||
| * The batch is placed under the "agentTrafficLogs" key, serialized with gson and POSTed via | ||
| * ApiExecutor.sendRequestBackOff. A non-200 response or an exception from the request is | ||
| * logged, not thrown. | ||
| * | ||
| * @param writesForAgentTrafficLogs elements must be AgentTrafficLog instances; | ||
| *        null or empty is a no-op | ||
| * @throws ClassCastException if an element is not an AgentTrafficLog | ||
| */ | ||
| @Override | ||
| public void bulkWriteAgentTrafficLogs(List<Object> writesForAgentTrafficLogs) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Impl in |
||
| // Nothing to persist: skip the HTTP round trip and avoid an NPE on a null list. | ||
| if (writesForAgentTrafficLogs == null || writesForAgentTrafficLogs.isEmpty()) { | ||
| return; | ||
| } | ||
|
|
||
| // Convert List<Object> to List<AgentTrafficLog> | ||
| List<AgentTrafficLog> agentTrafficLogs = new ArrayList<>(writesForAgentTrafficLogs.size()); | ||
| for (Object write : writesForAgentTrafficLogs) { | ||
| agentTrafficLogs.add((AgentTrafficLog) write); | ||
| } | ||
|
|
||
| BasicDBObject obj = new BasicDBObject(); | ||
| obj.put("agentTrafficLogs", agentTrafficLogs); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As a safety check, we can define some upper limit of payload size, if it exceeds ingest traffic logs in smaller batches. |
||
|
|
||
| String objString = gson.toJson(obj); | ||
|
|
||
| Map<String, List<String>> headers = buildHeaders(); | ||
| OriginalHttpRequest request = new OriginalHttpRequest(url + "/bulkWriteAgentTrafficLogs", "", "POST", objString, headers, ""); | ||
| try { | ||
| OriginalHttpResponse response = ApiExecutor.sendRequestBackOff(request, true, null, false, null); | ||
| if (response.getStatusCode() != 200) { | ||
| loggerMaker.errorAndAddToDb("non 2xx response in bulkWriteAgentTrafficLogs", LoggerMaker.LogDb.RUNTIME); | ||
| } | ||
| } catch (Exception e) { | ||
| // Separator added so the exception text is not fused onto the method name. | ||
| loggerMaker.errorAndAddToDb("error in bulkWriteAgentTrafficLogs: " + e, LoggerMaker.LogDb.RUNTIME); | ||
| } | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -316,4 +316,6 @@ public abstract class DataActor { | |
| public abstract void storeMcpReconResultsBatch(List<McpReconResult> serverDataList); | ||
|
|
||
| public abstract void storeConversationResults(List<AgentConversationResult> conversationResults); | ||
|
|
||
| public abstract void bulkWriteAgentTrafficLogs(List<Object> writesForAgentTrafficLogs); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. writesForAgentTrafficLogs -> agentTrafficLogs (multiple places) |
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for changes in
DbAction for mini-runtime. These actions are for cyborg. Rather, we need to complete the impl in
DbActor (which extends DataActor).