Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class DbAction extends ActionSupport {
List<BulkUpdates> writesForTestingRunIssues;
List<BulkUpdates> writesForOverageInfo;
List<DependencyNode> dependencyNodeList;
List<AgentTrafficLog> agentTrafficLogs;
TestScript testScript;
@Setter @Getter
McpAuditInfo auditInfo;
Expand Down Expand Up @@ -1831,6 +1832,17 @@ public String insertMCPAuditDataLog() {
return Action.SUCCESS.toUpperCase();
}

public String bulkWriteAgentTrafficLogs() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for changes in DbAction for mini-runtime. These actions are for cyborg.

Rather we need to complete the impl in DbActor (which extends DataActor)

try {
DbLayer.bulkWriteAgentTrafficLogs(agentTrafficLogs);
} catch (Exception e) {
e.printStackTrace();
loggerMaker.errorAndAddToDb(e, "Error bulkWriteAgentTrafficLogs " + e.toString());
return Action.ERROR.toUpperCase();
}
return Action.SUCCESS.toUpperCase();
}

public List<CustomDataTypeMapper> getCustomDataTypes() {
return customDataTypes;
}
Expand Down Expand Up @@ -2693,6 +2705,14 @@ public void setDependencyNodeList(List<DependencyNode> dependencyNodeList) {
this.dependencyNodeList = dependencyNodeList;
}

public List<AgentTrafficLog> getAgentTrafficLogs() {
return agentTrafficLogs;
}

public void setAgentTrafficLogs(List<AgentTrafficLog> agentTrafficLogs) {
this.agentTrafficLogs = agentTrafficLogs;
}

public int getStartTimestamp() {
return startTimestamp;
}
Expand Down
39 changes: 39 additions & 0 deletions apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,13 @@ public static void handleResponseParams(Map<String, List<HttpResponseParams>> re
}
}

// Save raw agent traffic logs to MongoDB for future training
try {
saveAgentTrafficLogs(accWiseResponse);
} catch (Exception e) {
loggerMaker.errorAndAddToDb(e, "Error saving agent traffic logs: " + e.getMessage());
}

accWiseResponse = filterBasedOnHeaders(accWiseResponse, accountInfo.accountSettings);
loggerMaker.infoAndAddToDb("Initiating sync function for account: " + accountId);
parser.syncFunction(accWiseResponse, syncImmediately, fetchAllSTI, accountInfo.accountSettings);
Expand Down Expand Up @@ -846,10 +853,42 @@ private static void sendToProtobufKafka(HttpResponseParams httpResponseParams) {
}
}

/**
* Save raw agent traffic logs to MongoDB for future training and analysis.
* Stores unprocessed request/response data with collection context.
*/
private static void saveAgentTrafficLogs(List<HttpResponseParams> responseParamsList) {
if (responseParamsList == null || responseParamsList.isEmpty()) {
return;
}

try {
List<AgentTrafficLog> trafficLogs = new ArrayList<>();

for (HttpResponseParams params : responseParamsList) {
// Convert HttpResponseParams to AgentTrafficLog
// Note: isBlocked and threatInfo can be enhanced later when threat detection is integrated
AgentTrafficLog log = AgentTrafficLog.fromHttpResponseParams(params, null, null);
trafficLogs.add(log);
}

// Use DataActor to save to MongoDB via cyborg
if (!trafficLogs.isEmpty()) {
List<Object> writesForAgentTrafficLogs = new ArrayList<>(trafficLogs);
dataActor.bulkWriteAgentTrafficLogs(writesForAgentTrafficLogs);
loggerMaker.infoAndAddToDb("Saved " + trafficLogs.size() + " agent traffic logs to MongoDB");
}

} catch (Exception e) {
loggerMaker.errorAndAddToDb(e, "Error in saveAgentTrafficLogs: " + e.getMessage());
}
}

public static void createIndices() {
SingleTypeInfoDao.instance.createIndicesIfAbsent();
SensitiveSampleDataDao.instance.createIndicesIfAbsent();
SampleDataDao.instance.createIndicesIfAbsent();
AgentTrafficLogDao.instance.createIndicesIfAbsent();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should add it to DaoInit as well

}


Expand Down
59 changes: 59 additions & 0 deletions libs/dao/src/main/java/com/akto/dao/AgentTrafficLogDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.akto.dao;

import com.akto.dto.AgentTrafficLog;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;

import java.util.concurrent.TimeUnit;

/**
* DAO for agent_traffic_logs collection.
* Stores raw agent traffic data for future training and analysis.
*/
public class AgentTrafficLogDao extends AccountsContextDao<AgentTrafficLog> {

public static final AgentTrafficLogDao instance = new AgentTrafficLogDao();

@Override
public String getCollName() {
return "agent_traffic_logs";
}

@Override
public Class<AgentTrafficLog> getClassT() {
return AgentTrafficLog.class;
}

/**
* Create indexes for the collection:
* 1. TTL index on expiresAt - auto-deletes documents after expiry
* 2. Compound index on apiCollectionId + timestamp for efficient queries
* 3. Index on accountId for account-level queries
* 4. Index on isBlocked for filtering blocked/allowed traffic
*/
public void createIndicesIfAbsent() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sync with cyborg

// TTL index - MongoDB will automatically delete documents after expiresAt
IndexOptions ttlOptions = new IndexOptions()
.name("expiresAt_ttl_idx")
.expireAfter(0L, TimeUnit.SECONDS); // Delete immediately after expiresAt date
MCollection.createIndexIfAbsent(
getDBName(),
getCollName(),
Indexes.ascending("expiresAt"),
ttlOptions
);

// Compound index for collection-based training queries
String[] collectionTimestampFields = {"apiCollectionId", "timestamp"};
MCollection.createIndexIfAbsent(getDBName(), getCollName(), collectionTimestampFields, false);

// Index on accountId for account isolation
String[] accountIdField = {"accountId"};
MCollection.createIndexIfAbsent(getDBName(), getCollName(), accountIdField, false);

// Compound index for collection + blocked status (useful for training)
String[] collectionBlockedFields = {"apiCollectionId", "isBlocked", "timestamp"};
MCollection.createIndexIfAbsent(getDBName(), getCollName(), collectionBlockedFields, false);
}
}

105 changes: 105 additions & 0 deletions libs/dao/src/main/java/com/akto/dto/AgentTrafficLog.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package com.akto.dto;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.util.Date;
import java.util.List;
import java.util.Map;

/**
* DTO for storing raw agent traffic logs for future training and analysis.
* This stores unprocessed request/response data with collection context.
*/
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class AgentTrafficLog {

// Collection and account context
private int apiCollectionId;
private String accountId;

// Core request/response data (raw/unprocessed)
private String requestPayload;
private String responsePayload;
private String method;
private String url;
private int statusCode;

// Headers (for context)
private Map<String, List<String>> requestHeaders;
private Map<String, List<String>> responseHeaders;

// Metadata
private int timestamp;
private String sourceIP;
private String destIP;
private String source; // HAR, PCAP, MIRRORING, SDK, OTHER, POSTMAN

// Threat/Guardrail information
private Boolean isBlocked;
private String threatInfo; // JSON string with threat details

// MCP context
private List<String> parentMcpToolNames;

// TTL for auto-cleanup (MongoDB will auto-delete based on this)
private Date expiresAt;

/**
* Create AgentTrafficLog from HttpResponseParams
*/
public static AgentTrafficLog fromHttpResponseParams(HttpResponseParams params, Boolean isBlocked, String threatInfo) {
AgentTrafficLog log = new AgentTrafficLog();

// Collection and account
if (params.getRequestParams() != null) {
log.setApiCollectionId(params.getRequestParams().getApiCollectionId());
}
log.setAccountId(params.getAccountId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sync with cyborg here as well


// Request data
if (params.getRequestParams() != null) {
log.setRequestPayload(params.getRequestParams().getPayload());
log.setMethod(params.getRequestParams().getMethod());
log.setUrl(params.getRequestParams().getURL());
log.setRequestHeaders(params.getRequestParams().getHeaders());
}

// Response data
log.setResponsePayload(params.getPayload());
log.setStatusCode(params.getStatusCode());
log.setResponseHeaders(params.getHeaders());

// Metadata
log.setTimestamp(params.getTime());
log.setSourceIP(params.getSourceIP());
log.setDestIP(params.getDestIP());
log.setSource(params.getSource() != null ? params.getSource().name() : null);

// Threat info
log.setIsBlocked(isBlocked);
log.setThreatInfo(threatInfo);

// MCP context
log.setParentMcpToolNames(params.getParentMcpToolNames());

// Set TTL - 90 days from now
long expiryMillis = System.currentTimeMillis() + (2L * 24 * 60 * 60 * 1000);
log.setExpiresAt(new Date(expiryMillis));

return log;
}

/**
* Create AgentTrafficLog from HttpResponseParams with default values
*/
public static AgentTrafficLog fromHttpResponseParams(HttpResponseParams params) {
return fromHttpResponseParams(params, null, null);
}
}

25 changes: 25 additions & 0 deletions libs/utils/src/main/java/com/akto/data_actor/ClientActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -4190,4 +4190,29 @@ public void storeConversationResults(List<AgentConversationResult> conversationR
}
}

@Override
public void bulkWriteAgentTrafficLogs(List<Object> writesForAgentTrafficLogs) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Impl in DBActor as well

// Convert List<Object> to List<AgentTrafficLog>
List<AgentTrafficLog> agentTrafficLogs = new ArrayList<>();
for (Object obj : writesForAgentTrafficLogs) {
agentTrafficLogs.add((AgentTrafficLog) obj);
}

BasicDBObject obj = new BasicDBObject();
obj.put("agentTrafficLogs", agentTrafficLogs);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a safety check, we can define some upper limit of payload size, if it exceeds ingest traffic logs in smaller batches.


String objString = gson.toJson(obj);

Map<String, List<String>> headers = buildHeaders();
OriginalHttpRequest request = new OriginalHttpRequest(url + "/bulkWriteAgentTrafficLogs", "", "POST", objString, headers, "");
try {
OriginalHttpResponse response = ApiExecutor.sendRequestBackOff(request, true, null, false, null);
if (response.getStatusCode() != 200) {
loggerMaker.errorAndAddToDb("non 2xx response in bulkWriteAgentTrafficLogs", LoggerMaker.LogDb.RUNTIME);
}
} catch (Exception e) {
loggerMaker.errorAndAddToDb("error in bulkWriteAgentTrafficLogs" + e, LoggerMaker.LogDb.RUNTIME);
}
}

}
2 changes: 2 additions & 0 deletions libs/utils/src/main/java/com/akto/data_actor/DataActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -316,4 +316,6 @@ public abstract class DataActor {
public abstract void storeMcpReconResultsBatch(List<McpReconResult> serverDataList);

public abstract void storeConversationResults(List<AgentConversationResult> conversationResults);

public abstract void bulkWriteAgentTrafficLogs(List<Object> writesForAgentTrafficLogs);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

writesForAgentTrafficLogs -> agentTrafficLogs (multiple places)

}
5 changes: 5 additions & 0 deletions libs/utils/src/main/java/com/akto/data_actor/DbLayer.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.akto.dao.test_editor.YamlTemplateDao;
import com.akto.dao.testing.AccessMatrixTaskInfosDao;
import com.akto.dao.testing.AccessMatrixUrlToRolesDao;
import com.akto.dao.AgentTrafficLogDao;
import com.akto.dao.testing.AgentConversationResultDao;
import com.akto.dao.testing.EndpointLogicalGroupDao;
import com.akto.dao.testing.LoginFlowStepsDao;
Expand Down Expand Up @@ -1227,4 +1228,8 @@ public static void storeMcpReconResultsBatch(List<McpReconResult> serverDataList
public static void storeConversationResults(List<AgentConversationResult> conversationResults) {
AgentConversationResultDao.instance.insertMany(conversationResults);
}

public static void bulkWriteAgentTrafficLogs(List<AgentTrafficLog> agentTrafficLogs) {
AgentTrafficLogDao.instance.insertMany(agentTrafficLogs);
}
}
Loading